These docs are for PubNub 4.0 for Python-aiohttp V4 which is our latest and greatest! For the docs of the older versions of the SDK, please check PubNub 3.0 for Python, PubNub 3.0 for Python-Tornado and PubNub 3.0 for Python-Twisted.
If you have questions about the PubNub for Python SDK, please contact us at email@example.com.
The simplest way to get started is to install PubNub Python SDK via pypi:
View Supported Platforms
|Always set the |
import asyncio from pubnub.callbacks import SubscribeCallback from pubnub.enums import PNStatusCategory from pubnub.pnconfiguration import PNConfiguration from pubnub.pubnub_asyncio import PubNubAsyncio pnconfig = PNConfiguration() pnconfig.subscribe_key = 'demo' pnconfig.publish_key = 'demo' pnconfig.uuid = "my_custom_uuid" pubnub = PubNubAsyncio(pnconfig) async def main(): def my_publish_callback(task): # Check whether request successfully completed or not exception = task.exception() if exception is None: envelope = task.result() pass # Message successfully published to specified channel. else: pass # Handle message publish error. Check 'category' property to find out possible issue # because of which request did fail. # Request can be resent using: [status retry]; class MySubscribeCallback(SubscribeCallback): def presence(self, pubnub, presence): pass # handle incoming presence data def status(self, pubnub, status): if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory: pass # This event happens when radio / connectivity is lost elif status.category == PNStatusCategory.PNConnectedCategory: # Connect event. You can do stuff like publish, and know you'll get it. # Or just use the connected event to confirm you are subscribed for # UI / internal notifications, etc asyncio.ensure_future(pubnub.publish() .channel("awesomeChannel") .message("hello!!").future())\ .add_done_callback(my_publish_callback) elif status.category == PNStatusCategory.PNReconnectedCategory: pass # Happens as part of our regular operation. This event happens when # radio / connectivity is lost, then regained. elif status.category == PNStatusCategory.PNDecryptionErrorCategory: pass # Handle message decryption error. Probably client configured to # encrypt messages and on live data feed it received plain text. def message(self, pubnub, message): # Handle new message stored in message.message pass pubnub.add_listener(MySubscribeCallback()) pubnub.subscribe().channels('awesomeChannel').execute() loop = asyncio.get_event_loop() loop.run_until_complete(main())
There are two ways to invoke callbacks in PubNub Python V4 SDK for
result(). The builder returns a future that yields an only result, no state available on success call. Exceptions in this builder are raised explicitly.
statusfield of an error is populated with an associated status object.
try: result = yield from pubnub.publish().message('hey').channel('blah').result() print(result) except PubNubException as e: print("PubNubException: %s" % e) print("category id: #%d" % e.status.category) print("operation id: #%d" % e.status.operation) _handle_error(e) except Exception as e: print("Error: %s" % e) _handle_error(e)
future(). The builder returns a future that yields a message envelope that wraps both a result (the same as in
result()call) and a state objects. Exception, if any, will not be raised explicitly and you should check for it using
e.is_error()helper. To access original exception value use
e = yield from pubnub.publish().message('hey').channel('blah').future() if e.is_error(): print("Error: %s" % e) print("category id: #%d" % e.status.category) print("operation id: #%d" % e.status.operation) _handle_error(e) else: print(e.result)
Instantiate a new Pubnub instance. Only the
subscribe_key is mandatory. Also include
publish_key if you intend to publish from this instance, and the
secret_key if you wish to perform PAM administrative operations from this Python-aiohttp V4 instance.
For security reasons you should only include the secret-key on a highly secured server. The secret-key is only required for granting rights using our Access Manager.
When you init with
|Always set the |
from pubnub.pnconfiguration import PNConfiguration from pubnub.pubnub_asyncio import PubNubAsyncio pnconfig = PNConfiguration() pnconfig.subscribe_key = "my_subkey" pnconfig.publish_key = "my_pubkey" pnconfig.uuid = "my_custom_uuid" pnconfig.ssl = False pubnub = PubNubAsyncio(pnconfig)
from pubnub.callbacks import SubscribeCallback from pubnub.enums import PNOperationType, PNStatusCategory class MySubscribeCallback(SubscribeCallback): def status(self, pubnub, status): # The status object returned is always related to subscribe but could contain # information about subscribe, heartbeat, or errors # use the operationType to switch on different options if status.operation == PNOperationType.PNSubscribeOperation \ or status.operation == PNOperationType.PNUnsubscribeOperation: if status.category == PNStatusCategory.PNConnectedCategory: pass # This is expected for a subscribe, this means there is no error or issue whatsoever elif status.category == PNStatusCategory.PNReconnectedCategory: pass # This usually occurs if subscribe temporarily fails but reconnects. This means # there was an error but there is no longer any issue elif status.category == PNStatusCategory.PNDisconnectedCategory: pass # This is the expected category for an unsubscribe. This means there # was no error in unsubscribing from everything elif status.category == PNStatusCategory.PNUnexpectedDisconnectCategory: pass # This is usually an issue with the internet connection, this is an error, handle # appropriately retry will be called automatically elif status.category == PNStatusCategory.PNAccessDeniedCategory: pass # This means that PAM does not allow this client to subscribe to this # channel and channel group configuration. This is another explicit error else: pass # This is usually an issue with the internet connection, this is an error, handle appropriately # retry will be called automatically elif status.operation == PNOperationType.PNSubscribeOperation: # Heartbeat operations can in fact have errors, so it is important to check first for an error. # For more information on how to configure heartbeat notifications through the status # PNObjectEventListener callback, consult http://www.pubnub.com/docs/python-aiohttp/api-reference-configuration#configuration if status.is_error(): pass # There was an error with the heartbeat operation, handle here else: pass # Heartbeat operation was successful else: pass # Encountered unknown status type def presence(self, pubnub, presence): pass # handle incoming presence data def message(self, pubnub, message): pass # handle incoming messages def signal(self, pubnub, signal): pass # handle incoming signals pubnub.add_listener(MySubscribeCallback())
my_listener = MySubscribeCallback() pubnub.add_listener(my_listener) # some time later pubnub.remove_listener(my_listener)
from pubnub.callbacks import SubscribeCallback from pubnub.enums import PNStatusCategory class HandleDisconnectsCallback(SubscribeCallback): def status(self, pubnub, status): if status.category == PNStatusCategory.PNUnexpectedDisconnectCategory: # internet got lost, do some magic and call reconnect when ready pubnub.reconnect() elif status.category == PNStatusCategory.PNTimeoutCategory: # do some magic and call reconnect when ready pubnub.reconnect() else: logger.debug(status) def presence(self, pubnub, presence): pass def message(self, pubnub, message): pass def signal(self, pubnub, signal): pass disconnect_listener = HandleDisconnectsCallback() pubnub.add_listener(disconnect_listener)
|Failure to establish a connection to PubNub due to a timeout.|
|The server responded with a bad response error because the request is malformed.|
|A subscribe event experienced an exception when running. The SDK isn't able to reach the PubNub Data Stream Network. This may be due to many reasons, such as: the machine or device isn't connected to the internet; the internet connection has been lost; your internet service provider is having trouble; or, perhaps the SDK is behind a proxy.|
|The SDK was able to reconnect to PubNub.|
|SDK subscribed with a new mix of channels. This is fired every time the |
|Previously started subscribe loop did fail and at this moment client disconnected from real-time data channels.|
|Returned when the subscriber gets a non-200 HTTP response code from the server.|
time()to verify the client connectivity to the origin:
envelope = await pubnub.time().future() print('current time: %d' % envelope.result)
def publish_callback(task): exception = task.exception() if exception is None: envelope = task.result() # Handle PNPublishResult(envelope.result) and PNStatus (envelope.status) pass else: # Handle exception pass asyncio.ensure_future(pubnub.publish().channel('such_channel').message(['hello', 'there']).future()) \ .add_done_callback(publish_callback) await asyncio.sleep(10)
here nowon the channel by UUID:
Requires you to enable the
envelope = await pubnub.here_now()\ .channels(['cool_channel1', 'cool_channel2'])\ .include_uuids(True)\ .future() if envelope.status.is_error(): # handle error return for channel_data in envelope.result.channels: print("---") print("channel: %s" % channel_data.channel_name) print("occupancy: %s" % channel_data.occupancy) print("occupants: %s" % channel_data.channel_name) for occupant in channel_data.occupants: print("uuid: %s, state: %s" % (occupant.uuid, occupant.state))
timeout, by UUID. Setting the presence attribute to a callback will subscribe to presents events on
Requires you to enable the
pubnub.subscribe()\ .channels("my_channel")\ .with_presence()\ .execute()
Requires that the
envelope = await pubnub.history()\ .channel('history_channel')\ .count(100).future() # handle messages stored at evelope.result.messages # status is available as envelope.status