PubNub Python-Asyncio SDK 7.4.2

Get Code: pip

The simplest way to get started is to install the PubNub Python SDK via pip:

pip install pubnub

Get Code: git

git clone https://github.com/pubnub/python

Get Code: Source

Hello World

Add PubNub to your project using one of the procedures defined under How to Get It.

User ID

Always set the user_id to uniquely identify the user or device that connects to PubNub. This user_id should be persisted, and should remain unchanged for the lifetime of the user or the device. If you don't set the user_id, you won't be able to connect to PubNub.

import asyncio
from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNStatusCategory
from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_asyncio import PubNubAsyncio

pnconfig = PNConfiguration()
pnconfig.subscribe_key = 'demo'
pnconfig.publish_key = 'demo'
pnconfig.user_id = "my_custom_user_id"

pubnub = PubNubAsyncio(pnconfig)


async def main():
    ...  # remaining lines of the 63-line sample are not included here

Builders

There are two ways to invoke callbacks in PubNub Python SDK for asyncio:

  • Using result(). The builder returns a future that yields only the result; no status object is available on a successful call. Errors in this mode are raised as exceptions, and the status field of the exception is populated with the associated status object.

    from pubnub.exceptions import PubNubException

    try:
        result = await pubnub.publish().message('hey').channel('blah').result()
        print(result)
    except PubNubException as e:
        print("PubNubException: %s" % e)
        print("category id: #%d" % e.status.category)
        print("operation id: #%d" % e.status.operation)
        _handle_error(e)  # _handle_error is the application's own error handler
    except Exception as e:
        print("Error: %s" % e)
        _handle_error(e)
  • Using future(). The builder returns a future that yields an envelope wrapping both a result (the same as in the result() call) and a status object. Exceptions, if any, are not raised; check for them with the e.is_error() helper. To access the original exception, use the e.value() method.

    e = await pubnub.publish().message('hey').channel('blah').future()

    if e.is_error():
        print("Error: %s" % e)
        print("category id: #%d" % e.status.category)
        print("operation id: #%d" % e.status.operation)
        _handle_error(e)  # _handle_error is the application's own error handler
    else:
        print(e.result)

Copy and paste examples

In addition to the Hello World sample code, we also provide some copy and paste snippets of common API functions:

Init

Instantiate a new PubNub instance. Only the subscribe_key is mandatory. Also include the publish_key if you intend to publish from this instance, and the secret_key if you wish to perform Access Manager administrative operations from this Python-asyncio instance.

Protect your secret_key

For security reasons, include the secret_key only on a highly secured server. The secret_key is only required for granting rights using Access Manager.

When you initialize with secret_key, the instance gets root permissions for Access Manager. Servers initialized this way do not need to be granted access to channel data; they have full access to all channels.

Initializing the client

User ID

Always set the user_id to uniquely identify the user or device that connects to PubNub. This user_id should be persisted, and should remain unchanged for the lifetime of the user or the device. If you don't set the user_id, you won't be able to connect to PubNub.

from pubnub.pnconfiguration import PNConfiguration
from pubnub.pubnub_asyncio import PubNubAsyncio

pnconfig = PNConfiguration()
pnconfig.subscribe_key = "my_subkey"
pnconfig.publish_key = "my_pubkey"
pnconfig.user_id = "my_custom_user_id"
pnconfig.ssl = False

pubnub = PubNubAsyncio(pnconfig)
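
The User ID note above asks that the user_id be persisted and kept unchanged for the lifetime of the user or device. One possible way to do that, sketched here with the standard library only, is to store a generated ID in a small file and reuse it on later runs. The helper name load_or_create_user_id, the "user-" prefix, and the file name are illustrative assumptions and are not part of the PubNub SDK.

```python
import tempfile
import uuid
from pathlib import Path


def load_or_create_user_id(path: Path) -> str:
    """Return the user ID stored at `path`, creating one on first use.

    Hypothetical helper for illustration; not provided by the PubNub SDK.
    """
    if path.exists():
        stored = path.read_text(encoding="utf-8").strip()
        if stored:
            return stored
    # First run (or empty file): generate a random ID and persist it.
    new_id = "user-" + uuid.uuid4().hex
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(new_id, encoding="utf-8")
    return new_id


# Demonstration in a temporary directory; a real app would pick a stable path.
with tempfile.TemporaryDirectory() as tmp:
    id_file = Path(tmp) / "pubnub_user_id.txt"
    first = load_or_create_user_id(id_file)
    second = load_or_create_user_id(id_file)
    print(first == second)  # the second call reuses the stored value
```

The returned string could then be assigned to the configuration's user ID field before the PubNubAsyncio instance is created.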

Listeners

Add Listeners

from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNOperationType, PNStatusCategory

class MySubscribeCallback(SubscribeCallback):
    def status(self, pubnub, status):
        # The status object returned is always related to subscribe but could contain
        # information about subscribe, heartbeat, or errors.
        # Use status.operation to switch on different options.
        if status.operation == PNOperationType.PNSubscribeOperation \
                or status.operation == PNOperationType.PNUnsubscribeOperation:
            if status.category == PNStatusCategory.PNConnectedCategory:
                # This is expected for a subscribe; it means there is no error or issue whatsoever.
                pass
            elif status.category == PNStatusCategory.PNReconnectedCategory:
                pass  # remaining lines of the 58-line sample are not included here

Remove Listeners

my_listener = MySubscribeCallback()

pubnub.add_listener(my_listener)

# some time later
pubnub.remove_listener(my_listener)

Handling Disconnects

import logging

from pubnub.callbacks import SubscribeCallback
from pubnub.enums import PNStatusCategory

logger = logging.getLogger(__name__)

class HandleDisconnectsCallback(SubscribeCallback):
    def status(self, pubnub, status):
        if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory:
            # The internet connection was lost; call reconnect once it is back.
            pubnub.reconnect()
        elif status.category == PNStatusCategory.PNTimeoutCategory:
            # The request timed out; call reconnect when ready.
            pubnub.reconnect()
        else:
            logger.debug(status)

    def presence(self, pubnub, presence):
        pass  # remaining lines of the 26-line sample are not included here
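
The comments in the sample above leave open how to decide when the client is ready to reconnect. A minimal sketch of one common approach, exponential backoff with jitter, follows. The names backoff_delays, reconnect_with_backoff, and is_online are hypothetical and not part of the SDK; the reconnect argument is assumed to be a plain callable such as pubnub.reconnect, and how connectivity is actually checked is left to the application.

```python
import asyncio
import random


def backoff_delays(base=1.0, factor=2.0, cap=30.0, attempts=5):
    """Yield exponentially growing delays in seconds, capped at `cap`."""
    delay = base
    for _ in range(attempts):
        yield min(delay, cap)
        delay *= factor


async def reconnect_with_backoff(reconnect, is_online, sleep=asyncio.sleep):
    """Call `reconnect()` as soon as `is_online()` reports True.

    Returns True if reconnect was called, or False if every attempt
    found the network still down. Both callables are supplied by the
    application; `is_online` is a hypothetical connectivity check.
    """
    for delay in backoff_delays():
        if is_online():
            reconnect()
            return True
        # Add up to 10% jitter so many clients do not retry in lockstep.
        await sleep(delay + random.uniform(0, delay * 0.1))
    return False
```

From inside a status callback, such a coroutine could be scheduled with asyncio.ensure_future(reconnect_with_backoff(pubnub.reconnect, my_check)), where my_check is whatever connectivity test the application provides.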

Listener status events

| Category | Description |
| --- | --- |
| PNTimeoutCategory | Failure to establish a connection to PubNub due to a timeout. |
| PNBadRequestCategory | The server responded with a bad response error because the request is malformed. |
| PNNetworkIssuesCategory | A subscribe event experienced an exception when running. The SDK isn't able to reach the PubNub Data Stream Network. This may be due to many reasons, such as: the machine or device isn't connected to the internet; the internet connection has been lost; your internet service provider is having trouble; or the SDK is behind a proxy. |
| PNReconnectedCategory | The SDK was able to reconnect to PubNub. |
| PNConnectedCategory | The SDK subscribed with a new mix of channels. This is fired every time the channel or channel group mix changes. |
| PNUnexpectedDisconnectCategory | A previously started subscribe loop failed, and the client is now disconnected from real-time data channels. |
| PNUnknownCategory | Returned when the subscriber gets a non-200 HTTP response code from the server. |

Time

Call time() to verify the client connectivity to the origin:

envelope = await pubnub.time().future()
print('current time: %d' % envelope.result)
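
PubNub timetokens are 17-digit integers that count 100-nanosecond ticks since the Unix epoch, so the printed value is not directly human-readable. The sketch below converts such a value to a UTC datetime using only the standard library. The function name timetoken_to_datetime is an illustrative assumption, and the input is assumed to be convertible with int(), as the %d formatting in the snippet above suggests.

```python
from datetime import datetime, timezone

TICKS_PER_SECOND = 10_000_000  # one timetoken tick is 100 nanoseconds


def timetoken_to_datetime(timetoken) -> datetime:
    """Convert a PubNub timetoken to a timezone-aware UTC datetime.

    Hypothetical helper for illustration; not provided by the PubNub SDK.
    """
    seconds, ticks = divmod(int(timetoken), TICKS_PER_SECOND)
    # datetime resolves microseconds at best, so the last tick digit is dropped.
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(microsecond=ticks // 10)


print(timetoken_to_datetime(16000000000000000).isoformat())  # 2020-09-13T12:26:40+00:00
```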

Subscribe

Subscribe (listen on) a channel:

pubnub.subscribe().channels("my_channel").execute()

Event listeners

The response of the call is handled by adding a Listener. Please see the Listeners section for more details. Listeners should be added before calling the method.

Publish

Publish a message to a channel:

import asyncio

def publish_callback(task):
    exception = task.exception()

    if exception is None:
        envelope = task.result()
        # Handle PNPublishResult (envelope.result) and PNStatus (envelope.status)
        pass
    else:
        # Handle exception
        pass

asyncio.ensure_future(
    pubnub.publish().channel('such_channel').message(['hello', 'there']).future()
).add_done_callback(publish_callback)
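
The done-callback pattern in the publish snippet relies on task.exception() returning None when the task succeeded. The following self-contained sketch shows that behavior with plain asyncio. fake_publish is a hypothetical stand-in coroutine, not a PubNub call, and the strings it returns are placeholders rather than real envelopes.

```python
import asyncio


async def fake_publish(fail: bool):
    """Hypothetical stand-in for a publish future; not part of the PubNub SDK."""
    await asyncio.sleep(0)
    if fail:
        raise RuntimeError("publish failed")
    return "envelope"


outcomes = []


def on_done(task):
    exception = task.exception()  # None when the task finished successfully
    if exception is None:
        outcomes.append(("ok", task.result()))
    else:
        outcomes.append(("error", str(exception)))


async def main():
    tasks = [asyncio.ensure_future(fake_publish(fail)) for fail in (False, True)]
    for task in tasks:
        task.add_done_callback(on_done)
    # return_exceptions=True keeps the failing task from aborting the gather.
    await asyncio.gather(*tasks, return_exceptions=True)
    # Done callbacks are scheduled on the loop; yield once more so they have run.
    await asyncio.sleep(0)


asyncio.run(main())
print(outcomes)
```

Calling task.result() on a failed task would re-raise the exception, which is why the callback checks task.exception() first.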


Here Now

Get the occupancy of the channels and the UUIDs of who is present right now:

Requires Presence add-on

This method requires that the Presence add-on is enabled for your key in the Admin Portal. Read the support page on enabling add-on features on your keys.

envelope = await pubnub.here_now().channels(['cool_channel1', 'cool_channel2']).include_uuids(True).future()

if envelope.status.is_error():
    # handle error
    pass
else:
    for channel_data in envelope.result.channels:
        print("---")
        print("channel: %s" % channel_data.channel_name)
        print("occupancy: %s" % channel_data.occupancy)

        print("occupants: %s" % channel_data.channel_name)
        for occupant in channel_data.occupants:
            print("uuid: %s, state: %s" % (occupant.uuid, occupant.state))

Presence

Subscribe to real-time Presence events, such as join, leave, and timeout, by UUID. Calling with_presence() on the subscribe builder also subscribes to presence events on my_channel:

Requires Presence add-on

This method requires that the Presence add-on is enabled for your key in the Admin Portal. Read the support page on enabling add-on features on your keys.

pubnub.subscribe().channels("my_channel").with_presence().execute()

Event listeners

The response of the call is handled by adding a Listener. Please see the Listeners section for more details. Listeners should be added before calling the method.

History

Retrieve published messages from archival storage:

Requires Message Persistence

This method requires that Message Persistence is enabled for your key in the Admin Portal. Read the support page on enabling add-on features on your keys.

envelope = await pubnub.history().channel('history_channel').count(100).future()
# handle messages stored at envelope.result.messages
# status is available as envelope.status

Unsubscribe

Stop subscribing (listening) to a channel:

pubnub.unsubscribe().channels("my_channel").execute()

Event listeners

The response of the call is handled by adding a Listener. Please see the Listeners section for more details. Listeners should be added before calling the method.
