Golang sample code that emulates a semaphore with a timeout

  • 2020-06-07 04:40:03
  • OfStack

Preface

While writing a project recently, I needed to wait on a semaphore for some resource to become ready, but for at most N milliseconds. Before getting into the body of this article, let's look at how this is done in C.

In C, the pthread API provides a condition-variable wait with a timeout:


SYNOPSIS
  #include <pthread.h>
 
  int
  pthread_cond_timedwait(pthread_cond_t *cond, pthread_mutex_t *mutex, const struct timespec *abstime);

Then, after looking through Go's documentation, I found that Go has no built-in semaphore wait with a timeout. The official documentation is here.

The principle

My business scenario is this: I have a cache dictionary, and when multiple users request a key that does not exist, only one request penetrates to the back end; every user then queues until that request completes or until a timeout, at which point they return.

How do you do that? If you think for a moment about how a cond works, you can simulate a cond with a timeout.

In golang, to get both "block and wait" and "return on timeout", you almost certainly end up using the select/case syntax: one case waits on the blocked resource, and one case waits on a timer.

The blocked resource would normally signal completion through the condition-variable mechanism. Since we have already settled on select/case, it is natural to replace that completion notification with a channel.
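
To make that concrete, here is a minimal, self-contained sketch of the select-plus-timer pattern. The names (waitWithTimeout, done) are illustrative only and are not part of my project's code.

package main

import (
	"fmt"
	"time"
)

// waitWithTimeout blocks until a value arrives on done, or until timeout
// elapses, and reports whether the value arrived in time.
func waitWithTimeout(done <-chan string, timeout time.Duration) (string, bool) {
	timer := time.NewTimer(timeout)
	defer timer.Stop() // release the timer if the value arrives first

	select {
	case v := <-done:
		return v, true
	case <-timer.C:
		return "", false
	}
}

func main() {
	done := make(chan string, 1)

	// Simulate the resource becoming ready after 50ms.
	go func() {
		time.Sleep(50 * time.Millisecond)
		done <- "resource ready"
	}()

	if v, ok := waitWithTimeout(done, 100*time.Millisecond); ok {
		fmt.Println("got:", v)
	} else {
		fmt.Println("timed out")
	}
}

The channel is buffered with capacity 1, so the side that produces the value never blocks, even if the waiter has already timed out and gone away; the same trick is used in the real code below.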

The next problem: many requesters ask for the resource concurrently while it is not yet ready, so they all need to queue up, block, wait for the resource to be completed, and be notified when it is.

So it is natural to give the resource a wait queue: create one chan per requester, push the chan onto the queue, and then wait on that chan for the notification. On the other side, once the resource is complete, traverse the queue and notify each chan.
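
Here is a rough, self-contained sketch of that queue-of-channels idea. My real code (shown below) keeps the queue in a container/list; this sketch uses a plain slice and made-up names (waitList, add, notifyAll) just to show the mechanism.

package main

import (
	"fmt"
	"sync"
)

// waitList is a minimal sketch of "one channel per requester, queued for a
// single wake-up". Not production code.
type waitList struct {
	mutex   sync.Mutex
	waiters []chan *string
}

// add registers a requester and returns its private, buffered channel.
func (w *waitList) add() chan *string {
	ch := make(chan *string, 1) // buffer of 1: the notifier never blocks
	w.mutex.Lock()
	w.waiters = append(w.waiters, ch)
	w.mutex.Unlock()
	return ch
}

// notifyAll sends value to every queued requester and clears the queue.
func (w *waitList) notifyAll(value *string) {
	w.mutex.Lock()
	defer w.mutex.Unlock()
	for _, ch := range w.waiters {
		ch <- value
	}
	w.waiters = nil
}

func main() {
	var w waitList
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		ch := w.add() // register before the goroutine starts waiting
		wg.Add(1)
		go func(id int, ch chan *string) {
			defer wg.Done()
			v := <-ch
			fmt.Println("requester", id, "got:", *v)
		}(i, ch)
	}
	v := "resource ready"
	w.notifyAll(&v)
	wg.Wait()
}

Each waiter owns a buffered chan of capacity 1, so notifyAll never blocks on a waiter that has already given up.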

The last problem is that only the first requester should penetrate to the back end; subsequent requesters must not send duplicate requests. This can be decided by checking whether the key is already in the cache (am I the first?) together with an init flag that marks whether the value is still initializing and callers should queue.

My scenario

That is the idea; below is the implementation for my business scenario.


func (cache *Cache) Get(key string, keyType int) *string {
	if keyType == KEY_TYPE_DOMAIN {
		key = "#" + key
	} else {
		key = "=" + key
	}

	cache.mutex.Lock()
	item, existed := cache.dict[key]
	if !existed {
		item = &cacheItem{}
		item.key = &key
		item.waitQueue = list.New()
		cache.dict[key] = item
	}
	cache.mutex.Unlock()

	conf := config.GetConfig()

	lastGet := getCurMs()

	item.mutex.Lock()
	item.lastGet = lastGet
	if item.init { // already exists and is initialized
		defer item.mutex.Unlock()
		return item.value
	}

	// Not initialized yet: queue up for the result
	wait := waitItem{}
	wait.wait_chan = make(chan *string, 1)
	item.waitQueue.PushBack(&wait)
	item.mutex.Unlock()

	// New key: start a goroutine to fetch the initial value
	if !existed {
		go cache.initCacheItem(item, keyType)
	}

	timer := time.NewTimer(time.Duration(conf.Cache_waitTime) * time.Millisecond)

	var retval *string

	// Wait for initialization to complete, or time out
	select {
	case retval = <-wait.wait_chan:
	case <-timer.C:
	}
	return retval
}

A brief walkthrough of the whole process:

1. Lock the dictionary. If the key does not exist, I am the first requester: create the value for this key, with init=false meaning it is still initializing. Then release the dictionary lock.
2. Lock the key. If it is already initialized, return its value. Otherwise, create a chan and push it onto the waitQueue. Then release the key lock.
3. If I am the first requester, send the request to the back end (a network call in a separate goroutine).
4. Create a timer for the timeout.
5. Finally, whether I am the first requester of the key or a concurrent requester arriving during initialization, I wait in the select for either the result or the timeout.

In the initCacheItem function, once the data has been fetched successfully:


	// Once init is marked, subsequent requests no longer touch waitQueue
	item.mutex.Lock()
	item.value = newValue
	item.init = true
	item.expire = expire
	item.mutex.Unlock()

	// Wake up everyone waiting in the queue
	waitQueue := item.waitQueue
	for elem := waitQueue.Front(); elem != nil; elem = waitQueue.Front() {
		wait := elem.Value.(*waitItem)
		wait.wait_chan <- newValue
		waitQueue.Remove(elem)
	}

First, lock the key, assign the value, mark init=true, and release the lock. From this point on, any subsequent request returns immediately without queuing. And because init=true is set, no further request will touch waitQueue, so the queue can simply be traversed, without holding the lock, notifying each chan in it.

Finally

In this way we get the effect of a condition variable with a timeout. My scenario is really an example of a broadcast-style cond; you can adapt this line of thought to get the effect you need in your own code.
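
For reference, if you strip away the cache details, a broadcast-only cond with a timed Wait might look roughly like the sketch below. This is illustrative code, not the implementation from my project; the type and method names (TimeoutCond, Wait, Broadcast) are made up.

package main

import (
	"container/list"
	"fmt"
	"sync"
	"time"
)

// TimeoutCond is a sketch of a broadcast-only condition variable with a
// timed wait, built from the channel-queue idea above.
type TimeoutCond struct {
	mutex   sync.Mutex
	waiters *list.List // each element is a chan struct{}
}

func NewTimeoutCond() *TimeoutCond {
	return &TimeoutCond{waiters: list.New()}
}

// Wait blocks until Broadcast is called or timeout elapses.
// It reports whether it was woken by Broadcast.
func (c *TimeoutCond) Wait(timeout time.Duration) bool {
	ch := make(chan struct{}, 1) // buffered so Broadcast never blocks
	c.mutex.Lock()
	c.waiters.PushBack(ch)
	c.mutex.Unlock()

	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case <-ch:
		return true
	case <-timer.C:
		return false
	}
}

// Broadcast wakes every goroutine currently queued in Wait.
func (c *TimeoutCond) Broadcast() {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	for elem := c.waiters.Front(); elem != nil; elem = c.waiters.Front() {
		elem.Value.(chan struct{}) <- struct{}{}
		c.waiters.Remove(elem)
	}
}

func main() {
	cond := NewTimeoutCond()
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			if cond.Wait(100 * time.Millisecond) {
				fmt.Println("waiter", id, "woken by broadcast")
			} else {
				fmt.Println("waiter", id, "timed out")
			}
		}(i)
	}
	time.Sleep(10 * time.Millisecond) // give the waiters time to register
	cond.Broadcast()
	wg.Wait()
}

A waiter that times out simply abandons its channel; since the channel is buffered, a later Broadcast can still send into it without blocking.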
