Reactor Queues

Our Reactor Queues are traditional message queues that provide a reliable and straightforward mechanism for customers to consume, process, store, augment or reroute data from our realtime platform efficiently by your servers. Reactor Queues are offered as part of our Ably Reactor service which is available to all customers of the Ably platform.

What are Reactor Queues?

Ably’s Reactor Queues provide an asynchronous machine-to-machine communication protocol that follows a traditional message queueing pattern. At a high level, each machine participates in one or both roles: producers (Ably channels) publish messages (data) to a queue; consumers retrieve messages from the queue. The queue service is responsible for: storing published messages by placing them at the back of the queue; picking off the oldest messages from the front of the queue and handing them to a consumer; ensuring a FIFO or ‘first in, first out’ policy is followed; ensuring messages that are consumed successfully are only handed to one of the consumers. This messaging pattern provides decoupling (publishers can publish without waiting for consumers), scalability (adding more consumers increases throughput capacity) and resilience (messages are stored until a consumer has acknowledged the message has been processed successfully).

Ably’s Realtime Core provides channels for realtime data distribution using the pub/sub messaging pattern. Unlike queues, pub/sub channels provide fan-out so that every message published on a channel is received by all devices subscribed for that data. When delivered with our connection state recovery, this pattern provides a decoupled, resilient and scalable means to publishing realtime data to any number of devices.

No single pattern is better than the other, both have their merits and valid use cases. Take for example a delivery van driving through a city publishing its location periodically. Any number of customers waiting for their parcel can subscribe for updates and thus a pub/sub channel is well suited due to its inherent fan-out capability. However, emails may need to be triggered when the van is nearing its destination as well. A message queueing pattern is a better fit here as multiple worker servers can share the workload by consuming the location messages from the queue and performing work for each message without having to share any state. The message queue ensures that work is distributed evenly to the pool of servers, work is not duplicated (resulting for example in more than one email notification being sent) and the system is resilient to crashes or spikes in load (messages are stored until a consumer is ready to retrieve them).

Ably combines both pub/sub and queueing functionality in its platform as seen in the diagram below:


Ably Reactor Message Queues diagram

When should I use queues instead of pub/sub channels?

Queues are more appropriate where:

  • “Work” needs to be distributed between your servers for each published message. For example, “work” could be to generate an image and attach it to an email when a message is published
  • Messages should be delivered to only one consumer regardless of how many consumers are listening for new messages
  • You need an architectural design to process realtime data that scales horizontally by simply adding more consumer “worker” servers
  • You want to consume realtime data from channels on your servers statelessly i.e. you do not want to keep track of which channels or clients are active or share state between your servers
  • You want a backlog of messages to build up if the consumers cannot process data quickly enough or if the consumers go offline
  • You can provision the queues you need in advance. For example, you may have one queue for chat messages and another for analytics events

Please bear in mind that with the Ably platform all realtime data originates from pub/sub channels i.e. you never publish directly to a queue, you publish to a channel. If a queue rule exists that matches the channel name, then the message published will be automatically published into the designated queue. Therefore if you need to publish and consume data, you will have to publish data to channels over REST or Realtime protocols, and consume your data using an AMQP or STOMP client library.

Using the Reactor Queues

All Ably accounts have access to Reactor Queue functionality, however to get started you need to provision a physical queue and set up a queue rule to move data from channels into that queue.

Provisioning Reactor Queues

Unlike pub/sub channels that can exist in any datacenter and are provisioned on-demand by clients, queues need to be provisioned in advance and exist in one region.

Queues are setup within your app dashboard and you will need to configure:

  • A unique name for the queue. This name (along with the app ID prefixed automatically) will be used when consuming the queue from your queue client libraries.
  • The region that queue will be physically located in. Note that all queues exist across two datacenters in each region for high availability.
  • The TTL (time-to-live) for your messages. If the TTL period passes and a message has not been consumed from the queue, then the message is moved to the dead letter queue
  • The max length for your queue which is the total number of messages that your queue can retain in memory and/or on disk. When the queue is considered full based on the max length, a message published to the queue will be accepted however the oldest message on that queue will be moved into the dead letter queue to make room for the new message

Please note that the total number of queues, TTL and max length for each queue is a limited based on your account type. Find out more about account and package limits.

Follow step-by-step instructions to provision a queue now »

Setting up queue rules

Once you have provisioned a physical queue, you need to set up one or more queue rules to republish messages, presence events or channel events from pub/sub channels into a queue. Queue rules can either be used to publish to internal queues (hosted by Ably) or external external streams or queues (such as Kinesis, Kafka, RabbitMQ). Publishing to external streams or queues is part of our Ably Reactor Firehose servers which is only available to Enterprise customers.

Queues rules are setup in the Reactor tab found within your app dashboard. For internal queue rules you set up you will need to configure:

  • The source for the realtime data which is either:
    • Messages – messages are enqueued as soon as they are published on a channel;
    • Presence events – when clients enter, update their data, or leave channels, the presence event is enqueued; or
    • Channel lifecycle events (coming soon) – when a channel is opened (following the first client attaching to this channel) or closed (when there are no more clients attached to the channel), the lifecycle event is enqueued
  • An optional channel filter that allows you to filter which channels produce messages or events for your queues. Regular expressions are supported such as ^click_.*_mouse$
  • The encoding for your message which is either JSON (the default text format) or MsgPack (a binary format)
  • Whether messages published to the queue are wrapped in an envelope or not. The default envelope that wraps all messages published to queues provides additional metadata such as the channel, appId, site, and ruleId. Non-enveloped messages contain only the payload (data element of the message) and some metadata is provided in the message headers. See examples of enveloped and non-enveloped messages.

Follow step-by-step instructions to set up a queue rule now »

Queue dashboards and stats

Provisioned queues are visible in your app dashboard and provide near-realtime stats for the current state of each queue. See an example screenshot below:


Queue dashboard example

Whilst the queue dashboard stats show the current state of your queue, your app and account dashboard provide up-to-date live and historical stats for all messages published to your queues. See an example screenshot from an app dashboard below:


App stats example

Testing your queue rules

Once your Reactor Queue is provisioned, and your Queue rules are configured, there are a number of ways we recommend customers can debug the configured rules and queues:

Checking queue dashboard stats

Use the dev console to generate messages or events that match your queue rule. You can confirm messages are being delivered if the “Messages ready” count in your queue dashboard increases (see above). Note that the messages ready count won’t increase if you have a client consuming messages from this queue.

Using a CLI to consume messages

Install a command line tool for consuming messages using the AMQP protocol to check that messages published on channels (using the dev console or from any other source) are being pushed into the queues based on the queue rules.

You can install Node AMQP Consume CLI with:

npm install amqp-consume-cli -g

Then you need to go to your app dashboard to retrieve an API key that has access to the queues (your root key will typically have access to subscribe to all queues). Then check the server endpoint, vhost and queue name (which is always prefixed with a scope which is your appId) from the queue dashboard (see above) and issue a command such as:

amqp-consume --queue-name [Name] \
  --host [Server endpoint host] --port [Server endpoint port] \
  --ssl --vhost shared --creds [your API key]

Whenever a message is published to the queue you are subscribing to, the amqp-consume tool will output the message details such as:

Message received
Attributes: { contentType: 'application/json',
  headers: {},
  deliveryMode: 1,
  timestamp: 1485914937984 }
Data: {
  "source":"channel.message",
  "appId":"ael724",
  "channel":"foo",
  "site":"eu-west-1-A",
  "ruleId":"cOOo9g",
  "messages":[
    {
      "id":"vjzxPR-XK3:3:0",
      "name":"event",
      "connectionId":"vjzxPR-XK3",
      "timestamp":1485914937909,
      "data":"payload"
    }
  ]
}

Please note that the messages attribute is an Array so that future envelope options may allow messages to be bundled into a single envelope (WebHooks currently bundle messages). However, with the current queue rule design, an envelope will only ever contain one message.

Consuming messages from queues

Consuming messages from Ably Reactor Message Queues is mostly the same as consuming from any other queue supporting AMQP or STOMP protocols. However, there a few tips below to avoid common pitfalls.

Using AMQP

The AMQP protocol provides a rich set of functionality to amongst other things bind to exchanges, provision queues and configure routing. This functionality exists so that queues can be dynamically provisioned by clients and messages can be routed to these queues as required.

However, unlike our pub/sub channels, queues are pre-provisioned via our queue dashboards and all routing is handled by the queue rules. As such, when subscribing to messages from the provisioned queues, you must not attempt to bind to an exchange or declare a queue as these requests will be rejected. Instead, you should subscribe directly to the queue you wish to consume messages from.

Take the following queue as an example:

Queue dashboard example

In order to subscribe to messages from this queue you will need:

The queue name
UATwBQ:example-queue which is made up of your app ID and the name you assigned to your queue
The host
us-east-1-a-queue.ably.io
The port
5671 which is the TLS port you consume from. We only support TLS connections for security reasons
The vhost
shared
The username
the part before the : of an API key that has access to queues. For example, the username for an API key such as APPID.KEYID:SECRET would be APPID.KEYID.
The password
the part after the : of the API key. For example, the password for an API key such as APPID.KEYID:SECRET would be SECRET.

A simple example of subscribing to this queue in Node.js can be seen below:

const url = 'amqps://APPID.KEYID:SECRET@us-east-1-a-queue.ably.io/shared'
amqp.connect(url, (err, conn) => {
  if (err) { return handleError(err) }

  /* Opens a channel for communication. The word channel is overloaded
     and this has nothing to do with pub/sub channels */
  conn.createChannel((err, ch) => {
    if (err) { return handleError(err) }

    /* Wait for messages published to the Ably Reactor queue */
    ch.consume('UATwBQ:example-queue', (item) => {
      let decodedEnvelope = JSON.parse(item.content)

      /* The envelope messages attribute will only contain one message. However,
         in future versions, we may allow optional bundling of messages into a
         single queue message and as such this attribute is an Array to support
         that in future */
      let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
      messages.forEach((message) => {
        actionMessage(message)
      })

      /* ACK (success) so that message is removed from queue */
      ch.ack(item)
    })
  })
})

Please note:

  • In the example above, the queue rule has been configured to wrap each message in an envelope (the default setting). Therefore the first step is to parse the envelope JSON. See details on enveloped messages below.
  • The Message.fromEncodedArray method is used to decode the message(s) and return an array of Message objects. We strongly recommend you use this method if your client library supports it to ensure messages are decoded correctly and portable across all platforms.
  • Whilst the code above can handle multiple messages per envelope, we currently only support one message per envelope. The messages attribute is an Array so that in future we could optionally support message bundling.

See our tutorials section for a few step-by-step examples using a Reactor Queue with AMQP »

Using STOMP

The STOMP protocol is a simple text-based protocol designed for working with message-oriented middleware. It provides an interoperable wire format that allows STOMP clients to talk with any message broker support the STOMP protocol and as such is a good fit for use with Ably Reactor Queues.

Assuming the following queue has been set up, we’ll show you a simple example of subscribing to a STOMP queue:

Queue dashboard example

In order to subscribe to messages from this queue you will need:

The queue name
UATwBQ:example-queue which is made up of your app ID and the name you assigned to your queue
The host
us-east-1-a-queue.ably.io
The port
61614 which is the STOMP TLS port you consume from (the port in the screenshot above is for AMQP). We only support TLS connections for security reasons
The vhost
shared
The username
the part before the : of an API key that has access to queues. For example, the username for an API key such as APPID.KEYID:SECRET would be APPID.KEYID.
The password
the part after the : of the API key. For example, the password for an API key such as APPID.KEYID:SECRET would be SECRET.

A simple example of subscribing to this queue in Node.js can be seen below:

const connectOptions = {
  'host': 'us-east-1-a-queue.ably.io',
  'port': 61614, /* STOMP TLS port */
  'ssl': true,
  'connectHeaders':{
    'host': 'shared',
    'login': 'APPID.KEYID',
    'passcode': 'SECRET'
  }
}

Stompit.connect(connectOptions, (error, client) => {
  if (err) { return handleError(err) }

  const subscribeHeaders = {
    /* To subscribe to an existing queue, /amq/queue prefix is required */
    'destination': '/amq/queue/UATwBQ:example-queue',
    'ack': 'client-individual' /* each message requires an ACK to confirm it has been processed */
  }
  /* Wait for messages published to the Ably Reactor queue */
  client.subscribe(subscribeHeaders, (error, message) => {
    if (err) { return handleError(err) }

    /* STOMP is a text-based protocol so decode UTF-8 string */
    message.readString('utf-8', (error, body) => {
      if (err) { return handleError(err) }

      let decodedEnvelope = JSON.parse(item.content)

      /* The envelope messages attribute will only contain one message. However,
         in future versions, we may allow optional bundling of messages into a
         single queue message and as such this attribute is an Array to support
         that in future */
      let messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages)
      messages.forEach((message) => {
        actionMessage(message)
      })

      client.ack(message)
    })
  })
})

Please note:

  • In the example above, the queue rule has been configured to wrap each message in an envelope (the default setting). Therefore the first step is to parse the envelope JSON. See details on enveloped messages below.
  • The Message.fromEncodedArray method is used to decode the message(s) and return an array of Message objects. We strongly recommend you use this method if your client library supports it to ensure messages are decoded correctly and portable across all platforms.
  • Whilst the code above can handle multiple messages per envelope, we currently only support one message per envelope. The messages attribute is an Array so that in future we could optionally support message bundling.

See our tutorials section for step-by-step examples using a Reactor Queue with STOMP »

Enveloped and non-enveloped message examples

When you configure a queue rule, you are given the option to envelope messages, which is enabled by default. In most cases, we believe an enveloped message provides more flexibility as it contains additional metadata in a portable format that can be useful such as the clientId of the publisher, or the channel name the message originated from.

However, where performance is a primary concern, you may choose not to envelope messages and instead have only the message payload (data element) published. This has the advantage of requiring one less parsing step, however decoding of the raw payload in the published message will be your responsibility.

Note that messages published to queues are by default encoded as JSON (a text format), however you can choose to have messages encoded with MsgPack (a binary format) in your queue rules.

Enveloped message example

Headers: none

Data:

{
  "source": "channel.message",
  "appId":"ael724",
  "channel": "foo",
  "site": "eu-west-1-A",
  "ruleId": "cOOo9g",
  "messages": [
    {
      "id": "vjzxPR-XK3:3:0",
      "name": "event",
      "connectionId": "vjzxPR-XK3",
      "timestamp": 1485914937909,
      "data": "textPayload"
    }
  ]
}

Please note that the messages attribute is an Array so that future envelope options may allow messages to be bundled into a single envelope (WebHooks currently bundle messages). However, with the current queue rule design, an envelope will only ever contain one message.

Non-enveloped message example

Headers:

  • X-ABLY-ENVELOPE-SOURCE: channel.message
  • X-ABLY-ENVELOPE-APPID: ael724
  • X-ABLY-ENVELOPE-CHANNEL: foo
  • X-ABLY-ENVELOPE-SITE: eu-west-1-A
  • X-ABLY-ENVELOPE-RULE-ID: wYge7g
  • X-ABLY-MESSAGE-ID: vjzxPR-XK3:3:0
  • X-ABLY-MESSAGE-TIMESTAMP: 1485914937909
  • X-ABLY-MESSAGE-CONNECTION-ID: vjzxPR-XK3

Data:

textPayload
Enveloped presence message example

Headers: none

Data:

{
  "source": "channel.presence",
  "appId":"ael724",
  "channel": "foo",
  "site": "eu-west-1-A",
  "ruleId": "z8R85g",
  "presence": [
    {
      "id": "vjzxPR-XK3:5:0",
      "clientId": "bob",
      "connectionId": "vjzxPR-XK3",
      "timestamp": 1485916832961,
      "action": "enter",
      "data": "clientData"
    }
  ]
}

Please note that the presence attribute is an Array so that future envelope options may allow presence messages to be bundled into a single envelope (WebHooks currently bundle messages). However, with the current queue rule design, an envelope will only ever contain one presence message.

Non-enveloped presence message example

Headers:

  • X-ABLY-ENVELOPE-SOURCE: channel.presence
  • X-ABLY-ENVELOPE-APPID: ael724
  • X-ABLY-ENVELOPE-CHANNEL: foo
  • X-ABLY-ENVELOPE-SITE: eu-west-1-A
  • X-ABLY-ENVELOPE-RULE-ID: wYge7g
  • X-ABLY-MESSAGE-ID: vjzxPR-XK3:5:0
  • X-ABLY-MESSAGE-TIMESTAMP: 1485914937909
  • X-ABLY-MESSAGE-CONNECTION-ID: vjzxPR-XK3
  • X-ABLY-MESSAGE-CLIENT-ID: bob
  • X-ABLY-MESSAGE-ACTION: enter

Data:

clientData

Dead letter queues

When you provision a queue, Ably automatically provisions a “special” dead letter queue at the same time. This dead letter queue holds messages that have failed to be processed correctly or expired. It is advisable to consume messages from the dead letter queue so that you can keep track of failed, expired or unprocessable messages. Messages are moved into your dead letter queue when:

  • The message is rejected (basic.reject or basic.nack) with requeue=false;
  • The TTL for the message expires; or
  • The queue is full (max length limit is reached) and a new message is published to the queue. In this case, the oldest message in the queue is removed and placed in the dead letter queue to make room for the new message

Please note that messages already in the dead letter queue that subsequently meet any of the above criteria are deleted i.e. if the TTL for a message in the dead letter queue passes, the message is deleted forever.

A dead letter queue uses the reserved queue name APPID:deadletter where APPID is the app ID in which your queues are provisioned. You will have exactly one deadletter queue per app if you have one or more Reactor Queues, and this queue will appear in your queues dashboard. You can subscribe to a dead letter queue just like any other queue.

Download a client library

For a list of popular AMQP and STOMP client libraries you can use across a wide range of platforms, please see our client library download page.

Queue considerations

When using Reactor Queues, please bear in mind that:

  • Our message queues guarantee at least once delivery using a message acknowledgement protocol (exactly once is not practically achievable)
  • Ably provides reliable ordering for you messages by channel. For example, if messages published in a single channel are republished to a queue, and there is only one consumer for that queue, then the consumer will receive the messages in the order they were published. However, if you have more than one consumer, reliable ordering is not possible, equally if you have messages from multiple channels, reliable ordering is only supported per channel not across all channels.
  • Rate limits apply to queues depending on your account type. Please see the complete list of account limits.
  • There is a default TTL (time-to-live) applied to all messages that is configured when you provision your queue. If a message has not been consumed from a queue within this period, it will be moved to the deadletter queue. If the TTL of the deadletter queue passes, the message is discarded. See account limits.
  • There is a max message limit configured when you provision your queue. If the max message limit is reached for your queue, new messages will be moved to the deadletter queue. Once the deadletter queue reaches its max message limit, new messages will be discarded. See account limits.
  • With the AMQP protocol, it is possible to consume multiple queues from a single connection, and also to consume more than one message at a time. You will need to refer to your client library’s documentation to enable these capabilities. See this StackOverFlow answer as a good starting point.
  • Unlike our Ably pub/sub channels which are implicitly global and distributed, our message queues are provisioned in a single physical region. You can choose the region you want your queue to exist when provisioning your queue. Typically you will want to provision a queue closest to your servers to keep the latency as low as possible.
  • Each message published to the queue will count towards you monthly message quota. See billing info for more details.

Queue Scalability and High Availability

Ably’s Reactor Message Queue service is offered in two flavours, multi-tenanted and dedicated.

Our multi-tenanted queue service is provided as part of the core platform to all customers. The queues are provided in a high availability configuration (your data is stored in at least two datacenters with automatic fail-over capabilities). Our multi-tenanted queue service is designed for low to medium volumes of messages and has a guideline limit of no more than 200 messages per second per account.

For customers with more demanding requirements (up to millions of messages per second), Ably has two solutions for our Enterprise customers:

  • Dedicated queue clusters that scale to millions of messages
  • Ably Reactor Firehose for streaming your realtime data directly into your own streaming or queueing service

Get in touch if you’d like to find out more about our Enterprise offering.

Billing info

Each message published by a rule to a queue counts as one message towards your message quota. For example, if you publish a message on a channel that is in turn republished to a Reactor Queue, that will count as two messages. Find out more about how messages are counted.

Next steps


API reference
Documentation

Get started now with our free plan

It includes 3m messages per month, 100 peak connections, 100 peak channels, and loads of features.

Create your free account