Introduction to the Twisted framework for Python

  • 2020-05-10 18:21:33
  • OfStack

What is Twisted?

Twisted is an event-driven networking framework written in Python. It supports many protocols, including UDP, TCP, and TLS, as well as application-layer protocols such as HTTP, SMTP, NNTP, IRC, and XMPP/Jabber. One of its strengths is that it ships with implementations of many application-layer protocols that developers can use directly. Existing implementations can also be adapted; for example, modifying Twisted's SSH server implementation is quite simple. In many cases, though, developers need to implement their own protocol class.

A Twisted program consists of a main loop started by the reactor and a set of callback functions. When an event occurs, such as a client connecting to the server, the corresponding server-side callback is triggered.

Write a simple TCP server with Twisted

The code below is a TCP server that logs the data sent by the client.


==== code1.py ====
import sys
from twisted.internet.protocol import ServerFactory
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet import reactor

class CmdProtocol(LineReceiver):

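  # LineReceiver works on bytes, so the delimiter is a bytes literal
  delimiter = b'\n'

  def connectionMade(self):
    # getPeer() returns an address object; .host is the client IP
    self.client_ip = self.transport.getPeer().host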
    log.msg("Client connection from %s" % self.client_ip)
    if len(self.factory.clients) >= self.factory.clients_max:
      log.msg("Too many connections. bye !")
      self.client_ip = None
      self.transport.loseConnection()
    else:
      self.factory.clients.append(self.client_ip)

  def connectionLost(self, reason):
    log.msg('Lost client connection. Reason: %s' % reason)
    if self.client_ip:
      self.factory.clients.remove(self.client_ip)

  def lineReceived(self, line):
    log.msg('Cmd received from %s : %s' % (self.client_ip, line))

class MyFactory(ServerFactory):

  protocol = CmdProtocol

  def __init__(self, clients_max=10):
    self.clients_max = clients_max
    self.clients = []

log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()

The following code is critical:


from twisted.internet import reactor
reactor.run()

These two lines of code start the main reactor loop.

In the code above we created the "MyFactory" class, a subclass of "ServerFactory", which is responsible for returning instances of "CmdProtocol". Each connection is handled by its own "CmdProtocol" instance: Twisted asks the factory for a new instance each time a TCP connection is established. As you can see, each method of the protocol class handles one type of event.

When a client connects to the server, the "connectionMade" method is triggered. This is where you can do things like authentication or limiting the total number of client connections. Each protocol instance holds a reference to its factory, which can be accessed through self.factory.

"CmdProtocol" is a subclass of twisted.protocols.basic.LineReceiver. LineReceiver splits the data sent by the client on the line delimiter, which is set to the newline character here, and calls the lineReceived method once for each complete line. Later we can extend lineReceived to parse commands.
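
To make the splitting behaviour concrete, here is a pure-Python sketch of the buffering idea. It is a simplified stand-in written for this article, not Twisted's implementation; LineSplitter and feed are hypothetical names.

```python
class LineSplitter(object):
    """Hypothetical, simplified stand-in for LineReceiver's buffering."""

    delimiter = b'\n'

    def __init__(self, on_line):
        self._buffer = b''
        self._on_line = on_line

    def feed(self, data):
        # TCP delivers arbitrary chunks, so the unfinished tail is kept
        # in a buffer until its delimiter arrives.
        self._buffer += data
        while self.delimiter in self._buffer:
            line, self._buffer = self._buffer.split(self.delimiter, 1)
            self._on_line(line)


lines = []
splitter = LineSplitter(lines.append)
splitter.feed(b'hello ser')   # no complete line yet
splitter.feed(b'ver\nlast')   # completes one line, starts another
splitter.feed(b'log\n')       # completes the second line
print(lines)
```

The callback fires once per complete line regardless of how the bytes were chunked on the wire, which is why the protocol code only has to deal with whole commands.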

Twisted implements its own logging system; here we configure it to write to stdout.

When we execute reactor.listenTCP we bind the factory to port 9999 and start listening.


user@lab:~/TMP$ python code1.py
2011-08-29 13:32:32+0200 [-] Log opened.
2011-08-29 13:32:32+0200 [-] __main__.MyFactory starting on 9999
2011-08-29 13:32:32+0200 [-] Starting factory <__main__.MyFactory instance at 0x227e320>
2011-08-29 13:32:35+0200 [__main__.MyFactory] Client connection from 127.0.0.1
2011-08-29 13:32:38+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : hello server

Use Twisted to call external processes

Let's add a command to the previous server that reads the last lines of /var/log/syslog.


import sys
import os

from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet import reactor

class TailProtocol(ProcessProtocol):
  def __init__(self, write_callback):
    self.write = write_callback

  def outReceived(self, data):
    # data is the child's stdout as bytes; the transport also expects bytes
    self.write(b"Begin lastlog\n")
    data = [line for line in data.split(b'\n') if not line.startswith(b'==')]
    for d in data:
      self.write(d + b'\n')
    self.write(b"End lastlog\n")

  def processEnded(self, reason):
    if reason.value.exitCode != 0:
      log.msg(reason)

class CmdProtocol(LineReceiver):

  # LineReceiver works on bytes, so the delimiter is a bytes literal
  delimiter = b'\n'

  def processCmd(self, line):
    # line arrives as bytes, so compare against and reply with bytes
    if line.startswith(b'lastlog'):
      tailProtocol = TailProtocol(self.transport.write)
      reactor.spawnProcess(tailProtocol, '/usr/bin/tail', args=['/usr/bin/tail', '-10', '/var/log/syslog'])
    elif line.startswith(b'exit'):
      self.transport.loseConnection()
    else:
      self.transport.write(b'Command not found.\n')

  def connectionMade(self):
    self.client_ip = self.transport.getPeer().host
    log.msg("Client connection from %s" % self.client_ip)
    if len(self.factory.clients) >= self.factory.clients_max:
      log.msg("Too many connections. bye !")
      self.client_ip = None
      self.transport.loseConnection()
    else:
      self.factory.clients.append(self.client_ip)

  def connectionLost(self, reason):
    log.msg('Lost client connection. Reason: %s' % reason)
    if self.client_ip:
      self.factory.clients.remove(self.client_ip)

  def lineReceived(self, line):
    log.msg('Cmd received from %s : %s' % (self.client_ip, line))
    self.processCmd(line)

class MyFactory(ServerFactory):

  protocol = CmdProtocol

  def __init__(self, clients_max=10):
    self.clients_max = clients_max
    self.clients = []

log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()

In the code above, the processCmd method is executed each time a line is received from the client. If the line is the exit command, the server closes the connection. If it is lastlog, we spawn a child process that runs the tail command and send its output to the client. To do this we implement a ProcessProtocol subclass and override its outReceived and processEnded methods. outReceived is called when the tail command produces output, and processEnded is called when the process exits.

Here is the sample execution result:


user@lab:~/TMP$ python code2.py
2011-08-29 15:13:38+0200 [-] Log opened.
2011-08-29 15:13:38+0200 [-] __main__.MyFactory starting on 9999
2011-08-29 15:13:38+0200 [-] Starting factory <__main__.MyFactory instance at 0x1a5a3f8>
2011-08-29 15:13:47+0200 [__main__.MyFactory] Client connection from 127.0.0.1
2011-08-29 15:13:58+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : test
2011-08-29 15:14:02+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : lastlog
2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Cmd received from 127.0.0.1 : exit
2011-08-29 15:14:05+0200 [CmdProtocol,0,127.0.0.1] Lost client connection. Reason: [Failure instance: Traceback (failure with no frames): <class 'twisted.internet.error.ConnectionDone'>: Connection was closed cleanly.

You can use netcat as the client:


user@lab:~$ netcat 127.0.0.1 9999
test
Command not found.
lastlog
Begin lastlog
Aug 29 15:02:03 lab sSMTP[5919]: Unable to locate mail
Aug 29 15:02:03 lab sSMTP[5919]: Cannot open mail:25
Aug 29 15:02:03 lab CRON[4945]: (CRON) error (grandchild #4947 failed with exit status 1)
Aug 29 15:02:03 lab sSMTP[5922]: Unable to locate mail
Aug 29 15:02:03 lab sSMTP[5922]: Cannot open mail:25
Aug 29 15:02:03 lab CRON[4945]: (logcheck) MAIL (mailed 1 byte of output; but got status 0x0001, #012)
Aug 29 15:05:01 lab CRON[5925]: (root) CMD (command -v debian-sa1 > /dev/null && debian-sa1 1 1)
Aug 29 15:10:01 lab CRON[5930]: (root) CMD (test -x /usr/lib/atsar/atsa1 && /usr/lib/atsar/atsa1)
Aug 29 15:10:01 lab CRON[5928]: (CRON) error (grandchild #5930 failed with exit status 1)
Aug 29 15:13:21 lab pulseaudio[3361]: ratelimit.c: 387 events suppressed

 
End lastlog
exit

Use the Deferred object

The reactor is a loop that waits for events to happen. The events here can be database operations or long-running computations, as long as these operations can return a Deferred object. A Deferred object automatically triggers a callback function when the event occurs. Keep in mind that the reactor is blocked while the current piece of code executes, so long-running work must not run directly inside a callback.

Now we're going to use a Deferred object to compute the SHA1 hash of a file.


import sys
import os
import hashlib

from twisted.internet.protocol import ServerFactory, ProcessProtocol
from twisted.protocols.basic import LineReceiver
from twisted.python import log
from twisted.internet import reactor, threads

class TailProtocol(ProcessProtocol):
  def __init__(self, write_callback):
    self.write = write_callback

  def outReceived(self, data):
    # data is the child's stdout as bytes; the transport also expects bytes
    self.write(b"Begin lastlog\n")
    data = [line for line in data.split(b'\n') if not line.startswith(b'==')]
    for d in data:
      self.write(d + b'\n')
    self.write(b"End lastlog\n")

  def processEnded(self, reason):
    if reason.value.exitCode != 0:
      log.msg(reason)

class HashCompute(object):
  def __init__(self, path, write_callback):
    self.path = path
    self.write = write_callback

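  def blockingMethod(self):
    # open() raises an exception for a missing or unreadable path;
    # deferToThread turns that exception into a Failure for the errback
    with open(self.path, 'rb') as f:
      data = f.read()
    # uncomment to add more delay
    # import time
    # time.sleep(10)
    return hashlib.sha1(data).hexdigest()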

  def compute(self):
    d = threads.deferToThread(self.blockingMethod)
    d.addCallback(self.ret)
    d.addErrback(self.err)

  def ret(self, hdata):
    # the transport expects bytes, so encode the reply
    self.write(("File hash is : %s\n" % hdata).encode('utf-8'))

  def err(self, failure):
    self.write(("An error occurred : %s\n" % failure.getErrorMessage()).encode('utf-8'))

class CmdProtocol(LineReceiver):

  # LineReceiver works on bytes, so the delimiter is a bytes literal
  delimiter = b'\n'

  def processCmd(self, line):
    # line arrives as bytes, so compare against and reply with bytes
    if line.startswith(b'lastlog'):
      tailProtocol = TailProtocol(self.transport.write)
      reactor.spawnProcess(tailProtocol, '/usr/bin/tail', args=['/usr/bin/tail', '-10', '/var/log/syslog'])
    elif line.startswith(b'comphash'):
      try:
        useless, path = line.split(b' ')
      except ValueError:
        # raised when the line does not have exactly two fields
        self.transport.write(b'Please provide a path.\n')
        return
      hc = HashCompute(path, self.transport.write)
      hc.compute()
    elif line.startswith(b'exit'):
      self.transport.loseConnection()
    else:
      self.transport.write(b'Command not found.\n')

  def connectionMade(self):
    self.client_ip = self.transport.getPeer().host
    log.msg("Client connection from %s" % self.client_ip)
    if len(self.factory.clients) >= self.factory.clients_max:
      log.msg("Too many connections. bye !")
      self.client_ip = None
      self.transport.loseConnection()
    else:
      self.factory.clients.append(self.client_ip)

  def connectionLost(self, reason):
    log.msg('Lost client connection. Reason: %s' % reason)
    if self.client_ip:
      self.factory.clients.remove(self.client_ip)

  def lineReceived(self, line):
    log.msg('Cmd received from %s : %s' % (self.client_ip, line))
    self.processCmd(line)

class MyFactory(ServerFactory):

  protocol = CmdProtocol

  def __init__(self, clients_max=10):
    self.clients_max = clients_max
    self.clients = []

log.startLogging(sys.stdout)
reactor.listenTCP(9999, MyFactory(2))
reactor.run()

blockingMethod reads a file from the file system and computes its SHA1 hash. Here we use Twisted's deferToThread function, which runs the method in a separate thread and returns a Deferred object. The Deferred is returned immediately, so the reactor thread can continue to process other events. Once the method passed to deferToThread finishes, the callbacks are triggered: if blockingMethod raises an exception, the errback err is called with the failure; if it succeeds, the computed hash is passed to ret as hdata.

Recommended reading materials for Twisted

http://twistedmatrix.com/documents/current/core/howto/defer.html
http://twistedmatrix.com/documents/current/core/howto/process.html
http://twistedmatrix.com/documents/current/core/howto/servers.html

API documents:

http://twistedmatrix.com/documents/current/api/twisted.html

