skills/skills.netease.im/auto-model-dimension-crawler

auto-model-dimension-crawler

SKILL.md

汽车维表完整爬取与处理 Skill

功能说明

本Skill提供从汽车之家和懂车帝爬取车型数据,并生成标准化汽车维表的完整流程。

工作流程

┌─────────────────┐
│ 1. 获取车型ID列表 │ ← 从汽车之家tree_menu
└────────┬────────┘
┌─────────────────┐
│ 2. 爬取车型数据  │ ← 从汽车之家API
└────────┬────────┘
┌─────────────────┐
│ 3. 获取参数映射  │ ← 按能源类型分别获取
└────────┬────────┘
┌─────────────────┐
│ 4. 处理数据     │ ← 重命名参数、解决串行
└────────┬────────┘
┌─────────────────┐
│ 5. 爬取懂车帝   │ ← 经销商报价和销售状态
└────────┬────────┘
┌─────────────────┐
│ 6. 合并数据     │ ← 生成最终维表
└────────┬────────┘
┌─────────────────┐
│ 7. 合并CSV文件  │ ← 合并多个分散的CSV文件
└─────────────────┘

安装依赖

pip install pandas requests tqdm

快速开始

方式1:使用主脚本(推荐)

# 完整流程:从爬取到生成维表
python car_dimension_table.py --full-process

# 或分步执行
python car_dimension_table.py --step 1  # 获取车型ID
python car_dimension_table.py --step 2  # 爬取车型数据
python car_dimension_table.py --step 3  # 获取参数映射
python car_dimension_table.py --step 4  # 处理数据
python car_dimension_table.py --step 5  # 爬取懂车帝
python car_dimension_table.py --step 6  # 合并生成维表
python car_dimension_table.py --step 7  # 合并多个CSV文件

方式2:使用Python API

from car_dimension_table import CarDimensionTable

# 初始化
crawler = CarDimensionTable()

# 执行完整流程
crawler.full_process()

# 或分步执行
crawler.step1_get_car_ids()        # 获取车型ID
crawler.step2_crawl_car_data()     # 爬取车型数据
crawler.step3_get_param_mappings() # 获取参数映射
crawler.step4_process_data()       # 处理数据
crawler.step5_crawl_dcd()          # 爬取懂车帝
crawler.step6_merge_data()         # 合并生成维表
crawler.step7_merge_csv_files()    # 合并多个CSV文件

详细步骤说明

Step 1: 获取车型ID列表

从汽车之家tree_menu接口获取所有车型ID。

注意:tree_menu接口需要浏览器抓包获取,无法直接API调用。

替代方案

  1. 访问 https://car.autohome.com.cn/javascripts/specCompare.js
  2. 提取tree_menu数据
  3. 解析为车型ID列表
# 解析tree_menu获取车型ID
def parse_tree_menu(tree_menu_data):
    """
    解析汽车之家tree_menu数据,提取车型ID
    
    Args:
        tree_menu_data: tree_menu JSON数据
        
    Returns:
        DataFrame: 包含品牌、车系、seriesid的列表
    """
    import pandas as pd
    
    cars = []
    for brand in tree_menu_data.get('brandlist', []):
        brand_name = brand.get('name')
        for factory in brand.get('factorylist', []):
            factory_name = factory.get('name')
            for series in factory.get('serieslist', []):
                series_name = series.get('name')
                series_id = series.get('id')
                series_state = series.get('state')  # 20=在售, 40=停产
                
                cars.append({
                    '品牌': brand_name,
                    '厂商': factory_name,
                    '车系': series_name,
                    'seriesid': series_id,
                    'state': series_state
                })
    
    return pd.DataFrame(cars)

Step 2: 爬取车型数据

使用汽车之家API爬取每个车型的详细数据。

API端点https://car-web-api.autohome.com.cn/car/param/getParamConf

def crawl_car_data(series_id, car_name):
    """
    爬取单个车型的详细数据
    
    Args:
        series_id: 汽车之家车型ID
        car_name: 车型名称
        
    Returns:
        DataFrame: 车型数据
    """
    import requests
    import pandas as pd
    
    url = f"https://car-web-api.autohome.com.cn/car/param/getParamConf?mode=1&site=1&seriesid={series_id}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "application/json",
        "Referer": "https://www.autohome.com.cn/"
    }
    
    response = requests.get(url, headers=headers, timeout=30)
    data = response.json()
    
    if data.get('message') != 'success':
        return None
    
    result = data['result']
    brand_name = result['bread']['brandname']
    series_name = result['bread']['seriesname']
    brand_id = result['bread']['brandid']
    
    # 处理每个车型配置
    all_cars = []
    for spec in result.get('datalist', []):
        car_info = {
            '品牌': brand_name,
            '车系': series_name,
            '品牌_id': brand_id,
            '车型_id': spec.get('specid'),
            '年款': spec.get('condition', [''])[0] if spec.get('condition') else ''
        }
        
        # 处理参数配置
        for param in spec.get('paramconflist', []):
            if param is None:
                continue
            item_name = param.get('itemname', '')
            title_id = param.get('titleid')
            
            if title_id is None:
                continue
            
            # 处理颜色信息
            if 'colorinfo' in param and param['colorinfo']:
                colors = param['colorinfo'].get('list', [])
                color_str = ', '.join([f"{c['name']}{c['price']})" if c.get('price', 0) > 0 else c['name'] for c in colors])
                car_info[f'参数_{title_id}'] = color_str
            # 处理子列表
            elif 'sublist' in param and param['sublist']:
                sub_str = ', '.join([f"{s.get('value', '')}({s.get('name', '')})" for s in param['sublist']])
                car_info[f'参数_{title_id}'] = sub_str
            else:
                car_info[f'参数_{title_id}'] = item_name
        
        all_cars.append(car_info)
    
    return pd.DataFrame(all_cars)

Step 3: 获取参数名称映射

关键步骤:按能源类型分别获取参数名称映射,避免数据串行。

def get_param_mapping(series_id):
    """从汽车之家API获取参数名称映射"""
    import requests
    
    url = f"https://car-web-api.autohome.com.cn/car/param/getParamConf?mode=1&site=1&seriesid={series_id}"
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "application/json",
        "Referer": "https://www.autohome.com.cn/"
    }
    
    response = requests.get(url, headers=headers, timeout=30)
    if response.status_code == 200:
        data = response.json()
        if data.get('message') == 'success' and data.get('result'):
            param_names = {}
            for title in data['result'].get('titlelist', []):
                for item in title.get('items', []):
                    titleid = item.get('titleid')
                    itemname = item.get('itemname', '')
                    if titleid and itemname:
                        param_names[f"参数_{titleid}"] = itemname
            return param_names
    return {}

# 获取各种能源类型的参数映射
def download_all_param_mappings():
    """下载所有能源类型的参数名称映射"""
    car_samples = {
        '纯电动': '5026',        # AION S
        '汽油': '3301',          # 景逸S50
        '柴油': '7798',          # 212 T01
        '插电式混合动力': '5323', # 腾势X
        '增程式': '6576',        # 理想L9
    }
    
    mappings = {}
    for energy_type, series_id in car_samples.items():
        print(f"获取【{energy_type}】参数映射...")
        mapping = get_param_mapping(series_id)
        mappings[energy_type] = mapping
        
        # 保存到文件
        import json
        with open(f'param_mappings/mapping_{energy_type}.json', 'w', encoding='utf-8') as f:
            json.dump(mapping, f, ensure_ascii=False, indent=2)
    
    return mappings

Step 4: 处理数据(解决串行问题)

def process_car_data(car_folder, mapping_dir='param_mappings'):
    """
    处理车型数据,按能源类型使用正确的参数映射
    
    关键:不同能源类型的参数顺序不同,必须分别处理
    """
    import pandas as pd
    import json
    import os
    
    # 加载参数映射
    mappings = {}
    for energy_type in ['纯电动', '汽油', '柴油', '插电式混合动力', '增程式']:
        filepath = f'{mapping_dir}/mapping_{energy_type}.json'
        if os.path.exists(filepath):
            with open(filepath, 'r', encoding='utf-8') as f:
                mappings[energy_type] = json.load(f)
    
    # 回退映射
    fallback = {
        '汽油+48V轻混系统': '汽油',
        '油电混合': '汽油',
        '汽油+CNG': '汽油',
        '柴油+48V轻混系统': '柴油',
        'CNG': '汽油',
        '汽油电驱': '汽油',
        '氢燃料': '汽油',
        '甲醇混动': '汽油',
        '汽油+24V轻混系统': '汽油',
        '-': '汽油',
    }
    
    # 处理所有车型文件
    car_files = [f for f in os.listdir(car_folder) if f.endswith('.csv')]
    all_data = []
    
    for car_file in car_files:
        df = pd.read_csv(f'{car_folder}/{car_file}', encoding='utf-8-sig', low_memory=False)
        energy_type = df['参数_6'].iloc[0] if '参数_6' in df.columns else '汽油'
        
        # 确定使用哪种映射
        if energy_type in mappings:
            mapping = mappings[energy_type].copy()
        elif energy_type in fallback:
            mapping = mappings[fallback[energy_type]].copy()
        else:
            mapping = mappings.get('汽油', {}).copy()
        
        # 重命名列
        df_renamed = df.rename(columns=mapping)
        df_renamed = df_renamed.loc[:, ~df_renamed.columns.duplicated()]
        
        all_data.append(df_renamed)
    
    # 合并所有数据
    df_final = pd.concat(all_data, ignore_index=True, sort=False)
    return df_final

Step 5: 爬取懂车帝经销商报价

def crawl_dcd_quotes(car_list, output_file='懂车帝报价.csv'):
    """
    从懂车帝爬取经销商报价
    
    Args:
        car_list: 车型列表(品牌+车系)
        output_file: 输出文件
    """
    import requests
    import pandas as pd
    import time
    
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "Accept": "application/json",
        "Referer": "https://www.dongchedi.com/"
    }
    
    all_quotes = []
    
    for brand, series in car_list:
        # 1. 搜索车型获取series_id
        search_url = "https://www.dongchedi.com/search"
        params = {"keyword": f"{brand} {series}"}
        # ... 搜索逻辑
        
        # 2. 获取车型列表
        car_list_url = f"https://www.dongchedi.com/motor/pc/car/series/car_list?series_id={series_id}"
        # ... 获取车型列表
        
        # 3. 获取经销商报价
        for car_id in car_ids:
            quote_url = f"https://www.dongchedi.com/motor/pc/car/series/car_dealer_price?car_id={car_id}&city_name=北京"
            # ... 获取报价
            
        time.sleep(0.5)  # 避免被封
    
    # 保存结果
    df = pd.DataFrame(all_quotes)
    df.to_csv(output_file, index=False, encoding='utf-8-sig')
    return df

Step 6: 合并生成最终维表

def merge_data(autohome_df, dcd_df, output_file='汽车维表_最终版.csv'):
    """
    合并汽车之家和懂车帝数据,生成最终维表
    
    Args:
        autohome_df: 汽车之家数据
        dcd_df: 懂车帝报价数据
        output_file: 输出文件
    """
    import pandas as pd
    
    # 1. 按品牌+车系+年款匹配懂车帝报价
    merged = autohome_df.merge(
        dcd_df,
        left_on=['品牌', '车系', '年款'],
        right_on=['品牌', '车系', '年款'],
        how='left'
    )
    
    # 2. 判断销售状态
    def judge_sales_status(row):
        if pd.notna(row.get('经销商报价')):
            return '在售'
        elif '即将上市' in str(row.get('车型名称', '')):
            return '即将上市'
        else:
            return '停售'
    
    merged['销售状态'] = merged.apply(judge_sales_status, axis=1)
    
    # 3. 选择关键字段生成维表
    key_columns = [
        '品牌', '车系', '年款', '车型名称', '厂商', '级别', '能源类型',
        '销售状态', '厂商指导价(元)', '经销商报价', '上市时间',
        # ... 其他关键字段
    ]
    
    # 只保留存在的列
    existing_columns = [c for c in key_columns if c in merged.columns]
    final_df = merged[existing_columns]
    
    # 保存
    final_df.to_csv(output_file, index=False, encoding='utf-8-sig')
    return final_df

数据验证

def validate_data(df):
    """验证数据准确性"""
    results = {}
    
    # 1. 检查串行问题
    if '上市时间' in df.columns:
        shangshi = df['上市时间'].dropna().astype(str)
        valid_dates = shangshi[shangshi.str.match(r'^\d{4}-\d{2}-\d{2}$', na=False)]
        results['上市时间准确率'] = len(valid_dates) / len(shangshi) * 100
    
    if '环保标准' in df.columns:
        huanbao = df['环保标准'].dropna().astype(str)
        date_in_huanbao = huanbao[huanbao.str.match(r'^\d{4}-\d{2}-\d{2}$', na=False)]
        results['环保标准串行数'] = len(date_in_huanbao)
    
    # 2. 检查未命名列
    param_cols = [c for c in df.columns if c.startswith('参数_')]
    results['未命名列数'] = len(param_cols)
    
    # 3. 数据完整性
    results['总行数'] = len(df)
    results['总列数'] = len(df.columns)
    
    return results

常见问题

Q1: 为什么有数据串行?

原因:不同能源类型的车型,汽车之家API返回的参数顺序不同。

解决:按能源类型分别获取参数映射,分别处理数据。

Q2: 如何获取tree_menu数据?

方法1:浏览器抓包

  1. 访问 https://car.autohome.com.cn/
  2. 打开开发者工具,切换到Network
  3. 找到tree_menu接口请求
  4. 复制响应数据

方法2:从specCompare.js提取

// 在浏览器控制台执行
fetch('https://car.autohome.com.cn/javascripts/specCompare.js')
  .then(r => r.text())
  .then(t => console.log(t))

Q3: 懂车帝反爬怎么办?

策略

  1. 使用代理IP
  2. 控制请求频率(每次请求间隔0.5-1秒)
  3. 使用Selenium模拟浏览器
  4. 分批次爬取,避免一次性请求过多

Q4: 如何处理缺失数据?

建议

  1. 对于关键字段(价格、上市时间等),标记为缺失
  2. 对于配置参数,保留空值
  3. 在最终维表中添加"数据完整性"标记

Step 7: 合并多个CSV文件(4月3日修复版)

当车型数据分散存储在多个CSV文件中时(如每个车型一个文件),需要将它们合并成一个完整的维表。

使用场景

  • 爬取的数据按车型分别保存为多个CSV文件
  • 需要将所有分散的数据合并成一个大表
  • 处理列名不一致的情况

两种模式

  1. 完整模式(默认):采样200个文件收集列名,适合需要完整列信息的场景
  2. 高效模式fast_mode=True):采样100个文件,使用reindex快速对齐,适合大数据量场景
# 使用方式1:命令行
python car_dimension_table.py --step 7

# 使用方式2:Python API
from car_dimension_table import CarDimensionTable

crawler = CarDimensionTable()

# 完整模式(默认)
crawler.step7_merge_csv_files(
    data_dir='output/car',
    output_file='output/merged_car_data_all.csv',
    fast_mode=False  # 默认
)

# 高效模式(推荐用于3800+文件的大数据量场景)
crawler.step7_merge_csv_files(
    data_dir='output/car',
    output_file='output/merged_car_data_all.csv',
    fast_mode=True
)

完整模式代码实现

def merge_csv_files(data_dir, output_file='merged_car_data.csv', fast_mode=False):
    """
    合并多个CSV文件 - 4月3日修复版本
    
    修复内容:
    1. 优化列名收集逻辑,采样200个文件确保覆盖所有可能的列
    2. 添加优先级列排序,关键字段排在前面
    3. 处理缺失列,自动填充空值确保数据一致性
    4. 添加详细的进度显示和错误处理
    
    Args:
        data_dir: 包含CSV文件的目录路径
        output_file: 合并后的输出文件路径
        fast_mode: 是否使用高效模式
        
    Returns:
        DataFrame: 合并后的数据
    """
    import pandas as pd
    import os
    from pathlib import Path
    
    # 获取所有CSV文件
    csv_files = list(Path(data_dir).rglob("*.csv"))
    print(f"找到 {len(csv_files)} 个CSV文件")
    
    # 第一步:收集所有列名
    print("\n第一步:收集列名...")
    all_columns = set()
    
    if fast_mode:
        # 高效模式:采样100个文件,直接读取文件头
        sample_size = min(100, len(csv_files))
        for csv_file in csv_files[:sample_size]:
            try:
                with open(csv_file, 'r', encoding='utf-8-sig') as f:
                    header = f.readline().strip()
                    cols = header.split(',')
                    all_columns.update(cols)
            except:
                pass
    else:
        # 完整模式:采样200个文件,使用pandas读取
        sample_size = min(200, len(csv_files))
        for i, csv_file in enumerate(csv_files[:sample_size]):
            if i % 50 == 0:
                print(f"  采样 {i}/{sample_size}")
            try:
                df = pd.read_csv(csv_file, nrows=1, encoding='utf-8-sig')
                all_columns.update(df.columns.tolist())
            except Exception as e:
                pass
    
    # 将列名转换为列表
    all_columns = list(all_columns)
    print(f"发现 {len(all_columns)} 个不同的列")
    
    # 定义优先级列(基础信息列排在前面)
    priority_cols = ['品牌', '厂商', '车系', '车型名称', '车型_id', '年款', '销售状态', 
                     '厂商指导价(元)', '经销商报价', '级别', '能源类型', '上市时间',
                     '最大功率(kW)', '最大扭矩(N·m)', '长*宽*高(mm)', '轴距(mm)']
    
    # 重新排序列:优先级列在前,其余按字母顺序
    sorted_columns = [col for col in priority_cols if col in all_columns]
    sorted_columns.extend(sorted([col for col in all_columns if col not in priority_cols]))
    
    print(f"排序后共 {len(sorted_columns)} 个列")
    
    # 第二步:读取并合并所有文件
    print("\n第二步:读取所有文件...")
    all_data = []
    success_count = 0
    error_count = 0
    total_rows = 0
    
    progress_interval = 1000 if fast_mode else 500
    
    for i, csv_file in enumerate(csv_files):
        if i % progress_interval == 0 or i == len(csv_files) - 1:
            print(f"  读取 {i}/{len(csv_files)} | 成功: {success_count} | 失败: {error_count} | 总行数: {total_rows}")
        
        try:
            # 读取CSV
            df = pd.read_csv(csv_file, encoding='utf-8-sig', low_memory=False)
            
            if fast_mode:
                # 高效模式:使用reindex快速对齐列
                df = df.reindex(columns=sorted_columns)
            else:
                # 完整模式:筛选列并填充缺失列
                existing_cols = [col for col in sorted_columns if col in df.columns]
                df = df[existing_cols]
                
                # 添加缺失的列(填充空值)
                for col in sorted_columns:
                    if col not in df.columns:
                        df[col] = None
                
                # 按指定顺序排列列
                df = df[sorted_columns]
            
            all_data.append(df)
            success_count += 1
            total_rows += len(df)
            
        except Exception as e:
            error_count += 1
            if error_count <= 5:  # 只显示前5个错误
                print(f"  错误: {csv_file} - {e}")
    
    print(f"\n读取完成:")
    print(f"  成功: {success_count} 个文件")
    print(f"  失败: {error_count} 个文件")
    print(f"  总行数: {total_rows}")
    
    # 第三步:合并并保存
    if all_data:
        print("\n第三步:合并所有数据...")
        merged_df = pd.concat(all_data, ignore_index=True, sort=False)
        
        print(f"正在保存到 {output_file}...")
        merged_df.to_csv(output_file, index=False, encoding='utf-8-sig')
        
        file_size_mb = os.path.getsize(output_file) / (1024*1024)
        
        print(f"\n✅ 合并完成!")
        print(f"  总行数: {len(merged_df):,}")
        print(f"  总列数: {len(merged_df.columns)}")
        print(f"  文件大小: {file_size_mb:.2f} MB")
        
        # 显示数据统计
        print(f"\n📊 数据统计:")
        if '品牌' in merged_df.columns:
            print(f"  品牌数量: {merged_df['品牌'].nunique()}")
        if '车型名称' in merged_df.columns:
            print(f"  车型数量: {merged_df['车型名称'].nunique()}")
        print(f"\n  前3行数据:")
        print(merged_df.head(3).to_string())
        
        return merged_df
    else:
        print("❌ 没有数据可以合并")
        return None

关键特性(4月3日修复版新增):

  1. 列名自动收集:采样200个文件(高效模式100个)收集所有可能的列名
  2. 列排序优化:关键字段(品牌、车系、价格等)排在前面
  3. 缺失列处理:自动为缺失的列填充空值,确保数据一致性
  4. 进度显示:每500个文件(高效模式1000个)显示进度
  5. 错误处理:记录失败文件但继续处理
  6. 数据统计:合并完成后显示品牌数量、车型数量等统计信息
  7. 双模式支持:完整模式(准确)和高效模式(快速)

使用示例

# 合并car_crawled_documents目录下的所有CSV文件
# 方式1:完整模式(推荐用于最终数据)
merge_csv_files(
    data_dir='/path/to/car_crawled_documents',
    output_file='merged_car_data.csv',
    fast_mode=False
)

# 方式2:高效模式(推荐用于3800+文件的大数据量场景)
merge_csv_files(
    data_dir='/path/to/car_crawled_documents',
    output_file='merged_car_data.csv',
    fast_mode=True
)

性能对比(基于3800+文件测试):

模式 采样文件数 列名收集 数据处理 适用场景
完整模式 200 pandas读取 逐列处理 最终数据,需要完整列信息
高效模式 100 文件头读取 reindex对齐 大数据量,快速预览

常见问题

Q1: 为什么有数据串行?

原因:不同能源类型的车型,汽车之家API返回的参数顺序不同。

解决:按能源类型分别获取参数映射,分别处理数据。

Q2: 如何获取tree_menu数据?

方法1:浏览器抓包

  1. 访问 https://car.autohome.com.cn/
  2. 打开开发者工具,切换到Network
  3. 找到tree_menu接口请求
  4. 复制响应数据

方法2:从specCompare.js提取

// 在浏览器控制台执行
fetch('https://car.autohome.com.cn/javascripts/specCompare.js')
  .then(r => r.text())
  .then(t => console.log(t))

Q3: 懂车帝反爬怎么办?

策略

  1. 使用代理IP
  2. 控制请求频率(每次请求间隔0.5-1秒)
  3. 使用Selenium模拟浏览器
  4. 分批次爬取,避免一次性请求过多

Q4: 如何处理缺失数据?

建议

  1. 对于关键字段(价格、上市时间等),标记为缺失
  2. 对于配置参数,保留空值
  3. 在最终维表中添加"数据完整性"标记

Q5: 合并CSV时内存不足怎么办?

解决方案

  1. 分批处理:将文件分批读取,使用pd.concat逐步合并
  2. 只保留必要列:在合并前筛选需要的列
  3. 使用Dask:对于超大数据集,使用Dask代替Pandas
  4. 增加采样:减少列名采样数量
# 分批合并示例
def merge_csv_files_chunked(data_dir, output_file, chunk_size=1000):
    csv_files = list(Path(data_dir).rglob("*.csv"))
    
    # 先收集所有列名
    all_columns = collect_columns(csv_files[:200])
    
    # 分批处理
    for i in range(0, len(csv_files), chunk_size):
        chunk_files = csv_files[i:i+chunk_size]
        chunk_data = [process_file(f, all_columns) for f in chunk_files]
        
        # 合并当前批次
        chunk_df = pd.concat(chunk_data, ignore_index=True)
        
        # 追加到文件(第一次写入,后续追加)
        mode = 'w' if i == 0 else 'a'
        header = i == 0
        chunk_df.to_csv(output_file, mode=mode, header=header, index=False)
        
        del chunk_df  # 释放内存

版本历史

  • v1.0 (2026-04-01): 初始版本,完整爬取流程
  • v1.1 (2026-04-03): 新增Step 7 合并CSV文件功能(4月3日修复版)
    • 修复列名收集逻辑,采样200个文件确保覆盖所有列
    • 添加优先级列排序,关键字段排在前面
    • 处理缺失列,自动填充空值确保数据一致性
    • 添加详细的进度显示和错误处理
    • 支持高效模式(fast_mode)用于大数据量场景
  • v1.2 (2026-04-07): 优化Skill文档和代码结构
Installs
2
First Seen
Apr 13, 2026