python scapy library an example of realizing network card receiving and sending packets

  • 2021-07-26 08:01:32
  • OfStack

Question:

During the test, the transceiver flow is carried out by TestCenter, SmartBit and other instruments. If it is still used for automatic smoking, it will bring the problems of low efficiency and high cost.

Solution:

Using network card to send and receive flow, although there are performance statistical defects, it can verify some basic functions and is economical.

Adopt scapy module,

1-Obtain iface of computer network card, and design which iface to send and receive flow in advance;

2-conf. L2listen listens to each iface

3-subprocess. Popen to call tShark. exe to start grabbing packets, or ping. exe to construct ping packets

4-sendp sends Layer 2 packets and send sends Layer 3 packets

5-sniff sniffs the specified message on iface, which can be filtered

6-Stop wireshark packet grabbing

7-close turns off listening on iface

Discussion:

There is no attempt to use sr1 and srp to receive and send packets.

The whole process is relatively clear, and the steps appear in pairs, which is convenient for memory.

When sniff sniffs, it will lose some messages appearing in front of iface. This problem may be caused by not performing good monitoring and starting packet grabbing.

There is no explanation for the specific performance standards of the network card, so it may be necessary to cross the river by feeling Stone. If it is found that the network card is not suitable for testing, it is necessary to switch to the instrument immediately for testing.


#! usr/bin/env python
# -*- coding:utf-8 -*-
import os
import sys
import re
import struct
import string
from scapy.all import *
import subprocess

conf.use_pcap = True 

'''
cmd
python
from scapy.all import *
ls(Ether())
ls(IP())
ls(ICMP())
send(IP(dst='1.2.3.4')/ICMP())
sendp(Raw("zhongxing"), iface='eth15', loop=1, inter=0.2, verbose=False)
 Settings  inter  Parameter to set the direct time interval between sending two adjacent packets 
 Settings  timeout  Parameter to set the timeout for waiting for a reply 
 Settings  retry  Parameter to set the number of retries. 
'''


print u" Realize network card contracting "
target = []
for i in range(1,len(sys.argv)):
  m = sys.argv[i].split('=')
  if m[0]=='-t':
    target.append(m[1])
  if m[0]=='-ip':
    target.append(m[1])
  if m[0]=='-mac':
    target.append(m[1])
print 'test -- ',target
print

print u' Object of the network card iface'
eth_local = []
a = repr(conf.route).split('\n')
for x in a:
  b = []
  b = x.split(' ')
  for y in b:
    if re.search('eth', y):
      eth_local.append(y)
print u' De-repetition '
c = []
c.append(eth_local[0])
for i in range(0,len(eth_local),1):
  m = 0
  for j in range(0,len(c),1):
    if c[j] == eth_local[i]:
      m += 1
  if m==0:
    c.append(eth_local[i])
print c #['eth15', 'eth21', 'eth17']

print u' Create 2 Layer message '
src_mac = '00:00:11:11:22:22'
dst_mac = '00:00:22:22:11:11'
dst_ip = '1.2.3.4'
src_ip = '5.6.7.8'
src_port = 1234
dst_port = 5678

##ls()
##ls(IP())
##IP().show()
##lsc()
pack_ip = IP(dst=dst_ip, src=src_ip, proto=1)
##ls(ICMP())
##ls(UDP())
pack_icmp = ICMP(type=8)
##ls(Ether())
pack_ether = Ether(dst=dst_mac, src=src_mac, type=0x0800)
info = Raw('zhongxing')
t = str(pack_ether/pack_ip/pack_icmp/info)
s = Ether(t)
print u' The message to be sent is: ',s.summary
eth = c[1]
print u' Sending network card iface For  %s\n' % eth

print u'--------- Start listening  -  Send icmp -  Sniffing icmp -  Turn off listening ----------'
print u'--------- Start listening -------------'
L2socket = conf.L2listen
listen_socket = L2socket(type=ETH_P_ALL, iface=eth)
print listen_socket
print conf.L2listen

#### Start the bag grabbing 
##cmd='C:\Program Files (x86)\Wireshark\tShark.exe'
##card_id = str(1)
##cap_file = str('H:\python\test.pcap')
##args = [cmd,"-i "+card_id,"-w",cap_file]
##print "*DEBUG*",args
##p=subprocess.Popen(args)


print u'---------sendp() Function call ----------'
sendp(s,iface=eth, verbose=False)

##print u'---------srp() Function call ----------'
##sr  The function is  Scapy  This function returns two lists, 
## No. 1 1 A list of packets that received replies and their corresponding replies, 
## No. 1 2 The list of packets that did not receive an answer, 
## Usually, we need to call other functions to make these two return values easier to read. 
##help(srp)
##p = srp(s,iface=c[1], verbose=False)
##print p.show()

print u'--------- Sniffing, filtering, saving pcap , read pcap----------'
##print sniff.__doc__
##pkts = sniff(iface = 'eth15',filter = 'icmp',count = 3, prn=lambda x: x.summary())
ip = '172.10.0.1'
subprocess.Popen(["ping.exe", ip]) # Provide to sniff
##Ether / IP / ICMP 172.10.1.124 > 172.10.0.1 echo-request 0 / Raw
##Ether / IP / ICMP 172.10.0.1 > 172.10.1.124 echo-reply 0 / Raw
##Ether / IP / ICMP 172.10.1.124 > 172.10.0.1 echo-request 0 / Raw
##listen_socket1 = L2socket(listen_socket)
##pkts = sniff(iface = eth,filter = 'icmp',count = 20, timeout = 10, L2socket=listen_socket)
pkts = sniff(iface = eth, filter = 'icmp', count = 20, timeout = 10) 
try:
  if 0 < len(pkts):
    print u'--------- Sniffed message ----------'
    ##pkts[0].show()
    wrpcap('demo.pcap',pkts)
    read_pkts = rdpcap('demo.pcap')
    print read_pkts[0]

    print u'--------------- Output base64 Data in encoded format ---------------'
    export_object(str(pkts[0]))

    print u'--------------- Convert to base64 Data in encoded format ---------------'
    newPkt = import_object('eNprYAqN+Q8GGp/TOCfN5GBwZWDwc/nCwNAgOItrDRdjLxD/Z+gEQitpgwvijAIMjAxgoODmAYLO\
    /m7ebq6ubs7+ri6uAa5+YNrf2dHREaiEgbGQUQ8AnjEcMQ==')
    print newPkt
    s = Ether(newPkt)
    print u' The message to be sent is: ',s.summary
    sendp(s,iface=eth, verbose=False)
  else:
    print u'--------- No message was sniffed ----------'
except:
  pass
finally:
  print u'--------- Turn off listening -------------'
  listen_socket.close()

Related articles: