Analysis of python concurrent network communication model

  • 2021-11-10 10:16:50
  • OfStack

Directory 1. Common model classification 1.1, circular server model 1.2, IO concurrency model 1.3, multi-process/thread network concurrency model 2. fork-based multi-process network concurrency model 3. threading-based multi-thread network concurrency 4. ftp file server 4.1, project function 4.2, overall structure design 5. IO concurrency 5.1, IO classification 5.2, IO multiplexing 5.3, bit operation 5.4, poll method to implement IO multiplexing 5.5, epoll method

1. Classification of common models

1.1. Circular server model

Receive client requests cyclically and process requests. Only one request can be processed at the same time, and the next one will be processed after processing.

Advantages: Simple implementation and less resource occupation Disadvantages: Unable to process multiple client requests at the same time Applicability: The processing task can be completed quickly, and the client does not need to occupy the server program for a long time. udp is more suitable for cycling than tcp.

1.2. IO concurrency model

Using IO multiplexing, asynchronous IO and other technologies, multiple client IO requests are processed simultaneously.

Advantages: Less resource consumption, and can efficiently handle multiple IO behaviors at the same time Disadvantages: It can only handle concurrent IO events, but cannot handle cpu calculations Applicability: HTTP request, network transmission, etc. are all IO behaviors.

1.3. Multi-process/thread network concurrency model

Whenever a client connects to the server, a new process/thread is created to serve the client, and the process/thread is destroyed when the client exits.

Advantages: It can simultaneously meet the needs of multiple clients occupying the server for a long time, and can handle various requests. Disadvantages: High resource consumption Applicable situation: The client connects less at the same time, and needs to deal with complex situations.

2. Multi-process network concurrency model based on fork

1. Create a listening socket

2. Wait for client requests to be received

3. Client Connection Creates a New Process to Handle Client Requests

4. The original process continues to wait for other clients to connect

5. If the client exits, destroy the corresponding process


from socket import *
import os
import signal

#  Create a listening socket 
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)

#  Client service function 
def handle(c):
  while True:
    data = c.recv(1024)
    if not data:
      break
    print(data.decode())
    c.send(b'OK')
  c.close()

s = socket()  # tcp Socket 
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)   #  Setting socket port reuse 
s.bind(ADDR)
s.listen(3)

signal.signal(signal.SIGCHLD,signal.SIG_IGN)    #  Handling zombie processes 

print("Listen the port %d..." % PORT)

#  Loop to wait for client connection 
while True:
  try:
    c,addr = s.accept()
  except KeyboardInterrupt:
    os._exit(0)
  except Exception as e:
    print(e)
    continue

  #  Create a child process to handle this client 
  pid = os.fork()
  if pid == 0:  #  Processing client requests 
    s.close()
    handle(c)
    os._exit(0)  # handle After processing the client request, the sub-process also exits 

  #  Whether an error occurs or the parent process loops back to accept the request 
  # c No use for parent process 
  c.close()

3. Multi-threaded network concurrency based on threading

1. Create a listening socket

2. Receive client connection requests in a loop

3. Create a thread to process client requests when there is a new client connection

4. The main thread continues to wait for other clients to connect

5. When the client exits, the corresponding branch thread exits


from socket import *
from threading import Thread
import sys

#  Create a listening socket 
HOST = '0.0.0.0'
PORT = 8888
ADDR = (HOST,PORT)

#  Processing client requests 
def handle(c):
  while True:
    data = c.recv(1024)
    if not data:
      break
    print(data.decode())
    c.send(b'OK')
  c.close()

s = socket()  # tcp Socket 
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(ADDR)
s.listen(3)

print("Listen the port %d..."%PORT)
#  Loop to wait for client connection 
while True:
  try:
    c,addr = s.accept()
  except KeyboardInterrupt:
    sys.exit(" Server exit ")
  except Exception as e:
    print(e)
    continue

  #  Create a thread to process client requests 
  t = Thread(target=handle, args=(c,))
  t.setDaemon(True)   #  When the parent process ends, all processes terminate 
  t.start()

4. ftp File Server

4.1. Project Functions

The client has a simple page command prompt: the functions include:

View the list of files in the server file library (normal files) You can download one of these files locally You can upload client files to the server file library

Server requirements:

Allow multiple clients to operate at the same time Each client may send back commands in succession

Technical analysis:

tcp sockets are more suitable for file transfer Concurrency scheme-"fork multi-process concurrency Read and write operations on files get List of files--"os. listdir ()

Treatment of sticky package

4.2. Overall structural design

Server functionality is encapsulated in classes (upload, download, view lists) The socket is created, and the process function calls main () The client is responsible for initiating request, receiving reply and displaying The server is responsible for accepting the request and processing it logically

ftp server:


from socket import *
from threading import Thread
import os
import time

# Global variable 
HOST = '0.0.0.0'
PORT = 8080
ADDR = (HOST,PORT)
FTP = "/home/tarena/FTP/"  # File library location 

#  Create a file server server function class 
class FTPServer(Thread):
  def __init__(self,connfd):
    self.connfd = connfd
    super().__init__()

  def do_list(self):
    # Get a list of files 
    files = os.listdir(FTP)
    if not files:
      self.connfd.send(" File library is empty ".encode())
      return
    else:
      self.connfd.send(b'OK')
      time.sleep(0.1)  # Prevent and send content from sticking packets later 

    # Splice file list 
    files_ = ""
    for file in files:
      if file[0] != '.' and \
              os.path.isfile(FTP+file):
        files_ += file + '\n'
    self.connfd.send(files_.encode())

  def do_get(self,filename):
    try:
      fd = open(FTP+filename,'rb')
    except Exception:
      self.connfd.send(" File does not exist ".encode())
      return
    else:
      self.connfd.send(b'OK')
      time.sleep(0.1)
    # File sending 
    while True:
      data = fd.read(1024)
      if not data:
        time.sleep(0.1)
        self.connfd.send(b'##')
        break
      self.connfd.send(data)

  # Receive client requests in a loop 
  def run(self):
    while True:
      data = self.connfd.recv(1024).decode()
      if not data or data == 'Q':
        return 
      elif data == 'L':
        self.do_list()
      elif data[0] == 'G':   # G filename
        filename = data.split(' ')[-1]
        self.do_get(filename)

#  Network construction 
def main():
  #  Create a socket 
  sockfd = socket()
  sockfd.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
  sockfd.bind(ADDR)
  sockfd.listen(3)
  print("Listen the port %d..."%PORT)
  while True:
    try:
      connfd,addr = sockfd.accept()
      print("Connect from",addr)
    except KeyboardInterrupt:
      print(" Server program exit ")
      return
    except Exception as e:
      print(e)
      continue

    # Create a new threaded client 
    client = FTPServer(connfd)
    client.setDaemon(True)
    client.start()   # Run the run method 


if __name__ == "__main__":
  main()

ftp client:


from socket import *
import sys

ADDR = ('127.0.0.1',8080) #  Server address 

# Client function processing class 
class FTPClient:
  def __init__(self,sockfd):
    self.sockfd = sockfd

  def do_list(self):
    self.sockfd.send(b'L')  # Send a request 
    #  Waiting for a reply 
    data = self.sockfd.recv(128).decode()
    if data == 'OK':
      #   1 Secondary received file list string 
      data = self.sockfd.recv(4096)
      print(data.decode())
    else:
      print(data)

  def do_get(self,filename):
    # Send a request 
    self.sockfd.send(('G '+filename).encode())
    #  Waiting for a reply 
    data = self.sockfd.recv(128).decode()
    if data == 'OK':
      fd = open(filename,'wb')
      # Receive files 
      while True:
        data = self.sockfd.recv(1024)
        if data == b'##':
          break
        fd.write(data)
      fd.close()
    else:
      print(data)

  def do_quit(self):
    self.sockfd.send(b'Q')
    self.sockfd.close()
    sys.exit(" Thank you for using ")

# Create a client network 
def main():
  sockfd = socket()
  try:
    sockfd.connect(ADDR)
  except Exception as e:
    print(e)
    return

  ftp = FTPClient(sockfd) # Instantiate object 

  # Cyclic send request 
  while True:
    print("\n========= Command options ==========")
    print("****      list         ****")
    print("****    get file       ****")
    print("****    put file       ****")
    print("****      quit         ****")
    print("=============================")

    cmd = input(" Enter the command: ")

    if cmd.strip() == 'list':
      ftp.do_list()
    elif cmd[:3] == 'get':
      #   get filename
      filename = cmd.strip().split(' ')[-1]
      ftp.do_get(filename)
    elif cmd[:3] == 'put':
      #   put ../filename
      filename = cmd.strip().split(' ')[-1]
      ftp.do_put(filename)
    elif cmd.strip() == 'quit':
      ftp.do_quit()
    else:
      print(" Please enter the correct command ")



if __name__ == "__main__":
  main()

5. IO concurrent

Definition: The operations of data exchange in memory are defined as IO operations, IO-------------------------------

Data exchange between memory and disk: reading and writing files, updating database

Memory and terminal data exchange: input print sys. stdin sys. stdout sys. stderr

Memory and Network Data Exchange: Network Connection recvsendrecvfrom

IO-intensive programs: There are a large number of IO operations and a small number of cpu operations in program execution. The consumption of cpu is less, and the running time of IO is longer

CPU (computation) intensive program: There are a large number of cpu operations in the program, while IO operations are relatively few and cpu consumption is large.

5.1. IO Classification

IO is divided into blocking IO, non-blocking IO, IO multiplexing, event-driven IO and asynchronous IO

Blocking IO

Definition: When performing IO operation, if the execution condition is not met, it will block. Blocking IO is the default form of IO. Efficiency: Blocking IO is a kind of IO with very low efficiency. However, because of its simple logic, it is the default IO behavior.

Blocking condition:

Function blocking because some execution condition is not met e. g. accept input recv Blocking state e. g caused by processing IO for a long time. Network transmission, reading and writing of large files

Non-blocking IO

Definition: By modifying the IO property behavior, the originally blocked IO becomes a non-blocking state.

Set the socket to non-blocking IO

sockfd.setblocking(bool) Function: Set socket to non-blocking IO Parameter: The default is True, indicating that the socket IO is blocked; Set to False to make socket IO non-blocking

Timeout detection: Set a maximum blocking time, after which it will no longer block and wait.

sockfd.settimeout(sec) Function: Set the timeout time for sockets Parameter: Set time

5.2, IO Multiplexing

Definition: With one monitor, you can monitor the behavior of multiple IO events at the same time. When an IO event can be executed, let the IO event occur.

rs, ws, xs = select (rlist, wlist, xlist [, timeout]) Monitor IO events, blocking IO time waiting for monitoring to occur

Parameters:

rlist list, storing (passively) IO awaiting processing (receive) wlist list for active processing of IO (send) xlist list, storing the error and IO (exception) that you want to handle timeout timeout detection

Return value:

rs List IO Ready in rlist ws List IO Ready in wlist xs List IO Ready in xlist

select Implementation of tcp Service

1. Put the IO of interest into the corresponding monitor category list

2. Monitoring through the select function

3. Traverse the list of select return values to determine that IO events are ready

4. Handle IO events that occur


from socket import *
from select import select

# Create 1 Listening sockets as IO of concern 
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)

# Set up a list of concerns 
rlist = [s]
wlist = []
xlist = [s]

# Cyclic monitoring of IO 
while True:
  rs,ws,xs = select(rlist,wlist,xlist)
  #  Traversal 3 Return list , Processing IO 
  for r in rs:
    #  Use if points according to different traversal to IO 
    if r is s:
      c,addr = r.accept()
      print("Connect from",addr)
      rlist.append(c) # Add a new IO event 
    # else Socket Readiness for Client 
    else:
      data = r.recv(1024)
      #  Client exit 
      if not data:
        rlist.remove(r) # Remove from the list of concerns 
        r.close()
        continue #  Continue to process other ready IO 
      print("Receive:",data.decode())
      # r.send(b'OK')
      # We want to proactively handle this IO object 
      wlist.append(r)

  for w in ws:
    w.send(b'OK')
    wlist.remove(w) # Remove after use 

  for x in xs:
    pass

Note:

If there is an IO event in wlist, select immediately returns to ws In the process of processing IO, do not occupy the server in an infinite loop IO multiplexing consumes less resources and is more efficient. Expansion:

5.3, bit operation

Convert an integer to binary, and operate operators according to binary bits
& Bitwise and Bitwise or ^ Bitwise Exclusive OR < < Left shift > > Right shift
111011 14 111 0
(11 & 141010) (11 141 111) (11 ^ 140101)
11 < < 2=== > 44 Right Fill 0 14 > > 2 === > 3 Squeeze out the number on the right

Use:

Operate registers while making underlying hardware Filter the flag bits

5.4. Implementation of IO Multiplexing by poll Method

Create an poll object: p = select. poll ()

Register IO events of interest: p. register (fd, event)

IO to be concerned about fd IO Event Types to Focus on for event

Common types:

POLLIN Read IO Event (rlist) POLLOUT Write IO Event (wlist) POLLERR anomaly IO (xlist) POLLHUP Disconnect

Unfocus on IO: p. unregister (fd)

Parameter: IO object or fileno of IO object

events = p. poll ():

Function: Block IO events waiting for monitoring to occur Return value: Returns the IO event that occurred

events is a list [(fileno, evnet), (), ()...]

Each tuple is a ready IO, the first item of the tuple is the fileno of the IO, and the second item is the event type of the IO ready

poll_server step

1. Create a socket

2. Put the socket register

3. Create a lookup dictionary and maintain

4. Cyclic monitoring of IO occurrence

5. Handling IO occurrences


from socket import *
from select import *

# Create a socket 
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)

#  Create a poll object to focus on s 
p = poll()

# Create a lookup dictionary to find IO objects through fileno 
fdmap = {s.fileno():s}

#  Focus on s 
p.register(s,POLLIN|POLLERR)

# Cyclic monitoring 
while True:
  events = p.poll()
  # Loop through the events that occur fd -->fileno
  for fd,event in events:
    # Distinguish events for processing 
    if fd == s.fileno():
      c,addr = fdmap[fd].accept()
      print("Connect from",addr)
      # Add a new concern IO 
      p.register(c,POLLIN|POLLERR)
      fdmap[c.fileno()] = c # Dictionary maintenance 
    # Bitwise AND decision is POLLIN ready 
    elif event & POLLIN:
      data = fdmap[fd].recv(1024)
      if not data:
        p.unregister(fd) # Cancel attention 
        fdmap[fd].close()
        del fdmap[fd]  # Delete from the dictionary 
        continue
      print("Receive:",data.decode())
      fdmap[fd].send(b'OK')

5.5. epoll method

1. How to use: Basically the same as poll

The generated object is changed to epoll () Change all event types to EPOLL types

2. epoll Features

The efficiency of epoll is higher than that of select poll epoll monitors more IO than select epoll has more triggering modes than poll (EPOLLET edge triggering)

from socket import *
from select import *

# Create a socket 
s = socket()
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind(('0.0.0.0',8888))
s.listen(3)

#  Create e The poll object focuses on s 
ep = epoll()

# Create a lookup dictionary to find IO objects through fileno 
fdmap = {s.fileno():s}

#  Focus on s 
ep.register(s,EPOLLIN|EPOLLERR)

# Cyclic monitoring 
while True:
  events = ep.poll()
  # Loop through the events that occur fd -->fileno
  for fd,event in events:
    print(" Dear, you have IO to deal with ")
    # Distinguish events for processing 
    if fd == s.fileno():
      c,addr = fdmap[fd].accept()
      print("Connect from",addr)
      # Add a new concern IO 
      # Change the trigger mode to edge trigger 
      ep.register(c,EPOLLIN|EPOLLERR|EPOLLET)
      fdmap[c.fileno()] = c # Dictionary maintenance 
    # Bitwise AND decision is E POLLIN ready 
    # elif event & EPOLLIN:
    #   data = fdmap[fd].recv(1024)
    #   if not data:
    #     ep.unregister(fd) # Cancel attention 
    #     fdmap[fd].close()
    #     del fdmap[fd]  # Delete from the dictionary 
    #     continue
    #   print("Receive:",data.decode())
    #   fdmap[fd].send(b'OK')

The above is the analysis of python concurrent network communication model details, more information about python concurrent network communication model please pay attention to other related articles on this site!


Related articles: