SQS offers two modes for message polling: short polling and long polling. To ensure high availability and durability, SQS stores messages on multiple servers and keeps redundant copies of each message.
When your code calls the ReceiveMessage function with a WaitTimeSeconds parameter of 0 seconds, SQS uses short polling: it checks a random subset of servers for new messages and returns a response immediately, with or without messages. Because the servers are sampled randomly, messages that weren't returned in one pass will show up in subsequent short polls. However, since short polling only checks a subset of servers, there is a higher chance of receiving an empty response and incurring additional costs.
In contrast, long polling is closer to the queue pushing messages to the listener. To use long polling, the queue listener calls ReceiveMessage with a WaitTimeSeconds parameter greater than 0. This mode checks all servers for messages and waits up to WaitTimeSeconds or until a message is available, whichever comes first. While long polling reduces the number of empty responses, it is still limited to a maximum wait time of 20 seconds.
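For illustration, here is roughly what the two modes look like with the AWS SDK for JavaScript (v2), as used throughout this article; the region and queue URL are placeholders:

const client = new AWS.SQS({ region: 'us-east-1' }); // placeholder region
const QueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'; // placeholder URL

// Short polling: WaitTimeSeconds of 0, returns immediately (possibly with no messages)
const shortPollRes = await client.receiveMessage({ QueueUrl, WaitTimeSeconds: 0 }).promise();

// Long polling: waits up to 20 seconds for a message before returning
const longPollRes = await client.receiveMessage({ QueueUrl, WaitTimeSeconds: 20 }).promise();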
In heavily loaded queues, messages are almost always available; in lightly loaded ones, there may be periods when the queue is empty and the job processor keeps starting new long polls after each ReceiveMessage call returns an empty response at the end of its wait time.
Vanilla TypeScript Implementation
This is a simple TypeScript implementation of long polling for an Amazon Simple Queue Service (SQS) queue.
interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
client: AWS.SQS;
}
async function listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
while (true) {
const res = await client.receiveMessage(queueProps).promise();
if ('Messages' in res) {
// Handling the returned messages
for(const msg of res.Messages) {
await handleMessage(msg);
}
}
}
}
The listenQueue function continuously polls the SQS queue for new messages using the receiveMessage method of the SQS JavaScript client. If the response from receiveMessage contains messages, the function loops through them and passes each one to the handleMessage function for processing. The handleMessage function is not defined here and is left for the reader to implement to fit their processing needs.
To make the listenQueue function more reusable and general, the handleMessage function can be passed in as a parameter.
interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
client: AWS.SQS;
handleMessage: (message: AWS.SQS.Message) => Promise<void>;
}
async function listenQueue({
  client,
  handleMessage,
  ...queueProps
}: ListenQueueProps) {
while (true) {
const res = await client.receiveMessage(queueProps).promise()
if ('Messages' in res) {
// Handling the returned messages
for(const msg of res.Messages) {
await handleMessage(msg);
}
}
}
}
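For example, the refactored function might be wired up like this; the queue URL and the handler body are placeholders for illustration:

const sqs = new AWS.SQS();

await listenQueue({
  client: sqs,
  QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue', // placeholder
  WaitTimeSeconds: 20,
  handleMessage: async (message) => {
    // Placeholder processing logic
    console.log('Received message', message.MessageId, message.Body);
  }
});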
Async Generators
Asynchronous programming is all about non-blocking calls that allow a program to perform other tasks while waiting for a resource to become available.
One of the most useful features in this space is generators: functions that return an iterator and can produce an unbounded stream of values. With generators, we can write code that produces a stream of values without building an entire array or list upfront. Additionally, generators let us pass control back and forth between the generator function and its caller using the yield keyword.
function* circularNames() {
let index = 0;
const names = ['James', 'David', 'John', 'Sarah'];
while (true) {
yield names[index]
index = (index + 1) % names.length;
}
}
For instance, consider the circularNames generator function in the code block above. It returns an infinite stream of names in a circular fashion, starting over from the first name once it reaches the end. To achieve this, we use a while loop with a yield statement that hands a value to the calling code and suspends the generator until the caller requests the next value with next().
const gen = circularNames()
console.log(gen.next().value) // James
console.log(gen.next().value) // David
console.log(gen.next().value) // John
console.log(gen.next().value) // Sarah
console.log(gen.next().value) // James
To consume the values produced by a generator function, we can use a for...of loop. However, when dealing with asynchronous operations, we need an async generator function instead. An async generator combines the power of generators with promises, letting us await promises inside the generator while its consumer iterates over the values asynchronously.
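As a quick illustration, here is a minimal async generator, made up for this example (the tick generator and the delay helper are not part of the SQS code), that awaits a promise between yields and is consumed with a for await...of loop:

function delay(ms: number): Promise<void> {
  return new Promise(resolve => setTimeout(resolve, ms));
}

async function* tick(intervalMs: number) {
  let count = 0;
  while (true) {
    await delay(intervalMs); // await a promise inside the generator
    yield count++;           // hand the next value to the consumer
  }
}

for await (const n of tick(1000)) {
  console.log(n); // 0, 1, 2, ...
}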
Let’s take an example of using an async generator function to implement a queue listener. In this case, we want to listen to an AWS SQS queue and process any messages that become available. We can achieve this by creating an async generator function that infinitely produces an array of messages whenever they are available on the queue.
interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
client: AWS.SQS;
}
async function* listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
while (true) {
const res = await client.receiveMessage(queueProps).promise();
if ('Messages' in res && res.Messages.length > 0) {
yield res.Messages;
}
}
}
To consume the messages, we can use a for await...of loop. It asynchronously waits for the generator function to produce a new array of messages and then processes each message one by one.
for await (const messages of listenQueue({client, QueueUrl: queueUrl, WaitTimeSeconds: 20})) {
for(const msg of messages) {
    await handleMessage(msg)
}
}
By separating the logic for listening to the queue and processing messages, we achieve a better separation of concerns and make our code more modular and maintainable.
Safe Shutdown
In containerized environments like Kubernetes and ECS, containers can be removed at any time, whether due to scaling in pods/tasks or to moving a container between hosts. When this happens, we want our applications to shut down safely. Safe shutdown is a way to signal to the container process that it's going to be removed shortly; this is usually done by sending a SIGTERM signal, giving the process time to clean up and stop any long-running work.
In the context of queues, a safe shutdown involves stopping the queue listener as soon as possible and preventing new messages from being picked up, so that message processing isn't interrupted mid-way when the container is scheduled for removal.
With the vanilla implementation of the queue listener, we need to inject the message-handling cancellation into the listener function. We can do this by declaring a cancel variable in the listener module, providing a function that sets it, and checking the flag after every poll and after each handled message.
let cancel = false
function cancelListening() {
cancel = true
}
async function listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
while (true) {
const res = await client.receiveMessage(queueProps).promise()
// Cancelling the long polling right after response from queue
if (cancel) {
return
}
if ('Messages' in res && res.Messages.length > 0) {
for(const msg of res.Messages) {
await handleMessage(msg);
if (cancel) {
return
}
}
}
// Cancelling the long polling right before starting a new long polling
if (cancel) {
return
}
}
}
When we receive a SIGTERM, we want to stop listening and prepare to be removed due to scaling in.
process.on("SIGTERM", () => {
cancelListening()
})
With the async generator implementation, we have greater control over stopping the generator's execution using its return() function. Calling return() acts as if a return statement were inserted at the generator's current suspension point.
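To see return() in isolation, here is a short sketch reusing the circularNames generator from earlier:

const namesGen = circularNames();
console.log(namesGen.next().value);      // James
console.log(namesGen.next().value);      // David
// return() finishes the generator as if it hit a return statement at its suspension point
console.log(namesGen.return(undefined)); // { value: undefined, done: true }
console.log(namesGen.next());            // { value: undefined, done: true } — the generator is done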
Here's an implementation that handles the process SIGTERM signal and stops the generator execution.
const listener = listenQueue({
  client: sqs, QueueUrl: queueUrl, WaitTimeSeconds: 20
})
// Listening to Node process SIGTERM signal
process.on("SIGTERM", () => {
// Signaling the generator to complete its flow and return
listener.return(undefined)
})
for await (const messages of listener) {
  // messages may be undefined when the listener is stopped via return()
if(messages) {
for(const msg of messages) {
      await handleMessage(msg)
}
}
}
Note that we updated the message handling loop to filter out undefined values: when the SIGTERM handler stops the listener with return(undefined), undefined can surface to the for await loop as its last value.
However, the current listener generator yields only when a new message is available. As a result, return() only gets a chance to act on the generator after we have already fetched a message, which works against our intention to stop the process as soon as possible.
To address this, we can modify the listenQueue function to yield after every call to receiveMessage. Here's the updated code:
export async function* listenQueue({
  client,
  ...queueProps
}: ListenQueueProps) {
while (true) {
const res = await client.receiveMessage(queueProps).promise();
// Yielding more often so external generator return call has more
// opportunities to stop the execution of the listener
    yield res && res.Messages && res.Messages.length > 0 ?
      res.Messages : undefined;
}
}
With this modification, the SIGTERM handler has more chances to call return() and stop the generator function. Note, however, that the yield statement may now produce undefined when long polling returns an empty response. There's no need to adapt our calling code further, since it already filters out undefined values.
Cancellation Token
Using the generator's return() to stop the loop exposes the generator's internals to its caller, which has to filter out undefined values. In addition, it doesn't stop the loop immediately, only after the next yield.
By using a Cancellation Token, we can both conceal the internal yield and cancel the loop immediately.
A Cancellation Token is a promise that gets resolved by the caller whenever it's ready to cancel.
Let’s define a CancellationToken class to handle the cancellation.
class CancellationToken {
  // Using a symbol to identify the result in Promise.race
  public static readonly CANCEL = Symbol('CANCEL');
  // Cancel state
  public cancelled: boolean = false;
  private _res!: (c: symbol) => void;
  // Exposing the promise to await on cancellation resolution
  promise = new Promise<symbol>(res => {
    this._res = res;
  });
  // The shutdown handler calls `cancel` to resolve the cancellation promise
  cancel() {
    this._res(CancellationToken.CANCEL);
    this.cancelled = true;
  }
}
In the code above, we define a CANCEL symbol to identify the result in Promise.race. The class exposes a promise to await on for cancellation, and calling cancel() resolves that promise.
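As a quick standalone illustration of how the token interacts with Promise.race (the delay helper is just for this example):

const delay = (ms: number) =>
  new Promise<string>(resolve => setTimeout(() => resolve('work finished'), ms));

const token = new CancellationToken();
setTimeout(() => token.cancel(), 1000); // simulate a shutdown signal after one second

const result = await Promise.race([
  delay(5000), // a long-running operation
  token.promise
]);

console.log(result === CancellationToken.CANCEL); // true — cancellation won the race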
Now, let’s update our listenQueue function to take the cancellation token into account.
interface ListenQueueProps extends AWS.SQS.ReceiveMessageRequest {
  client: AWS.SQS;
  cancellationToken: CancellationToken;
}
async function* listenQueue({
  client,
  cancellationToken,
  ...queueProps
}: ListenQueueProps) {
  while (true) {
    // Either a new message is received or the cancellation token is resolved
    const res = await Promise.race([
      client.receiveMessage(queueProps).promise(),
      cancellationToken.promise
    ]);
    // Return immediately when the race is resolved with cancellation
    if (res === CancellationToken.CANCEL) {
      return;
    }
    // Yield only when a new message has arrived
    else if ('Messages' in res && res.Messages && res.Messages.length > 0) {
      yield res.Messages[0];
    }
  }
}
The listenQueue function now takes a CancellationToken object as a parameter. It uses Promise.race to wait for either a new message or the cancellation token to resolve. If the cancellation token resolves, the loop returns; if a new message is received, it yields the first message.
Finally, the calling code needs to be updated to use the cancellation token.
const cancellationToken = new CancellationToken();
const listener = listenQueue({
  client, QueueUrl: queueUrl, WaitTimeSeconds: 20, cancellationToken
});
// Listening to Node process SIGTERM signal
process.on("SIGTERM", () => {
  // Resolving the cancellation token to stop the listener
cancellationToken.cancel();
})
for await (const message of listener) {
await handleMessage(message)
}
After implementing the cancellation token in our queue listener, we were able to remove the check for undefined messages in the message handling loop: the listener now yields only real messages, making the loop cleaner and more efficient.
Furthermore, the concept of a cancellation token can be extended to other parts of the application to achieve finer-grained stopping. For instance, we can write the handleMessage function so that it always races its internal promises against the cancellation token. This allows us to stop message processing in the middle of execution, which is particularly useful when the processing code contains long-running operations that we want to stop gracefully.
async function handleMessage(message: AWS.SQS.Message, cancellationToken: CancellationToken) {
try {
const result1 = await Promise.race([
processMessagePart1(message), // some long-running operation
cancellationToken.promise
]);
if(result1 === CancellationToken.CANCEL) {
return;
}
const result2 = await Promise.race([
processMessagePart2(message, result1), // some long-running operation
cancellationToken.promise
]);
if(result2 === CancellationToken.CANCEL) {
return;
}
// handle result
} catch (error) {
// handle error
}
}
Handling Last Messages with Cancellation Token
Using a Cancellation Token lets us exit the loop faster when we're waiting on a long poll. However, it's important to understand that we're not canceling the client.receiveMessage promise, because promises cannot be canceled; we're merely ignoring its result.
The client.receiveMessage call runs until the process shuts down, the polling wait time is reached, or a new message appears in the queue, whichever comes first. But what if a new message is received just before we shut down the listener process?
Each message received from an SQS queue has a Visibility Timeout, defined in seconds: a period during which the message isn't visible to other listeners. If the listener doesn't remove the message from the queue within the Visibility Timeout, SQS makes it visible again so other listeners can grab it.
If our process is going down, we should return any newly received messages to the queue immediately. We can achieve this by calling the changeMessageVisibilityBatch method with VisibilityTimeout set to 0, which prevents the messages from being hidden from other listeners for the queue's default Visibility Timeout period.
Here's a handleReceivedMessage function that returns messages to the queue immediately when the cancellation token has been triggered. It also takes the SQS client and the queue props as parameters so they're in scope:
function handleReceivedMessage(
  client: AWS.SQS,
  queueProps: AWS.SQS.ReceiveMessageRequest,
  cancellationToken: CancellationToken
) {
  return async function (res: AWS.SQS.ReceiveMessageResult) {
    // Pass the response through when the cancellation token is not triggered
    if (!cancellationToken.cancelled) {
      return res;
    }
    // Return the messages back to the queue immediately
    if (res.Messages && res.Messages.length > 0) {
      await client.changeMessageVisibilityBatch({
        QueueUrl: queueProps.QueueUrl,
        Entries: res.Messages.map(
          (msg, i) => ({
            ReceiptHandle: msg.ReceiptHandle!,
            Id: `${i}`,
            VisibilityTimeout: 0
          })
        )
      }).promise();
    }
    // Making sure the loop exits
    return CancellationToken.CANCEL;
  }
}
To ensure that we handle the last messages in the queue properly, we'll update the listenQueue function to pass each receiveMessage result through handleReceivedMessage and check the outcome. If the result is CancellationToken.CANCEL, we exit the loop. If the result contains any messages, we yield them.
async function* listenQueue({
  client,
  cancellationToken,
  ...queueProps
}: ListenQueueProps) {
  while (true) {
    const res = await Promise.race([
      cancellationToken.promise,
      client.receiveMessage(queueProps).promise()
        .then(handleReceivedMessage(client, queueProps, cancellationToken))
    ]);
    if (res === CancellationToken.CANCEL) {
      return;
    }
    const { Messages } = (res as AWS.SQS.ReceiveMessageResult);
    if (Messages && Messages.length > 0) {
      yield Messages;
    }
  }
}
Using a Cancellation Token together with the changeMessageVisibilityBatch method helps us handle the last messages in the queue efficiently, preventing them from being hidden from other listeners for the queue's default Visibility Timeout period.
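To put it all together, here is one possible way to wire up the final listener. The queue URL is a placeholder, and deleting each message after successful handling is an assumption added for completeness rather than something covered above:

const client = new AWS.SQS();
const QueueUrl = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'; // placeholder
const cancellationToken = new CancellationToken();

// Stop listening (and return in-flight messages to the queue) on SIGTERM
process.on('SIGTERM', () => cancellationToken.cancel());

for await (const messages of listenQueue({ client, QueueUrl, WaitTimeSeconds: 20, cancellationToken })) {
  for (const msg of messages) {
    await handleMessage(msg, cancellationToken);
    // Assumption: delete the message after successful handling so it isn't redelivered
    await client.deleteMessage({ QueueUrl, ReceiptHandle: msg.ReceiptHandle! }).promise();
  }
}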