SQS Queue Long-Polling using JavaScript Generators

March 01, 2023

SQS offers two modes for message polling: Short Polling and Long Polling. To ensure high availability, SQS stores messages across multiple servers and duplicates each message for added durability.

When your code calls the ReceiveMessage API with a WaitTimeSeconds parameter of 0, SQS uses short polling: it checks a random subset of servers for new messages and returns the response immediately, with or without messages. Because the sampled servers are chosen randomly, messages that were missed in one pass will show up in subsequent short polls. However, since short polling only checks a subset of servers, there is a higher chance of receiving an empty response and incurring additional request costs.

Figure: short polling example demonstrating SQS checking a subset of the queue servers

In contrast, long polling is like the queue pushing messages to the queue listener. To use long polling, the queue listener calls ReceiveMessage with a WaitTimeSeconds parameter greater than 0. This mode checks all servers for messages and waits up to WaitTimeSeconds or until a message is available, whichever comes first. While long polling reduces the number of empty responses, it is still limited to a maximum wait time of 20 seconds.
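As a rough sketch (assuming the AWS SDK v2 JavaScript client and a placeholder queue URL), the only difference between the two modes at the API level is the WaitTimeSeconds parameter passed to ReceiveMessage:

import AWS from 'aws-sdk';

const sqs = new AWS.SQS();
// Placeholder queue URL, for illustration only
const QueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/example-queue';

async function pollOnce() {
  // Short polling: returns immediately, even if no messages were found
  const shortPollRes = await sqs.receiveMessage({ QueueUrl, WaitTimeSeconds: 0 }).promise();

  // Long polling: waits up to 20 seconds (the maximum) for a message to become available
  const longPollRes = await sqs.receiveMessage({ QueueUrl, WaitTimeSeconds: 20 }).promise();

  console.log(shortPollRes.Messages ?? [], longPollRes.Messages ?? []);
}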

In heavily loaded queues, messages are almost always available immediately. In lightly loaded queues, however, there may be periods when the queue is empty and the job processor keeps re-entering long polling as ReceiveMessage calls time out without returning messages.

Vanilla TypeScript Implementation

This is a simple TypeScript implementation of long polling for an Amazon Simple Queue Service (SQS) queue.

interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
    client: AWS.SQS;
}

async function listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
  while (true) {
    const res = await client.receiveMessage(queueProps).promise();

    if ('Messages' in res) {
      // Handling the returned messages
      for(const msg of res.Messages) {
        await handleMessage(msg);
      }
    }
  }
}

The listenQueue function continuously polls the SQS queue for new messages using the receiveMessage method of the SQS JavaScript client. If the response from receiveMessage contains messages, the function loops through them and passes each message to the handleMessage function for processing. The handleMessage function is not defined here; it is left for the reader to implement according to their processing needs.

To make the listenQueue function more reusable and general, the handleMessage function can be passed as a parameter to the function.

interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
    client: AWS.SQS;
    handleMessage: (message: AWS.SQS.Message) => Promise<void>;
}

async function listenQueue({
  client,
  handleMessage,
  ...queueProps
}: ListenQueueProps) {
  while (true) {
    const res = await client.receiveMessage(queueProps).promise()

    if ('Messages' in res) {
      // Handling the returned messages
      for(const msg of res.Messages) {
        await handleMessage(msg);
      }
    }
  }
}
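As a usage sketch (the logMessage handler and queue URL below are placeholders, not part of the original code), a caller can now wire up the listener like this:

import AWS from 'aws-sdk';

const sqs = new AWS.SQS();

// Hypothetical handler, just logging the message contents
async function logMessage(message: AWS.SQS.Message): Promise<void> {
  console.log('Received message', message.MessageId, message.Body);
}

listenQueue({
  client: sqs,
  handleMessage: logMessage,
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/example-queue',
  WaitTimeSeconds: 20,
}).catch(err => console.error('Queue listener failed', err));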

Async Generators

Asynchronous programming is all about non-blocking calls that allow a program to perform other tasks while waiting for a resource to become available.

One of the most useful features in this space is generators: functions that return an iterator and can produce an unbounded stream of values. With generators, we can write code that produces a stream of values without the need to create an entire array or list upfront. Additionally, generators let us control the flow of execution between the generator function and its caller using the yield keyword.

function* circularNames() {
  let index = 0;
  const names = ['James', 'David', 'John', 'Sarah'];
  while (true) {
    yield names[index]
    
    index = (index + 1) % names.length;
  }
}

For instance, let’s consider the circularNames generator function in the above code block. This generator function returns an infinite stream of names in a circular fashion, starting again from the first name once it reaches the end. To achieve this, we use a while loop with a yield statement that passes control back to the calling code and suspends the generator until control is returned to it by the next call to next().

const gen = circularNames()

console.log(gen.next().value) // James
console.log(gen.next().value) // David
console.log(gen.next().value) // John
console.log(gen.next().value) // Sarah
console.log(gen.next().value) // James

To consume the values produced by a generator function, we can use a for...of loop. However, when dealing with asynchronous operations, we need an async generator function instead. An async generator function combines generators with promises: the generator body can await asynchronous operations between yields, and its values are consumed with a for await...of loop.
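As a toy illustration before moving to SQS, here is a small async generator that produces one value per second and is consumed with for await...of (the interval and break condition are arbitrary):

// Produces an unbounded stream of increasing numbers, one per interval
async function* ticker(intervalMs: number) {
  let count = 0;
  while (true) {
    // Wait asynchronously before yielding the next value
    await new Promise(resolve => setTimeout(resolve, intervalMs));
    yield count++;
  }
}

async function main() {
  for await (const tick of ticker(1000)) {
    console.log(tick); // 0, 1, 2, ...
    if (tick >= 2) break; // breaking out of for await also finishes the generator
  }
}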

Let’s take an example of using an async generator function to implement a queue listener. In this case, we want to listen to an AWS SQS queue and process any messages that become available. We can achieve this by creating an async generator function that infinitely produces an array of messages whenever they are available on the queue.

interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
    client: AWS.SQS;
}

async function* listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
  while (true) {
    const res = await client.receiveMessage(queueProps).promise();

    if ('Messages' in res && res.Messages.length > 0) {
      yield res.Messages;
    }
  }
}

To consume the messages, we can use a for await loop. This loop asynchronously waits for the generator function to produce a new array of messages and then processes each message one by one.

for await (const messages of listenQueue({client, QueueUrl: queueUrl, WaitTimeSeconds: 20})) {
  for(const msg of messages) {
    await handleMessage(msg)
  }
}

By separating the logic for listening to the queue and processing messages, we achieve a better separation of concerns and make our code more modular and maintainable.

Safe Shutdown

In containerized environments like Kubernetes and ECS, containers can be removed at any time, whether due to scaling in pods/tasks or to a container being moved between hosts. When this happens, we want our applications to shut down safely so that message processing is not interrupted. The orchestrator signals the container process that it is going to be removed shortly, usually by sending a SIGTERM signal, giving the process time to clean up and safely stop any long-running processing.

In the context of working with queues, a safe shutdown process involves stopping the listening process to the queue as soon as possible and preventing any new messages from being processed. This is done to avoid message processing being interrupted mid-way when the container is scheduled for removal.

With the vanilla implementation of the queue listener, we need to inject the cancellation into the listener function. We can achieve this by declaring a cancel flag in the listener module, providing a function that sets it, and checking the flag after every poll.

let cancel = false

function cancelListening() {
  cancel = true
}

async function listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
  while (true) {
    const res = await client.receiveMessage(queueProps).promise()
    // Cancelling the long polling right after response from queue
    if (cancel) {
      return
    }

    if ('Messages' in res && res.Messages.length > 0) {
      for(const msg of res.Messages) {
        await handleMessage(msg);
        if (cancel) {
          return
        }
      }
    }

    // Cancelling the long polling right before starting a new long polling
    if (cancel) {
      return
    }
  }
}

When we receive a SIGTERM, we want to stop listening and prepare for the container to be removed due to scaling.

process.on("SIGTERM", () => {
  cancelListening()
})

With async generator implementation, we have greater control over stopping the execution of the generator function, using the generator return() function. return() acts as if a return statement was added at the current generator function’s suspended flow position.

Here’s an implementation for handling the process SIGTERM signal and stopping the generator execution.

const listener = listenQueue({
  client: sqs, QueueUrl: queueUrl, WaitTimeSeconds: 20
})

// Listening to Node process SIGTERM signal
process.on("SIGTERM", () => {
  // Signaling the generator to complete its flow and return
  listener.return(undefined)
})

for await (const messages of listener) {
  // messages can be an array of SQS.Message, or undefined when return() is
  // called in the SIGTERM handler
  if(messages) {
    for(const msg of messages) {
      await handleMessage(msg)
    }
  }
}

Note that we updated the message handling loop to filter out undefined values: when return(undefined) is called in the SIGTERM handler, the generator completes its flow with undefined as its final value.

However, the current listener generator function yields only when a new message is available. Therefore, the return() function gets the chance to act upon the generator function only after we have already fetched a message, which works against our intention to stop the process as soon as possible.

To address this, we can modify the listenQueue function to yield after every call to receiveMessage. Here’s the updated code:

export async function* listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
    while (true) {
        const res = await client.receiveMessage(queueProps).promise();
        
        // Yielding more often so external generator return call has more
        // opportunities to stop the execution of the listener
        yield res && res.Messages && res.Messages.length > 0 ? 
            res.Messages[0] : undefined;
    }
}

With this modification, the SIGTERM handler has more chances to call return() and stop the generator function. However, we should note that the yield statement may return undefined when the long polling results in an empty response. There’s no need to adapt our calling code further to handle this since we’re already filtering the undefined messages.

Cancellation Token

When using the generator return to stop a loop, it exposes the internals of the generator to its caller, causing it to filter out undefined messages. Additionally, it doesn’t stop the loop immediately but only after the next yield.

By using a Cancellation Token, we can both conceal the internal yield and achieve an immediate loop cancel.

A Cancellation Token is a promise that gets resolved by the caller whenever it’s ready to cancel.
Let’s define a CancellationToken class to handle the cancellation.

class CancellationToken {
  // Using a symbol to identify the result in Promise.race
  public static CANCEL = Symbol('CANCEL');
  // Cancel state
  public cancelled: boolean = false;

  private _res!: (c: symbol) => void;

  // Exposing the promise to await on cancellation resolution
  promise = new Promise<symbol>(res => {
      this._res = res;
  })

  // Queue listener code calls `cancel` to resolve the cancellation promise
  cancel() {
      this._res(CancellationToken.CANCEL);
      this.cancelled = true;
  }
}

In the above code, we define a CANCEL symbol to identify the result in Promise.race. We expose a promise to await on cancellation resolution, and when cancel() is called, it resolves the cancellation promise.
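To see the token in isolation (the 30-second timer below is just a stand-in for any slow operation), we can race it against an arbitrary promise:

const token = new CancellationToken();

// Stand-in for any long-running operation
const slowOperation = new Promise<string>(resolve => setTimeout(() => resolve('done'), 30_000));

async function run() {
  const result = await Promise.race([slowOperation, token.promise]);

  if (result === CancellationToken.CANCEL) {
    console.log('Cancelled before the operation finished');
    return;
  }

  console.log('Operation result:', result);
}

run();
// Later, e.g. from a signal handler:
token.cancel();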

Now, let’s update our listenQueue function to take the cancellation token into account.

interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
    client: AWS.SQS;
    cancellationToken: CancellationToken;
}

async function* listenQueue({
    client,
    cancellationToken,
    ...queueProps
}: ListenQueueProps) {
    while (true) {
        // Either a new message is received or the cancellation token is resolved
        const res = await Promise.race([
          client.receiveMessage(queueProps).promise(), 
          cancellationToken.promise
        ]);
        
        // Return immediately when the race is resolved with cancellation
        if(res === CancellationToken.CANCEL) {
            return;
        }
        
        // Yield only when a new message arrived
        else if('Messages' in res && res.Messages && res.Messages.length > 0) {
            yield res.Messages[0];
        }
    }
}

The listenQueue function takes a CancellationToken object as a parameter. It uses Promise.race to wait for either a new message or the cancellation token to be resolved. If the cancellation token is resolved, the loop returns. If a new message is received, it yields the first message.

Finally, the calling code needs to be updated to use the cancellation token.

const cancellationToken = new CancellationToken();

const listener = listenQueue({
  client, QueueUrl: queueUrl, WaitTimeSeconds: 20, cancellationToken
});

// Listening to Node process SIGTERM signal
process.on("SIGTERM", () => {
  // Resolving the cancellation token so the listener stops
  cancellationToken.cancel();
})

for await (const message of listener) { 
  await handleMessage(message)
}

After implementing the cancellation token in our queue listener, we were able to remove the check for undefined messages in the message handling loop. This is because our cancellation token now ensures that only real messages are yielded, making the loop cleaner and more efficient.

Furthermore, the concept of a cancellation token can be extended to other parts of our application for finer-grained stopping. For instance, we can write our handleMessage function so that it always races its internal promises against the cancellation token. This approach allows us to stop message processing in the middle of execution if needed, which is particularly useful when there are long-running operations inside the message processing code that we want to be able to stop gracefully.

async function handleMessage(message: AWS.SQS.Message, cancellationToken: CancellationToken) {
  try {
    const result1 = await Promise.race([
      processMessagePart1(message), // some long-running operation
      cancellationToken.promise
    ]);

    if(result1 === CancellationToken.CANCEL) {
        return;
    }

    const result2 = await Promise.race([
      processMessagePart2(message, result1), // some long-running operation
      cancellationToken.promise
    ]);

    if(result2 === CancellationToken.CANCEL) {
        return;
    }

    // handle result
  } catch (error) {
    // handle error
  }
}

Handling Last Messages with Cancellation Token

Using a Cancellation Token allows us to exit the loop faster when we’re waiting on long polling. However, it’s important to understand that we’re not canceling the client.receiveMessage promise because promises cannot be canceled. Instead, we’re merely ignoring its result.

The client.receiveMessage promise runs until the process gets shut down, the polling timeout is reached, or a new message appears in the queue, whichever comes first. But what if a new message is received just before we shut down the listener processor?

Each message received from an SQS queue has a Visibility Timeout, defined in seconds. This is a period during which the message isn’t visible to other listeners. If the listener doesn’t remove the message from the queue within the Visibility Timeout period, SQS automatically makes it visible in the queue again for other listeners to grab.
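For context, the normal, non-shutdown path is to delete each message within its Visibility Timeout once processing succeeds. The handleAndDelete helper below is a hypothetical sketch, assuming handleMessage is the reader’s processing function from the previous section:

import AWS from 'aws-sdk';

async function handleAndDelete(
  client: AWS.SQS,
  QueueUrl: string,
  msg: AWS.SQS.Message,
  cancellationToken: CancellationToken
) {
  // Process the message within the Visibility Timeout window
  await handleMessage(msg, cancellationToken);

  // Delete the message so it never becomes visible to other listeners again
  await client.deleteMessage({
    QueueUrl,
    ReceiptHandle: msg.ReceiptHandle!,
  }).promise();
}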

If our process is going down, we should return newly received messages to the queue immediately. We can achieve this by calling the changeMessageVisibilityBatch method with VisibilityTimeout set to 0, so the messages are not hidden from other listeners for the queue’s default Visibility Timeout period.

Here’s the handleReceivedMessage function that returns messages back to the queue immediately when the cancellation token is triggered:

function handleReceivedMessage(client, queueProps, cancellationToken) {
  return async function (res) {
    // Return the received response when the cancellation token is not triggered
    if(!cancellationToken.cancelled) {
        return res;
    }

    // Return messages back to the queue immediately
    if(res.Messages && res.Messages.length > 0) {
      await client.changeMessageVisibilityBatch({
          QueueUrl: queueProps.QueueUrl,
          Entries: res.Messages.map(
              (msg, i) => ({
                  ReceiptHandle: msg.ReceiptHandle, 
                  Id: `${i}`, 
                  VisibilityTimeout: 0
              })
          )
      }).promise();
    }

    // Making sure the loop exits
    return CancellationToken.CANCEL;
  }
}

To ensure that we handle the last messages in the queue properly, we’ll update the listenQueue function to check the result of handleReceivedMessage. If the result is CancellationToken.CANCEL, we’ll exit the loop. If the result contains any messages, we’ll yield them.

async function* listenQueue({
    client,
    cancellationToken,
    ...queueProps
}: ListenQueueProps) {
    while (true) {
        const res = await Promise.race([
            cancellationToken.promise,
            client.receiveMessage(queueProps).promise()
                .then(handleReceivedMessage(client, queueProps, cancellationToken))
        ]);
        
        if(res === CancellationToken.CANCEL) {
            return;
        }
        
        const {Messages} = (res as AWS.SQS.ReceiveMessageResult);
        if(Messages && Messages.length > 0) {
            yield Messages;
        }
    }
}

Using a Cancellation Token and changeMessageVisibilityBatch method helps us handle the last messages in the queue efficiently, preventing them from being hidden from other listeners for the default Visibility Timeout period.
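Putting it all together, a minimal end-to-end sketch could look like the following (the region, queue URL, and error handling are placeholders; listenQueue, handleReceivedMessage, CancellationToken, and handleMessage are the pieces defined above):

import AWS from 'aws-sdk';

const client = new AWS.SQS({ region: 'us-east-1' }); // placeholder region
const queueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/example-queue'; // placeholder

const cancellationToken = new CancellationToken();

// Stop listening as soon as the container is asked to shut down
process.on('SIGTERM', () => cancellationToken.cancel());

async function main() {
  const listener = listenQueue({
    client, QueueUrl: queueUrl, WaitTimeSeconds: 20, cancellationToken
  });

  for await (const messages of listener) {
    for (const msg of messages) {
      await handleMessage(msg, cancellationToken);

      if (!cancellationToken.cancelled) {
        // Delete the message once it has been processed successfully
        await client.deleteMessage({ QueueUrl: queueUrl, ReceiptHandle: msg.ReceiptHandle! }).promise();
      }
    }
  }
}

main().catch(err => console.error('Queue listener stopped with an error', err));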



Written by Assaf Kamil, who lives and breathes the cloud, server-side, frontend, and DevOps. You should follow them on Twitter.