Implementing concurrency limits in ASP.NET Core 3.x

  • 2021-11-10 09:20:23
  • OfStack

Preface

Microsoft.AspNetCore.ConcurrencyLimiter was added in ASP.NET Core 3.0. It queues incoming requests to avoid thread pool starvation.
In day-to-day development we usually configure the number of connections and the request queue size on the web server itself. Today we will look at how to enforce a concurrency limit and a queue length limit in the form of middleware.

Queue Policy

Add the NuGet package

Install-Package Microsoft.AspNetCore.ConcurrencyLimiter


    public void ConfigureServices(IServiceCollection services)
    {
      services.AddQueuePolicy(options =>
      {
        // Maximum number of concurrent requests 
        options.MaxConcurrentRequests = 2;
        // Request queue length limit 
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
      if (env.IsDevelopment())
      {
        app.UseDeveloperExceptionPage();
      }

      // Add the concurrency limiter middleware early so it guards everything registered after it
      app.UseConcurrencyLimiter();

      app.UseHttpsRedirection();

      app.UseRouting();

      app.UseAuthorization();

      app.UseEndpoints(endpoints =>
      {
        endpoints.MapControllers();
      });

      // Terminal middleware for requests that no endpoint matched.
      // app.Run never calls the next middleware, so it has to be registered last.
      app.Run(async context =>
      {
        Task.Delay(100).Wait(); // 100ms sync-over-async, simulates blocking work

        await context.Response.WriteAsync("Hello World!");
      });
    }

With this simple configuration the middleware is part of our application and limits both the concurrency and the queue length. So the question is: how is it implemented?


 public static IServiceCollection AddQueuePolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
{
    services.Configure(configure);
    services.AddSingleton<IQueuePolicy, QueuePolicy>();
    return services;
}

QueuePolicy is built on SemaphoreSlim. Both SemaphoreSlim and Semaphore let a limited number of threads enter a protected section of code concurrently, and the maximum number is specified when the object is created. Each time a caller is granted access, the semaphore count decreases; each time a caller releases the semaphore, the count increases again.


    /// <summary>
    /// Constructor (initializes the queue policy)
    /// </summary>
    /// <param name="options"></param>
    public QueuePolicy(IOptions<QueuePolicyOptions> options)
    {
      _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
      if (_maxConcurrentRequests <= 0)
      {
        throw new ArgumentException(nameof(_maxConcurrentRequests), "MaxConcurrentRequests must be a positive integer.");
      }

      _requestQueueLimit = options.Value.RequestQueueLimit;
      if (_requestQueueLimit < 0)
      {
        throw new ArgumentException(nameof(_requestQueueLimit), "The RequestQueueLimit cannot be a negative number.");
      }
      // Use SemaphoreSlim to cap the number of requests that run concurrently
      _serverSemaphore = new SemaphoreSlim(_maxConcurrentRequests);
    }
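The counting behaviour of SemaphoreSlim described above can be observed in isolation. The following is a minimal sketch, not part of the library: the class SemaphoreDemo and the method MaxObservedConcurrencyAsync are hypothetical names chosen for illustration. It starts several tasks against a semaphore and records how many were inside the protected region at the same time.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of Microsoft.AspNetCore.ConcurrencyLimiter.
public static class SemaphoreDemo
{
    // Runs `workers` tasks through a SemaphoreSlim with `limit` slots and
    // returns the highest number of tasks seen inside the protected region.
    public static async Task<int> MaxObservedConcurrencyAsync(int limit, int workers)
    {
        using var semaphore = new SemaphoreSlim(limit);
        var gate = new object();
        int inside = 0;
        int maxInside = 0;

        async Task WorkAsync()
        {
            // Takes a slot if one is free, otherwise waits asynchronously.
            await semaphore.WaitAsync();
            try
            {
                lock (gate)
                {
                    inside++;
                    maxInside = Math.Max(maxInside, inside);
                }

                await Task.Delay(20); // simulated work

                lock (gate)
                {
                    inside--;
                }
            }
            finally
            {
                // Gives the slot back so that a waiting task can continue.
                semaphore.Release();
            }
        }

        var tasks = new Task[workers];
        for (int i = 0; i < workers; i++)
        {
            tasks[i] = WorkAsync();
        }

        await Task.WhenAll(tasks);
        return maxInside;
    }
}
```

Calling SemaphoreDemo.MaxObservedConcurrencyAsync(2, 5) never reports more than 2, which is the same guarantee QueuePolicy relies on for MaxConcurrentRequests.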

The ConcurrencyLimiterMiddleware


    /// <summary>
    /// Invokes the logic of the middleware.
    /// </summary>
    /// <param name="context">The <see cref="HttpContext"/>.</param>
    /// <returns>A <see cref="Task"/> that completes when the request leaves.</returns>
    public async Task Invoke(HttpContext context)
    {
      var waitInQueueTask = _queuePolicy.TryEnterAsync();

      // Make sure we only ever call GetResult once on the TryEnterAsync ValueTask b/c it resets.
      bool result;

      if (waitInQueueTask.IsCompleted)
      {
        ConcurrencyLimiterEventSource.Log.QueueSkipped();
        result = waitInQueueTask.Result;
      }
      else
      {
        using (ConcurrencyLimiterEventSource.Log.QueueTimer())
        {
          result = await waitInQueueTask;
        }
      }

      if (result)
      {
        try
        {
          await _next(context);
        }
        finally
        {
          _queuePolicy.OnExit();
        }
      }
      else
      {
        ConcurrencyLimiterEventSource.Log.RequestRejected();
        ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
        context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
        await _onRejected(context);
      }
    }

For every request the middleware first calls _queuePolicy.TryEnterAsync(). That method takes a private lock and then checks whether the total number of requests is ≥ (request queue limit + maximum number of concurrent requests). If the limit has been reached, the request is rejected immediately and the client receives a 503 status:


    if (result)
    {
      try
      {
        await _next(context);
      }
      finally
      {
        _queuePolicy.OnExit();
      }
    }
    else
    {
      ConcurrencyLimiterEventSource.Log.RequestRejected();
      ConcurrencyLimiterLog.RequestRejectedQueueFull(_logger);
      context.Response.StatusCode = StatusCodes.Status503ServiceUnavailable;
      await _onRejected(context);
    }

What happens if the configured limit has not been reached yet? In that case the request puts no extra pressure on the server, so it should be processed.

await _serverSemaphore.WaitAsync(); waits asynchronously to enter the semaphore. If a slot is free, the request enters the protected code right away; otherwise it waits here until another request releases the semaphore.


    // Inside QueuePolicy.TryEnterAsync()
    lock (_totalRequestsLock)
    {
      if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
      {
        return false;
      }
      TotalRequests++;
    }
    // Wait asynchronously to enter the semaphore: continue at once if a slot is free,
    // otherwise wait here until another request releases the semaphore
    await _serverSemaphore.WaitAsync();
    return true;

Once TryEnterAsync has returned true and the rest of the pipeline has finished, the middleware calls _queuePolicy.OnExit(). OnExit calls _serverSemaphore.Release() to release the semaphore and then decrements the total number of requests.
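The pieces above can be put together in a simplified sketch of the queue policy. This is an illustration under assumptions, not the library source: SimpleQueuePolicy is a hypothetical class, it takes plain integers instead of IOptions<QueuePolicyOptions>, and it omits argument validation and event logging.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for QueuePolicy; not the library source.
public sealed class SimpleQueuePolicy
{
    private readonly int _maxConcurrentRequests;
    private readonly int _requestQueueLimit;
    private readonly SemaphoreSlim _serverSemaphore;
    private readonly object _totalRequestsLock = new object();

    // Requests that are currently running or waiting in the queue.
    public int TotalRequests { get; private set; }

    public SimpleQueuePolicy(int maxConcurrentRequests, int requestQueueLimit)
    {
        _maxConcurrentRequests = maxConcurrentRequests;
        _requestQueueLimit = requestQueueLimit;
        _serverSemaphore = new SemaphoreSlim(maxConcurrentRequests);
    }

    // Returns false when the request is rejected, true once it may proceed.
    public async Task<bool> TryEnterAsync()
    {
        lock (_totalRequestsLock)
        {
            if (TotalRequests >= _requestQueueLimit + _maxConcurrentRequests)
            {
                return false; // running + queued requests are at the limit
            }
            TotalRequests++;
        }

        // Completes at once if a slot is free, otherwise waits in FIFO order.
        await _serverSemaphore.WaitAsync();
        return true;
    }

    // Must be called exactly once for every TryEnterAsync that returned true.
    public void OnExit()
    {
        _serverSemaphore.Release();

        lock (_totalRequestsLock)
        {
            TotalRequests--;
        }
    }
}
```

With new SimpleQueuePolicy(2, 1), the first two calls to TryEnterAsync complete immediately with true, the third waits in the queue, and the fourth completes immediately with false. A single OnExit call then lets the waiting third request proceed.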

Stack Policy

Now let's look at the other approach, the stack policy. How does it work? Let's take a look together, starting with how to use it.


   public void ConfigureServices(IServiceCollection services)
    {
      services.AddStackPolicy(options =>
      {
        // Maximum number of concurrent requests 
        options.MaxConcurrentRequests = 2;
        // Request queue length limit 
        options.RequestQueueLimit = 1;
      });
      services.AddControllers();
    }

With this configuration our application uses the stack policy. Again, let's see how it is implemented.


 public static IServiceCollection AddStackPolicy(this IServiceCollection services, Action<QueuePolicyOptions> configure)
    {
      services.Configure(configure);
      services.AddSingleton<IQueuePolicy, StackPolicy>();
      return services;
    }

You can see that this policy is implemented by the StackPolicy class. Let's take a look at its main methods.


    /// <summary>
    /// Constructor (initializes the parameters)
    /// </summary>
    /// <param name="options"></param>
    public StackPolicy(IOptions<QueuePolicyOptions> options)
    {
      // Buffer that backs the stack
      _buffer = new List<ResettableBooleanCompletionSource>();
      // Queue size 
      _maxQueueCapacity = options.Value.RequestQueueLimit;
      // Maximum number of concurrent requests 
      _maxConcurrentRequests = options.Value.MaxConcurrentRequests;
      // Remaining free space 
      _freeServerSpots = options.Value.MaxConcurrentRequests;
    }

When the middleware calls _queuePolicy.TryEnterAsync(), the policy first checks whether a free server slot is left. If _freeServerSpots > 0, it returns true right away and the middleware runs the next step. If the current queue length equals the configured queue size, the policy has to reject a waiting request to make room: it always rejects the oldest waiting request, so the most recent requests are served first.


  public ValueTask<bool> TryEnterAsync()
    {
      lock (_bufferLock)
      {
        if (_freeServerSpots > 0)
        {
          _freeServerSpots--;
          return _trueTask;
        }
        // If the queue is full, reject the oldest waiting request
        if (_queueLength == _maxQueueCapacity)
        {
          _hasReachedCapacity = true;
          _buffer[_head].Complete(false);
          _queueLength--;
        }
        var tcs = _cachedResettableTCS ??= new ResettableBooleanCompletionSource(this);
        _cachedResettableTCS = null;
        if (_hasReachedCapacity || _queueLength < _buffer.Count)
        {
          _buffer[_head] = tcs;
        }
        else
        {
          _buffer.Add(tcs);
        }
        _queueLength++;
        // increment _head for next time
        _head++;
        if (_head == _maxQueueCapacity)
        {
          _head = 0;
        }
        return tcs.GetValueTask();
      }
    }

When a request finishes, the middleware calls _queuePolicy.OnExit(), which pops the most recent waiting request off the stack and decrements the queue length.


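The last-in, first-out behaviour of the stack policy can be sketched in a simplified form. This is an illustration under assumptions, not the library source: SimpleStackPolicy is a hypothetical class that uses a LinkedList and TaskCompletionSource<bool> in place of the circular buffer and the ResettableBooleanCompletionSource, and the guard for a queue limit of 0 is an addition made for this sketch.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for StackPolicy; not the library source.
public sealed class SimpleStackPolicy
{
    // Oldest waiter at the front, newest waiter at the back.
    private readonly LinkedList<TaskCompletionSource<bool>> _waiting =
        new LinkedList<TaskCompletionSource<bool>>();
    private readonly object _bufferLock = new object();
    private readonly int _maxQueueCapacity;
    private int _freeServerSpots;

    public SimpleStackPolicy(int maxConcurrentRequests, int requestQueueLimit)
    {
        _freeServerSpots = maxConcurrentRequests;
        _maxQueueCapacity = requestQueueLimit;
    }

    public Task<bool> TryEnterAsync()
    {
        lock (_bufferLock)
        {
            if (_freeServerSpots > 0)
            {
                _freeServerSpots--;
                return Task.FromResult(true);
            }

            // Assumption for this sketch: with no queue, reject right away.
            if (_maxQueueCapacity == 0)
            {
                return Task.FromResult(false);
            }

            // Queue full: reject the oldest waiter to make room.
            if (_waiting.Count == _maxQueueCapacity)
            {
                var oldest = _waiting.First.Value;
                _waiting.RemoveFirst();
                oldest.SetResult(false);
            }

            var tcs = new TaskCompletionSource<bool>(
                TaskCreationOptions.RunContinuationsAsynchronously);
            _waiting.AddLast(tcs);
            return tcs.Task;
        }
    }

    public void OnExit()
    {
        lock (_bufferLock)
        {
            if (_waiting.Count == 0)
            {
                _freeServerSpots++; // nobody is waiting, free the slot
                return;
            }

            // Hand the slot to the newest waiter (last in, first out).
            var newest = _waiting.Last.Value;
            _waiting.RemoveLast();
            newest.SetResult(true);
        }
    }
}
```

With new SimpleStackPolicy(1, 2), the first request enters immediately and the next two wait. A fourth request causes the oldest waiter to be rejected, and the next OnExit call lets the fourth request through before the third.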

Summary

Because of how a stack is structured, only two operations are usually performed on it in practice:

Adding an element to the top of the stack is called "pushing"; removing the top element from the stack is called "popping".

There are two ways to implement the storage structure of a queue:

Sequential queue: a queue built on top of a sequential list (array); linked queue: a queue built on top of a linked list.
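The difference in ordering between the two structures can be shown with the standard Stack<T> and Queue<T> collections. This is a minimal sketch; OrderingDemo and its method names are hypothetical and used only for illustration.

```csharp
using System.Collections.Generic;

// Hypothetical demo class comparing stack and queue ordering.
public static class OrderingDemo
{
    // Pushes all items, then pops them: last in, first out.
    public static List<int> ThroughStack(IEnumerable<int> items)
    {
        var stack = new Stack<int>();
        foreach (var item in items)
        {
            stack.Push(item);
        }

        var result = new List<int>();
        while (stack.Count > 0)
        {
            result.Add(stack.Pop());
        }
        return result;
    }

    // Enqueues all items, then dequeues them: first in, first out.
    public static List<int> ThroughQueue(IEnumerable<int> items)
    {
        var queue = new Queue<int>();
        foreach (var item in items)
        {
            queue.Enqueue(item);
        }

        var result = new List<int>();
        while (queue.Count > 0)
        {
            result.Add(queue.Dequeue());
        }
        return result;
    }
}
```

For the input 1, 2, 3, ThroughStack returns 3, 2, 1 and ThroughQueue returns 1, 2, 3. This mirrors the two policies: the queue policy serves waiting requests in arrival order, while the stack policy serves the most recent request first.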
