auto-model-dimension-crawler
SKILL.md
汽车维表完整爬取与处理 Skill
功能说明
本Skill提供从汽车之家和懂车帝爬取车型数据,并生成标准化汽车维表的完整流程。
工作流程
┌─────────────────┐
│ 1. 获取车型ID列表 │ ← 从汽车之家tree_menu
└────────┬────────┘
↓
┌─────────────────┐
│ 2. 爬取车型数据 │ ← 从汽车之家API
└────────┬────────┘
↓
┌─────────────────┐
│ 3. 获取参数映射 │ ← 按能源类型分别获取
└────────┬────────┘
↓
┌─────────────────┐
│ 4. 处理数据 │ ← 重命名参数、解决串行
└────────┬────────┘
↓
┌─────────────────┐
│ 5. 爬取懂车帝 │ ← 经销商报价和销售状态
└────────┬────────┘
↓
┌─────────────────┐
│ 6. 合并数据 │ ← 生成最终维表
└────────┬────────┘
↓
┌─────────────────┐
│ 7. 合并CSV文件 │ ← 合并多个分散的CSV文件
└─────────────────┘
安装依赖
pip install pandas requests tqdm
快速开始
方式1:使用主脚本(推荐)
# 完整流程:从爬取到生成维表
python car_dimension_table.py --full-process
# 或分步执行
python car_dimension_table.py --step 1 # 获取车型ID
python car_dimension_table.py --step 2 # 爬取车型数据
python car_dimension_table.py --step 3 # 获取参数映射
python car_dimension_table.py --step 4 # 处理数据
python car_dimension_table.py --step 5 # 爬取懂车帝
python car_dimension_table.py --step 6 # 合并生成维表
python car_dimension_table.py --step 7 # 合并多个CSV文件
方式2:使用Python API
from car_dimension_table import CarDimensionTable
# 初始化
crawler = CarDimensionTable()
# 执行完整流程
crawler.full_process()
# 或分步执行
crawler.step1_get_car_ids() # 获取车型ID
crawler.step2_crawl_car_data() # 爬取车型数据
crawler.step3_get_param_mappings() # 获取参数映射
crawler.step4_process_data() # 处理数据
crawler.step5_crawl_dcd() # 爬取懂车帝
crawler.step6_merge_data() # 合并生成维表
crawler.step7_merge_csv_files() # 合并多个CSV文件
详细步骤说明
Step 1: 获取车型ID列表
从汽车之家tree_menu接口获取所有车型ID。
注意:tree_menu接口需要浏览器抓包获取,无法直接API调用。
替代方案:
- 访问 https://car.autohome.com.cn/javascripts/specCompare.js
- 提取tree_menu数据
- 解析为车型ID列表
# 解析tree_menu获取车型ID
def parse_tree_menu(tree_menu_data):
"""
解析汽车之家tree_menu数据,提取车型ID
Args:
tree_menu_data: tree_menu JSON数据
Returns:
DataFrame: 包含品牌、车系、seriesid的列表
"""
import pandas as pd
cars = []
for brand in tree_menu_data.get('brandlist', []):
brand_name = brand.get('name')
for factory in brand.get('factorylist', []):
factory_name = factory.get('name')
for series in factory.get('serieslist', []):
series_name = series.get('name')
series_id = series.get('id')
series_state = series.get('state') # 20=在售, 40=停产
cars.append({
'品牌': brand_name,
'厂商': factory_name,
'车系': series_name,
'seriesid': series_id,
'state': series_state
})
return pd.DataFrame(cars)
Step 2: 爬取车型数据
使用汽车之家API爬取每个车型的详细数据。
API端点:https://car-web-api.autohome.com.cn/car/param/getParamConf
def crawl_car_data(series_id, car_name):
"""
爬取单个车型的详细数据
Args:
series_id: 汽车之家车型ID
car_name: 车型名称
Returns:
DataFrame: 车型数据
"""
import requests
import pandas as pd
url = f"https://car-web-api.autohome.com.cn/car/param/getParamConf?mode=1&site=1&seriesid={series_id}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "application/json",
"Referer": "https://www.autohome.com.cn/"
}
response = requests.get(url, headers=headers, timeout=30)
data = response.json()
if data.get('message') != 'success':
return None
result = data['result']
brand_name = result['bread']['brandname']
series_name = result['bread']['seriesname']
brand_id = result['bread']['brandid']
# 处理每个车型配置
all_cars = []
for spec in result.get('datalist', []):
car_info = {
'品牌': brand_name,
'车系': series_name,
'品牌_id': brand_id,
'车型_id': spec.get('specid'),
'年款': spec.get('condition', [''])[0] if spec.get('condition') else ''
}
# 处理参数配置
for param in spec.get('paramconflist', []):
if param is None:
continue
item_name = param.get('itemname', '')
title_id = param.get('titleid')
if title_id is None:
continue
# 处理颜色信息
if 'colorinfo' in param and param['colorinfo']:
colors = param['colorinfo'].get('list', [])
color_str = ', '.join([f"{c['name']}(¥{c['price']})" if c.get('price', 0) > 0 else c['name'] for c in colors])
car_info[f'参数_{title_id}'] = color_str
# 处理子列表
elif 'sublist' in param and param['sublist']:
sub_str = ', '.join([f"{s.get('value', '')}({s.get('name', '')})" for s in param['sublist']])
car_info[f'参数_{title_id}'] = sub_str
else:
car_info[f'参数_{title_id}'] = item_name
all_cars.append(car_info)
return pd.DataFrame(all_cars)
Step 3: 获取参数名称映射
关键步骤:按能源类型分别获取参数名称映射,避免数据串行。
def get_param_mapping(series_id):
"""从汽车之家API获取参数名称映射"""
import requests
url = f"https://car-web-api.autohome.com.cn/car/param/getParamConf?mode=1&site=1&seriesid={series_id}"
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "application/json",
"Referer": "https://www.autohome.com.cn/"
}
response = requests.get(url, headers=headers, timeout=30)
if response.status_code == 200:
data = response.json()
if data.get('message') == 'success' and data.get('result'):
param_names = {}
for title in data['result'].get('titlelist', []):
for item in title.get('items', []):
titleid = item.get('titleid')
itemname = item.get('itemname', '')
if titleid and itemname:
param_names[f"参数_{titleid}"] = itemname
return param_names
return {}
# 获取各种能源类型的参数映射
def download_all_param_mappings():
"""下载所有能源类型的参数名称映射"""
car_samples = {
'纯电动': '5026', # AION S
'汽油': '3301', # 景逸S50
'柴油': '7798', # 212 T01
'插电式混合动力': '5323', # 腾势X
'增程式': '6576', # 理想L9
}
mappings = {}
for energy_type, series_id in car_samples.items():
print(f"获取【{energy_type}】参数映射...")
mapping = get_param_mapping(series_id)
mappings[energy_type] = mapping
# 保存到文件
import json
with open(f'param_mappings/mapping_{energy_type}.json', 'w', encoding='utf-8') as f:
json.dump(mapping, f, ensure_ascii=False, indent=2)
return mappings
Step 4: 处理数据(解决串行问题)
def process_car_data(car_folder, mapping_dir='param_mappings'):
"""
处理车型数据,按能源类型使用正确的参数映射
关键:不同能源类型的参数顺序不同,必须分别处理
"""
import pandas as pd
import json
import os
# 加载参数映射
mappings = {}
for energy_type in ['纯电动', '汽油', '柴油', '插电式混合动力', '增程式']:
filepath = f'{mapping_dir}/mapping_{energy_type}.json'
if os.path.exists(filepath):
with open(filepath, 'r', encoding='utf-8') as f:
mappings[energy_type] = json.load(f)
# 回退映射
fallback = {
'汽油+48V轻混系统': '汽油',
'油电混合': '汽油',
'汽油+CNG': '汽油',
'柴油+48V轻混系统': '柴油',
'CNG': '汽油',
'汽油电驱': '汽油',
'氢燃料': '汽油',
'甲醇混动': '汽油',
'汽油+24V轻混系统': '汽油',
'-': '汽油',
}
# 处理所有车型文件
car_files = [f for f in os.listdir(car_folder) if f.endswith('.csv')]
all_data = []
for car_file in car_files:
df = pd.read_csv(f'{car_folder}/{car_file}', encoding='utf-8-sig', low_memory=False)
energy_type = df['参数_6'].iloc[0] if '参数_6' in df.columns else '汽油'
# 确定使用哪种映射
if energy_type in mappings:
mapping = mappings[energy_type].copy()
elif energy_type in fallback:
mapping = mappings[fallback[energy_type]].copy()
else:
mapping = mappings.get('汽油', {}).copy()
# 重命名列
df_renamed = df.rename(columns=mapping)
df_renamed = df_renamed.loc[:, ~df_renamed.columns.duplicated()]
all_data.append(df_renamed)
# 合并所有数据
df_final = pd.concat(all_data, ignore_index=True, sort=False)
return df_final
Step 5: 爬取懂车帝经销商报价
def crawl_dcd_quotes(car_list, output_file='懂车帝报价.csv'):
"""
从懂车帝爬取经销商报价
Args:
car_list: 车型列表(品牌+车系)
output_file: 输出文件
"""
import requests
import pandas as pd
import time
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
"Accept": "application/json",
"Referer": "https://www.dongchedi.com/"
}
all_quotes = []
for brand, series in car_list:
# 1. 搜索车型获取series_id
search_url = "https://www.dongchedi.com/search"
params = {"keyword": f"{brand} {series}"}
# ... 搜索逻辑
# 2. 获取车型列表
car_list_url = f"https://www.dongchedi.com/motor/pc/car/series/car_list?series_id={series_id}"
# ... 获取车型列表
# 3. 获取经销商报价
for car_id in car_ids:
quote_url = f"https://www.dongchedi.com/motor/pc/car/series/car_dealer_price?car_id={car_id}&city_name=北京"
# ... 获取报价
time.sleep(0.5) # 避免被封
# 保存结果
df = pd.DataFrame(all_quotes)
df.to_csv(output_file, index=False, encoding='utf-8-sig')
return df
Step 6: 合并生成最终维表
def merge_data(autohome_df, dcd_df, output_file='汽车维表_最终版.csv'):
"""
合并汽车之家和懂车帝数据,生成最终维表
Args:
autohome_df: 汽车之家数据
dcd_df: 懂车帝报价数据
output_file: 输出文件
"""
import pandas as pd
# 1. 按品牌+车系+年款匹配懂车帝报价
merged = autohome_df.merge(
dcd_df,
left_on=['品牌', '车系', '年款'],
right_on=['品牌', '车系', '年款'],
how='left'
)
# 2. 判断销售状态
def judge_sales_status(row):
if pd.notna(row.get('经销商报价')):
return '在售'
elif '即将上市' in str(row.get('车型名称', '')):
return '即将上市'
else:
return '停售'
merged['销售状态'] = merged.apply(judge_sales_status, axis=1)
# 3. 选择关键字段生成维表
key_columns = [
'品牌', '车系', '年款', '车型名称', '厂商', '级别', '能源类型',
'销售状态', '厂商指导价(元)', '经销商报价', '上市时间',
# ... 其他关键字段
]
# 只保留存在的列
existing_columns = [c for c in key_columns if c in merged.columns]
final_df = merged[existing_columns]
# 保存
final_df.to_csv(output_file, index=False, encoding='utf-8-sig')
return final_df
数据验证
def validate_data(df):
"""验证数据准确性"""
results = {}
# 1. 检查串行问题
if '上市时间' in df.columns:
shangshi = df['上市时间'].dropna().astype(str)
valid_dates = shangshi[shangshi.str.match(r'^\d{4}-\d{2}-\d{2}$', na=False)]
results['上市时间准确率'] = len(valid_dates) / len(shangshi) * 100
if '环保标准' in df.columns:
huanbao = df['环保标准'].dropna().astype(str)
date_in_huanbao = huanbao[huanbao.str.match(r'^\d{4}-\d{2}-\d{2}$', na=False)]
results['环保标准串行数'] = len(date_in_huanbao)
# 2. 检查未命名列
param_cols = [c for c in df.columns if c.startswith('参数_')]
results['未命名列数'] = len(param_cols)
# 3. 数据完整性
results['总行数'] = len(df)
results['总列数'] = len(df.columns)
return results
常见问题
Q1: 为什么有数据串行?
原因:不同能源类型的车型,汽车之家API返回的参数顺序不同。
解决:按能源类型分别获取参数映射,分别处理数据。
Q2: 如何获取tree_menu数据?
方法1:浏览器抓包
- 访问 https://car.autohome.com.cn/
- 打开开发者工具,切换到Network
- 找到tree_menu接口请求
- 复制响应数据
方法2:从specCompare.js提取
// 在浏览器控制台执行
fetch('https://car.autohome.com.cn/javascripts/specCompare.js')
.then(r => r.text())
.then(t => console.log(t))
Q3: 懂车帝反爬怎么办?
策略:
- 使用代理IP
- 控制请求频率(每次请求间隔0.5-1秒)
- 使用Selenium模拟浏览器
- 分批次爬取,避免一次性请求过多
Q4: 如何处理缺失数据?
建议:
- 对于关键字段(价格、上市时间等),标记为缺失
- 对于配置参数,保留空值
- 在最终维表中添加"数据完整性"标记
Step 7: 合并多个CSV文件(4月3日修复版)
当车型数据分散存储在多个CSV文件中时(如每个车型一个文件),需要将它们合并成一个完整的维表。
使用场景:
- 爬取的数据按车型分别保存为多个CSV文件
- 需要将所有分散的数据合并成一个大表
- 处理列名不一致的情况
两种模式:
- 完整模式(默认):采样200个文件收集列名,适合需要完整列信息的场景
- 高效模式(
fast_mode=True):采样100个文件,使用reindex快速对齐,适合大数据量场景
# 使用方式1:命令行
python car_dimension_table.py --step 7
# 使用方式2:Python API
from car_dimension_table import CarDimensionTable
crawler = CarDimensionTable()
# 完整模式(默认)
crawler.step7_merge_csv_files(
data_dir='output/car',
output_file='output/merged_car_data_all.csv',
fast_mode=False # 默认
)
# 高效模式(推荐用于3800+文件的大数据量场景)
crawler.step7_merge_csv_files(
data_dir='output/car',
output_file='output/merged_car_data_all.csv',
fast_mode=True
)
完整模式代码实现
def merge_csv_files(data_dir, output_file='merged_car_data.csv', fast_mode=False):
"""
合并多个CSV文件 - 4月3日修复版本
修复内容:
1. 优化列名收集逻辑,采样200个文件确保覆盖所有可能的列
2. 添加优先级列排序,关键字段排在前面
3. 处理缺失列,自动填充空值确保数据一致性
4. 添加详细的进度显示和错误处理
Args:
data_dir: 包含CSV文件的目录路径
output_file: 合并后的输出文件路径
fast_mode: 是否使用高效模式
Returns:
DataFrame: 合并后的数据
"""
import pandas as pd
import os
from pathlib import Path
# 获取所有CSV文件
csv_files = list(Path(data_dir).rglob("*.csv"))
print(f"找到 {len(csv_files)} 个CSV文件")
# 第一步:收集所有列名
print("\n第一步:收集列名...")
all_columns = set()
if fast_mode:
# 高效模式:采样100个文件,直接读取文件头
sample_size = min(100, len(csv_files))
for csv_file in csv_files[:sample_size]:
try:
with open(csv_file, 'r', encoding='utf-8-sig') as f:
header = f.readline().strip()
cols = header.split(',')
all_columns.update(cols)
except:
pass
else:
# 完整模式:采样200个文件,使用pandas读取
sample_size = min(200, len(csv_files))
for i, csv_file in enumerate(csv_files[:sample_size]):
if i % 50 == 0:
print(f" 采样 {i}/{sample_size}")
try:
df = pd.read_csv(csv_file, nrows=1, encoding='utf-8-sig')
all_columns.update(df.columns.tolist())
except Exception as e:
pass
# 将列名转换为列表
all_columns = list(all_columns)
print(f"发现 {len(all_columns)} 个不同的列")
# 定义优先级列(基础信息列排在前面)
priority_cols = ['品牌', '厂商', '车系', '车型名称', '车型_id', '年款', '销售状态',
'厂商指导价(元)', '经销商报价', '级别', '能源类型', '上市时间',
'最大功率(kW)', '最大扭矩(N·m)', '长*宽*高(mm)', '轴距(mm)']
# 重新排序列:优先级列在前,其余按字母顺序
sorted_columns = [col for col in priority_cols if col in all_columns]
sorted_columns.extend(sorted([col for col in all_columns if col not in priority_cols]))
print(f"排序后共 {len(sorted_columns)} 个列")
# 第二步:读取并合并所有文件
print("\n第二步:读取所有文件...")
all_data = []
success_count = 0
error_count = 0
total_rows = 0
progress_interval = 1000 if fast_mode else 500
for i, csv_file in enumerate(csv_files):
if i % progress_interval == 0 or i == len(csv_files) - 1:
print(f" 读取 {i}/{len(csv_files)} | 成功: {success_count} | 失败: {error_count} | 总行数: {total_rows}")
try:
# 读取CSV
df = pd.read_csv(csv_file, encoding='utf-8-sig', low_memory=False)
if fast_mode:
# 高效模式:使用reindex快速对齐列
df = df.reindex(columns=sorted_columns)
else:
# 完整模式:筛选列并填充缺失列
existing_cols = [col for col in sorted_columns if col in df.columns]
df = df[existing_cols]
# 添加缺失的列(填充空值)
for col in sorted_columns:
if col not in df.columns:
df[col] = None
# 按指定顺序排列列
df = df[sorted_columns]
all_data.append(df)
success_count += 1
total_rows += len(df)
except Exception as e:
error_count += 1
if error_count <= 5: # 只显示前5个错误
print(f" 错误: {csv_file} - {e}")
print(f"\n读取完成:")
print(f" 成功: {success_count} 个文件")
print(f" 失败: {error_count} 个文件")
print(f" 总行数: {total_rows}")
# 第三步:合并并保存
if all_data:
print("\n第三步:合并所有数据...")
merged_df = pd.concat(all_data, ignore_index=True, sort=False)
print(f"正在保存到 {output_file}...")
merged_df.to_csv(output_file, index=False, encoding='utf-8-sig')
file_size_mb = os.path.getsize(output_file) / (1024*1024)
print(f"\n✅ 合并完成!")
print(f" 总行数: {len(merged_df):,}")
print(f" 总列数: {len(merged_df.columns)}")
print(f" 文件大小: {file_size_mb:.2f} MB")
# 显示数据统计
print(f"\n📊 数据统计:")
if '品牌' in merged_df.columns:
print(f" 品牌数量: {merged_df['品牌'].nunique()}")
if '车型名称' in merged_df.columns:
print(f" 车型数量: {merged_df['车型名称'].nunique()}")
print(f"\n 前3行数据:")
print(merged_df.head(3).to_string())
return merged_df
else:
print("❌ 没有数据可以合并")
return None
关键特性(4月3日修复版新增):
- 列名自动收集:采样200个文件(高效模式100个)收集所有可能的列名
- 列排序优化:关键字段(品牌、车系、价格等)排在前面
- 缺失列处理:自动为缺失的列填充空值,确保数据一致性
- 进度显示:每500个文件(高效模式1000个)显示进度
- 错误处理:记录失败文件但继续处理
- 数据统计:合并完成后显示品牌数量、车型数量等统计信息
- 双模式支持:完整模式(准确)和高效模式(快速)
使用示例:
# 合并car_crawled_documents目录下的所有CSV文件
# 方式1:完整模式(推荐用于最终数据)
merge_csv_files(
data_dir='/path/to/car_crawled_documents',
output_file='merged_car_data.csv',
fast_mode=False
)
# 方式2:高效模式(推荐用于3800+文件的大数据量场景)
merge_csv_files(
data_dir='/path/to/car_crawled_documents',
output_file='merged_car_data.csv',
fast_mode=True
)
性能对比(基于3800+文件测试):
| 模式 | 采样文件数 | 列名收集 | 数据处理 | 适用场景 |
|---|---|---|---|---|
| 完整模式 | 200 | pandas读取 | 逐列处理 | 最终数据,需要完整列信息 |
| 高效模式 | 100 | 文件头读取 | reindex对齐 | 大数据量,快速预览 |
常见问题
Q1: 为什么有数据串行?
原因:不同能源类型的车型,汽车之家API返回的参数顺序不同。
解决:按能源类型分别获取参数映射,分别处理数据。
Q2: 如何获取tree_menu数据?
方法1:浏览器抓包
- 访问 https://car.autohome.com.cn/
- 打开开发者工具,切换到Network
- 找到tree_menu接口请求
- 复制响应数据
方法2:从specCompare.js提取
// 在浏览器控制台执行
fetch('https://car.autohome.com.cn/javascripts/specCompare.js')
.then(r => r.text())
.then(t => console.log(t))
Q3: 懂车帝反爬怎么办?
策略:
- 使用代理IP
- 控制请求频率(每次请求间隔0.5-1秒)
- 使用Selenium模拟浏览器
- 分批次爬取,避免一次性请求过多
Q4: 如何处理缺失数据?
建议:
- 对于关键字段(价格、上市时间等),标记为缺失
- 对于配置参数,保留空值
- 在最终维表中添加"数据完整性"标记
Q5: 合并CSV时内存不足怎么办?
解决方案:
- 分批处理:将文件分批读取,使用
pd.concat逐步合并 - 只保留必要列:在合并前筛选需要的列
- 使用Dask:对于超大数据集,使用Dask代替Pandas
- 增加采样:减少列名采样数量
# 分批合并示例
def merge_csv_files_chunked(data_dir, output_file, chunk_size=1000):
csv_files = list(Path(data_dir).rglob("*.csv"))
# 先收集所有列名
all_columns = collect_columns(csv_files[:200])
# 分批处理
for i in range(0, len(csv_files), chunk_size):
chunk_files = csv_files[i:i+chunk_size]
chunk_data = [process_file(f, all_columns) for f in chunk_files]
# 合并当前批次
chunk_df = pd.concat(chunk_data, ignore_index=True)
# 追加到文件(第一次写入,后续追加)
mode = 'w' if i == 0 else 'a'
header = i == 0
chunk_df.to_csv(output_file, mode=mode, header=header, index=False)
del chunk_df # 释放内存
版本历史
- v1.0 (2026-04-01): 初始版本,完整爬取流程
- v1.1 (2026-04-03): 新增Step 7 合并CSV文件功能(4月3日修复版)
- 修复列名收集逻辑,采样200个文件确保覆盖所有列
- 添加优先级列排序,关键字段排在前面
- 处理缺失列,自动填充空值确保数据一致性
- 添加详细的进度显示和错误处理
- 支持高效模式(fast_mode)用于大数据量场景
- v1.2 (2026-04-07): 优化Skill文档和代码结构