A detailed look at simulating an event-triggering mechanism in Python

  • 2020-07-21 08:50:27
  • OfStack

This article shares working Python code that simulates an event-trigger mechanism, for your reference. The details are as follows.

EventManager.py


# -*- encoding: UTF-8 -*-

#  System module 
from queue import Queue, Empty
from threading import *


class EventManager:
  """Dispatch queued events to registered handlers on a worker thread.

  An event is any object exposing a ``type_`` attribute. ``SendEvent`` puts
  events on an internal queue; the worker thread takes them off one at a
  time and calls every handler registered for that ``type_``, in
  registration order.

  An exception raised by a handler is not caught here, so it ends the worker
  thread.
  """

  # Private marker queued by Stop() so a worker blocked on the queue wakes up
  # immediately instead of waiting for the polling timeout.
  __WAKE = object()

  def __init__(self):
    """Create a stopped manager with an empty queue and no listeners."""
    # Pending events, consumed by the worker thread.
    self.__eventQueue = Queue()
    # Run flag polled by the worker loop.
    self.__active = False
    # Worker thread; replaced by Start() once it has run, because a Thread
    # object can only be started once.
    self.__thread = Thread(target=self.__Run)
    # {event type: [handler, ...]} -- one event type may have many handlers.
    self.__handlers = {}

  def __Run(self):
    """Worker loop: pull events off the queue until the manager is stopped."""
    while self.__active:
      try:
        # Block for at most one second so the run flag is re-checked regularly.
        event = self.__eventQueue.get(block=True, timeout=1)
      except Empty:
        continue
      if event is self.__WAKE:
        # Wake-up marker from Stop(); the loop condition decides what's next.
        continue
      # Dispatch outside the try block so that a queue.Empty raised by a
      # handler is not mistaken for an empty event queue.
      self.__EventProcess(event)

  def __EventProcess(self, event):
    """Call every handler registered for ``event.type_``, in order."""
    # Iterate over a copy so handlers may add or remove listeners while the
    # event is being dispatched.
    for handler in list(self.__handlers.get(event.type_, ())):
      handler(event)

  def Start(self):
    """Start the worker thread; do nothing if it is already running."""
    self.__active = True
    if self.__thread.is_alive():
      return
    if self.__thread.ident is not None:
      # The previous worker has finished; threads cannot be restarted.
      self.__thread = Thread(target=self.__Run)
    self.__thread.start()

  def Stop(self):
    """Stop the worker thread and wait for it to exit.

    Safe to call before Start(), more than once, or from inside a handler.
    Events still waiting in the queue are left there.
    """
    self.__active = False
    worker = self.__thread
    if not worker.is_alive():
      return
    # Wake a worker that is blocked waiting for an event.
    self.__eventQueue.put(self.__WAKE)
    # A thread cannot join itself (Stop() called from a handler); in that
    # case the loop ends as soon as the handler returns.
    if worker is not current_thread():
      worker.join()

  def AddEventListener(self, type_, handler):
    """Register ``handler`` for events of ``type_``.

    Registering the same handler twice for one type has no effect.
    """
    handlerList = self.__handlers.setdefault(type_, [])
    if handler not in handlerList:
      handlerList.append(handler)

  def RemoveEventListener(self, type_, handler):
    """Unregister ``handler`` from events of ``type_``.

    Does nothing if the type or the handler is not registered.
    """
    handlerList = self.__handlers.get(type_)
    if not handlerList or handler not in handlerList:
      return
    handlerList.remove(handler)
    if not handlerList:
      # Drop the empty entry so the table only lists types with listeners.
      del self.__handlers[type_]

  def SendEvent(self, event):
    """Queue ``event`` for dispatch by the worker thread."""
    self.__eventQueue.put(event)

""" The event object """
class Event:
  """A typed message: a type tag plus a free-form payload dictionary."""

  def __init__(self, type_=None):
    """Create an event of the given type with an empty payload."""
    # Payload: each event gets its own dictionary for event-specific data.
    self.dict = dict()
    # Tag identifying what kind of event this is.
    self.type_ = type_

test.py


# -*- encoding: UTF-8 -*-

from threading import *
from EventManager import *
import time

# Event type name for "a new article was published".
# NOTE: "Artical" is a misspelling of "Article"; it is kept because this
# file uses the name when creating events and when registering listeners.
EVENT_ARTICAL = "Event_Artical"


# Event source: an official (public) account that publishes articles.
class PublicAccounts:
  """Publisher that announces new articles through an event manager."""

  def __init__(self, eventManager):
    """Remember the event manager that published events are sent to."""
    self.__eventManager = eventManager

  def WriteNewArtical(self):
    """Build a new-article event, send it, then print a confirmation."""
    # The article text travels in the event's payload under "artical".
    article_event = Event(type_=EVENT_ARTICAL)
    article_event.dict["artical"] = u' How to write more elegant code \n'
    # Hand the event to the manager before reporting that it was sent.
    manager = self.__eventManager
    manager.SendEvent(article_event)
    print(u' The official account sends the new article ')


# Event listener: a subscriber who reads published articles.
class Listener:
  """Subscriber identified by a user name."""

  def __init__(self, username):
    """Store the subscriber's user name."""
    self.__username = username

  def ReadArtical(self, event):
    """Event handler: print who received the article, then its text.

    The article text is read from ``event.dict["artical"]``.
    """
    subscriber = self.__username
    print(u'%s  Receive new articles ' % subscriber)
    # Looked up only after the first line is printed, as before.
    article_text = event.dict["artical"]
    print(u' Reading a new article: %s' % article_text)


""" Test functions """
def test():
  """Demo: two subscribers receive a new article every two seconds.

  Runs until interrupted (for example with Ctrl-C). The event manager is
  always stopped on the way out so its worker thread does not keep the
  process alive after the loop ends.
  """
  listner1 = Listener("thinkroom")  # Subscriber 1
  listner2 = Listener("steve")  # Subscriber 2

  eventManager = EventManager()

  # Bind the new-article event to each subscriber's handler.
  eventManager.AddEventListener(EVENT_ARTICAL, listner1.ReadArtical)
  eventManager.AddEventListener(EVENT_ARTICAL, listner2.ReadArtical)
  eventManager.Start()

  publicAcc = PublicAccounts(eventManager)
  try:
    while True:
      publicAcc.WriteNewArtical()
      time.sleep(2)
  finally:
    # Without this the non-daemon worker thread would keep running after an
    # interrupt and the process would never exit.
    eventManager.Stop()

# Run the demo only when this file is executed as a script, not on import.
if __name__ == '__main__':
  test()

Related articles: