Basic Configuration
In this page we list the Camel Kafka Configuration which are not part of the camel-catalog material and are not part of the kafka connect framework. For the specific connector configuration you can have a look at the single documentation pages.
For a Sink connector the basic options are:
Name | Description | Default | Priority |
---|---|---|---|
camel.sink.marshal | The camel dataformat name to use to marshal data to the destination | null | HIGH |
camel.sink.unmarshal | The camel dataformat name to use to unmarshal data from the topic | null | HIGH |
camel.sink.contentLogLevel | Log level for the record’s content. Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF. | OFF | HIGH |
camel.beans.aggregate | A reference to an aggregate bean, in the form of #class: | null | MEDIUM |
camel.aggregation.size | The size of the aggregation, to be used in combination with camel.beans.aggregate | 10 | MEDIUM |
camel.aggregation.timeout | The timeout of the aggregation, to be used in combination with camel.beans.aggregate | 500L | MEDIUM |
camel.error.handler | The error handler to use: possible value are 'no' or 'default' | default | MEDIUM |
camel.error.handler.max.redeliveries | The maximum redeliveries to be use in case of Default Error Handler | 0 | MEDIUM |
camel.error.handler.redelivery.delay | The initial redelivery delay in milliseconds in case of Default Error Handler | 1000L | MEDIUM |
camel.remove.headers.pattern | The pattern of the headers we want to exclude from the exchange | null | MEDIUM |
camel.idempotency.enabled | If idempotency must be enabled or not | false | LOW |
camel.idempotency.repository.type | The idempotent repository type to use, possible values are memory and kafka | memory | LOW |
camel.idempotency.expression.type | How the idempotency will be evaluated: possible values are body and header | body | LOW |
camel.idempotency.expression.header | The header name that will be evaluated in case of camel.idempotency.expression.type equals to header | null | LOW |
camel.idempotency.memory.dimension | The Memory dimension of the in memory idempotent Repository | 100 | LOW |
camel.idempotency.kafka.topic | The Kafka topic name to use for the idempotent repository | kafka_idempotent_repository | LOW |
camel.idempotency.kafka.bootstrap.servers | A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live | localhost:9092 | LOW |
camel.idempotency.kafka.max.cache.size | Sets the maximum size of the local key cache | 1000 | LOW |
camel.idempotency.kafka.poll.duration.ms | Sets the poll duration (in milliseconds) of the Kafka consumer | 100 | LOW |
For a Source connector the basic options are:
Name | Description | Default | Priority |
---|---|---|---|
camel.source.marshal | The camel dataformat name to use to marshal data to the destination | null | HIGH |
camel.source.unmarshal | The camel dataformat name to use to unmarshal data from the topic | null | HIGH |
camel.source.contentLogLevel | Log level for the record’s content. Valid values: TRACE, DEBUG, INFO, WARN, ERROR, OFF. | OFF | HIGH |
camel.source.maxBatchPollSize | The max number of messages retrieved in a single poll() | 1000L | MEDIUM |
camel.source.maxPollDuration | The maximum time in milliseconds spent in a single call to poll() | 1000L | MEDIUM |
camel.source.pollingConsumerQueueSize | The queue size for the internal hand-off queue between the polling consumer, and producers sending data into the queue. | 1000L | MEDIUM |
camel.source.pollingConsumerBlockTimeout | To use a timeout (in milliseconds) when the producer is blocked if the internal queue is full. If the value is 0 or negative then no timeout is in use. | 0L | MEDIUM |
camel.source.pollingConsumerBlockWhenFull | Whether to block any producer if the internal queue is full. | true | MEDIUM |
camel.source.camelMessageHeaderKey | The name of a camel message header containing an unique key that can be used as a Kafka message key. If this is not specified, then the Kafka message will not have a key. | null | MEDIUM |
camel.beans.aggregate | A reference to an aggregate bean, in the form of #class: | null | MEDIUM |
camel.aggregation.size | The size of the aggregation, to be used in combination with camel.beans.aggregate | 10 | MEDIUM |
camel.aggregation.timeout | The timeout of the aggregation, to be used in combination with camel.beans.aggregate | 500L | MEDIUM |
camel.error.handler | The error handler to use: possible value are 'no' or 'default' | default | MEDIUM |
camel.error.handler.max.redeliveries | The maximum redeliveries to be use in case of Default Error Handler | 0 | MEDIUM |
camel.error.handler.redelivery.delay | The initial redelivery delay in milliseconds in case of Default Error Handler | 1000L | MEDIUM |
camel.remove.headers.pattern | The pattern of the headers we want to exclude from the exchange | null | MEDIUM |
camel.idempotency.enabled | If idempotency must be enabled or not | false | LOW |
camel.idempotency.repository.type | The idempotent repository type to use, possible values are memory and kafka | memory | LOW |
camel.idempotency.expression.type | How the idempotency will be evaluated: possible values are body and header | body | LOW |
camel.idempotency.expression.header | The header name that will be evaluated in case of camel.idempotency.expression.type equals to header | null | LOW |
camel.idempotency.memory.dimension | The Memory dimension of the in memory idempotent Repository | 100 | LOW |
camel.idempotency.kafka.topic | The Kafka topic name to use for the idempotent repository | kafka_idempotent_repository | LOW |
camel.idempotency.kafka.bootstrap.servers | A comma-separated list of host and port pairs that are the addresses of the Kafka brokers where the idempotent repository should live | localhost:9092 | LOW |
camel.idempotency.kafka.max.cache.size | Sets the maximum size of the local key cache | 1000 | LOW |
camel.idempotency.kafka.poll.duration.ms | Sets the poll duration (in milliseconds) of the Kafka consumer | 100 | LOW |
For more options related to single connector you can have a look at Connectors list.
Message Level Headers
Certain sink components offer additional configuration and adjustment of behavior by setting specific headers on the message sent via Kafka. For example the file sink connector allows the user to set the file name either via the configuration property or via the header CamelHeader.CamelFileName
. This is usually noted on the connector documentation, but as a rule of thumb, users are advised to also check the Camel components documentation to verify what message headers are available. If they are available, to configure them directly from your Kafka producer, you can append them with the string CamelHeader.
(i.e.: CamelHeader.CamelFileName
, CamelHeader.CamelFilePath
, etc) and include them on the message headers from Kafka.