SSE and Raw HTTP Streaming API

The Ably SSE and raw HTTP streaming API provides a way to get a realtime stream of events from Ably in circumstances where using a full Ably Realtime client library, or even an MQTT library, is impractical.

HTTP streaming allows a request from a client to be held open by a server, so that the server can push data to the client without further requests. This, much like WebSockets, helps avoid the overhead involved in normal HTTP requests. Server-sent events (SSE) provide a thin layer on top of HTTP streaming. SSE is commonly consumed through the EventSource API, which is available in all modern web browsers.

The API is subscribe-only: you cannot interact with the channel in any other way, such as publishing, entering presence, querying the presence set, or attaching to and detaching from channels (without closing and re-opening the stream).

Customers who do not want to use a client library on platforms that support SSE, and who only require simple subscribe-only streams, may choose SSE because it’s an open standard, simple, and requires no SDKs on the client side. HTTP streaming may be considered on platforms without an SSE client. However, where possible, we strongly recommend using one of our Realtime client libraries, which provide more features, higher reliability, and full use of our normal realtime messaging API.

Getting Started

SSE is incredibly simple to get started with. The code sample below provides an example of how to use it with Ably.

var key = 'xVLyHw.h6hu7A:JrJGxkV7AKpO2Fly';
var url = 'https://realtime.ably.io/event-stream?channels=myChannel&v=1.2&key=' + key;
var eventSource = new EventSource(url);

eventSource.onmessage = function(event) {
  var message = JSON.parse(event.data);
  console.log('Message: ' + message.name + ' - ' + message.data);
};

Authentication

It is possible to use either basic auth (with an API key) or token auth (using a token issued from your server) with SSE. We recommend token auth on the client side for security reasons, so you have control over who can connect. Basic auth, while lacking this control, is simpler (it doesn’t require you to run an auth server), and you don’t have to worry about the client obtaining a new token when the old one expires.

If using basic auth, you can use a querystring parameter of key or an Authorization: Basic <base64-encoded key> header. If using token auth, you can use an accessToken querystring parameter or an Authorization: Bearer <base64-encoded token> header. See REST API authentication for more information.
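As a rough illustration of the two ways of supplying a credential, the sketch below builds the value of a Basic header from an API key and, alternatively, appends the credential as a querystring parameter. The helper names (basicAuthHeader, withCredential), the key and the token are hypothetical placeholders, and Buffer assumes a Node.js environment.

```javascript
// Hypothetical helpers; the names are illustrative and not part of any Ably SDK.

// Build the value of an "Authorization: Basic ..." header from an API key.
// Assumes Node.js (Buffer is a Node global).
function basicAuthHeader(apiKey) {
  return 'Basic ' + Buffer.from(apiKey, 'utf8').toString('base64');
}

// Append a credential as a querystring parameter instead of a header.
// paramName is 'key' for basic auth or 'accessToken' for token auth.
function withCredential(url, paramName, value) {
  const joiner = url.includes('?') ? '&' : '?';
  return url + joiner + paramName + '=' + encodeURIComponent(value);
}

const base = 'https://realtime.ably.io/sse?channels=myChannel&v=1.2';
console.log(basicAuthHeader('appId.keyId:keySecret'));
console.log(withCredential(base, 'accessToken', 'example-token'));
```

The browser EventSource constructor does not accept custom request headers, so in a browser the querystring form is usually the practical choice; the header form suits HTTP clients that let you set headers.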

Connection state is only retained for two minutes. See Connection state explained for full documentation.

The SSE protocol and the EventSource API are designed so that a dropped connection is resumed transparently; the client implementation will reconnect and supply a lastEventId param that ensures that the resuming connection delivers any events that have arisen since the connection was dropped. Ably uses this mechanism to reattach all channels in a new connection to the exact point that had been reached in the prior connection.

When a token expires the connection will end. However, the default EventSource behaviour of automated reconnection will not work, because the (expired) credentials are part of the connection URL. What is needed is for a new connection to be established, with an updated accessToken. The question then arises as to how to do that with continuity – that is, how to establish a new connection but supply the correct lastEventId so that the new connection resumes from the point that the prior connection became disconnected.

Implementing message continuity with Token Auth

Implementing transparent connection resumes when tokens need to be renewed requires a few additional steps – detecting token expiry and resuming the connection from the point of the last delivered message using the lastEventId attribute:

Detecting token expiry

When a connection is closed as a result of an error (that is, it is not just a dropped connection), the error event will occur on the EventSource instance, and the data attribute of the event will contain an Ably error body with information about the nature of the error. In the case of a token error (an error arising from a problem with the auth token), the code in the error body will indicate that. Token errors have a code in the range 40140 <= code < 40150. In such cases, the authentication can be retried with a new accessToken.
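The check described above can be sketched as a small classifier for the event passed to an error handler. This is only an illustration: classifyStreamError and the kind labels it returns are hypothetical, and the sample error body is made up to match the shape described here.

```javascript
// Hypothetical helper: classify the event passed to an EventSource error handler.
function classifyStreamError(event) {
  // A plain dropped connection produces an error event without an Ably error body.
  if (!event || typeof event.data !== 'string') {
    return { kind: 'network', error: null };
  }
  let error;
  try {
    error = JSON.parse(event.data);
  } catch (e) {
    return { kind: 'unknown', error: null };
  }
  if (!error || typeof error.code !== 'number') {
    return { kind: 'unknown', error: null };
  }
  // Token errors: 40140 <= code < 40150, so retrying with a new accessToken makes sense.
  if (error.code >= 40140 && error.code < 40150) {
    return { kind: 'token', error };
  }
  return { kind: 'other', error };
}

const expired = { data: JSON.stringify({ code: 40142, statusCode: 401, message: 'Token expired.' }) };
console.log(classifyStreamError(expired).kind); // token
console.log(classifyStreamError({}).kind);      // network
```

Keeping the classification separate from the reconnection logic makes the range check easy to test without opening a real connection.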

In the future we plan to send an event on the connection that indicates that the token will expire imminently, which will allow a new connection to be established prior to the closure of the previous connection.

Specifying the lastEventId

Each message received will have a lastEventId attribute containing the last id of any message received on the connection. When constructing a new connection, this value can be specified as a lastEvent param in the URL.

Here’s an example of implementing message continuity with Token Auth:

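let lastEvent;
let eventSource;
const channel = 'myChannel'; // the channel to subscribe to

const connectToAbly = () => {
  // obtain a token (placeholder: request a fresh token from your auth server here)
  const token = <GET-NEW-ABLY-AUTH-TOKEN>

  // establish a connection with that token, resuming from the last received event if there is one
  const lastEventParam = lastEvent ? ('&lastEvent=' + encodeURIComponent(lastEvent)) : '';
  eventSource = new EventSource(`https://realtime.ably.io/sse?v=1.1&accessToken=${token}&channels=${channel}${lastEventParam}`);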

  // handle incoming messages
  eventSource.onmessage = msg => {
    lastEvent = msg.lastEventId;
    // ... normal message processing
  }

  // handle connection errors
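  eventSource.onerror = msg => {
    // a plain dropped connection has no error body; EventSource reconnects by itself
    if (!msg.data) return;
    const err = JSON.parse(msg.data);
    const isTokenErr = err.code >= 40140 && err.code < 40150;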
    if(isTokenErr) {
      eventSource.close();
      connectToAbly();
    } else {
      // ... handle other types of error -- for example, retry on 5xxxx, close on 4xxxx
    }
  }
}

connectToAbly();

Note that the EventSource API tries to reconnect and re-subscribe to the SSE endpoint automatically when any error occurs, including a token expiry error. If you then subscribe manually with a new token, there will be two active subscriptions to the endpoint: one with the old token, which keeps failing because its credentials have expired, and one with the new token. It is therefore important to close the previous EventSource subscription using eventSource.close() before re-subscribing with the new token, as shown in the snippet above.

You can take a look at a demo app and a complete code example for implementing message continuity in an SSE subscription when using token auth.

API routes

Server-sent events

GET realtime.ably.io/sse

Start a streaming HTTP request that conforms to the Server-Sent Events spec, for ease of consuming with an SSE library.

The /event-stream endpoint will give an SSE response if the Accept header is set to text/event-stream. The /sse endpoint is also provided as an easier way of forcing an SSE response.

Request parameters
channels
mandatory. One or more channel names, separated by commas (or the separator if specified). Non-url-safe characters should be URL-encoded (for example, ?channels=foo%3Fbar will subscribe to the channel foo?bar). Alias: channel.
v
mandatory. The version of the API you are requesting. The current version of the API is 1.2, so v=1.2 is recommended.
separator
optional. A separator, to enable easy subscriptions to channels with commas in their name. For example, ?separator=|&channel=fo,o|ba,r will subscribe to the two channels fo,o and ba,r.
key
optional. An Ably API key to use, if using basic auth.
accessToken
optional. An Ably auth token to use, if using token auth.
lastEvent
optional. An id to resume from. Only required when starting a new SSE connection which resumes from a previous connection.
rewind
optional. An integer which, if specified, will send a backlog of the number of messages specified once the connection opens. For example, rewind=1 will give you the most recent message sent on the channel. This is best-effort — only messages from within the last two minutes will be available, and only if the channel has been continuously active since the message was sent; it is not a replacement for the history API. It only has an effect for new connections; when resuming a previous connection using lastEvent, it is ignored in favour of sending you the messages you missed since you were last connected.
enveloped
optional. Default is true. If true, the data from each event envelope for a message event will be a Message object. If false, it will be the payload from the message directly. See Envelope format below.
heartbeats
optional. Default is false. If true, an explicit heartbeat event is used as the keepalive packet rather than a newline.
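To show how these parameters combine, here is a minimal sketch of a URL builder. The function name buildStreamUrl and its options object are hypothetical, and it assumes that boolean parameters are sent as the literal strings true and false.

```javascript
// Hypothetical helper: assemble a stream URL from the request parameters.
function buildStreamUrl(options) {
  const {
    endpoint = 'https://realtime.ably.io/sse',
    channels,        // array of channel names (mandatory)
    version = '1.2', // sent as the v parameter (mandatory)
    separator,       // optional custom separator between channel names
    key,             // API key, for basic auth
    accessToken,     // token, for token auth
    lastEvent,       // id to resume from
    rewind,          // number of recent messages to replay
    enveloped,       // assumption: sent as 'true' / 'false'
    heartbeats,      // assumption: sent as 'true' / 'false'
  } = options;

  if (!Array.isArray(channels) || channels.length === 0) {
    throw new Error('at least one channel is required');
  }

  const params = [
    ['separator', separator],
    ['channels', channels.join(separator || ',')],
    ['v', version],
    ['key', key],
    ['accessToken', accessToken],
    ['lastEvent', lastEvent],
    ['rewind', rewind],
    ['enveloped', enveloped],
    ['heartbeats', heartbeats],
  ];

  // Skip parameters that were not supplied and URL-encode the rest.
  const query = params
    .filter(([, value]) => value !== undefined && value !== null)
    .map(([name, value]) => name + '=' + encodeURIComponent(String(value)))
    .join('&');
  return endpoint + '?' + query;
}

console.log(buildStreamUrl({ channels: ['foo?bar'], key: 'appId.keyId:keySecret' }));
console.log(buildStreamUrl({ channels: ['fo,o', 'ba,r'], separator: '|', rewind: 1 }));
```

Encoding every value with encodeURIComponent covers the non-URL-safe characters mentioned for channels, and the same builder can target the plain HTTP stream by passing a different endpoint.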
Envelope format

The format matches the plain event stream envelope described below, except that instead of a JSON object with id, event and data members, you receive an SSE event carrying the corresponding fields.

Keepalive packets are sent as SSE comments (:keepalive).

Code example
var apiKey = 'xVLyHw.h6hu7A:JrJGxkV7AKpO2Fly';
var url = 'https://realtime.ably.io/event-stream?channels=myChannel&v=1.2&key=' + apiKey;
var eventSource = new EventSource(url);

eventSource.onmessage = function(event) {
  var message = JSON.parse(event.data);
  console.log('Message: ' + message.name + " - " + message.data);
};

import json
import sseclient

api_key='xVLyHw.h6hu7A:JrJGxkV7AKpO2Fly'
url = "https://realtime.ably.io/sse?channels=myChannel&v=1.2&key=%s" % (api_key)

def with_urllib3(url):
  import urllib3
  http = urllib3.PoolManager()
  return http.request('GET', url, preload_content=False)

response = with_urllib3(url)
client = sseclient.SSEClient(response)
for event in client.events():
  message = json.loads(event.data)
  print("Channel: %s - Message: %s - %s " % (message['channel'], message['name'], message['data']))

curl "https://rest.ably.io/sse?channel=example&v=1.2" \
  --user "xVLyHw.h6hu7A:JrJGxkV7AKpO2Fly"
⏎
id: cbfKayrzgAXDWM:1556806691343-0
event: message
data: {
  "id":"YqigX7VFsR:0:0",
  "name":"foo",
  "timestamp":1556806691341,
  "encoding":"json",
  "channel":"channel",
  "data":"{\"foo\":1}"
}
⏎
:keepalive
⏎
event: error
data:{
  "message":"Token expired. (See https://help.ably.io/error/40142 for help.)",
  "code":40142,
  "statusCode":401,
  "href":"https://help.ably.io/error/40142"
}

Plain HTTP event stream

GET realtime.ably.io/event-stream

Starts a streaming HTTP request. This allows for messages to be easily consumed with any HTTP library that supports streaming. This is similar to the SSE endpoint, but uses JSON envelopes instead of SSE events.

When available, we recommend using an SSE library as opposed to the raw HTTP stream as SSE libraries automatically handle reconnecting and resuming from the last received ID.

Request parameters
channels
mandatory. One or more channel names, separated by commas (or the separator if specified). Non-url-safe characters should be URL-encoded (for example, ?channels=foo%3Fbar will subscribe to the channel foo?bar). Alias: channel.
v
mandatory. The version of the API you are requesting. The current version of the API is 1.2, so v=1.2 is recommended.
separator
optional. A separator, to enable easy subscriptions to channels with commas in their name. For example, ?separator=|&channel=fo,o|ba,r will subscribe to the two channels fo,o and ba,r.
key
optional. An Ably API key to use, if using basic auth.
accessToken
optional. An Ably auth token to use, if using token auth.
lastEvent
optional. An id to resume from. Only required when starting a new HTTP connection which resumes from a previous connection.
rewind
optional. An integer which, if specified, will send a backlog of the number of messages specified once the connection opens. For example, rewind=1 will give you the most recent message sent on the channel. This is best-effort — only messages from within the last two minutes will be available, and only if the channel has been continuously active since the message was sent; it is not a replacement for the history API. It only has an effect for new connections; when resuming a previous connection using lastEvent, it is ignored in favour of sending you the messages you missed since you were last connected.
enveloped
optional. Default is true. If true, the data from each event envelope for a message event will be a Message object. If false, it will be the payload from the message directly. See Envelope format below.
heartbeats
optional. Default is false. If true, an explicit heartbeat event is used as the keepalive packet rather than a newline.
Envelope format

Once a streaming response is established, every line (other than empty lines sent as keepalive packets) will be a simple JSON object of the following form:

{
  event: <string, the event type, such as message, presence, error or heartbeat>,
  data: <message, presence or error object. Not present for heartbeats>,
  id: <string, the ID to use to resume from this point, see connection state
       recovery for details. Only present for message and presence events>
}
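Since each non-empty line is one JSON envelope, a consumer has to buffer incoming chunks and split them on newlines before parsing. The following is a minimal sketch of that step; createEnvelopeParser is a hypothetical name, and it assumes chunks arrive as strings or as UTF-8 buffers that are not split in the middle of a multi-byte character.

```javascript
// Hypothetical helper: turn raw stream chunks into parsed envelopes.
function createEnvelopeParser(onEnvelope) {
  let buffer = '';
  return function feed(chunk) {
    buffer += chunk.toString();
    const lines = buffer.split('\n');
    // The last element is an incomplete line (or ''); keep it for the next chunk.
    buffer = lines.pop();
    for (const line of lines) {
      const trimmed = line.trim();
      if (trimmed === '') continue; // empty lines are keepalive packets
      onEnvelope(JSON.parse(trimmed));
    }
  };
}

// Usage: feed it chunks exactly as they arrive, even if an envelope is split across them.
const received = [];
const feed = createEnvelopeParser(envelope => received.push(envelope));
feed('{"event":"message","id":"abc:1-0","da');
feed('ta":{"name":"foo"}}\n\n');
feed('{"event":"heartbeat"}\n');
console.log(received.map(e => e.event)); // [ 'message', 'heartbeat' ]
```

The id of the most recent message or presence envelope is the value to keep if you later need to resume with lastEvent.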

If enveloped is true (the default), the data will be a JSON-stringified object of a type determined by the event:

- For a message event it will be a Message object
- For a presence event it will be a PresenceMessage object
- For an error event it will be an ErrorInfo object

For a payload that is anything other than a string, as there is no client library to decode the payload of a Message or PresenceMessage, you will have to decode it yourself. Objects and arrays will be json-encoded; binary payloads will be base64-encoded. The encoding field of the Message will specify what encoding has been used for the data payload.
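A minimal sketch of that decoding step follows. decodePayload is a hypothetical helper; it assumes the encoding field holds the literal value json or base64 (json appears in the sample responses on this page, base64 is an assumption) and returns any other payload untouched. Buffer assumes a Node.js environment.

```javascript
// Hypothetical helper: decode the data payload of an enveloped Message or PresenceMessage.
function decodePayload(message) {
  switch (message.encoding) {
    case 'json':
      // Objects and arrays arrive as JSON text.
      return JSON.parse(message.data);
    case 'base64':
      // Binary payloads arrive base64-encoded; return the raw bytes as a Node.js Buffer.
      return Buffer.from(message.data, 'base64');
    default:
      // No encoding field, or one this sketch does not handle: return the payload as-is.
      return message.data;
  }
}

const decoded = decodePayload({ name: 'foo', encoding: 'json', data: '{"foo":1}' });
console.log(decoded.foo);
```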

If enveloped is false, the data for a message or presence event will be the data payload from the Message or PresenceMessage itself. Other events are unaffected.

Non-string payloads will be encoded as before, but without enveloping you will not have the benefit of the encoding field to tell you what encoding has been done.

Note that failures on opening the connection (for example, invalid authentication details) may be sent as a non-streamed HTTP response (with a response body of the form {"error": <ErrorInfo>}), not as an error event in a streamed response.
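One way to account for this is to inspect the status code and body before treating the response as a stream. The sketch below is illustrative only: parseOpenFailure is a hypothetical helper, the fallback message is made up, and it assumes that a successfully opened stream has a 2xx status.

```javascript
// Hypothetical helper: extract the ErrorInfo from a non-streamed failure response.
// Returns null when the status suggests the stream opened normally.
function parseOpenFailure(statusCode, bodyText) {
  if (statusCode >= 200 && statusCode < 300) return null;
  try {
    const body = JSON.parse(bodyText);
    if (body && body.error) return body.error;
  } catch (e) {
    // The body was not JSON; fall through to the generic error below.
  }
  return { statusCode: statusCode, message: 'Unrecognised error response' };
}

const failure = parseOpenFailure(401, '{"error":{"code":40142,"statusCode":401,"message":"Token expired."}}');
console.log(failure.code);
```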

Code example
curl "https://rest.ably.io/event-stream?channel=example&v=1.2" \
  --user "xVLyHw.h6hu7A:JrJGxkV7AKpO2Fly"

{
  "id":"cbfKayrzgAXDWM:1556804156735-0",
  "event":"message",
  "data":{
    "id":"oZs6XaGYx8:0:0",
    "name":"message-name",
    "timestamp":1556804156730,
    "encoding":"json",
    "channel":"example",
    "data":"{\"foo\":1}"
  }
}
⏎
{
  "event":"error",
  "data":{
    "message":"Token expired. (See https://help.ably.io/error/40142 for help.)",
    "code":40142,
    "statusCode":401,
    "href":"https://help.ably.io/error/40142"
  }
}

const request = require('request');
const apiKey = 'xVLyHw.h6hu7A:JrJGxkV7AKpO2Fly';
const url = 'https://realtime.ably.io/event-stream?channels=myChannel&v=1.2&key=' + apiKey;

request
  .get(url)
  .on('response', function(response) {
    console.log(response.statusCode) // stream started, 200
  })
  .on('data', function(data) {
    console.log('Envelope:', data.toString())
  });
