Batch operations on a Redis database with Python

  • 2021-09-24 22:50:44
  • OfStack

Method 1: Use pipeline

When commands are sent through a pipeline, the Redis server has to queue the replies (which uses memory) and sends all the results back at once after execution. While a pipeline is in use it occupies its connection exclusively, and non-pipeline operations cannot use that connection until the pipeline is closed. If the pipeline's command set is very large, open a separate connection for other operations so that they are not affected (for comparison, the Redis limit on Lua script run time, lua-time-limit, defaults to 5 s). A batch operation looks like this:


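import time

import redis

r = redis.Redis(host='127.0.0.1', port=6379, password='1234567890')
with r.pipeline() as ctx:
  a = time.time()
  # Queue two HSET commands; nothing is sent until execute()
  ctx.hset('current', "time2", a)
  ctx.hset('current', "time3", a)
  # Send the queued commands together and collect the replies
  res = ctx.execute()
  print("result: ", res)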
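For a very large batch, one way to limit how much the server has to buffer is to flush the pipeline every few hundred or thousand commands. The following is a minimal sketch, assuming a redis-py client is passed in; the helper name `bulk_hset` and the `chunk_size` default are illustrative and not part of redis-py.

```python
from itertools import islice


def bulk_hset(client, key, mapping, chunk_size=1000):
    """Write `mapping` into the hash `key`, flushing every `chunk_size` commands.

    `client` is assumed to be a redis.Redis instance (hypothetical helper).
    Returns the number of HSET replies received.
    """
    items = iter(mapping.items())
    replies = 0
    # transaction=False sends plain pipelined commands without MULTI/EXEC
    with client.pipeline(transaction=False) as pipe:
        while True:
            chunk = list(islice(items, chunk_size))
            if not chunk:
                break
            for field, value in chunk:
                pipe.hset(key, field, value)
            # execute() sends the queued commands and empties the queue
            replies += len(pipe.execute())
    return replies
```

Smaller chunks reduce the memory held for queued replies at the cost of more round trips, so the chunk size is a tuning choice rather than a fixed rule.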

Using a pipeline to perform transactions with optimistic locking


# -*- coding:utf-8 -*-

import redis
from redis import WatchError
from concurrent.futures import ProcessPoolExecutor

r = redis.Redis(host='127.0.0.1', port=6379)

# Stock decrement function; loops until the decrement completes
# Enough stock: decrement succeeds, return True
# Not enough stock: decrement fails, return False
def decr_stock():
  # In redis-py, transactions are wrapped by the pipeline object
  with r.pipeline() as pipe:
    while True:
      try:
        # WATCH the stock key; if another client changes it before EXEC, execute() raises WatchError
        pipe.watch('stock:count')
        count = int(pipe.get('stock:count'))
        if count > 0: # Stock is available
          # Start the transaction
          pipe.multi() # Switch back to buffered mode; commands are queued until execute()
          pipe.decr('stock:count')
          # Send the queued commands
          # execute() returns a list of results; there is only one DECR, which returns the new value
          result = pipe.execute()[0]
          print("result: ", result)
          return True
        else:
          return False
      except WatchError as e:
        # Print the WatchError to observe that the watched key was changed, then retry
        print(e.args)
      finally:
        pipe.unwatch()


def worker():
  while True:
    # Exit when there is no stock left
    if not decr_stock():
      break


# The guard keeps worker processes from re-running the experiment on import
if __name__ == '__main__':
  # Start of the experiment
  # Set the stock to 100
  r.set("stock:count", 100)

  # Use multiple processes to simulate concurrent clients
  with ProcessPoolExecutor(max_workers=2) as pool:
    for _ in range(10):
      pool.submit(worker)
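redis-py also offers a convenience method, `Redis.transaction(func, *watches)`, that runs the same watch, execute, and retry-on-WatchError loop internally. The sketch below is one possible way to express the stock decrement with it; the function name `decr_stock_tx` is hypothetical, and `client` is assumed to be a redis.Redis instance.

```python
def decr_stock_tx(client, key="stock:count"):
    """Decrement the stock under optimistic locking (hypothetical helper).

    Returns True if one unit was taken, False if no stock was left.
    """
    def txn(pipe):
        # The pipeline is watching `key`, so this GET runs immediately
        count = int(pipe.get(key) or 0)
        if count <= 0:
            return False
        # Queue the decrement; transaction() calls execute() afterwards
        pipe.multi()
        pipe.decr(key)
        return True

    # value_from_callable=True makes transaction() return txn's result
    # instead of the list of replies from execute()
    return client.transaction(txn, key, value_from_callable=True)
```

With this form the retry loop lives inside the library, so the calling code only has to describe what one attempt does.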

Method 2: Use register_script

register_script wraps a Lua script that runs on the Redis server and returns a callable handle bound to this client. The handle can then be called any number of times with different keys and arguments.


import time

import redis

r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')

lua = """
local key = KEYS[1]
local field = ARGV[1]
local timestamp_new = ARGV[2]

-- get the timestamp currently stored in the hash field
local timestamp_old = redis.call('hget', key, field)
-- HGET yields false (not nil) in Lua when the field does not exist;
-- ARGV values and replies are strings, so compare them as numbers
if timestamp_old == false or tonumber(timestamp_new) > tonumber(timestamp_old) then
  -- also store the value under the field name with "1" appended (e.g. "time1")
  redis.call('hset', key, field .. 1, timestamp_new)
  return redis.pcall('hset', key, field, timestamp_new)
end
"""

# register_script returns a callable Script object
cmd = r.register_script(lua)

cur_time = time.time()
cmd(keys=['current'], args=["time", cur_time])

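register_script runs the logic as a Lua script on the server. Pay attention to what redis.call(method, key, field) returns inside Lua (nil, false, 1): when the key or field has no value, the result is false rather than nil. If redis.call raises an error partway through, the script stops at that point and the remaining statements do not run; Redis does not roll back writes the script has already made.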
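On the Python side, the reply of the script above can take three shapes: `None` when the condition fails and the script returns nothing, `1` when HSET created the field, and `0` when HSET overwrote an existing field. The helper below is only an illustrative sketch of reading those values; the name `describe_update` and its labels are made up for this example.

```python
def describe_update(result):
    """Map the script's reply, as returned by redis-py, to a label."""
    if result is None:
        # The condition failed, so the Lua script returned nothing (nil)
        return "skipped"
    if result == 1:
        # HSET added a field that did not exist before
        return "created"
    if result == 0:
        # HSET replaced the value of an existing field
        return "overwritten"
    raise ValueError(f"unexpected reply: {result!r}")
```

A call such as `describe_update(cmd(keys=['current'], args=["time", cur_time]))` would then tell the caller whether the newer timestamp was actually written.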

Method 3: Use script_load and evalsha

In short, script_load sends the Lua script to the Redis server, which caches it in memory and returns its SHA1 digest; evalsha then executes the cached script by that digest. The cached script is not tied to the client connection that loaded it.


import time

import redis

r = redis.Redis(host='127.0.0.1', port=31320, password='12345678')

lua = """
local key = KEYS[1]
local field = ARGV[1]
local timestamp_new = ARGV[2]

-- get the timestamp currently stored in the hash field
local timestamp_old = redis.call('hget', key, field)
-- HGET yields false (not nil) in Lua when the field does not exist;
-- ARGV values and replies are strings, so compare them as numbers
if timestamp_old == false or tonumber(timestamp_new) > tonumber(timestamp_old) then
  -- also store the value under the field name with "1" appended (e.g. "time1")
  redis.call('hset', key, field .. 1, timestamp_new)
  return redis.pcall('hset', key, field, timestamp_new)
end
"""

# script_load caches the script on the server and returns its SHA1 digest
sha = r.script_load(lua)
# evalsha(sha, numkeys, *keys_and_args): one key, followed by two arguments
print(r.evalsha(sha, 1, 'current', 'time', time.time()))

Redis commands for managing Lua scripts (the script_... methods in Python):

script load

This command is used to load the Lua script into Redis memory

script exists

script exists sha1 [sha1 ...]
This command is used to determine whether the script with the given sha1 has been loaded into Redis memory

script flush

This command is used to clear all Lua scripts that have been loaded into Redis memory. After script flush is executed, none of the previously returned sha1 values exist any longer.

script kill

This command is used to kill the executing Lua script.
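Because script flush (or a server restart) empties the script cache, a client that keeps a digest around may want to check it before calling evalsha. The sketch below shows one way to do that with script_exists and script_load; the helper names `script_sha1` and `ensure_loaded` are hypothetical, and `client` is assumed to be a redis.Redis instance. It relies on the fact that the digest returned by script load is the SHA1 hex digest of the script text.

```python
import hashlib


def script_sha1(script):
    """Return the SHA1 hex digest Redis uses to identify `script`."""
    return hashlib.sha1(script.encode("utf-8")).hexdigest()


def ensure_loaded(client, script):
    """Load `script` into the server cache if it is missing; return its digest."""
    sha = script_sha1(script)
    # script_exists returns one truthy/falsy entry per digest passed in
    if not client.script_exists(sha)[0]:
        sha = client.script_load(script)
    return sha
```

The returned digest can then be passed to evalsha as in Method 3.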

Method 4: eval

Usage is similar to method 3, but eval is a one-off request: every call must carry the full Lua script.
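As a minimal sketch of this form, the helper below passes the whole script text on every call through redis-py's `eval(script, numkeys, *keys_and_args)`. The names `SET_IF_NEWER` and `set_if_newer` are hypothetical, `client` is assumed to be a redis.Redis instance, and the script is a trimmed variant of the one used in the earlier methods that compares the timestamps as numbers.

```python
# Lua script sent in full with every call (illustrative variant)
SET_IF_NEWER = """
local old = redis.call('hget', KEYS[1], ARGV[1])
if old == false or tonumber(ARGV[2]) > tonumber(old) then
  return redis.call('hset', KEYS[1], ARGV[1], ARGV[2])
end
"""


def set_if_newer(client, key, field, timestamp):
    """Store `timestamp` in hash `key` under `field` if it is newer."""
    # numkeys=1: the next argument becomes KEYS[1], the rest become ARGV
    return client.eval(SET_IF_NEWER, 1, key, field, timestamp)
```

For a script that is called often, the repeated transfer of the script body is the main cost compared with evalsha, which sends only the 40-character digest.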
