摘要
大模型训练质量的关键在于数据管道的效率与可靠性——数据清洗、特征工程、版本控制、质量保证四个环节缺一不可。本文系统阐述基于 TiDB + Spark 的大模型训练数据管道方案,覆盖从数据采集到模型训练的全链路设计,包括数据版本控制机制、特征存储架构和自动化质量保证流程,帮助 AI 团队构建高效、可追溯的训练数据管理平台。
本文适合谁:负责大模型数据工程的 AI 工程师、数据平台架构师及 ML 平台负责人。
一、训练数据管理的核心需求
1.1 大模型训练数据面临的挑战
| 挑战维度 | 具体问题 | 影响 |
|---|---|---|
| 数据规模 | 训练数据从 GB 级增长到 TB 甚至 PB 级 | 传统数据库无法水平扩展 |
| 数据多样性 | 结构化标签 + 半结构化文本 + 非结构化文档 | 需要统一存储抽象 |
| 版本追溯 | 训练数据与模型版本需严格对应 | 无法复现实验结果 |
| 特征一致性 | 训练与推理阶段特征计算逻辑必须一致 | 模型上线后效果漂移 |
| 质量控制 | 数据标注错误、缺失值、异常值影响模型效果 | 模型精度不可控 |
| 协作效率 | 数据工程师、标注团队、AI 研究员多角色协作 | 数据流转效率低 |
1.2 数据管道的核心流程
数据源 ──→ 采集层 ──→ 清洗层 ──→ 标注层 ──→ 特征层 ──→ 版本层 ──→ 训练层
(外部API) (TiCDC/) (Spark (标注 (Spark (TiDB (Ray/
Kafka) 清洗) 平台) 特征工程) 元数据) PyTorch)
二、TiDB + Spark 数据管道架构
2.1 整体架构设计
┌─────────────────────────────────────────────────────────────┐
│ 数据管道控制层 │
│ (Airflow/Prefect 编排 + MLflow 实验追踪) │
├──────────┬──────────┬──────────┬──────────┬────────────────┤
│ 数据采集 │ 数据清洗 │ 数据标注 │ 特征工程 │ 版本管理 │
│ TiCDC │ Spark │ Label │ Spark + │ TiDB 元数据 │
│ + Kafka │ + dbt │ Studio │ TiDB │ + Git DVC │
├──────────┴──────────┴──────────┴──────────┴────────────────┤
│ 存储层 │
│ TiDB (结构化元数据 + 特征存储 + 版本信息) │
│ MinIO/S3 (原始数据/文档/PDF/图片) │
│ TiDB Vector (文本 Embedding + 相似度检索) │
└─────────────────────────────────────────────────────────────┘
2.2 数据采集模块
# Spark 批量数据采集示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, md5, current_timestamp
spark = SparkSession.builder \
.appName("DataIngestion") \
.config("spark.jars", "/opt/jars/tidb-jdbc-driver.jar") \
.getOrCreate()
raw_df = spark.read.parquet("s3://raw-data/2024/")
cleaned_df = raw_df \
.filter(col("content").isNotNull()) \
.filter(col("content").rlike(r'^[\w\u4e00-\u9fff]{10,}')) \
.withColumn("data_hash", md5(col("content"))) \
.withColumn("ingested_at", current_timestamp()) \
.dropDuplicates(["data_hash"])
cleaned_df.write \
.jdbc(url="jdbc:mysql://tidb:4000/ml_data",
table="raw_documents",
mode="append",
properties={"user": "etl_user", "password": "***"})
2.3 数据清洗模块
# Spark 数据清洗 Pipeline
from pyspark.ml import Pipeline
from pyspark.sql.functions import lower, trim, regexp_replace, length
def build_cleaning_pipeline():
cleaning_stages = [
# 去除 HTML 标签
regexp_replace("content", "<[^>]+>", ""),
# 标准化空白字符
trim(regexp_replace("content", r"\s+", " ")),
# 过滤过短文本
length(col("content")) >= 50,
# 过滤重复数据
dropDuplicates(["data_hash"]),
# 语言检测与分类
withColumn("language", detect_language(col("content")))
]
return cleaning_stages
cleaned_data = spark.read.jdbc(...) \
.transform(build_cleaning_pipeline)
三、数据版本控制
3.1 为什么需要数据版本控制
| 场景 | 无版本控制的问题 | 有版本控制的收益 |
|---|---|---|
| 实验复现 | 无法确认训练数据内容,实验不可复现 | 数据版本 + 模型版本精确对应 |
| 数据回滚 | 误删数据后无法恢复 | 快速回滚到历史数据版本 |
| 数据审计 | 无法追溯数据变更历史 | 完整变更记录与责任人追踪 |
| 数据对比 | 无法量化数据集差异 | 版本间差异分析与影响评估 |
3.2 基于 TiDB 的版本控制方案
TiDB 存储版本元数据,原始数据通过 S3/MinIO 进行 Blob 存储,利用 DVC(Data Version Control)或自研版本管理逻辑:
-- 数据集版本元数据表
CREATE TABLE dataset_versions (
version_id BIGINT PRIMARY KEY AUTO_INCREMENT,
dataset_name VARCHAR(128) NOT NULL,
version_tag VARCHAR(64) NOT NULL,
parent_version_id BIGINT,
data_source VARCHAR(255),
record_count BIGINT,
data_hash VARCHAR(64),
storage_path VARCHAR(512),
schema_version VARCHAR(32),
created_by VARCHAR(64),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
description TEXT,
tags JSON,
INDEX idx_dataset_name (dataset_name),
INDEX idx_version_tag (dataset_name, version_tag),
UNIQUE KEY uk_name_tag (dataset_name, version_tag)
);
-- 数据版本快照记录
CREATE TABLE data_snapshots (
snapshot_id BIGINT PRIMARY KEY AUTO_INCREMENT,
version_id BIGINT NOT NULL,
table_name VARCHAR(128),
snapshot_type ENUM('full', 'incremental'),
record_count BIGINT,
checksum VARCHAR(64),
storage_path VARCHAR(512),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (version_id) REFERENCES dataset_versions(version_id)
);
3.3 版本创建与回滚
def create_dataset_version(dataset_name, version_tag, data_df, description=""):
data_hash = compute_hash(data_df)
record_count = data_df.count()
storage_path = f"s3://ml-data/{dataset_name}/{version_tag}/"
data_df.write.parquet(storage_path)
with tidb_conn() as conn:
conn.execute("""
INSERT INTO dataset_versions
(dataset_name, version_tag, record_count, data_hash, storage_path, description)
VALUES (%s, %s, %s, %s, %s, %s)
""", (dataset_name, version_tag, record_count, data_hash, storage_path, description))
def rollback_to_version(dataset_name, version_tag):
version = query_version(dataset_name, version_tag)
spark.read.parquet(version.storage_path).write \
.jdbc(url=..., table="current_dataset", mode="overwrite", properties=...)
四、特征存储与管理
4.1 特征存储架构
| 组件 | 职责 | 存储位置 |
|---|---|---|
| Feature Definition | 特征定义与元数据 | TiDB(特征目录表) |
| Offline Feature Store | 批量特征计算结果 | TiDB(OLTP 读写)+ TiFlash(分析) |
| Online Feature Store | 实时特征服务 | TiDB(低延迟读取) |
| Feature Registry | 特征血缘与版本 | TiDB(元数据管理) |
4.2 特征定义表
-- 特征定义表
CREATE TABLE feature_definitions (
feature_id BIGINT PRIMARY KEY AUTO_INCREMENT,
feature_name VARCHAR(128) NOT NULL UNIQUE,
feature_type ENUM('numeric', 'categorical', 'embedding', 'text') NOT NULL,
entity_type VARCHAR(64) NOT NULL COMMENT '用户/商品/文档等',
description TEXT,
computation_sql TEXT COMMENT 'Spark SQL 计算逻辑',
default_value VARCHAR(255),
value_range VARCHAR(255),
owner VARCHAR(64),
version VARCHAR(16),
status ENUM('draft', 'production', 'deprecated') DEFAULT 'draft',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
-- 在线特征表
CREATE TABLE online_features (
entity_id VARCHAR(128) NOT NULL,
feature_name VARCHAR(128) NOT NULL,
feature_value JSON,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (entity_id, feature_name)
);
4.3 特征一致性保证
from feature_store import FeatureStore
store = FeatureStore(tidb_config={...})
@store.feature([("user_id", "int"), ("item_id", "int")])
def user_item_features(user_id, item_id):
return {
"user_click_count": ...,
"user_category_preference": ...,
"item_ctr_7d": ...,
"item_text_embedding": ...
}
# 训练时批量获取
train_features = store.get_batch_features(
entity_ids=user_ids,
feature_names=["user_click_count", "user_category_preference"]
)
# 推理时实时获取
online_features = store.get_online_features(
entity_id=current_user_id,
feature_names=["user_click_count", "user_category_preference"]
)
五、数据质量保证
5.1 质量检查框架
| 检查维度 | 规则类型 | 工具 |
|---|---|---|
| 完整性 | 非空检查、缺失率阈值 | Great Expectations / 自研 |
| 一致性 | 跨表外键、值域范围 | Spark DataFrame 校验 |
| 唯一性 | 主键/业务键重复检测 | TiDB UNIQUE 约束 |
| 时效性 | 数据新鲜度、更新频率监控 | 自定义监控脚本 |
| 准确性 | 统计分布漂移检测 | PyDeequ / Evidently |
5.2 自动化质量检查
from great_expectations.core import ExpectationSuite
def create_ml_data_quality_suite():
suite = ExpectationSuite("ml_data_quality")
suite.add_expectation({
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {"column": "content"}
})
suite.add_expectation({
"expectation_type": "expect_column_values_to_match_regex",
"kwargs": {"column": "label", "regex": "^(positive|negative|neutral)$"}
})
suite.add_expectation({
"expectation_type": "expect_table_row_count_to_be_between",
"kwargs": {"min_value": 1000}
})
return suite
def run_quality_check(spark_df, suite):
results = suite.validate(spark_df)
if not results.success:
alert_quality_team(results)
quarantine_failed_records(results, spark_df)
return results
5.3 质量指标存储与追踪
-- 数据质量检查结果表
CREATE TABLE quality_check_results (
check_id BIGINT PRIMARY KEY AUTO_INCREMENT,
dataset_version_id BIGINT,
check_type VARCHAR(64),
rule_name VARCHAR(128),
passed BOOLEAN,
failure_count BIGINT DEFAULT 0,
total_count BIGINT,
failure_rate DECIMAL(5,4),
details JSON,
checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (dataset_version_id) REFERENCES dataset_versions(version_id)
);
六、FAQ
Q1:TiDB 能存储多少规模的训练数据?
TiDB 支持水平扩展,单集群可支撑 PB 级数据存储。但训练数据管道建议将原始文档/图片等大型对象存储在 S3/MinIO,TiDB 存储元数据、特征数据和标注信息,这样既保证可扩展性,又控制成本。
Q2:如何保证特征训练与推理的一致性?
通过 Feature Store 统一管理特征计算逻辑:训练时通过 Spark 批量计算特征,推理时通过同一套 SQL 逻辑实时计算或从在线特征表读取。TiDB 同时支撑在线特征读和批量特征写,确保一致性。
Q3:数据版本控制会增加多少存储成本?
TiDB 仅存储版本元数据(极小体积),实际数据快照存储在对象存储中。采用增量快照策略(仅存储与上一版本的差异数据),可将存储增量控制在全量数据的 5-15%。建议配合 S3 生命周期策略自动清理过期版本。
Q4:Spark 与 TiDB 集成有什么性能优化建议?
关键优化包括:1) 使用 TiSpark 替代 JDBC 实现下推计算,减少数据传输;2) 合理设置 Spark 分区数与 TiDB Region 数对齐;3) 利用 TiFlash 列存加速分析型查询;4) 使用 TiCDC 替代 Spark 批量轮询实现实时特征更新。
七、总结
大模型训练数据管道的效率直接决定模型质量和迭代速度。基于 TiDB + Spark 的方案,通过以下核心能力提升数据工程效率:
- 端到端管道:从采集、清洗、标注到特征工程的统一编排
- 版本控制:数据集版本与模型版本精确对应,实验可复现
- 特征一致性:统一的 Feature Store 保证训练/推理特征逻辑一致
- 质量保证:自动化检查 + 质量追踪,持续保障数据质量
八、下一步行动
- 获取训练数据管道完整方案:下载架构设计文档与实施指南 → AI 数据管道解决方案
- 申请 TiDB Cloud 免费试用:体验 PB 级扩展 + 向量检索能力 → TiDB Cloud 免费试用
- 预约数据管道架构咨询:与 AI 数据专家一对一沟通需求 → 联系 PingCAP
- 阅读 TiDB + Spark 集成文档:获取性能调优与最佳实践 → TiSpark 集成概览