×

基于python的一个https网站证书特征提取项目

0x1 思路

基本思路很简单:基于老师给的思路,首先收集到一百万个网站的csv数据集 具体如下图结构

然后流程为:数据清洗(剔除非https协议和不可访问的网站,再将清洗后的数据写入新的csv文件)——> 获取证书(读取清洗后的网站csv文件,获取每个网站的证书字典(或者元组),提取出需要的部分后再写入新的csv文件,便于数据的查询;其实对于这种大数据量,还是用数据库最好)——> 数据可视化(由于获取到的数据都是证书的特征,彼此没有什么数理关系,于是直接做了一套前后端查询web应用)

0x2 实现

首先说明对于这种百万级别的数据收集 多线程以及高并发问题是肯定不可避免的 而且使用python做这个项目也不是好的选择,最后我会给出go语言的版本

0x1 数据清洗:

读取文件 获取到Domain列 然后按行读取domain

首先需要规定协议为https 然后将domain和protocol结合获取到url 通过发送请求获取到回应状态码 根据状态码判断是否可访问

可访问:将域名写入到output文件按列写入 便于后面提取

不可访问:continue

代码实现:
import csv
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed


def is_domain_accessible(domain, timeout=5):
    """Report whether ``domain`` answers an HTTPS GET with status 200.

    Args:
        domain: Bare host name without a scheme; each entry of ``protocols``
            is prepended to it to build the URL.
        timeout: Timeout in seconds passed to ``requests.get``.

    Returns:
        True when a probed URL answers with HTTP 200, otherwise False.
        Failures are reported on stdout and never raised to the caller.
    """
    # Only HTTPS is probed; the list keeps the loop ready for more schemes.
    protocols = ["https://"]
    try:
        for protocol in protocols:
            url = f"{protocol}{domain}"
            try:
                # stream=True returns once the headers have arrived, so the
                # page body is never downloaded; leaving the with-block
                # closes the response and frees the connection.
                with requests.get(url, timeout=timeout, stream=True) as response:
                    status_code = response.status_code
            except requests.exceptions.RequestException:
                continue  # this scheme failed; move on to the next one, if any
            if status_code == 200:
                print(f"域名 {url} 可访问")
                return True
        print(f"域名 {domain} 不可访问")
        return False
    except Exception as e:
        # Last-resort guard so one malformed domain cannot kill a worker thread.
        print(f"域名 {domain} 访问失败: {e}")
        return False


def main(input_csv, output_csv="select.csv", max_workers=20):
    """Filter the domains of ``input_csv`` down to the reachable ones.

    Args:
        input_csv: CSV file with a ``Domain`` column.
        output_csv: Destination CSV; receives a ``Domain`` header followed by
            one reachable domain per row.
        max_workers: Number of threads used to probe domains concurrently.

    Returns:
        None. Prints a message and returns early when ``input_csv`` is missing.
    """
    accessible_domains = []
    try:
        with open(input_csv, "r", newline="", encoding="utf-8") as file:
            domains = []
            for row in csv.DictReader(file):
                # DictReader fills columns missing from a short row with None,
                # so fall back to "" before stripping.
                name = (row.get("Domain") or "").strip()
                if name:
                    domains.append(name)
    except FileNotFoundError:
        print(f"文件 {input_csv} 未找到,请检查路径是否正确。")
        return

    # Probe all domains concurrently and collect the ones that respond.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_domain = {executor.submit(is_domain_accessible, domain): domain for domain in domains}

        for future in as_completed(future_to_domain):
            domain = future_to_domain[future]
            try:
                if future.result():
                    accessible_domains.append(domain)
            except Exception as e:
                print(f"检查域名 {domain} 时发生错误: {e}")

    # Persist the result; rows appear in completion order.
    try:
        with open(output_csv, "w", newline="", encoding="utf-8") as file:
            writer = csv.writer(file)
            writer.writerow(["Domain"])
            writer.writerows([domain] for domain in accessible_domains)
        print(f"所有可访问的域名已保存到 {output_csv}")
    except Exception as e:
        print(f"写入文件失败: {e}")


if __name__ == "__main__":
    # max_workers sets how many domains are probed at the same time.
    main(input_csv="majestic_million2.csv", output_csv="select.csv", max_workers=150)

 

0x2  获取证书:

首先读取筛选后的csv文件 获取到域名

将域名作为参数传入到证书获取函数 开启线程池 将任务分配给workers

workers 分别执行获取证书函数

获取证书首先 创建ssl上下文 然后使用socket连接 获取到ssock 调用getpeercert方法 将返回的结果存储为cert

将证书字典的每个特征解析出来 把各个特征名写入到headers中 然后保存为新的csv文件(解析证书时需要注意:subject、issuer等字段是多层嵌套的元组 直接使用get大概率获取不到信息 所以需要逐层拆开括号 同时需要处理时间信息 将其转换为常规的日期格式)

代码:
import csv
import ssl
import socket
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime


def get_certs(input_csv='select.csv', output_file='certs.csv', max_workers=100, limit=None):
    """Fetch the TLS certificate of every domain listed in a CSV file.

    Args:
        input_csv: CSV file whose first column holds the domains. A leading
            "Domain" header row is skipped.
        output_file: Path handed to ``save_certificates_to_file``.
        max_workers: Size of the thread pool used for the lookups.
        limit: Optional cap on how many domains are processed (useful for a
            trial run); ``None`` processes every domain.

    Returns:
        None. Prints a message and returns early when ``input_csv`` is missing.
    """
    try:
        with open(input_csv, 'r', newline='', encoding='utf-8') as f:
            domains = [row[0].strip() for row in csv.reader(f) if row and row[0].strip()]
    except FileNotFoundError:
        print(f"文件 {input_csv} 未找到")
        return

    # The first row may be a column header rather than a host name.
    if domains and domains[0].lower() == 'domain':
        domains = domains[1:]
    if limit is not None:
        domains = domains[:limit]

    start_time = datetime.now()
    print(f"开始处理 {len(domains)} 个域名,最大并发数:{max_workers}")

    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        submitted = [
            (domain, executor.submit(get_ssl_certificate, extract_hostname(domain)))
            for domain in domains
        ]
        # Collect in submission order so the i-th result belongs to the i-th
        # domain; the lookups themselves still run concurrently.
        for domain, future in submitted:
            try:
                results.append(future.result())
            except Exception as e:
                print(f"获取 {domain} 的证书时发生异常: {e}")
                results.append(None)

    # NOTE: only the certificates are passed on; the writer takes the value of
    # its Domain column from each certificate's subject commonName.
    save_certificates_to_file(results, output_file)
    print(f"\n处理完成!耗时:{datetime.now() - start_time}")
    print(f"证书信息已保存到 {output_file}")


def extract_hostname(url):
    """Return the bare host name contained in ``url``.

    Accepts full URLs as well as scheme-less values such as
    ``example.com/path``. Port, user-info, path and query parts are dropped,
    and the host is lower-cased by ``urlsplit``.

    Args:
        url: URL or domain string.

    Returns:
        The host name, or ``''`` for empty input.
    """
    from urllib.parse import urlsplit

    text = url.strip()
    # urlsplit only recognises a network location after "//".
    candidate = text if '://' in text else '//' + text
    try:
        host = urlsplit(candidate).hostname
    except ValueError:
        # e.g. an unbalanced IPv6 bracket; fall back to plain splitting below
        host = None
    if host:
        return host

    remainder = text.split('://', 1)[1] if '://' in text else text
    return remainder.split('/')[0]


def get_ssl_certificate(hostname, port=443):
    """Open a TLS connection to ``hostname:port`` and return its certificate.

    Returns:
        Whatever ``getpeercert()`` yields for the connection, or ``None``
        when any step fails (the error is printed).
    """
    try:
        tls_context = ssl.create_default_context()
        # Allow servers that only speak TLS 1.0 or newer.
        tls_context.minimum_version = ssl.TLSVersion.TLSv1

        with socket.create_connection((hostname, port), timeout=10) as raw_sock, \
                tls_context.wrap_socket(raw_sock, server_hostname=hostname) as tls_sock:
            return tls_sock.getpeercert()
    except Exception as exc:
        print(f"  ⚠️ 错误详情:{str(exc)}")
        return None



def save_certificates_to_file(all_certs, output_file):
    """Write one CSV row per certificate to ``output_file``.

    Args:
        all_certs: Iterable of certificate dicts; falsy entries (``None`` or
            an empty dict) are written as a row that carries only the
            failure status.
        output_file: Path of the CSV file to create or overwrite.
    """
    headers = ["Domain", "Subject", "Subject Org", "Subject Country",
               "Issuer", "Issuer Org", "Issuer Country", "Valid From",
               "Valid To", "Serial Number", "Alternative Names", "Status"]
    # Row used when no certificate is available: blanks plus the status text.
    failure_row = [""] * (len(headers) - 1) + ["无法获取证书"]

    with open(output_file, 'w', newline='', encoding='utf-8') as f:
        writer = csv.writer(f)
        writer.writerow(headers)

        for cert_info in all_certs:
            if not cert_info:
                writer.writerow(failure_row)
                continue

            # Flatten the nested subject / issuer structures into dicts.
            subject = parse_component(cert_info.get('subject', []))
            issuer = parse_component(cert_info.get('issuer', []))

            # Normalise the validity dates and the SAN list for display.
            valid_from = parse_date(cert_info.get('notBefore'))
            valid_to = parse_date(cert_info.get('notAfter'))
            san = parse_san(cert_info.get('subjectAltName', []))

            # The subject common name fills both the Domain and Subject columns.
            common_name = subject.get('commonName', '')
            status = "有效" if cert_info else "无效"

            writer.writerow([
                common_name,
                common_name,
                subject.get('organizationName', ''),
                subject.get('countryName', ''),
                issuer.get('commonName', ''),
                issuer.get('organizationName', ''),
                issuer.get('countryName', ''),
                valid_from,
                valid_to,
                cert_info.get('serialNumber', ''),
                san,
                status,
            ])


def parse_component(component):
    """Flatten a certificate subject/issuer structure into a plain dict.

    Each tuple in ``component`` is either a direct ``(key, value)`` pair whose
    key is a string, or a tuple of such pairs. Non-tuple entries are ignored,
    and later keys overwrite earlier ones.
    """
    def _is_pair(obj):
        return isinstance(obj, tuple) and len(obj) == 2

    fields = {}
    for entry in component:
        if not isinstance(entry, tuple):
            continue
        if len(entry) == 2 and isinstance(entry[0], str):
            # entry itself is a key/value pair
            fields[entry[0]] = entry[1]
            continue
        # entry wraps one or more key/value pairs
        fields.update(pair for pair in entry if _is_pair(pair))
    return fields


def parse_san(san_list):
    """Render ``(kind, value)`` SAN pairs as a ``kind=value, ...`` string."""
    rendered = []
    for kind, value in san_list:
        rendered.append(f"{kind}={value}")
    return ', '.join(rendered)


def parse_date(date_str):
    """Normalise a certificate timestamp to ``YYYY-MM-DD``.

    Example: ``"Feb 10 11:33:33 2025 GMT"`` becomes ``"2025-02-10"``.

    Args:
        date_str: Timestamp in the ``"%b %d %H:%M:%S %Y %Z"`` layout.

    Returns:
        The ISO date string, ``''`` for empty input, or ``date_str`` unchanged
        when it cannot be parsed.
    """
    if not date_str:
        return ''
    try:
        dt = datetime.strptime(date_str, "%b %d %H:%M:%S %Y %Z")
    except (ValueError, TypeError):
        # ValueError: layout mismatch; TypeError: value is not a string.
        return date_str
    return dt.strftime("%Y-%m-%d")


if __name__ == "__main__":
    # NOTE: an earlier comment here suggested `pip install pyopenssl`, but the
    # script imports only standard-library modules (csv, ssl, socket, ...).
    get_certs(input_csv='select.csv', output_file='certs.csv', max_workers=200)

0x3 数据可视化:

难度不高就是设计一个前端和一个查找后端 前端我用简单的html+css 后端使用flask框架做查找

前端思路如下:


前后端代码:
index.html:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>SSL Certificate Lookup</title>
    <style>
        * {
            padding: 0;
            margin: 0;
            box-sizing: border-box;
        }

        body {
            font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
            line-height: 1.6;
            padding: 20px;
            background-color: #f0f8ff;
        }

        .title {
            font-size: 48px;
            text-align: center;
            margin: 40px 0;
            color: #2c3e50;
            text-shadow: 2px 2px 4px rgba(0,0,0,0.1);
        }

        .search {
            max-width: 800px;
            margin: 0 auto 40px;
            padding: 20px;
            background: white;
            border-radius: 15px;
            box-shadow: 0 4px 10px rgba(0,0,0,0.1);
            display: flex;
            gap: 10px;
        }

        .search input[type=text] {
            flex: 1;
            padding: 15px;
            font-size: 18px;
            border: 2px solid #e0e0e0;
            border-radius: 8px;
            transition: border-color 0.3s;
        }

        .search input[type=button] {
            padding: 15px 30px;
            background: #4CAF50;
            color: white;
            border: none;
            border-radius: 8px;
            cursor: pointer;
            font-size: 18px;
            transition: background 0.3s;
        }

        .search input[type=button]:hover {
            background: #45a049;
        }

        .result {
            max-width: 1000px;
            margin: 0 auto;
            background: white;
            border-radius: 15px;
            box-shadow: 0 4px 10px rgba(0,0,0,0.1);
            overflow: hidden;
        }

        .result-header {
            background: #2c3e50;
            color: white;
            padding: 20px;
            text-align: center;
            font-size: 24px;
            border-bottom: 3px solid #34495e;
        }

        .cert-info {
            padding: 20px;
        }

        .info-row {
            display: flex;
            margin: 15px 0;
            padding: 15px;
            background: #f8f9fa;
            border-radius: 8px;
            box-shadow: inset 0 0 5px rgba(0,0,0,0.05);
        }

        .info-row:nth-child(even) {
            background: #ffffff;
        }

        .info-label {
            width: 200px;
            font-weight: bold;
            color: #2c3e50;
            margin-right: 20px;
        }

        .info-value {
            flex: 1;
            word-break: break-all;
        }

        .error-message {
            color: #e74c3c;
            font-size: 18px;
            padding: 20px;
            text-align: center;
        }

        @media (max-width: 768px) {
            .search {
                flex-direction: column;
            }

            .info-row {
                flex-direction: column;
            }

            .info-label {
                width: 100%;
                margin-bottom: 10px;
            }
        }
    </style>
</head>
<body>
    <div class="title">SSL Certificate Lookup</div>

    <div class="search">
        <input type="text" placeholder="Enter domain name..." id="domainInput">
        <input type="button" value="Search" id="searchBtn">
    </div>

    <div class="result">
        <div class="result-header">Certificate Information</div>
        <div class="cert-info" id="certInfo">
            <!-- 动态内容将显示在这里 -->
        </div>
    </div>

    <script>
        // Look up the entered domain through POST /search and render the result.
        document.getElementById('searchBtn').addEventListener('click', function() {
            const domain = document.getElementById('domainInput').value.trim();
            const certInfo = document.getElementById('certInfo');

            if (!domain) {
                alert('请输入有效的域名');
                return;
            }

            // Drop whatever the previous search rendered.
            certInfo.innerHTML = '';

            // Show a placeholder while the request is in flight.
            const loading = document.createElement('div');
            loading.className = 'info-value';
            loading.textContent = 'Loading...';
            certInfo.appendChild(loading);

            fetch('/search', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/x-www-form-urlencoded',
                },
                body: `domain=${encodeURIComponent(domain)}`
            })
            .then(response => response.json())
            .then(data => {
                certInfo.innerHTML = ''; // remove the loading placeholder

                // Anything that is not an explicit success with a data object
                // is shown as an error. The 400 response only carries an
                // `error` key, so fall back to it when `message` is absent.
                if (data.status !== 'success' || !data.data) {
                    const errorDiv = document.createElement('div');
                    errorDiv.className = 'error-message';
                    errorDiv.textContent = data.message || data.error || '请求发生错误,请稍后重试';
                    certInfo.appendChild(errorDiv);
                    return;
                }

                // Keys must match the CSV column names returned by the
                // backend; the validity end column is named 'Valid To'.
                const fields = [
                    'Domain', 'Subject', 'Subject Org', 'Subject Country',
                    'Issuer', 'Issuer Org', 'Issuer Country', 'Valid From',
                    'Valid To', 'Serial Number', 'Alternative Names'
                ];

                fields.forEach(field => {
                    const row = document.createElement('div');
                    row.className = 'info-row';

                    const label = document.createElement('div');
                    label.className = 'info-label';
                    label.textContent = field;

                    const value = document.createElement('div');
                    value.className = 'info-value';
                    value.textContent = data.data[field] || '无';

                    row.appendChild(label);
                    row.appendChild(value);
                    certInfo.appendChild(row);
                });
            })
            .catch(error => {
                console.error('Error:', error);
                certInfo.innerHTML = '<div class="error-message">请求发生错误,请稍后重试</div>';
            });
        });
    </script>
</body>
</html>

-------------------------------------------------------------------------------------------------------------------------------------

app.py代码:

import flask
from flask import Flask, request, jsonify
import pandas as pd

app = Flask(__name__)

# Load the certificate table once, at import time. dtype=str stops pandas from
# inferring numeric columns: an all-empty Domain column would otherwise become
# float (breaking the .str accessor) and digit-only serial numbers would be
# parsed as numbers. Empty cells are still read as NaN.
df = pd.read_csv('certs.csv', dtype=str)

@app.route('/')
def index():
    """Serve the lookup page rendered from the ``index.html`` template."""
    page = flask.render_template('index.html')
    return page

@app.route('/search', methods=['POST'])
def search():
    """Look up the certificate row whose ``Domain`` equals the posted domain.

    Reads the ``domain`` form field, compared case-insensitively.

    Returns:
        JSON ``{'status': 'success', 'data': {...}}`` for the first match,
        or ``{'status': 'error', 'message': ...}`` with HTTP 400 (empty
        input) or 404 (no match).
    """
    domain = request.form.get('domain', '').strip().lower()

    if not domain:
        # Same status/message shape as the 404 branch so a client can treat
        # both alike; the original 'error' key is kept for existing callers.
        return jsonify({
            'status': 'error',
            'message': '请输入有效的域名',
            'error': '请输入有效的域名'
        }), 400

    # Empty Domain cells are NaN; normalise to text so .str always works.
    known_domains = df['Domain'].fillna('').astype(str).str.lower()
    result = df[known_domains == domain].to_dict('records')

    if not result:
        return jsonify({
            'status': 'error',
            'message': '该域名无法查找'
        }), 404

    # Replace missing values with the placeholder shown to the user.
    cert_info = {
        key: value if pd.notnull(value) else '无'
        for key, value in result[0].items()
    }

    return jsonify({
        'status': 'success',
        'data': cert_info
    })


if __name__ == '__main__':
    # Start Flask's built-in development server.
    # NOTE(review): debug=True enables Flask's debug mode; confirm this entry
    # point is only used for local development.
    app.run(debug=True)

-------------------------------------------------------------------------------------------------------------------------------------

0x3 展示

0x1:

文件结构

前端展示

 

0x4 扩展

由于python对于高并发处理的速度慢 做这种百万级别的数据处理是非常困难的 于是我使用go语言将这个项目做了速度优化

文件结构:使用go语言模块化处理(对go处于noob阶段 做的还不是很了解<3)

 

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

作者

xiangchao100321@outlook.com

A normal student

关于openwrt拨号的一些疑问

在重新设置路由器的openwrt时 习惯性的...

读出全部