Obtaining datax execution results with Python and saving them to a database

  • 2021-07-13 05:49:18
  • OfStack

To run the datax jobs, create an execution script and schedule it in crontab to run at 1 o'clock every day (the relevant parts are shown below):

The job_start and job_finish lines are echoed by the script itself, to make it easy to tell which table each block of log output belongs to.


#!/bin/bash
source /etc/profile

# database credentials passed to the datax jobs via -D parameters
user1="root"
pass1="pwd"
user2="root"
pass2="pwd"
job_path="/opt/datax/job/"

# datax job files to run, one after another
jobfile=(
job_table_a.json
job_table_b.json
)

for filename in "${jobfile[@]}"
do
	echo "job_start: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
	python /opt/datax/bin/datax.py -p "-Duser1=${user1} -Dpass1=${pass1} -Duser2=${user2} -Dpass2=${pass2}" "${job_path}${filename}"
	echo "job_finish: "`date "+%Y-%m-%d %H:%M:%S"`" ${filename}"
done

# crontab entry:
# 0 1 * * * /opt/datax/job/dc_to_ods_incr.sh >> /opt/datax/job/log/dc_to_ods_incr_$(date +\%Y\%m\%d_\%H\%M\%S).log 2>&1
# view the summary lines of the produced logs:
# egrep 'Task|Average|Record|Total|job_start|job_finish' /opt/datax/job/log/*.log

The datax execution log (just the summary lines, as selected by the egrep command above) looks like this:


job_start: 2018-08-08 01:13:28 job_table_a.json
 Task start time               : 2018-08-08 01:13:28
 Task end time                 : 2018-08-08 01:14:49
 Total task elapsed time       :         81s
 Average task traffic          :     192.82KB/s
 Record write speed            :      1998rec/s
 Total records read            :       159916
 Total read/write failures     :          0
job_finish: 2018-08-08 01:14:49 job_table_a.json
job_start: 2018-08-08 01:14:49 job_table_b.json
 Task start time               : 2018-08-08 01:14:50
 Task end time                 : 2018-08-08 01:15:01
 Total task elapsed time       :         11s
 Average task traffic          :        0B/s
 Record write speed            :       0rec/s
 Total records read            :          0
 Total read/write failures     :          0
job_finish: 2018-08-08 01:15:01 job_table_b.json
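
Each metric line can be pulled apart with plain string splits: the value sits after the first colon, and on the job_start/job_finish lines the job file name is the last whitespace-separated token. A minimal sketch of that idea, using lines taken from the log above:


line = " Task start time               : 2018-08-08 01:13:28"
start_time = line.split(':', 1)[1].strip()  # -> "2018-08-08 01:13:28"

line = "job_start: 2018-08-08 01:13:28 job_table_a.json"
job_file = line.split()[-1]                 # -> "job_table_a.json"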

Next, parse this information and save it to the database. Create the following table to hold the results:


CREATE TABLE `datax_job_result` (
 `log_file` varchar(200) DEFAULT NULL,
 `job_file` varchar(200) DEFAULT NULL,
 `start_time` datetime DEFAULT NULL,
 `end_time` datetime DEFAULT NULL,
 `seconds` int(11) DEFAULT NULL,
 `traffic` varchar(50) DEFAULT NULL,
 `write_speed` varchar(50) DEFAULT NULL,
 `read_record` int(11) DEFAULT NULL,
 `failed_record` int(11) DEFAULT NULL,
 `job_start` varchar(200) DEFAULT NULL,
 `job_finish` varchar(200) DEFAULT NULL,
 `insert_time` datetime DEFAULT CURRENT_TIMESTAMP
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
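
For reference, the row that the parsing script below builds for the first job block would look roughly like this (the values come from the sample log above; the log file name is only a hypothetical example):


sample_row = {
	'log_file': '/opt/datax/job/log/dc_to_ods_incr_20180808_010000.log',  # hypothetical name
	'job_file': 'job_table_a.json',
	'start_time': '2018-08-08 01:13:28',
	'end_time': '2018-08-08 01:14:49',
	'seconds': 81,
	'traffic': '192.82KB/s',
	'write_speed': '1998rec/s',
	'read_record': 159916,
	'failed_record': 0,
	'job_start': 'job_start: 2018-08-08 01:13:28 job_table_a.json',
	'job_finish': 'job_finish: 2018-08-08 01:14:49 job_table_a.json',
}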

The following script is also run from crontab every day, scheduled after the datax jobs are expected to have finished (they start at 1 o'clock). To pick up the log produced that day, it selects the newest log file created within the last 82800 seconds (23 hours), so it can be run at any time within the day.


#!/usr/bin/python
# -*- coding: UTF-8 -*-
# 0 5 * * * source /etc/profile && /usr/bin/python2.7 /opt/datax/job/save_log_to_db.py > /dev/null 2>&1
 
import re
import os
import sqlalchemy
import pandas as pd
import datetime as dt
 
def save_to_db(df):
	# append the parsed job results to the datax_job_result table
	engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test", encoding="utf-8")
	df.to_sql("datax_job_result", engine, index=False, if_exists='append')
 
def get_the_latest_file(path):
	# return the newest .log file created within the last 82800 seconds (23 hours)
	t0 = dt.datetime.utcfromtimestamp(0)
	d2 = (dt.datetime.utcnow() - t0).total_seconds()  # current time as seconds since the epoch
	d1 = d2 - 82800
	for (dirpath, dirnames, filenames) in os.walk(path):
		for filename in sorted(filenames, reverse=True):
			if filename.endswith(".log"):
				f = os.path.join(dirpath, filename)
				ctime = os.stat(f).st_ctime
				if d1 <= ctime <= d2:
					return f
	return None
			
def get_job_result_from_logfile(path):
	result = pd.DataFrame(columns=['log_file','job_file','start_time','end_time','seconds','traffic','write_speed','read_record','failed_record','job_start','job_finish'])
	log_file = get_the_latest_file(path)
	if log_file is None:
		return
	index = 0
	with open(log_file, "r") as content:
		for line in content:
			if line.startswith('job_start'):
				# a job_start line opens a new job block; the job file name is its last token
				result.loc[index, 'log_file'] = log_file
				result.loc[index, 'job_file'] = line.split()[-1]
				result.loc[index, 'job_start'] = line.strip()
			elif 'Task start time' in line:
				# split on the first colon only, the timestamp itself contains colons
				result.loc[index, 'start_time'] = line.split(':', 1)[1].strip()
			elif 'Task end time' in line:
				result.loc[index, 'end_time'] = line.split(':', 1)[1].strip()
			elif 'Total task elapsed time' in line:
				result.loc[index, 'seconds'] = line.split(':')[1].strip().replace('s', '')
			elif 'Average task traffic' in line:
				result.loc[index, 'traffic'] = line.split(':')[1].strip()
			elif 'Record write speed' in line:
				result.loc[index, 'write_speed'] = line.split(':')[1].strip()
			elif 'Total records read' in line:
				result.loc[index, 'read_record'] = line.split(':')[1].strip()
			elif 'Total read/write failures' in line:
				result.loc[index, 'failed_record'] = line.split(':')[1].strip()
			elif line.startswith('job_finish'):
				# the job_finish line closes the current block; move on to the next row
				result.loc[index, 'job_finish'] = line.strip()
				index = index + 1
	save_to_db(result)
 
get_job_result_from_logfile("/opt/datax/job/log")
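
After the script has run, a quick check against the table confirms that the rows landed (a minimal sketch, assuming the same connection settings as in save_to_db above):


import pandas as pd
import sqlalchemy

engine = sqlalchemy.create_engine("mysql+pymysql://root:pwd@localhost:3306/test")
print(pd.read_sql("SELECT job_file, start_time, seconds, read_record, failed_record "
                  "FROM datax_job_result ORDER BY insert_time DESC LIMIT 10", engine))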
