Analysis of python concurrent network communication model
- 2021-11-10 10:16:50
- OfStack
1. Classification of common models
1.1. Circular server model
Receive client requests cyclically and process requests. Only one request can be processed at the same time, and the next one will be processed after processing.
Advantages: Simple implementation and less resource occupation Disadvantages: Unable to process multiple client requests at the same time Applicability: The processing task can be completed quickly, and the client does not need to occupy the server program for a long time. udp is more suitable for cycling than tcp.
1.2. IO concurrency model
Using IO multiplexing, asynchronous IO and other technologies, multiple client IO requests are processed simultaneously.
Advantages: Less resource consumption, and can efficiently handle multiple IO behaviors at the same time Disadvantages: It can only handle concurrent IO events, but cannot handle cpu calculations Applicability: HTTP request, network transmission, etc. are all IO behaviors.
1.3. Multi-process/thread network concurrency model
Whenever a client connects to the server, a new process/thread is created to serve the client, and the process/thread is destroyed when the client exits.
Advantages: It can simultaneously meet the needs of multiple clients occupying the server for a long time, and can handle various requests. Disadvantages: High resource consumption Applicable situation: The client connects less at the same time, and needs to deal with complex situations.
2. Multi-process network concurrency model based on fork
1. Create a listening socket
2. Wait for client requests to be received
3. Client Connection Creates a New Process to Handle Client Requests
4. The original process continues to wait for other clients to connect
5. If the client exits, destroy the corresponding process
from socket import *
import os
import signal
# Create a listening socket
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)
# Client service function
def handle(c):
while True:
data = c.recv(1024)
if not data:
break
print(data.decode())
c.send(b'OK')
c.close()
s = socket() # tcp Socket
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) # Setting socket port reuse
s.bind(ADDR)
s.listen(3)
signal.signal(signal.SIGCHLD,signal.SIG_IGN) # Handling zombie processes
print("Listen the port %d..." % PORT)
# Loop to wait for client connection
while True:
try:
c,addr = s.accept()
except KeyboardInterrupt:
os._exit(0)
except Exception as e:
print(e)
continue
# Create a child process to handle this client
pid = os.fork()
if pid == 0: # Processing client requests
s.close()
handle(c)
os._exit(0) # handle After processing the client request, the sub-process also exits
# Whether an error occurs or the parent process loops back to accept the request
# c No use for parent process
c.close()
3. Multi-threaded network concurrency based on threading
1. Create a listening socket
2. Receive client connection requests in a loop
3. Create a thread to process client requests when there is a new client connection
4. The main thread continues to wait for other clients to connect
5. When the client exits, the corresponding branch thread exits
from socket import *
from threading import Thread
import sys
# Create a listening socket
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)
# Processing client requests
def handle(c):
while True:
data = c.recv(1024)
if not data:
break
print(data.decode())
c.send(b'OK')
c.close()
s = socket() # tcp Socket
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(ADDR)
s.listen(3)
print("Listen the port %d..."%PORT)
# Loop to wait for client connection
while True:
try:
c,addr = s.accept()
except KeyboardInterrupt:
sys.exit(" Server exit ")
except Exception as e:
print(e)
continue
# Create a thread to process client requests
t = Thread(target=handle, args=(c,))
t.setDaemon(True) # When the parent process ends, all processes terminate
t.start()
4. ftp File Server
4.1. Project Functions
The client has a simple page command prompt: the functions include:
View the list of files in the server file library (normal files) You can download one of these files locally You can upload client files to the server file libraryServer requirements:
Allow multiple clients to operate at the same time Each client may send back commands in successionTechnical analysis:
tcp sockets are more suitable for file transfer Concurrency scheme-"fork multi-process concurrency Read and write operations on files get List of files--"os. listdir ()Treatment of sticky package
4.2. Overall structural design
Server functionality is encapsulated in classes (upload, download, view lists) The socket is created, and the process function calls main () The client is responsible for initiating request, receiving reply and displaying The server is responsible for accepting the request and processing it logicallyftp server:
from socket import *
from threading import Thread
import os
import time
# Global variable
HOST = '0.0.0.0'
PORT = 8080
ADDR = (HOST,PORT)
FTP = "/home/tarena/FTP/" # File library location
# Create a file server server function class
class FTPServer(Thread):
def __init__(self,connfd):
self.connfd = connfd
super().__init__()
def do_list(self):
# Get a list of files
files = os.listdir(FTP)
if not files:
self.connfd.send(" File library is empty ".encode())
return
else:
self.connfd.send(b'OK')
time.sleep(0.1) # Prevent and send content from sticking packets later
# Splice file list
files_ = ""
for file in files:
if file[0] != '.' and \
os.path.isfile(FTP+file):
files_ += file + '\n'
self.connfd.send(files_.encode())
def do_get(self,filename):
try:
fd = open(FTP+filename,'rb')
except Exception:
self.connfd.send(" File does not exist ".encode())
return
else:
self.connfd.send(b'OK')
time.sleep(0.1)
# File sending
while True:
data = fd.read(1024)
if not data:
time.sleep(0.1)
self.connfd.send(b'##')
break
self.connfd.send(data)
# Receive client requests in a loop
def run(self):
while True:
data = self.connfd.recv(1024).decode()
if not data or data == 'Q':
return
elif data == 'L':
self.do_list()
elif data[0] == 'G': # G filename
filename = data.split(' ')[-1]
self.do_get(filename)
# Network construction
def main():
# Create a socket
sockfd = socket()
sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
sockfd.bind(ADDR)
sockfd.listen(3)
print("Listen the port %d..."%PORT)
while True:
try:
connfd,addr = sockfd.accept()
print("Connect from",addr)
except KeyboardInterrupt:
print(" Server program exit ")
return
except Exception as e:
print(e)
continue
# Create a new threaded client
client = FTPServer(connfd)
client.setDaemon(True)
client.start() # Run the run method
if __name__ == "__main__":
main()
ftp client:
from socket import *
import sys
ADDR = ('127.0.0.1',8080) # Server address
# Client function processing class
class FTPClient:
def __init__(self,sockfd):
self.sockfd = sockfd
def do_list(self):
self.sockfd.send(b'L') # Send a request
# Waiting for a reply
data = self.sockfd.recv(128).decode()
if data == 'OK':
# 1 Secondary received file list string
data = self.sockfd.recv(4096)
print(data.decode())
else:
print(data)
def do_get(self,filename):
# Send a request
self.sockfd.send(('G '+filename).encode())
# Waiting for a reply
data = self.sockfd.recv(128).decode()
if data == 'OK':
fd = open(filename,'wb')
# Receive files
while True:
data = self.sockfd.recv(1024)
if data == b'##':
break
fd.write(data)
fd.close()
else:
print(data)
def do_quit(self):
self.sockfd.send(b'Q')
self.sockfd.close()
sys.exit(" Thank you for using ")
# Create a client network
def main():
sockfd = socket()
try:
sockfd.connect(ADDR)
except Exception as e:
print(e)
return
ftp = FTPClient(sockfd) # Instantiate object
# Cyclic send request
while True:
print("\n========= Command options ==========")
print("**** list ****")
print("**** get file ****")
print("**** put file ****")
print("**** quit ****")
print("=============================")
cmd = input(" Enter the command: ")
if cmd.strip() == 'list':
ftp.do_list()
elif cmd[:3] == 'get':
# get filename
filename = cmd.strip().split(' ')[-1]
ftp.do_get(filename)
elif cmd[:3] == 'put':
# put ../filename
filename = cmd.strip().split(' ')[-1]
ftp.do_put(filename)
elif cmd.strip() == 'quit':
ftp.do_quit()
else:
print(" Please enter the correct command ")
if __name__ == "__main__":
main()
5. IO concurrent
Definition: The operations of data exchange in memory are defined as IO operations, IO-------------------------------
Data exchange between memory and disk: reading and writing files, updating database
Memory and terminal data exchange: input print sys. stdin sys. stdout sys. stderr
Memory and Network Data Exchange: Network Connection recvsendrecvfrom
IO-intensive programs: There are a large number of IO operations and a small number of cpu operations in program execution. The consumption of cpu is less, and the running time of IO is longer
CPU (computation) intensive program: There are a large number of cpu operations in the program, while IO operations are relatively few and cpu consumption is large.
5.1. IO Classification
IO is divided into blocking IO, non-blocking IO, IO multiplexing, event-driven IO and asynchronous IO
Blocking IO
Definition: When performing IO operation, if the execution condition is not met, it will block. Blocking IO is the default form of IO. Efficiency: Blocking IO is a kind of IO with very low efficiency. However, because of its simple logic, it is the default IO behavior.Blocking condition:
Function blocking because some execution condition is not met e. g. accept input recv Blocking state e. g caused by processing IO for a long time. Network transmission, reading and writing of large filesNon-blocking IO
Definition: By modifying the IO property behavior, the originally blocked IO becomes a non-blocking state.
Set the socket to non-blocking IO
sockfd.setblocking(bool) Function: Set socket to non-blocking IO Parameter: The default is True, indicating that the socket IO is blocked; Set to False to make socket IO non-blockingTimeout detection: Set a maximum blocking time, after which it will no longer block and wait.
sockfd.settimeout(sec) Function: Set the timeout time for sockets Parameter: Set time
5.2, IO Multiplexing
Definition: With one monitor, you can monitor the behavior of multiple IO events at the same time. When an IO event can be executed, let the IO event occur.
rs, ws, xs = select (rlist, wlist, xlist [, timeout]) Monitor IO events, blocking IO time waiting for monitoring to occur
Parameters:
rlist list, storing (passively) IO awaiting processing (receive) wlist list for active processing of IO (send) xlist list, storing the error and IO (exception) that you want to handle timeout timeout detectionReturn value:
rs List IO Ready in rlist ws List IO Ready in wlist xs List IO Ready in xlistselect Implementation of tcp Service
1. Put the IO of interest into the corresponding monitor category list
2. Monitoring through the select function
3. Traverse the list of select return values to determine that IO events are ready
4. Handle IO events that occur
from socket import *
from select import select
# Create 1 Listening sockets as IO of concern
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# Set up a list of concerns
rlist = [s]
wlist = []
xlist = [s]
# Cyclic monitoring of IO
while True:
rs,ws,xs = select(rlist,wlist,xlist)
# Traversal 3 Return list , Processing IO
for r in rs:
# Use if points according to different traversal to IO
if r is s:
c,addr = r.accept()
print("Connect from",addr)
rlist.append(c) # Add a new IO event
# else Socket Readiness for Client
else:
data = r.recv(1024)
# Client exit
if not data:
rlist.remove(r) # Remove from the list of concerns
r.close()
continue # Continue to process other ready IO
print("Receive:",data.decode())
# r.send(b'OK')
# We want to proactively handle this IO object
wlist.append(r)
for w in ws:
w.send(b'OK')
wlist.remove(w) # Remove after use
for x in xs:
pass
Note:
If there is an IO event in wlist, select immediately returns to ws In the process of processing IO, do not occupy the server in an infinite loop IO multiplexing consumes less resources and is more efficient. Expansion:
5.3, bit operation
Convert an integer to binary, and operate operators according to binary bits
&
Bitwise and Bitwise or ^ Bitwise Exclusive OR
<
<
Left shift
>
>
Right shift
111011 14 111 0
(11
&
141010) (11 141 111) (11 ^ 140101)
11
<
<
2===
>
44 Right Fill 0 14
>
>
2 ===
>
3 Squeeze out the number on the right
Use:
Operate registers while making underlying hardware Filter the flag bits
5.4. Implementation of IO Multiplexing by poll Method
Create an poll object: p = select. poll ()
Register IO events of interest: p. register (fd, event)
IO to be concerned about fd IO Event Types to Focus on for eventCommon types:
POLLIN Read IO Event (rlist) POLLOUT Write IO Event (wlist) POLLERR anomaly IO (xlist) POLLHUP DisconnectUnfocus on IO: p. unregister (fd)
Parameter: IO object or fileno of IO object
events = p. poll ():
Function: Block IO events waiting for monitoring to occur Return value: Returns the IO event that occurredevents is a list [(fileno, evnet), (), ()...]
Each tuple is a ready IO, the first item of the tuple is the fileno of the IO, and the second item is the event type of the IO ready
poll_server step
1. Create a socket
2. Put the socket register
3. Create a lookup dictionary and maintain
4. Cyclic monitoring of IO occurrence
5. Handling IO occurrences
from socket import *
from select import *
# Create a socket
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# Create a poll object to focus on s
p = poll()
# Create a lookup dictionary to find IO objects through fileno
fdmap = {s.fileno():s}
# Focus on s
p.register(s,POLLIN|POLLERR)
# Cyclic monitoring
while True:
events = p.poll()
# Loop through the events that occur fd -->fileno
for fd,event in events:
# Distinguish events for processing
if fd == s.fileno():
c,addr = fdmap[fd].accept()
print("Connect from",addr)
# Add a new concern IO
p.register(c,POLLIN|POLLERR)
fdmap[c.fileno()] = c # Dictionary maintenance
# Bitwise AND decision is POLLIN ready
elif event & POLLIN:
data = fdmap[fd].recv(1024)
if not data:
p.unregister(fd) # Cancel attention
fdmap[fd].close()
del fdmap[fd] # Delete from the dictionary
continue
print("Receive:",data.decode())
fdmap[fd].send(b'OK')
5.5. epoll method
1. How to use: Basically the same as poll
The generated object is changed to epoll () Change all event types to EPOLL types2. epoll Features
The efficiency of epoll is higher than that of select poll epoll monitors more IO than select epoll has more triggering modes than poll (EPOLLET edge triggering)
from socket import *
from select import *
# Create a socket
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)
# Create e The poll object focuses on s
ep = epoll()
# Create a lookup dictionary to find IO objects through fileno
fdmap = {s.fileno():s}
# Focus on s
ep.register(s,EPOLLIN|EPOLLERR)
# Cyclic monitoring
while True:
events = ep.poll()
# Loop through the events that occur fd -->fileno
for fd,event in events:
print(" Dear, you have IO to deal with ")
# Distinguish events for processing
if fd == s.fileno():
c,addr = fdmap[fd].accept()
print("Connect from",addr)
# Add a new concern IO
# Change the trigger mode to edge trigger
ep.register(c,EPOLLIN|EPOLLERR|EPOLLET)
fdmap[c.fileno()] = c # Dictionary maintenance
# Bitwise AND decision is E POLLIN ready
# elif event & EPOLLIN:
# data = fdmap[fd].recv(1024)
# if not data:
# ep.unregister(fd) # Cancel attention
# fdmap[fd].close()
# del fdmap[fd] # Delete from the dictionary
# continue
# print("Receive:",data.decode())
# fdmap[fd].send(b'OK')
The above is the analysis of python concurrent network communication model details, more information about python concurrent network communication model please pay attention to other related articles on this site!