Python constructs the icmp echo request and implements the network detector function code sharing
- 2020-04-02 13:19:53
- OfStack
Python sends the icmp echo requesy request
import socket
import struct
def checksum(source_string):
sum = 0
countTo = (len(source_string)/2)*2
count = 0
while count<countTo:
thisVal = ord(source_string[count + 1])*256 + ord(source_string[count])
sum = sum + thisVal
sum = sum & 0xffffffff
count = count + 2
if countTo<len(source_string):
sum = sum + ord(source_string[len(source_string) - 1])
sum = sum & 0xffffffff
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
answer = ~sum
answer = answer & 0xffff
answer = answer >> 8 | (answer << 8 & 0xff00)
return answer
def ping(ip):
s = socket.socket(socket.AF_INET, socket.SOCK_RAW, 1)
packet = struct.pack(
"!BBHHH", 8, 0, 0, 0, 0
)
chksum=checksum(packet)
packet = struct.pack(
"!BBHHH", 8, 0, chksum, 0, 0
)
s.sendto(packet, (ip, 1))
if __name__=='__main__':
ping('192.168.41.56')
Scan detection network function (network detector)
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
Detect network host survival.
'''
import os
import struct
import array
import time
import socket
import IPy
import threading
class SendPingThr(threading.Thread):
'''
send ICMP The thread requesting the message.
Parameters:
ipPool -- Can be an iterative IP Address pool
icmpPacket -- Structure of the icmp message
icmpSocket -- icmp Set of words by
timeout -- Set the send timeout
'''
def __init__(self, ipPool, icmpPacket, icmpSocket, timeout=3):
threading.Thread.__init__(self)
self.Sock = icmpSocket
self.ipPool = ipPool
self.packet = icmpPacket
self.timeout = timeout
self.Sock.settimeout( timeout + 3 )
def run(self):
time.sleep(0.01) # Wait for the receiving thread to start
for ip in self.ipPool:
try:
self.Sock.sendto(self.packet, (ip, 0))
except socket.timeout:
break
time.sleep(self.timeout)
class Nscan:
'''
Parameters:
timeout -- Socket Timeout, default 3 seconds
IPv6 -- Whether it is IPv6 By default, False
'''
def __init__(self, timeout=3, IPv6=False):
self.timeout = timeout
self.IPv6 = IPv6
self.__data = struct.pack('d', time.time()) # Used for ICMP Load bytes of message ( 8bit )
self.__id = os.getpid() # structure ICMP The message ID Field, which has no practical meaning
@property # Attribute decorator
def __icmpSocket(self):
''' create ICMP Socket'''
if not self.IPv6:
Sock = socket.socket(socket.AF_INET, socket.SOCK_RAW, socket.getprotobyname("icmp"))
else:
Sock = socket.socket(socket.AF_INET6, socket.SOCK_RAW, socket.getprotobyname("ipv6-icmp"))
return Sock
def __inCksum(self, packet):
'''ICMP Message validation and calculation method '''
if len(packet) & 1:
packet = packet + '0'
words = array.array('h', packet)
sum = 0
for word in words:
sum += (word & 0xffff)
sum = (sum >> 16) + (sum & 0xffff)
sum = sum + (sum >> 16)
return (~sum) & 0xffff
@property
def __icmpPacket(self):
''' structure ICMP message '''
if not self.IPv6:
header = struct.pack('bbHHh', 8, 0, 0, self.__id, 0) # TYPE , CODE , CHKSUM , ID , SEQ
else:
header = struct.pack('BbHHh', 128, 0, 0, self.__id, 0)
packet = header + self.__data # packet without checksum
chkSum = self.__inCksum(packet) # make checksum
if not self.IPv6:
header = struct.pack('bbHHh', 8, 0, chkSum, self.__id, 0)
else:
header = struct.pack('BbHHh', 128, 0, chkSum, self.__id, 0)
return header + self.__data # packet *with* checksum
def isUnIP(self, IP):
''' judge IP Is it a valid unicast address '''
IP = [int(x) for x in IP.split('.') if x.isdigit()]
if len(IP) == 4:
if (0 < IP[0] < 223 and IP[0] != 127 and IP[1] < 256 and IP[2] < 256 and 0 < IP[3] < 255):
return True
return False
def makeIpPool(self, startIP, lastIP):
''' production IP Address pool '''
IPver = 6 if self.IPv6 else 4
intIP = lambda ip: IPy.IP(ip).int()
ipPool = {IPy.intToIp(ip, IPver) for ip in range(intIP(startIP), intIP(lastIP)+1)}
return {ip for ip in ipPool if self.isUnIP(ip)}
def mPing(self, ipPool):
''' using ICMP Message detection network host alive
Parameters:
ipPool -- Can be an iterative IP Address pool
'''
Sock = self.__icmpSocket
Sock.settimeout(self.timeout)
packet = self.__icmpPacket
recvFroms = set() # The source of the receiving thread IP Address of the container
sendThr = SendPingThr(ipPool, packet, Sock, self.timeout)
sendThr.start()
while True:
try:
recvFroms.add(Sock.recvfrom(1024)[1][0])
except Exception:
pass
finally:
if not sendThr.isAlive():
break
return recvFroms & ipPool
if __name__=='__main__':
s = Nscan()
ipPool = s.makeIpPool('192.168.0.1', '192.168.0.254')
print( s.mPing(ipPool) )