Python scapy library: an example of sending and receiving packets through a network card
- 2021-07-26 08:01:32
- OfStack
Question:
During testing, traffic is normally sent and received with instruments such as TestCenter and SmartBit. Using those instruments for automated smoke testing as well is inefficient and expensive.
Solution:
Sending and receiving traffic with an ordinary network card has shortcomings in performance statistics, but it can verify basic functions and is economical.
Use the scapy module:
1 - Get the ifaces of the computer's network cards, and decide in advance which iface will send and receive traffic;
2 - conf.L2listen listens on each iface;
3 - subprocess.Popen calls tShark.exe to start capturing packets, or ping.exe to generate ping packets;
4 - sendp() sends Layer 2 packets and send() sends Layer 3 packets;
5 - sniff() sniffs the specified packets on the iface, and the packets can be filtered;
6 - Stop the Wireshark packet capture;
7 - close() turns off listening on the iface.
Discussion:
This example does not try using sr1() or srp() to send and receive packets.
The whole process is relatively clear, and the steps appear in pairs, which is convenient for memory.
When sniff() runs, it misses some packets that appeared on the iface earlier. This may be because listening and packet capture were not started properly beforehand.
The performance limits of the network card are not specified, so some trial and error ("crossing the river by feeling the stones") may be needed. If the network card turns out to be unsuitable for a test, switch to the instrument immediately.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import os
import sys
import re
import struct
import string
from scapy.all import *
import subprocess
# Turn on scapy's pcap option before any socket is opened further down.
# NOTE(review): presumably required on the Windows hosts this script targets
# (it shells out to ping.exe / tShark.exe) -- confirm against the scapy docs.
conf.use_pcap = True
'''
cmd
python
from scapy.all import *
ls(Ether())
ls(IP())
ls(ICMP())
send(IP(dst='1.2.3.4')/ICMP())
sendp(Raw("zhongxing"), iface='eth15', loop=1, inter=0.2, verbose=False)
The inter parameter sets the time interval between sending two consecutive packets
The timeout parameter sets how long to wait for a reply
The retry parameter sets the number of retries.
'''
def parse_targets(argv):
    """Collect the values of ``-t=``, ``-ip=`` and ``-mac=`` arguments.

    argv -- full argument vector; ``argv[0]`` (the program name) is skipped.

    Returns the list of values in command-line order.  For ``key=value=more``
    only ``value`` is kept (the text between the first and second ``=``).
    An option given without ``=`` (e.g. a bare ``-t``) carries no value and
    is ignored instead of raising IndexError.
    """
    wanted = ('-t', '-ip', '-mac')
    values = []
    for arg in argv[1:]:
        parts = arg.split('=')
        if len(parts) > 1 and parts[0] in wanted:
            values.append(parts[1])
    return values


# Single-argument, parenthesised print calls produce the same output under
# both the Python 2 print statement and the Python 3 print function.
print(u" Realize network card contracting ")
target = parse_targets(sys.argv)
print(' '.join(('test -- ', str(target))))
print('')
print u' Object of the network card iface'
eth_local = []
a = repr(conf.route).split('\n')
for x in a:
b = []
b = x.split(' ')
for y in b:
if re.search('eth', y):
eth_local.append(y)
print u' De-repetition '
c = []
c.append(eth_local[0])
for i in range(0,len(eth_local),1):
m = 0
for j in range(0,len(c),1):
if c[j] == eth_local[i]:
m += 1
if m==0:
c.append(eth_local[i])
print c #['eth15', 'eth21', 'eth17']
print u' Create 2 Layer message '
src_mac = '00:00:11:11:22:22'
dst_mac = '00:00:22:22:11:11'
dst_ip = '1.2.3.4'
src_ip = '5.6.7.8'
src_port = 1234
dst_port = 5678
##ls()
##ls(IP())
##IP().show()
##lsc()
pack_ip = IP(dst=dst_ip, src=src_ip, proto=1)
##ls(ICMP())
##ls(UDP())
pack_icmp = ICMP(type=8)
##ls(Ether())
pack_ether = Ether(dst=dst_mac, src=src_mac, type=0x0800)
info = Raw('zhongxing')
t = str(pack_ether/pack_ip/pack_icmp/info)
s = Ether(t)
print u' The message to be sent is: ',s.summary
eth = c[1]
print u' Sending network card iface For %s\n' % eth
print u'--------- Start listening - Send icmp - Sniffing icmp - Turn off listening ----------'
print u'--------- Start listening -------------'
L2socket = conf.L2listen
listen_socket = L2socket(type=ETH_P_ALL, iface=eth)
print listen_socket
print conf.L2listen
#### Start the bag grabbing
##cmd='C:\Program Files (x86)\Wireshark\tShark.exe'
##card_id = str(1)
##cap_file = str('H:\python\test.pcap')
##args = [cmd,"-i "+card_id,"-w",cap_file]
##print "*DEBUG*",args
##p=subprocess.Popen(args)
print u'---------sendp() Function call ----------'
sendp(s,iface=eth, verbose=False)
##print u'---------srp() Function call ----------'
##sr() is a Scapy function that returns two lists:
## the first is the list of packets that received replies, together with their corresponding replies;
## the second is the list of packets that did not receive an answer.
## Usually, we need to call other functions to make these two return values easier to read.
##help(srp)
##p = srp(s,iface=c[1], verbose=False)
##print p.show()
print u'--------- Sniffing, filtering, saving pcap , read pcap----------'
##print sniff.__doc__
##pkts = sniff(iface = 'eth15',filter = 'icmp',count = 3, prn=lambda x: x.summary())
ip = '172.10.0.1'
subprocess.Popen(["ping.exe", ip]) # Provide to sniff
##Ether / IP / ICMP 172.10.1.124 > 172.10.0.1 echo-request 0 / Raw
##Ether / IP / ICMP 172.10.0.1 > 172.10.1.124 echo-reply 0 / Raw
##Ether / IP / ICMP 172.10.1.124 > 172.10.0.1 echo-request 0 / Raw
##listen_socket1 = L2socket(listen_socket)
##pkts = sniff(iface = eth,filter = 'icmp',count = 20, timeout = 10, L2socket=listen_socket)
pkts = sniff(iface = eth, filter = 'icmp', count = 20, timeout = 10)
try:
if 0 < len(pkts):
print u'--------- Sniffed message ----------'
##pkts[0].show()
wrpcap('demo.pcap',pkts)
read_pkts = rdpcap('demo.pcap')
print read_pkts[0]
print u'--------------- Output base64 Data in encoded format ---------------'
export_object(str(pkts[0]))
print u'--------------- Convert to base64 Data in encoded format ---------------'
newPkt = import_object('eNprYAqN+Q8GGp/TOCfN5GBwZWDwc/nCwNAgOItrDRdjLxD/Z+gEQitpgwvijAIMjAxgoODmAYLO\
/m7ebq6ubs7+ri6uAa5+YNrf2dHREaiEgbGQUQ8AnjEcMQ==')
print newPkt
s = Ether(newPkt)
print u' The message to be sent is: ',s.summary
sendp(s,iface=eth, verbose=False)
else:
print u'--------- No message was sniffed ----------'
except:
pass
finally:
print u'--------- Turn off listening -------------'
listen_socket.close()