Using ace ACE_Task class to implement the method of thread pool

  • 2020-04-02 01:04:17
  • OfStack

This code should be an example of ace, but I think it is very good, so I share with you.
The notes are very detailed.
The header file

#ifndef THREAD_POOL_H
#define THREAD_POOL_H

#include "ace/Task.h"
//add by ychen 20070714 below
#include "ace/Mutex.h"
//add by ychen 20070714 above
#if !defined (ACE_LACKS_PRAGMA_ONCE)
# pragma once
#endif 


class ACE_Event_Handler;

class Thread_Pool : public ACE_Task<ACE_MT_SYNCH>
{
public:
  typedef ACE_Task<ACE_MT_SYNCH> inherited;
  
  enum size_t
  {
    default_pool_size_ = 1
  };
  // Basic constructor
  Thread_Pool (void);
  
  int open (int pool_size = default_pool_size_);
  
  virtual int open (void *void_data)
  {
    return inherited::open (void_data);
  }
  
  virtual int close (u_long flags = 0);
  
  int enqueue (ACE_Event_Handler *handler);
  
    typedef ACE_Atomic_Op<ACE_Mutex, int> counter_t;

protected:
  
  int svc (void);
  
  counter_t active_threads_;
};
#endif 

Implementation file

// thread_pool.cpp,v 1.9 1999/09/22 03:13:42 jcej Exp
#include "thread_pool.h"

#include "ace/Event_Handler.h"

Thread_Pool::Thread_Pool (void)
  : active_threads_ (0)
{
}

int
Thread_Pool::open (int pool_size)
{
  return this->activate (THR_NEW_LWP|THR_DETACHED, pool_size);
}

int
Thread_Pool::close (u_long flags)
{
  ACE_UNUSED_ARG(flags);
  
  int counter = active_threads_.value ();
  
  while (counter--)
    this->enqueue (0);
  
  while (active_threads_.value ())
    ACE_OS::sleep (ACE_Time_Value (0, 250000));
  return(0);
}

int
Thread_Pool::enqueue (ACE_Event_Handler *handler)
{
  
  
  void *v_data = (void *) handler;
  char *c_data = (char *) v_data;
  ACE_Message_Block *mb;
  
  ACE_NEW_RETURN (mb,
                  ACE_Message_Block (c_data),
                  -1);
  
  if (this->putq (mb) == -1)
    {
      
      mb->release ();
      return -1;
    }
  return 0;
}


class Counter_Guard
{
public:
  Counter_Guard (Thread_Pool::counter_t &counter)
    : counter_ (counter)
  {
    ++counter_;
  }
  ~Counter_Guard (void)
  {
    --counter_;
  }
protected:
  Thread_Pool::counter_t &counter_;
};

class Message_Block_Guard
{
public:
  Message_Block_Guard (ACE_Message_Block *&mb)
    : mb_ (mb)
  {
  }
  ~Message_Block_Guard (void)
  {
    mb_->release ();
  }
protected:
  ACE_Message_Block *&mb_;
};

int
Thread_Pool::svc (void)
{
  
  ACE_Message_Block *mb;
  
  Counter_Guard counter_guard (active_threads_);
  
  while (this->getq (mb) != -1)
    {
      
      Message_Block_Guard message_block_guard (mb);
      
      char *c_data = mb->base ();
      
      if (c_data)
        {
          
          void *v_data = (void *) c_data;
          ACE_Event_Handler *handler = (ACE_Event_Handler *) v_data;
          
          if (handler->handle_input (ACE_INVALID_HANDLE) == -1)
            {
              
              handler->handle_close (handler->get_handle (), 0);
              
            }
        }
      else
        
          return 0;  // Ok, shutdown request
      // message_block_guard goes out of scope here and releases the
      // message_block instance.
    }
  return 0;
}

In it, the idea of management is used for two variables in one of the classes. It is managed by the Counter_Guard class and the Message_Block_Guard class, respectively.
Because the ACE_Task class USES the ACE_message_block to encapsulate the message. Therefore, the use of classes prevents memory leaks.
ACE_Event_Handler   Is an event handle, similar to an operator. And when we do that, we do that.

Related articles: