Using Redis linked lists to implement message queues

  • 2020-05-10 23:10:10
  • OfStack

Preface

Redis linked lists are often used to build message queue services that exchange messages between multiple programs. In my opinion, one benefit of a Redis message queue is that it can be distributed and shared, in the same way that memcache can serve as a shared cache for MySQL.

Implementing message queues with linked lists

A Redis linked list supports insertion and removal at both the head and the tail, so if you insert elements at one end and remove them from the other, you get a message queue, also known as the producer/consumer model. You can do this with lpush and rpop. There is one problem, however: if the list is empty, the consumer has to call rpop over and over in a while loop, wasting CPU resources. Fortunately, Redis provides blocking versions of the pop commands, brpop and blpop, used as brpop/blpop list timeout. When the list is empty, brpop/blpop blocks until the timeout expires or an element is inserted into the list.

It can be used as follows:


charles@charles-Aspire-4741:~/mydir/mylib/redis$ ./src/redis-cli
127.0.0.1:6379> lpush list hello
(integer) 1
127.0.0.1:6379> brpop list 0
1) "list"
2) "hello"
127.0.0.1:6379> brpop list 0
// The client blocks here
/* ---------------------------------------------------- */
// After another client pushes an element with lpush, this client prints:
127.0.0.1:6379> brpop list 0
1) "list"
2) "world"
(50.60s) // Time spent blocked
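
The queue behaviour of lpush followed by rpop can be sketched in plain C. This is only an illustration and not Redis code: the queue and node types and the queue_lpush and queue_rpop functions are hypothetical names invented for this example. Elements go in at the head and come out at the tail, so they leave in the order they arrived.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical doubly linked list of C strings; not the Redis list type. */
typedef struct node {
    const char *value;
    struct node *prev, *next;
} node;

typedef struct queue {
    node *head, *tail;
    size_t len;
} queue;

/* Like LPUSH: insert at the head.
 * Returns the new length, or 0 if the allocation fails. */
size_t queue_lpush(queue *q, const char *value) {
    node *n = malloc(sizeof(*n));
    if (n == NULL) return 0;
    n->value = value;
    n->prev = NULL;
    n->next = q->head;
    if (q->head) q->head->prev = n; else q->tail = n;
    q->head = n;
    return ++q->len;
}

/* Like RPOP: remove from the tail.
 * Returns NULL when the list is empty instead of blocking. */
const char *queue_rpop(queue *q) {
    node *n = q->tail;
    const char *value;
    if (n == NULL) return NULL;
    value = n->value;
    q->tail = n->prev;
    if (q->tail) q->tail->next = NULL; else q->head = NULL;
    free(n);
    q->len--;
    return value;
}
```

A consumer built on queue_rpop alone would have to poll when it gets NULL, which is exactly the busy loop that brpop avoids.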

When the list is empty, brpop blocks, waiting for the timeout or for another client to lpush an element. Next, let's look at how the source code implements the blocking brpop command. To block a client, the server simply does not send it a reply; the client then blocks in its read call, waiting for a message to arrive. That part is easy to implement. The key question is how the server determines that data has arrived in a list a client is blocked on, and how it then unblocks that client. Redis stores each blocking key, together with the list of clients blocked on that key, in a dictionary. Every time a list is inserted into the database, Redis checks whether any client is blocked on the newly inserted list. If so, it unblocks the client and sends it the element that was just inserted into the list.

First, take a look at the data structures and the relevant fields of the server and the client:


// Blocking state
typedef struct blockingState {
 /* Generic fields. */
 mstime_t timeout;  /* Timeout */
 /* REDIS_BLOCK_LIST */
 dict *keys;    /* The keys we are waiting to terminate a blocking
        * operation such as BLPOP. Otherwise NULL. */
 robj *target;   /* The key that should receive the element,
        * for BRPOPLPUSH. */
 /* REDIS_BLOCK_WAIT */
 int numreplicas;  /* Number of replicas we are waiting for ACK. */
 long long reploffset; /* Replication offset to reach. */
} blockingState;
// Entry of the ready list
typedef struct readyList {
 redisDb *db; // The database where the ready key resides
 robj *key;   // The ready key
} readyList;
// Client fields related to blocking (other fields omitted)
typedef struct redisClient {
 int btype;    /* Type of blocking op if REDIS_BLOCKED. */
 blockingState bpop;  /* blocking state */
} redisClient;
// Server fields related to blocking (other fields omitted)
struct redisServer {
 /* Blocked clients */
 unsigned int bpop_blocked_clients; /* Number of clients blocked by lists */
 list *unblocked_clients; /* list of clients to unblock before next loop */
 list *ready_keys;  /* List of readyList structures for BLPOP & co */
};
// Database fields related to blocking (other fields omitted)
typedef struct redisDb {
 /* Mapping from key to the list of redisClient blocked on it */
 dict *blocking_keys;  /* Keys with clients waiting for data (BLPOP) */
 dict *ready_keys;   /* Blocked keys that received a PUSH */
} redisDb;

You need a good understanding of these data structures, because the code that follows manipulates them and is hard to follow otherwise. The analysis starts with the function that executes the brpop command:


void brpopCommand(redisClient *c) {
 blockingPopGenericCommand(c,REDIS_TAIL);
}
//++++++++++++++++++++++++++++++++++++++++++++++++++
void blockingPopGenericCommand(redisClient *c, int where) {
 robj *o;
 mstime_t timeout;
 int j;
 if (getTimeoutFromObjectOrReply(c,c->argv[c->argc-1],&timeout,UNIT_SECONDS)
  != REDIS_OK) return;// Store the timeout in timeout
 for (j = 1; j < c->argc-1; j++) {
  o = lookupKeyWrite(c->db,c->argv[j]);// Look up the list to operate on in the database
  if (o != NULL) {// The key exists
   if (o->type != REDIS_LIST) {// Not a list
    addReply(c,shared.wrongtypeerr);// Reply with a wrong-type error
    return;
   } else {
    if (listTypeLength(o) != 0) {// The list is not empty
     /* Non empty list, this is like a non normal [LR]POP. */
     char *event = (where == REDIS_HEAD) ? "lpop" : "rpop";
     robj *value = listTypePop(o,where);// Pop one element from the list
     redisAssert(value != NULL);
     // Send the popped element to the client
     addReplyMultiBulkLen(c,2);
     addReplyBulk(c,c->argv[j]);
     addReplyBulk(c,value);
     decrRefCount(value);
     notifyKeyspaceEvent(REDIS_NOTIFY_LIST,event,
          c->argv[j],c->db->id);
     if (listTypeLength(o) == 0) {// If the list is now empty, delete it from the database
      dbDelete(c->db,c->argv[j]);
      notifyKeyspaceEvent(REDIS_NOTIFY_GENERIC,"del",
           c->argv[j],c->db->id);
     }
     /* Part omitted; the function returns here after replying */
    }
   }
  }
 }
  /* If every list is empty or missing, block the client */
  blockForKeys(c, c->argv + 1, c->argc - 2, timeout, NULL);
}

As the source shows, brpop can operate on several lists, for example brpop list1 list2 0, but it only returns an element from the first list that has one. If list1 is empty and list2 is not, it returns an element of list2. If both have elements, it returns an element of list1. If neither has elements, it waits until an element is inserted into one of them and then returns that element. In that last case the function calls blockForKeys to block the client.
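
The key-scanning order described above can be sketched as follows. This is a simplified model, not the Redis implementation: toy_list and first_ready_list are hypothetical names, and a list is reduced to a name and a length.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a Redis list: only its name and length matter here. */
typedef struct toy_list {
    const char *name;
    size_t len;
} toy_list;

/* Scan the lists in argument order and return the index of the first
 * non-empty one, or -1 if all are empty. In the -1 case the real command
 * would block the client on every key. */
int first_ready_list(const toy_list *lists, int numlists) {
    int j;
    for (j = 0; j < numlists; j++) {
        if (lists[j].len != 0) return j;
    }
    return -1;
}
```

With list1 and list2 both non-empty the function returns index 0, which matches the rule that list1 wins when both lists have elements.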


void blockForKeys(redisClient *c, robj **keys, int numkeys, mstime_t timeout, robj *target) {
 dictEntry *de;
 list *l;
 int j;
 c->bpop.timeout = timeout;// Store the timeout in the client's blockingState
 c->bpop.target = target;// Destination key of a brpoplpush command; NULL for brpop
 if (target != NULL) incrRefCount(target);// Not NULL: increase the reference count
 for (j = 0; j < numkeys; j++) {
  /* Add the blocking key to the c->bpop.keys dictionary */
  if (dictAdd(c->bpop.keys,keys[j],NULL) != DICT_OK) continue;
  incrRefCount(keys[j]);
  /* And in the other "side", to map keys -> clients */
  // Add the blocking key and the client to c->db->blocking_keys
  de = dictFind(c->db->blocking_keys,keys[j]);
  if (de == NULL) {
   int retval;
   /* For every key we take a list of clients blocked for it */
   l = listCreate();
   retval = dictAdd(c->db->blocking_keys,keys[j],l);
   incrRefCount(keys[j]);
   redisAssertWithInfo(c,keys[j],retval == DICT_OK);
  } else {
   l = dictGetVal(de);
  }
  listAddNodeTail(l,c);// Append the client to the list of clients blocked on this key
 }
 blockClient(c,REDIS_BLOCKED_LIST);// Set the client's blocked flag
}

The blockClient function simply sets a few client fields, as follows:


void blockClient(redisClient *c, int btype) {
 c->flags |= REDIS_BLOCKED;// Set the blocked flag
 c->btype = btype;// Blocking operation type 
 server.bpop_blocked_clients++;
}

The brpop command function ends after this call without sending anything to the client, so the client stays blocked in its read call. How, then, is the client unblocked?

Unblocking by inserting an element

Every command's execution function is called from the call function, which is itself called from processCommand, and lpush is no exception. After lpush has executed, the list is no longer empty. Control returns to processCommand, which then executes the following statement:


if (listLength(server.ready_keys))
   handleClientsBlockedOnLists();

These two lines check whether server.ready_keys is empty. If it is not, some lists are ready. The next question is: where is server.ready_keys updated?

In the dbAdd function, when the value added to the database is of type REDIS_LIST, the signalListAsReady function is called to add the list's key to server.ready_keys:


//db.c
void dbAdd(redisDb *db, robj *key, robj *val) {
 sds copy = sdsdup(key->ptr);
 int retval = dictAdd(db->dict, copy, val);// Add the data to the database
 redisAssertWithInfo(NULL,key,retval == REDIS_OK);
 // If the value is a list, signal that the list is ready
 if (val->type == REDIS_LIST) signalListAsReady(db, key);
 if (server.cluster_enabled) slotToKeyAdd(key);
}
//t_list.c
void signalListAsReady(redisDb *db, robj *key) {
 readyList *rl;
 /* No client is blocked on this key: return immediately. */
 if (dictFind(db->blocking_keys,key) == NULL) return;
 /* The key has already been signaled, so there is no need to queue it again. */
 if (dictFind(db->ready_keys,key) != NULL) return;
 /* Otherwise, queue the key in server.ready_keys. */
 rl = zmalloc(sizeof(*rl));
 rl->key = key;
 rl->db = db;
 incrRefCount(key);
 listAddNodeTail(server.ready_keys,rl);// Append to the tail of the list
 /* We also add the key in the db->ready_keys dictionary in order
  * to avoid adding it multiple times into a list with a simple O(1)
  * check. */
 incrRefCount(key);
 // Also record the key in db->ready_keys
 redisAssert(dictAdd(db->ready_keys,key,NULL) == DICT_OK);
}
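
The two early returns in signalListAsReady implement a "queue once" rule: a key is queued only if some client is blocked on it and it is not already queued. The sketch below models that rule with fixed-size arrays instead of Redis dictionaries and lists. toy_db, contains, signal_ready and MAX_KEYS are hypothetical names for this illustration, and the linear search stands in for the O(1) dictionary lookup.

```c
#include <assert.h>
#include <string.h>

#define MAX_KEYS 16

/* Hypothetical fixed-size stand-ins for db->blocking_keys,
 * db->ready_keys and server.ready_keys. */
typedef struct toy_db {
    const char *blocking_keys[MAX_KEYS]; int num_blocking;
    const char *ready_set[MAX_KEYS];     int num_ready_set;
    const char *ready_queue[MAX_KEYS];   int num_ready_queue;
} toy_db;

/* Linear search; Redis uses a dictionary lookup instead. */
static int contains(const char **keys, int n, const char *key) {
    int i;
    for (i = 0; i < n; i++)
        if (strcmp(keys[i], key) == 0) return 1;
    return 0;
}

/* Returns 1 if the key was queued as ready, 0 if it was skipped. */
int signal_ready(toy_db *db, const char *key) {
    /* Nobody is blocked on this key. */
    if (!contains(db->blocking_keys, db->num_blocking, key)) return 0;
    /* Already queued: do not queue it twice. */
    if (contains(db->ready_set, db->num_ready_set, key)) return 0;
    /* Capacity limit of this toy model only. */
    if (db->num_ready_queue == MAX_KEYS) return 0;
    db->ready_queue[db->num_ready_queue++] = key;
    db->ready_set[db->num_ready_set++] = key;
    return 1;
}
```

Keeping a set next to the queue is what lets the duplicate check avoid scanning the queue itself.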

Once server.ready_keys holds a ready key, processCommand calls the handleClientsBlockedOnLists() function to serve the blocked clients:


void handleClientsBlockedOnLists(void) {
 while(listLength(server.ready_keys) != 0) {
  list *l;
  /* Point l at server.ready_keys, then give server.ready_keys a new empty list */
  l = server.ready_keys;
  server.ready_keys = listCreate();
  /* Iterate over every readyList entry */
  while(listLength(l) != 0) {
   listNode *ln = listFirst(l);// Get the first readyList entry
   readyList *rl = ln->value;
   /* Remove rl->key from the ready_keys of the database rl belongs to */
   dictDelete(rl->db->ready_keys,rl->key);
   /* Look up rl->key in its database, then serve the blocked clients from that list */
   robj *o = lookupKeyWrite(rl->db,rl->key);
   if (o != NULL && o->type == REDIS_LIST) {
    dictEntry *de;
    /* Find the list of clients blocked on rl->key in rl->db->blocking_keys */
    de = dictFind(rl->db->blocking_keys,rl->key);
    if (de) {
     list *clients = dictGetVal(de);// The list of blocked clients
     int numclients = listLength(clients);
     while(numclients--) {// Serve the clients in order while elements remain
      listNode *clientnode = listFirst(clients);
      redisClient *receiver = clientnode->value;// The blocked client
      robj *dstkey = receiver->bpop.target;// Destination list of a brpoplpush command
      int where = (receiver->lastcmd &&
          receiver->lastcmd->proc == blpopCommand) ?
         REDIS_HEAD : REDIS_TAIL;// Which end to pop from
      robj *value = listTypePop(o,where);// Pop an element from the ready list
      if (value) {
       /* Protect receiver->bpop.target, that will be
        * freed by the next unblockClient()
        * call. */
       if (dstkey) incrRefCount(dstkey);
       unblockClient(receiver);// Put the client back into the non-blocked state
       if (serveClientBlockedOnList(receiver,
        rl->key,dstkey,rl->db,value,
        where) == REDIS_ERR)
       {
        /* If we failed serving the client we need
         * to also undo the POP operation. */
         listTypePush(o,value,where);
       }// Otherwise the popped element has been sent to the client
       if (dstkey) decrRefCount(dstkey);
       decrRefCount(value);
      } else {
       break;
      }
     }
    }
    // If the list is empty, it is deleted from the database 
    if (listTypeLength(o) == 0) dbDelete(rl->db,rl->key);
    /* We don't call signalModifiedKey() as it was already called
     * when an element was pushed on the list. */
   }
   /* Free rl */
   decrRefCount(rl->key);
   zfree(rl);
   listDelNode(l,ln);
  }
  listRelease(l); /* We have the new list on place at this point. */
 }
}

This source shows that if two clients are blocked on the same list and one element is inserted into it, only the client that blocked first receives the element; the client that blocked later stays blocked. In other words, first blocked, first served. handleClientsBlockedOnLists calls unblockClient(receiver), which clears the client's blocked flag, finds the list of clients blocked on the key in the db, and removes the unblocked client from that list. It then calls serveClientBlockedOnList to reply to the client with the element just inserted into the list.


int serveClientBlockedOnList(redisClient *receiver, robj *key, robj *dstkey, redisDb *db, robj *value, int where)
{
 robj *argv[3];
 if (dstkey == NULL) {
  /* Propagate the [LR]POP operation. */
  argv[0] = (where == REDIS_HEAD) ? shared.lpop :
           shared.rpop;
  argv[1] = key;
  propagate((where == REDIS_HEAD) ?
   server.lpopCommand : server.rpopCommand,
   db->id,argv,2,REDIS_PROPAGATE_AOF|REDIS_PROPAGATE_REPL);
  /* BRPOP/BLPOP */
  addReplyMultiBulkLen(receiver,2);
  addReplyBulk(receiver,key);
  addReplyBulk(receiver,value);
 } else {
  /* BRPOPLPUSH */
   /* omitted */
 }
 return REDIS_OK;
}

The propagate function sends the command to the AOF and to the slaves. The omitted part of the function handles a command such as brpoplpush list list1 0: when the command's target list, list1, is not NULL, the element popped from list is pushed onto list1. Once a message has been sent to the client, the client returns from its read call and is no longer blocked.
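
The first-blocked, first-served behaviour of handleClientsBlockedOnLists can be modelled with a small FIFO of waiting clients. This is a sketch under simplifying assumptions, not the Redis code: toy_client, waiters, waiters_block and waiters_serve are hypothetical names, every client waits on the same key, and the brpoplpush case is ignored.

```c
#include <assert.h>
#include <stddef.h>

#define MAX_WAITERS 8

/* Hypothetical blocked client: an id and the value it eventually receives. */
typedef struct toy_client {
    int id;
    const char *reply;   /* NULL while the client is still blocked */
} toy_client;

/* FIFO of clients blocked on one key, oldest first (circular buffer). */
typedef struct waiters {
    toy_client *items[MAX_WAITERS];
    int head, count;
} waiters;

/* Block a client on the key. Returns 0 if the toy FIFO is full. */
int waiters_block(waiters *w, toy_client *c) {
    if (w->count == MAX_WAITERS) return 0;
    w->items[(w->head + w->count) % MAX_WAITERS] = c;
    w->count++;
    c->reply = NULL;
    return 1;
}

/* Hand the pushed values to blocked clients, oldest client first.
 * Returns the number of clients served. Values left over would stay
 * in the list; that part is not modelled here. */
int waiters_serve(waiters *w, const char **values, int numvalues) {
    int served = 0;
    while (served < numvalues && w->count > 0) {
        toy_client *c = w->items[w->head];
        w->head = (w->head + 1) % MAX_WAITERS;
        w->count--;
        c->reply = values[served++];   /* the client is now unblocked */
    }
    return served;
}
```

When two clients are waiting and a single value is pushed, only the client that blocked first gets a reply; the second stays in the FIFO until another push arrives.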

Unblocking by timeout

If no data is ever inserted into the list, the client would stay blocked forever, which is clearly unacceptable. brpop therefore also supports a timeout: once the blocking time exceeds a given value, the server returns a null reply and the client is unblocked.

The timeout is checked in the time event that runs once every 100 ms, so the function that unblocks timed-out clients is reached from serverCron, through serverCron->clientsCron->clientsCronHandleTimeout:


int clientsCronHandleTimeout(redisClient *c, mstime_t now_ms) {
 time_t now = now_ms/1000;
 //..........
 else if (c->flags & REDIS_BLOCKED) {
  /* Blocked OPS timeout is handled with milliseconds resolution.
   * However note that the actual resolution is limited by
   * server.hz. */
  if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
   /* Handle blocking operation specific timeout. */
   replyToBlockedClientTimedOut(c);
   unblockClient(c);
  }
 }
 //.............

With the irrelevant code removed, the main part of this function checks whether the client is blocked and, if so, whether its timeout has expired. If it has, replyToBlockedClientTimedOut sends an empty reply to the client and unblockClient unblocks it.
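
The expiry test in the excerpt compares the stored timeout with the current time in milliseconds, which suggests that the stored value is an absolute deadline and that 0 means "block forever". The sketch below works under that assumption. deadline_from_seconds and blocked_client_timed_out are hypothetical helpers written for this illustration, not Redis functions.

```c
#include <assert.h>

typedef long long mstime_t;  /* milliseconds, mirroring the type name in the excerpt */

/* Assumed conversion: turn a timeout in seconds into an absolute
 * deadline in milliseconds. 0 stays 0 and means "block forever". */
mstime_t deadline_from_seconds(long long seconds, mstime_t now_ms) {
    if (seconds <= 0) return 0;
    return now_ms + seconds * 1000;
}

/* Same condition as in the excerpt: a zero deadline never expires,
 * any other deadline expires once the current time has passed it. */
int blocked_client_timed_out(mstime_t deadline, mstime_t now_ms) {
    return deadline != 0 && deadline < now_ms;
}
```

Because the check only runs when the cron job fires, a client is unblocked at the first run after its deadline rather than at the exact millisecond.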

Conclusion

That concludes the walkthrough of how the linked-list message queue is implemented. I hope this article is of some help. If you have any questions, leave a comment.

