Retrieving Messages from History
PubNub allows you to store and retrieve messages as they are sent over the network by using the Storage & Playback feature.
PubNub Storage uses a time-series based database in which each message is stored on the channel it was published, timestamped to the nearest 10 nanoseconds. PubNub message retention policy can be configured from your account with the following options: 1 day, 7 days, 1 month, and Forever.
Retrieving unread counts of messages in spaces
The messageCounts
method returns a count of unread messages in one or more spaces. The count returned is the number of messages in history with a timetoken value greater than the passed value in the timetoken
parameter.
You will need to cache the timetoken of the last retrieved message on a channel or the last time the user was online.
pubnub.messageCounts({
channels: ['space-1', 'space-2'],
channelTimetokens: ['15518041524300251'],
}, (status, results) => {
// handle status, response
console.log(status);
console.log(results);
});
pubnub.messageCounts(
channels: [
"space-1": "15526611838554310",
"space-2": "15526611838554309"
],
channelTimetokens: ["15518041524300251"]
) { result in
switch result {
case let .success(response):
print("Successful Message Count Response: \(response)")
case let .failure(error):
print("Failed Message Count Response: \(error.localizedDescription)")
}
}
Long lastHourTimetoken = (Calendar.getInstance().getTimeInMillis() - TimeUnit.HOURS.toMillis(1)) * 10000L;
pubnub.messageCounts()
.channels(Arrays.asList("space-1", "space-2"))
.channelsTimetoken(Arrays.asList(lastHourTimetoken))
.async(new PNCallback<PNMessageCountResult>() {
public void onResponse(PNMessageCountResult result, PNStatus status) {
if (!status.isError()) {
for (Map.Entry<String, Long> messageCountEntry : result.getChannels().entrySet()) {
messageCountEntry.getKey(); // the channel name
messageCountEntry.getValue(); // number of messages for that channel
}
} else {
// Handle error accordingly.
status.getErrorData().getThrowable().printStackTrace();
}
}
});
List<string> channelList = new List<string>();
channelList.Add("space-1");
channelList.Add("space-2");
pubnub.MessageCounts().Channels(channelList).ChannelsTimetoken(new List<long>{15518041524300251}).Async((result, status) => {
if (!status.Error) {
if((result.Channels != null)){
Debug.Log(string.Format("MessageCounts, {0}", result.Channels.Count));
foreach(KeyValuePair<string, int> kvp in result.Channels){
Debug.Log(string.Format("==kvp.Key {0}, kvp.Value {1} ", kvp.Key, kvp.Value));
}
}
} else {
Debug.Log(status.Error);
Debug.Log(status.ErrorData.Info);
}
});
Retrieving past messages in spaces
Use the methods in this section to fetch the historical messages that have been sent in a space. The request returns a list of messages that were sent before the start
timetoken and after the end
timetoken.
// start, end, count are optional
pubnub.fetchMessages(
{
channels: ['space-1', 'space-2'],
start: '15343325214676133',
end: '15343325004275466',
count: 25
},
(status, response) => {
// handle response
}
);
pubnub.fetchMessageHistory(
for: ["space-1","space-2"],
end: 13406746780720711
) { result in
switch result {
case let .success(response):
print("Successful History Fetch Response: \(response)")
case let .failure(error):
print("Failed History Fetch Response: \(error.localizedDescription)")
}
}
pubnub.fetchMessages()
.channels(Arrays.asList("space-1", "space-2"))
.maximumPerChannel(25)
.includeMessageActions(true)
.includeMeta(true)
.async(new PNCallback<PNFetchMessagesResult>() {
public void onResponse(PNFetchMessagesResult result, PNStatus status) {
if (!status.isError()) {
Map<String, List<PNFetchMessageItem>> channels = result.getChannels();
for (PNFetchMessageItem messageItem : channels.get("my_channel")) {
System.out.println(messageItem.getMessage());
System.out.println(messageItem.getMeta());
System.out.println(messageItem.getTimetoken());
HashMap<String, HashMap<String, List<PNFetchMessageItem.Action>>> actions =
messageItem.getActions();
for (String type : actions.keySet()) {
System.out.println("Action type: " + type);
for (String value : actions.get(type).keySet()) {
System.out.println("Action value: " + value);
for (PNFetchMessageItem.Action action : actions.get(type).get(value)) {
System.out.println("Action timetoken: " + action.getActionTimetoken());
System.out.println("Action publisher: " + action.getUuid());
}
}
}
}
} else {
status.getErrorData().getThrowable().printStackTrace();
}
}
});
pubnub.FetchMessages()
.Start(15343325214676133)
.End(15343325004275466)
.Channels(new List<string>{"space-1", "space-2"})
.IncludeMeta(true).IncludeMessageActions(true)
.Count(25)
.Async ((result, status) => {
if(status.Error){
Debug.Log (string.Format(" FetchMessages Error: {0} {1} {2}", status.StatusCode, status.ErrorData, status.Category));
} else {
Debug.Log (string.Format("In FetchMessages, result: "));
foreach(KeyValuePair<string, List<PNMessageResult>> kvp in result.Channels){
Debug.Log("kvp channelname" + kvp.Key);
foreach(PNMessageResult pnMessageResut in kvp.Value){
Debug.Log("Channel: " + pnMessageResut.Channel);
Debug.Log("payload: " + pnMessageResut.Payload.ToString());
Debug.Log("timetoken: " + pnMessageResut.Timetoken.ToString());
}
}
}
});
Retrieving most recent messages in spaces
Use the methods in this section to retrieve the most recent messages that were published in the specified spaces before the start
timetoken. If you want to retrieve messages newer than a given timetoken, for example to catch up on missed messages when a device comes back online, specify the end
parameter.
pubnub.fetchMessages(
{
channels: ['space-1', 'space-2'],
start: "15343325214676133",
end: "15343325004275466",
count: 25
},
(status, response) => {
// handle response
}
);
pubnub.fetchMessageHistory(
for: ["space-1","space-2"],
max: 25
) { result in
switch result {
case let .success(response):
print("Successful History Fetch Response: \(response)")
case let .failure(error):
print("Failed History Fetch Response: \(error.localizedDescription)")
}
}
pubnub.fetchMessages()
.channels(Arrays.asList("space-1"))
.maximumPerChannel(25)
.async(new PNCallback<PNFetchMessagesResult>() {
public void onResponse(PNFetchMessagesResult result, PNStatus status) {
if (!status.isError()) {
Map<String, List<PNFetchMessageItem>> channels = result.getChannels();
for (PNFetchMessageItem messageItem : channels.get("my_channel")) {
System.out.println(messageItem.getMessage());
System.out.println(messageItem.getMeta());
System.out.println(messageItem.getTimetoken());
HashMap<String, HashMap<String, List<PNFetchMessageItem.Action>>> actions =
messageItem.getActions();
for (String type : actions.keySet()) {
System.out.println("Action type: " + type);
for (String value : actions.get(type).keySet()) {
System.out.println("Action value: " + value);
for (PNFetchMessageItem.Action action : actions.get(type).get(value)) {
System.out.println("Action timetoken: " + action.getActionTimetoken());
System.out.println("Action publisher: " + action.getUuid());
}
}
}
}
} else {
status.getErrorData().getThrowable().printStackTrace();
}
}
});
pubnub.FetchMessages()
.Start(15343325214676133)
.Channels(new List<string>{"space-1", "space-2"})
.IncludeMeta(true).IncludeMessageActions(true)
.Count(25)
.Async ((result, status) => {
if(status.Error){
Debug.Log (string.Format(" FetchMessages Error: {0} {1} {2}", status.StatusCode, status.ErrorData, status.Category));
} else {
Debug.Log (string.Format("In FetchMessages, result: "));
foreach(KeyValuePair<string, List<PNMessageResult>> kvp in result.Channels){
Debug.Log("kvp channelname" + kvp.Key);
foreach(PNMessageResult pnMessageResut in kvp.Value){
Debug.Log("Channel: " + pnMessageResut.Channel);
Debug.Log("payload: " + pnMessageResut.Payload.ToString());
Debug.Log("timetoken: " + pnMessageResut.Timetoken.ToString());
}
}
}
});
Retrieving more than 100 messages from history
By default, history returns a maximum of 100 messages. To retrieve more than 100, use timestamps to page through the history.
const getAllMessages = (timetoken) => {
pubnub.fetchMessages({
channels: ['space-1', 'space-2'],
start: timetoken, // start time token to fetch
}, (status, response) => {
const messages = response.messages;
const start = response.startTimeToken;
const end = response.endTimeToken;
// if 'messages' were retrieved, do something useful with them
if (messages !== undefined && messages.length > 0) {
console.log(messages.length);
console.log('start:', start);
console.log('end:', end);
}
/**
* if 100 'messages' were retrieved, there might be more, so call
* history again
*/
if (messages.length === 100) {
getAllMessages(start);
}
});
};
// Usage examples:
// getAllMessages(13406746780720711);
// getAllMessages(null);
func pageAllMessageHistory(
_ client: PubNub?,
channel: String,
max count: Int,
start: Timetoken? = nil,
completion: ((Result<MessageHistoryChannelsPayload, Error>) -> Void)?
) {
client?.fetchMessageHistory(for: [channel], max: count, start: start) { [weak self] (result) in
completion?(result)
switch result {
case .success(let response):
// Get the specific channel info you're looking for
let channelResponse = response[channel]
// Re-request using the end response timetoken as the start timetoken
if channelResponse?.messages.count == count {
self?.pageAllMessageHistory(client, channel: channel, max: count, start: channelResponse?.endTimetoken, completion: completion)
} else {
// Paging is completed
PubNub.log.debug("Message History paging completed at \(String(describing: channelResponse?.endTimetoken))")
}
case .failure:
// Already responded to completion with failure
break
}
}
}
/**
* Fetches channel history in a recursive manner, in chunks of specified size, starting from the most recent,
* with every subset (with predefined size) sorted by the timestamp the messages were published.
*
* @param channel The channel where to fetch history from
* @param start The timetoken which the fetching starts from, or `0`
* @param count Chunk size
* @param callback Callback which fires when a chunk is fetched
*/
private void getAllMessages(final String channel, Long start, final int count, final CallbackSkeleton callback) {
pubnub.history()
.channel(channel)
.count(count)
.start(start)
.includeTimetoken(true)
.async(new PNCallback<PNHistoryResult>() {
public void onResponse(PNHistoryResult result, PNStatus status) {
if (!status.isError() && !result.getMessages().isEmpty()) {
callback.handleResponse(result);
getAllMessages(channel, result.getMessages().get(0).getTimetoken(), count, callback);
}
}
});
}
}
// Usage: GetHistoryRecursive(0, "space-1");
void GetHistoryRecursive(long start, string channel) {
pubnub.History()
.Channel(channel)
.Start(start)
.Reverse(true)
.IncludeTimetoken(true)
.Async((result, status) => {
if (status.Error) {
Debug.Log(string.Format("In Example, History Error: {0} {1} {2}", status.StatusCode, status.ErrorData, status.Category));
} else {
if ((result.Messages!=null) && (result.Messages.Count>0)) {
foreach (PNHistoryItemResult histItem in result.Messages) {
Debug.Log(string.Format("histItem: {0}, {1}", histItem.Entry.ToString(), histItem.Timetoken.ToString()));
}
GetHistoryRecursive(result.EndTimetoken, channel);
}
}
});
}