Detailed method of receiving and reading data for python3 Serial serial assistant
- 2021-06-28 09:38:09
- OfStack
In fact, there are many serial ports written in python language on the network, but most of them are written by python2. There is no suitable serial assistant written by python, so I can only write a serial assistant by myself. Because I only need the serial port to be able to receive and read data, this serial assistant only implements data reception and reading.
To create a serial assistant, you first need to create a class. The implementation of the refactoring class is as follows:
#coding=gb18030
import threading
import time
import serial
class ComThread:
def __init__(self, Port='COM3'):
# Construct properties of serial port
self.l_serial = None
self.alive = False
self.waitEnd = None
self.port = Port
self.ID = None
self.data = None
# Define functions for serial port waiting
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait()
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set()
self.alive = False
self.stop()
# Start Serial Port Function
def start(self):
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = 115200
# Set wait time if stop waiting exceeds
self.l_serial.timeout = 2
self.l_serial.open()
# Determine if the serial port is open
if self.l_serial.isOpen():
self.waitEnd = threading.Event()
self.alive = True
self.thread_read = None
self.thread_read = threading.Thread(target=self.FirstReader)
self.thread_read.setDaemon(1)
self.thread_read.start()
return True
else:
return False
Once the class is created, the function of serial port reading is implemented, which reads data as follows:
First, create a string to hold the received data:
data = ''
data = data.encode('utf-8')# Because the serial port uses bytes, it needs to be transcoded, otherwise the serial port will not be recognized
Start receiving data after you have created a variable to receive data:
n = self.l_serial.inWaiting()# Get Received Data Length
if n:
# Read and save data data
data = data + self.l_serial.read(n)
# Output Received Data
print('get data from serial port:', data)
# display data Type of so that errors can be checked if they occur
print(type(data))
After the data is received, it is necessary to process the received data and extract useful information. Because the lower computer uses different protocols, so the processing method is also different. I use the protocol **+ID+*Data+*, so my extraction method is as follows:
# Get Length of Data Not Received
n = self.l_serial.inWaiting()
# Determine if all the data transferred from the lower computer has been extracted to prevent all data from being retrieved before
if len(data)>0 and n==0:
try:
# Convert data to character type
temp = data.decode('gb18030')
# output temp Type, see if transcoding is successful
print(type(temp))
# Output Received Data
print(temp)
Split data by line break and output
car,temp = str(temp).split("\n",1)
print(car,temp)
# take temp Press ':' Split and get the 2 Data
string = str(temp).strip().split(":")[1]
# Since the previously split data type is a list, it needs to be converted to a string and followed by '*' Divide, get what we need Id and data
str_ID,str_data = str(string).split("*",1)
print(str_ID)
print(str_data)
print(type(str_ID),type(str_data))
# judge data Last 1 Is Bit Yes '*' Exit if not output exception
if str_data[-1]== '*':
break
else:
print(str_data[-1])
print('str_data[-1]!=*')
except:
print(" Error reading card, please try again! \n")
The output is:
get data from serial port:b'ID:4A622E29\n\xbf\xa8\xd6\xd0\xbf\xe924\xca\xfd\xbe\xdd\xce\xaa:1*80*\r\n'
<class 'bytes'>
<class 'str'>
ID:4A622E29
Card Block 24 Data is :1*80*
ID:4A622E29 Card Block 24 Data is :1*80*
80*
<class 'str'> <class 'str'>
The complete code for the serial assistant is as follows:
#coding=gb18030
import threading
import time
import serial
class ComThread:
def __init__(self, Port='COM3'):
self.l_serial = None
self.alive = False
self.waitEnd = None
self.port = Port
self.ID = None
self.data = None
def waiting(self):
if not self.waitEnd is None:
self.waitEnd.wait()
def SetStopEvent(self):
if not self.waitEnd is None:
self.waitEnd.set()
self.alive = False
self.stop()
def start(self):
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = 115200
self.l_serial.timeout = 2
self.l_serial.open()
if self.l_serial.isOpen():
self.waitEnd = threading.Event()
self.alive = True
self.thread_read = None
self.thread_read = threading.Thread(target=self.FirstReader)
self.thread_read.setDaemon(1)
self.thread_read.start()
return True
else:
return False
def SendDate(self,i_msg,send):
lmsg = ''
isOK = False
if isinstance(i_msg):
lmsg = i_msg.encode('gb18030')
else:
lmsg = i_msg
try:
# Send data to the appropriate processing component
self.l_serial.write(send)
except Exception as ex:
pass;
return isOK
def FirstReader(self):
while self.alive:
time.sleep(0.1)
data = ''
data = data.encode('utf-8')
n = self.l_serial.inWaiting()
if n:
data = data + self.l_serial.read(n)
print('get data from serial port:', data)
print(type(data))
n = self.l_serial.inWaiting()
if len(data)>0 and n==0:
try:
temp = data.decode('gb18030')
print(type(temp))
print(temp)
car,temp = str(temp).split("\n",1)
print(car,temp)
string = str(temp).strip().split(":")[1]
str_ID,str_data = str(string).split("*",1)
print(str_ID)
print(str_data)
print(type(str_ID),type(str_data))
if str_data[-1]== '*':
break
else:
print(str_data[-1])
print('str_data[-1]!=*')
except:
print(" Error reading card, please try again! \n")
self.ID = str_ID
self.data = str_data[0:-1]
self.waitEnd.set()
self.alive = False
def stop(self):
self.alive = False
self.thread_read.join()
if self.l_serial.isOpen():
self.l_serial.close()
# Call Serial Port, Test Serial Port
def main():
rt = ComThread()
rt.sendport = '**1*80*'
try:
if rt.start():
print(rt.l_serial.name)
rt.waiting()
print("The data is:%s,The Id is:%s"%(rt.data,rt.ID))
rt.stop()
else:
pass
except Exception as se:
print(str(se))
if rt.alive:
rt.stop()
print('')
print ('End OK .')
temp_ID=rt.ID
temp_data=rt.data
del rt
return temp_ID,temp_data
if __name__ == '__main__':
# Set up 1 A main function that runs a window so that it can be called directly if the serial port needs to be called elsewhere main function
ID,data = main()
print("******")
print(ID,data)