Since we're on a major migration process of this website, some component documents here are out of sync right now. In the meantime you may want to look at the early version of the new website
https://camel.apache.org/staging/
We would very much like to receive any feedback on the new site, please join the discussion on the Camel user mailing list.
SQL ComponentThe sql: component allows you to work with databases using JDBC queries. The difference between this component and JDBC component is that in case of SQL the query is a property of the endpoint and it uses message payload as parameters passed to the query. This component uses Maven users will need to add the following dependency to their <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-sql</artifactId> <version>x.x.x</version> <!-- use the same version as your Camel core version --> </dependency> The SQL component also supports:
URI formatFrom Camel 2.11 onwards this component can create both consumer (e.g. In previous versions, it could only act as a producer. This component can be used as a Transactional Client. The SQL component uses the following endpoint URI notation: sql:select * from table where id=# order by name[?options]
From Camel 2.11 onwards you can use named parameters by using : sql:select * from table where id=:#myId order by name[?options] When using named parameters, Camel will lookup the names from, in the given precedence: If a named parameter cannot be resolved, then an exception is thrown. From Camel 2.14 onward you can use Simple expressions as parameters as shown: sql:select * from table where id=:#${property.myId} order by name[?options] Notice that the standard From Camel 2.17 onwards you can externalize your SQL queries to files in the classpath or file system as shown: sql:classpath:sql/myquery.sql[?options] And the myquery.sql file is in the classpath and is just a plain text select * from table where id = :#${property.myId} order by name In the file you can use multilines and format the SQL as you wish. And also use comments such as the – dash line. You can append query options to the URI in the following format, Options
Treatment of the message bodyThe SQL component tries to convert the message body to an object of For example, if the message body is an instance of If From Camel 2.16 onwards you can use the option useMessageBodyForSql that allows to use the message body as the SQL statement, and then the SQL parameters must be provided in a header with the key SqlConstants.SQL_PARAMETERS. This allows the SQL component to work more dynamic as the SQL query is from the message body. Result of the queryFor By default, the result is placed in the message body. If the outputHeader parameter is set, the result is placed in the header. This is an alternative to using a full message enrichment pattern to add headers, it provides a concise syntax for querying a sequence or some other small value into a header. It is convenient to use outputHeader and outputType together: from("jms:order.inbox") .to("sql:select order_seq.nextval from dual?outputHeader=OrderId&outputType=SelectOne") .to("jms:order.booking"); Using StreamListFrom Camel 2.18 onwards the producer supports outputType=StreamList that uses an iterator to stream the output of the query. This allows to process the data in a streaming fashion which for example can be used by the Splitter EIP to process each row one at a time, and load data from the database as needed. from("direct:withSplitModel") .to("sql:select * from projects order by id?outputType=StreamList&outputClass=org.apache.camel.component.sql.ProjectModel") .to("log:stream") .split(body()).streaming() .to("log:row") .to("mock:result") .end(); Header valuesWhen performing
When performing
Generated keysAvailable as of Camel 2.12.4, 2.13.1 and 2.14 If you insert data using SQL INSERT, then the RDBMS may support auto generated keys. You can instruct the SQL producer to return the generated keys in headers. You can see more details in this unit test. ConfigurationYou can now set a reference to a select * from table where id=# order by name?dataSource=myDS SampleIn the sample below we execute a query and retrieve the result as a First, we set up a table to use for our sample. As this is based on an unit test, we do it in java: db = new EmbeddedDatabaseBuilder() .setType(EmbeddedDatabaseType.DERBY).addScript("sql/createAndPopulateDatabase.sql").build();
create table projects (id integer primary key, project varchar(10), license varchar(5)); insert into projects values (1, 'Camel', 'ASF'); insert into projects values (2, 'AMQ', 'ASF'); insert into projects values (3, 'Linux', 'XXX'); Then we configure our route and our
from("direct:simple") .to("sql:select * from projects where license = # order by id?dataSource=#jdbc/myDataSource") .to("mock:result"); And then we fire the message into the
MockEndpoint mock = getMockEndpoint("mock:result"); mock.expectedMessageCount(1); // send the query to direct that will route it to the sql where we will execute the query // and bind the parameters with the data from the body. The body only contains one value // in this case (XXX) but if we should use multi values then the body will be iterated // so we could supply a List<String> instead containing each binding value. template.sendBody("direct:simple", "XXX"); mock.assertIsSatisfied(); // the result is a List List<?> received = assertIsInstanceOf(List.class, mock.getReceivedExchanges().get(0).getIn().getBody()); // and each row in the list is a Map Map<?, ?> row = assertIsInstanceOf(Map.class, received.get(0)); // and we should be able the get the project from the map that should be Linux assertEquals("Linux", row.get("PROJECT")); We could configure the <jee:jndi-lookup id="myDS" jndi-name="jdbc/myDataSource"/>
|
Parameter | Default Value | Description |
---|---|---|
createTableIfNotExists | true | Defines whether or not Camel should try to create the table if it doesn't exist. |
tableExistsString | SELECT 1 FROM CAMEL_MESSAGEPROCESSED WHERE 1 = 0 | This query is used to figure out whether the table already exists or not. It must throw an exception to indicate the table doesn't exist. |
createString | CREATE TABLE CAMEL_MESSAGEPROCESSED (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP) | The statement which is used to create the table. |
queryString | SELECT COUNT(*) FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? | The query which is used to figure out whether the message already exists in the repository (the result is not equals to '0'). It takes two parameters. This first one is the processor name ( |
insertString | INSERT INTO CAMEL_MESSAGEPROCESSED (processorName, messageId, createdAt) VALUES (?, ?, ?) | The statement which is used to add the entry into the table. It takes three parameter. The first one is the processor name ( |
deleteString | DELETE FROM CAMEL_MESSAGEPROCESSED WHERE processorName = ? AND messageId = ? | The statement which is used to delete the entry from the database. It takes two parameter. This first one is the processor name ( |
A customized org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository
could look like:
<bean id="messageIdRepository" class="org.apache.camel.processor.idempotent.jdbc.JdbcMessageIdRepository"> <constructor-arg ref="dataSource" /> <constructor-arg value="myProcessorName" /> <property name="tableExistsString" value="SELECT 1 FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE 1 = 0" /> <property name="createString" value="CREATE TABLE CUSTOMIZED_MESSAGE_REPOSITORY (processorName VARCHAR(255), messageId VARCHAR(100), createdAt TIMESTAMP)" /> <property name="queryString" value="SELECT COUNT(*) FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" /> <property name="insertString" value="INSERT INTO CUSTOMIZED_MESSAGE_REPOSITORY (processorName, messageId, createdAt) VALUES (?, ?, ?)" /> <property name="deleteString" value="DELETE FROM CUSTOMIZED_MESSAGE_REPOSITORY WHERE processorName = ? AND messageId = ?" /> </bean>
Using the JDBC based aggregation repository
Available as of Camel 2.6
Using JdbcAggregationRepository in Camel 2.6 In Camel 2.6, the JdbcAggregationRepository is provided in the camel-jdbc-aggregator
component. From Camel 2.7 onwards, the JdbcAggregationRepository
is provided in the camel-sql
component.
JdbcAggregationRepository
is an AggregationRepository
which on the fly persists the aggregated messages. This ensures that you will not loose messages, as the default aggregator will use an in memory only AggregationRepository
.
The JdbcAggregationRepository
allows together with Camel to provide persistent support for the Aggregator.
It has the following options:
Option | Type | Description |
---|---|---|
dataSource | DataSource | Mandatory: The javax.sql.DataSource to use for accessing the database. |
repositoryName | String | Mandatory: The name of the repository. |
transactionManager | TransactionManager | Mandatory: The |
lobHandler | LobHandler | A org.springframework.jdbc.support.lob.LobHandler to handle Lob types in the database. Use this option to use a vendor specific LobHandler, for example when using Oracle. |
returnOldExchange | boolean | Whether the get operation should return the old existing Exchange if any existed. By default this option is false to optimize as we do not need the old exchange when aggregating. |
useRecovery | boolean | Whether or not recovery is enabled. This option is by default true . When enabled the Camel Aggregator automatic recover failed aggregated exchange and have them resubmitted. |
recoveryInterval | long | If recovery is enabled then a background task is run every x'th time to scan for failed exchanges to recover and resubmit. By default this interval is 5000 millis. |
maximumRedeliveries | int | Allows you to limit the maximum number of redelivery attempts for a recovered exchange. If enabled then the Exchange will be moved to the dead letter channel if all redelivery attempts failed. By default this option is disabled. If this option is used then the deadLetterUri option must also be provided. |
| String | An endpoint uri for a Dead Letter Channel where exhausted recovered Exchanges will be moved. If this option is used then the |
storeBodyAsText | boolean | Camel 2.11: Whether to store the message body as String which is human readable. By default this option is |
headersToStoreAsText | List<String> | Camel 2.11: Allows to store headers as String which is human readable. By default this option is disabled, storing the headers in binary format. |
jdbcOptimisticLockingExceptionMapper | jdbcOptimisticLockingExceptionMapper | Camel 2.12: Allows to plugin a custom |
Optimistic Locking
Optimistic locking is set to on by default. If two exchanges attempt to insert at the same time an exception will thrown, caught, converted to an OptimisticLockingException, and rethrown.
What is preserved when persisting
JdbcAggregationRepository
will only preserve any Serializable
compatible data types. If a data type is not such a type its dropped and a WARN
is logged. And it only persists the Message
body and the Message
headers. The Exchange
properties are not persisted.
From Camel 2.11 onwards you can store the message body and select(ed) headers as String in separate columns.
Recovery
The JdbcAggregationRepository
will by default recover any failed Exchange. It does this by having a background tasks that scans for failed Exchanges in the persistent store. You can use the checkInterval
option to set how often this task runs. The recovery works as transactional which ensures that Camel will try to recover and redeliver the failed Exchange. Any Exchange which was found to be recovered will be restored from the persistent store and resubmitted and send out again.
The following headers is set when an Exchange is being recovered/redelivered:
Header | Type | Description |
---|---|---|
Exchange.REDELIVERED | Boolean | Is set to true to indicate the Exchange is being redelivered. |
Exchange.REDELIVERY_COUNTER | Integer | The redelivery attempt, starting from 1. |
Only when an Exchange has been successfully processed it will be marked as complete which happens when the confirm
method is invoked on the AggregationRepository
. This means if the same Exchange fails again it will be kept retried until it success.
You can use option maximumRedeliveries
to limit the maximum number of redelivery attempts for a given recovered Exchange. You must also set the deadLetterUri
option so Camel knows where to send the Exchange when the maximumRedeliveries
was hit.
You can see some examples in the unit tests of camel-sql, for example this test.
Database
To be operational, each aggregator uses two table: the aggregation and completed one. By convention the completed has the same name as the aggregation one suffixed with "_COMPLETED"
. The name must be configured in the Spring bean with the RepositoryName
property. In the following example aggregation will be used.
The table structure definition of both table are identical: in both case a String value is used as key (id) whereas a Blob contains the exchange serialized in byte array.
However one difference should be remembered: the id field does not have the same content depending on the table.
In the aggregation table id holds the correlation Id used by the component to aggregate the messages. In the completed table, id holds the id of the exchange stored in corresponding the blob field.
Here is the SQL query used to create the tables, just replace "aggregation"
with your aggregator repository name.
CREATE TABLE aggregation ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_pk PRIMARY KEY (id) ); CREATE TABLE aggregation_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, constraint aggregation_completed_pk PRIMARY KEY (id) );
Storing body and headers as text
Available as of Camel 2.11
You can configure the JdbcAggregationRepository
to store message body and select(ed) headers as String in separate columns. For example to store the body, and the following two headers companyName
and accountName
use the following SQL:
CREATE TABLE aggregationRepo3 ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_pk PRIMARY KEY (id) ); CREATE TABLE aggregationRepo3_completed ( id varchar(255) NOT NULL, exchange blob NOT NULL, body varchar(1000), companyName varchar(1000), accountName varchar(1000), constraint aggregationRepo3_completed_pk PRIMARY KEY (id) );
And then configure the repository to enable this behavior as shown below:
<bean id="repo3" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="repositoryName" value="aggregationRepo3"/> <property name="transactionManager" ref="txManager3"/> <property name="dataSource" ref="dataSource3"/> <!-- configure to store the message body and following headers as text in the repo --> <property name="storeBodyAsText" value="true"/> <property name="headersToStoreAsText"> <list> <value>companyName</value> <value>accountName</value> </list> </property> </bean>
Codec (Serialization)
Since they can contain any type of payload, Exchanges are not serializable by design. It is converted into a byte array to be stored in a database BLOB field. All those conversions are handled by the JdbcCodec
class. One detail of the code requires your attention: the ClassLoadingAwareObjectInputStream
.
The ClassLoadingAwareObjectInputStream
has been reused from the Apache ActiveMQ project. It wraps an ObjectInputStream
and use it with the ContextClassLoader
rather than the currentThread
one. The benefit is to be able to load classes exposed by other bundles. This allows the exchange body and headers to have custom types object references.
Transaction
A Spring PlatformTransactionManager
is required to orchestrate transaction.
Service (Start/Stop)
The start
method verify the connection of the database and the presence of the required tables. If anything is wrong it will fail during starting.
Aggregator configuration
Depending on the targeted environment, the aggregator might need some configuration. As you already know, each aggregator should have its own repository (with the corresponding pair of table created in the database) and a data source. If the default lobHandler is not adapted to your database system, it can be injected with the lobHandler
property.
Here is the declaration for Oracle:
<bean id="lobHandler" class="org.springframework.jdbc.support.lob.OracleLobHandler"> <property name="nativeJdbcExtractor" ref="nativeJdbcExtractor"/> </bean> <bean id="nativeJdbcExtractor" class="org.springframework.jdbc.support.nativejdbc.CommonsDbcpNativeJdbcExtractor"/> <bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <!-- Only with Oracle, else use default --> <property name="lobHandler" ref="lobHandler"/> </bean>
Optimistic locking
From Camel 2.12 onwards you can turn on optimisticLocking
and use this JDBC based aggregation repository in a clustered environment where multiple Camel applications shared the same database for the aggregation repository. If there is a race condition there JDBC driver will throw a vendor specific exception which the JdbcAggregationRepository
can react upon. To know which caused exceptions from the JDBC driver is regarded as an optimistick locking error we need a mapper to do this. Therefore there is a org.apache.camel.processor.aggregate.jdbc.JdbcOptimisticLockingExceptionMapper
allows you to implement your custom logic if needed. There is a default implementation org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper
which works as follows:
The following check is done:
If the caused exception is an SQLException
then the SQLState is checked if starts with 23.
If the caused exception is a DataIntegrityViolationException
If the caused exception class name has "ConstraintViolation" in its name.
optional checking for FQN class name matches if any class names has been configured
You can in addition add FQN classnames, and if any of the caused exception (or any nested) equals any of the FQN class names, then its an optimistick locking error.
Here is an example, where we define 2 extra FQN class names from the JDBC vendor.
<bean id="repo" class="org.apache.camel.processor.aggregate.jdbc.JdbcAggregationRepository"> <property name="transactionManager" ref="transactionManager"/> <property name="repositoryName" value="aggregation"/> <property name="dataSource" ref="dataSource"/> <property name"jdbcOptimisticLockingExceptionMapper" ref="myExceptionMapper"/> </bean> <!-- use the default mapper with extra FQN class names from our JDBC driver --> <bean id="myExceptionMapper" class="org.apache.camel.processor.aggregate.jdbc.DefaultJdbcOptimisticLockingExceptionMapper"> <property name="classNames"> <util:set> <value>com.foo.sql.MyViolationExceptoion</value> <value>com.foo.sql.MyOtherViolationExceptoion</value> </util:set> </property> </bean>
See Also