Detailed method of receiving and reading data for python3 Serial serial assistant

  • 2021-06-28 09:38:09
  • OfStack

In fact, there are many serial ports written in python language on the network, but most of them are written by python2. There is no suitable serial assistant written by python, so I can only write a serial assistant by myself. Because I only need the serial port to be able to receive and read data, this serial assistant only implements data reception and reading.

To create a serial assistant, you first need to create a class. The implementation of the refactoring class is as follows:


#coding=gb18030

import threading
import time
import serial

class ComThread:
 def __init__(self, Port='COM3'):
 # Construct properties of serial port 
  self.l_serial = None
  self.alive = False
  self.waitEnd = None
  self.port = Port
  self.ID = None
  self.data = None
 # Define functions for serial port waiting 
 def waiting(self):
  if not self.waitEnd is None:
   self.waitEnd.wait()

 def SetStopEvent(self):
  if not self.waitEnd is None:
   self.waitEnd.set()
  self.alive = False
  self.stop()
 # Start Serial Port Function 
 def start(self):
  self.l_serial = serial.Serial()
  self.l_serial.port = self.port
  self.l_serial.baudrate = 115200
  # Set wait time if stop waiting exceeds 
  self.l_serial.timeout = 2
  self.l_serial.open()
  # Determine if the serial port is open 
  if self.l_serial.isOpen():
   self.waitEnd = threading.Event()
   self.alive = True
   self.thread_read = None
   self.thread_read = threading.Thread(target=self.FirstReader)
   self.thread_read.setDaemon(1)
   self.thread_read.start()
   return True
  else:
   return False

Once the class is created, the function of serial port reading is implemented, which reads data as follows:

First, create a string to hold the received data:


   data = ''
   data = data.encode('utf-8')# Because the serial port uses bytes, it needs to be transcoded, otherwise the serial port will not be recognized 

Start receiving data after you have created a variable to receive data:


n = self.l_serial.inWaiting()# Get Received Data Length 
   if n: 
     # Read and save data data
     data = data + self.l_serial.read(n)
     # Output Received Data 
     print('get data from serial port:', data)
     # display data Type of so that errors can be checked if they occur 
     print(type(data))

After the data is received, it is necessary to process the received data and extract useful information. Because the lower computer uses different protocols, so the processing method is also different. I use the protocol **+ID+*Data+*, so my extraction method is as follows:


# Get Length of Data Not Received 
n = self.l_serial.inWaiting()
# Determine if all the data transferred from the lower computer has been extracted to prevent all data from being retrieved before 
if len(data)>0 and n==0:
 try:
  # Convert data to character type 
  temp = data.decode('gb18030')
  # output temp Type, see if transcoding is successful 
  print(type(temp))
  # Output Received Data 
  print(temp)
   Split data by line break and output 
  car,temp = str(temp).split("\n",1)
  print(car,temp)
 
  # take temp Press ':' Split and get the 2 Data 
  string = str(temp).strip().split(":")[1]
  # Since the previously split data type is a list, it needs to be converted to a string and followed by '*' Divide, get what we need Id and data
  str_ID,str_data = str(string).split("*",1)
 
  print(str_ID)
  print(str_data)
  print(type(str_ID),type(str_data))
  # judge data Last 1 Is Bit Yes '*' Exit if not output exception 
  if str_data[-1]== '*':
   break
  else:
   print(str_data[-1])
   print('str_data[-1]!=*')
 except:
  print(" Error reading card, please try again! \n")

The output is:


get data from serial port:b'ID:4A622E29\n\xbf\xa8\xd6\xd0\xbf\xe924\xca\xfd\xbe\xdd\xce\xaa:1*80*\r\n'
<class 'bytes'>
<class 'str'>
ID:4A622E29
 Card Block 24 Data is :1*80*

ID:4A622E29  Card Block 24 Data is :1*80*
80*
<class 'str'> <class 'str'>

The complete code for the serial assistant is as follows:


#coding=gb18030

import threading
import time
import serial

class ComThread:
 def __init__(self, Port='COM3'):
  self.l_serial = None
  self.alive = False
  self.waitEnd = None
  self.port = Port
  self.ID = None
  self.data = None

 def waiting(self):
  if not self.waitEnd is None:
   self.waitEnd.wait()

 def SetStopEvent(self):
  if not self.waitEnd is None:
   self.waitEnd.set()
  self.alive = False
  self.stop()

 def start(self):
  self.l_serial = serial.Serial()
  self.l_serial.port = self.port
  self.l_serial.baudrate = 115200
  self.l_serial.timeout = 2
  self.l_serial.open()
  if self.l_serial.isOpen():
   self.waitEnd = threading.Event()
   self.alive = True
   self.thread_read = None
   self.thread_read = threading.Thread(target=self.FirstReader)
   self.thread_read.setDaemon(1)
   self.thread_read.start()
   return True
  else:
   return False

 def SendDate(self,i_msg,send):
  lmsg = ''
  isOK = False
  if isinstance(i_msg):
   lmsg = i_msg.encode('gb18030')
  else:
   lmsg = i_msg
  try:
   #  Send data to the appropriate processing component 
   self.l_serial.write(send)
  except Exception as ex:
   pass;
  return isOK

 def FirstReader(self):
  while self.alive:
   time.sleep(0.1)

   data = ''
   data = data.encode('utf-8')

   n = self.l_serial.inWaiting()
   if n:
     data = data + self.l_serial.read(n)
     print('get data from serial port:', data)
     print(type(data))

   n = self.l_serial.inWaiting()
   if len(data)>0 and n==0:
    try:
     temp = data.decode('gb18030')
     print(type(temp))
     print(temp)
     car,temp = str(temp).split("\n",1)
     print(car,temp)

     string = str(temp).strip().split(":")[1]
     str_ID,str_data = str(string).split("*",1)

     print(str_ID)
     print(str_data)
     print(type(str_ID),type(str_data))

     if str_data[-1]== '*':
      break
     else:
      print(str_data[-1])
      print('str_data[-1]!=*')
    except:
     print(" Error reading card, please try again! \n")

  self.ID = str_ID
  self.data = str_data[0:-1]
  self.waitEnd.set()
  self.alive = False

 def stop(self):
  self.alive = False
  self.thread_read.join()
  if self.l_serial.isOpen():
   self.l_serial.close()
# Call Serial Port, Test Serial Port 
def main():
 rt = ComThread()
 rt.sendport = '**1*80*'
 try:
  if rt.start():
   print(rt.l_serial.name)
   rt.waiting()
   print("The data is:%s,The Id is:%s"%(rt.data,rt.ID))
   rt.stop()
  else:
   pass
 except Exception as se:
  print(str(se))

 if rt.alive:
  rt.stop()

 print('')
 print ('End OK .')
 temp_ID=rt.ID
 temp_data=rt.data
 del rt
 return temp_ID,temp_data


if __name__ == '__main__':

 # Set up 1 A main function that runs a window so that it can be called directly if the serial port needs to be called elsewhere main function 
 ID,data = main()

 print("******")
 print(ID,data)

Related articles: