A simple Python implementation of MySQL-to-Elasticsearch data synchronization (tutorial)

  • 2020-10-23 20:11:08
  • OfStack

Previously, this blog used logstash-input-jdbc to synchronize MySQL data to Elasticsearch. However, its shortest synchronization interval is once per minute, which could not meet the requirements of our online business, so I had to implement a synchronizer myself. Time was tight, so this is only a simple implementation.

Ideas:

Many approaches found online rely on MySQL's binlog, but my knowledge of MySQL is limited, so I used a very blunt method: query the data from MySQL and insert it into ES. Because the data volume is not large and synchronization runs every 10 seconds, the efficiency is acceptable. To guard against the clock difference between the server and MySQL, and against the gap between the time a row is updated and the time it is queried, the query condition selects rows whose update time is at or after the previous synchronization start time, so no rows are missed no matter how much data is updated. Since the principle is never to miss any data during synchronization, the time difference and the interval can also be configured more generously. Because the MySQL id is used as the ES id, re-reading the same row does not create duplicate data.

Use:

Write the configuration file esconfig.py, then the SQL files, and then run mstes.py directly. The configuration layout is modelled on the logstash-input-jdbc configuration.

MsToEs

|-- esconfig.py (configuration file)

|-- mstes.py (sync program)

|-- sql_manage.py (database management)

|-- aa.sql (sql file required)

|-- bb.sql (sql file required)

sql_manage.py:


# -*-coding:utf-8 -*-
__author__ = "ZJL"
import functools
import traceback

from sqlalchemy import create_engine
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker, scoped_session
from sqlalchemy.pool import QueuePool

import esconfig
#  For operations that do not require rollbacks and commits 
def find(func):
    """Decorate a read-only method of an object that owns ``self.session``.

    The wrapped method is called unchanged. If it raises, the traceback is
    printed and ``None`` is returned, so a failure is falsy and can never be
    mistaken for query results. ``self.session.close()`` is called after
    every invocation, whether it succeeded or not.

    Args:
        func: method taking ``self`` as its first argument; ``self`` must
            expose a ``session`` attribute with a ``close()`` method.

    Returns:
        The wrapping function, carrying ``func``'s name and docstring.
    """
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception:
            # Keep the non-raising contract callers rely on, but report the
            # failure and hand back a falsy marker instead of the traceback.
            print(traceback.format_exc())
            return None
        finally:
            self.session.close()
    return wrapper
class MysqlManager(object):
    """Thin wrapper around a SQLAlchemy session bound to the configured MySQL.

    The connection string is read from ``esconfig.mysql`` and completed with
    the ``mysql+pymysql://`` driver prefix and a ``charset=utf8`` option.

    Attributes:
        engine: SQLAlchemy engine using a ``QueuePool`` recycled every hour.
        DB_Session: the ``sessionmaker`` factory bound to ``engine``.
        db: ``scoped_session`` registry built on ``DB_Session``.
        session: the session obtained from ``db`` and used for all queries.
    """

    def __init__(self):
        mysql_connection_string = esconfig.mysql.get("mysql_connection_string")
        self.engine = create_engine(
            'mysql+pymysql://' + mysql_connection_string + '?charset=utf8',
            poolclass=QueuePool,
            pool_recycle=3600,
        )
        # ``autocommit=False`` is intentionally not passed: it is the default
        # on SQLAlchemy 1.x and the keyword is no longer accepted by 2.0.
        self.DB_Session = sessionmaker(
            bind=self.engine,
            autoflush=True,
            expire_on_commit=False,
        )
        self.db = scoped_session(self.DB_Session)
        self.session = self.db()

    @find
    def select_all_dict(self, sql, keys):
        """Run ``sql`` and return its rows as dictionaries keyed by ``keys``.

        Args:
            sql: the SELECT statement, as a string (or an already built
                SQLAlchemy statement, which is executed as-is).
            keys: column names, in the same order as the selected columns.

        Returns:
            A list with one ``{key: value}`` dict per row; ``False`` as soon
            as a row's column count differs from ``len(keys)``. Errors are
            handled by the ``find`` decorator.
        """
        # Plain strings must be wrapped in text(): SQLAlchemy 2.0 refuses raw
        # strings, and text() escapes literal "%" for the DBAPI. Note that
        # text() treats ":name" as a bind parameter; write "\:" for a literal.
        statement = text(sql) if isinstance(sql, str) else sql
        rows = self.session.execute(statement).fetchall()
        records = []
        for row in rows:
            if len(keys) != len(row):
                return False
            records.append(dict(zip(keys, row)))
        return records

    def close(self):
        """Close the session."""
        self.session.close()

aa.sql:


-- Incremental export of table cc for the Elasticsearch type aa.
-- Columns are selected in the same order as the aa tuple in esconfig.db_field,
-- because the sync program pairs them by position.
select
 CONVERT(c.`id`,CHAR)    as id,
 c.`code`   as code,
 c.`name`   as name,
 c.`project_name` as project_name,
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')  as update_time
from `cc` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';

bb.sql:


-- Incremental export of table bb for the Elasticsearch type bb.
-- Columns are selected in the same order as the bb tuple in esconfig.db_field,
-- because the sync program pairs them by position.
select
 CONVERT(c.`id`,CHAR)    as id,
 c.`code`   as code,
 CONVERT(c.`age`,CHAR)    as age,
 c.`project_name` as project_name,
 c.`name`   as name,
 date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s') as update_time
from `bb` c
where date_format(c.`update_time`,'%Y-%m-%dT%H:%i:%s')>='::datetime_now';

esconfig.py:


# -*- coding: utf-8 -*-
#__author__="ZJL"
# Each SQL file is paired with the Elasticsearch index and type it feeds.
# The type name doubles as the lookup key into db_field further down.
_STATEMENT_FILES = [
    {"index": "a1", "sqlfile": "aa.sql", "type": "aa"},
    {"index": "a1", "sqlfile": "bb.sql", "type": "bb"},
]

mysql = {
    # user:password@host:port/database
    "mysql_connection_string": "root:123456@127.0.0.1:3306/xxx",
    # SQL files to run on every synchronization pass
    "statement_filespath": _STATEMENT_FILES,
}

# Elasticsearch host and port
elasticsearch = {"hosts": "127.0.0.1:9200"}

# Field names stored in Elasticsearch, keyed by type name. They are paired
# by position with the columns each SQL file selects, so the order here has
# to follow the order of that file's SELECT list.
_AA_FIELDS = ("id", "code", "name", "project_name", "update_time")
_BB_FIELDS = ("id", "code", "age", "project_name", "name", "update_time")

db_field = {
    "aa": _AA_FIELDS,
    "bb": _BB_FIELDS,
}

es_config = {
    # seconds to wait between two synchronization passes
    "sleep_time": 10,
    # safety margin in seconds for clock differences between servers
    "time_difference": 3,
    # print every imported document as JSON-like data when True
    "show_json": False,
}

mstes.py:


# -*- coding: utf-8 -*-
#__author__="ZJL"
from sql_manage import MysqlManager
from esconfig import mysql,elasticsearch,db_field,es_config
from elasticsearch import Elasticsearch
from elasticsearch import helpers
import traceback
import time
class TongBu(object):
    """Copy rows from MySQL into Elasticsearch, one pass per call.

    Each pass reads every SQL file listed in ``mysql["statement_filespath"]``,
    replaces the ``::datetime_now`` placeholder with a cutoff timestamp, runs
    the query through ``MysqlManager`` and bulk-indexes the rows.

    The first pass uses a cutoff far in the past so everything is imported.
    Later passes use the start time of the last *successful* pass minus
    ``time_difference`` seconds, so a failed pass is retried in full rather
    than skipped.

    Attributes:
        show_json: when truthy, every bulk action is printed.
        sleep_time: seconds the caller should wait between passes.
        time_difference: safety margin in seconds subtracted from the cutoff;
            it is read from the configuration and never overwritten.
        datetime_now: cutoff string used by the most recent pass ("" before
            the first pass).
        es: the Elasticsearch client.
        mm: the ``MysqlManager`` used to run the queries.
    """

    # Cutoff of the initial full import.
    FULL_SYNC_CUTOFF = "1999-01-01T00:00:00"
    # Date and time are joined by "T"; the original author notes that a blank
    # there makes the Elasticsearch import fail to parse the value.
    CUTOFF_FORMAT = "%Y-%m-%dT%H:%M:%S"
    LOG_FORMAT = "%Y-%m-%d %H:%M:%S"

    def __init__(self):
        """Load settings and connect to Elasticsearch and MySQL.

        Raises:
            Exception: whatever the configuration lookup or either client
                constructor raises; it is printed and re-raised so that a
                half-initialised instance is never used.
        """
        try:
            self.show_json = es_config.get("show_json")
            self.sleep_time = es_config.get("sleep_time")
            self.time_difference = es_config.get("time_difference")
            self.datetime_now = ""
            # Start time (epoch seconds) of the last pass that completed.
            self._last_success_start = None
            self.es = Elasticsearch(elasticsearch.get("hosts"))
            self.mm = MysqlManager()
        except Exception:
            print(traceback.format_exc())
            raise

    @staticmethod
    def _format_time(epoch, fmt=CUTOFF_FORMAT):
        """Format ``epoch`` seconds as local time using ``fmt``."""
        return time.strftime(fmt, time.localtime(epoch))

    def _next_cutoff(self):
        """Return the ``update_time`` lower bound for the coming pass."""
        if self._last_success_start is None:
            return self.FULL_SYNC_CUTOFF
        margin = self.time_difference or 0
        return self._format_time(self._last_success_start - margin)

    @staticmethod
    def _build_action(es_index, es_type, row, timestamp):
        """Build one ``helpers.bulk`` action for ``row``.

        ``@timestamp`` is stored inside the document: when ``_source`` is
        present the bulk helper sends only ``_source`` as the body, so a
        sibling ``@timestamp`` key would never reach Elasticsearch.
        """
        document = dict(row)
        document["@timestamp"] = timestamp
        action = {
            "_index": es_index,
            # NOTE(review): mapping types are gone in recent Elasticsearch
            # releases; confirm the server version still accepts "_type".
            "_type": es_type,
            "_source": document,
        }
        # Without an id Elasticsearch generates one itself.
        es_id = row.get("id", "")
        if es_id:
            action["_id"] = es_id
        return action

    def _collect_actions(self, entry):
        """Run one configured SQL file and return its bulk actions.

        Args:
            entry: dict with the keys ``sqlfile``, ``index`` and ``type``.

        Raises:
            KeyError: ``db_field`` has no field list for the entry's type.
            RuntimeError: the query did not return a row list.
        """
        sqlfile = entry.get("sqlfile")
        es_index = entry.get("index")
        es_type = entry.get("type")
        with open(sqlfile, "r") as handle:
            statement = handle.read()
        # "::datetime_now" is the placeholder that makes a query incremental.
        statement = statement.replace("::datetime_now", self.datetime_now)

        columns = db_field.get(es_type)
        if columns is None:
            raise KeyError("no db_field entry for type %r" % (es_type,))

        rows = self.mm.select_all_dict(statement, columns)
        if rows is False:
            # The manager reports a column-count mismatch with False.
            print("skipping %s: selected columns do not match db_field[%r]"
                  % (sqlfile, es_type))
            return []
        if not isinstance(rows, list):
            # Anything else (None, a traceback string) means the query failed.
            raise RuntimeError("query from %s failed" % (sqlfile,))

        timestamp = self._format_time(time.time())
        actions = []
        for row in rows:
            action = self._build_action(es_index, es_type, row, timestamp)
            if self.show_json:
                print(action)
            actions.append(action)
        return actions

    def tongbu_es_mm(self):
        """Run one synchronization pass.

        Errors are printed, not raised. On failure the cutoff base is left
        untouched, so the next pass covers the same time window again. The
        MySQL session is closed in every case.
        """
        start_time = time.time()
        try:
            print("start..............",
                  self._format_time(start_time, self.LOG_FORMAT))
            self.datetime_now = self._next_cutoff()
            actions = []
            for entry in mysql.get("statement_filespath") or []:
                actions.extend(self._collect_actions(entry))
            # Nothing to send when no row changed.
            if actions:
                helpers.bulk(self.es, actions)
        except Exception:
            print(traceback.format_exc())
        else:
            end_time = time.time()
            print("end...................",
                  self._format_time(end_time, self.LOG_FORMAT))
            self._last_success_start = start_time
        finally:
            self.mm.close()
def main():
    """Run synchronization passes forever, pausing between them.

    The pause length is read once from the synchronizer's ``sleep_time``
    before the loop starts.
    """
    syncer = TongBu()
    pause_seconds = syncer.sleep_time
    while True:
        syncer.tongbu_es_mm()
        time.sleep(pause_seconds)


if __name__ == '__main__':
    main()

Related articles: