A simple Python implementation of MySQL-to-Elasticsearch data synchronization (tutorial)
- 2020-10-23 20:11:08
- OfStack
Previously, this blog used logstash-input-jdbc to synchronize MySQL data to Elasticsearch. However, since the minimum synchronization interval was one minute, it could not meet the requirements of the online business, so I had to implement one myself. As time was tight, I kept the implementation simple.
Ideas:
Many approaches found online are based on MySQL's binlog, but my knowledge of MySQL is limited, so I used a blunt method instead: periodically query MySQL for recently updated rows and insert them into ES. Because the data volume is small and the interval is 10 seconds, the efficiency is acceptable. To avoid missing rows due to the clock difference between the servers and the gap between when a row is updated and when it is queried, the query's update-time lower bound is set to the previous sync's start time minus a buffer, so no data is missed no matter how much is updated. Since the principle is to never miss any data, the same row may be picked up more than once when the time windows overlap; using the MySQL id as the ES document id prevents this from producing duplicate documents.
Use:
Just write the configuration file esconfig.py and the sql files, then execute mstes.py directly. The configuration style also borrows from the logstash-input-jdbc configuration.
MsToEs
|-- esconfig.py (configuration file)
|-- mstes.py (sync program)
|-- sql_manage.py (database management)
|-- aa.sql (sql file required)
|-- bb.sql (sql file required)
sql_manage.py:
# -*-coding:utf-8 -*-
__author__ = "ZJL"
import functools
import traceback

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool

import esconfig
# For operations that do not require rollbacks and commits
def find(func):
    """Decorator for read-only query methods (no commit/rollback needed).

    Runs *func*; on any exception the formatted traceback is printed and
    returned to the caller as a string instead of re-raising (deliberate
    best-effort behavior).  The wrapped object's session is always closed.
    """
    @functools.wraps(func)  # preserve the wrapped method's name and docstring
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            # Capture the traceback once instead of formatting it twice.
            tb = traceback.format_exc()
            print(tb)
            print(str(e))
            return tb
        finally:
            # Always release the session, on success and on failure.
            self.session.close()
    return wrapper
class MysqlManager(object):
    """Thin wrapper around a pooled SQLAlchemy session for raw SQL queries."""

    def __init__(self):
        # Connection string comes from the config file: user:pass@host:port/db
        conn_str = esconfig.mysql.get("mysql_connection_string")
        self.engine = create_engine(
            'mysql+pymysql://' + conn_str + '?charset=utf8',
            poolclass=QueuePool,
            pool_recycle=3600)
        # Thread-local sessions on top of the pooled engine.
        self.DB_Session = sessionmaker(bind=self.engine, autocommit=False,
                                       autoflush=True, expire_on_commit=False)
        self.db = scoped_session(self.DB_Session)
        self.session = self.db()

    @find
    def select_all_dict(self, sql, keys):
        """Execute *sql* and return a list of dicts, one per row.

        Row values are paired positionally with *keys*, so the key order
        must match the column order of the statement.  Returns False when
        any row's width differs from len(keys).
        """
        rows = self.session.execute(sql).fetchall()
        if any(len(row) != len(keys) for row in rows):
            return False
        return [dict(zip(keys, row)) for row in rows]

    def close(self):
        """Release the session back to the pool."""
        self.session.close()
aa. sql:
-- Incremental export of `cc`: rows updated since ::datetime_now.
-- Column order must match db_field["aa"] in esconfig.py.
-- Fix: removed the trailing comma after the last select column,
-- which is a MySQL syntax error.
select
  CONVERT(c.`id`,CHAR) as id,
  c.`code` as code,
  c.`project_name` as project_name,
  c.`name` as name,
  date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time
from `cc` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
bb.sql:
-- Incremental export of `bb`: rows updated since ::datetime_now.
-- Column order must match db_field["bb"] in esconfig.py.
-- Fix: removed the trailing comma after the last select column,
-- which is a MySQL syntax error.
select
  CONVERT(c.`id`,CHAR) as id,
  CONVERT(c.`age`,CHAR) as age,
  c.`code` as code,
  c.`name` as name,
  c.`project_name` as project_name,
  date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time
from `bb` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';
esconfig.py:
# -*- coding: utf-8 -*-
# __author__ = "ZJL"

# Each sql file name must correspond to an es type name configured below.
mysql = {
    # MySQL connection information: user:password@host:port/database
    "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
    # sql file information
    "statement_filespath": [
        # es index and es type corresponding to each sql file
        {
            "index": "a1",
            "sqlfile": "aa.sql",
            "type": "aa"
        },
        {
            "index": "a1",
            "sqlfile": "bb.sql",
            "type": "bb"
        },
    ],
}

# Elasticsearch ip and port
elasticsearch = {
    "hosts": "127.0.0.1:9200",
}

# Field names stored into es, keyed by es type.
# IMPORTANT: rows are zipped with these keys positionally, so each tuple's
# order MUST match the column order of its sql file.
# BUG FIX: the original tuples did not match the sql column order
# ("name"/"project_name" swapped for aa; "age"/"code" and
# "name"/"project_name" swapped for bb), which silently stored values
# under the wrong field names.
db_field = {
    # matches aa.sql: id, code, project_name, name, update_time
    "aa":
        ("id",
         "code",
         "project_name",
         "name",
         "update_time",
         ),
    # matches bb.sql: id, age, code, name, project_name, update_time
    "bb":
        ("id",
         "age",
         "code",
         "name",
         "project_name",
         "update_time",
         ),
}

es_config = {
    # Seconds between two synchronization rounds.
    "sleep_time": 10,
    # Buffer seconds to compensate for clock differences between servers.
    "time_difference": 3,
    # When True, print each assembled JSON action to the console.
    "show_json": False,
}
mstes.py:
# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
    """Periodically pull updated rows from MySQL and bulk-index them into ES."""

    def __init__(self):
        try:
            # Whether to print each assembled action (JSON) to the console.
            self.show_json = es_config.get("show_json")
            # Seconds between two synchronization rounds.
            self.sleep_time = es_config.get("sleep_time")
            # Buffer seconds added to the lookback window so rows updated
            # while a round was running are not missed; replaced after each
            # round with the actual round duration.
            self.time_difference = es_config.get("time_difference")
            # Lower bound ("updated since") for the incremental query;
            # the empty string forces a full import on the first round.
            self.datetime_now = ""
            # Elasticsearch host:port.
            es_host = elasticsearch.get("hosts")
            # Connect to es.
            self.es = Elasticsearch(es_host)
            # Connect to mysql.
            self.mm = MysqlManager()
        except Exception:  # was a bare except: — narrowed to Exception
            print(traceback.format_exc())

    def tongbu_es_mm(self):
        """Run one synchronization round: execute each sql file, bulk-insert results."""
        try:
            # Start time of this round.
            start_time = time.time()
            print("start..............", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(start_time)))
            # Actions accumulated for a single bulk insert into es.
            actions = []
            # All configured sql files.
            statement_filespath = mysql.get("statement_filespath", [])
            if self.datetime_now:
                # Incremental round: look back one interval plus the time the
                # previous round took, so no update can fall between windows.
                # The 'T' separator keeps the timestamp parseable by es.
                self.datetime_now = time.strftime(
                    "%Y-%m-%dT%H:%M:%S",
                    time.localtime(time.time() - (self.sleep_time + self.time_difference)))
            else:
                # First round: import everything.
                self.datetime_now = "1999-01-01T00:00:00"
            if statement_filespath:
                for filepath in statement_filespath:
                    # sql file to execute.
                    sqlfile = filepath.get("sqlfile")
                    # Target es index.
                    es_index = filepath.get("index")
                    # Target es type.
                    es_type = filepath.get("type")
                    # Read the sql statement text.
                    with open(sqlfile, "r") as opf:
                        sqldatas = opf.read()
                    # '::datetime_now' is a custom placeholder used for
                    # incremental updates.
                    if "::datetime_now" in sqldatas:
                        sqldatas = sqldatas.replace("::datetime_now", self.datetime_now)
                    # Field names for this es type (order matches the sql columns).
                    dict_set = db_field.get(es_type)
                    # List of dicts: field name -> value, one per row.
                    db_data_list = self.mm.select_all_dict(sqldatas, dict_set)
                    if db_data_list:
                        # Assemble each row into the bulk-action format.
                        for db_data in db_data_list:
                            action = {
                                "_index": es_index,
                                "_type": es_type,
                                "@timestamp": time.strftime("%Y-%m-%dT%H:%M:%S", time.localtime(time.time())),
                                "_source": db_data
                            }
                            # Reuse the mysql id as the es id to avoid
                            # duplicate documents; otherwise es generates one.
                            es_id = db_data.get("id", "")
                            if es_id:
                                action["_id"] = es_id
                            # Optionally show the JSON on the terminal.
                            if self.show_json:
                                print(action)
                            actions.append(action)
            # Do not bulk-insert an empty action list.
            if len(actions) > 0:
                helpers.bulk(self.es, actions)
        except Exception:
            print(traceback.format_exc())
        else:
            end_time = time.time()
            # BUG FIX: the original formatted start_time here, so the "end"
            # line showed the start time; report the actual end time.
            print("end...................", time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(end_time)))
            # Remember how long this round took for the next lookback window.
            self.time_difference = end_time - start_time
        finally:
            # Always release the database session, even on error.
            self.mm.close()
def main():
    """Entry point: synchronize forever at the configured interval."""
    tb = TongBu()
    # Seconds to wait between two synchronization rounds.
    interval = tb.sleep_time
    # Endless loop: run one round, then sleep for the interval.
    while True:
        tb.tongbu_es_mm()
        time.sleep(interval)


if __name__ == '__main__':
    main()