Implementing a simple queue and a cross-process lock in Python with MySQL

  • 2020-04-02 13:47:44
  • OfStack

In multi-process application development, it is often unavoidable that several processes access the same resource (a critical resource). In that case we must add a global lock to synchronize access, so that only one process can touch the resource at a time.

Here's an example:

Suppose we use MySQL to implement a task queue. The implementation goes as follows:

1. Create a jobs table in MySQL to store the queued tasks:


create table jobs(
  id int auto_increment not null primary key,
  message text not null,
  job_status tinyint not null default 0
);

message stores the task payload, and job_status marks the task state; assume there are only two states, 0 (waiting in the queue) and 1 (already dequeued).
 
2. A producer process enqueues a task by inserting a new row into the jobs table:


insert into jobs(message) values('msg1');

3. Consumer processes take queued tasks from the jobs table. Assuming there are multiple consumer processes, each one performs:


select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- ? is the id of the record just fetched

4. Without a cross-process lock, two consumer processes may fetch the same message at the same time, so one message ends up consumed more than once. That is something we do not want, which is why we need a cross-process lock (a sketch of this unprotected consumer is shown below).
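To make the race concrete, here is a minimal Python sketch of steps 2 and 3 (not part of the original demo), assuming MySQLdb as used later and a connection that selects the database containing the jobs table; the function names are illustrative. Two processes running fetch_one_job() concurrently can select the same row before either updates it:


import MySQLdb

def produce(db, message):
  # Step 2: enqueue a task.
  cursor = db.cursor()
  cursor.execute("insert into jobs(message) values(%s)", (message,))
  db.commit()
  cursor.close()

def fetch_one_job(db):
  # Step 3: dequeue the oldest waiting task -- racy without a lock.
  cursor = db.cursor()
  cursor.execute("select id, message from jobs "
                 "where job_status=0 order by id asc limit 1")
  row = cursor.fetchone()
  if row is not None:
    cursor.execute("update jobs set job_status=1 where id = %s", (row[0],))
    db.commit()
  cursor.close()
  return row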

=========================================================================

When it comes to implementing a cross-process lock, there are mainly several approaches:

(1) semaphores
(2) file locks (fcntl)
(3) sockets (binding a port number)
(4) signals
Each of these approaches has its own advantages and disadvantages; generally speaking, the first two are the most common. I will not go into detail here; you can look them up yourself. A minimal fcntl sketch is shown below.
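For illustration, here is a minimal sketch of approach (2) using Python's fcntl module; the lock file path and helper name are arbitrary choices for this example:


import fcntl

LOCK_FILE = "/tmp/myapp.lock"  # illustrative path; pick one your processes share

def run_exclusively(work):
  f = open(LOCK_FILE, "w")
  try:
    fcntl.flock(f, fcntl.LOCK_EX)  # blocks until this process holds the lock
    work()                         # critical section: one process at a time
  finally:
    fcntl.flock(f, fcntl.LOCK_UN)  # release the lock
    f.close()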
 
While looking into this, I found that MySQL itself offers a lock implementation (the GET_LOCK and RELEASE_LOCK functions). It is suitable for scenarios with modest performance requirements; under heavy concurrent or distributed access it may become a bottleneck.
 
A demo of this is implemented in Python, as follows:
 
File name: glock.py


#!/usr/bin/env python2.7 
# 
# -*- coding:utf-8 -*- 
# 
#  Desc  : Cross-process lock based on MySQL GET_LOCK/RELEASE_LOCK
# 
import logging, time 
import MySQLdb 
class Glock: 
  """Cross-process lock built on MySQL's GET_LOCK()/RELEASE_LOCK()."""
  def __init__(self, db): 
    self.db = db 
  def _execute(self, sql): 
    cursor = self.db.cursor() 
    try: 
      ret = None 
      cursor.execute(sql) 
      if cursor.rowcount != 1: 
        logging.error("Unexpected row count returned in mysql lock function.") 
        ret = None 
      else: 
        ret = cursor.fetchone() 
      cursor.close() 
      return ret 
    except Exception, ex: 
      logging.error('Execute sql "%s" failed! Exception: %s', sql, str(ex)) 
      cursor.close() 
      return None 
  def lock(self, lockstr, timeout): 
    # GET_LOCK returns 1 on success, 0 on timeout, NULL on error. 
    sql = "SELECT GET_LOCK('%s', %s)" % (lockstr, timeout) 
    ret = self._execute(sql) 
    if ret is None: 
      return None 
    if ret[0] == 0: 
      logging.debug("Another client has previously locked '%s'.", lockstr) 
      return False 
    elif ret[0] == 1: 
      logging.debug("The lock '%s' was obtained successfully.", lockstr) 
      return True 
    else: 
      logging.error("Error occurred!") 
      return None 
  def unlock(self, lockstr): 
    # RELEASE_LOCK returns 1 on success, 0 if held by another session, NULL if absent. 
    sql = "SELECT RELEASE_LOCK('%s')" % (lockstr) 
    ret = self._execute(sql) 
    if ret is None: 
      return None 
    if ret[0] == 0: 
      logging.debug("The lock '%s' was not released (it was not established by this thread).", lockstr) 
      return False 
    elif ret[0] == 1: 
      logging.debug("The lock '%s' the lock was released.", lockstr) 
      return True 
    else: 
      logging.error("The lock '%s' did not exist.", lockstr) 
      return None 
# Init logging 
def init_logging(): 
  sh = logging.StreamHandler() 
  logger = logging.getLogger() 
  logger.setLevel(logging.DEBUG) 
  formatter = logging.Formatter('%(asctime)s -%(module)s:%(filename)s-L%(lineno)d-%(levelname)s: %(message)s') 
  sh.setFormatter(formatter) 
  logger.addHandler(sh) 
  logging.info("Current log level is : %s",logging.getLevelName(logger.getEffectiveLevel())) 
def main(): 
  init_logging() 
  db = MySQLdb.connect(host='localhost', user='root', passwd='') 
  lock_name = 'queue' 
 
  l = Glock(db) 
 
  ret = l.lock(lock_name, 10) 
  if ret is not True: 
    logging.error("Can't get lock! exit!") 
    return 
  time.sleep(10) 
  logging.info("You can do some synchronization work across processes!") 
  ##TODO 
  ## you can do something in here ## 
  l.unlock(lock_name) 
if __name__ == "__main__": 
  main() 

In main():

In l.lock(lock_name, 10), the 10 is a timeout of 10 seconds: if the lock cannot be acquired within 10 seconds, GET_LOCK returns 0, lock() returns False, and the program gives up instead of entering the critical section.
 
In this demo, the place marked TODO is where you put the consumers' logic for fetching messages from the jobs table, described above the dividing line.

Suppose there are multiple consumer processes taking queued messages from the jobs table; each one, while holding the lock, does the following:


select * from jobs where job_status=0 order by id asc limit 1;
update jobs set job_status=1 where id = ?; -- ? is the id of the record just fetched

In this way, multiple processes access the critical resource one at a time, ensuring the consistency of the data. A concrete sketch of the TODO body follows.
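Concretely, the TODO section in main() could reuse the fetch_one_job() sketch from earlier; because it now runs between l.lock() and l.unlock(), the fetch is serialized across processes:


  # Hypothetical wiring of the earlier fetch_one_job() sketch into main(); 
  # runs while holding the 'queue' lock, so only one consumer dequeues at a time. 
  row = fetch_one_job(db) 
  if row is not None: 
    logging.info("Consumed job %s: %s", row[0], row[1]) 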
 
During the test, launch two instances of glock.py; the results are as follows:


[@tj-10-47 test]# ./glock.py  
2014-03-14 17:08:40,277 -glock:glock.py-L70-INFO: Current log level is : DEBUG 
2014-03-14 17:08:40,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully. 
2014-03-14 17:08:50,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes! 
2014-03-14 17:08:50,299 -glock:glock.py-L56-DEBUG: The lock 'queue' was released. 

You can see that the first glock.py releases the lock at 17:08:50, and the second instance (below) acquires it at exactly 17:08:50, which shows that the approach works.


[@tj-10-47 test]# ./glock.py 
2014-03-14 17:08:46,873 -glock:glock.py-L70-INFO: Current log level is : DEBUG
2014-03-14 17:08:50,299 -glock:glock.py-L43-DEBUG: The lock 'queue' was obtained successfully.
2014-03-14 17:09:00,299 -glock:glock.py-L81-INFO: You can do some synchronization work across processes!
2014-03-14 17:09:00,300 -glock:glock.py-L56-DEBUG: The lock 'queue' was released.
[@tj-10-47 test]#

