A simple Redis message queue (RedisMQ) in ASP.NET Core

  • 2021-11-29 06:40:08
  • OfStack

A recent external project needed a message queue, which was originally implemented with RabbitMQ. Because the project is deployed to servers owned by someone else, I wanted to keep the deployment as simple as possible. The project also needed a Redis cache, so I tried using Redis to implement a simple message queue as well.

There are two ways to build a message queue on Redis: pub/sub, or a list consumed with BRPOP. Each has its own characteristics, briefly described here:

Pub/sub supports multiple consuming clients but does not persist messages, so everything published while a client is disconnected is lost to that client. A list with BRPOP keeps messages until they are popped, but by default each message is delivered to only one client. Multi-client consumption can still be emulated on top of lists, as in the following pseudocode:

# Step 1: push the message onto the main queue
lpush listA msg
# Step 2: a dedicated distribution client takes the message out and pushes it to each sub-queue
var msg = brpop listA
lpush listA1 msg
lpush listA2 msg
......
# Step 3: each client consumes messages from its own sub-queue
var client1_msg = brpop listA1
var client2_msg = brpop listA2
......
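
As a rough sketch, the distribution client in step 2 might look like this in C#. It uses the same CSRedisCore `RedisHelper.BRPop` and `RedisHelper.LPush` calls that the consumer later in this article relies on. The class name `ListADistributor` and the queue names are illustrative assumptions, not part of the original project.

```csharp
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

// Hypothetical fan-out worker: copies every message from listA to each sub-queue.
public class ListADistributor : BackgroundService
{
    private const string SourceKey = "listA";
    private static readonly string[] TargetKeys = { "listA1", "listA2" };

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // BRPop blocks the calling thread, so the loop runs on a thread-pool thread.
        return Task.Run(() =>
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                // Wait up to 5 seconds; an empty result means no message arrived in time.
                var msg = RedisHelper.BRPop(5, SourceKey);
                if (string.IsNullOrEmpty(msg)) continue;

                // Push a copy of the message onto every sub-queue.
                foreach (var key in TargetKeys)
                {
                    RedisHelper.LPush(key, msg);
                }
            }
        }, stoppingToken);
    }
}
```

Note that the pop and the pushes are separate commands, so a crash between them would lose the message for some or all sub-queues. Whether that is acceptable depends on the project.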

Losing messages is not acceptable for this project, so I chose the list approach. The next step was to pick a suitable client.
StackExchange.Redis is a long-established client, but because it multiplexes commands over a shared connection, it does not support Redis blocking pops such as BRPOP. So I used CSRedisCore, an open-source .NET client, instead.

First, add the Redis connection string to appsettings.json:


{
 "ConnectionStrings": {
  "redis": "{ip}:{port},password=123456,prefix=my_"
 }
}

For the available connection string options, see the documentation on GitHub: https://github.com/2881099/csredis

Then configure Redis in ConfigureServices in Startup.cs:


    public void ConfigureServices(IServiceCollection services)
    {
      // Configure Redis
      RedisHelper.Initialization(new CSRedis.CSRedisClient(Configuration.GetConnectionString("redis")));
    }

You can also register the CSRedisClient instance through dependency injection; I won't go into detail on that here.
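
For reference, a minimal sketch of the dependency injection variant might look like the following. It assumes the standard `Startup` layout of an ASP.NET Core project and registers a single shared client with `AddSingleton`. Keeping the `RedisHelper.Initialization` call is optional and only needed if the static helper is still used elsewhere.

```csharp
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;

public class Startup
{
    public Startup(IConfiguration configuration) => Configuration = configuration;

    public IConfiguration Configuration { get; }

    public void ConfigureServices(IServiceCollection services)
    {
        // Create one client for the whole application.
        var redis = new CSRedis.CSRedisClient(Configuration.GetConnectionString("redis"));

        // Register the instance so classes can take CSRedisClient as a constructor parameter.
        services.AddSingleton(redis);

        // Optional: keep the static RedisHelper usable as well.
        RedisHelper.Initialization(redis);
    }

    public void Configure(IApplicationBuilder app)
    {
    }
}
```

A class can then request the client through its constructor, for example `public OrderService(CSRedis.CSRedisClient redis)`, where `OrderService` is a hypothetical name.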

The project uses queues in several places, so I first wrapped the consume loop in a reusable base service:


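  using System;
  using System.Threading;
  using System.Threading.Tasks;
  using Microsoft.Extensions.Hosting;
  using Microsoft.Extensions.Logging;

  public abstract class RedisMQConsumer : BackgroundService
  {
    // Redis key of the list this consumer reads from
    protected abstract string CacheKey { get; }

    protected readonly ILogger<RedisMQConsumer> logger;

    protected RedisMQConsumer(ILogger<RedisMQConsumer> logger)
    {
      this.logger = logger;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
      // BRPop blocks its thread, so run the loop on the thread pool
      return Task.Run(async () =>
      {
        while (!stoppingToken.IsCancellationRequested)
        {
          try
          {
            // Wait up to 5 seconds for a message; an empty result means the wait timed out
            var msg = RedisHelper.BRPop(5, CacheKey);
            if (string.IsNullOrEmpty(msg)) continue;
            try
            {
              if (!Process(msg))
              {
                // Move the message to the error queue so it can be handled manually later
                RedisHelper.LPush(CacheKey + "_err", msg);
              }
            }
            catch (Exception exp)
            {
              // Processing threw: keep the message in the error queue and log the failure
              RedisHelper.LPush(CacheKey + "_err", msg);
              logger.LogError(exp, "RedisMQConsumer Execute error");
            }
          }
          catch (Exception exp)
          {
            // A Redis call failed (the network may be interrupted): wait, then retry
            logger.LogWarning(exp, "RedisMQConsumer Redis call failed, retrying in 5 seconds");
            await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
          }
        }
      }, stoppingToken);
    }

    // Return true when the message was handled; false sends it to the error queue
    protected abstract bool Process(string message);
  }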

You can then derive from RedisMQConsumer and implement the actual processing logic:


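  using System.Text.Json;
  using Microsoft.Extensions.Logging;

  public class AddOrderMQConsumer : RedisMQConsumer
  {
    public AddOrderMQConsumer(ILogger<RedisMQConsumer> logger) : base(logger)
    {
    }

    // Name of the Redis list that holds the order messages
    protected override string CacheKey => "addOrder";

    protected override bool Process(string message)
    {
      // Order is the project's own message type
      var order = JsonSerializer.Deserialize<Order>(message);
      // Processing logic goes here
      return true;
    }
  }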

Publishing a message simply adds items to the queue:


RedisHelper.LPush("addOrder", order);
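
The call above passes the `order` object directly, so the client library decides how it is serialized. One way to keep the producer and the consumer visibly in sync is to serialize the message yourself with the same `System.Text.Json` serializer the consumer uses. The sketch below assumes a hypothetical `Order` shape and a hypothetical `OrderPublisher` helper, since the article does not show the real types.

```csharp
using System.Text.Json;

// Hypothetical message type; the real Order class is not shown in this article.
public class Order
{
    public int Id { get; set; }
    public decimal Amount { get; set; }
}

// Hypothetical helper that publishes orders to the "addOrder" queue.
public static class OrderPublisher
{
    public static void Publish(Order order)
    {
        // Serialize explicitly so the payload matches what JsonSerializer.Deserialize<Order> expects.
        string payload = JsonSerializer.Serialize(order);

        // Push onto the head of the list; the consumer pops from the tail, giving FIFO order.
        RedisHelper.LPush("addOrder", payload);
    }
}
```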

Finally, register the consumer service in Startup.cs:


    public void ConfigureServices(IServiceCollection services)
    {
      // Configure Redis
      RedisHelper.Initialization(new CSRedis.CSRedisClient(Configuration.GetConnectionString("redis")));
      
      // Redis message queue consumer service, registered after the Redis configuration above
      services.AddHostedService<AddOrderMQConsumer>();
    }
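
The consumer moves failed messages to a list named after the queue key plus `_err` and leaves handling them to a separate manual step. A minimal sketch of such a step is shown below. It assumes CSRedisCore exposes `LLen` and `RPopLPush` on `RedisHelper` as thin wrappers over the Redis commands of the same names, and the `ErrorQueueReplayer` class is a hypothetical name.

```csharp
// Hypothetical maintenance helper for replaying failed messages.
public static class ErrorQueueReplayer
{
    // Moves the messages currently in "<queueKey>_err" back onto "<queueKey>".
    // Returns the number of messages moved.
    public static long Requeue(string queueKey)
    {
        string errorKey = queueKey + "_err";

        // Take the length once, so messages that fail again are not replayed endlessly in one call.
        long pending = RedisHelper.LLen(errorKey);
        long moved = 0;

        for (long i = 0; i < pending; i++)
        {
            // RPOPLPUSH pops the oldest error message and pushes it onto the head of the main list.
            string msg = RedisHelper.RPopLPush(errorKey, queueKey);
            if (msg == null) break; // the error list was emptied in the meantime
            moved++;
        }

        return moved;
    }
}
```

Calling `ErrorQueueReplayer.Requeue("addOrder")` from an admin endpoint or a one-off tool would then feed the failed orders back to `AddOrderMQConsumer`.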

In my testing this setup has been stable, and it is suitable for projects with low concurrency.

