Try it out on Kubernetes
You can try CamelKafkaConnector using the Strimzi Operator, which simplifies deploying and managing Kafka clusters on plain Kubernetes. This procedure assumes that you have cluster-admin rights, Internet access, and an external registry for pushing images (e.g. quay.io).
Deploy Kafka and KafkaConnect
First, we create a new namespace and deploy a three-node Kafka cluster:
NAMESPACE="kafka"
STRIMZI_VERSION="0.20.0"
STRIMZI_BRANCH="release-0.20.x"
# create a new namespace
kubectl create namespace $NAMESPACE && kubectl config set-context --current --namespace=$NAMESPACE
# deploy Strimzi operator (release assets are named after the exact version)
curl -L https://github.com/strimzi/strimzi-kafka-operator/releases/download/$STRIMZI_VERSION/strimzi-cluster-operator-$STRIMZI_VERSION.yaml \
  | sed "s/namespace: .*/namespace: $NAMESPACE/g" | kubectl apply -f -
# deploy Kafka cluster (example manifests come from the matching release branch)
kubectl apply -f https://raw.githubusercontent.com/strimzi/strimzi-kafka-operator/$STRIMZI_BRANCH/examples/kafka/kafka-persistent.yaml
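The `sed` filter in the operator step rewrites every `namespace:` value in the downloaded manifest to `$NAMESPACE`, so the installed resources reference the namespace created above. A minimal sketch of that substitution, run on a hypothetical manifest fragment rather than the real Strimzi file (`rewrite_namespace` is an illustrative helper name):

```shell
#!/usr/bin/env bash
# rewrite_namespace NS: read YAML on stdin and replace the value of every
# "namespace:" line with NS, exactly like the install pipeline's sed filter.
rewrite_namespace() {
  local ns="$1"
  sed "s/namespace: .*/namespace: $ns/g"
}

# Hypothetical fragment: both namespace values become "kafka",
# indentation is preserved, other lines are untouched.
printf 'metadata:\n  namespace: myproject\nsubjects:\n  - kind: ServiceAccount\n    namespace: myproject\n' \
  | rewrite_namespace kafka
```

Because the pattern ends in `.*`, anything after `namespace:` on the same line (such as a trailing comment) is replaced too.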
Next, we build a custom KafkaConnect image that includes the connector plugins we need (replace the registry settings with your own):
CKC_VERSION="0.7.0"
STRIMZI_IMG="strimzi/kafka:latest-kafka-2.6.0"
REGISTRY_URL="quay.io"
REGISTRY_USR="fvaleri"
TMP="/tmp/my-connect"
BASEURL="https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector"
PLUGINS=(
"$BASEURL/camel-file-kafka-connector/$CKC_VERSION/camel-file-kafka-connector-$CKC_VERSION-package.zip"
"$BASEURL/camel-sjms2-kafka-connector/$CKC_VERSION/camel-sjms2-kafka-connector-$CKC_VERSION-package.zip"
)
# download connect plugins
rm -rf $TMP && mkdir -p $TMP/plugins
for url in "${PLUGINS[@]}"; do
  curl -sL "$url" -o $TMP/plugins/file.zip && unzip -qq $TMP/plugins/file.zip -d $TMP/plugins
  rm -f $TMP/plugins/file.zip
done
# build and push the custom image
echo -e "FROM $STRIMZI_IMG\nCOPY ./plugins/ /opt/kafka/plugins/\nUSER 1001" > $TMP/Dockerfile
sudo podman build --layers=false -t $REGISTRY_USR/my-connect:1.0.0 -f $TMP/Dockerfile $TMP
sudo podman login -u $REGISTRY_USR $REGISTRY_URL
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
sudo podman push localhost/$REGISTRY_USR/my-connect:1.0.0 $REGISTRY_URL/$REGISTRY_USR/my-connect:latest
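Every entry in the `PLUGINS` array follows the same Maven Central layout, `<base>/<artifact>/<version>/<artifact>-<version>-package.zip`. The hypothetical helper below (not part of the project) builds those URLs from short connector names, assuming every connector uses the `camel-<name>-kafka-connector` artifact naming seen in the two entries:

```shell
#!/usr/bin/env bash
BASEURL="https://repo1.maven.org/maven2/org/apache/camel/kafkaconnector"

# plugin_url NAME VERSION: print the package.zip URL of camel-NAME-kafka-connector.
# Hypothetical helper; assumes the artifact naming used in the PLUGINS array.
plugin_url() {
  local artifact="camel-$1-kafka-connector"
  local version="$2"
  printf '%s/%s/%s/%s-%s-package.zip\n' "$BASEURL" "$artifact" "$version" "$artifact" "$version"
}

# Rebuild the same two-element PLUGINS array from connector names.
CKC_VERSION="0.7.0"
PLUGINS=()
for name in file sjms2; do
  PLUGINS+=("$(plugin_url "$name" "$CKC_VERSION")")
done
printf '%s\n' "${PLUGINS[@]}"
```

With this shape, adding a connector to the image only means adding its name to the loop.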
Note: each plugin must live in its own subdirectory of the plugins directory (e.g. /opt/kafka/plugins/myplugin/<my-plugin-jars-here>).
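Kafka Connect loads each immediate subdirectory of its plugin path as one plugin, so a connector and its dependency JARs need to share a subdirectory, while a JAR placed directly in the plugin path is treated as a standalone plugin. A hypothetical check (the name `check_plugin_layout` is illustrative) that you could run on `$TMP/plugins` before building the image:

```shell
#!/usr/bin/env bash
# check_plugin_layout DIR: return 0 only if DIR has no top-level JARs and every
# subdirectory directly contains at least one JAR; report problems on stderr.
check_plugin_layout() {
  local dir="$1" ok=0 jar sub
  for jar in "$dir"/*.jar; do
    [ -e "$jar" ] || continue          # glob did not match: no top-level JARs
    echo "top-level jar, move it into a subdirectory: $jar" >&2
    ok=1
  done
  for sub in "$dir"/*/; do
    [ -d "$sub" ] || continue          # glob did not match: no subdirectories
    if ! compgen -G "${sub}*.jar" >/dev/null; then
      echo "no jars directly under: $sub" >&2
      ok=1
    fi
  done
  return "$ok"
}
```

Usage: `check_plugin_layout "$TMP/plugins" && echo "plugin layout ok"`.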
Finally, we deploy a single-node KafkaConnect cluster that uses our custom image:
kubectl apply -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
  name: my-connect
  annotations:
    # let the operator manage connectors through KafkaConnector resources
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  version: 2.6.0
  image: $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  resources:
    requests:
      cpu: 250m
      memory: 512Mi
    limits:
      cpu: 500m
      memory: 1Gi
  jvmOptions:
    gcLoggingEnabled: false
  config:
    group.id: my-connect
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
    offset.storage.topic: my-connect-offsets
    config.storage.topic: my-connect-configs
    status.storage.topic: my-connect-status
    config.storage.replication.factor: 3
    offset.storage.replication.factor: 3
    status.storage.replication.factor: 3
EOF
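For the `image` field to receive your registry values, the here-document delimiter matters: with a quoted delimiter such as `<<'EOF'` the shell passes the body through literally, while an unquoted `<<EOF` expands `$REGISTRY_URL` and `$REGISTRY_USR` before `kubectl` reads the manifest. A minimal demonstration with the same variable names (the user name is a hypothetical placeholder):

```shell
#!/usr/bin/env bash
REGISTRY_URL="quay.io"
REGISTRY_USR="myuser"   # hypothetical registry user

# Quoted delimiter: no expansion, the dollar signs reach the output unchanged.
quoted=$(cat <<'EOF'
image: $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
EOF
)

# Unquoted delimiter: parameters are expanded first.
unquoted=$(cat <<EOF
image: $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
EOF
)

echo "$quoted"    # image: $REGISTRY_URL/$REGISTRY_USR/my-connect:1.0.0
echo "$unquoted"  # image: quay.io/myuser/my-connect:1.0.0
```

The trade-off is that an unquoted delimiter also expands any other `$` or backtick in the body, so manifests without variables are safer with a quoted delimiter.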
Create KafkaConnector instance
As soon as the infrastructure is running, we can create an instance of a connector plugin:
kubectl apply -f - <<'EOF'
kind: KafkaConnector
apiVersion: kafka.strimzi.io/v1alpha1
metadata:
  name: file-sink
  labels:
    # must match the KafkaConnect cluster name
    strimzi.io/cluster: my-connect
spec:
  tasksMax: 1
  class: org.apache.camel.kafkaconnector.file.CamelFileSinkConnector
  config:
    topics: my-topic
    camel.sink.url: file:/tmp/?fileName=test.txt&fileExist=Append
EOF
You can check the status of the connector instance using:
kubectl describe kafkaconnector file-sink
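For scripting, a sketch that blocks until Strimzi marks the connector resource `Ready` and then prints the connector state reported by Kafka Connect. It assumes the KafkaConnector status exposes the Connect REST status under `status.connectorStatus`; if your Strimzi version lays it out differently, adjust the JSONPath after checking the `kubectl describe` output. The helper name `connector_state` is hypothetical.

```shell
#!/usr/bin/env bash
# connector_state NAME [TIMEOUT]: wait for the KafkaConnector resource NAME to
# report the Ready condition, then print its connector state (e.g. RUNNING).
# Assumes the status field path status.connectorStatus.connector.state.
connector_state() {
  local name="$1" timeout="${2:-120s}"
  kubectl wait "kafkaconnector/$name" --for=condition=Ready --timeout="$timeout" >/dev/null || return 1
  kubectl get kafkaconnector "$name" -o jsonpath='{.status.connectorStatus.connector.state}'
  echo
}
```

Usage: `connector_state file-sink`. A non-zero exit status means the resource did not become Ready within the timeout.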
Check received messages
To test the connector instance, we can send a message to the topic and check that it is appended to the file:
# send a message to Kafka
echo "Hello CamelKafkaConnector" | kubectl exec -i my-cluster-kafka-0 -c kafka -- \
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-topic
# read the message from file
POD_NAME=$(kubectl get pods | grep my-connect | grep Running | cut -d " " -f1) && \
kubectl exec -i $POD_NAME -- cat /tmp/test.txt
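The `grep | grep | cut` pipeline matches `my-connect` and `Running` anywhere on a line and returns every matching pod, which breaks `kubectl exec` if more than one pod qualifies. A slightly stricter, hypothetical alternative parses the default `kubectl get pods` columns (NAME, READY, STATUS, ...) with `awk`, requiring the name to start with the prefix and the STATUS column to be exactly `Running`, and keeps only the first hit:

```shell
#!/usr/bin/env bash
# first_running_pod PREFIX: read `kubectl get pods` output on stdin and print the
# first pod whose NAME starts with PREFIX and whose STATUS column is Running.
first_running_pod() {
  awk -v prefix="$1" 'NR > 1 && index($1, prefix) == 1 && $3 == "Running" { print $1; exit }'
}

# Usage against the cluster:
#   POD_NAME=$(kubectl get pods | first_running_pod my-connect)
#   kubectl exec -i "$POD_NAME" -- cat /tmp/test.txt
```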