PubNub Kafka Sink Connector

This guide shows how to deploy the PubNub Kafka Sink Connector. Use it to integrate Apache Kafka with the PubNub real-time messaging platform and stream events from Kafka topics to PubNub channels.

Choose a path that fits your needs: test with a preconfigured Docker Compose setup, or deploy to a production Kafka environment.

Stream events from PubNub to Kafka

You can also stream PubNub events to Kafka by using the Kafka action.

Prerequisites

You need the following:

  • A PubNub account with an app keyset in the Admin Portal, to get your publish and subscribe keys.
  • Docker with Docker Compose, for the test deployment.
  • Git and curl.
  • For production, access to your own Kafka Connect cluster.

Steps

Follow these steps to test the connector locally with Docker Compose. For a production deployment, apply the same connector configuration to your own Kafka Connect cluster.

  1. Clone the pubnub-kafka-sink-connector repository.

    git clone git@github.com:pubnub/pubnub-kafka-sink-connector.git
  2. Change to the repository directory.

    cd pubnub-kafka-sink-connector
  3. Log in to the Admin Portal and get your development publish and subscribe keys from your app's keyset.

  4. Edit the configuration options in examples/pubnub-sink-connector.json. Add your keys under pubnub.publish_key and pubnub.subscribe_key.

  5. Use Docker Compose to build and start the Kafka and Kafka Connect containers.

    docker compose up
  6. Deploy the connector to the Kafka Connect cluster.

    curl -X POST \
    -d @examples/pubnub-sink-connector.json \
    -H "Content-Type:application/json" \
    http://localhost:8083/connectors
  7. Verify the connector.

    The sample producer in the repository generates test messages (for example, {"timestamp":1705689453}) every few seconds. To publish your own test records instead, see the producer sketch after these steps.

    Use the Debug Console to confirm that messages are published on the predefined PubNub channels. Provide your publish and subscribe keys, the pubnub channel, and a user ID.

  8. When done, undeploy the connector.

    curl -X DELETE \
    http://localhost:8083/connectors/pubnub-sink-connector
  9. Stop the Kafka and Kafka Connect containers.

    docker compose down
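
If you'd rather publish your own test records than rely on the bundled producer, any standard Kafka producer will do. The following sketch assumes the Compose setup exposes a broker on localhost:9092 and uses the pubnub topic from the example configuration; adjust both for your environment.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TestProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumes the broker started by Docker Compose is reachable on localhost:9092.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // The Kafka topic name doubles as the PubNub channel name ("pubnub" in the example config).
            String payload = "{\"timestamp\":" + System.currentTimeMillis() / 1000 + "}";
            producer.send(new ProducerRecord<>("pubnub", payload)).get();
        }
    }
}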

Configuration options

The configuration in examples/pubnub-sink-connector.json defines how the connector runs.

Here's each parameter with a short description and whether it is required.

{
  "name": "pubnub-sink-connector",
  "config": {
    "topics": "pubnub,pubnub1,pubnub2",
    "topics.regex": "",
    "pubnub.user_id": "myUserId123",
    "pubnub.publish_key": "demo",
    "pubnub.subscribe_key": "demo",
    "pubnub.secret_key": "demo",
    "connector.class": "com.pubnub.kafka.connect.PubNubKafkaSinkConnector",
    "tasks.max": "3",
    "value.deserializer": "custom.class.serialization.JsonDeserializer",
    "value.serializer": "custom.class.serialization.JsonSerializer",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "my_dlq_topic",
    "errors.deadletterqueue.topic.replication.factor": "1"
  }
}
Parameters marked with an asterisk (*) are required.
name *
Unique name for the connector instance.
topics *
Comma-separated list of Kafka topics to consume. The Kafka topic name matches the PubNub channel name. At least one of topics or topics.regex is required.
topics.regex *
Java regular expression for matching Kafka topics to consume (for example, "topic.*"). The Kafka topic name matches the PubNub channel name. At least one of topics or topics.regex is required.
pubnub.user_id *
UTF-8 encoded string (max 92 chars) that identifies a client (end user, device, or server) that connects to PubNub.
pubnub.publish_key *
PubNub publish key used to authenticate publish requests.
pubnub.subscribe_key *
PubNub subscribe key, part of standard key configuration. Needed even if subscribing isn't the primary function.
pubnub.secret_key
PubNub secret key for secure server-to-server communication. Not required for testing.
pubnub.router
Router class that processes messages flowing through the connector, allowing custom channel IDs and payload overrides. See pubnub.router.
connector.class *
Java class of the connector implementation. Use the default value.
tasks.max *
Maximum number of tasks the connector can create (controls parallelism).
value.deserializer *
Class used to deserialize Kafka message values. Use the default value if applicable.
value.serializer *
Class used to serialize messages before publishing to PubNub. Use the default value if applicable.
errors.tolerance
Determines how the connector handles errors. Set to "all" to skip problematic records and continue processing. Requires a dead letter queue on your Kafka Connect cluster. See Confluent docs.
errors.deadletterqueue.topic.name
Name of the topic for routing messages that fail to process. Requires a dead letter queue. See Confluent docs.
errors.deadletterqueue.topic.replication.factor
Number of replicas for dead letter records. Requires a dead letter queue. See Confluent docs.
Topic configuration options

You must specify at least one of topics or topics.regex so the connector knows which Kafka topics to consume.
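
For example, to consume every topic whose name starts with pubnub, the topics entry in the example configuration above could be replaced with a regular expression. This is an illustrative fragment, not a complete configuration; the remaining parameters stay unchanged.

"topics": "",
"topics.regex": "pubnub.*",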

pubnub.router

The class set for pubnub.router must:

  • Implement the com.pubnub.kafka.connect.PubNubKafkaRouter interface.
  • Have a public no-args constructor.
public interface PubNubKafkaRouter {
    // Maps an incoming Kafka record (org.apache.kafka.connect.sink.SinkRecord)
    // to the PubNub channel and payload to publish.
    ChannelAndMessage route(SinkRecord record);

    class ChannelAndMessage {
        String channel;
        Object message;

        public ChannelAndMessage(String channel, Object message) {
            this.channel = channel;
            this.message = message;
        }
    }
}
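
For illustration, here's a minimal sketch of a custom router. The class name, the channel prefix, and the decision to pass the record value through unchanged are assumptions made for the example:

import org.apache.kafka.connect.sink.SinkRecord;

import com.pubnub.kafka.connect.PubNubKafkaRouter;

// Hypothetical router: derives the PubNub channel from the Kafka topic name
// and forwards the record value as the message payload.
public class TopicPrefixRouter implements PubNubKafkaRouter {

    // Public no-args constructor, as required by the connector.
    public TopicPrefixRouter() {
    }

    @Override
    public ChannelAndMessage route(SinkRecord record) {
        String channel = "kafka." + record.topic(); // for example, topic "pubnub" -> channel "kafka.pubnub"
        return new ChannelAndMessage(channel, record.value());
    }
}

To use a custom router, set pubnub.router to the fully qualified class name in the connector configuration and make sure the class is available on the Kafka Connect worker's classpath.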