PubNub Kafka Sink Connector
This guide shows how to deploy the PubNub Kafka Sink Connector. Use it to integrate Apache Kafka with the PubNub real-time messaging platform and stream events from Kafka topics to PubNub channels.
Choose a path that fits your needs: test with a preconfigured Docker Compose setup, or deploy to a production Kafka environment.
Stream events from PubNub to Kafka
You can also stream PubNub events to Kafka using Kafka Action.
Prerequisites
I'm just testing

You need the following:
- Apache Kafka and Kafka Connect
- Maven 3.8.6+
- Java 11+
- Access to PubNub's Admin Portal
Steps
Follow the steps for test or production environments.
I'm just testing

1. Clone the pubnub-kafka-sink-connector repository.

   ```bash
   git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git
   ```

2. Change to the repository directory.

   ```bash
   cd pubnub-kafka-sink-connector
   ```

3. Log in to the Admin Portal and get your development publish and subscribe keys from your app's keyset.

4. Edit the configuration options in `examples/pubnub-sink-connector.json`. Add your keys under `publish_key` and `subscribe_key`.

5. Use Docker Compose to build and start the Kafka and Kafka Connect images.

   ```bash
   docker compose up
   ```

6. Deploy the connector to the Kafka Connect cluster.

   ```bash
   curl -X POST \
     -d @examples/pubnub-sink-connector.json \
     -H "Content-Type: application/json" \
     http://localhost:8083/connectors
   ```

7. Verify the connector. The sample producer in the repository generates test messages (for example, `{"timestamp":1705689453}`) every few seconds; a sketch of a comparable producer appears after this list. Use the Debug Console to confirm messages are published on the predefined PubNub channels. Provide your publish and subscribe keys, channel `pubnub`, and a user ID.

8. When done, undeploy the connector.

   ```bash
   curl -X DELETE \
     http://localhost:8083/connectors/pubnub-sink-connector
   ```

9. Stop the Kafka and Kafka Connect containers.

   ```bash
   docker compose down
   ```
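For reference, here is a minimal sketch of a producer that emits messages in the same shape as the repository's sample producer. It assumes the Compose broker is reachable at `localhost:9092`; the class name and settings are illustrative, not the repository's actual producer code.

```java
import java.time.Instant;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TimestampProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumes the Docker Compose broker is reachable on localhost:9092
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same JSON shape as the repository's sample test messages
            String payload = "{\"timestamp\":" + Instant.now().getEpochSecond() + "}";
            producer.send(new ProducerRecord<>("pubnub", payload));
            producer.flush();
        }
    }
}
```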
Use for production

1. Clone the pubnub-kafka-sink-connector repository.

   ```bash
   git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git
   ```

2. Change to the repository directory.

   ```bash
   cd pubnub-kafka-sink-connector
   ```

3. Log in to the Admin Portal and get your production publish and subscribe keys from your app's keyset.

4. Edit the configuration options in `examples/pubnub-sink-connector-test.json`. Add your production details, such as Kafka topics and PubNub API keys.

5. Compile the connector locally. In the repository root, build a JAR file:

   ```bash
   mvn clean package
   ```

   After the build, `pubnub-kafka-connector-1.x.jar` is created in `target`.

6. Add the packaged connector as a Kafka Connect plugin. Copy the JAR to a directory on each Connect worker's `plugin.path` and restart the workers. See the Kafka Connect guide for deployment steps.

7. Fill in your Kafka Connect host address and deploy the connector.

   ```bash
   curl -X POST \
     -d @examples/pubnub-sink-connector.json \
     -H "Content-Type: application/json" \
     http://your_kafka_connect_host:8083/connectors
   ```

   New events sent to the Kafka topics are copied to the PubNub channels defined in the configuration.
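To confirm the deployment, you can query the Kafka Connect REST API for the connector's status. Below is a minimal sketch using Java 11's built-in HTTP client; the host placeholder matches the step above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class ConnectorStatusCheck {
    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        // Kafka Connect exposes connector status at /connectors/<name>/status
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://your_kafka_connect_host:8083/connectors/pubnub-sink-connector/status"))
                .GET()
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // A healthy connector reports "state":"RUNNING" for the connector and its tasks
        System.out.println(response.body());
    }
}
```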
Configuration options
The configuration in `examples/pubnub-sink-connector-test.json` defines how the connector runs. Here's each parameter with a short description; required parameters are marked with an asterisk (*).
```json
{
  "name": "pubnub-sink-connector",
  "config": {
    "topics": "pubnub,pubnub1,pubnub2",
    "topics.regex": "",
    "pubnub.user_id": "myUserId123",
    "pubnub.publish_key": "demo",
    "pubnub.subscribe_key": "demo",
    "pubnub.secret_key": "demo",
    "connector.class": "com.pubnub.kafka.connect.PubNubKafkaSinkConnector",
    "tasks.max": "3",
    "value.deserializer": "custom.class.serialization.JsonDeserializer",
    "value.serializer": "custom.class.serialization.JsonSerializer",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my_dlq_topic",
    "errors.deadletterqueue.topic.replication.factor": "1"
  }
}
```

| Parameter | Description |
|---|---|
| `name` * | Unique name for the connector instance. |
| `topics` * | Comma-separated list of Kafka topics to consume. The Kafka topic name matches the PubNub channel name. At least one of `topics` or `topics.regex` is required. |
| `topics.regex` * | Java regular expression for matching Kafka topics to consume (for example, `"topic.*"`). The Kafka topic name matches the PubNub channel name. At least one of `topics` or `topics.regex` is required. |
| `pubnub.user_id` * | UTF-8 encoded string (maximum 92 characters) that identifies a client (end user, device, or server) that connects to PubNub. |
| `pubnub.publish_key` * | PubNub publish key used to authenticate publish requests. |
| `pubnub.subscribe_key` * | PubNub subscribe key, part of the standard key configuration. Needed even if subscribing isn't the connector's primary function. |
| `pubnub.secret_key` | PubNub secret key for secure server-to-server communication. Not required for testing. |
| `pubnub.router` | Router class that processes messages flowing through the connector, allowing custom channel IDs and payload overrides. See the `pubnub.router` section below. |
| `connector.class` * | Java class of the connector implementation. Use the default value. |
| `tasks.max` * | Maximum number of tasks the connector can create (controls parallelism). |
| `value.deserializer` * | Class used to deserialize Kafka message values. Use the default value if applicable. |
| `value.serializer` * | Class used to serialize messages before publishing to PubNub. Use the default value if applicable. |
| `errors.tolerance` | Determines how the connector handles errors. Set to `"all"` to skip problematic records and continue processing. Requires a dead letter queue on your Kafka Connect cluster. See the Confluent docs. |
| `errors.deadletterqueue.topic.name` | Name of the topic for routing messages that fail to process. Requires a dead letter queue. See the Confluent docs. |
| `errors.deadletterqueue.topic.replication.factor` | Number of replicas for dead letter records. Requires a dead letter queue. See the Confluent docs. |
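If you enable the dead letter queue options above, you can inspect failed records with a plain Kafka consumer. Here is a minimal sketch, assuming a broker at `localhost:9092` and the `my_dlq_topic` name from the sample configuration:

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DlqInspector {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumes a broker at localhost:9092; adjust for your cluster
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "dlq-inspector");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // "my_dlq_topic" matches errors.deadletterqueue.topic.name in the sample config
            consumer.subscribe(Collections.singletonList("my_dlq_topic"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
            records.forEach(r -> System.out.printf("Failed record: %s%n", r.value()));
        }
    }
}
```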
Topic configuration options
You must specify at least one of `topics` or `topics.regex` so the connector knows which Kafka topics to consume. For example, setting `"topics.regex": "pubnub.*"` consumes every topic whose name starts with `pubnub`.
pubnub.router
The class set for `pubnub.router` must implement the `com.pubnub.kafka.connect.PubNubKafkaRouter` interface and have a public no-args constructor.
```java
public interface PubNubKafkaRouter {
    ChannelAndMessage route(SinkRecord record);

    class ChannelAndMessage {
        String channel;
        Object message;

        public ChannelAndMessage(String channel, Object message) {
            this.channel = channel;
            this.message = message;
        }
    }
}
```
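For illustration, here is a minimal sketch of a conforming router. The class name and channel prefix are hypothetical; only the `PubNubKafkaRouter` contract shown above comes from the connector.

```java
import org.apache.kafka.connect.sink.SinkRecord;
import com.pubnub.kafka.connect.PubNubKafkaRouter;

// Hypothetical example: routes each record to a PubNub channel derived from
// its Kafka topic and forwards the record value unchanged.
public class TopicPrefixRouter implements PubNubKafkaRouter {

    // Public no-args constructor, as the connector requires
    public TopicPrefixRouter() {}

    @Override
    public ChannelAndMessage route(SinkRecord record) {
        // The "routed." prefix is illustrative; derive the channel however you need
        String channel = "routed." + record.topic();
        return new ChannelAndMessage(channel, record.value());
    }
}
```

Package the class with the connector (or place it on the Connect worker's classpath) and reference its fully qualified name in the `pubnub.router` option.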