Method of asynchronous implementation of timed task and periodic task in python

  • 2021-07-06 11:19:23
  • OfStack

1. How to call


def f1(arg1, arg2):
  print('f1', arg1, arg2)
 
 
def f2(arg1):
  print('f2', arg1)
 
 
def f3():
  print('f3')
 
 
def f4():
  print(' Periodic task ', int(time.time()))
 
 
timer = TaskTimer()
#  Add a task to the task queue 
timer.join_task(f1, [1, 2], timing=15.5) #  Every day 15:30 Execute 
timer.join_task(f2, [3], timing=14) #  Every day 14:00 Execute 
timer.join_task(f3, [], timing=15) #  Every day 15:00 Execute 
timer.join_task(f4, [], interval=10) #  Every 10 Second execution 1 Times 
#  Start execution (threads will be created at this time) 
timer.start()

f1 ~ f4 are functions that we need to execute regularly.

First, create the TaskTimer object (the code for TaskTimer is below). Call the join_task function to add the functions that need to be executed to the task queue. Finally, start is called, and the task begins to execute.

join_task Parameter:

fun: Functions to execute

arg: Parameters of fun, if not, pass an empty list

interval: If this parameter exists, it means that the task is a periodic task in seconds (note that interval is at least 5 seconds)

timing: If this parameter exists, it means that the task is a timed task and the unit is time

Note: Only one of interval and timing can be selected

2. Source code


import datetime
import time
from threading import Thread
from time import sleep
 
 
class TaskTimer:
  __instance = None
 
  def __new__(cls, *args, **kwargs):
    """
     Singleton pattern 
    """
    if not cls.__instance:
      cls.__instance = object.__new__(cls)
    return cls.__instance
 
  def __init__(self):
    if not hasattr(self, 'task_queue'):
      setattr(self, 'task_queue', [])
 
    if not hasattr(self, 'is_running'):
      setattr(self, 'is_running', False)
 
  def write_log(self, level, msg):
    cur_time = datetime.datetime.now()
    with open('./task.log', mode='a+', encoding='utf8') as file:
      s = "[" + str(cur_time) + "][" + level + "]  " + msg
      print(s)
      file.write(s + "\n")
 
  def work(self):
 
    """
     Processing task queue 
    """
    while True:
      for task in self.task_queue:
        if task['interval']:
          self.cycle_task(task)
        elif task['timing']:
          self.timing_task(task)
 
      sleep(5)
 
  def cycle_task(self, task):
    """
     Periodic task 
    """
    if task['next_sec'] <= int(time.time()):
      try:
        task['fun'](*task['arg'])
        self.write_log(" Normal ", " Periodic tasks: " + task['fun'].__name__ + "  Executed ")
      except Exception as e:
        self.write_log(" Anomaly ", " Periodic tasks: " + task['fun'].__name__ + "  Function internal exception: " + str(e))
      finally:
        task['next_sec'] = int(time.time()) + task['interval']
 
  def timing_task(self, task):
    """
     Timing task 
    """
    #  The number of seconds has passed today 
    today_sec = self.get_today_until_now()
 
    #  To the first 2 Day, reset the task state 
    if task['today'] != self.get_today():
      task['today'] = self.get_today()
      task['today_done'] = False
 
    #  No. 1 1 Secondary execution 
    if task['first_work']:
      if today_sec >= task['task_sec']:
        task['today_done'] = True
        task['first_work'] = False
      else:
        task['first_work'] = False
 
    #  It has not been implemented today 
    if not task['today_done']:
      if today_sec >= task['task_sec']: #  It's time to start the mission 
        try:
          task['fun'](*task['arg'])
          self.write_log(" Normal ", " Timed tasks: " + task['fun'].__name__ + "  Executed ")
        except Exception as e:
          self.write_log(" Anomaly ", " Timed tasks: " + task['fun'].__name__ + "  Function internal exception: " + str(e))
        finally:
          task['today_done'] = True
          if task['first_work']:
            task['first_work'] = False
 
  def get_today_until_now(self):
    """
     Get the number of seconds from this morning to now 
    """
    i = datetime.datetime.now()
    return i.hour * 3600 + i.minute * 60 + i.second
 
  def get_today(self):
    """
     Get today's date 
    """
    i = datetime.datetime.now()
    return i.day
 
  def join_task(self, fun, arg, interval=None, timing=None):
    """
    interval And timing Can only exist 1 A 
    :param fun:  The task you want to call 
    :param arg: fun Parameters of 
    :param interval:  Cycle task, unit second 
    :param timing:  Timing task, value: [0,24)
    """
    #  Parameter check 
    if (interval != None and timing != None) or (interval == None and timing == None):
      raise Exception('interval And timing Only optional 1 A ')
 
    if timing and not 0 <= timing < 24:
      raise Exception('timing The value range of is [0,24)')
 
    if interval and interval < 5:
      raise Exception('interval At least 5')
 
    #  Encapsulation 1 A task
    task = {
      'fun': fun,
      'arg': arg,
      'interval': interval,
      'timing': timing,
    }
    #  Encapsulate the parameters corresponding to the cycle or timing task 
    if timing:
      task['task_sec'] = timing * 3600
      task['today_done'] = False
      task['first_work'] = True
      task['today'] = self.get_today()
    elif interval:
      task['next_sec'] = int(time.time()) + interval
 
    #  Put task Join the task queue 
    self.task_queue.append(task)
 
    self.write_log(" Normal ", " New task: " + fun.__name__)
 
  def start(self):
    """
     Start a task 
     Returns the thread identifier 
    """
    if not self.is_running:
      thread = Thread(target=self.work)
 
      thread.start()
 
      self.is_running = True
 
      self.write_log(" Normal ", "TaskTimer Has started running! ")
 
      return thread.ident
 
    self.write_log(" Warning ", "TaskTimer Already running, please do not start again! ")

Related articles: