Tutorials

AMQP and Wolfram Alpha

Ably Reactor Queues are traditional message queues that give you a reliable and straightforward way to consume, process, store, augment or reroute data from our realtime platform efficiently on your own servers.

In this tutorial we will show you how to:

  • Provision a Reactor Queue and set up a queue rule to republish messages from a channel into the queue
  • Create a simple Express.js web server to serve up the web app and issue authentication tokens for the Realtime browser client
  • Publish questions from a Realtime browser client to a channel, which are automatically republished by Ably to the provisioned message queue
  • Set up a worker server to consume the messages (questions) from the message queue, communicate with the Wolfram Alpha API to get the answer, and publish the answer as a message back to the client using the REST API
  • Subscribe to answers published from the workers in the web app using a realtime pub/sub channel

The following diagram depicts the architecture for this tutorial:


Reactor Queues and Wolfram Alpha diagram

The diagram may look complicated, but fortunately getting the app up and running with Ably is not! Let’s get started.

P.S. If you want to skip ahead and try out the completed tutorial now, you can install it in one click on Heroku for free.

Step 1 – Set up a free account with Ably

In order to run these tutorials locally, you will need an Ably API key. If you are not already signed up, you should sign up now for a free Ably account. Once you have an Ably account:

  1. Log into your app dashboard
  2. Under “Your apps”, click on “Manage app” for any app you wish to use for this tutorial, or create a new one with the “Create New App” button
  3. Click on the “API Keys” tab
  4. Copy the secret “API Key” value from your Root key and store it so that you can use it later in this tutorial

    Copy API Key screenshot

Step 2 – Install Ably on your server

To start using Ably on your Node.js server, you first need to install the NPM module. The NPM module can be installed as follows:

npm install ably

The server client library must be instantiated with the private API key you copied in Step 1. This ensures that the basic authentication scheme is used, which is almost always the right choice for your own trusted servers where the risk of compromise of your API key credentials is low.

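Add the following to a file named server.js to instantiate the Ably library inside your Node.js server:

var Ably = require('ably');

/* Replace the placeholder with the API key you copied in Step 1 */
const ApiKey = 'INSERT-YOUR-API-KEY-HERE';
var rest = new Ably.Rest({ key: ApiKey });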

See this step in GitHub

Step 3 – Set up your web server & base app HTML

Before a client connects to Ably, it checks whether it has suitable credentials to authenticate with Ably. As we’ll be using the recommended token authentication scheme in the client for this demo, when the client starts up and attempts to connect to Ably, it will request a token immediately so that it can then authenticate with Ably.

As tokens are typically issued from your own web servers, we’ll set up a simple web server now for this tutorial and add the base app static HTML and assets.

Express.js is a very popular and simple web server framework for Node.js. Let’s get this set up:

First we’ll add the express NPM module and create a package.json:

{
  "name": "ably-tutorials",
  "repository": "https://github.com/ably/tutorials",
  "license": "Apache-2.0",
  "engines": {
    "node": ">=0.10"
  },
  "scripts": {
    "start": "node ./server.js"
  },
  "dependencies": {
    "ably": "~1.0",
    "express": "~4.0"
  }
}

Run npm install to install the Node package dependencies.

Then we’ll set up a vanilla HTTP Express.js server in server.js:

const Express = require('express');
const ServerPort = 3000;

/* Start a web server */
const app = Express();

/* Serve static HTML files from the /public folder */
app.use(Express.static('public'));
app.listen(ServerPort);

console.log('Web server listening on port', ServerPort);

At this point, you should also create a public folder for your HTML, CSS and JS files and then add app.js, index.html and style.css to it.

If you would like to try running the server now, you can do so with node server.js. Once running, open your browser to http://localhost:3000/ and you should see the base HTML app.

See this step in GitHub

Step 4 – Plug in Ably client-side, add auth endpoint and update UI

First let’s make sure the web server is capable of issuing token requests to clients that need to authenticate with Ably. If you are unfamiliar with client and server token-based authentication with Ably, please see the client server token auth tutorial.

By adding the following route to your Express.js server, it is now ready to service clients wanting to authenticate with Ably.

/* Issue token requests to browser clients sending a request to the /auth endpoint */
app.get('/auth', function (req, res) {
  rest.auth.createTokenRequest(function(err, tokenRequest) {
    if (err) {
      res.status(500).send('Error requesting token: ' + JSON.stringify(err));
    } else {
      res.setHeader('Content-Type', 'application/json');
      res.send(JSON.stringify(tokenRequest));
    }
  });
});

Try running your server again with node server.js and visiting http://localhost:3000/auth and you should see a JSON token request in the form:

{
  "keyName": "I2ACJQ.Lv8AUQ",
  "ttl": 3600000,
  "timestamp": 1473894038255,
  "capability": "{\"*\":[\"*\"]}",
  "nonce": "f6799a8e7fa6f77b8e1dac55314789b1",
  "mac": "35nifY9SRZ8KRDfKOPIS1qYWGP16r2lD59zJo9TH8pA="
}
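To make the fields in that response easier to read, here is a minimal sketch that summarises a token request like the one above. It assumes that ttl is expressed in milliseconds and that capability is a JSON-encoded string; the describeTokenRequest helper is hypothetical and is not part of the Ably library.

```javascript
/* Hypothetical helper (not part of the Ably library): summarise a token request */
function describeTokenRequest(tokenRequest) {
  return {
    keyName: tokenRequest.keyName,
    /* Assumption: ttl is expressed in milliseconds */
    ttlMinutes: tokenRequest.ttl / 60000,
    issuedAt: new Date(tokenRequest.timestamp).toISOString(),
    /* capability arrives as a JSON string, so it needs a second parse */
    capability: JSON.parse(tokenRequest.capability)
  };
}

/* Sample values copied from the example response above */
const sampleTokenRequest = {
  keyName: 'I2ACJQ.Lv8AUQ',
  ttl: 3600000,
  timestamp: 1473894038255,
  capability: '{"*":["*"]}',
  nonce: 'f6799a8e7fa6f77b8e1dac55314789b1',
  mac: '35nifY9SRZ8KRDfKOPIS1qYWGP16r2lD59zJo9TH8pA='
};

const tokenSummary = describeTokenRequest(sampleTokenRequest);
console.log(tokenSummary);
```

The browser client never needs to do this itself, as the Ably library consumes the token request directly. The sketch is only a debugging aid.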

Now that we have a web server running and an authentication endpoint, we are ready to set up the Ably Realtime client that uses token auth.

First add the Ably client library loader from our CDN to the HTML file public/index.html:

<head>
  <script src="//cdn.ably.io/lib/ably.min-1.js" crossorigin="anonymous"></script>
</head>

Then add the following to your public/app.js JavaScript file:

var ably = new Ably.Realtime({ authUrl: '/auth' });
ably.connection.on('connecting', function() { showStatus('Connecting to Ably...'); });
ably.connection.on('connected', function() { clearStatus(); });
ably.connection.on('disconnected', function() { showStatus('Disconnected from Ably...'); });
ably.connection.on('suspended', function() { showStatus('Disconnected from Ably for a while...'); });

var $status = $('#status');
function showStatus(text) {
  $status.text(text).show();
}
function clearStatus() {
  $status.hide();
}

The Ably Realtime client will now automatically obtain a token request from the provided authUrl and then connect to Ably using that token request for authentication. At this point, run your server again with node server.js and visit http://localhost:3000/. You should see the status element change to “Connecting to Ably…” whilst a connection is being established, and then become empty once the connection is established.

Now let’s hook up the publishing of Wolfram questions to an Ably channel:

var questionsChannel = ably.channels.get('wolfram:questions');
var $askButton = $('#ask-btn');
var $question = $('#question');

$askButton.on('click', () => {
  /* Publish question to the Ably channel so that the queue worker receives it via queues */
  questionsChannel.publish('question', $question.val(), (err) => {
    if (err) {
      return showStatus('Failed to publish question!');
    }
  });
  $question.val(''); /* clear question for next */
});

And add a subscriber that listens for Wolfram answers:

var answersChannel = ably.channels.get('wolfram:answers');
var $answers = $('#answers');

answersChannel.subscribe((message) => {
  var $question = $('<div class="question">').text(message.data.question);
  var $answer = $('<div class="answer">').text(message.data.answer);
  $answers.prepend($('<div>').append($question).append($answer));
});

At this point the web app will instantiate the Ably Realtime library and connect to Ably, will publish messages to the wolfram:questions channel when requested, and will automatically subscribe to answers from the wolfram:answers channel. However, the queues and workers are not in place yet, so publishing a question won’t do anything for now.

See this complete step in GitHub

Step 5 – Provision Reactor Queue and queue rules

Unlike pub/sub channels, which are created on demand, Ably Reactor Message Queues must be provisioned in advance via the Ably app dashboard. Follow these steps to provision a new queue:

  1. Log into your app dashboard
  2. Under “Your apps”, click on “Manage app” for any app you wish to use for this tutorial, or create a new one with the “Create New App” button
  3. Click on the “Queues” tab
  4. Click the “Provision a new Queue” button. When prompted, enter the name wolfram for your queue and click “Create” to create your queue. See screenshot example below.


Provision a Wolfram queue

Once your queue is provisioned it is immediately ready to start receiving messages. However, without a queue rule, no messages will be routed to the queue. Let’s set up a rule that ensures that questions published by the web app Realtime client are republished to this new queue, and in turn the workers we will set up next can start consuming these messages. Follow these steps to set up a new queue rule:

  1. Ensure you are still logged into your app dashboard and navigate back to the app dashboard you used to provision your queue.
  2. Click on the “Reactor” tab
  3. Click the “New Reactor Rule” button, then choose “Reactor Queues”. When prompted, choose the wolfram queue you just provisioned, and enter ^wolfram:questions in the channel filter field to ensure that only questions published by clients are republished to the queue. The questions will then sit in the queue waiting for one or more workers (consumers) to process the questions. See screenshot example below to set up a queue rule:


Create a queue rule
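To see which channel names a filter such as ^wolfram:questions would select, here is a minimal sketch. It assumes the channel filter behaves like a regular expression matched against the channel name; the isRoutedToQueue helper is hypothetical and only imitates the rule locally.

```javascript
/* Assumption: the channel filter is evaluated like a regular expression */
const channelFilter = new RegExp('^wolfram:questions');

/* Hypothetical helper: would a message on this channel be republished to the queue? */
function isRoutedToQueue(channelName) {
  return channelFilter.test(channelName);
}

['wolfram:questions', 'wolfram:answers', 'other:wolfram:questions'].forEach((name) => {
  console.log(name, '->', isRoutedToQueue(name) ? 'republished to queue' : 'ignored');
});
```

Under that assumption, answers published by the worker on wolfram:answers never re-enter the queue. Note that the filter has no end anchor, so a channel name that merely starts with wolfram:questions would also match.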

Before we move on, let’s confirm that this rule is set up correctly. Try running your server again with node server.js and visit http://localhost:3000/. Now type a question in the question text field and click the “Ask Wolfram” button. The question should be published by the Realtime library to the channel wolfram:questions, and the queue rule should in turn ensure that the message is republished to the wolfram queue you have just provisioned. Visit the “Queues” tab of your app dashboard again and check that the “Message ready” count has increased. See screenshot below:


Check that queue has updated

Step 6 – Add queue worker

We’re now ready to start consuming messages from the queue. There are plenty of AMQP-compatible libraries available; however, for this tutorial we will use the popular amqplib library. First let’s add the amqplib library to the package.json file:

"dependencies": {
  "ably": "~1.0",
  "amqplib": "^0.4",
  "express": ">=4.14.0"
}

Run npm install to install the Node package dependencies.

Now create a worker.js file that will consume from the AMQP queue, process messages in real time and post a placeholder response back to the web client using REST:

'use strict';

const amqp = require('amqplib/callback_api');
const Ably = require('ably');

/* Start the worker that consumes from the AMQP QUEUE */
exports.start = function(apiKey, answersChannelName, queueName, queueEndpoint) {
  const appId = apiKey.split('.')[0]; /* API Keys are in format appId.keyId:secret */
  const queue = appId + ":" + queueName;
  const endpoint = queueEndpoint;
  const url = 'amqps://' + apiKey + '@' + endpoint;
  const rest = new Ably.Rest({ key: apiKey });
  const answersChannel = rest.channels.get(answersChannelName);

  /* Connect to Ably queue */
  amqp.connect(url, (err, conn) => {
    if (err) {
      return console.error('worker:', 'Queue error!', err);
    }

    /* Create a communication channel */
    conn.createChannel((err, ch) => {
      if (err) {
        return console.error('worker:', 'Queue error!', err);
      }

      /* Wait for messages published to the Ably Reactor queue */
      ch.consume(queue, (item) => {
        /* Parse the JSON envelope */
        const decodedEnvelope = JSON.parse(item.content);

        /* Use the Ably library to decode the message.
           The envelope messages attribute will only contain one message. However, in future versions,
           we may allow optional bundling of messages into a single queue message and as such this
           attribute is an Array to support that in future */
        const messages = Ably.Realtime.Message.fromEncodedArray(decodedEnvelope.messages);
        messages.forEach((message) => {
          answersChannel.publish('answer', 'Placeholder until Wolfram connected', (err) => {
            if (err) {
              console.error('worker:', 'Failed to publish answer for question', message.data, ' - err:', JSON.stringify(err));
            }
          })
        });

        /* Remove message from queue */
        ch.ack(item);
      });
    });

    conn.on('error', (err) => { console.error('worker:', 'Connection error!', err); });
  });
};
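The envelope handling in the worker can be explored without a queue connection. The following is a minimal sketch under stated assumptions: the envelope fields shown are illustrative (the tutorial code only relies on the messages array), the extractQuestions helper is hypothetical, and it skips the decoding that the Ably library performs, so it only suits plain string payloads.

```javascript
/* Illustrative envelope body; every field other than `messages` is an assumption */
const sampleContent = Buffer.from(JSON.stringify({
  channel: 'wolfram:questions',
  messages: [{ name: 'question', data: 'What is the capital of France?' }]
}));

/* Hypothetical helper: pull the question text out of a raw queue item body */
function extractQuestions(content) {
  /* Queue item bodies arrive as Buffers, so convert to a string before parsing */
  const envelope = JSON.parse(content.toString('utf8'));
  /* Treat a missing messages attribute as an empty list */
  const messages = Array.isArray(envelope.messages) ? envelope.messages : [];
  return messages.map((message) => message.data);
}

const questions = extractQuestions(sampleContent);
console.log(questions);
```

In the worker itself, keep using the Ably library to decode messages, since it also handles encoded payloads that this sketch ignores.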

Finally, we need to connect the worker to the web server we have created. In a production environment, your worker servers would almost certainly run on different instances from your web servers. This decoupling is advisable because it lets you scale your workers independently of your web servers to meet demand. However, to keep this demo simple, a worker is started alongside the web server. Add the following to your server.js:

const worker = require('./worker');
worker.start(ApiKey, 'wolfram:answers', 'wolfram', 'us-east-1-a-queue.ably.io:5671/shared');

Note that:

  • The queue endpoint is constructed from the endpoint and vhost visible for each queue in the queue tab of your dashboard (see above)
  • The queue name argument wolfram passed to worker.start is in fact appended to the appId in the start function. All fully qualified queue names follow the format appId:queueName as you will see in your queue tab
  • Whilst the code above can handle multiple messages per envelope, we currently only support one message per envelope. The envelope messages attribute is an Array so that in future we could optionally support message bundling
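The naming rules in the notes above can be checked in isolation. This is a minimal sketch: the buildQueueConfig helper is hypothetical and simply mirrors the string building done inside worker.start, and the credentials are placeholders rather than a real key.

```javascript
/* Hypothetical helper mirroring the string building in worker.start */
function buildQueueConfig(apiKey, queueName, queueEndpoint) {
  /* API keys are in the format appId.keyId:secret */
  const appId = apiKey.split('.')[0];
  return {
    /* Fully qualified queue names follow the format appId:queueName */
    queue: appId + ':' + queueName,
    /* The endpoint already includes the port and vhost, e.g. host:5671/shared */
    url: 'amqps://' + apiKey + '@' + queueEndpoint
  };
}

/* Placeholder credentials, not a real key */
const queueConfig = buildQueueConfig(
  'APPID.KEYID:SECRET',
  'wolfram',
  'us-east-1-a-queue.ably.io:5671/shared'
);
console.log(queueConfig.queue);
console.log(queueConfig.url);
```

The resulting URL embeds your API key, so it is best not to write it to logs outside of local experiments like this one.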

See this complete step in GitHub

Step 7 – Connect the worker to Wolfram Alpha

Everything is now in place to consume questions using a worker and publish the answers back to the client. However, we still need to plug in the Wolfram Alpha questions API.

Firstly you need to obtain a Wolfram Alpha AppID. Visit the Wolfram Alpha API signup page and click the “Create account” button:


Wolfram sign up

Once you have created an account, visit your developer portal “My Apps” section and click “Sign up to get your first AppID”:


Wolfram get app ID

Finally, copy the AppID to your clipboard:


Wolfram app ID

Now that you have an AppID you can use to communicate with the Wolfram Alpha API, you’ll need an HTTP library for the worker. The request library provides a straightforward API for HTTP requests, so add it to the package.json file:

"dependencies": {
  "ably": "~1.0",
  "amqplib": "^0.4",
  "express": ">=4.14.0",
  "request": ">=2.79.0"
}

Run npm install to install the Node package dependencies.

Now add the following to worker.js so that consumed questions are sent to Wolfram Alpha, and answers from Wolfram Alpha are published back to the web client through an Ably pub/sub channel:

const WolframAppId = 'INSERT-YOUR-WOLFRAM-APP-ID-HERE';
const WolframEndpoint = 'http://api.wolframalpha.com/v1/result';
const request = require('request');
const querystring = require("querystring");

/* Send question over HTTP to Wolfram to get an answer */
function getAnswerAndPublish(ablyChannel, wolframUrl, question) {
  request(wolframUrl, (error, response, body) => {
    if (!error && response.statusCode == 200) {
      publishAnswer(ablyChannel, question, body)
    } else {
      if (body) {
        publishAnswer(ablyChannel, question, "Wolfram couldn't compute: " + body);
      } else {
        publishAnswer(ablyChannel, question, "Wolfram error: " + JSON.stringify(error));
      }
    }
  });
}

function publishAnswer(ablyChannel, question, answer) {
  ablyChannel.publish('answer', { question: question, answer: answer }, (err) => {
    if (err) {
      console.error('worker:', 'Failed to publish answer', JSON.stringify(err));
    }
  });
}

And replace the existing messages.forEach block in the start function with the following:

messages.forEach(function(message) {
  var question = message.data;
  var url = WolframEndpoint + '?' + querystring.stringify({ appid: WolframAppId, i: question});
  getAnswerAndPublish(answersChannel, url, question);
});
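To check what the worker actually requests, the URL construction can be run on its own. This is a minimal sketch: the buildWolframUrl helper and the DEMO-APP-ID value are hypothetical, while the endpoint and the appid and i parameters are taken from the code above.

```javascript
const querystring = require('querystring');

const WolframEndpoint = 'http://api.wolframalpha.com/v1/result';

/* Hypothetical helper wrapping the URL construction used in the worker */
function buildWolframUrl(appId, question) {
  /* querystring.stringify percent-encodes spaces and punctuation in the question */
  return WolframEndpoint + '?' + querystring.stringify({ appid: appId, i: question });
}

/* DEMO-APP-ID is a placeholder, not a working Wolfram AppID */
const exampleUrl = buildWolframUrl('DEMO-APP-ID', 'What is 2 + 2?');
console.log(exampleUrl);
```

Because the question is encoded before it is appended, free-form user input containing characters such as ?, & or + cannot break the query string.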

That’s it, you are now issuing HTTP requests to Wolfram whenever a message is consumed from the message queue, and publishing the responses back to the web client using the REST library.

Try running your server again with node server.js and visit http://localhost:3000/. Now type in a question in the question text field and click the “Ask Wolfram” button. You should see a response from Wolfram appear within the time it takes for Wolfram to answer your question.

See this step in GitHub

We’ve covered a lot of ground in this tutorial. You’ve now used a lot of Ably functionality, including token authentication, realtime pub/sub and message queues. We strongly recommend you review the next steps below for essential further reading to help you understand how to use Reactor Message Queues in more detail.

Try this tutorial in one click on Heroku

The simplest way to try out this demo is to just install it now on Heroku for free. When installing with Heroku, you will automatically be provisioned with a free Ably account and the steps you need to take to configure Wolfram and the queue rules will be shown to you after the Heroku app is provisioned.

Download tutorial source code

git clone https://github.com/ably/tutorials.git

Check out the tutorial branch:

git checkout queue-wolfram-alpha-nodejs

Then run the demo locally by adding your Ably API key and Wolfram AppID to server.js, setting up the queue and queue rule as described in this tutorial, and starting the demo with node server.js.

Next steps

1. Find out more about Reactor Message Queues
2. Find out when you should use message queues as opposed to pub/sub channels
3. Read an article on why Message Queues solve the problem of consuming realtime data on your servers
4. Find out more about other Ably Reactor services
5. Learn more about other Ably features by stepping through our other Ably tutorials
6. Gain a good technical overview of how the Ably realtime platform works
7. Get in touch if you need help

