Analysis of a small pitfall in redigo's Pub/Sub
- 2020-06-19 12:03:37
- OfStack
Preface
Recently I have been doing some Redis-related work in Go. I chose redigo, a third-party library, and while using its Pub/Sub support I ran into a small pitfall...
Redis Client
First, let's initialize a Redis client backed by a connection pool:
import (
    "log"
    "time"

    "github.com/gomodule/redigo/redis"
)

type RedisClient struct {
    pool *redis.Pool
}

func NewRedisClient(addr string, db int, passwd string) *RedisClient {
    pool := &redis.Pool{
        MaxIdle:     10,
        IdleTimeout: 300 * time.Second,
        Dial: func() (redis.Conn, error) {
            c, err := redis.Dial("tcp", addr, redis.DialPassword(passwd), redis.DialDatabase(db))
            if err != nil {
                return nil, err
            }
            return c, nil
        },
        // re-check connections that have been idle for a while before reuse
        TestOnBorrow: func(c redis.Conn, t time.Time) error {
            if time.Since(t) < time.Minute {
                return nil
            }
            _, err := c.Do("PING")
            return err
        },
    }
    log.Printf("new redis pool at %s", addr)
    client := &RedisClient{
        pool: pool,
    }
    return client
}
Publish
Then we can simply implement a publish method:
func (r *RedisClient) Publish(channel, message string) (int, error) {
    c := r.pool.Get()
    defer c.Close()
    n, err := redis.Int(c.Do("PUBLISH", channel, message))
    if err != nil {
        return 0, fmt.Errorf("redis publish %s %s, err: %v", channel, message, err)
    }
    return n, nil
}
Subscribe
Here's the slightly more complicated subscribe method with a heartbeat:
func (r *RedisClient) Subscribe(ctx context.Context, consume ConsumeFunc, channel ...string) error {
    psc := redis.PubSubConn{Conn: r.pool.Get()}
    defer psc.Close()
    log.Printf("redis pubsub subscribe channel: %v", channel)
    if err := psc.Subscribe(redis.Args{}.AddFlat(channel)...); err != nil {
        return err
    }
    done := make(chan error, 1)
    // start a new goroutine to receive messages
    go func() {
        for {
            switch msg := psc.Receive().(type) {
            case error:
                done <- fmt.Errorf("redis pubsub receive err: %v", msg)
                return
            case redis.Message:
                if err := consume(msg); err != nil {
                    done <- err
                    return
                }
            case redis.Subscription:
                if msg.Count == 0 {
                    // all channels are unsubscribed
                    done <- nil
                    return
                }
            }
        }
    }()
    // health check
    tick := time.NewTicker(time.Minute)
    defer tick.Stop()
    for {
        select {
        case <-ctx.Done():
            if err := psc.Unsubscribe(); err != nil {
                return fmt.Errorf("redis pubsub unsubscribe err: %v", err)
            }
            return nil
        case err := <-done:
            return err
        case <-tick.C:
            if err := psc.Ping(""); err != nil {
                return err
            }
        }
    }
}
Finally, a simple main function can wire these together: start Subscribe in one goroutine and call Publish from another.
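A minimal sketch of such a main function, assuming a local Redis at 127.0.0.1:6379 with no password, and assuming ConsumeFunc is defined as func(redis.Message) error (the channel name test-channel is made up):

```go
package main

import (
    "context"
    "log"
    "time"

    "github.com/gomodule/redigo/redis"
)

func main() {
    // assumes a local Redis instance at 127.0.0.1:6379, db 0, no password
    client := NewRedisClient("127.0.0.1:6379", 0, "")

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // subscriber goroutine
    go func() {
        err := client.Subscribe(ctx, func(msg redis.Message) error {
            log.Printf("consume %s: %s", msg.Channel, msg.Data)
            return nil
        }, "test-channel")
        if err != nil {
            log.Printf("subscribe exited: %v", err)
        }
    }()

    // give the subscriber a moment to register before publishing
    time.Sleep(100 * time.Millisecond)
    if _, err := client.Publish("test-channel", "hello"); err != nil {
        log.Printf("publish err: %v", err)
    }
    time.Sleep(100 * time.Millisecond)
}
```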
Pitfall
At first glance nothing seems unusual. However, if we look at the Redis TCP connections at this point, we find the following:
$ sudo netstat -antp | grep redis
tcp 0 0 0.0.0.0:6379 0.0.0.0:* LISTEN 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55010 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55015 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55009 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55005 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55012 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55011 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55013 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55007 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55006 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:55014 ESTABLISHED 940/redis-server 0.
tcp 0 0 172.16.8.128:6379 172.16.8.1:54972 ESTABLISHED 940/redis-server 0.
Subscribe creates a new connection every time, and the connection pool doesn't seem to be working.
Debugging further, we found that the deferred psc.Close() got stuck, and the receiving goroutines started above never exited properly.
Concurrency
After a long search, we finally located the problem. To quote the redigo documentation:
"Connections support one concurrent caller to the Receive method and one concurrent caller to the Send and Flush methods. No other concurrency is supported including concurrent calls to the Do method.
For full concurrent access to Redis, use the thread-safe Pool to get, use and release a connection from within a goroutine. Connections returned from a Pool have the concurrency restrictions described in the previous paragraph."
In other words, on a single connection one goroutine may call Receive() while another calls Subscribe() (Subscribe calls Send and Flush), but no other concurrent operations are supported, Close() included.
Refer to the corresponding issue in the redigo repository for other similar questions.
Fix
Knowing the cause, we can solve the problem by moving defer psc.Close() from the top of Subscribe into the goroutine that calls Receive:
// start a new goroutine to receive messages
go func() {
    // IMPORTANT: the goroutine that calls Receive now owns the
    // connection and is the one that closes it
    defer psc.Close()
    for {
        switch msg := psc.Receive().(type) {
        case error:
            done <- fmt.Errorf("redis pubsub receive err: %v", msg)
            return
        // ... the remaining cases are unchanged
        }
    }
}()
Conclusion