1. 程式人生 > >簡單的發包工具——網路協議編輯器(python+scapy+pycharm)

簡單的發包工具——網路協議編輯器(python+scapy+pycharm)

一、實現的功能

    基於python+scapy設計協議編輯器,基於Tkinter的Python  GUI介面設計。實現了MAC、ARP、IP 、TCP、UDP協議的編輯與傳送,並且支援生成協議欄位的預設值,支援使用者輸入協議欄位值,發包前對協議欄位的合理性進行檢查,發包前自動計算並顯示校驗和,支援單次發包、多次發包,支援連續發包,連續發包時可隨時停止和計算並顯示資料包傳送速度的功能。

注:

程式原始碼:https://download.csdn.net/download/wmrem/10439779

執行前需要先安裝第三方庫scapy,安裝方法:https://blog.csdn.net/wmrem/article/details/80004819


二、主要函式說明——IP包的編輯與傳送為例

1.建立IP包編輯器介面,為每個按鈕繫結單擊響應時間。
def create_ip_sender():
   1.設定協議編輯器的介面
    ip_fields = 'IP協議的版本:',…… , '目的IP地址:'
    entries = create_protocol_editor(protocol_editor_panedwindow, ip_fields)
    2.建立傳送,預設值,清空按鈕,並繫結功能
    send_packet_button, reset_button, default_packet_button = create_bottom_buttons(protocol_editor_panedwindow)
    3.為"回車鍵"的Press事件編寫事件響應程式碼,傳送IP包
    tk.bind('<Return>', (lambda event: send_ip_packet(entries, send_packet_button)))  # <Return>代表回車鍵
    4.為"傳送"、預設值、按鈕的單擊事件編寫事件響應程式碼,傳送IP包
    send_packet_button.bind('<Button-1>', (
        lambda event: send_ip_packet(entries, send_packet_button)))

2. 在協議欄位編輯框中填入預設IP包的欄位值,為IP資料包的傳送做好準備
def create_default_ip_packet(entries):
    1.清空各欄位的值
    clear_protocol_editor(entries)
    2.建立預設的IP資料包,並將其各欄位的值填入協議編輯器的對應的文字框中
    default_ip_packet = IP()
    entries[0].insert(0, int(default_ip_packet.version))

    …

3.獲取協議編輯器文字框的值,建立對應的資料包並開啟一個執行緒用於連續傳送資料包。
def send_ip_packet(entries, send_packet_button):
    if 按鈕為傳送時:
1. 從協議編輯框中獲取要傳送的資料包的各個欄位的值
        ip_version = int(entries[0].get())

        2.用獲取的資料包的各欄位的值建立相應的資料包
        packet_to_send = IP(version=ip_version,……,src=ip_src, dst=ip_dst)
        3.開一個執行緒用於連續傳送資料包,並啟動
        t = threading.Thread(target=send_packet, args=(packet_to_send,))
        t.start()
        4.使協議導航樹不可用
        toggle_protocols_tree_state()
        send_packet_button['text'] = '停止'
    else 按鈕為停止時:
        5.按鈕為停止時可以終止資料包傳送執行緒
        stop_sending.set()
        6.恢復協議導航樹可用
        toggle_protocols_tree_state()

        send_packet_button['text'] = '傳送'

4.傳送資料包的執行緒函式,並計算資料包傳送速度
def send_packet(packet_to_send):
    stop_sending.clear()
    1.計算待發送資料包的長度(用於計算髮送速度)
    packet_size = len(packet_to_send)
    2.推導資料包的協議型別
    3.計算髮包開始傳送時間點
    begin_time = datetime.now()
    4.連續傳送資料包直到程序關閉
    while 程序沒有關閉:
        if 傳送Ether:
            sendp(packet_to_send, verbose=0)  # verbose=0,不在控制回顯'Sent 1 packets'.
        else:
            send(packet_to_send, verbose=0)
        5.計算髮送的總位元組數
        total_bytes = packet_size * n
        6.計算髮包用的總時間 
        total_time = (end_time - begin_time).total_seconds()
7.計算髮包的速度
        bytes_per_second = total_bytes / total_time / 1024

三、基礎知識準備

1.構造資料包,傳送資料包——“/"運算子資料包的拼裝,send()用於3層發包,可直接傳送IP資料包,自動加以太幀;只發送以太幀使用sendp()。

2.計算校驗和——IP包的校驗和預設值為None,在IP包被髮送時,其校驗和被被自動計算並填充在IP包中
以下方式可以在發包前計算IP包的校驗和,直接用packet.show2()命令也可以顯示校驗和。raw(packet)將資料包的內容轉換為位元組。

 

3.檢視資料包的欄位值(使用ls()),檢視某層協議資料包物件支援的函式(使用help())

4.ls()列出scapy支援的所有資料包,協議;lsc()列出scapy支援的全部命令;conf列出scapy的當前配置。

四、主要功能實現程式碼——以IP包為例

1.建立IP包編輯器的介面

# 建立協議欄位編輯區
def create_protocol_editor(root, field_names):
    """
    建立協議欄位編輯區
    :param root: 協議編輯區
    :param field_names: 協議欄位名列表
    :return: 協議欄位編輯框列表
    """
    entries = []
    for field in field_names:
        row = Frame(root)
        label = Label(row, width=15, text=field, anchor='e')
        entry = Entry(row, font=('Courier', '12', 'bold'), state='normal')  # 設定編輯框為等寬字型
        row.pack(side=TOP, fill=X, padx=5, pady=5)
        label.pack(side=LEFT)
        entry.pack(side=RIGHT, expand=YES, fill=X)
        entries.append(entry)
    return entries
def create_ip_sender():
    """
    建立IP包編輯器
    :return: None
    """
    # IP幀編輯區
    ip_fields = 'IP協議的版本:', '首部長度(5-15):', '區分服務:', '總長度:', '標識:', '標誌(0-2)DF,MF:', \
                '片偏移:', '生存時間:', '協議(資料部分):', '首部校驗和:', '源IP地址:', '目的IP地址:'
    entries = create_protocol_editor(protocol_editor_panedwindow, ip_fields)
    send_packet_button, reset_button, default_packet_button = create_bottom_buttons(protocol_editor_panedwindow)
    # 為"回車鍵"的Press事件編寫事件響應程式碼,傳送ARP包
    tk.bind('<Return>', (lambda event: send_ip_packet(entries, send_packet_button)))  # <Return>代表回車鍵
    # 為"傳送"按鈕的單擊事件編寫事件響應程式碼,傳送ARP包
    send_packet_button.bind('<Button-1>', (
        lambda event: send_ip_packet(entries, send_packet_button)))  # <Button-1>代表滑鼠左鍵單擊
    # 為"清空"按鈕的單擊事件編寫事件響應程式碼,清空協議欄位編輯框
    reset_button.bind('<Button-1>', (lambda event: clear_protocol_editor(entries)))
    # 為"預設值"按鈕的單擊事件編寫事件響應程式碼,在協議欄位編輯框填入ARP包欄位的預設值
    default_packet_button.bind('<Button-1>', (lambda event: create_default_ip_packet(entries)))

2.在協議欄位編輯框中填入預設IP包的欄位值,填入前需要先清空當前值。

def clear_protocol_editor(entries):
    """
    清空協議編輯器的當前值
    :param entries: 協議欄位編輯框列表
    :return: None
    """
    for entry in entries:
        # 如果有隻讀Entry,也要清空它的當前值
        state = entry['state']
        entry['state'] = 'normal'
        entry.delete(0, END)
        entry['state'] = state
# 當前網絡卡的預設閘道器
default_gateway = [a for a in os.popen('route print').readlines() if ' 0.0.0.0 ' in a][0].split()[-3]
def create_default_ip_packet(entries):
    """
        在協議欄位編輯框中填入預設IP包的欄位值
         :param entries: 協議欄位編輯框列表
         :return: None
         """
    clear_protocol_editor(entries)
    default_ip_packet = IP()
    entries[0].insert(0, int(default_ip_packet.version))
    entries[1].insert(0, 5)
    entries[3].insert(0, 20)
    entries[2].insert(0, hex(default_ip_packet.tos))
    entries[4].insert(0, int(default_ip_packet.id))
    entries[5].insert(0, int(default_ip_packet.flags))
    entries[6].insert(0, int(default_ip_packet.frag))
    entries[7].insert(0, int(default_ip_packet.ttl))
    entries[8].insert(0, int(default_ip_packet.proto))
    entries[9]['state'] = NORMAL # 可操作
    entries[9].insert(0, "單機發送時自動計算")
    entries[9]['state'] = DISABLED  # 不可操作
    # 目標IP地址設成本地預設閘道器
    entries[11].insert(0, default_gateway)
    default_ip_packet = IP(dst=entries[11].get())#可以省略
    entries[10].insert(0, default_ip_packet.src)

3.傳送IP包

def send_ip_packet(entries, send_packet_button):
    """
    發IP包
    :param entries:
    :param send_packet_button:
    :return:
    """
    if send_packet_button['text'] == '傳送':
        ip_version = int(entries[0].get())
        ip_ihl = int(entries[1].get())
        ip_tos = int(entries[2].get(), 16)
        ip_len = int(entries[3].get())
        ip_id = int(entries[4].get())
        ip_flags = int(entries[5].get())
        ip_frag = int(entries[6].get())
        ip_ttl = int(entries[7].get())
        ip_proto = int(entries[8].get())
        ip_src = entries[10].get()
        ip_dst = entries[11].get()
        # ip_options = entries[12].get()
        packet_to_send = IP(version=ip_version, ihl=ip_ihl, tos=ip_tos, len=ip_len, id=ip_id,
                            frag=ip_frag, flags=ip_flags, ttl=ip_ttl, proto=ip_proto, src=ip_src, dst=ip_dst)
        packet_to_send = IP(raw(packet_to_send))
        entries[9]['state'] = NORMAL  # 重新啟用
        entries[9].delete(0, END)
        entries[9].insert(0, hex(packet_to_send.chksum))
        entries[9]['state'] = DISABLED  # 不可操作
        # 開一個執行緒用於連續傳送資料包
        t = threading.Thread(target=send_packet, args=(packet_to_send,))
        t.setDaemon(True)
        t.start()
        # 使協議導航樹不可用
        toggle_protocols_tree_state()
        send_packet_button['text'] = '停止'
    else:
        # 終止資料包傳送執行緒
        stop_sending.set()
        # 恢復協議導航樹可用
        toggle_protocols_tree_state()
        send_packet_button['text'] = '傳送'

4.執行緒中執行的用於傳送資料包的函式,可以計算髮包速度

def send_packet(packet_to_send):
    """
    在我們給出的發包程式中,如果電腦速度太快,send_packet函式中的send(...)函式執行前後,
    datetetime返回的begin_time和end_time可能是相同的,結果會報除零錯誤,所以,send_packet函式應該做修改

    用於傳送資料包的執行緒函式,持續傳送資料包
    :type packet_to_send: 待發送的資料包
    """
    # print(packet.show(dump=True))
    # 對傳送的資料包次數進行計數,用於計算髮送速度
    n = 0
    stop_sending.clear()
    # 待發送資料包的長度(用於計算髮送速度)
    packet_size = len(packet_to_send)
    # 推導資料包的協議型別
    proto_names = ['TCP', 'UDP', 'ICMP', 'IP', 'ARP', 'Ether', 'Unknown']
    packet_proto = ''
    for pn in proto_names:
        if pn in packet_to_send:
            packet_proto = pn
            break
    # 開始傳送時間點
    begin_time = datetime.now()
    while not stop_sending.is_set():
        if isinstance(packet_to_send, Ether):
            sendp(packet_to_send, verbose=0)  # verbose=0,不在控制回顯'Sent 1 packets'.
        else:
            send(packet_to_send, verbose=0)
        n += 1
        end_time = datetime.now()
        total_bytes = packet_size * n
        #修改
        total_time = (end_time - begin_time).total_seconds()
        if total_time == 0:
            total_time = 2.23E-308  # 當begin_time和end_time相等時,將total_time設為IEEE 745標準中規定的最小浮點數
        bytes_per_second = total_bytes / total_time / 1024
        # bytes_per_second = total_bytes / ((end_time - begin_time).total_seconds()) / 1024
        status_bar.set('已經發送了%d個%s資料包, 已經發送了%d個位元組,傳送速率: %0.2fK位元組/秒',
                       n, packet_proto, total_bytes, bytes_per_second)

五、執行結果