Discussion on using Redis for coordination in distributed systems

  • 2020-05-12 06:25:48
  • OfStack

In a distributed system, processes (this article uses "process" for the running units of a distributed system; they may sit on the same physical node or on different ones) usually need to coordinate with one another. Sometimes the data handled by different processes has dependencies and must be processed in a fixed order; sometimes a single process is needed to handle certain transactions during a particular period of time; and so on. People often use techniques such as distributed locks and election algorithms to coordinate the behavior of processes. Because of the inherent complexity of distributed systems and the requirement for fault tolerance, these techniques are usually heavyweight, such as the Paxos algorithm, the bully election algorithm, and ZooKeeper. They focus on message passing rather than shared memory, are notoriously complex and difficult to understand, and pose a real challenge when problems are encountered during implementation and deployment.
Redis is often referred to as NoSQL software, but it is essentially a distributed data-structure server that provides a memory-based data-structure storage service over the network. In its implementation, a single thread processes all commands against the in-memory data structures, which makes each data operation command atomic. Redis also supports Lua scripting: each instance runs Lua scripts with a single embedded Lua interpreter, so a Lua script is likewise executed atomically. These atomicity properties make shared-memory-style coordination possible in a distributed system, which is quite appealing: compared with complex message-passing mechanisms, the shared-memory model is obviously much easier for many developers to understand, especially those already familiar with multithreading or multiprocessing. In practice, not every distributed system requires a model as strict as a distributed database, and not every technique needs a solid theoretical foundation and mathematical proof; this gives Redis-based coordination techniques real practical value, and indeed there have been many attempts along these lines. This article introduces some of these coordination techniques.

Signal/wait operations
In a distributed system, a process sometimes needs to wait for state changes in other processes, or to notify other processes of its own state change. For example, when operations have an ordering dependency, one process must wait while another sends a signal that lets the waiting process proceed. This can be done with the Redis Pub/Sub family of commands, for example:


import redis, time
rc = redis.Redis()

def wait(wait_for):
    ps = rc.pubsub()
    ps.subscribe(wait_for)
    ps.get_message()  # consume the subscribe confirmation
    wait_msg = None
    while True:
        msg = ps.get_message()
        if msg and msg['type'] == 'message':
            wait_msg = msg
            break
        time.sleep(0.001)
    ps.close()
    return wait_msg

def signal_broadcast(wait_in, data):
    wait_count = rc.publish(wait_in, data)
    return wait_count
It is easy to extend this method to implement other waiting strategies, such as try-wait, waiting with a timeout, or waiting for all signals or for any one signal to arrive. Because Redis supports message subscription based on pattern matching (the psubscribe command), the wait signal can also be specified as a pattern.
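For instance, a timed-out variant of the wait operation might look like this (a sketch built on the same redis-py pubsub calls as wait() above; passing the connection in as a parameter and the deadline handling are our own additions):

```python
import time

def wait_timeout(rc, wait_for, timeout):
    """Like wait(), but give up and return None after `timeout` seconds.
    `rc` is a redis.Redis connection."""
    ps = rc.pubsub()
    ps.subscribe(wait_for)
    ps.get_message()  # consume the subscribe confirmation
    deadline = time.time() + timeout
    wait_msg = None
    while time.time() < deadline:
        msg = ps.get_message()
        if msg and msg['type'] == 'message':
            wait_msg = msg
            break
        time.sleep(0.001)
    ps.close()
    return wait_msg
```

A caller that receives None knows the signal never arrived within the window and can retry or give up.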
Unlike other data operations, subscription messages are transient: they are not kept in memory and are not persisted, and if the connection between client and server drops, the message is not resent. However, with a master/slave deployment, the publish command is propagated to the slave nodes, so we can subscribe to the same channel on both the master and a slave and receive the publisher's messages on either connection. Even if the master fails during use, or the connection to the master is lost, we can still obtain the subscribed messages from the slave node, which gives better robustness. Also, because no data is written to disk, this approach has a performance advantage.
In the above method, the signal is broadcast, and all processes in wait will receive it. If you want the signal to be unicast, so that only one of the waiters receives it, you can use a channel-name convention, such as:
Channel name = channel name prefix (channel) + subscriber's globally unique ID (myid)
The unique ID can be a UUID or a random number string, as long as it is globally unique. Before sending the signal, use the command "pubsub channels channel*" to obtain the channels that all subscribers are subscribed to, and then send the signal to one randomly chosen channel. When waiting, pass your own unique ID, concatenate the channel name prefix and the unique ID into a channel name, and then wait as in the previous example. Here's an example:


import random
single_cast_script = """
local channels = redis.call('pubsub', 'channels', ARGV[1]..'*');
if #channels == 0 then
    return 0;
end;
local index = math.mod(math.floor(tonumber(ARGV[2])), #channels) + 1;
return redis.call('publish', channels[index], ARGV[3]);
"""

def wait_single(channel, myid):
    return wait(channel + myid)

def signal_single(channel, data):
    rand_num = int(random.random() * 65535)
    return rc.eval(single_cast_script, 0, channel, str(rand_num), str(data))

Distributed locks
The implementation of distributed lock is a direction that people have explored a lot. On the official website of Redis, there is a special document that introduces the distributed lock based on Redis, in which the Redlock algorithm is proposed and the implementation cases of multiple languages are listed. Here is a brief introduction.
The Redlock algorithm focuses on three properties a distributed lock must satisfy:
Safety: mutual exclusion is guaranteed; at most one client can hold a lock at any time.
Deadlock freedom: even if the client currently holding the lock crashes or is partitioned from the cluster, other clients will eventually be able to acquire the lock.
Fault tolerance: as long as a majority of the Redis nodes are online, clients can acquire and release the lock.

A simple and straightforward way to implement a lock is to acquire it with the SET NX command, setting a key with a time-to-live (TTL), and to release it by deleting the key; the TTL guarantees that deadlock is avoided. However, this approach has a single point of failure, and if master/slave nodes are deployed, it may violate safety under certain conditions, such as:

1. Client A obtains the lock from the master node.
2. The master node crashes before replicating the key to the slave node.
3. The slave node is promoted to the new master node.
4. Client B obtains the same lock from the new master node, even though it is actually still held by client A.

As a result, two clients hold the same mutex during the same period of time, which breaks the safety of mutual exclusion.

In the Redlock algorithm, locking is done by a command like the following:


SET resource_name my_random_value NX PX 30000

Here, my_random_value is a random value that must be globally unique. Each client needs to generate this random value and remember it; it will be used later when unlocking.
Unlocking requires a Lua script. You cannot simply delete the key directly, or you may release a lock held by someone else:


if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end

The value of ARGV[1] is the my_random_value used when the lock was acquired.
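Tying the two pieces together, a minimal single-instance lock might look like this in Python (a sketch; the function names are ours, the connection is passed in as a parameter, and redis-py's set(nx=True, px=...) corresponds to the SET ... NX PX command above):

```python
import uuid

# the check-and-delete unlock script from above
UNLOCK_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

def acquire_lock(rc, resource, ttl_ms=30000):
    """Try to take the lock; return the random token on success, None on failure.
    `rc` is a redis.Redis connection."""
    token = str(uuid.uuid4())
    if rc.set(resource, token, nx=True, px=ttl_ms):
        return token
    return None

def release_lock(rc, resource, token):
    """Release only if we still hold the lock (compare-and-delete via the Lua script)."""
    return rc.eval(UNLOCK_SCRIPT, 1, resource, token) == 1
```

The caller keeps the returned token and passes it back to release_lock, so a client whose lock has already expired and been taken by someone else cannot delete the new holder's lock.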
If better fault tolerance is needed, a cluster of N (N odd) independent Redis nodes can be set up. In this case, the algorithm for a client to acquire and release the lock is as follows:
1. Get the current timestamp, timestamp_1, in milliseconds.
2. Using the same key and random value, try to acquire the lock on each of the N nodes in turn, setting a timeout for each attempt. The per-node timeout should be much smaller than the lock's auto-release time, to avoid spending too long on an unreachable node.
3. The client subtracts timestamp_1 from the current timestamp to compute the total elapsed time. The lock is considered acquired only if it was obtained on a majority of the nodes and the total elapsed time is less than the lock's time-to-live.
4. If the lock was acquired, its validity time is the initial lock time-to-live minus the total elapsed time.
5. If the client failed to acquire the lock, it should immediately unlock on all nodes.
6. To retry, reacquire the lock after a random delay.
7. A client that has acquired the lock releases it by simply unlocking on all nodes.
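The acquisition steps above can be sketched as follows (assuming a list of redis-py clients, one per independent node; all names are ours, and per-node connection timeouts and the check-and-delete unlock script are elided for brevity):

```python
import time
import uuid

def quorum(n):
    """Majority threshold for n nodes."""
    return n // 2 + 1

def redlock_acquire(nodes, resource, ttl_ms):
    """Try to take the lock on a majority of `nodes` (redis-py clients).
    Returns (token, validity_ms) on success, (None, 0) on failure."""
    token = str(uuid.uuid4())
    start = time.time()
    acquired = 0
    for node in nodes:
        try:
            # SET resource token NX PX ttl_ms
            if node.set(resource, token, nx=True, px=ttl_ms):
                acquired += 1
        except Exception:
            pass  # node down or timed out: count as a failed acquisition
    elapsed_ms = (time.time() - start) * 1000
    validity_ms = ttl_ms - elapsed_ms
    if acquired >= quorum(len(nodes)) and validity_ms > 0:
        return token, validity_ms
    # failed: undo any partial acquisitions on every node
    for node in nodes:
        try:
            node.delete(resource)  # a real implementation uses the check-and-delete Lua script
        except Exception:
            pass
    return None, 0
```

Note that the validity time returned is shorter than the requested TTL; the client must finish its critical section within it.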

The Redlock algorithm does not need to guarantee that the clocks between the Redis nodes are synchronized (either physically or logically), unlike some traditional distributed locking algorithms based on synchronized clocks. Details of the Redlock algorithm can be found in the official Redis documentation, as well as in the multi-language implementations listed in the documentation.

Election algorithm
In a distributed system, certain transactions often need to be handled by a single process during a given period, or one process needs to act as leader to coordinate the others; this is where election algorithms come in. Traditional election algorithms include the bully algorithm, the ring election algorithm, the Paxos algorithm, and the Zab algorithm (ZooKeeper). Some of these depend on reliable message delivery and clock synchronization, and are rather complicated, hard to implement, and hard to verify. The newer Raft algorithm is much simpler than the others, but it still relies on heartbeat broadcasts and a logical clock: the leader must continually broadcast messages to the followers to maintain its membership, and additional mechanisms are needed for node expansion.
An election algorithm is similar to a distributed lock in that at most one process can hold the leader resource at any given time. Of course, we could implement it with the distributed lock described earlier: define a leader resource; whoever holds the resource lock is the leader; when the lock's lifetime expires, the processes compete for the lock again. This is a purely competitive approach, but it tends to leave gaps during which there is no leader, and it makes leader re-election hard to implement. Re-election has considerable benefits: for example, the leader can run scheduled tasks more consistently, and comparing logs and troubleshooting is also more convenient. If we need an algorithm in which the current leader can be re-elected, the following method can be used:


import redis
rc = redis.Redis()
local_selector = 0

def master():
    global local_selector
    master_selector = rc.incr('master_selector')
    if master_selector == 1:
        # the counter is fresh: initial start, or restarted after expiry
        local_selector = master_selector
    else:
        if local_selector > 0:  # I was the master before
            if local_selector > master_selector:  # lost it, maybe the db was failed over
                local_selector = 0
            else:  # continue to be the master
                local_selector = master_selector
    if local_selector > 0:  # I'm the current master
        rc.expire('master_selector', 20)
    return local_selector > 0

This algorithm favors re-election of the current leader: a new leader is elected only when the current leader fails, when a task takes longer than the term, or when the Redis node fails and recovers. In master/slave mode, if the master node fails and a slave is promoted to the new master, there will not be two leaders even if the latest master_selector value was not replicated successfully. If one leader keeps renewing, the value of master_selector increases continuously; since master_selector is a 64-bit integer, it cannot overflow within any foreseeable time, and it restarts from 1 whenever the leader changes, so this growth is acceptable. However, if the Redis client (for example, Node.js) does not support 64-bit integers, this situation needs special handling. If the current leader takes longer than its term to handle a transaction, the other processes may elect a new leader while the old one is still finishing; once the new leader has been through as many terms as the old one, there could briefly be two leader processes. To avoid this, after handling its transactions each leader process should check whether its processing time has exceeded the term; if so, it should set local_selector to 0 before calling master() again to check whether it is still the leader.
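The term check described above might be sketched like this (the helper names, the callback parameters, and the rounds argument are our own additions; is_master would be the master() function from the example, step_down would reset local_selector to 0, and 20 seconds matches the expire() call above):

```python
import time

TERM_SECONDS = 20  # must match the expire('master_selector', 20) in master()

def term_expired(started_at, now, term=TERM_SECONDS):
    """True if handling the work took longer than the leader's term."""
    return now - started_at > term

def leader_loop(is_master, do_work, step_down, rounds=None, sleep=time.sleep):
    """Only do leader work while is_master() says we hold the leadership.
    `rounds=None` means loop forever; a number is handy for testing."""
    n = 0
    while rounds is None or n < rounds:
        if is_master():
            started_at = time.time()
            do_work()
            if term_expired(started_at, time.time()):
                # over the term: step down (set local_selector back to 0)
                # and let the next is_master() call re-check the leadership
                step_down()
        sleep(1)
        n += 1
```

Stepping down before the next check is what prevents a slow old leader from acting alongside a newly elected one.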

The message queue
Message queues are the communication infrastructure of distributed systems, and complex coordination and interoperation between processes can be built on top of them. Redis provides primitives for constructing message queues, such as the Pub/Sub family of commands, which offer subscribe/publish-based message sending and receiving; but Pub/Sub messages are not stored inside Redis and thus are not persisted, which is only suitable for scenarios where losing a message does not matter.
If persistence is required, consider the list family of commands: the PUSH commands (LPUSH, RPUSH, etc.) push a message onto a list, and the POP commands (LPOP, RPOP, BLPOP, BRPOP, etc.) take a message off a list. FIFO and FILO queues can be obtained through different combinations, such as:


import redis
rc = redis.Redis()

def fifo_push(q, data):
    rc.lpush(q, data)

def fifo_pop(q):
    return rc.rpop(q)

def filo_push(q, data):
    rc.lpush(q, data)

def filo_pop(q):
    return rc.lpop(q)

If the BLPOP and BRPOP commands are used instead of LPOP and RPOP, blocking waits are supported when the list is empty. However, even with this persistent approach, a message can still be lost if the network fails while a POP command is returning it. For this need Redis provides the RPOPLPUSH and BRPOPLPUSH commands, which atomically move the popped message into another list. The client can read and process the message from this backup list, and delete it from that list only after processing is complete, guaranteeing that no message is lost. For example:


def safe_fifo_push(q, data):
    rc.lpush(q, data)

def safe_fifo_pop(q, cache):
    msg = rc.rpoplpush(q, cache)
    # check and do something on msg ...
    rc.lrem(cache, 1, msg)  # remove the msg from the cache list (redis-py >= 3.0 signature)
    return msg

If you use the BRPOPLPUSH command instead of RPOPLPUSH, you can block and wait when q is empty.
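A consumer loop built on BRPOPLPUSH might look like this (a sketch; the handler callback, the parameter-passed connection, and the function name are our own additions):

```python
def safe_fifo_consume(rc, q, cache, handler, timeout=5):
    """Block up to `timeout` seconds for one message, process it, then ack it.
    `rc` is a redis.Redis connection."""
    msg = rc.brpoplpush(q, cache, timeout=timeout)
    if msg is None:
        return False  # timed out, nothing arrived
    handler(msg)            # may raise; msg then stays in `cache` for recovery
    rc.lrem(cache, 1, msg)  # ack: remove the processed message from the cache list
    return True
```

If the consumer crashes between the pop and the ack, the message survives in the cache list, where a recovery process can find and reprocess it.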

