In this blog, I will discuss the Redis message queue built on the Stream type. Redis introduced a new data type called Stream in version 5.0, which is designed specifically for message queues. There are many other message queues, such as RabbitMQ and Kafka, but here we will only discuss how to use Redis as a message queue.
In a distributed system, when two components communicate through a message queue, one component, which we call the producer, sends a message to the queue, and the other component, which we call the consumer, reads the message and then processes it.
A message queue mainly solves the problem of producers and consumers processing messages at different speeds. It is often an indispensable piece of middleware at large technology companies. Before 5.0, Redis already had a message queue function based on publisher and subscriber (pub/sub).
Pub/sub has a disadvantage: when Redis goes down, the network is disconnected, etc., messages get discarded. Redis Stream, however, provides message persistence and master-slave replication, allows any client to access the data at any time, remembers the position each client has read up to, and helps ensure that messages are not lost.
Redis Message Queue Commands
XADD infohubblog * 1 hello
XADD is used to insert a message into the message queue (in the current example, the message queue is named infohubblog). The key of the message is 1 and the value is “hello”. The “*” after infohubblog tells Redis to auto-generate an ID for the inserted message that is unique within the stream, for example 1631288930852-0. The first half, 1631288930852, is the server's UNIX time in milliseconds, and the second half, 0, is a sequence number that distinguishes messages inserted in the same millisecond.
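As a small illustration of the ID format described above, the following Python sketch splits an ID into its two halves and converts the first half to a UTC time. The helper names parse_stream_id and stream_id_time are made up for this example and are not part of Redis or any client library.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def parse_stream_id(entry_id: str) -> tuple[int, int]:
    """Split '<milliseconds>-<sequence>' into two integers (hypothetical helper)."""
    ms_part, seq_part = entry_id.split("-")
    return int(ms_part), int(seq_part)


def stream_id_time(entry_id: str) -> datetime:
    """Return the UTC time encoded in the first half of the ID."""
    ms, _seq = parse_stream_id(entry_id)
    return EPOCH + timedelta(milliseconds=ms)


ms, seq = parse_stream_id("1631288930852-0")
print(ms, seq)
print(stream_id_time("1631288930852-0").isoformat())
```

Comparing the parsed pairs as tuples gives the same ordering as the IDs themselves: first by time, then by sequence number.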
XTRIM infohubblog maxlen 100
It is used to remove older entries from the message queue based on parameters such as MAXLEN or MINID. When the Stream exceeds the maximum length, the oldest messages are deleted. In the above example, once the stream holds more than 100 entries, the oldest ones are deleted. Due to the internal implementation of the stream, enforcing an exact upper limit on the length consumes more resources, so we generally use an approximate setting instead:
XTRIM infohubblog maxlen ~ 5, which means that the length may exceed 5 (it can be 6, 9, and so on); it is up to Redis to decide when to truncate.
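To make the difference between exact and approximate trimming more concrete, here is a toy Python model. It assumes that entries are stored in fixed-size nodes and that approximate trimming only ever drops a whole node, which is a simplification of how Redis organizes a stream internally. The functions exact_trim and approx_trim and the node size of 4 are invented for this sketch.

```python
def exact_trim(nodes, maxlen):
    """Toy model: keep exactly the newest maxlen entries."""
    entries = [entry for node in nodes for entry in node]
    return entries[-maxlen:] if maxlen > 0 else []


def approx_trim(nodes, maxlen):
    """Toy model: drop whole nodes from the old end, never going below maxlen."""
    nodes = [list(node) for node in nodes]  # work on a copy
    total = sum(len(node) for node in nodes)
    while nodes and total - len(nodes[0]) >= maxlen:
        total -= len(nodes.pop(0))
    return [entry for node in nodes for entry in node]


# Twelve entries stored in three nodes of four entries each (oldest first).
nodes = [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10, 11, 12]]
print(len(exact_trim(nodes, 5)))
print(len(approx_trim(nodes, 5)))
```

In this model the approximate version stops as soon as removing another whole node would leave fewer than maxlen entries, which is why the resulting length can end up above the requested limit.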
XLEN infohubblog
It returns the number of entries inside the stream. In the above example, XLEN returns the length of the message queue infohubblog.
XDEL infohubblog 1631288930852-0
It is used to remove specific entries from the message queue. In the above example, the command deletes the message with ID 1631288930852-0 from the message queue infohubblog.
XRANGE infohubblog - +
This command returns all the entries in the stream whose IDs fall within the given range.
- and + are special IDs which mean the minimum possible ID and the maximum possible ID inside the stream. In the above example, it returns all the entries in the stream.
XREAD block 10000 streams infohubblog $
It is used to read messages. “$” means to read only messages that arrive after the command is issued, and “block 10000” is the blocking time in milliseconds, i.e. 10s. In the above example, if no message arrives, XREAD blocks for 10s and then returns nil. If a message arrives within 10s, it is returned immediately.
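The blocking read above can be sketched in Python as follows. This assumes the redis-py client, whose xread method takes a dictionary of stream names to IDs plus a block timeout, and it assumes the default reply shape of a list of [stream_name, [(id, fields), ...]] pairs. The function name read_new_messages is invented for this example. Note that “$” is only useful for the first call: after that, the sketch passes the last ID it has seen, otherwise messages arriving between two calls would be skipped.

```python
def read_new_messages(client, stream, last_id="$", block_ms=10000):
    """Block for up to block_ms waiting for entries newer than last_id.

    Returns (messages, next_id), where messages is a list of (id, fields)
    pairs and next_id is the ID to pass on the following call.
    """
    reply = client.xread({stream: last_id}, block=block_ms)
    if not reply:
        # Timed out without any new message: keep the same position.
        return [], last_id
    _name, messages = reply[0]  # only one stream was requested
    return list(messages), messages[-1][0]


# Possible usage with a real server (assumes `pip install redis`):
#   import redis
#   client = redis.Redis(decode_responses=True)
#   last_id = "$"
#   while True:
#       messages, last_id = read_new_messages(client, "infohubblog", last_id)
#       for message_id, fields in messages:
#           print(message_id, fields)
```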
XGROUP CREATE infohubblog mygroup 0
XGROUP is used to manage consumer groups: creating a consumer group, destroying a consumer group, deleting a specific consumer, etc. In the above example, I have created a consumer group mygroup for the message queue infohubblog; 0 means that the group reads from the very beginning of the stream.
To destroy a consumer group, execute:
XGROUP DESTROY infohubblog mygroup
XREADGROUP group mygroup consumer1 streams infohubblog >
XREADGROUP is a special version of XREAD with support for consumer groups. In the above example, consumer1 in the consumer group mygroup reads messages from the message queue infohubblog, where “>” means to read only messages that have not yet been delivered to any consumer in the group.
It should be noted that once a message in the message queue has been read by one consumer in the consumer group, it can no longer be read this way by other consumers in the same group. The purpose of consumer groups is to let multiple consumers share the work of reading messages. Therefore, we usually let each consumer read part of the messages so that the read load is evenly distributed among the consumers.
XPENDING infohubblog mygroup
To ensure that consumers can still obtain unprocessed messages after a failure and restart, Streams automatically uses an internal queue to store the messages read by each consumer in the consumer group until the consumer uses the XACK command to notify Streams that the message has been processed. When the consumer restarts, it can use the XPENDING command to view the messages that have been read but not yet confirmed.
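The bookkeeping described above can be illustrated with a small in-memory model. This is a toy written for this post, not how Redis implements it: the class ToyGroup and its methods are invented names that only mimic the behavior of reading with “>”, XACK, and XPENDING for a single group.

```python
class ToyGroup:
    """In-memory illustration of one consumer group; not Redis itself."""

    def __init__(self, entries):
        self.entries = list(entries)  # (id, payload) pairs in stream order
        self.next_index = 0           # first entry not yet delivered to anyone
        self.pending = {}             # id -> name of the consumer that read it

    def read(self, consumer, count=1):
        """Like reading with '>': hand out entries nobody has received yet."""
        batch = self.entries[self.next_index:self.next_index + count]
        self.next_index += len(batch)
        for entry_id, _payload in batch:
            self.pending[entry_id] = consumer
        return batch

    def ack(self, entry_id):
        """Like XACK: returns 1 if the entry was pending, otherwise 0."""
        return 1 if self.pending.pop(entry_id, None) is not None else 0

    def pending_for(self, consumer):
        """Like a per-consumer XPENDING view: IDs read but not acknowledged."""
        return [i for i, owner in self.pending.items() if owner == consumer]


group = ToyGroup([("1-0", "a"), ("2-0", "b"), ("3-0", "c")])
group.read("consumer1", count=2)
group.ack("1-0")
print(group.pending_for("consumer1"))
```

If consumer1 crashed at this point, the model would still remember that “2-0” was delivered to it and never acknowledged, which is the information a restarted consumer needs.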
XACK infohubblog mygroup 1631289246997-0
It means that the consumer group mygroup has confirmed that it has processed the message with ID 1631289246997-0 in the message queue infohubblog.
So far, we have seen how to use the Stream type to implement a message queue.
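Putting the read and the confirmation together, a consumer could look roughly like the sketch below. It assumes the redis-py client, where xreadgroup takes the group name, the consumer name, and a dictionary of streams to IDs, and xack takes the stream name, the group name, and one or more IDs. The function consume_batch and the handler callback are invented for this example, and catching every exception is a simplification.

```python
def consume_batch(client, stream, group, consumer, handler, count=10):
    """Read up to `count` new messages for this consumer and acknowledge
    each one only after `handler` has processed it without raising.

    Returns the list of acknowledged message IDs.
    """
    reply = client.xreadgroup(group, consumer, {stream: ">"}, count=count)
    acked = []
    for _name, messages in reply or []:
        for message_id, fields in messages:
            try:
                handler(fields)
            except Exception:
                # Not acknowledged: the message stays pending for this
                # consumer and can be inspected later with XPENDING.
                continue
            client.xack(stream, group, message_id)
            acked.append(message_id)
    return acked


# Possible usage with a real server (assumes `pip install redis`):
#   import redis
#   client = redis.Redis(decode_responses=True)
#   consume_batch(client, "infohubblog", "mygroup", "consumer1", print)
```

Acknowledging only after the handler succeeds means a crash in the middle of processing leaves the message pending instead of silently dropping it.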
Why do we use Redis as a message queue?
It is often said that for message queues you should use dedicated message queue middleware such as Kafka or RabbitMQ, and that Redis is better suited to caching. In fact, I think the choice of technology depends on the application scenario you are facing. If your message volume is not large and you are not sensitive to data loss, then using Redis as a message queue is a good option. After all, compared with professional messaging systems such as Kafka, Redis is more lightweight and has lower maintenance costs.