golang implements the delay message queue function example of redis

  • 2020-09-16 07:32:11
  • OfStack

preface

In the learning process, it is found that zset of redis can also be used to realize the lightweight delayed message queue function. Although the reliability still needs to be improved, it can be fully realized for some functions with less high requirements on data reliability. This paper mainly adopts zadd, zrangebyscore and zdel in zset in redis to realize a small demo.

Prepare in advance to install redis, ES14en-ES15en

Because I'm using macOS, straight up


$ brew install redis
$ go get github.com/garyburd/redigo/redis

Because it is lazy, objectId in bson is directly adopted when only 1id is generated for the task, so:


$ go get gopkg.in/mgo.v2/bson

Only 1id is not necessary, but if there is a practical application to carry later, it is easy to find the corresponding task.

producers

10w tasks are generated through 1 for cycle, with each task having a different time


func producer() {
 count := 0
 // generate 100000 A task 
 for count < 100000 {
 count++
 dealTime := int64(rand.Intn(5)) + time.Now().Unix()
 uuid := bson.NewObjectId().Hex()
 redis.Client.AddJob(&job.JobMessage{
 Id: uuid,
 DealTime: dealTime,
 }, + int64(dealTime))
 }
}

The AddJob function is in another package and takes the randomly generated time in the previous function as the timestamp to be processed.


//  Add tasks 
func (client *RedisClient) AddJob(msg *job.JobMessage, dealTime int64) {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 conn.Do("zadd", key, dealTime, util.JsonEncode(msg))
}

consumers

The consumer processing process is divided into two steps:

Gets tasks that are less than or equal to the current timestamp Determine who has the current task by deleting it

Because when you get a task with a timestamp less than or equal to the current task, there may be multiple go routine reading the current task at the same time, and only one task can handle the current task. So we need to pass a scheme to determine who to deal with this task (of course, if only one consumer can be read directly processing) : this time could be obtained through redis delete operation, because when you delete specified value only successful operation will return to zero, so we can think to delete the current queue of success that go routine got the current task.

Here's the code:


//  consumers 
func consumer() {
 //  Start the 10 a go routine1 Up to take 
 count := 0
 for count < 10 {
 go func() {
 for {
 jobs := redis.Client.GetJob()
 if len(jobs) <= 0 {
  time.Sleep(time.Second * 1)
  continue
 }
 currentJob := jobs[0]
 //  If the current robbery redis Queue success, 
 if redis.Client.DelJob(currentJob) > 0 {
  var jobMessage job.JobMessage
  util.JsonDecode(currentJob, &jobMessage) // The custom of json Analytic function 
  handleMessage(&jobMessage)
 }

 }

 }()
 count++
 }
}

//  Function to handle tasks 
func handleMessage(msg *job.JobMessage) {
 fmt.Printf("deal job: %s, require time: %d \n", msg.Id, msg.DealTime)
 go func() {
 countChan <- true
 }()
}

redis section of the code, get tasks and delete tasks


//  Access to task 
func (client *RedisClient) GetJob() []string {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 timeNow := time.Now().Unix()
 ret, err := redis.Strings(conn.Do("zrangebyscore", key, 0, timeNow, "limit", 0, 1))
 if err != nil {
 panic(err)
 }
 return ret
}

//  Delete the current task,   Used to determine whether the current task has been stolen 
func (client *RedisClient) DelJob(value string) int {
 conn := client.Get()
 defer conn.Close()

 key := "JOB_MESSAGE_QUEUE"
 ret, err := redis.Int(conn.Do("zrem", key, value))
 if err != nil {
 panic(err)
 }
 return ret
}

The code looks something like this. At the end of your run, you can handle 1w every 3-4 seconds, which is really fast...


Related articles: