An example of PHP extending Swoole to implement real time asynchronous task queue

  • 2021-12-05 05:50:07
  • OfStack

In this paper, an example is given to describe the implementation of real-time asynchronous task queue by extending PHP to Swoole. Share it for your reference, as follows:

If you want to send 100 emails, for cycle 100 times, users directly rise up, what broken website!

But in fact, we probably have more than 10,000 mails. How to deal with this delay problem?

The answer is asynchronous. Encapsulate the operation of "Send Email", and then asynchronously execute it 10,000 times in the background. In this case, the time the user waits for after submitting the web page is only the time to "push the email task request into the queue". And our back-end service will run out of the user's sight.

In the implementation of "asynchronous queue", some people use MySQL table or redis to store the mail to be sent, and then read the list to be sent regularly every minute, and then process it. This is the timed asynchronous task queue. However, the currently submitted task can only be executed after 1 minute, which is still unpleasant in some application scenarios with real-time requirements. Some scenarios require that the task be executed as soon as it is submitted by 1, but the user does not need to wait for the return result.

This paper will discuss the scheme of implementing real-time asynchronous task queue by extending swoole with php.

Server side

Create a new Server. php in the directory where you intend to put the script (you can also create it yourself). The code is as follows


<?php
class Server
{
  private $serv;
  public function __construct()
  {
    $this->serv = new swoole_server("0.0.0.0", 9501);
    $this->serv->set(array(
      'worker_num' => 1, //1 Set as a server CPU Numeric 1-4 Times 
      'daemonize' => 1, // Execute as a daemon 
      'max_request' => 10000,
      'dispatch_mode' => 2,
      'task_worker_num' => 8, //task Number of processes 
      "task_ipc_mode " => 3, // Use message queuing to communicate and set to scramble mode 
      //"log_file" => "log/taskqueueu.log" ,// Journal 
    ));
    $this->serv->on('Receive', array($this, 'onReceive'));
    // bind callback
    $this->serv->on('Task', array($this, 'onTask'));
    $this->serv->on('Finish', array($this, 'onFinish'));
    $this->serv->start();
  }
  public function onReceive(swoole_server $serv, $fd, $from_id, $data)
  {
    //echo "Get Message From Client {$fd}:{$data}\n";
    // send a task to task worker.
    $serv->task($data);
  }
  public function onTask($serv, $task_id, $from_id, $data)
  {
    $array = json_decode($data, true);
    if ($array['url']) {
      return $this->httpGet($array['url'], $array['param']);
    }
  }
  public function onFinish($serv, $task_id, $data)
  {
    //echo "Task {$task_id} finish\n";
    //echo "Result: {$data}\n";
  }
  protected function httpGet($url, $data)
  {
    if ($data) {
      $url .= '?' . http_build_query($data);
    }
    $curlObj = curl_init(); // Initialization curl , 
    curl_setopt($curlObj, CURLOPT_URL, $url); // Set up the URL 
    curl_setopt($curlObj, CURLOPT_RETURNTRANSFER, 1); // Will curl_exec Returns the result of 
    curl_setopt($curlObj, CURLOPT_SSL_VERIFYPEER, FALSE);
    curl_setopt($curlObj, CURLOPT_SSL_VERIFYHOST, FALSE);
    curl_setopt($curlObj, CURLOPT_HEADER, 0); // Whether to output return header information 
    $response = curl_exec($curlObj); // Execute 
    curl_close($curlObj); // Close a session 
    return $response;
  }
}
$server = new Server();

Client

After starting the service, let's see how to invoke the service. New test file Client_test. php


<?php
class Client
{
  private $client;
  public function __construct()
  {
    $this->client = new swoole_client(SWOOLE_SOCK_TCP);
  }
  public function connect()
  {
    if (!$this->client->connect("127.0.0.1", 9501, 1)) {
      throw new Exception(sprintf('Swoole Error: %s', $this->client->errCode));
    }
  }
  public function send($data)
  {
    if ($this->client->isConnected()) {
      if (!is_string($data)) {
        $data = json_encode($data);
      }
      return $this->client->send($data);
    } else {
      throw new Exception('Swoole Server does not connected.');
    }
  }
  public function close()
  {
    $this->client->close();
  }
}
$data = array(
  "url" => "http://192.168.10.19/send_mail",
  "param" => array(
    "username" => 'test',
    "password" => 'test'
  )
);
$client = new Client();
$client->connect();
if ($client->send($data)) {
  echo 'success';
} else {
  echo 'fail';
}
$client->close();

In the above code, url is the address of the task and param is the required pass parameter.

The asynchronous task queue is implemented by saving the code and executing Client_test. php on the command line or in the browser. The URL you fill in will be executed asynchronously as HTTP GET after each asynchronous task is submitted.

For more readers interested in PHP related contents, please check the topics of this site: "PHP Extended Development Tutorial", "PHP Network Programming Skills Summary", "php curl Usage Summary", "PHP Array (Array) Operation Skills Complete Book", "PHP Data Structure and Algorithm Tutorial", "php Programming Algorithm Summary" and "php String (string) Usage Summary"

I hope this article is helpful to everyone's PHP programming.


Related articles: