PHP+memcache for Message Queuing Case Sharing

  • 2021-06-28 09:02:19
  • OfStack

The principle of the memcache message queue lies in how the keys are built: each message or log entry is serialized and stored under a key made of a prefix plus a consecutive number. A timer then periodically flushes the content to a file or database.

Message queues are useful in PHP when, for example, sending a large number of messages at once is too time-consuming to do inline; the work can be put on a queue instead.
Lightweight queue servers that facilitate queuing are:
Starling: a lightweight persistent queue server that supports the memcache protocol
https://github.com/starling/starling
Beanstalkd is lightweight, efficient, and supports persistence, handling around 3,000 queues per second
http://kr.github.com/beanstalkd/
Message queuing can also be implemented in php using memcache/memcached.


    <?php  
    /** 
    * Memcache  Message Queue Class  
    */  
    class QMC {  
    const PREFIX = 'ASDFASDFFWQKE';  
    /** 
    *  Initialization mc 
    * @staticvar string $mc 
    * @return Memcache 
    */  
    static private function mc_init() {  
    static $mc = null;  
    if (is_null($mc)) {  
    $mc = new Memcache;  
    $mc->connect('127.0.0.1', 11211);  
    }  
    return $mc;  
    }  
    /** 
    * mc  Counter , Increase Count and Return New Count  
    * @param string $key    Counter  
    * @param int $offset    Count Increment , Can be negative .0 To keep the count unchanged  
    * @param int $time      time  
    * @return int/false     Failure is return false, Count after returning update counter on success  
    */  
    static public function set_counter( $key, $offset, $time=0 ){  
    $mc = self::mc_init();  
    $val = $mc->get($key);  
    if( !is_numeric($val) || $val < 0 ){  
    $ret = $mc->set( $key, 0, $time );  
    if( !$ret ) return false;  
    $val = 0;  
    }  
    $offset = intval( $offset );  
    if( $offset > 0 ){  
    return $mc->increment( $key, $offset );  
    }elseif( $offset < 0 ){  
    return $mc->decrement( $key, -$offset );  
    }  
    return $val;  
    }  
    /** 
    *  Write Queue  
    * @param string $key 
    * @param mixed $value 
    * @return bool 
    */  
    static public function input( $key, $value ){  
    $mc = self::mc_init();  
    $w_key = self::PREFIX.$key.'W';  
    $v_key = self::PREFIX.$key.self::set_counter($w_key, 1);  
    return $mc->set( $v_key, $value );  
    }  
    /** 
    *  Read data from queue  
    * @param string $key 
    * @param int $max   Maximum number of reads  
    * @return array 
    */  
    static public function output( $key, $max=100 ){  
    $out = array();  
    $mc = self::mc_init();  
    $r_key = self::PREFIX.$key.'R';  
    $w_key = self::PREFIX.$key.'W';  
    $r_p   = self::set_counter( $r_key, 0 );// Read Pointer   
    $w_p   = self::set_counter( $w_key, 0 );// Write Pointer   
    if( $r_p == 0 ) $r_p = 1;  
    while( $w_p >= $r_p ){  
    if( --$max < 0 ) break;  
    $v_key = self::PREFIX.$key.$r_p;  
    $r_p = self::set_counter( $r_key, 1 );  
    $out[] = $mc->get( $v_key );  
    $mc->delete($v_key);  
    }  
    return $out;  
    }  
    }  
    /** 
     Usage method : 
    QMC::input($key, $value );// Write Queue  
    $list = QMC::output($key);// Read Queue  
    */  
    ?>  

Message queue based on PHP shared memory implementation:


<?php  
/**
 * Circular (ring buffer) queue stored in a shared memory segment.
 *
 * The segment is divided into fixed-size blocks; each queued value is
 * serialized into one block. The head/tail offsets are persisted in the file
 * named by $filePtr so that every new instance continues where the previous
 * one stopped.
 *
 * A semaphore is acquired in the constructor and released in the destructor,
 * so the whole lifetime of an instance is one critical section: unset() the
 * object as soon as the enqueue/dequeue work is done.
 *
 * One block is always left unused to tell "full" from "empty", so the queue
 * holds at most (memSize / blockSize) - 1 values.
 *
 * @author wangbinandi@gmail.com
 * @created 2009-12-23
 */
class ShmQueue
{
    /** Marker appended to every serialized value. */
    const EOF_MARK = '__eof';

    private $maxQSize = 0;      // number of blocks in the segment
    private $front = 0;         // byte offset of the head block
    private $rear = 0;          // byte offset of the next block to write
    private $blockSize = 256;   // block size in bytes
    private $memSize = 25600;   // shared memory size in bytes
    private $shmId = 0;
    private $filePtr = './shmq.ptr';
    private $semId = 0;
    private $locked = false;    // true while this instance holds the semaphore

    /**
     * Open the shared memory segment and enter the critical section.
     *
     * @throws Exception when the IPC key, the segment or the semaphore is unavailable
     */
    public function __construct()
    {
        $shmkey = ftok(__FILE__, 't');
        if ( $shmkey === -1 ){
            throw new Exception("unable to create IPC key");
        }
        $this->shmId = shmop_open($shmkey, "c", 0644, $this->memSize );
        if ( $this->shmId === false ){
            throw new Exception("unable to open shared memory segment");
        }
        $this->maxQSize = $this->memSize / $this->blockSize;
        $this->semId = sem_get($shmkey, 1);
        if ( $this->semId === false || !sem_acquire($this->semId) ){
            throw new Exception("unable to acquire queue semaphore");
        }
        $this->locked = true; // from here on the destructor must release the lock
        $this->init();
    }

    /**
     * Restore the head/tail offsets saved as "front|rear" by a previous instance.
     */
    private function init()
    {
        if ( file_exists($this->filePtr) ){
            $contents = file_get_contents($this->filePtr);
            $data = explode( '|', $contents );
            if ( isset($data[0]) && isset($data[1])){
                $this->front = (int)$data[0];
                $this->rear  = (int)$data[1];
            }
        }
    }

    /**
     * @return int number of values currently queued
     */
    public function getLength()
    {
        return (($this->rear - $this->front + $this->memSize) % ($this->memSize) )/$this->blockSize;
    }

    /**
     * Append a value to the queue.
     *
     * @param mixed $value any serializable value
     * @return bool        false when the queue is full or the write failed
     * @throws Exception   when the serialized value does not fit in one block
     */
    public function enQueue( $value )
    {
        $next = $this->ptrInc($this->rear);
        if ( $next == $this->front ){ // queue full
            return false;
        }
        $data = $this->encode($value);
        if ( shmop_write($this->shmId, $data, $this->rear ) === false ){
            return false; // nothing stored, leave the tail where it was
        }
        $this->rear = $next;
        return true;
    }

    /**
     * Remove and return the value at the head of the queue.
     *
     * @return mixed the value, or false when the queue is empty or the read failed
     */
    public function deQueue()
    {
        if ( $this->front == $this->rear ){ // queue empty
            return false;
        }
        $value = shmop_read($this->shmId, $this->front, $this->blockSize-1);
        if ( $value === false ){
            return false; // keep the head in place so the block is not skipped
        }
        $this->front = $this->ptrInc($this->front);
        return $this->decode($value);
    }

    /**
     * Advance a block offset by one block, wrapping at the end of the segment.
     */
    private function ptrInc( $ptr )
    {
        return ($ptr + $this->blockSize) % ($this->memSize);
    }

    /**
     * Serialize a value into the payload of one block.
     *
     * The payload is the serialized value followed by the end marker and padded
     * with NUL bytes to blockSize - 1 bytes, so whatever an earlier, longer
     * value left in the block is overwritten.
     *
     * @throws Exception when the value plus marker exceeds blockSize - 1 bytes
     */
    private function encode( $value )
    {
        $data = serialize($value) . self::EOF_MARK;
        $capacity = $this->blockSize - 1;
        if ( strlen($data) > $capacity ){
            throw new Exception(strlen($data)." is overload block size!");
        }
        return str_pad($data, $capacity, "\0");
    }

    /**
     * Turn the payload of one block back into the stored value.
     */
    private function decode( $value )
    {
        $value = rtrim($value, "\0");
        $markLen = strlen(self::EOF_MARK);
        if ( substr($value, -$markLen) === self::EOF_MARK ){
            // Padded block: the marker is the last thing in it, so a value
            // that itself contains "__eof" is left intact.
            $payload = substr($value, 0, -$markLen);
        } else {
            // Block written without padding: stale bytes may follow the
            // marker, fall back to cutting at its first occurrence.
            $parts = explode(self::EOF_MARK, $value);
            $payload = $parts[0];
        }
        // NOTE(review): unserialize() trusts whatever is in the segment; every
        // process able to write to it must be trusted.
        return unserialize($payload);
    }

    /**
     * Persist the head/tail offsets and leave the critical section.
     */
    public function __destruct()
    {
        if ( !$this->locked ){
            // The constructor never obtained the lock: saving offsets or
            // releasing the semaphore now would disturb the real owner.
            return;
        }
        $data = $this->front . '|' . $this->rear;
        file_put_contents($this->filePtr, $data);
        sem_release($this->semId); // leave the critical section
        $this->locked = false;
    }
}
/* 
//  Enqueue operation 
$shmq = new ShmQueue(); 
$data = 'test data'; 
$shmq->enQueue($data); 
unset($shmq); 
//  Dequeue operation 
$shmq = new ShmQueue(); 
$data = $shmq->deQueue(); 
unset($shmq); 
*/  
?>

For a very large message queue, frequently serializing and deserializing the whole data set is too expensive. Below is a message queue I implemented in PHP in which inserting an item only touches the tail; there is no need to read, modify, and rewrite the entire queue. However, this message queue is not thread-safe; I have only tried to reduce the chance of conflicts. If messages are not very frequent, for example one every few seconds, you can consider using it this way.
If you need thread safety, one suggestion is to take a file lock before operating on the queue. Here is the code:
The code is as follows:


    class Memcache_Queue   
    {   
    private $memcache;   
    private $name;   
    private $prefix;   
    function __construct($maxSize, $name, $memcache, $prefix = "__memcache_queue__")   
    {   
    if ($memcache == null) {   
    throw new Exception("memcache object is null, new the object first.");   
    }   
    $this->memcache = $memcache;   
    $this->name = $name;   
    $this->prefix = $prefix;   
    $this->maxSize = $maxSize;   
    $this->front = 0;   
    $this->real = 0;   
    $this->size = 0;   
    }   
    function __get($name)   
    {   
    return $this->get($name);   
    }   
    function __set($name, $value)   
    {   
    $this->add($name, $value);   
    return $this;   
    }   
    function isEmpty()   
    {   
    return $this->size == 0;   
    }   
    function isFull()   
    {   
    return $this->size == $this->maxSize;   
    }   
    function enQueue($data)   
    {   
    if ($this->isFull()) {   
    throw new Exception("Queue is Full");   
    }   
    $this->increment("size");   
    $this->set($this->real, $data);   
    $this->set("real", ($this->real + 1) % $this->maxSize);   
    return $this;   
    }   
    function deQueue()   
    {   
    if ($this->isEmpty()) {   
    throw new Exception("Queue is Empty");   
    }   
    $this->decrement("size");   
    $this->delete($this->front);   
    $this->set("front", ($this->front + 1) % $this->maxSize);   
    return $this;   
    }   
    function getTop()   
    {   
    return $this->get($this->front);   
    }   
    function getAll()   
    {   
    return $this->getPage();   
    }   
    function getPage($offset = 0, $limit = 0)   
    {   
    if ($this->isEmpty() || $this->size < $offset) {   
    return null;   
    }   
    $keys[] = $this->getKeyByPos(($this->front + $offset) % $this->maxSize);   
    $num = 1;   
    for ($pos = ($this->front + $offset + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)   
    {   
    $keys[] = $this->getKeyByPos($pos);   
    $num++;   
    if ($limit > 0 && $limit == $num) {   
    break;   
    }   
    }   
    return array_values($this->memcache->get($keys));   
    }   
    function makeEmpty()   
    {   
    $keys = $this->getAllKeys();   
    foreach ($keys as $value) {   
    $this->delete($value);   
    }   
    $this->delete("real");   
    $this->delete("front");   
    $this->delete("size");   
    $this->delete("maxSize");   
    }   
    private function getAllKeys()   
    {   
    if ($this->isEmpty())   
    {   
    return array();   
    }   
    $keys[] = $this->getKeyByPos($this->front);   
    for ($pos = ($this->front + 1) % $this->maxSize; $pos != $this->real; $pos = ($pos + 1) % $this->maxSize)   
    {   
    $keys[] = $this->getKeyByPos($pos);   
    }   
    return $keys;   
    }   
    private function add($pos, $data)   
    {   
    $this->memcache->add($this->getKeyByPos($pos), $data);   
    return $this;   
    }   
    private function increment($pos)   
    {   
    return $this->memcache->increment($this->getKeyByPos($pos));   
    }   
    private function decrement($pos)   
    {   
    $this->memcache->decrement($this->getKeyByPos($pos));   
    }   
    private function set($pos, $data)   
    {   
    $this->memcache->set($this->getKeyByPos($pos), $data);   
    return $this;   
    }   
    private function get($pos)   
    {   
    return $this->memcache->get($this->getKeyByPos($pos));   
    }   
    private function delete($pos)   
    {   
    return $this->memcache->delete($this->getKeyByPos($pos));   
    }   
    private function getKeyByPos($pos)   
    {   
    return $this->prefix . $this->name . $pos;   
    }   
    }   


Related articles: