PubNub Kafka Sink Connector
This guide shows how to deploy the PubNub Kafka Sink Connector. Use it to integrate Apache Kafka with the PubNub real-time messaging platform and stream events from Kafka topics to PubNub channels.
Choose a path that fits your needs: test with a preconfigured Docker Compose setup, or deploy to a production Kafka environment.
Stream events from PubNub to Kafka
You can also stream events in the other direction, from PubNub to Kafka, using the Kafka action.
Prerequisites
For local testing, you need the following:
- Docker
- Access to PubNub's Admin Portal

For production deployments, you need the following:
- Apache Kafka and Kafka Connect
- Maven 3.8.6+
- Java 11+
- Access to PubNub's Admin Portal
Steps
Follow the steps for test or production environments.
Test environment

1. Clone the pubnub-kafka-sink-connector repository.

   git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git

2. Change to the repository directory.

   cd pubnub-kafka-sink-connector

3. Log in to the Admin Portal and get your development publish and subscribe keys from your app's keyset.

4. Edit the configuration options in examples/pubnub-sink-connector.json. Add your keys under pubnub.publish_key and pubnub.subscribe_key (see the first sketch after these steps).

5. Use Docker Compose to build and start the Kafka and Kafka Connect containers.

   docker compose up

6. Deploy the connector to the Kafka Connect cluster.

   curl -X POST \
     -d @examples/pubnub-sink-connector.json \
     -H "Content-Type: application/json" \
     http://localhost:8083/connectors

7. Verify the connector. The sample producer in the repository generates test messages (for example, {"timestamp":1705689453}) every few seconds. Use the Debug Console to confirm the messages are published on the predefined PubNub channels: provide your publish and subscribe keys, the channel pubnub, and a user ID. You can also produce a message of your own, as shown in the second sketch after these steps.

8. When you are done, undeploy the connector.

   curl -X DELETE \
     http://localhost:8083/connectors/pubnub-sink-connector

9. Stop the Kafka and Kafka Connect containers.

   docker compose down
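For step 4, the relevant fields in examples/pubnub-sink-connector.json look roughly like the following sketch; the placeholder values are illustrative and must be replaced with the keys from your keyset (the full option set is described under Configuration options below):

   {
     "name": "pubnub-sink-connector",
     "config": {
       "pubnub.publish_key": "YOUR_PUBLISH_KEY",
       "pubnub.subscribe_key": "YOUR_SUBSCRIBE_KEY"
     }
   }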
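For step 7, if you want to publish a test event yourself instead of waiting for the sample producer, you can write to the pubnub topic with Kafka's console producer. This is only a sketch: it assumes the Compose setup names the broker service kafka, puts kafka-console-producer on the container's PATH, and listens on port 9092 inside the container; check the repository's compose file for the actual service name and port.

   # Send one JSON message to the Kafka topic "pubnub" from inside the broker container.
   echo '{"hello":"pubnub"}' | docker compose exec -T kafka \
     kafka-console-producer --bootstrap-server localhost:9092 --topic pubnub

Because the Kafka topic name matches the PubNub channel name, the message should appear on the pubnub channel in the Debug Console.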
Production environment

1. Clone the pubnub-kafka-sink-connector repository.

   git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git

2. Change to the repository directory.

   cd pubnub-kafka-sink-connector

3. Log in to the Admin Portal and get your production publish and subscribe keys from your app's keyset.
4. Edit the configuration options in examples/pubnub-sink-connector.json (the same file you deploy in step 7). Add your production details, such as the Kafka topics to consume and your PubNub API keys.
5. Compile the connector locally. In the repository root, build a JAR file:

   mvn clean package

   After the build, pubnub-kafka-connector-1.x.jar is created in target.

6. Add the packaged connector as a Kafka Connect plugin. Copy the JAR to the Kafka Connect plugins directory and configure the plugin (see the sketch after these steps). See the Kafka Connect guide for full deployment instructions.

7. Fill in your Kafka Connect host address and deploy the connector.

   curl -X POST \
     -d @examples/pubnub-sink-connector.json \
     -H "Content-Type: application/json" \
     http://your_kafka_connect_host:8083/connectors

New events sent to the Kafka topics are copied to the PubNub channels defined in the configuration.
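For step 6, a minimal sketch of installing the plugin on a distributed-mode Kafka Connect worker; the /opt/kafka/plugins directory and the properties file name are assumptions, so substitute your cluster's actual locations:

   # Copy the packaged connector into a directory on the worker's plugin path.
   mkdir -p /opt/kafka/plugins/pubnub-kafka-sink-connector
   cp target/pubnub-kafka-connector-1.x.jar /opt/kafka/plugins/pubnub-kafka-sink-connector/

   # Make sure the worker configuration (for example, config/connect-distributed.properties)
   # includes that directory in plugin.path, then restart the worker:
   # plugin.path=/opt/kafka/plugins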
Configuration options
The configuration in examples/pubnub-sink-connector.json defines how the connector runs. Each parameter is described below; required parameters are marked with an asterisk (*).
{
  "name": "pubnub-sink-connector",
  "config": {
    "topics": "pubnub,pubnub1,pubnub2",
    "topics.regex": "",
    "pubnub.user_id": "myUserId123",
    "pubnub.publish_key": "demo",
    "pubnub.subscribe_key": "demo",
    "pubnub.secret_key": "demo",
    "connector.class": "com.pubnub.kafka.connect.PubNubKafkaSinkConnector",
    "tasks.max": "3",
    "value.deserializer": "custom.class.serialization.JsonDeserializer",
    "value.serializer": "custom.class.serialization.JsonSerializer",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my_dlq_topic",
    "errors.deadletterqueue.topic.replication.factor": "1"
  }
}
Parameter | Description |
---|---|
name * | Unique name for the connector instance. |
topics * | Comma-separated list of Kafka topics to consume. The Kafka topic name matches the PubNub channel name. At least one of topics or topics.regex is required. |
topics.regex * | Java regular expression for matching Kafka topics to consume (for example, "topic.*" ). The Kafka topic name matches the PubNub channel name. At least one of topics or topics.regex is required. |
pubnub.user_id * | UTF-8 encoded string (max 92 chars) that identifies a client (end user, device, or server) that connects to PubNub. |
pubnub.publish_key * | PubNub publish key used to authenticate publish requests. |
pubnub.subscribe_key * | PubNub subscribe key, part of standard key configuration. Needed even if subscribing isn't the primary function. |
pubnub.secret_key | PubNub secret key for secure server-to-server communication. Not required for testing. |
pubnub.router | Router class that processes messages flowing through the connector, allowing custom channel IDs and payload overrides. See pubnub.router . |
connector.class * | Java class of the connector implementation. Use the default value. |
tasks.max * | Maximum number of tasks the connector can create (controls parallelism). |
value.deserializer * | Class used to deserialize Kafka message values. Use the default value if applicable. |
value.serializer * | Class used to serialize messages before publishing to PubNub. Use the default value if applicable. |
errors.tolerance | Determines how the connector handles errors. Set to "all" to skip problematic records and continue processing. Requires a dead letter queue on your Kafka Connect cluster. See Confluent docs. |
errors.deadletterqueue.topic.name | Name of the topic for routing messages that fail to process. Requires a dead letter queue. See Confluent docs. |
errors.deadletterqueue.topic.replication.factor | Number of replicas for dead letter records. Requires a dead letter queue. See Confluent docs. |
Topic configuration options
You must specify at least one of topics or topics.regex so the connector knows which Kafka topics to consume. An example of the regex variant follows below.
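As a sketch, a config fragment that matches topics by pattern rather than by explicit list; the pattern is illustrative:

   "config": {
     "topics.regex": "pubnub.*"
   }

With this setting, leave topics empty or omit it; every Kafka topic whose name matches the pattern is consumed, and each message is published to the PubNub channel with the same name as its topic.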
pubnub.router
The class set in pubnub.router must implement the com.pubnub.kafka.connect.PubNubKafkaRouter interface and have a public no-args constructor:
import org.apache.kafka.connect.sink.SinkRecord;

public interface PubNubKafkaRouter {
    // Decides which PubNub channel a Kafka record goes to and what payload to publish.
    ChannelAndMessage route(SinkRecord record);

    class ChannelAndMessage {
        String channel;
        Object message;

        public ChannelAndMessage(String channel, Object message) {
            this.channel = channel;
            this.message = message;
        }
    }
}
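For illustration, a minimal sketch of a custom router; the package, class name, and routing rule are hypothetical, and the connector and Kafka Connect APIs must be on the compile classpath:

   package com.example;

   import com.pubnub.kafka.connect.PubNubKafkaRouter;
   import org.apache.kafka.connect.sink.SinkRecord;

   // Hypothetical example: publishes every record to a channel derived from the
   // Kafka topic name and forwards the record value unchanged.
   public class PrefixingRouter implements PubNubKafkaRouter {

       // The connector requires a public no-args constructor to instantiate the router.
       public PrefixingRouter() {}

       @Override
       public ChannelAndMessage route(SinkRecord record) {
           String channel = "kafka." + record.topic();              // custom channel ID
           return new ChannelAndMessage(channel, record.value());   // payload override point
       }
   }

To activate it, package the class so it is available on the connector plugin's classpath and reference it in the connector configuration, for example "pubnub.router": "com.example.PrefixingRouter".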