Subscribing to a data stream using SSE and the Event Source API

Server-Sent Events (SSE) is a standard describing how a server can push data to a client once the client has established an initial connection. It is commonly used to send message updates or continuous data streams to a browser client, and is designed to improve cross-browser streaming through a JavaScript API called EventSource. Using this API, a client receives an event stream by sending a request to a particular URL endpoint. In short, with SSE a client can receive continuous event updates without needing a client library SDK or a protocol adapter such as MQTT.
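To make the event stream format more concrete, here is a minimal sketch of how a client could split a text/event-stream body into events. It is an illustration only, not what EventSource or Ably actually run: the function name parse_sse and the sample lines are made up for this example, and only the data, event and id fields are handled.

```python
from typing import Dict, Iterable, Iterator


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per event from an iterable of already-decoded lines."""
    event_type, data_lines, last_id = "message", [], ""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # A blank line ends the current event; events without data are dropped.
            if data_lines:
                yield {"event": event_type, "data": "\n".join(data_lines), "id": last_id}
            event_type, data_lines = "message", []
            continue
        if line.startswith(":"):
            continue  # comment line, often sent as a keep-alive
        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]  # a single leading space after the colon is ignored
        if field == "data":
            data_lines.append(value)
        elif field == "event":
            event_type = value
        elif field == "id":
            last_id = value


# Hypothetical sample stream: one named event, one comment, one default event.
sample = [
    "event: greeting\n",
    "data: hello\n",
    "data: world\n",
    "\n",
    ": keep-alive\n",
    "data: plain message\n",
    "\n",
]
for event in parse_sse(sample):
    print(event)
```

In the browser, EventSource does this parsing for you and hands each event to the matching handler, which is why the subscriber code in this tutorial never touches the raw stream.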

Ably provides two endpoints for HTTP streaming: Event Stream and SSE. While the Event Stream endpoint allows your subscriber to receive a stream of events from the server, the SSE endpoint makes sure that the communication conforms to the standard SSE spec, allowing you to use any SSE library to accept and process the stream.

In this tutorial, we’ll look at subscribing to real-time updates using both of these endpoints. To do that, we’ll set up a dummy publisher that simply publishes the current date and time, which our subscriber clients then receive over SSE/HTTP streaming.

Setting up the publisher

The Publisher here refers to the server-side aspect where the messages and events are being published. For the purpose of this tutorial, we will implement a simple example that sends the current date and time as Ably events every two seconds.

To publish from Node.js, create a file called publisher.js and paste the following code into it, then run it with the command node publisher.js. Don’t forget to replace the API key placeholder with your own key.

// Load the Ably Realtime client and moment (used for date formatting)
const ably = require('ably').Realtime;
const moment = require('moment');

// Replace the placeholder with your own Ably API key
const ablyRealtime = new ably('INSERT-YOUR-API-KEY-HERE');
const channel = ablyRealtime.channels.get('date-channel');
const newChannel = ablyRealtime.channels.get('time-channel');

// Publish the current date and time every two seconds
setInterval(() => {
  const d = moment();
  channel.publish('event', `Hello, current date is ${d.format('LL')}`);
  newChannel.publish('event', `Hello, current time is ${d.format('hh:mm:ss a')}`);
}, 2000);

To publish from Python instead, create a file called publisher.py and paste the following code into it, then run it with the command python publisher.py. Don’t forget to replace the API key placeholder with your own key.

from ably import AblyRest
from threading import Timer
import datetime
client = AblyRest('INSERT-YOUR-API-KEY-HERE')
channel = client.channels.get('date-channel')
newChannel = client.channels.get('time-channel')
def sendEvent():
    currentDT = datetime.datetime.now()
    channel.publish('event', "Current date is %s" % (currentDT.strftime("%x")))
    newChannel.publish('event', "Current time is %s" % (currentDT.strftime("%X")))
    Timer(2.0, sendEvent).start()
Timer(2.0, sendEvent).start()

Setting up the UI

Let’s create a simple HTML file, where we’ll implement SSE for subscribing to a stream of data. Create a file called index.html and paste the following HTML code into it:


<!DOCTYPE html>
<html>
  <head>
    <meta charset="utf-8">
    <style>
      ul {
        width: 80%;
        margin: 10px auto 0;
        font-family: courier, courier new;
        background-color: #333;
        height: 300px;
        overflow: scroll;
        color: orange;
      }
      h1 {
        text-align: center;
      }
      h3 {
        text-align: center;
      }
    </style>
  </head>
  <body>
    <h1>SSE example</h1>

    <h3>Events:</h3>
    <ul id="events"></ul>
  </body>
</html>

In the code above, we create a basic HTML page which consists of an unordered list ul element. The children of this element will be dynamically populated with the real-time event and data updates received from the publisher.

Setting up the subscriber

With SSE

Just before the closing body tag, add the following code to the HTML file:

<script type="text/javascript">

const apiKey = "INSERT-YOUR-API-KEY-HERE";

// can have multiple channels, comma-separated.
// If a channel name is not url-safe, URL-encode it first
const channels = "date-channel,time-channel";

if(apiKey.indexOf("INSERT") === 0) {
  alert("Add your API key to index.html");
  throw 'Cannot proceed without api key';
}

const url = `https://realtime.ably.io/sse?v=1.2&key=${apiKey}&channels=${channels}`
const eventSource = new EventSource(url);
eventSource.onopen = function() {
  addEvent(`Connected and subscribed to channels: ${channels.split(',').join(', ')}`);
}

eventSource.onmessage = function(event) {
  const message = JSON.parse(event.data);
  addEvent(`Channel: ${message.channel} - Message: ${message.name} - ${message.data}`);
}

eventSource.onerror = function(error) {
  if(error.data) {
    addEvent(`Error: ${error.data}`);
  } else {
    addEvent('Error connecting to Ably');
  }
}

const eventList = document.querySelector('ul#events');

function addEvent(text) {
  const newElement = document.createElement("li");
  newElement.textContent = text;
  eventList.appendChild(newElement);
}
</script>

In this script, we defined the apiKey variable, which should be replaced with your Ably API key, and listed the Ably channels to subscribe to in a variable named channels. We’ve also added a simple check to see if the API key was in fact replaced. Next, we defined a variable called url, a template string that takes our apiKey and channels. Notice that the path of this URL is /sse, which means we want to receive the event stream from the server as standard SSE.

We then pass this URL to our EventSource and attach the onopen, onmessage and onerror handlers. Whenever one of these handlers fires, it calls the addEvent function, which appends a new li element to the ul element in the HTML markup.

To see this in action, open the HTML page in your browser. You should get a log of new messages being published by the publisher we set up earlier. Note that our subscriber does not include the Ably SDK; it subscribes directly to the data stream.

To subscribe from Python instead, create a new file called subscriber.py and paste the following code into it.

# Install the dependencies first: pip3 install sseclient-py urllib3
import json
import sseclient

apiKey = 'INSERT_API_KEY_HERE'

if 'INSERT' in apiKey:
    print("Please replace your API key")
    exit()
channels = 'date-channel,time-channel'
url = 'https://realtime.ably.io/sse?v=1.2&key=%s&channels=%s' % (apiKey, channels)

def with_urllib3(url):
    """Get a streaming response for the given event feed using urllib3."""
    import urllib3
    http = urllib3.PoolManager()
    return http.request('GET', url, preload_content=False)

response = with_urllib3(url)
client = sseclient.SSEClient(response)
for event in client.events():
    message = json.loads(event.data)
    print("Channel: %s  - Message: %s  - %s " % (message['channel'], message['name'], message['data']))

Here, we defined the apiKey variable, which should be replaced with your Ably API key, and added a simple check to see if the API key was in fact replaced. We then listed our Ably channels in a variable named channels. Next, we defined a variable called url, a %-formatted string that takes our apiKey and channels. Notice that the path of this URL is /sse, which means we want to receive the event stream from the server as standard SSE.

We then pass this URL to our with_urllib3 function, which opens the stream, and hand the response to sseclient, which yields the events as they arrive. For each event, we print its details to the console.
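The URL in this tutorial joins the channel names as they are, which works because date-channel and time-channel are already URL-safe. For names that are not, each one needs to be URL-encoded first. The following is a minimal sketch of one way to do that with the standard library; the helper name build_sse_url and the extra channel names are hypothetical, and the API key is inserted unchanged, as in the tutorial code.

```python
from urllib.parse import quote


def build_sse_url(api_key, channel_names, base="https://realtime.ably.io/sse"):
    """Build the subscription URL, percent-encoding each channel name."""
    # safe="" makes quote() encode every reserved character, including "/" and "&".
    encoded = ",".join(quote(name, safe="") for name in channel_names)
    return "%s?v=1.2&key=%s&channels=%s" % (base, api_key, encoded)


# Hypothetical channel names, chosen only to show the encoding.
print(build_sse_url("INSERT-YOUR-API-KEY-HERE", ["date-channel", "my channel", "a&b"]))
```

Encoding each name separately, before joining, keeps the commas between names intact as separators.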

To see this in action, run the Python file with the python subscriber.py command. You should get a log of new messages being published by the publisher we set up earlier. Note that our subscriber does not include the Ably SDK; it subscribes directly to the data stream.
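Each line in that log comes from decoding the data field of one event as JSON and reading its channel, name and data fields. As a hedged sketch, the decoding step could be pulled into a small function that also tolerates payloads that are not JSON objects. The function name describe_event and the sample payload are assumptions made for illustration; the payload contains only the three fields this tutorial reads.

```python
import json


def describe_event(raw_data):
    """Turn the data field of one SSE event into a printable line."""
    try:
        message = json.loads(raw_data)
    except ValueError:  # json.JSONDecodeError is a subclass of ValueError
        return "Skipped non-JSON payload: %r" % (raw_data,)
    if not isinstance(message, dict):
        return "Skipped unexpected payload: %r" % (message,)
    return "Channel: %s - Message: %s - %s" % (
        message.get("channel"), message.get("name"), message.get("data"))


# Hypothetical payload shaped like the messages printed by subscriber.py.
sample = '{"channel": "time-channel", "name": "event", "data": "Current time is 10:15:00"}'
print(describe_event(sample))
```

Inside the subscriber loop, this would be called as print(describe_event(event.data)).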

With Event Stream

For the Event Stream endpoint, just replace the URL with https://realtime.ably.io/event-stream?v=1.2&key=${apiKey}&channels=${channels}. The rest of the code is exactly the same as that of SSE.

The /event-stream endpoint will give an SSE response if the Accept header is set to text/event-stream. The /sse endpoint is provided as an easier way of forcing an SSE response.
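As a rough sketch of the point about the Accept header, the Python subscriber could request the /event-stream endpoint like this. The helper names event_stream_request and open_event_stream are made up for this example; the urllib3 call mirrors the one used in subscriber.py, with the header added.

```python
def event_stream_request(api_key, channels):
    """Return the URL and headers for an SSE-formatted /event-stream request."""
    url = "https://realtime.ably.io/event-stream?v=1.2&key=%s&channels=%s" % (api_key, channels)
    # Per the note above, this header asks /event-stream for an SSE response.
    headers = {"Accept": "text/event-stream"}
    return url, headers


def open_event_stream(api_key, channels):
    """Open the stream with urllib3 (this performs a network request)."""
    import urllib3  # imported here so the helper above works without it
    url, headers = event_stream_request(api_key, channels)
    http = urllib3.PoolManager()
    return http.request("GET", url, headers=headers, preload_content=False)
```

The response returned by open_event_stream can be passed to sseclient.SSEClient in the same way as in subscriber.py.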

Conclusion

In this tutorial you have seen how to subscribe to realtime events using SSE and Ably.

You can see the full code for this tutorial on GitHub.

For further reading, view the SSE documentation or a short video.