Python implementation of a crontab configuration file parser

  • 2020-04-02 13:49:27
  • OfStack

#!/usr/bin/env python
#-*- coding:utf-8 -*-
 
"""
1. Parse the five time fields of a crontab configuration line (minute, hour, day, month, weekday) and expand each one into the list of values it allows.
2. Compare a timestamp with the parsed time fields of one crontab line to decide whether the timestamp falls within the schedule set by that line.
"""
 
#$Id $
 
import re, time, sys
from Core.FDateTime.FDateTime import FDateTime
 
def get_struct_time(time_stamp_int):
	"""
	Break an integer timestamp into crontab-ordered local-time fields.

	Args:
		time_stamp_int: seconds since the epoch (int), e.g. 1332888820.
			It is converted with time.localtime(), e.g. to
			time.struct_time(tm_year=2012, tm_mon=3, tm_mday=28, tm_hour=6,
			tm_min=53, tm_sec=40, tm_wday=2, tm_yday=88, tm_isdst=0)
	Return:
		list [minute, hour, day of month, month, day of week]; the day of
		week uses the crontab numbering (0 = Sunday ... 6 = Saturday).
	"""
	st_time = time.localtime(time_stamp_int)
	# struct_time.tm_wday runs Monday (0) .. Sunday (6) while crontab runs
	# Sunday (0) .. Saturday (6), so shift by one day before returning.
	cron_wday = (st_time.tm_wday + 1) % 7
	return [st_time.tm_min, st_time.tm_hour, st_time.tm_mday, st_time.tm_mon, cron_wday]
 
 
def get_strptime(time_str, str_format):
	"""Convert a formatted local-time string into an integer timestamp.

	Args:
		time_str: the time as text, e.g. '31/Jul/2013:17:46:01'
		str_format: the strptime format describing time_str,
			e.g. '%d/%b/%Y:%H:%M:%S'
	Return:
		the timestamp in whole seconds (int), e.g. 1375146861
	"""
	parsed = time.strptime(time_str, str_format)
	seconds = time.mktime(parsed)
	return int(seconds)
 
def get_str_time(time_stamp, str_format='%Y%m%d%H%M'):
	"""
	Render a timestamp as formatted local-time text.

	Args:
		time_stamp: timestamp in seconds, e.g. 1375146861
		str_format: strftime format of the result (str)
	Return:
		the formatted time; with the default format, 01:03 on 9 July 2013
		is returned as '201307090103'
	"""
	pattern = "%s" % str_format
	local_parts = time.localtime(time_stamp)
	return time.strftime(pattern, local_parts)
 
def match_cont(patten, cont):
	"""
	Report whether a regular expression matches at the start of a string.

	Args:
		patten: the regular expression
		cont: the text to test
	Return:
		True when re.match() finds a match, otherwise False
	"""
	return re.match(patten, cont) is not None
 
def handle_num(val, ranges=(0, 100), res=None):
	"""Expand a plain number, e.g. '5'.

	Args:
		val: the number as text (or int)
		ranges: (low, high) inclusive limits; a value outside them is dropped
		res: list to append to; a new list is created when omitted
	Return:
		res, with the number appended when it lies inside ranges
	"""
	# A fresh list per call: a list default would be shared between calls and
	# keep accumulating values from earlier, unrelated invocations.
	if res is None:
		res = []
	val = int(val)
	if ranges[0] <= val <= ranges[1]:
		res.append(val)
	return res
 
def handle_nlist(val, ranges=(0, 100), res=None):
	"""Expand a comma-separated list of numbers, e.g. '1,2,3,6'.

	Args:
		val: the list as text
		ranges: (low, high) inclusive limits; values outside them are dropped
		res: list to append to; a new list is created when omitted
	Return:
		res, with every in-range number appended in the order given
	"""
	# A fresh list per call: a list default would be shared between calls.
	if res is None:
		res = []
	for tmp_val in val.split(','):
		tmp_val = int(tmp_val)
		if ranges[0] <= tmp_val <= ranges[1]:
			res.append(tmp_val)
	return res
 
def handle_star(val, ranges=(0, 100), res=None):
	"""Expand the wildcard '*' into every value of the field.

	Args:
		val: the token; anything other than '*' adds nothing
		ranges: (low, high) inclusive limits of the field
		res: list to append to; a new list is created when omitted
	Return:
		res, extended with low..high when val is '*'
	"""
	# A fresh list per call: a list default would be shared between calls.
	if res is None:
		res = []
	if val == '*':
		res.extend(range(ranges[0], ranges[1] + 1))
	return res
 
def handle_starnum(val, ranges=(0, 100), res=None):
	"""Expand a wildcard with a step, e.g. '*/3'.

	Args:
		val: the token in the form '*/<step>'
		ranges: (low, high) inclusive limits of the field
		res: list to append to; a new list is created when omitted
	Return:
		res, extended with low, low+step, low+2*step, ... up to high;
		unchanged when the step is smaller than 1
	"""
	# A fresh list per call: a list default would be shared between calls.
	if res is None:
		res = []
	val_step = int(val.split('/')[1])
	if val_step < 1:
		return res
	# '*' stands for the whole field, so stepping starts at the field minimum
	# (e.g. '*/10' on minutes gives 0,10,...,50), not at the step value.
	res.extend(range(ranges[0], ranges[1] + 1, val_step))
	return res
 
def handle_range(val, ranges=(0, 100), res=None):
	"""Expand an inclusive range, e.g. '8-20'.

	Args:
		val: the token in the form '<start>-<end>'
		ranges: (low, high) inclusive limits; the expansion is clipped to them
		res: list to append to; a new list is created when omitted
	Return:
		res, extended with every value from start to end that lies inside
		ranges (nothing is added when start is greater than end)
	"""
	# A fresh list per call: a list default would be shared between calls.
	if res is None:
		res = []
	bounds = val.split('-')
	# Clip at both ends so out-of-field values are dropped, the same way the
	# number and number-list handlers drop them.
	first = max(int(bounds[0]), ranges[0])
	last = min(int(bounds[1]), ranges[1])
	res.extend(range(first, last + 1))
	return res
 
def handle_rangedv(val, ranges=(0, 100), res=None):
	"""Expand a range with a step, e.g. '8-20/3'.

	Args:
		val: the token in the form '<start>-<end>/<step>'
		ranges: (low, high) inclusive limits; values outside them are dropped
		res: list to append to; a new list is created when omitted
	Return:
		res, extended with start, start+step, ... up to end, keeping only
		values inside ranges; unchanged when the step is smaller than 1
	"""
	# A fresh list per call: a list default would be shared between calls.
	if res is None:
		res = []
	parts = val.split('/')
	bounds = parts[0].split('-')
	val_start = int(bounds[0])
	val_end = int(bounds[1])
	val_step = int(parts[1])
	if val_step < 1:
		return res
	last = min(val_end, ranges[1])
	# Stepping is anchored at the written start; values below the field
	# minimum are skipped rather than shifting the whole sequence.
	res.extend(v for v in range(val_start, last + 1, val_step) if v >= ranges[0])
	return res
 
def parse_conf(conf, ranges=(0, 100), res=None):
	"""Expand one of the five crontab time fields into its allowed values.

	Args:
		conf: the field text, e.g. '*', '*/5', '1,2,8-20/3'
		ranges: (low, high) inclusive limits of the field
		res: list to append to; a new list is created when omitted
	Return:
		res, extended with the values of every comma-separated token.
		Tokens that are neither numeric nor match an entry of PATTEN
		contribute nothing.
	"""
	# A fresh list per call: a list default would be shared between calls.
	if res is None:
		res = []
	# Drop surrounding whitespace, then split into comma-separated tokens.
	conf = conf.strip()
	number_conf = []
	other_conf = []
	for conf_val in conf.split(','):
		if match_cont(PATTEN['number'], conf_val):
			# Plain numbers are collected and handled together below.
			number_conf.append(conf_val)
		else:
			# Everything else: wildcard '*', ranges such as 0-8, steps such as 0-8/3.
			other_conf.append(conf_val)
	# Expand the non-numeric tokens through the handler of the pattern they match.
	for conf_val in other_conf:
		for key, ptn in PATTEN.items():
			if match_cont(ptn, conf_val):
				res = PATTEN_HANDLER[key](val=conf_val, ranges=ranges, res=res)
	if number_conf:
		# The list handler also covers a single number, so one call serves
		# both the one-number and the many-numbers case.
		res = handle_nlist(val=','.join(number_conf), ranges=ranges, res=res)
	return res
 
def parse_crontab_time(conf_string):
	"""
	Parse the time part of a crontab line.

	Args:
		conf_string: the line; its first five whitespace-separated fields
			are minute, hour, day of month, month and day of week. Anything
			after the fifth field (e.g. the command) is ignored.
			Field limits: minute 0-59, hour 0-23, day 1-31, month 1-12,
			day of week 0-6 (0 is Sunday).
	Return:
		(0, crontab_range) on success, where crontab_range is a list of five
		lists holding the allowed values of each field, in the order above;
		(-1, message) when fewer than five fields are present or a field
		expands to no value.
	"""
	# Hours start at 0: a lower limit of 1 would reject midnight.
	time_limit = ((0, 59), (0, 23), (1, 31), (1, 12), (0, 6))
	conf_length = 5
	# split() without an argument copes with tabs and repeated blanks.
	clist = conf_string.split()[:conf_length]
	if len(clist) != conf_length:
		return -1, 'config error whith [%s]' % conf_string
	crontab_range = []
	for conf, limit in zip(clist, time_limit):
		res_conf = parse_conf(conf, ranges=limit, res=[])
		if not res_conf:
			return -1, 'config error whith [%s]' % conf_string
		crontab_range.append(res_conf)
	return 0, crontab_range
 
def time_match_crontab(crontab_time, time_struct):
	"""
	Check whether a broken-down time satisfies a parsed crontab schedule.

	Args:
		crontab_time: the allowed values of each crontab field, one
			collection per field
		time_struct: the field values of the moment to test, in the same
			order as crontab_time
	Return:
		tuple (status code, matched): the status code is always 0; matched
		is True only when every value of time_struct is contained in the
		collection at the same position of crontab_time.
	"""
	matched = all(
		field_value in crontab_time[position]
		for position, field_value in enumerate(time_struct)
	)
	return 0, matched
 
def close_to_cron(crontab_time, time_struct):
	"""Snap each field of a broken-down time to the nearest allowed value at or below it.

	Args:
		crontab_time: the allowed values of each crontab field, one
			collection per field
		time_struct: the field values of the reference moment, in the same
			order as crontab_time
	Return:
		a new list with one entry per field of time_struct: the largest
		allowed value that does not exceed the field value, or the field
		value itself when no allowed value is that small. time_struct is
		left untouched.
	"""
	# Build a new list: writing into time_struct would silently change the
	# caller's data.
	close_time = []
	for cindex, val_struct in enumerate(time_struct):
		# Only values at or before the reference are candidates; an exact hit
		# (offset 0) and an allowed value of 0 both count.
		candidates = [val_cron for val_cron in crontab_time[cindex] if val_cron <= val_struct]
		if candidates:
			close_time.append(max(candidates))
		else:
			close_time.append(val_struct)
	return close_time
 
def cron_time_list(
		cron_time,
		year_num=None,
		limit_start=None,
		limit_end=None
	):
	"""
	List every moment of one year that a parsed crontab schedule selects.

	Args:
		cron_time: the allowed values (ints) of the five crontab fields, in
			the order minute, hour, day of month, month, day of week
			(0 = Sunday)
		year_num: the year to enumerate (int); the current year when omitted
		limit_start: lower bound as digits of 'YYYYmmddHHMM'; a shorter
			prefix such as '2013' or '201307' is compared against the same
			number of leading digits. The current minute when omitted.
		limit_end: upper bound in the same form; 24 hours from now when
			omitted
	Return:
		list of 'YYYYmmddHHMM' strings (e.g. 18:56 on 29 July 2013 is
		'201307291856'), ordered by month, day, minute and then hour.
		Only moments inside year_num are produced, so a window that crosses
		into the next year is cut off at the year end.
	"""
	import calendar

	# The defaults are resolved on every call; as signature defaults they
	# would be computed once at import time and go stale.
	now = time.time()
	if year_num is None:
		year_num = int(time.strftime("%Y", time.localtime(now)))
	if limit_start is None:
		limit_start = time.strftime("%Y%m%d%H%M", time.localtime(now))
	if limit_end is None:
		limit_end = time.strftime("%Y%m%d%H%M", time.localtime(now + 86400))

	len_start = len(limit_start)
	len_end = len(limit_end)
	int_start = int(limit_start)
	int_end = int(limit_end)
	week_days = set(cron_time[4])

	time_points = []
	for month in cron_time[3]:
		# monthrange() knows the month lengths, leap years included.
		days_in_month = calendar.monthrange(year_num, month)[1]
		for day in cron_time[2]:
			if day > days_in_month:
				continue
			# calendar.weekday() counts from Monday (0); crontab counts from
			# Sunday (0), so shift by one day before testing.
			cron_wday = (calendar.weekday(year_num, month, day) + 1) % 7
			if cron_wday not in week_days:
				continue
			for minute in cron_time[0]:
				for hour in cron_time[1]:
					time_ymdhm = '%s%02d%02d%02d%02d' % (year_num, month, day, hour, minute)
					# Keep only moments between the start and end limits.
					if (int(time_ymdhm[:len_start]) < int_start or
							int(time_ymdhm[:len_end]) > int_end):
						continue
					time_points.append(time_ymdhm)
	return time_points
 
 
# Regular expressions for the ways a crontab time field can be written.
# Raw strings keep the backslash that makes '*' a literal asterisk; an
# unescaped '^*' is rejected by the re module ("nothing to repeat").
PATTEN = {
	# Plain number, e.g. 5
	'number': r'^[0-9]+$',
	# List of numbers, e.g. 1,2,3,6
	'num_list': r'^[0-9]+([,][0-9]+)+$',
	# Asterisk: *
	'star': r'^\*$',
	# Asterisk with a step, e.g. */3
	'star_num': r'^\*/[0-9]+$',
	# Range, e.g. 8-20
	'range': r'^[0-9]+[-][0-9]+$',
	# Range with a step, e.g. 8-20/3
	'range_div': r'^[0-9]+[-][0-9]+[/][0-9]+$'
	# Range with a step followed by a number list, e.g. 8-20/3,21,22,34 (disabled)
	#'range_div_list':'^([0-9]+[-][0-9]+[/][0-9]+)([,][0-9]+)+$'
	}
# Dispatch table: for every key of PATTEN, the function that expands a token
# matching that pattern into its list of values.
PATTEN_HANDLER = dict(
	number=handle_num,
	num_list=handle_nlist,
	star=handle_star,
	star_num=handle_starnum,
	range=handle_range,
	range_div=handle_rangedv,
)
 
 
def isdo(strs,tips=None):
	"""
	Tell whether the schedule encoded in a job file name matches the current minute.

	The second '_'-separated part of the name carries five '-'-separated
	fields in the order month, week, day, hour, minute; '@' stands for '*'
	and '%' for '/', e.g. 'job_@-@-@-@-@_test02.txt.sh'.

	Args:
		strs: the job file name
		tips: message printed when the name cannot be processed; a default
			message is used when None
	Return:
		True when the current time matches the schedule. False when it does
		not, when the schedule cannot be parsed, or when any error occurs
		(in which case tips is printed).
	"""
	if tips is None:
		tips = " File name format error: job_ month - weeks - day - when - points _ The file name .txt"
	try:
		timer = strs.replace('@',"*").replace('%','/').split('_')[1]
		month,week,day,hour,mins = timer.split('-')
		# Reorder into crontab order: minute hour day month week.
		conf_string = mins+" "+hour+" "+day+" "+month+" "+week
		res, desc = parse_crontab_time(conf_string)
		if res != 0:
			return False
		cron_time = desc

		# NOTE(review): presumably this round trip truncates "now" to the
		# start of the minute - confirm against FDateTime.
		now =FDateTime.now()
		now = FDateTime.datetostring(now, "%Y%m%d%H%M00")

		time_stamp = FDateTime.strtotime(now, "%Y%m%d%H%M00")

		# Break the timestamp into minute, hour, day, month, week.
		time_struct = get_struct_time(time_stamp)
		match_res = time_match_crontab(cron_time, time_struct)
		return match_res[1]
	except Exception:
		# Exception (not a bare except) so that KeyboardInterrupt and
		# SystemExit still propagate.
		print(tips)
		return False
 
def main():
	"""Demonstrate the module on a sample crontab line and the current time."""
	# The time fields of one crontab line; a full line with a command, such as
	# '*/10 * * * * (cd /opt/pythonpm/devpapps; /usr/local/bin/python2.5 data_test.py>>output_error.txt)',
	# is accepted as well.
	conf_string = '*/10 * * * *'
	# The moment to test against the schedule.
	time_stamp = int(time.time())

	# Expand the five time fields into the values they allow.
	res, desc = parse_crontab_time(conf_string)
	if res != 0:
		print(desc)
		sys.exit(-1)
	cron_time = desc

	# Each message is formatted into a single string so that print(...) gives
	# the same output whether it is a statement or a function.
	print("\nconfig: %s" % conf_string)
	print("\nparse result(range for crontab):")

	print(" minute: %s" % (cron_time[0],))
	print(" hour:  %s" % (cron_time[1],))
	print(" day:  %s" % (cron_time[2],))
	print(" month:  %s" % (cron_time[3],))
	print(" week day: %s" % (cron_time[4],))

	# Break the timestamp into minute, hour, day, month, week.
	time_struct = get_struct_time(time_stamp)
	print("\nstruct time(minute hour day month week) for %d : %s" %
		(time_stamp, time_struct))

	# Does the timestamp fall inside the schedule?
	match_res = time_match_crontab(cron_time, time_struct)
	print("\nmatching result: %s" % (match_res,))

	# The scheduled values closest to the timestamp.
	most_close = close_to_cron(cron_time, time_struct)
	print("\nin range of crontab time which is most colse to struct  %s" % (most_close,))

	time_list = cron_time_list(cron_time)
	print("\n\n %d times need to tart-up:\n" % len(time_list))
	print("%s ..." % (time_list[:10],))
 
 
if __name__ == '__main__':
	# Usage example: check a job file name whose schedule is all wildcards.
	sample_name = 'job_@-@-@-@-@_test02.txt.sh'
	print(isdo(sample_name))

	# main()


Related articles: