Try it out on OpenShift with Strimzi

You can also use the Camel Kafka connectors on Kubernetes and OpenShift with the Strimzi project. Strimzi provides a set of operators and container images for running Kafka on Kubernetes and OpenShift. The following example shows how to run Kafka Connect with the Camel Kafka connectors on OpenShift.

Deploy Kafka and Kafka Connect

First we install the Strimzi operator and use it to deploy the Kafka broker and Kafka Connect into our OpenShift project. We need to create security objects as part of the installation, so it is necessary to switch to an admin user. If you use Minishift, you can do that with the following command:

oc login -u system:admin

We will use the OpenShift project myproject. If it doesn't exist yet, you can create it using the following command:

oc new-project myproject

If the project already exists, you can switch to it with:

oc project myproject

We can now install the Strimzi operator into this project:

oc apply -f https://github.com/strimzi/strimzi-kafka-operator/releases/download/0.19.0/strimzi-cluster-operator-0.19.0.yaml
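
Before continuing, you can wait for the operator Deployment to become available (strimzi-cluster-operator is the name of the Deployment created by the installation file above):

oc rollout status deployment/strimzi-cluster-operator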

Next we will deploy a Kafka broker cluster and a Kafka Connect cluster, and then create a Kafka Connect image with the Camel Kafka connectors installed:

# Deploy a single node Kafka broker
oc apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.19.0/examples/kafka/kafka-persistent-single.yaml

# Deploy a single instance of Kafka Connect with no plug-in installed
oc apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/0.19.0/examples/connect/kafka-connect-s2i-single-node-kafka.yaml
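
The example files create a Kafka cluster named my-cluster and a Kafka Connect cluster named my-connect-cluster. You can wait for the Kafka cluster to report readiness before moving on (a minimal check; the timeout value is arbitrary):

oc wait kafka/my-cluster --for=condition=Ready --timeout=300s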

Optionally, enable the ability to instantiate Kafka connectors through a KafkaConnector custom resource:

oc annotate kafkaconnects2is my-connect-cluster strimzi.io/use-connector-resources=true
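
You can verify that the annotation has been applied (the output should contain strimzi.io/use-connector-resources: "true"):

oc get kafkaconnects2is my-connect-cluster -o yaml | grep use-connector-resources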

Add Camel Kafka connector binaries

Strimzi uses Source-to-Image (S2I) builds to allow users to add their own connectors to the existing Strimzi Docker images. We now need to build the connectors and add them to the image. If you have built the whole project (mvn clean package), decompress the connectors you need into a folder (e.g. my-connectors/) so that each one is in its own subfolder. Alternatively, you can download the latest officially released and packaged connectors from Maven.
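
For example, assuming you built the project and want the AWS S3 connector, preparing the folder could look like this (the archive path and name are placeholders that depend on the connector and version you built):

mkdir -p my-connectors
tar -xzf connectors/camel-aws-s3-kafka-connector/target/camel-aws-s3-kafka-connector-*-package.tar.gz -C my-connectors/

Once the folder is ready, start the S2I build: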

oc start-build my-connect-cluster-connect --from-dir=./my-connectors/ --follow
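
The --follow flag streams the build log until the build finishes. You can then watch the rollout of the newly built image (a sketch, assuming the DeploymentConfig created for the KafkaConnectS2I cluster is named my-connect-cluster-connect):

oc rollout status dc/my-connect-cluster-connect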

Once the rollout of the new image finishes and the replica set with the new connectors becomes ready, we can check that the connectors are available in our Kafka Connect cluster. Strimzi runs Kafka Connect in distributed mode, so we can interact with it over HTTP. To check the available connector plugins, you can run the following command:

oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connector-plugins

You should see something like this (the exact list depends on the connector packages you added to the image):

[{"class":"org.apache.camel.kafkaconnector.CamelSinkConnector","type":"sink","version":"0.0.1-SNAPSHOT"},{"class":"org.apache.camel.kafkaconnector.CamelSourceConnector","type":"source","version":"0.0.1-SNAPSHOT"},{"class":"org.apache.kafka.connect.file.FileStreamSinkConnector","type":"sink","version":"2.3.0"},{"class":"org.apache.kafka.connect.file.FileStreamSourceConnector","type":"source","version":"2.3.0"}]

Create connector instance

Now we can create an instance of a connector plugin, for example the AWS S3 source connector:

oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -X POST \
    -H "Accept:application/json" \
    -H "Content-Type:application/json" \
    http://my-connect-cluster-connect-api:8083/connectors -d @- <<'EOF'
{
  "name": "s3-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
    "topics": "s3-topic",
    "camel.source.path.bucketNameOrArn": "camel-connector-test",
    "camel.source.endpoint.autocloseBody": false,
    "camel.source.maxPollDuration": 10000,
    "camel.component.aws-s3.accessKey": "xxx",
    "camel.component.aws-s3.secretKey": "xxx",
    "camel.component.aws-s3.region": "xxx"
  }
}
EOF
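
If the creation succeeds, the Kafka Connect REST API responds with HTTP 201 Created and echoes the connector configuration back as JSON.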

Alternatively, if you have enabled use-connector-resources, you can create the connector instance by creating a specific custom resource:

oc apply -f - << EOF
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-source-connector
  namespace: myproject
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
    topics: s3-topic
    camel.source.path.bucketNameOrArn: camel-connector-test
    camel.source.endpoint.autocloseBody: false
    camel.source.maxPollDuration: 10000
    camel.component.aws-s3.accessKey: xxx
    camel.component.aws-s3.secretKey: xxx
    camel.component.aws-s3.region: xxx
EOF
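
When the connector is created through the custom resource, the operator reports its state in the status section of the resource, which you can inspect with:

oc describe kafkaconnector s3-source-connector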

You can check the status of the connector using the following command (s3-connector is the name used in the REST example above; adjust it if you created the connector under a different name):

oc exec -i `oc get pods --field-selector status.phase=Running -l strimzi.io/name=my-connect-cluster-connect -o=jsonpath='{.items[0].metadata.name}'` -- curl -s http://my-connect-cluster-connect-api:8083/connectors/s3-connector/status

Check received messages

You can also run the Kafka console consumer to see the messages received from the topic:

oc exec -i -c kafka my-cluster-kafka-0 -- bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic s3-topic --from-beginning
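
Note that the source connector only emits records once there are objects in the bucket. If you have the AWS CLI configured for the same AWS account, you can upload a test object to the camel-connector-test bucket used in the example configuration (test.txt stands in for any local file):

aws s3 cp test.txt s3://camel-connector-test/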