    Streams are a powerful concept. They are programming constructs that move data between objects and services without requiring you to loop through the data manually, which is one reason they are so popular. Twitter has streams, NodeJS has streams, Java 8 has streams, and even RSS feeds can be represented as streams. PubNub also has streams, though we call them channels. The great thing about streams is that you can pipe them together to do interesting things.

    With just a few Node modules, we can stream a realtime list of all tweets matching a hashtag into a PubNub channel. From there we can do all sorts of things, including archiving for later analysis, distributing the stream to millions of people at once, or visualizing the data. For example, say you want to draw a map of the US showing which states are tweeting about different candidates in the 2016 election. In this post we will pipe a stream of tweets matching a particular hashtag into a PubNub channel using NodeJS streams.

    A Single Line of Code

    Our end goal is to make this single line of code work:

    new TwitterStream(twcfg,"#twitter").pipe(new PubNubOutStream(pncfg,"awesome-tweets"))

    This line creates an ongoing Twitter query for tweets containing the hashtag #twitter, then pipes it into the PubNub channel called awesome-tweets.

    Node comes with many stream types built in, such as files and HTTP sockets, but we can build our own by subclassing Readable or Writable and overriding a function or two. Here we will use NodeJS input and output streams to pipe tweets from Twitter into a PubNub channel.

    Make a LogStream

    Here is a simple output stream which logs everything it receives to the console. It’s a simple object which overrides its _write method with a new one that prints to the console.

    var Writable = require('stream').Writable;
    var util     = require('util');
    function LogStream() {
       Writable.call(this,{objectMode:true});
       this._write = function(obj,encoding,callback) {
           console.log("LOG",util.inspect(obj,{depth:0}));
           callback();
       };
    }
    util.inherits(LogStream,Writable);
    

    We can tell that LogStream is an output stream because it subclasses Writable. An input stream would subclass Readable. Notice that the LogStream calls Writable.call (its ‘parent class’ constructor) with objectMode set to true. This tells Node that we are working with a stream of objects rather than a stream of bytes. Also notice that the _write method does its work, printing to the console, then invokes the callback function that was passed to it. This callback is very important. It’s what lets the stream system pass control back and forth from reader to writer without overflowing any internal buffers.
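
    To see why the callback matters, here is a minimal sketch (not part of this article's project) of a deliberately slow output stream. SlowSink, its 10 ms delay, and the highWaterMark of 2 are all made up for illustration. It records how many _write calls are pending at once, which should never exceed one, because Node holds back the next object until callback() runs.

```javascript
var Writable = require('stream').Writable;
var util = require('util');

// Hypothetical sink that takes a while to handle each object.
function SlowSink() {
    // In objectMode, highWaterMark counts objects rather than bytes.
    Writable.call(this, {objectMode: true, highWaterMark: 2});
    this.inFlight = 0;     // _write calls currently waiting on their callback
    this.maxInFlight = 0;  // highest value inFlight ever reached
    this.seen = [];        // objects handled so far, in order
    var self = this;
    this._write = function(obj, encoding, callback) {
        self.inFlight++;
        self.maxInFlight = Math.max(self.maxInFlight, self.inFlight);
        setTimeout(function() {
            self.seen.push(obj);
            self.inFlight--;
            callback(); // only now will Node hand over the next object
        }, 10);
    };
}
util.inherits(SlowSink, Writable);

var sink = new SlowSink();
var accepted = [];
for (var i = 0; i < 4; i++) {
    // write() returns false once the internal buffer has reached highWaterMark
    accepted.push(sink.write({n: i}));
}
sink.end();
sink.on('finish', function() {
    console.log('handled', sink.seen.length, 'objects; max in flight:', sink.maxInFlight);
});
```

    When write() starts returning false, pipe() pauses the reader until the writer has drained, which is how a fast source is kept from flooding a slow destination.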

    Make a TwitterStream

    Now we need to create a Twitter input stream to pipe into LogStream. We will use the aptly named twitter module, which provides full access to the Twitter APIs. Most of the APIs are REST based, meaning you make a request and get back a single response. However, Twitter also has a streaming API to access realtime hashtag streams. This will open a connection to Twitter and keep it open, so that’s the one we want.

    The code below makes a simple call to the Twitter API.

    var Twitter = require('twitter');
    var client = new Twitter(cfg);
    client.stream('statuses/filter', {track: query}, function(stream) {
       stream.on('data', function(tweet) {
           console.log("got a tweet",tweet);
       });
       stream.on('error', function(error) {
           console.log("got an error",error);
       });
    });
    

    Every time a new tweet comes in from the stream it invokes the data callback function to print that tweet to the console. This works, but what we really want is to output those tweets as a proper stream so we can hook it up to our LogStream from above. To make a readable stream we need a constructor function which extends stream.Readable and either overrides the _read function to supply data on demand or calls the push() method whenever data arrives. In our case we want to call push whenever a new tweet comes in. Here’s the expanded code:

    var Twitter = require('twitter');
    var util   = require('util');
    var Readable = require('stream').Readable;
    function TwitterStream(cfg, query) {
       Readable.call(this,{objectMode:true});
       var client = new Twitter(cfg);
       this._read = function() { /* do nothing */ };
       var self = this;
       function connect() {
           client.stream('statuses/filter', {track: query},
               function(stream) {
                  stream.on('data', (tweet) => self.push(tweet));
                  stream.on('error', (error) => connect());
               });
       }
       connect();
    }
    util.inherits(TwitterStream, Readable);
    

    This contains the same code as before, but wrapped up inside the TwitterStream function. Every time a tweet comes in, the function calls push on itself. This adds the tweet to the Readable’s internal buffer, which makes it available to whatever the stream is piped into. If the connection reports an error, the function simply connects again.
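
    The same push pattern can be tried without Twitter credentials. The sketch below is a stand-in under stated assumptions: EmitterStream, fakeTwitter, and collector are hypothetical names, and a plain EventEmitter plays the role of the Twitter client. It is not how the twitter module works internally; it only shows objects pushed from an event handler arriving at a piped destination.

```javascript
var EventEmitter = require('events').EventEmitter;
var Readable = require('stream').Readable;
var Writable = require('stream').Writable;
var util = require('util');

// Hypothetical wrapper: turns any emitter that fires 'data' events
// into an object-mode Readable.
function EmitterStream(source) {
    Readable.call(this, {objectMode: true});
    var self = this;
    this._read = function() { /* nothing to do: the source pushes on its own */ };
    source.on('data', function(item) { self.push(item); });
    source.on('end', function() { self.push(null); }); // null marks end of stream
}
util.inherits(EmitterStream, Readable);

var fakeTwitter = new EventEmitter(); // assumption: stands in for the real client
var received = [];
var collector = new Writable({
    objectMode: true,
    write: function(obj, encoding, callback) {
        received.push(obj.text);
        callback();
    }
});
collector.on('finish', function() {
    console.log('received', received);
});

new EmitterStream(fakeTwitter).pipe(collector);
fakeTwitter.emit('data', {text: 'first tweet'});
fakeTwitter.emit('data', {text: 'second tweet'});
fakeTwitter.emit('end');
```

    A real Twitter connection never emits an end event of its own accord, so the push(null) branch matters mostly for tests like this one.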

    Now we can output the Twitter stream to the console by piping them together like this:

    var twcfg = {
       consumer_key:"...",
       consumer_secret:"...",
       access_token_key:"...",
       access_token_secret:".."
    };
    new TwitterStream(twcfg,"#twitter").pipe(new LogStream());

    Remember to put in your own app keys from Twitter’s Application Management webpage.

    And we get the output:

    node twitter2pubnub/example.js
    LOG { created_at: 'Mon Apr 11 17:36:14 +0000 2016',
    id: 719579827968286700,
    id_str: '719579827968286720',
    text: 'RT @UpdatesTiniPL: [Foto] Nowe zdjęcia #TiniStoessel udostępnione przez serwis #Twitter z fanami 11.04 #3 https://t.co/oB82Z0unqf',
    source: 'Twitter for Android',
    truncated: false,
    in_reply_to_status_id: null,
    in_reply_to_status_id_str: null,
    in_reply_to_user_id: null,
    in_reply_to_user_id_str: null,
    in_reply_to_screen_name: null,
    user: [Object],
    geo: null,
    coordinates: null,
    place: null,
    contributors: null,
    retweeted_status: [Object],
    is_quote_status: false,
    retweet_count: 0,
    favorite_count: 0,
    entities: [Object],
    extended_entities: [Object],
    favorited: false,
    retweeted: false,
    possibly_sensitive: false,
    filter_level: 'low',
    lang: 'pl',
    timestamp_ms: '1460396174791' }

    Newline Delimited JSON

    That works quite well. Let’s try another output stream. Instead of printing to the console let’s write a log file to disk. We will use a log format called Newline Delimited JSON, which is just what it sounds like. Each line is a complete JSON object, with newlines separating entries. It’s an extremely simple format with reasonable performance.

    var Writable = require('stream').Writable;
    var util   = require('util');
    var fs     = require('fs');
    function NDJSONOutStream(path) {
       Writable.call(this,{objectMode:true});
       var fout = fs.createWriteStream(path,{flags:'a'});
       this._write = function(obj,encoding,callback) {
           fout.write(JSON.stringify(obj)+"\n",callback);
       };
    }
    util.inherits(NDJSONOutStream,Writable);

    The code for NDJSONOutStream looks very similar to LogStream; again we set objectMode to true. Note that we have nested callbacks here. Each time _write is called we write to the underlying file on disk. When that write completes, we call callback(). This ensures another call to _write doesn’t happen until the previous one is finished.

    Now we can stream tweets to a disk file like this:

    new TwitterStream(twcfg,"#twitter").pipe(new NDJSONOutStream("mylog.ndjson"));
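
    For the later analysis step, the log has to be read back. The following is a rough sketch of one way to do that with Node's built-in readline module; readNDJSON and the demo file name are hypothetical, and the whole file is collected in memory, which is only reasonable for small logs.

```javascript
var fs = require('fs');
var os = require('os');
var path = require('path');
var readline = require('readline');

// Hypothetical helper: parses an NDJSON file and calls done(err, objects).
function readNDJSON(file, done) {
    var objects = [];
    var finished = false;
    function finish(err) {
        if (finished) return; // make sure done() runs only once
        finished = true;
        done(err, err ? undefined : objects);
    }
    var input = fs.createReadStream(file);
    var rl = readline.createInterface({input: input});
    input.on('error', finish);
    rl.on('line', function(line) {
        if (line.trim() === '') return; // tolerate blank lines
        try {
            objects.push(JSON.parse(line)); // each line is one complete JSON value
        } catch (e) {
            finish(e);
            rl.close();
        }
    });
    rl.on('close', function() { finish(null); });
}

// Demo with a throwaway file instead of a real tweet log.
var demoFile = path.join(os.tmpdir(), 'ndjson-demo-' + process.pid + '.ndjson');
fs.writeFileSync(demoFile, '{"id_str":"1","lang":"pl"}\n{"id_str":"2","lang":"en"}\n');
readNDJSON(demoFile, function(err, objects) {
    fs.unlinkSync(demoFile);
    if (err) throw err;
    console.log('read', objects.length, 'objects');
});
```

    Because every line stands alone, a log that was cut off mid-write loses at most its last entry, which is a large part of the format's appeal.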

    Stream to PubNub

    Now that we can store it, let’s pipe the Twitter stream into a real PubNub channel. Channels are PubNub’s equivalent of a NodeJS stream, and they allow both reading and writing. The code below creates a PubNubOutStream which accepts a standard PubNub config object (remember to put in your own free app keys), plus the name of a channel to publish to.

    var Writable = require('stream').Writable;
    var util   = require('util');
    var pubnub = require("pubnub");
    var pncfg = {
       ssl           : true,  //  enable TLS Tunneling over TCP
       publish_key   : "Enter-your-publish-key",
       subscribe_key : "Enter-your-subscribe-key"
    };
    function PubNubOutStream(cfg, channel) {
       Writable.call(this,{objectMode:true});
       var pn = pubnub(cfg);
       this._write = function(obj,encoding,callback) {
           pn.publish({
               channel: channel,
               message: obj,
               callback: () => callback()
           });
       };
    }
    util.inherits(PubNubOutStream, Writable);

    Now we can pipe a tweet stream into a PubNub channel like this:

    new TwitterStream(twcfg,"#twitter").pipe(new PubNubOutStream(pncfg,"awesome-tweets"))

    And that’s it. An easy way to view these tweets is through the PubNub Debug Console. It provides a simple way to debug your entire PubNub implementation.

    View tweets with PubNub Debug Console

    NodeJS streams are a very powerful way to move data in and out of PubNub channels. From here you could add sentiment analysis, data visualization, geo tracking, or all three! Streams will become even more powerful in the future when they are combined with PubNub BLOCKS, coming this summer.
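
    Extra processing steps like these usually sit in the middle of a pipe as a Transform stream. As a hedged illustration, the sketch below trims each tweet to a few fields before it travels further; PickFields and the chosen field list are assumptions made up for this example, not part of any library.

```javascript
var Transform = require('stream').Transform;
var util = require('util');

// Hypothetical transform: keeps only the listed top-level fields of each object.
function PickFields(fields) {
    Transform.call(this, {objectMode: true});
    this._transform = function(obj, encoding, callback) {
        var slim = {};
        fields.forEach(function(name) {
            if (obj[name] !== undefined) slim[name] = obj[name];
        });
        callback(null, slim); // the second argument is pushed downstream
    };
}
util.inherits(PickFields, Transform);

// Try it on a single hand-written object.
var picker = new PickFields(['id_str', 'text', 'lang']);
var slimmed = [];
picker.on('data', function(obj) { slimmed.push(obj); });
picker.on('end', function() { console.log(slimmed); });
picker.end({
    id_str: '719579827968286720',
    text: 'hello',
    lang: 'pl',
    source: 'Twitter for Android'
});
```

    With the streams from this article it would slot in as new TwitterStream(twcfg,"#twitter").pipe(new PickFields(['id_str','text','lang'])).pipe(new PubNubOutStream(pncfg,"awesome-tweets")), so that smaller messages are published to the channel.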

    To learn more about NodeJS streams check out the Stream Handbook.
