How to implement ack for queue messages in Redis

  • 2020-05-27 07:31:43
  • OfStack

preface

The queue service provided by the company was painful to use, and we were not allowed to use any other queue product, so we needed a queue with ack support to avoid losing messages.

Natively, Redis implements a queue with the LPUSH/RPUSH and LPOP/RPOP commands on a list. That alone does not meet the requirement, because there is no ack mechanism, so the Redis list (queue) needs a small adjustment.

The general idea: when a message is popped, a copy is stored in a backup area; when an ack request arrives (confirming that the message has been consumed), the backup copy is deleted. Before every pop, the backup area is checked for messages that have timed out without being acked. If one exists, it is pushed back onto the list, and the pop then proceeds from the list as usual.
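To make the flow concrete, here is a minimal in-memory sketch of the idea in Python. It is only a model of the behaviour described above, not a Redis client: the class name `AckQueueModel` and its fields are hypothetical, a `deque` stands in for the Redis list, and a dict stands in for the backup area.

```python
from collections import deque


class AckQueueModel:
    """In-memory model of the pop/ack flow (illustration only, no Redis)."""

    def __init__(self, timeout):
        self.timeout = timeout   # how long a popped message may stay un-acked
        self.queue = deque()     # stands in for the Redis list
        self.backup = {}         # message id -> (pop time, message)

    def push(self, message):
        self.queue.appendleft(message)  # same end as LPUSH

    def pop(self, now):
        # Before popping, requeue at most one overdue message, oldest first.
        overdue = [(t, i) for i, (t, _) in self.backup.items()
                   if t <= now - self.timeout]
        if overdue:
            _, msg_id = min(overdue)
            self.queue.appendleft(self.backup.pop(msg_id)[1])
        if not self.queue:
            return None
        message = self.queue.pop()      # same end as RPOP
        # Keep a backup copy until the consumer acks the message.
        self.backup[message["id"]] = (now, message)
        return message

    def ack(self, msg_id):
        # The message was consumed; drop its backup copy.
        self.backup.pop(msg_id, None)
```

The current time is passed in as `now` instead of being read inside the model, which mirrors the way the scripts receive the time from the caller.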

The following scripts are written in Lua and only need to be loaded into Redis before they are executed.

Each message must be a JSON object with an id attribute, because the pop script decodes the message and uses its id as the key in the backup structures.

Push needs no changes; the native command is used as is (LPUSH in this example).
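As an illustration of the message format, the following Python sketch builds a JSON message that carries an id. The helper name `make_message` and the `payload` field are assumptions made for this example; the only thing the pop script relies on is the `id` attribute.

```python
import json
import uuid


def make_message(payload):
    """Wrap a payload in a JSON object that carries a unique string id."""
    # The scripts only require "id"; "payload" is a hypothetical field name.
    return json.dumps({"id": uuid.uuid4().hex, "payload": payload})
```

The returned string is what a producer would pass to LPUSH.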

The pop script:


-- True when x is a non-empty array reply (and not an error reply).
local not_empty = function(x)
 return (type(x) == "table") and (not x.err) and (#x ~= 0)
end

local qName = ARGV[1] -- Name of the queue
local currentTime = ARGV[2] -- Current time. It must be passed in by the caller; if the script read the time from Redis itself, Redis's own backup could become inconsistent when the request is replayed
local considerAsFailMaxTimeSpan = ARGV[3] -- Timeout: a message that has gone this long without an ack is considered failed and must be re-enqueued (same unit as currentTime)

local zsetName = qName .. 'BACKUP' -- Sorted set: message id scored by pop time
local hashName = qName .. 'CONTEXT' -- Hash: message id -> message body

-- Re-enqueue at most one message whose ack is overdue.
local tmp = redis.call('ZRANGEBYSCORE', zsetName, '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1)
if (not_empty(tmp)) then
 redis.call('ZREM', zsetName, tmp[1]) -- tmp[1] is the unique id of the message
 redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1]))
end
-- Pop the next message and keep a backup copy until it is acked.
tmp = redis.call('RPOP', qName)
if (tmp) then
 local msg = cjson.decode(tmp)
 local id = msg['id']
 redis.call('ZADD', zsetName, tonumber(currentTime), id)
 redis.call('HSET', hashName, id, tmp)
end
return tmp

The ack script is simple: it removes the given id from the sorted set and the hash.


local key = ARGV[1] -- id of the message being acked
local qName = ARGV[2] -- Name of the queue
redis.call('ZREM', qName .. 'BACKUP', key)
redis.call('HDEL', qName .. 'CONTEXT', key)

Before they are used in a program, both scripts must be loaded explicitly (SCRIPT LOAD). After that, each script can be executed directly by calling its SHA value (EVALSHA).
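A rough sketch of that loading and calling step from Python might look like this. It assumes `client` is a redis-py `redis.Redis` instance (any object with compatible `script_load` and `evalsha` methods works), and that `pop_script` and `ack_script` hold the text of the two Lua scripts above. The wrapper class `AckQueueClient` is hypothetical.

```python
import json
import time


class AckQueueClient:
    """Hypothetical wrapper that loads the two scripts and calls them by SHA."""

    def __init__(self, client, queue_name, pop_script, ack_script, timeout=30):
        self.client = client
        self.queue_name = queue_name
        self.timeout = timeout  # same unit as the time passed to pop()
        # SCRIPT LOAD returns the SHA1 digest that EVALSHA refers to later.
        self.pop_sha = client.script_load(pop_script)
        self.ack_sha = client.script_load(ack_script)

    def pop(self, now=None):
        # The scripts read everything from ARGV, so numkeys is 0.
        now = int(time.time()) if now is None else now
        raw = self.client.evalsha(self.pop_sha, 0,
                                  self.queue_name, now, self.timeout)
        return json.loads(raw) if raw else None

    def ack(self, message_id):
        # Argument order follows the ack script: id first, then queue name.
        self.client.evalsha(self.ack_sha, 0, message_id, self.queue_name)
```

A consumer would call `pop()`, process the returned message, and then call `ack(message["id"])`. If it crashes before the ack, the message is re-enqueued by a later `pop()` once the timeout has passed.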
