Constructing an ICMP echo request in Python and implementing a network host detector (code sharing)

  • 2020-04-02 13:19:53
  • OfStack

Sending an ICMP echo request with Python


import socket
import struct
def checksum(source_string):
    """Return the 16-bit one's-complement Internet checksum of a byte string.

    The bytes are summed as 16-bit words (low byte first), carries are
    folded back into the low 16 bits, the result is complemented and its two
    bytes are swapped.  The returned integer is therefore ready to be packed
    in network byte order (``"!H"``), which is how ``ping`` uses it.

    Parameters:
        source_string -- bytes-like data to checksum (``bytes``/``bytearray``;
                         a Python 2 ``str`` also works).  An odd trailing
                         byte is treated as if followed by a zero byte.

    Returns:
        int in the range 0..0xffff.
    """
    # bytearray yields integers on indexing under both Python 2 and 3, so no
    # ord() call (which fails on Python 3 bytes items) is needed.
    data = bytearray(source_string)
    paired_len = len(data) // 2 * 2   # integer division: stays an int on Python 3

    total = 0
    for idx in range(0, paired_len, 2):
        total += data[idx + 1] * 256 + data[idx]
    if paired_len < len(data):
        total += data[-1]             # odd trailing byte, implicitly zero-padded

    # Fold carries until the sum fits in 16 bits.  Python integers do not
    # overflow, so no intermediate masking is required.
    while total >> 16:
        total = (total >> 16) + (total & 0xffff)

    answer = ~total & 0xffff
    # Swap the two bytes so the value can be packed big-endian.
    return (answer >> 8) | ((answer << 8) & 0xff00)
def ping(ip):
    """Send a single ICMP echo request to *ip*; no reply is awaited.

    Parameters:
        ip -- destination IPv4 address (string) handed to ``sendto``.

    Returns:
        None.

    Raises:
        Whatever ``socket.socket`` / ``sendto`` raise.  Opening a raw socket
        typically requires administrator privileges.
    """
    # Echo request header: type 8, code 0, checksum, identifier 0, sequence 0.
    # The checksum is calculated over the header with its checksum field
    # zeroed, then the header is packed again with the real value.
    packet = struct.pack("!BBHHH", 8, 0, 0, 0, 0)
    chksum = checksum(packet)
    packet = struct.pack("!BBHHH", 8, 0, chksum, 0, 0)

    s = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.IPPROTO_ICMP)
    try:
        s.sendto(packet, (ip, 1))
    finally:
        # Always release the raw socket, even if sending fails.
        s.close()
# Script entry point: send one echo request to the sample host.
if __name__ == '__main__':
    ping(ip='192.168.41.56')


Network scanning and host detection (network detector)


#!/usr/bin/env python3
# -*- coding: utf-8 -*-

'''
 Detect which hosts on a network are alive. 
'''

import os
import struct
import array
import time
import socket
import IPy
import threading

class SendPingThr(threading.Thread):
    '''
    Thread that sends one ICMP request packet to every address in a pool.

    Parameters:
        ipPool      -- iterable of destination IP address strings
        icmpPacket  -- the ready-to-send ICMP message
        icmpSocket  -- the socket used for sending; its timeout is set to
                       ``timeout + 3`` seconds by the constructor
        timeout     -- seconds the thread stays alive after the last send
    '''
    def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
        super().__init__()
        self.Sock = icmpSocket
        self.ipPool = ipPool
        self.packet = icmpPacket
        self.timeout = timeout
        self.Sock.settimeout(timeout + 3)

    def run(self):
        '''Send the packet to each address, then linger for ``timeout`` seconds.'''
        time.sleep(0.01)  # Give the receiving side a moment to start listening
        for ip in self.ipPool:
            try:
                self.Sock.sendto(self.packet, (ip, 0))
            except socket.timeout:
                # The socket is stalled; stop sending altogether.
                break
            except OSError:
                # One address could not be sent to (e.g. unreachable network
                # or a rejected destination); skip it so the rest of the
                # pool is still probed instead of killing the thread.
                continue
        # Stay alive a little longer: callers use the thread's liveness to
        # decide how long to keep collecting replies.
        time.sleep(self.timeout)

class Nscan:
    '''
    ICMP ping-sweep scanner: sends an echo request to every address in a
    pool and reports which of them answered.

    Parameters:
        timeout    -- socket timeout in seconds, default 3
        IPv6       -- whether to use IPv6 (ICMPv6), default False
    '''
    def __init__(self, timeout=3, IPv6=False):
        self.timeout = timeout
        self.IPv6 = IPv6

        self.__data = struct.pack('d', time.time())   # ICMP payload: current time packed as a double
        self.__id = os.getpid()   # Source of the ICMP identifier field (truncated to 16 bits when packed)

    @property
    def __icmpSocket(self):
        '''Create and return a new raw ICMP (or ICMPv6) socket on each access.

        The caller owns the returned socket and is responsible for closing it.
        '''
        if not self.IPv6:
            Sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
        else:
            Sock = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.getprotobyname("ipv6-icmp"))
        return Sock

    def __inCksum(self, packet):
        '''Return the 16-bit one's-complement checksum of ``packet`` (bytes).

        Words are read in native byte order, matching the native-order
        ``struct`` formats used to build the header.
        '''
        if len(packet) & 1:
            packet = packet + b'\x00'   # pad odd-length data with a zero byte
        words = array.array('H', packet)
        total = sum(words)
        # Fold the carries back into the low 16 bits (twice, so a carry
        # produced by the first fold is absorbed as well).
        total = (total >> 16) + (total & 0xffff)
        total += total >> 16

        return (~total) & 0xffff

    @property
    def __icmpPacket(self):
        '''Build the echo-request message: 8-byte header plus payload.

        Header fields are TYPE, CODE, CHKSUM, ID, SEQ.  TYPE is 8 for ICMP
        and 128 for ICMPv6.  The checksum is computed over the message with
        a zero checksum field, then the header is rebuilt with the result.
        '''
        icmp_type = 128 if self.IPv6 else 8
        # The identifier field is 16 bits wide, but process IDs can be
        # larger; mask so struct.pack never overflows the 'H' field.
        ident = self.__id & 0xffff
        fmt = 'BbHHh'

        header = struct.pack(fmt, icmp_type, 0, 0, ident, 0)
        chkSum = self.__inCksum(header + self.__data)
        header = struct.pack(fmt, icmp_type, 0, chkSum, ident, 0)

        return header + self.__data   # packet *with* checksum

    def isUnIP(self, IP):
        '''Return True if ``IP`` is a dotted-quad IPv4 address accepted as a
        unicast host address.

        Accepted: first octet 1..223 except 127, middle octets 0..255, and
        last octet 1..254 (``.0`` and ``.255`` are rejected).  Anything that
        is not exactly four decimal octets is rejected.
        '''
        parts = IP.split('.')
        if len(parts) != 4 or not all(part.isdigit() for part in parts):
            return False
        first, second, third, fourth = (int(part) for part in parts)
        return (0 < first < 224 and first != 127
                and second < 256 and third < 256
                and 0 < fourth < 255)

    def makeIpPool(self, startIP, lastIP):
        '''Return the set of addresses from ``startIP`` to ``lastIP``
        (inclusive) that pass ``isUnIP``.

        NOTE(review): ``isUnIP`` only recognises dotted-quad IPv4 text, so
        with ``IPv6=True`` the pool presumably comes out empty -- confirm
        against the address format IPy produces.
        '''
        IPver = 6 if self.IPv6 else 4
        first = IPy.IP(startIP).int()
        last = IPy.IP(lastIP).int()
        candidates = (IPy.intToIp(number, IPver) for number in range(first, last + 1))

        return {ip for ip in candidates if self.isUnIP(ip)}

    def mPing(self, ipPool):
        '''Probe every address in the pool with ICMP and return those alive.

        Parameters:
            ipPool  -- iterable of IP address strings

        Returns:
            set of addresses from ``ipPool`` from which a packet was received.
        '''
        # Materialise once: the pool is iterated by the sender thread and
        # intersected with the replies below, so any iterable is acceptable.
        ipPool = set(ipPool)
        recvFroms = set()   # Source addresses of every packet received

        Sock = self.__icmpSocket
        try:
            Sock.settimeout(self.timeout)
            packet = self.__icmpPacket

            sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
            sendThr.start()

            # Keep collecting replies for as long as the sender is alive.
            while True:
                try:
                    recvFroms.add(Sock.recvfrom(1024)[1][0])
                except OSError:
                    # Receive timeouts (and other socket errors) are
                    # expected while waiting; just re-check the sender.
                    pass
                if not sendThr.is_alive():
                    break
        finally:
            Sock.close()   # release the raw socket created above
        return recvFroms & ipPool

# Script entry point: sweep a sample /24 range and print the hosts that answered.
if __name__ == '__main__':
    scanner = Nscan()
    addresses = scanner.makeIpPool('192.168.0.1', '192.168.0.254')
    print(scanner.mPing(addresses))


Related articles: