Pulsar Sink
Provided by: "Apache Software Foundation"
Support Level for this Kamelet is: "Stable"
Send documents to Pulsar.
Configuration Options
The following table summarizes the configuration options available for the pulsar-sink
Kamelet:
Property | Name | Description | Type | Default | Example |
---|---|---|---|---|---|
Pulsar Namespace Name | Required The Pulsar Namespace Name. | string | |||
Service URL | Required The Pulsar Service URL to point while creating the client from URI. | string | |||
Tenant Name | Required The Tenant Name. | string | |||
Topic Name | Required The topic name or regexp. | string | |||
Topic Type | Required The topic type. Enum values: * persistent * non-persistent | string | |||
Authentication Class | The Authentication FQCN to be used while creating the client from URI. | string | |||
Authentication Params | The Authentication Parameters to be used while creating the client from URI. | string | |||
Enable Batching | Control whether automatic batching of messages is enabled for the producer. | boolean | true | ||
Batching Maximum Messages | The maximum size to batch messages. | integer | 1000 | ||
Batching Maximum Publish Delay in Microsecond | The maximum time period within which the messages sent will be batched if batchingEnabled is true. | integer | 1000 | ||
Block If Queue Full | Whether to block the producing thread if pending messages queue is full or to throw a ProducerQueueIsFullError. | boolean | false | ||
Compression Type | Compression type to use. Enum values: * NONE * LZ4 * ZLIB * ZSTD * SNAPPY | string | NONE | ||
Initial SequenceId | The first message published will have a sequence Id of initialSequenceId 1. | integer | -1 | ||
Number Of Consumer Threads | Whether the producer should be started lazy (on the first message). By starting lazy you can use this to allow CamelContext and routes to startup in situations where a producer may otherwise fail during starting and cause the route to fail being started. By deferring this startup to be lazy then the startup failure can be handled during routing messages via Camel’s routing error handlers. Beware that when the first message is processed then creating and starting the producer may take a little time and prolong the total processing time of the processing. | boolean | false | ||
Maximum Pending Messages | Size of the pending massages queue. When the queue is full, by default, any further sends will fail unless blockIfQueueFull=true. | integer | 1000 | ||
Maximum Pending Messages Across Partitions | The maximum number of pending messages for partitioned topics. The maxPendingMessages value will be reduced if (number of partitions maxPendingMessages) exceeds this value. Partitioned topics have a pending message queue for each partition. | integer | 50000 | ||
Message Routing Mode | Message Routing Mode to use. Enum values: * SinglePartition * RoundRobinPartition * CustomPartition | string | RoundRobinPartition | ||
Producer Name | Name of the producer. If unset, lets Pulsar select a unique identifier. | string | |||
Send Timeout in Milliseconds | Send timeout in milliseconds. | integer | 30000 |
Dependencies
At runtime, the pulsar-sink
Kamelet relies upon the presence of the following dependencies:
-
camel:pulsar
-
camel:kamelet
-
camel:core
Camel JBang usage
Prerequisites
-
You’ve installed JBang.
-
You have executed the following command:
jbang app install camel@apache/camel
Supposing you have a file named route.yaml with this content:
- route:
from:
uri: "kamelet:timer-source"
parameters:
period: 10000
message: 'test'
steps:
- to:
uri: "kamelet:log-sink"
You can now run it directly through the following command
camel run route.yaml
Camel K Environment Usage
This section describes how you can use the pulsar-sink
.
Knative sink
You can use the pulsar-sink
Kamelet as a Knative sink by binding it to a Knative object.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: pulsar-sink-pipe
spec:
source:
ref:
kind: Channel
apiVersion: messaging.knative.dev/v1
name: mychannel
sink:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: pulsar-sink
properties:
namespaceName: The Pulsar Namespace Name
serviceUrl: The Service URL
tenant: The Tenant Name
topic: The Topic Name
topicType: The Topic Type
Prerequisite
You have Camel K installed on the cluster.
Procedure for using the cluster CLI
-
Save the
pulsar-sink-pipe.yaml
file to your local drive, and then edit it as needed for your configuration. -
Run the sink by using the following command:
kubectl apply -f pulsar-sink-pipe.yaml
Procedure for using the Kamel CLI
Configure and run the sink by using the following command:
kamel bind channel:mychannel -p "sink.namespaceName=The Pulsar Namespace Name" -p "sink.serviceUrl=The Service URL" -p "sink.tenant=The Tenant Name" -p "sink.topic=The Topic Name" -p "sink.topicType=The Topic Type" pulsar-sink
This command creates the Kamelet Pipe in the current namespace on the cluster.
Kafka sink
You can use the pulsar-sink
Kamelet as a Kafka sink by binding it to a Kafka topic.
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: pulsar-sink-pipe
spec:
source:
ref:
kind: KafkaTopic
apiVersion: kafka.strimzi.io/v1beta1
name: my-topic
sink:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: pulsar-sink
properties:
namespaceName: The Pulsar Namespace Name
serviceUrl: The Service URL
tenant: The Tenant Name
topic: The Topic Name
topicType: The Topic Type
Prerequisites
-
You’ve installed Strimzi.
-
You’ve created a topic named
my-topic
in the current namespace. -
You have Camel K installed on the cluster.
Procedure for using the cluster CLI
-
Save the
pulsar-sink-pipe.yaml
file to your local drive, and then edit it as needed for your configuration. -
Run the sink by using the following command:
kubectl apply -f pulsar-sink-pipe.yaml
Procedure for using the Kamel CLI
Configure and run the sink by using the following command:
kamel bind kafka.strimzi.io/v1beta1:KafkaTopic:my-topic -p "sink.namespaceName=The Pulsar Namespace Name" -p "sink.serviceUrl=The Service URL" -p "sink.tenant=The Tenant Name" -p "sink.topic=The Topic Name" -p "sink.topicType=The Topic Type" pulsar-sink
This command creates the Kamelet Pipe in the current namespace on the cluster.