Code to implement message queues using Redis flows

  • 2020-09-28 09:14:46
  • OfStack

Now that you've covered the basic features of Redis streams, it's time to use them to build some real-world applications. As one of the typical applications of flows, message queues are very exemplary, so we will use the related functions of Redis flows to build a message queue application that has similar functions to those we have previously built using other Redis data structures.

Listing 10-1 shows a message queue implementation with basic functionality:

The code begins with several transformation functions, which are responsible for converting and formatting the program's relevant inputs and outputs. The MessageQueue class is used to implement message queue. The three methods of adding message, removing message and returning message quantity respectively use the flow XADD command, XDEL command and XLEN command. Two fetch methods of message queue, get_message() and get_by_range(), invoke the stream's XRANGE command in two forms, respectively; Finally, the iterate() method for iterating messages USES the XREAD command flow to iterate.

Listing 10-1 shows a message queue implemented using the Redis flow: /stream/ es22EN_queue.py


def reconstruct_message_list(message_list):
  """
   To enable multiple messages to be returned to the caller in a more structured manner, 
   will  Redis  Return multiple messages from the original format: 
  [(id1, {k1:v1, k2:v2, ...}), (id2, {k1:v1, k2:v2, ...}), ...]
   Convert to the following format: 
  [{id1: {k1:v1, k2:v2, ...}}, {id2: {k1:v1, k2:v2, ...}}, ...]
  """  result = []
  for id, kvs in message_list:
    result.append({id: kvs})
  return result
def get_message_from_nested_list(lst):
  """
   Extract the message body from the nested list. 
  """
  return lst[0][1]
class MessageQueue:
  """
   use  Redis  The message queue that the flow implements. 
  """
  def __init__(self, client, stream_key):
    self.client = client
    self.stream = stream_key
  def add_message(self, key_value_pairs):
    """
     Stores the given key-value pair in a message and returns the corresponding message  ID  . 
    """
    return self.client.xadd(self.stream, key_value_pairs)
  def get_message(self, message_id):
    """
     According to the given message  ID  Returns the corresponding message, or if the message does not exist  None  . 
    """
    reply = self.client.xrange(self.stream, message_id, message_id)
    if len(reply) == 1:
      return get_message_from_nested_list(reply)

  def remove_message(self, message_id):
    """
     According to the given message  ID  Delete the message and ignore the action if the message does not exist. 
    """
    self.client.xdel(self.stream, message_id)

  def len(self):
    """
     Returns the length of the message queue. 
    """
    return self.client.xlen(self.stream)

  def get_by_range(self, start_id, end_id, max_item=10):
    """
     According to the given  ID  The interval returns a message in the queue. 
    """
    reply = self.client.xrange(self.stream, start_id, end_id, max_item)
    return reconstruct_message_list(reply)

  def iterate(self, start_id=0, max_item=10):
    """
     Iterate through the message queue and return the most  N  Bar is greater than the given  ID  The news. 
    """
    reply = self.client.xread({self.stream: start_id}, max_item)
    if len(reply) == 0:
      return list()
    else:
      messages = get_message_from_nested_list(reply)
      return reconstruct_message_list(messages)

For this message queue implementation, we can create an instance of it by executing the following code:


>>> from redis import Redis
>>> from message_queue import MessageQueue
>>> client = Redis(decode_responses=True)
>>> mq = MessageQueue(client, "mq")

Then add 10 messages to the queue by executing the following code:


>>> for i in range(10):
...  key = "key{0}".format(i)
...  value = "value{0}".format(i)
...  msg = {key:value}
...  mq.add_message(msg)
...
'1554113926280-0'
'1554113926280-1'
'1554113926281-0'
'1554113926281-1'
'1554113926281-2'
'1554113926281-3'
'1554113926281-4'
'1554113926281-5'
'1554113926281-6'
'1554113926282-0'

You can also get the specified message based on ID, or use the get_by_range() method to get multiple messages at the same time:


>>> mq.get_message('1554113926280-0')
{'key0': 'value0'}
>>> mq.get_message('1554113926280-1')
{'key1': 'value1'}
>>> mq.get_by_range("-", "+", 3)
[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]

Or iterate over the message queue using the iterate() method, and so on:


>>> mq.iterate(0, 3)
[{'1554113926280-0': {'key0': 'value0'}}, {'1554113926280-1': {'key1': 'value1'}}, {'1554113926281-0': {'key2': 'value2'}}]
>>> mq.iterate('1554113926281-0', 3)
[{'1554113926281-1': {'key3': 'value3'}}, {'1554113926281-2': {'key4': 'value4'}}, {'1554113926281-3': {'key5': 'value5'}}]

conclusion


Related articles: