How to use Redis Stream with Node.js

ยท6 min read
How to use Redis Stream with Node.js

Learn how to use Redis Stream with Node.js to build a real-time web application that displays historical moments related to India's history.

Introduction to Redis Streams

Redis Streams is a data structure that provides an immutable, append-only log. It's designed to store and process real-time data, making it perfect for use cases like event logging, messaging, and distributed data processing. Each entry has a unique ID and a value. The ID is auto-generated by default, and it includes a timestamp. The value is a hash. You can query ranges or use blocking commands to read entries as they come. In this tutorial, we'll create a simple web application that displays historical moments related to India's history in real-time using Redis Streams.

Redis streams are ideal for building history preserving message brokers, message queues, unified logs, and chat systems. Unlike Pub/Sub messages which are fire and forget, Redis streams preserve messages in perpetuity.

Publishing Messages with XADD:

To publish a message to a Redis Stream, you can use the XADD command. Let's break down the components of a typical XADD command:

To add date and content data to a Redis Stream using the redis-cli tool, we can use the XADD command. Here's an example of adding blog posts with their publication dates to the blog_posts stream:

XADD blog_posts * date "2023-08-15" content "Redis Streams Introduction"
# "1692114319484-0"   ๐Ÿ‘‰ The unique key output
  • XADD: This is the command used to create a stream and add entries to it.

  • blog_posts: This is the name of the Redis key that represents the stream. The name is user-defined and can be chosen to reflect the nature of the data. In this example, "blog_posts" is just a placeholder.

  • ***: tells Redis to add a unique timestamp ID + sequence number. It instructs Redis to generate a new ID that is greater than any previous ID in the stream. This ensures the uniqueness and increasing order of IDs.

  • field: This is the name of the field in the hash. In our case, we're using the field "date" and "content".

  • value: This is the value of the field in the hash. In our case, we're using the value "2023-08-15" and "Redis Streams Introduction".

Reading Messages with XREAD:

To read messages from a Redis Stream, you can use the XREAD command. Here's the breakdown of the components of an XREAD command:

# Read entries from the stream
XRANGE blog_posts - +
# 1) 1) "1692114319484-0"
#    2) 1) "date"
#       2) "2023-08-15"
#       3) "content"
#       4) "Redis Streams Introduction"
 
# Read entries with a specific date range
XRANGE blog_posts 1692114319484
 
# 1) 1) "1692114319484-0"
#    2) 1) "date"
#       2) "2023-08-15"
#       3) "content"
#       4) "Redis Streams Introduction"
# 2) 1) "1692114819142-0"
#    2) 1) "date"
#       2) "2023-08-15"
#       3) "content"
#       4) "Redis Streams Advance"
 
XRANGE blog_posts 1692114319484 1692114819142 COUNT 2
 
# 1) 1) "1692114319484-0"
#    2) 1) "date"
#       2) "2023-08-15"
#       3) "content"
#       4) "Redis Streams Introduction"
# 2) 1) "1692114819142-0"
#    2) 1) "date"
#       2) "2023-08-15"
#       3) "content"
#       4) "Redis Streams Advance"
 
XREAD COUNT 2 BLOCK 5000 STREAMS blog_posts 1692114319480
# 1) 1) "blog_posts"
#    2) 1) 1) "1692114319484-0"
#          2) 1) "date"
#             2) "2023-08-15"
#             3) "content"
#             4) "Redis Streams Introduction"
#       2) 1) "1692114819142-0"
#          2) 1) "date"
#             2) "2023-08-15"
#             3) "content"
#             4) "Redis Streams Advance"

Note: COUNT will limit to get just two rows

  • XREAD: This command fetches entries from the stream.

  • BLOCK 5000: This parameter indicates that the client will block for a maximum of 5000 milliseconds if there are no entries available. It provides a way to wait for new messages without constantly polling the stream.

  • STREAMS: This directive specifies the keys and IDs to read from. In our case, we're using the key #foo and the ID $. The $ ID represents the latest entry in the stream at the time of the command, effectively ignoring any entries that existed before the command was issued.

Prerequisites

Before getting started, make sure you have the following installed:

Setting Up the Project

Create a new project directory and navigate to it using the terminal. Then, run the following commands to initialize the project and install the required dependencies:

mkdir independence-history-app
cd independence-history-app
npm init -y
npm install express http ws ioredis

Creating the WebSocket Server

We will start by creating a WebSocket server using the ws library. This server will listen for incoming WebSocket connections and send real-time updates from the Redis Stream to connected clients.

Create a file named client.js and add the following code:

const express = require("express");
const http = require("http");
const WebSocket = require("ws");
const Redis = require("ioredis");
const path = require("path");
 
const app = express();
const server = http.createServer(app);
const wss = new WebSocket.Server({ server });
const redis = new Redis();
 
const streamKey = process.env.STREAM || "india-historical-moment";
 
app.get("/", (req, res) => {
  res.sendFile(path.join(__dirname, "index.html"));
});
 
wss.on("connection", async (ws) => {
  let lastId = "0";
  const sleepMs = 5000;
 
  while (true) {
    await new Promise((resolve) => setTimeout(resolve, 300));
    try {
      const resp = await redis.xread(
        "BLOCK",
        sleepMs,
        "STREAMS",
        streamKey,
        lastId
      );
 
      if (resp) {
        const [key, messages] = resp[0];
        const [messageId, data] = messages[0];
        lastId = messageId;
 
        const dataDict = {};
        const dataObj = JSON.parse(data[1]);
        for (const k in dataObj) {
          dataDict[k] = dataObj[k].toString();
        }
        dataDict.id = messageId;
        dataDict.key = key;
        ws.send(JSON.stringify(dataDict));
      }
    } catch (e) {
      console.error("ERROR REDIS CONNECTION:", e);
    }
  }
});
 
server.listen(8000, () => {
  console.log("Web server listening on port 8000");
});

In this code, we create an Express app, serve the index.html file, and set up a WebSocket server using the ws library. The WebSocket server continuously reads data from the Redis Stream and sends it to connected clients.

Creating the Redis Stream Producer

Now, let's create a Redis Stream producer that adds historical moments to the Redis Stream.

Create a file named producer.js and add the following code:

const Redis = require("ioredis");
 
const streamKey = process.env.STREAM || "india-historical-moment";
const producer = process.env.PRODUCER || "indians";
 
const redis = new Redis();
 
async function sendMessages(messages) {
  for (const message of messages) {
    try {
      const { content, date } = message;
 
      const data = {
        producer: producer,
        content: content,
        date: date,
      };
 
      const resp = await redis.xadd(streamKey, "*", "data", JSON.stringify(data));
      console.log(resp);
    } catch (e) {
      console.error("ERROR REDIS CONNECTION:", e);
    }
 
    await new Promise((resolve) => setTimeout(resolve, 500));
  }
}
 
const historicalMessages = [
  {
    content: "๐Ÿ‡ฎ๐Ÿ‡ณ The Indian National Congress was founded.",
    date: "December 28, 1885",
  },
  {
    content: "๐Ÿ‡ฎ๐Ÿ‡ณ Partition of Bengal was announced.",
    date: "July 20, 1905",
  },
  {
    content: "๐Ÿ”ซ The Jallianwala Bagh massacre occurred.",
    date: "April 13, 1919",
  },
  {
    content: "Happy Independence Day! ๐Ÿ‡ฎ๐Ÿ‡ณ",
    date: "August 15, 2023",
  },
];
sendMessages(historicalMessages);

This code defines a function sendMessages that sends historical messages to the Redis Stream. The messages are sent in a loop, and each message contains content and a date.

Creating the HTML Frontend

Finally, let's create an HTML file that displays the real-time historical moments received from the WebSocket server.

Create a file named index.html and add the following code:

<!DOCTYPE html>
<html>
<head>
  <title>Redis Stream Demo</title>
  <style>
    /* Your CSS styles here */
  </style>
</head>
<body>
  <h1>Independence Day Stream</h1>
  <div id="mydata"></div>
  <script>
    var el = document.getElementById("mydata");
    const ws = new WebSocket("ws://localhost:8000");
    ws.onmessage = function(event) {
      const mydata = JSON.parse(event.data);
      var tag = document.createElement("p");
      var text = document.createTextNode(
        `${mydata.date}: ${mydata.content}`
      );
      tag.appendChild(text);
      el.appendChild(tag);
    };
  </script>
</body>
</html>

This HTML file creates a WebSocket connection to the server and displays the received historical moments in a <div> element.

Running the Application

To run the application, follow these steps:

  1. Start the Redis server using the redis-server command.
  2. Run the producer script: node producer.js
  3. Run the server script: node server.js
  4. Open your web browser and navigate to http://localhost:8000.

You should now see the historical moments being displayed in real-time on the web page.

The complete code for this example is available on GitHub.

Source Code

Conclusion

In this tutorial, we explored how to build a real-time web application using Redis Streams and Node.js. We created a WebSocket server that reads data from a Redis Stream and sends it to connected clients. We also created a Redis Stream producer to add historical moments to the stream. By combining Redis Streams and WebSocket communication, you can build powerful real-time applications that display and process data in real-time.

Remember that this is just a basic example. In a real-world scenario, you can customize and enhance this application by adding authentication, error handling,

Next Steps