A method for asynchronously running timed tasks and periodic tasks in Python
- 2021-07-06 11:19:23
- OfStack
1. How to call
def f1(arg1, arg2):
    """Example task taking two arguments; prints a tag and both values."""
    print('f1', arg1, arg2)
def f2(arg1):
    """Example task taking one argument; prints a tag and the value."""
    print('f2', arg1)
def f3():
    """Example task taking no arguments; prints a tag."""
    print('f3')
def f4():
    """Example periodic task; prints a label and the current Unix time in whole seconds.

    Relies on the ``time`` module being imported at module level.
    """
    print(' Periodic task ', int(time.time()))
# Build the scheduler; every job is registered before the worker is started.
timer = TaskTimer()
# Daily jobs: `timing` is the hour of the day, so 15.5 means 15:30.
timer.join_task(fun=f1, arg=[1, 2], timing=15.5)
timer.join_task(fun=f2, arg=[3], timing=14)  # runs daily at 14:00
timer.join_task(fun=f3, arg=[], timing=15)  # runs daily at 15:00
# Periodic job: `interval` is given in seconds, so this runs every 10 seconds.
timer.join_task(fun=f4, arg=[], interval=10)
# Begin execution (the worker thread is created at this point).
timer.start()
f1 through f4 are the functions that we want to run on a schedule.
First, create a TaskTimer object (the code for TaskTimer is given below). Then call join_task once for each function to add it to the task queue. Finally, call start, and the queued tasks begin to execute.
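join_task parameters:
fun: The function to execute
arg: The arguments for fun, as a list; if there are none, pass an empty list
interval: If this parameter is given, the task is a periodic task and the value is the period in seconds (note that interval must be at least 5 seconds, because the task queue is only checked once every 5 seconds)
timing: If this parameter is given, the task is a timed task that runs once a day, and the value is the hour of the day in the range [0, 24) (for example, 15.5 means 15:30)
Note: Exactly one of interval and timing must be given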
2. Source code
import datetime
import time
from threading import Thread
from time import sleep
class TaskTimer:
    """Singleton scheduler that runs registered callables on a worker thread.

    Two kinds of task are supported:

    * periodic tasks (``interval``), executed every ``interval`` seconds;
    * timed tasks (``timing``), executed once a day at a given hour.

    The queue is polled every 5 seconds, so that is the scheduling resolution.
    Every event is printed and appended to ``./task.log``.
    """

    __instance = None

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        if cls.__instance is None:
            cls.__instance = super().__new__(cls)
        return cls.__instance

    def __init__(self):
        """Initialise the shared state the first time the singleton is built."""
        # __init__ runs on every TaskTimer() call, so existing state must
        # not be overwritten.
        if not hasattr(self, 'task_queue'):
            self.task_queue = []
        if not hasattr(self, 'is_running'):
            self.is_running = False

    def write_log(self, level, msg):
        """Print a timestamped log line and append it to ``./task.log``.

        :param level: Severity label placed in the second bracket pair
        :param msg: Message text
        """
        cur_time = datetime.datetime.now()
        with open('./task.log', mode='a+', encoding='utf8') as file:
            s = "[" + str(cur_time) + "][" + level + "] " + msg
            print(s)
            file.write(s + "\n")

    def work(self):
        """Process the task queue forever, polling every 5 seconds."""
        while True:
            for task in self.task_queue:
                # Compare against None: timing == 0 (midnight) is a valid,
                # but falsy, value.
                if task['interval'] is not None:
                    self.cycle_task(task)
                elif task['timing'] is not None:
                    self.timing_task(task)
            sleep(5)

    def cycle_task(self, task):
        """Run a periodic task if it is due, then schedule its next run.

        Exceptions raised by the task are logged, not propagated, so one
        failing task cannot stop the worker loop.

        :param task: Task dict created by ``join_task`` with ``interval`` set
        """
        if task['next_sec'] <= int(time.time()):
            try:
                task['fun'](*task['arg'])
                self.write_log(" Normal ", " Periodic tasks: " + task['fun'].__name__ + " Executed ")
            except Exception as e:
                self.write_log(" Anomaly ", " Periodic tasks: " + task['fun'].__name__ + " Function internal exception: " + str(e))
            finally:
                # The next run is measured from when this run finished.
                task['next_sec'] = int(time.time()) + task['interval']

    def timing_task(self, task):
        """Run a timed (daily) task if its time has come and it has not run today.

        If the scheduled time has already passed the first time the task is
        examined, today's run is skipped and the task first runs the next day.
        Exceptions raised by the task are logged, not propagated.

        :param task: Task dict created by ``join_task`` with ``timing`` set
        """
        # Seconds elapsed since midnight
        today_sec = self.get_today_until_now()
        today = self.get_today()
        # A new day has started: reset the task state
        if task['today'] != today:
            task['today'] = today
            task['today_done'] = False
        # First examination of this task
        if task['first_work']:
            if today_sec >= task['task_sec']:
                # Already too late for today; treat it as done
                task['today_done'] = True
            task['first_work'] = False
        # Not executed yet today and the scheduled time has been reached
        if not task['today_done'] and today_sec >= task['task_sec']:
            try:
                task['fun'](*task['arg'])
                self.write_log(" Normal ", " Timed tasks: " + task['fun'].__name__ + " Executed ")
            except Exception as e:
                self.write_log(" Anomaly ", " Timed tasks: " + task['fun'].__name__ + " Function internal exception: " + str(e))
            finally:
                # Mark as done even on failure so it is not retried all day
                task['today_done'] = True

    def get_today_until_now(self):
        """Return the number of whole seconds elapsed since local midnight."""
        i = datetime.datetime.now()
        return i.hour * 3600 + i.minute * 60 + i.second

    def get_today(self):
        """Return today's day of the month."""
        i = datetime.datetime.now()
        return i.day

    def join_task(self, fun, arg, interval=None, timing=None):
        """Add a task to the queue.

        Exactly one of ``interval`` and ``timing`` must be given.

        :param fun: The callable to run
        :param arg: List of positional arguments for ``fun`` (unpacked with ``*``)
        :param interval: Periodic task: period in seconds, at least 5
        :param timing: Timed task: hour of the day, value in [0, 24)
        :raises ValueError: If the parameters violate the rules above
        """
        # Parameter check. None means "not given"; 0 is a real value and
        # must not be mistaken for it.
        if (interval is None) == (timing is None):
            raise ValueError('interval And timing Only optional 1 A ')
        if timing is not None and not 0 <= timing < 24:
            raise ValueError('timing The value range of is [0,24)')
        if interval is not None and interval < 5:
            raise ValueError('interval At least 5')
        # Build the task record
        task = {
            'fun': fun,
            'arg': arg,
            'interval': interval,
            'timing': timing,
        }
        # Add the scheduling state for the chosen task kind
        if timing is not None:
            task['task_sec'] = timing * 3600
            task['today_done'] = False
            task['first_work'] = True
            task['today'] = self.get_today()
        else:
            task['next_sec'] = int(time.time()) + interval
        # Put the task into the queue
        self.task_queue.append(task)
        self.write_log(" Normal ", " New task: " + fun.__name__)

    def start(self):
        """Start the worker thread if it is not already running.

        :return: The thread identifier of the new worker thread, or ``None``
            (after logging a warning) if the timer was already started
        """
        if not self.is_running:
            thread = Thread(target=self.work)
            thread.start()
            self.is_running = True
            self.write_log(" Normal ", "TaskTimer Has started running! ")
            return thread.ident
        self.write_log(" Warning ", "TaskTimer Already running, please do not start again! ")