An example of concurrent programming in Python

  • 2020-04-02 13:47:34
  • OfStack

Introduction

We call a running program a process. Each process has its own system state, which includes its memory, a list of open files, a program counter that tracks the instruction being executed, and a call stack that holds local variables. Typically, a process executes instructions one after another in a single control flow, called the main thread of the process. At any given moment, the program does only one thing.

A program can create new processes using library functions in Python's os or subprocess modules (such as os.fork() or subprocess.Popen()). These processes, called child processes, run independently, each with its own system state and main thread. Because the processes are independent, a child executes concurrently with the original process, so the original process can carry on with other work after creating the child.
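
As a minimal sketch of this, assuming a Python interpreter is available as sys.executable, the parent below starts a child with subprocess.Popen, does some unrelated work, and only then collects the child's output. The function name run_child_and_keep_working is illustrative.

```python
import subprocess
import sys


def run_child_and_keep_working():
    # Popen returns as soon as the child has been started.
    child = subprocess.Popen(
        [sys.executable, "-c", "print('hello from the child')"],
        stdout=subprocess.PIPE,
        text=True,
    )

    # The parent is free to do other work while the child runs.
    parent_work = sum(range(1000))

    # Read the child's output and wait for it to exit.
    child_output, _ = child.communicate()
    return parent_work, child_output.strip(), child.returncode


if __name__ == "__main__":
    print(run_child_and_keep_working())
```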

Although processes are independent of each other, they can communicate through mechanisms known as interprocess communication (IPC). A typical pattern is message passing, where a message can be thought of simply as a buffer of raw bytes, and primitives such as send() and recv() transmit or receive messages over I/O channels such as pipes or network sockets. Some IPC patterns instead use memory mapping (for example, the mmap module), with which processes can create shared regions of memory; changes to these regions are visible to all processes that map them.
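
A minimal sketch of message passing over pipes, assuming a child Python interpreter started through subprocess. The names CHILD_CODE and exchange_message are illustrative. Here the pipes are the child's standard input and output, and communicate() plays the role of the send and receive steps.

```python
import subprocess
import sys

# Child program: read raw bytes from stdin and reply with them upper-cased.
CHILD_CODE = (
    "import sys\n"
    "data = sys.stdin.buffer.read()\n"
    "sys.stdout.buffer.write(data.upper())\n"
)


def exchange_message(message: bytes) -> bytes:
    child = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # Send the message through the stdin pipe, close it, and read the reply.
    reply, _ = child.communicate(input=message)
    return reply


if __name__ == "__main__":
    print(exchange_message(b"ping"))
```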

Multiple processes suit scenarios in which several tasks must be performed at the same time, with different processes responsible for different parts of the work. Another way to subdivide work into tasks is to use threads. Like a process, a thread has its own control flow and execution stack, but it runs inside the process that created it and shares all of that process's data and system resources. Threads are useful when an application needs to perform tasks concurrently and those tasks must share a large amount of system state.
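
To illustrate the sharing, here is a small sketch in which several threads append to one list that lives in the enclosing process. The function name share_between_threads is illustrative; the sketch relies on a single list.append call being thread-safe in CPython.

```python
import threading


def share_between_threads(count=4):
    shared_results = []  # one object, visible to every thread in the process

    def worker(n):
        # Each thread writes into the same list; no copying is involved.
        shared_results.append(n * n)

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Threads may finish in any order, so sort before returning.
    return sorted(shared_results)


if __name__ == "__main__":
    print(share_between_threads())
```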

When multiple processes or threads are in use, the operating system is responsible for scheduling them. It gives each process (or thread) a small time slice and cycles rapidly among all active tasks, dividing CPU time into small pieces shared between them. For example, if 10 active processes are executing on your system, the operating system allocates roughly one tenth of the CPU time to each and cycles among the 10 processes. When the system has more than one CPU core, the operating system can schedule processes on different cores, balancing the load so that tasks execute in parallel.

Programs written with concurrent execution mechanisms must deal with some complex issues. The main source of complexity is synchronizing access to shared data. When multiple tasks try to update the same data structure at the same time, the result can be corrupted data and inconsistent program state (formally, a race condition). To solve this, you use a mutex or a similar synchronization primitive to identify and protect the critical sections of the program. For example, if several threads try to write to the same file at the same time, a mutex makes the writes execute one after another: while one thread is writing, the others must wait until it releases the resource.
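
A minimal sketch of protecting a critical section with a mutex, using threading.Lock. The shared counter stands in for any shared resource such as the file mentioned above; the function name count_with_lock is illustrative.

```python
import threading


def count_with_lock(num_threads=4, increments=10000):
    counter = 0
    lock = threading.Lock()

    def add():
        nonlocal counter
        for _ in range(increments):
            # Only one thread at a time may run the read-modify-write below.
            with lock:
                counter += 1

    threads = [threading.Thread(target=add) for _ in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return counter


if __name__ == "__main__":
    print(count_with_lock())
```

The with statement acquires the lock on entry and releases it on exit, even if the body raises, which is why it is usually preferred to calling acquire() and release() by hand.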

Concurrent programming in Python

Python has long supported several forms of concurrent programming, including threads, child processes, and other concurrency implementations built on generator functions.

Python supports both message-passing and thread-based concurrent programming on most systems. While most programmers are more familiar with the thread interface, Python's threading mechanism has significant limitations. To keep the interpreter thread-safe, Python uses an internal global interpreter lock (GIL) that allows only one thread to execute Python code at a time. As a result, a multithreaded Python program effectively runs on a single processor, even on multicore systems. The GIL is much debated in the Python community, but at the time of writing there was no prospect of it being removed in the foreseeable future.

Python provides some handy tools for managing concurrent operations based on threads and processes. Even simple programs can use these tools to run tasks concurrently and finish sooner.

The subprocess module provides an API for creating child processes and communicating with them. It is especially well suited to running text-oriented programs, because the API supports passing data through the new process's standard input and output channels.

The signal module exposes the UNIX signal mechanism, which passes event notifications between processes. Signals are handled asynchronously and usually interrupt whatever the program is doing when they arrive. Signals are enough for coarse-grained messaging, but other interprocess communication techniques are more reliable and can carry more complex messages.

The threading module provides a high-level, object-oriented API for concurrent operations. Thread objects run concurrently within a process and share its memory. Threads are best suited to scaling I/O-bound tasks.

The multiprocessing module is similar to the threading module, but it works with processes. Each Process object is a real operating system process and does not share memory with the others, but the module provides mechanisms for sharing data and passing messages between processes. In many cases, switching from a thread-based to a process-based program requires little more than changing some import statements.
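
The point about import statements can be sketched with the standard library's two pool classes, which share one interface. The example below assumes the thread-backed pool from multiprocessing.dummy; the commented-out import shows the process-backed alternative. The helper names square and squares are illustrative.

```python
from multiprocessing.dummy import Pool  # thread-backed pool
# from multiprocessing import Pool      # process-backed pool, same interface


def square(n):
    return n * n


def squares(values, workers=4):
    # map() splits the input among the workers and returns results in order.
    with Pool(workers) as pool:
        return pool.map(square, values)


if __name__ == "__main__":
    print(squares(range(5)))
```

With the process-backed pool, the worker function must be defined at module level so that it can be sent to the child processes, and the code that creates the pool should stay under the __main__ guard.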

Threading module example

Consider a simple problem as an example of using the threading module: summing a large range of numbers in parallel by splitting it into pieces.


import threading
 
class SummingThread(threading.Thread):
  def __init__(self, low, high):
    super(SummingThread, self).__init__()
    self.low = low
    self.high = high
    self.total = 0
 
  def run(self):
    for i in range(self.low, self.high):
      self.total += i
 
thread1 = SummingThread(0, 500000)
thread2 = SummingThread(500000, 1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print(result)
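
The same idea can be generalized to any number of threads. The sketch below is one possible way to do it, not part of the original example: parallel_sum is a hypothetical helper that splits range(n) into contiguous pieces and adds up the partial totals.

```python
import threading


class SummingThread(threading.Thread):
    def __init__(self, low, high):
        super().__init__()
        self.low = low
        self.high = high
        self.total = 0

    def run(self):
        self.total = sum(range(self.low, self.high))


def parallel_sum(n, num_threads=4):
    # Piece boundaries; the last piece absorbs any remainder.
    step = n // num_threads
    bounds = [i * step for i in range(num_threads)] + [n]

    threads = [SummingThread(bounds[i], bounds[i + 1]) for i in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return sum(t.total for t in threads)


if __name__ == "__main__":
    print(parallel_sum(1000000))
```

Because of the GIL, this CPU-bound example is not expected to run faster than a single loop; it only shows how the work is divided and recombined.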

Custom Threading class library

I wrote a small Python class library that makes threads easier to use. It contains some useful classes and functions.

Key components:

* do_threaded_work. This function distributes a given set of work items to a handler function running on multiple threads (in no guaranteed order).

* ThreadedWorker. This class creates a thread that pulls work items from a synchronized work queue and writes the results to a synchronized result queue.

* start_logging_with_thread_info. Adds the thread id to all log messages. (Depends on the logging configuration.)

* stop_logging_with_thread_info. Removes the thread id from all log messages. (Depends on the logging configuration.)
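
Before the full listing, the work-queue and result-queue pattern behind these components can be sketched in a few lines. This is a condensed illustration under simplifying assumptions (no timeouts, no logging), not the library itself; process_in_threads is a hypothetical name.

```python
import queue
import threading


def process_in_threads(items, func, num_threads=2):
    work_queue = queue.Queue()
    result_queue = queue.Queue()

    # Tag each item with its index so the output order can be restored.
    for index, item in enumerate(items):
        work_queue.put((index, item))

    def worker():
        while True:
            try:
                index, item = work_queue.get_nowait()
            except queue.Empty:
                return  # no work left for this thread
            try:
                result_queue.put((index, func(item)))
            finally:
                work_queue.task_done()  # exactly once per fetched item

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    work_queue.join()  # blocks until every item has been marked done
    for t in threads:
        t.join()

    results = []
    while not result_queue.empty():
        results.append(result_queue.get())
    return [value for _, value in sorted(results, key=lambda pair: pair[0])]


if __name__ == "__main__":
    print(process_in_threads(["a", "b", "c"], str.upper))
```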


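import logging
import queue as Queue  # Python 3 module name, aliased so the Queue.Queue / Queue.Empty references below resolve
import threading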
 
def do_threaded_work(work_items, work_func, num_threads=None, per_sync_timeout=1, preserve_result_ordering=True):
  """ Executes work_func on each work_item. Note: Execution order is not preserved, but output ordering is (optionally).
 
    Parameters:
    - num_threads        Default: len(work_items) --- Number of threads used to process items in work_items.
    - per_sync_timeout     Default: 1        --- Each synchronized operation can optionally timeout.
    - preserve_result_ordering Default: True       --- Reorders result_item to match original work_items ordering.
 
    Return: 
    --- list of results from applying work_func to each work_item. Order is optionally preserved.
 
    Example:
 
    def process_url(url):
      # TODO: Do some work with the url
      return url
 
    urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]
 
    # process urls in parallel
    result_items = do_threaded_work(urls_to_process, process_url)
 
    # print(results)
    print(repr(result_items))
  """
  if not num_threads:
    num_threads = len(work_items)
 
  work_queue = Queue.Queue()
  result_queue = Queue.Queue()
 
  index = 0
  for work_item in work_items:
    if preserve_result_ordering:
      work_queue.put((index, work_item))
    else:
      work_queue.put(work_item)
    index += 1
 
  if preserve_result_ordering:
    wrapped_work_func = lambda work_item: (work_item[0], work_func(work_item[1]))
 
  start_logging_with_thread_info()
 
  #spawn a pool of threads, and pass them queue instance 
  for _ in range(num_threads):
    if preserve_result_ordering:
      t = ThreadedWorker(work_queue, result_queue, work_func=wrapped_work_func, queue_timeout=per_sync_timeout)
    else:
      t = ThreadedWorker(work_queue, result_queue, work_func=work_func, queue_timeout=per_sync_timeout)
    t.daemon = True  # setDaemon() is deprecated since Python 3.10
    t.start()
 
  work_queue.join()
  stop_logging_with_thread_info()
 
  logging.info('work_queue joined')
 
  result_items = []
  while not result_queue.empty():
    result = result_queue.get(timeout=per_sync_timeout)
    logging.info('found result[:500]: ' + repr(result)[:500])
    if result:
      result_items.append(result)
 
  if preserve_result_ordering:
    # sort by the original index, then drop the index
    result_items = [item for _, item in sorted(result_items, key=lambda pair: pair[0])]
 
  return result_items
 
class ThreadedWorker(threading.Thread):
  """ Generic Threaded Worker
    Input to work_func: item from work_queue
 
  Example usage:
 
  import queue

  urls_to_process = ["http://url1.com", "http://url2.com", "http://site1.com", "http://site2.com"]

  work_queue = queue.Queue()
  result_queue = queue.Queue()
 
  def process_url(url):
    # TODO: Do some work with the url
    return url
 
  def main():
    # populate the queue first: a worker exits as soon as it finds the queue empty
    for url in urls_to_process:
      work_queue.put(url)

    # spawn a pool of threads, and pass them the queue instances
    for i in range(3):
      t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
      t.daemon = True
      t.start()

    # wait on the queue until everything has been processed
    work_queue.join()

    # print results
    while not result_queue.empty():
      print(result_queue.get())
 
  main()
  """
 
  def __init__(self, work_queue, result_queue, work_func, stop_when_work_queue_empty=True, queue_timeout=1):
    threading.Thread.__init__(self)
    self.work_queue = work_queue
    self.result_queue = result_queue
    self.work_func = work_func
    self.stop_when_work_queue_empty = stop_when_work_queue_empty
    self.queue_timeout = queue_timeout
 
  def should_continue_running(self):
    if self.stop_when_work_queue_empty:
      return not self.work_queue.empty()
    else:
      return True
 
  def run(self):
    while self.should_continue_running():
      try:
        # grab an item from work_queue
        work_item = self.work_queue.get(timeout=self.queue_timeout)
      except Queue.Empty:
        # nothing was fetched, so task_done() must not be called
        logging.warning('ThreadedWorker Queue was empty or Queue.get() timed out')
        continue

      try:
        # work on the item
        work_result = self.work_func(work_item)

        # place work_result into result_queue
        self.result_queue.put(work_result, timeout=self.queue_timeout)

      except Queue.Full:
        logging.warning('ThreadedWorker Queue was full or Queue.put() timed out')

      except Exception:
        logging.exception('Error in ThreadedWorker')

      finally:
        # signal to work_queue that this item is done
        self.work_queue.task_done()
 
def start_logging_with_thread_info():
  try:
    formatter = logging.Formatter('[thread %(thread)-3s] %(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to start logging with thread info')
 
def stop_logging_with_thread_info():
  try:
    formatter = logging.Formatter('%(message)s')
    logging.getLogger().handlers[0].setFormatter(formatter)
  except:
    logging.exception('Failed to stop logging with thread info')
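
The two logging helpers above modify handlers[0] of the root logger, so they assume a handler has already been installed, for example by calling logging.basicConfig(). The sketch below shows the effect of the same format string in isolation. It uses a dedicated logger and an in-memory stream so that it does not touch the root logger; the logger name thread_info_demo and the function log_with_thread_ids are hypothetical.

```python
import io
import logging
import threading


def log_with_thread_ids(num_threads=2):
    stream = io.StringIO()
    handler = logging.StreamHandler(stream)
    # Same format string as start_logging_with_thread_info.
    handler.setFormatter(logging.Formatter('[thread %(thread)-3s] %(message)s'))

    logger = logging.getLogger('thread_info_demo')  # hypothetical logger name
    logger.setLevel(logging.INFO)
    logger.propagate = False  # keep the demo output away from the root logger
    logger.addHandler(handler)
    try:
        threads = [
            threading.Thread(target=logger.info, args=('working',))
            for _ in range(num_threads)
        ]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
    finally:
        logger.removeHandler(handler)

    # One line per thread, each prefixed with that thread's id.
    return stream.getvalue().splitlines()


if __name__ == "__main__":
    for line in log_with_thread_ids():
        print(line)
```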

Usage example


from test import ThreadedWorker  # assumes the library above is saved as test.py
from queue import Queue
 
urls_to_process = ["http://facebook.com", "http://pypix.com"]
 
work_queue = Queue()
result_queue = Queue()
 
def process_url(url):
  # TODO: Do some work with the url
  return url
 
def main():
  # populate the queue first: a worker exits as soon as it finds the queue empty
  for url in urls_to_process:
    work_queue.put(url)

  # spawn a pool of threads, and pass them the queue instances
  for i in range(5):
    t = ThreadedWorker(work_queue, result_queue, work_func=process_url)
    t.daemon = True
    t.start()

  # wait on the queue until everything has been processed
  work_queue.join()

  # print results
  while not result_queue.empty():
    print(result_queue.get())
 
main()

