How to implement ack for queue messages in Redis
- 2020-05-27 07:31:43
The queues provided by the company were too painful to use, and other queue systems were off limits, so a queue with ack capability was needed to keep messages from being lost.
Native Redis uses LPUSH/RPUSH and LPOP/RPOP on a list to act as a queue. That alone cannot meet the requirement, because there is no ack function, so a small adjustment on top of the Redis list (queue) is needed.
The general idea: at pop time, the popped message is also stored in a backup location, and the backup entry is deleted when an ack request arrives (confirming that the message has been consumed). Before every pop, the backup is checked for a message that has gone past its timeout without an ack. If there is one, it is pushed back onto the list, and the pop is then served from the list.
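The idea can be modeled in plain Python before looking at any Redis code. This is only an in-memory sketch of the semantics described above, not the Redis implementation: the class name `AckQueueModel` and its fields are hypothetical, and a `deque` plus two dicts stand in for the list and the backup.

```python
import json
from collections import deque


class AckQueueModel:
    """In-memory model of the pop/ack idea; not the Redis implementation."""

    def __init__(self, timeout):
        self.timeout = timeout      # seconds a message may stay un-acked
        self.queue = deque()        # stands in for the Redis list
        self.popped_at = {}         # id -> pop time (backup timestamps)
        self.backup = {}            # id -> raw message (backup payloads)

    def push(self, raw):
        self.queue.appendleft(raw)  # like LPUSH: new messages enter on the left

    def pop(self, now):
        # Re-queue at most one message whose ack is overdue, oldest first.
        overdue = [i for i, t in self.popped_at.items() if t <= now - self.timeout]
        if overdue:
            oldest = min(overdue, key=lambda i: self.popped_at[i])
            del self.popped_at[oldest]
            self.queue.appendleft(self.backup[oldest])
        if not self.queue:
            return None
        raw = self.queue.pop()      # like RPOP: consume from the right
        msg_id = json.loads(raw)["id"]
        self.popped_at[msg_id] = now
        self.backup[msg_id] = raw
        return raw

    def ack(self, msg_id):
        # The consumer confirmed the message, so drop the backup copy.
        self.popped_at.pop(msg_id, None)
        self.backup.pop(msg_id, None)


q = AckQueueModel(timeout=30)
q.push('{"id": "a", "body": 1}')
first = q.pop(now=100)        # delivered and backed up
second = q.pop(now=110)       # None: list is empty, "a" is not overdue yet
third = q.pop(now=130)        # "a" is overdue, so it is re-queued and redelivered
q.ack("a")
fourth = q.pop(now=200)       # None: "a" was acked, nothing left to redeliver
```

Note that a message that is popped but never acked is delivered again after the timeout, so consumers should tolerate seeing the same id more than once.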
The following scripts are written in Lua and only need to be loaded into Redis before they are executed.
Each message must contain an id attribute.
Push needs no changes; the native command is used as-is (LPUSH in this example).
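As a small illustration of the id requirement, a producer could build its messages like this. This is a sketch under assumptions: the helper name `make_message` and the `payload` field are hypothetical, and the only thing the article requires is a top-level `id` in a JSON-encoded message.

```python
import json
import uuid


def make_message(payload, msg_id=None):
    """Serialize a payload as JSON with the top-level 'id' that pop relies on."""
    if msg_id is None:
        # Assumption: any string that is unique per message works as an id.
        msg_id = uuid.uuid4().hex
    return json.dumps({"id": msg_id, "payload": payload})


message = make_message({"order": 42}, msg_id="order-42")
```

The resulting string is what would be handed to LPUSH, for example `client.lpush("jobs", message)` with a redis-py style client (the queue name `jobs` is made up here).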
The pop script:

    local not_empty = function(x)
      return (type(x) == "table") and (not x.err) and (#x ~= 0)
    end

    -- Argument order (ARGV[1..3]) follows the order of the declarations below.
    local qName = ARGV[1] -- name of the queue
    -- Current time. It must be passed in by the caller; Redis's own clock
    -- cannot be used, because replaying the request for Redis's own backup
    -- could then produce inconsistent results.
    local currentTime = ARGV[2]
    -- Timeout: a message that has gone this long without an ack is
    -- considered failed and must be re-queued.
    local considerAsFailMaxTimeSpan = ARGV[3]

    local zsetName = qName .. 'BACKUP'
    local hashName = qName .. 'CONTEXT'

    -- Look for one message whose ack is overdue.
    local tmp = redis.call('ZRANGEBYSCORE', zsetName, '-INF', tonumber(currentTime) - tonumber(considerAsFailMaxTimeSpan), 'LIMIT', 0, 1)
    if (not_empty(tmp)) then
      -- tmp[1] is the unique id of the message
      redis.call('ZREM', zsetName, tmp[1])
      redis.call('LPUSH', qName, redis.call('HGET', hashName, tmp[1]))
    end

    tmp = redis.call('RPOP', qName)
    if (tmp) then
      -- Back up the popped message until it is acked.
      local msg = cjson.decode(tmp)
      local id = msg['id']
      redis.call('ZADD', zsetName, tonumber(currentTime), id)
      redis.call('HSET', hashName, id, tmp)
    end
    return tmp
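Ack is simple: remove the specified id from the sorted set and the hash.

    -- Argument order (ARGV[1..2]) follows the order of the declarations below.
    local key = ARGV[1]   -- id of the message being acked
    local qName = ARGV[2] -- name of the queue
    redis.call('ZREM', qName .. 'BACKUP', key)
    redis.call('HDEL', qName .. 'CONTEXT', key)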
The program must explicitly load both scripts (SCRIPT LOAD) before using them. After that, each script can be executed directly by calling it through its SHA value (EVALSHA).
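A minimal client-side sketch of that flow follows. The wrapper class `AckQueueClient` is hypothetical. It assumes a client object that exposes redis-py style `script_load(script)` and `evalsha(sha, numkeys, *args)` methods, and it assumes the pop script takes queue name, current time and timeout as its arguments while the ack script takes message id and queue name, in that order.

```python
import hashlib


def script_sha(script):
    """SHA1 hex digest of a script body, which is how EVALSHA identifies scripts."""
    return hashlib.sha1(script.encode("utf-8")).hexdigest()


class AckQueueClient:
    """Thin wrapper around a client with script_load() and evalsha()."""

    def __init__(self, client, pop_script, ack_script, queue, timeout):
        self.client = client
        self.queue = queue
        self.timeout = timeout
        # SCRIPT LOAD caches each script on the server and returns its SHA1.
        self.pop_sha = client.script_load(pop_script)
        self.ack_sha = client.script_load(ack_script)

    def pop(self, now):
        # numkeys is 0 because the scripts read all of their input from ARGV.
        return self.client.evalsha(self.pop_sha, 0, self.queue, now, self.timeout)

    def ack(self, msg_id):
        return self.client.evalsha(self.ack_sha, 0, msg_id, self.queue)
```

With redis-py this might be used as `AckQueueClient(redis.Redis(), pop_lua, ack_lua, "jobs", 30)`, where `pop_lua` and `ack_lua` hold the two script bodies and `now` would typically come from `int(time.time())`. If the server's script cache is flushed or the server restarts, the scripts have to be loaded again before their SHA values can be called.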