JavaScript and TypeScript sample code for implementing concurrent request control

  • 2021-10-24 18:41:16
  • OfStack

Scene

Suppose there are 10 requests, but the maximum number of concurrent requests is 5, and the request results are required, so this is a simple concurrent request control

Simulation

Simple simulation of 1 request with setTimeout


let startTime = Date.now();
const timeout = (timeout: number, ret: number) => {
 return (idx?: any) =>
 new Promise((resolve) => {
  setTimeout(() => {
  const compare = Date.now() - startTime;
  console.log(`At ${Math.floor(compare / 100)}00 return`, ret);
  resolve(idx);
  }, timeout);
 });
};

const timeout1 = timeout(1000, 1);
const timeout2 = timeout(300, 2);
const timeout3 = timeout(400, 3);
const timeout4 = timeout(500, 4);
const timeout5 = timeout(200, 5);

By simulating the request in this way, the essence is Promise

When there is no concurrency control,


const run = async () => {
 startTime = Date.now();
 await Promise.all([
 timeout1(),
 timeout2(),
 timeout3(),
 timeout4(),
 timeout5(),
 ]);
};

run();

At 200 return 5
At 300 return 2
At 400 return 3
At 500 return 4
At 1000 return 1

You can see that the output is 5 2 3 4 1, which is output according to the time of timeout

Concurrency condition

Assuming that the maximum number of concurrency at the same time is 2, create 1 class


class Concurrent {
 private maxConcurrent: number = 2;

 constructor(count: number = 2) {
 this.maxConcurrent = count;
 }
}

The first concurrency control

Think about 1, split the Promise array according to the maximum concurrency number, remove Promise if it is fulfilled, and then add Promise in pending state. Promise. race can help us meet this requirement


class Concurrent {
 private maxConcurrent: number = 2;

 constructor(count: number = 2) {
 this.maxConcurrent = count;
 }
 public async useRace(fns: Function[]) {
 const runing: any[] = [];
 //  According to the concurrent number, put  Promise  Add in 
 // Promise  Will call back 1 Index, so that we can know which one  Promise  Already  resolve  It's over 
 for (let i = 0; i < this.maxConcurrent; i++) {
  if (fns.length) {
  const fn = fns.shift()!;
  runing.push(fn(i));
  }
 }
 const handle = async () => {
  if (fns.length) {
  const idx = await Promise.race<number>(runing);
  const nextFn = fns.shift()!;
  //  Remove the completed  Promise , put the new one in 
  runing.splice(idx, 1, nextFn(idx));
  handle();
  } else {
  //  If the array has been emptied, the surface has no need to execute  Promise  It can be changed to  Promise.all
  await Promise.all(runing);
  }
 };
 handle();
 }
}

const run = async () => {
 const concurrent = new Concurrent();
 startTime = Date.now();
 await concurrent.useRace([timeout1, timeout2, timeout3, timeout4, timeout5]);
};

At 300 return 2
At 700 return 3
At 1000 return 1
At 1200 return 5
At 1200 return 4

You can see that the output has changed. Why is this so? Under the analysis of 1, the maximum concurrency number is 2

//1 2 is performed first
1 Needs 1000 MS to finish
2 requires 300 MS

When 2 is finished, the timeline changes to 300. Remove 2, add 3 and start executing 3
3 needs 400MS, and the execution time becomes 700. Remove 3, add 4 and start executing 4
4 requires 500MS
When the timeline comes to 1000MS, 1 executes removes 1 joins 5 starts executes 5
The timeline came to 1200MS, and 4 and 5 just finished executing at the same time

Option 2

You can use the mechanism of await, which is actually a small skill

The await expression pauses the execution of the current async function until Promise processing completes. If Promise is normally processed (fulfilled), the resolve function parameter of its callback is taken as the value of the await expression, and async function is continued.

If the current concurrency number has exceeded the maximum concurrency number, you can set a new Promise, and await, waiting for other requests to complete, resolve, remove wait, so you need to add two new states, the current concurrency number, and the array used to store the callback function resolve


class Concurrent {
 private maxConcurrent: number = 2;
 private list: Function[] = [];
 private currentCount: number = 0;

 constructor(count: number = 2) {
 this.maxConcurrent = count;
 }
 public async add(fn: Function) {
 this.currentCount += 1;
 //  If the maximum has exceeded the maximum concurrency 
 if (this.currentCount > this.maxConcurrent) {
  // wait  Yes 1 A  Promise , just call the  resolve  Will become  fulfilled  Status 
  const wait = new Promise((resolve) => {
  this.list.push(resolve);
  });
  //  Without calling  resolve  When, here will be 1 Straight blockage 
  await wait;
 }
 //  Execute function 
 await fn();
 this.currentCount -= 1;
 if (this.list.length) {
  //  Put  resolve  Take it out, call it, and so on  wait  It's finished, and you can execute it below 
  const resolveHandler = this.list.shift()!;
  resolveHandler();
 }
 }
}

const run = async () => {
 const concurrent = new Concurrent();
 startTime = Date.now();
 concurrent.add(timeout1);
 concurrent.add(timeout2);
 concurrent.add(timeout3);
 concurrent.add(timeout4);
 concurrent.add(timeout5);
};

run();

At 300 return 2
At 700 return 3
At 1000 return 1
At 1200 return 5
At 1200 return 4

Summarize

These two ways can realize concurrency control, but the way of realization is not very 1, mainly by Promise realization, in addition, the implementation method does not consider the abnormal situation, this can be added by oneself


Related articles: