You can do that in PostgreSQL - Queue


This is part 2 of 5 in a series on PostgreSQL features you may not have heard of. This post covers how to use PostgreSQL as a queue, and why you might want to.

Efficient Queue Handling with PostgreSQL

You don't need a separate queue system like RabbitMQ or Kafka to handle your queueing needs. PostgreSQL can do it for you. And it can do it well.

Why use PostgreSQL as a queue?

There are a few reasons why you might want to use PostgreSQL as a queue.

You already have PostgreSQL

If PostgreSQL is already in your stack, you don't need to add another piece of infrastructure. Using it as a queue saves you the overhead of managing a separate system.

You don't need a lot of throughput

If you don't need a lot of throughput, PostgreSQL can handle your queueing needs. If you need a lot of throughput, you may want to consider a dedicated queueing system like RabbitMQ or Kafka.

You want simplicity

Other queueing systems can be complex to set up and manage. PostgreSQL is simple to set up and manage. If you don't need the extra features of a dedicated queueing system, PostgreSQL can be a good choice.

How to use PostgreSQL as a queue

There are a few ways to use PostgreSQL as a queue; we'll cover some of them here.

Using a table as a queue

The simplest way to use PostgreSQL as a queue is to use a table as a queue. You can create a table with a column for the message and a column for the status of the message. You can then insert messages into the table and update the status of the message when it is processed.

CREATE TABLE notification_queue
(
    id                serial       NOT NULL,
    -- Different types of notifications (sms, email, push, etc.)
    notification_type varchar(50),
    recipient         varchar(100) NOT NULL,
    subject           varchar(200),
    message           text,
    -- Status of the message (queued, completed, failed, etc.)
    status            varchar(20)  DEFAULT 'queued' NOT NULL,
    -- Number of times the message has been tried
    try_count         integer      DEFAULT 0 NOT NULL,
    -- Maximum number of times the message can be tried
    max_tries         integer      DEFAULT 5 NOT NULL,
    -- Priority of the message (higher number = higher priority)
    priority          integer      DEFAULT 0 NOT NULL,
    create_time       timestamp    DEFAULT CURRENT_TIMESTAMP NOT NULL,
    update_time       timestamp,
    PRIMARY KEY (id)
);

-- Indexes to speed up queries - optional but recommended
CREATE INDEX idx_notification_queue_status ON notification_queue (status);
CREATE INDEX idx_notification_queue_priority ON notification_queue (priority);
CREATE INDEX idx_notification_queue_create_time ON notification_queue (create_time);
 

Dummy data for the table:

-- Generate 100 sample records for the notification_queue table
INSERT INTO notification_queue (notification_type, recipient, subject, message, status, try_count, max_tries, priority)
SELECT
    CASE WHEN random() < 0.3 THEN 'email' ELSE 'sms' END AS notification_type,
    'recipient_' || i AS recipient,
    'Subject ' || i AS subject,
    'Message content ' || i AS message,
    'queued' AS status,
    0 AS try_count,
    5 AS max_tries,
    floor(random() * 10) AS priority
FROM generate_series(1, 100) AS i;

Using a table as a queue with row locks

You can use row-level locks (SELECT ... FOR UPDATE) to ensure that the same messages are not picked up by two workers at once. This is useful if you have multiple processes consuming from the same queue.

-- backend 1
BEGIN;
SELECT * FROM notification_queue
WHERE status = 'queued'
AND try_count < max_tries
ORDER BY priority DESC, create_time ASC
LIMIT 10 FOR UPDATE;
 
 
-- output
BEGIN
 id | notification_type |  recipient   |  subject   |      message       | status | try_count | max_tries | priority |        create_time         |        update_time
----+-------------------+--------------+------------+--------------------+--------+-----------+-----------+----------+----------------------------+----------------------------
 31 | sms               | recipient_31 | Subject 31 | Message content 31 | queued |         2 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.764749
 90 | sms               | recipient_90 | Subject 90 | Message content 90 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.763568
 33 | sms               | recipient_33 | Subject 33 | Message content 33 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.768804
 48 | sms               | recipient_48 | Subject 48 | Message content 48 | queued |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.749621
 38 | sms               | recipient_38 | Subject 38 | Message content 38 | queued |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.761489
 95 | sms               | recipient_95 | Subject 95 | Message content 95 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.776116
 64 | sms               | recipient_64 | Subject 64 | Message content 64 | queued |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.781616
 56 | email             | recipient_56 | Subject 56 | Message content 56 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.779562
 49 | sms               | recipient_49 | Subject 49 | Message content 49 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.775464
 87 | sms               | recipient_87 | Subject 87 | Message content 87 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.772976
(10 rows)
-- backend 2
BEGIN;
SELECT * FROM notification_queue
WHERE status = 'queued'
AND try_count < max_tries
ORDER BY priority DESC, create_time ASC
LIMIT 10 FOR UPDATE;
 
-- output
BEGIN
-- hangs until backend 1 commits or rolls back 😒

SKIP LOCKED

You can use SKIP LOCKED to skip rows that are locked by another transaction. This is useful if you have multiple processes that need to process messages from the queue.

-- backend 1
BEGIN;
SELECT * FROM notification_queue
WHERE status = 'queued'
AND try_count < max_tries
ORDER BY priority DESC, create_time ASC
LIMIT 10 FOR UPDATE SKIP LOCKED;
 
-- output
BEGIN
 id | notification_type |  recipient   |  subject   |      message       | status | try_count | max_tries | priority |        create_time         |        update_time
----+-------------------+--------------+------------+--------------------+--------+-----------+-----------+----------+----------------------------+----------------------------
 31 | sms               | recipient_31 | Subject 31 | Message content 31 | queued |         2 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.764749
 90 | sms               | recipient_90 | Subject 90 | Message content 90 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.763568
 33 | sms               | recipient_33 | Subject 33 | Message content 33 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.768804
 48 | sms               | recipient_48 | Subject 48 | Message content 48 | queued |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.749621
 38 | sms               | recipient_38 | Subject 38 | Message content 38 | queued |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.761489
 95 | sms               | recipient_95 | Subject 95 | Message content 95 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.776116
 64 | sms               | recipient_64 | Subject 64 | Message content 64 | queued |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.781616
 56 | email             | recipient_56 | Subject 56 | Message content 56 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.779562
 49 | sms               | recipient_49 | Subject 49 | Message content 49 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.775464
 87 | sms               | recipient_87 | Subject 87 | Message content 87 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.772976
(10 rows)
 
-- backend 2
BEGIN;
SELECT * FROM notification_queue
WHERE status = 'queued'
AND try_count < max_tries
ORDER BY priority DESC, create_time ASC
LIMIT 10 FOR UPDATE SKIP LOCKED;
 
-- output
BEGIN
 id | notification_type |  recipient   |  subject   |      message       | status | try_count | max_tries | priority |        create_time         |        update_time
----+-------------------+--------------+------------+--------------------+--------+-----------+-----------+----------+----------------------------+----------------------------
  8 | sms               | recipient_8  | Subject 8  | Message content 8  | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.778655
 40 | sms               | recipient_40 | Subject 40 | Message content 40 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.771701
 75 | sms               | recipient_75 | Subject 75 | Message content 75 | queued |         0 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.756837
 42 | sms               | recipient_42 | Subject 42 | Message content 42 | queued |         0 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.781034
 98 | email             | recipient_98 | Subject 98 | Message content 98 | queued |         0 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.78738
 74 | sms               | recipient_74 | Subject 74 | Message content 74 | queued |         0 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.788888
 89 | sms               | recipient_89 | Subject 89 | Message content 89 | queued |         0 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.786245
 19 | sms               | recipient_19 | Subject 19 | Message content 19 | queued |         0 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.782977
 96 | sms               | recipient_96 | Subject 96 | Message content 96 | queued |         1 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.789156
 69 | sms               | recipient_69 | Subject 69 | Message content 69 | queued |         0 |         5 |        7 | 2023-08-23 16:00:51.644837 | 2023-08-24 01:05:03.800327
(10 rows)

Now both backends can process messages from the queue.

Wait, we are not done yet. We need to update the status of the messages that we are processing.

UPDATE notification_queue
SET status = 'processing',
    try_count = try_count + 1,
    update_time = CURRENT_TIMESTAMP
WHERE id IN (8, 40, 75, 42, 98, 74, 89, 19, 96, 69);

But we don't want to select IDs and then update them in a separate round trip. We want to lock and update them in one query. We can do that too in PostgreSQL 🤩.

WITH messages_to_process AS (
  SELECT id
  FROM notification_queue
  WHERE status = 'queued'
  AND try_count < max_tries
  ORDER BY priority DESC, create_time ASC
  LIMIT 10 FOR UPDATE SKIP LOCKED
)
UPDATE notification_queue
SET status = 'processing',
  try_count = try_count + 1,
  update_time = CURRENT_TIMESTAMP
FROM messages_to_process
WHERE notification_queue.id = messages_to_process.id
RETURNING notification_queue.*;
 
-- output
 id | notification_type |  recipient   |  subject   |      message       |   status   | try_count | max_tries | priority |        create_time         |        update_time
----+-------------------+--------------+------------+--------------------+------------+-----------+-----------+----------+----------------------------+----------------------------
 75 | sms               | recipient_75 | Subject 75 | Message content 75 | processing |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 40 | sms               | recipient_40 | Subject 40 | Message content 40 | processing |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
  8 | sms               | recipient_8  | Subject 8  | Message content 8  | processing |         1 |         5 |        9 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 89 | sms               | recipient_89 | Subject 89 | Message content 89 | processing |         1 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 19 | sms               | recipient_19 | Subject 19 | Message content 19 | processing |         1 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 42 | sms               | recipient_42 | Subject 42 | Message content 42 | processing |         1 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 96 | sms               | recipient_96 | Subject 96 | Message content 96 | processing |         2 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 98 | email             | recipient_98 | Subject 98 | Message content 98 | processing |         1 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 74 | sms               | recipient_74 | Subject 74 | Message content 74 | processing |         1 |         5 |        8 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
 69 | sms               | recipient_69 | Subject 69 | Message content 69 | processing |         1 |         5 |        7 | 2023-08-23 16:00:51.644837 | 2023-08-26 00:27:55.145569
(10 rows)

You can also delete messages from the queue after they are processed. Just use a DELETE statement instead of an UPDATE statement.
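For instance, the same locking pattern works as an atomic dequeue-by-delete, with DELETE ... USING in place of UPDATE ... FROM (a sketch, mirroring the earlier CTE):

```sql
WITH messages_to_process AS (
  SELECT id
  FROM notification_queue
  WHERE status = 'queued'
  AND try_count < max_tries
  ORDER BY priority DESC, create_time ASC
  LIMIT 10 FOR UPDATE SKIP LOCKED
)
-- Remove the locked rows and hand them back to the worker in one query
DELETE FROM notification_queue
USING messages_to_process
WHERE notification_queue.id = messages_to_process.id
RETURNING notification_queue.*;
```

With this variant there is no status bookkeeping at all: a row in the table means "not yet processed", and a deleted row means "done".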

That's it. You have a working queue in PostgreSQL 🎉.


Wait a minute, we use an ORM in our application. How do we use this queue from application code?

Sequelize ORM

Import Dependencies and Constants

📦 Import Dependencies

import NotificationQueue from "./model/notification_queue.js";
import { Sequelize } from "sequelize";
  • The code begins by importing its dependencies: the NotificationQueue model and Sequelize. The sleep helper used below is defined later in the same file.

🔧 Constants

const BATCH_SIZE = 10;
const RETRY_LIMIT = 5; // mirrors the max_tries default in the table
  • Constants are defined for better manageability: BATCH_SIZE controls the number of jobs fetched in a batch, and RETRY_LIMIT caps how many times a job is retried.

Job Processing Logic

βš™οΈ Job Processing Function

async function processJob(job) {
  console.log("Processing job:", job.id);
  // Simulate processing or actual processing logic
  await sleep(1000); // Simulating processing time
  
  // Simulate a 10% chance of failure
  if (Math.random() < 0.1) {
    throw new Error("Processing failed");
  }
}
  • The processJob function represents the core processing logic for each job.
  • It logs the job's ID, simulates processing using a sleep function (for demonstration), and introduces a 10% chance of failure for simulation purposes.

Queue Processing Loop

🔄 Processing Queue

async function processQueue() {
  while (true) {
    const jobs = await NotificationQueue.findAll({
      where: {
        status: "queued",
        try_count: {
          [Sequelize.Op.lt]: RETRY_LIMIT,
        },
      },
      // FOR UPDATE SKIP LOCKED; note that Sequelize only applies row locks inside a transaction
      lock: true,
      skipLocked: true,
      order: [
        ["priority", "DESC"],
        ["create_time", "ASC"],
      ],
      limit: BATCH_SIZE,
    });
    
    // Rest of the code inside the loop...
  }
}
  • The processQueue function represents the continuous loop for processing jobs from the queue.
  • It queries the NotificationQueue table for queued jobs within the retry limit.
  • The query fetches a batch of jobs based on priority and creation time using Sequelize.

Job Processing and Error Handling

πŸ” Job Iteration and Processing

if (jobs.length > 0) {
  for (const job of jobs) {
    try {
      await job.update({ status: "processing" });
      await processJob(job);
      await job.update({ status: "completed" });
    } catch (error) {
      // Error handling...
    }
  }
} else {
  await sleep(1000); // Sleep to avoid busy-waiting
}
  • This section iterates through the fetched jobs, processes each job, and handles errors.
  • It updates the job status to "processing," invokes the processJob function, and then updates the status to "completed" if successful.

❗ Error Handling

} catch (error) {
  if (job.try_count + 1 >= job.max_tries) {
    await job.update({ status: "failed" });
    console.error("Job processing failed:", job.id, error);
  } else {
    // Put the job back in the queued state so a later poll can retry it
    await job.update({ status: "queued", try_count: job.try_count + 1 });
    console.warn("Job processing failed, retrying:", job.id, error);
  }
}
  • In case of an error during processing, the code checks whether the job has reached its retry limit.
  • If so, the job status is set to "failed" and the error is logged.
  • If not, the job's retry count is incremented and its status is set back to "queued" so a later poll can pick it up again.
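To see the retry decision in isolation, here is a minimal in-memory sketch. No database is involved; the job object shape and the handleFailure helper are illustrative, not part of Sequelize:

```javascript
// Mirrors the catch block above: give up after max_tries, otherwise requeue.
function handleFailure(job) {
  if (job.try_count + 1 >= job.max_tries) {
    job.status = "failed"; // retry budget exhausted
  } else {
    job.try_count += 1;
    job.status = "queued"; // put it back so the next poll can pick it up
  }
  return job;
}

const job = { id: 1, status: "processing", try_count: 0, max_tries: 5 };
handleFailure(job);
console.log(job.status, job.try_count); // queued 1

job.try_count = 4;
job.status = "processing";
handleFailure(job); // 4 + 1 >= 5, so the job is abandoned
console.log(job.status); // failed
```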

Utility Function

⏰ Utility Function for Delay

function sleep(ms) {
  return new Promise(resolve => setTimeout(resolve, ms));
}
  • This utility function, sleep, is used to introduce a delay to prevent constant querying and processing. It's a promise-based implementation that resolves after a specified time.
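A common refinement is to back off exponentially between empty polls or retries instead of always sleeping a fixed 1000 ms. A hypothetical helper (backoffDelay is not from any library):

```javascript
// Delay doubles per attempt: 1s, 2s, 4s, ... capped at maxMs.
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(baseMs * 2 ** attempt, maxMs);
}

// Usage inside the loop, instead of sleep(1000):
//   await sleep(backoffDelay(job.try_count));
console.log(backoffDelay(0), backoffDelay(1), backoffDelay(10)); // 1000 2000 30000
```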

Starting the Queue Processing

🚀 Start Queue Processing

processQueue().catch(console.error);
  • Finally, the processQueue function is invoked to initiate the continuous processing of the queue.
  • Any errors occurring during the process are caught and logged to the console.

Sequelize's query builder cannot express the lock-and-update pattern (a SELECT ... FOR UPDATE SKIP LOCKED feeding an UPDATE ... RETURNING) in a single call, so here we fetch and update in separate queries. In raw PostgreSQL we can do it in one query 🤩.

Now you have a working queue in PostgreSQL and you can use it in your application 🎉.


But let's say you want to use your queue logic in multiple applications. You don't want to write the same code everywhere. You could extract it into a shared library, but then you have a library to maintain. Instead, you can let PostgreSQL functions do it for you.

Yes, you can write your queue logic in PostgreSQL functions and use it in your applications.

PostgreSQL Functions

You can think of them as JavaScript functions, except you write them in SQL (or PL/pgSQL).

CREATE FUNCTION dequeue_notifications(p_batch_size integer) RETURNS TABLE (
    _id integer,
    notification_type character varying,
    recipient character varying,
    subject character varying,
    message text
) AS $$
BEGIN
    RETURN QUERY
    UPDATE notification_queue AS nq
    SET status = 'processing',
        try_count = nq.try_count + 1,
        update_time = CURRENT_TIMESTAMP
    FROM (
        SELECT id AS selected_id
        FROM notification_queue
        WHERE status = 'queued'
        AND try_count < max_tries
        ORDER BY priority DESC, create_time ASC
        LIMIT p_batch_size FOR UPDATE SKIP LOCKED
    ) AS selected_notifications
    WHERE nq.id = selected_notifications.selected_id
    RETURNING
        nq.id AS _id,
        nq.notification_type,
        nq.recipient,
        nq.subject,
        nq.message;
END;
$$ LANGUAGE plpgsql;
 
 

The function takes a parameter p_batch_size that determines how many notifications to dequeue in each batch, and it returns the notifications it dequeued (already marked as processing).

  • processing means you are processing the notification.
  • queued means you have not processed the notification yet.
  • completed means you have processed the notification successfully.
  • failed means you have tried to process the notification but it failed.

If you want, you can mark a notification as completed after processing, or delete it from the queue entirely.
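Marking a processed batch as completed could look like this (a sketch; the IDs are illustrative, standing in for the IDs that dequeue_notifications returned):

```sql
UPDATE notification_queue
SET status      = 'completed',
    update_time = CURRENT_TIMESTAMP
WHERE id = ANY(ARRAY[8, 40, 75]);  -- IDs of the notifications you just processed
```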

To use this function, you would call it with the desired batch size, like this:

SELECT * FROM dequeue_notifications(10); -- Dequeue a batch of 10 notifications
 

Using the function in Sequelize

const { QueryTypes } = require("sequelize");

// `sequelize` is your configured Sequelize instance
const notifications = await sequelize.query(
  "SELECT * FROM dequeue_notifications(10);",
  { type: QueryTypes.SELECT }
);
