Storage & Playback Overview
Requires that the Storage and Playback add-on is enabled for your key. To learn how to enable add-on features for your keys, see http://www.pubnub.com/knowledge-base/discussion/644/how-do-i-enable-add-on-features-for-my-keys
PubNub's Storage and Playback feature enables developers to store messages as they are published and retrieve them at a later time. Before you can use this feature, you must enable it in the PubNub Admin Console.
Storage Applications
Being able to pull an archive of messages from storage has many applications:
- Populate chat, collaboration and machine configuration on app load.
- Store message streams with real-time data management.
- Retrieve historical data streams for reporting, auditing and compliance (HIPAA, SOX, Data Protection Directive, and more).
- Replay live events with delivery of real-time data during rebroadcasting.
Storage API Features
As needed, specific messages can be marked "do not archive" when published to prevent archiving on a per-message basis, and storage retention time can range from 1 day to forever.
PubNub Storage Windows C++ Code Samples
These code samples build on code defined in the Pub & Sub tutorial, so be sure you have completed that tutorial before proceeding.
Preparing the channel for history operations
To begin, let's populate a new channel with some test publishes that we'll pull from storage using the history() method. We'll define a callback for response handling that is applicable to both the publish() and history() methods.
//Only for Functions calling pattern
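static void on_publish(pubnub::context &pn, pubnub_res res) {
    // Relies on file-scope `message`, `i`, and publish(), like those
    // declared in the Functions sample under "Seeding the Channel with Messages".
    if (PNR_OK == res) {
        std::cout << message << " status: " << pn.last_publish_result() << std::endl;
        if (i < 500) {
            i++;
            publish(pn);
        }
    } else {
        std::cout << "Failed with code " << res << std::endl;
    }
}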
Seeding the Channel with Messages
Before we can pull history for a channel, that channel must first have had messages published on it. We'll do just that by publishing some messages onto a channel to "seed" it for a history call.
//Sync
static void publish(pubnub::context &pn) {
    std::string channel("history_channel");
    std::stringstream message;
    enum pubnub_res res;
    int i;
    try {
        // Publish 500 messages, waiting for each one to complete
        for (i = 0; i < 500; i++) {
            message.str("");
            message << "\"message#" << i << "\"";
            res = pn.publish(channel, message.str()).await();
            if (PNR_OK == res) {
                std::cout << message.str() << " status: " << pn.last_publish_result() << std::endl;
            } else {
                std::cout << "Failed with code " << res << std::endl;
            }
        }
    } catch (std::exception &ex) {
        std::cout << "Exception: " << ex.what() << std::endl;
    }
}
//Lambdas
static void publish(pubnub::context &ipn) {
    std::string channel("history_channel");
    for (int i = 0; i < 500; i++) {
        // Build the message for this iteration before publishing it
        std::stringstream msg_stream;
        msg_stream << "\"message#" << i << "\"";
        std::string message = msg_stream.str();
        // Capture the message by value: the callback may run after
        // this iteration (or this function) has finished
        ipn.publish(channel, message)
            .then([message](pubnub::context &pn, pubnub_res res) {
                if (PNR_OK == res || PNR_PUBLISH_FAILED == res) {
                    std::cout << message << " status: " << pn.last_publish_result() << std::endl;
                } else {
                    std::cout << "Failed with code " << res << std::endl;
                }
            });
    }
}
//Functions
static const std::string channel("history_channel");
static std::string message;
static std::stringstream msg_stream;
static int i;

static void on_publish(pubnub::context &pn, pubnub_res res) {
    if (PNR_OK == res || PNR_PUBLISH_FAILED == res) {
        std::cout << message << " status: " << pn.last_publish_result() << std::endl;
    } else {
        std::cout << "Failed with code " << res << std::endl;
    }
}

static void publish(pubnub::context &ipn) {
    for (i = 0; i < 500; i++) {
        msg_stream.str(std::string());
        msg_stream << "\"message#" << i << "\"";
        message = msg_stream.str();
        ipn.publish(channel, message).then(on_publish);
    }
}
In the above example, we publish a barrage of test messages to history_channel.
Pulling from storage with a History call
Now that we've populated the channel (and thus the backend storage system), we can pull from storage using the history() method call:
// Sync
void history(pubnub::context &pn) {
    enum pubnub_res res;
    // Request up to 100 messages and wait for the result
    res = pn.history("history_channel", 100).await();
    if (PNR_OK == res) {
        std::vector<std::string> msg = pn.get_all();
        for (std::vector<std::string>::iterator it = msg.begin(); it != msg.end(); ++it) {
            std::cout << *it << std::endl;
        }
    } else {
        std::cout << "History request failed" << std::endl;
    }
}
// Lambdas
void history(pubnub::context &ipn) {
    ipn.history("history_channel").then([=](pubnub::context &pn, pubnub_res res) {
        if (PNR_OK == res) {
            // Read the messages only after the request has succeeded
            auto msg = pn.get_all();
            for (auto &&m : msg) {
                std::cout << m << std::endl;
            }
        } else {
            std::cout << "Request failed" << std::endl;
        }
    });
}
// Functions
void on_history(pubnub::context &pn, pubnub_res res) {
    if (PNR_OK == res) {
        std::vector<std::string> msg = pn.get_all();
        for (std::vector<std::string>::iterator it = msg.begin(); it != msg.end(); ++it) {
            std::cout << *it << std::endl;
        }
    } else {
        std::cout << "History request failed" << std::endl;
    }
}

void history(pubnub::context &pn) {
    pn.history("history_channel", 100).then(on_history);
}
The response format is:
[
["message1", "message2", "message3",... ],
"Start Time Token",
"End Time Token"
]
Setting the Count
The default value of the count parameter is 100. However, because C++ default arguments can only be omitted from the end of the argument list, setting the include_token parameter requires setting count as well. If you have no better value for count in that case, use 100.
Converting from Timetokens to UNIXTIME
The timetoken response value is a string representing Unix time (UTC) with 17-digit precision, that is, counted in units of 100 nanoseconds. To convert a PubNub timetoken to a Unix timestamp in seconds, divide the timetoken number by 10,000,000 (10^7).
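The conversion described above can be sketched as a small helper. This is a minimal illustration, not part of the PubNub SDK: the function name timetoken_to_unix_seconds is hypothetical, and the sample timetoken in the usage note is a made-up value. It assumes the timetoken arrives as a string of decimal digits.

```cpp
#include <string>

// Hypothetical helper (not part of the PubNub SDK): converts a PubNub
// timetoken, given as a decimal string, to whole Unix seconds.
long long timetoken_to_unix_seconds(const std::string &timetoken) {
    // std::stoll throws std::invalid_argument or std::out_of_range
    // if the string is not a valid number that fits in a long long.
    const long long ticks = std::stoll(timetoken);
    // There are 10^7 timetoken units per second; integer division
    // discards the sub-second part.
    return ticks / 10000000LL;
}
```

For example, timetoken_to_unix_seconds("14703961150000000") returns 1470396115. A 64-bit integer is used because a 17-digit value does not fit in 32 bits.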