Connecting to AWS Lambda via WebSockets

January 12, 2020

AWS Lambda is usually used for short-lived processes like the request/response lifecycle. The default Lambda timeout of 3 seconds reflects this common use-case.

However the maximum execution duration of AWS Lambda functions was increased from 1 minute to 5 minutes a few years ago.

Not only is it possible to connect to AWS Lambda via WebSockets using MQTT or AWS IoTData but this strategy works surprisingly well.

MQTT is both a general publish-subscribe messaging protocol and a popular npm package. MQTT (the npm package) has a clean interface that enables initiating a WebSocket connection, subscribing to topics, listening for messages, and publishing messages.

Overall Strategy

On the client side, we fetch a WebSocket url that formally looks something like this:

wss://XXXXX.iot.us-east-1.amazonaws.com/mqtt?X-Amz-Date=20180812T133259Z&X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=ZZZ%2F20180812%2Fus-east-1%2Fiotdevicegateway%2Faws4_request&X-Amz-SignedHeaders=host&X-Amz-Signature=PPP&X-Amz-Security-Token=QQQ

This can be generated using aws-sign-mqtt:

// set the AWS_IOT_HOST environmental variable:
// export AWS_IOT_HOST=$(aws --region us-east-1 iot describe-endpoint --output text)
const url = require('aws-sign-mqtt')()

This is just a WebSocket URL signed using AWS signature v4.

This WebSocket URL should obviously be generated server-side (lest you expose your AWS credentials).

To demonstrate how AWS Lambda can be used for long-lived WebSocket connections, we'll create two simple Lambda functions. The first is a simple HTTP endpoint that returns a signed WebSocket url. The second lambda function is triggered when the client subscribes to an AWS IoT topic.

const AWS = require('aws-sdk')
const createPresignedURL = require('aws-sign-mqtt')
const { randomBytes } = require('crypto')
// Lambda function #1 - session
// Simple HTTP endpoint that returns the WebSocket URL and channel ID
module.exports.session = (event, context, callback) => {
  const endpointUrl = createPresignedURL()
  // create an arbitrary channel ID to identify the current session
  const channelId = randomBytes(5).toString('hex') 
  callback(null, {
    statusCode: 200,
    body: JSON.stringify({ endpointUrl, channelId }),
    headers: {
      'Access-Control-Allow-Origin': '*'
    }
  })
}
// Lambda function #2 - mqtt
// Sends messages to the client via AWS IotData.
// You must manually set the AWS_IOT_HOST environmental variable which is account-specific.
// From the command line run: aws iot describe-endpoint --output text
module.exports.mqtt = async (payload, context) => {
  // The payload is whatever we send from the browser
  const { channelId } = payload
  const iot = new AWS.IotData({
    endpoint: process.env.AWS_IOT_HOST,
    region: process.env.AWS_REGION
  })
  // we respond with a different topic that uses the channelID
  // to avoid broadcasting the same message to all users
  await iot.publish({
    topic: `foobar/${channelId}/response`,
    payload: JSON.stringify({ message: 'hello' }),
    qos: 1
  }).promise()
  // at this point the Lambda function ends because the Node event loop is closed
}

Here's what our serverless.yml might look like:

service: foobar
provider:
  name: aws
  runtime: nodejs8.10
  region: us-east-1
  environment: 
    AWS_IOT_HOST: ''
  iamRoleStatements:
    - Effect: "Allow"
      Action: 
        - iot:*
      Resource: "*"
functions:
  session:
    handler: handler.session
    events:
      - http:
          method: GET
          path: /
          cors: true # change this in production to "credentails: true" to secure endpoint
  mqtt:
    handler: handler.mqtt
    events: 
      - iot
          sql: 'SELECT * FROM foobar/session'
  

After deploying our function (sls deploy), serverless will print the session endpoint that we need to generate the WebSocket URL:

Service Information
service: foobar
region: us-east-1
stack: foobar-dev
api keys:
  None
endpoints:
  GET - https://xxxxx.execute-api.us-east-1.amazonaws.com/dev
  GET - https://xxxxx.execute-api.us-east-1.amazonaws.com/dev
functions:
  mqtt: foobar-dev-mqtt
  session: foobar-dev-session

If you have the AWS command line interface installed you can try running something like the following from terminal to test your functions:

aws --region us-east-1 iot-data publish --topic 'foobar/session' --payload '{ "channelId": "1" }'

The above command should trigger foobar-dev-mqtt.

Copy the HTTP endpoint (https://xxxxx.execute-api.us-east-1.amazonaws.com/dev) to use in the client-side code.

In the browser, we fetch the WebSocket URL and initiate a connection using mqtt. Here's an example:

// client.js
const mqtt = require('mqtt')
const ENDPOINT_URL = 'https://xxxxx.execute-api.us-east-1.amazonaws.com/dev'
async function connect () {
  // fetch URL and arbitrary channelId from Lambda function #1 (session)
  let { channelId, endpointUrl } = await fetch(ENDPOINT_URL)
    .then(resp => resp.json())
  let channel = mqtt.connect(endpointUrl)
  channel.on('connect', () => {
    channel.subscribe(`foobar/${channelId}/response`, () => {
      // publish a message to the 'foobar/session' topic 
      // which triggers Lambda function #2 (mqtt)
      channel.publish('foobar/session', {
        payload: JSON.stringify({ channelId }),
        qos: 1,
      })
      // Listen for messages
      channel.on('message', (topic, buffer) => {
        console.log({ topic, message: buffer.toString() })
      })
    })
  })
}
connect()

So far we have deployed two lambda functions: the first generates a signed WebSocket URL and the second responds by publishing messages using AWS.IotData.

However, responding in AWS Lambda using IotData is limited because we can only send one message from the client (the first message that initiates the session). What if we want to respond to a stream of messages? In that case, we'll use the MQTT package on the server side as well.

Our new version of Lambda function #2 (mqtt) might look like this:

// handler.js
const mqtt = require('mqtt')
const createPresignedURL = require('aws-sign-mqtt')
const connect = ({ channel }) =>
  new Promise((resolve, reject) => channel.on('connect', resolve))
const subscribe = ({ channel, topic }) =>
  new Promise((resolve, reject) => channel.subscribe(topic, resolve))
 
module.exports.mqtt = async (payload, context, callback) => {
  const { channelId } = payload
  const TOPIC_REQUEST = `foobar/${channelId}/request`
  const TOPIC_RESPONSE = `foobar/${channelId}/response`
  const channel = mqtt.connect(createPresignedURL())
  await connect({ channel })
  await subscribe({ channel, topic: TOPIC_REQUEST })
  let resolve
  let promise = new Promise(res => {
    resolve = res
  })
  const sessionTimeout = () => setTimeout(() => {
    channel.end(resolve)
  }, 30000)
  let timeout = sessionTimeout()
  channel.on('message', (topic, buffer) => {
    console.log({ topic, message: buffer.toString() })
    clearTimeout(timeout)
    timeout = sessionTimeout()
    channel.publish(TOPIC_RESPONSE, {
      qos: 1,
      payload: JSON.stringify({ message: 'PONG' })
    })
  })
  // this promise is resolved after 30 seconds of inactivity
  // ending function execution. If we don't promisify these event
  // handlers, then the Lambda function will end prematurely.
  await promise.then(() => callback(null))
}

The primary difference between this version and what we had before is that now we're using MQTT to listen for messages and respond rather than AWS.IotData. This is necessary for two-way communication between client and server.

We can then revise our client-side code as follows:

// client.js
const mqtt = require('mqtt')
const ENDPOINT_URL = 'https://xxxxx.execute-api.us-east-1.amazonaws.com/dev'
async function connect () {
  // fetch URL and arbitrary channelId from Lambda function #1 (session)
  let { endpointUrl, channelId } = await fetch(ENDPOINT_URL)
    .then(resp => resp.json())
  let channel = mqtt.connect(endpointUrl)
  channel.on('connect', () => {
    channel.subscribe(`foobar/${channelId}/response`, async () => {
      // publish a message to the 'foobar/session' topic 
      // which triggers Lambda function #2 (mqtt)
      await new Promise((resolve, reject) => 
        channel.publish(
          'foobar/session', {
            payload: JSON.stringify({ channelId }),
            qos: 1,
          },
          resolve
        )
      )
      
      // Listen for messages
      channel.on('message', (topic, buffer) => {
        console.log({ topic, message: buffer.toString() })
      })
      
      channel.publish(
        `foobar/${channelId}/request`,
        JSON.stringify({ message: 'hello' }),
        { qos: 1 }
      )
    })
  })
}
connect()

What's different? This time we issue a request using the topic foobar/{channelId}/request. As before, we listen for messages using the topic foobar/{channelId}/response.

When we run the above client-side code, here's what happens:

  1. The client fetches the signed WebSocket URL and channelId from Lambda function #1 (session).
  2. The client uses MQTT to connect to the WebSocket URL
  3. We wait for a connection and then subscribe to the topic foobar/{channelId}/response
  4. We publish a message to foobar/session which triggers Lambda function #2 (mqtt). The payload includes the channelId. 
  5. Lambda function #2 (mqtt) independently generates a signed WebSocket url and connects via MQTT. Lambda function #2 waits for messages and times out after 30 seconds of inactivity. 

Example Projects Using This Approach

Sometimes it's best to look at the source code of real projects rather than muddle through half-baked tutorials. To that end, here are some examples:

Namewhisk

Saffron