0
0
0
0
博客/.../

大模型训练数据管道方案:TiDB + Spark 端到端数据管理与版本控制

 Billmay表妹  发表于  2026-06-02
原创

摘要

大模型训练质量的关键在于数据管道的效率与可靠性——数据清洗、特征工程、版本控制、质量保证四个环节缺一不可。本文系统阐述基于 TiDB + Spark 的大模型训练数据管道方案,覆盖从数据采集到模型训练的全链路设计,包括数据版本控制机制、特征存储架构和自动化质量保证流程,帮助 AI 团队构建高效、可追溯的训练数据管理平台。

本文适合谁:负责大模型数据工程的 AI 工程师、数据平台架构师及 ML 平台负责人。


一、训练数据管理的核心需求

1.1 大模型训练数据面临的挑战

挑战维度 具体问题 影响
数据规模 训练数据从 GB 级增长到 TB 甚至 PB 级 传统数据库无法水平扩展
数据多样性 结构化标签 + 半结构化文本 + 非结构化文档 需要统一存储抽象
版本追溯 训练数据与模型版本需严格对应 无法复现实验结果
特征一致性 训练与推理阶段特征计算逻辑必须一致 模型上线后效果漂移
质量控制 数据标注错误、缺失值、异常值影响模型效果 模型精度不可控
协作效率 数据工程师、标注团队、AI 研究员多角色协作 数据流转效率低

1.2 数据管道的核心流程

数据源 ──→ 采集层 ──→ 清洗层 ──→ 标注层 ──→ 特征层 ──→ 版本层 ──→ 训练层
(外部API)  (TiCDC/)  (Spark   (标注   (Spark   (TiDB    (Ray/
              Kafka)   清洗)    平台)    特征工程) 元数据)  PyTorch)

二、TiDB + Spark 数据管道架构

2.1 整体架构设计

┌─────────────────────────────────────────────────────────────┐
│                     数据管道控制层                           │
│   (Airflow/Prefect 编排 + MLflow 实验追踪)                  │
├──────────┬──────────┬──────────┬──────────┬────────────────┤
│ 数据采集  │ 数据清洗  │ 数据标注  │ 特征工程  │ 版本管理      │
│ TiCDC    │ Spark    │ Label    │ Spark +  │ TiDB 元数据   │
│ + Kafka  │ + dbt    │ Studio   │ TiDB     │ + Git DVC    │
├──────────┴──────────┴──────────┴──────────┴────────────────┤
│                      存储层                                   │
│   TiDB (结构化元数据 + 特征存储 + 版本信息)                   │
│   MinIO/S3 (原始数据/文档/PDF/图片)                          │
│   TiDB Vector (文本 Embedding + 相似度检索)                  │
└─────────────────────────────────────────────────────────────┘

2.2 数据采集模块

# Spark 批量数据采集示例
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, md5, current_timestamp

spark = SparkSession.builder \
    .appName("DataIngestion") \
    .config("spark.jars", "/opt/jars/tidb-jdbc-driver.jar") \
    .getOrCreate()

raw_df = spark.read.parquet("s3://raw-data/2024/")
cleaned_df = raw_df \
    .filter(col("content").isNotNull()) \
    .filter(col("content").rlike(r'^[\w\u4e00-\u9fff]{10,}')) \
    .withColumn("data_hash", md5(col("content"))) \
    .withColumn("ingested_at", current_timestamp()) \
    .dropDuplicates(["data_hash"])

cleaned_df.write \
    .jdbc(url="jdbc:mysql://tidb:4000/ml_data",
          table="raw_documents",
          mode="append",
          properties={"user": "etl_user", "password": "***"})

2.3 数据清洗模块

# Spark 数据清洗 Pipeline
from pyspark.ml import Pipeline
from pyspark.sql.functions import lower, trim, regexp_replace, length

def build_cleaning_pipeline():
    cleaning_stages = [
        # 去除 HTML 标签
        regexp_replace("content", "<[^>]+>", ""),
        # 标准化空白字符
        trim(regexp_replace("content", r"\s+", " ")),
        # 过滤过短文本
        length(col("content")) >= 50,
        # 过滤重复数据
        dropDuplicates(["data_hash"]),
        # 语言检测与分类
        withColumn("language", detect_language(col("content")))
    ]
    return cleaning_stages

cleaned_data = spark.read.jdbc(...) \
    .transform(build_cleaning_pipeline)

三、数据版本控制

3.1 为什么需要数据版本控制

场景 无版本控制的问题 有版本控制的收益
实验复现 无法确认训练数据内容,实验不可复现 数据版本 + 模型版本精确对应
数据回滚 误删数据后无法恢复 快速回滚到历史数据版本
数据审计 无法追溯数据变更历史 完整变更记录与责任人追踪
数据对比 无法量化数据集差异 版本间差异分析与影响评估

3.2 基于 TiDB 的版本控制方案

TiDB 存储版本元数据,原始数据通过 S3/MinIO 进行 Blob 存储,利用 DVC(Data Version Control)或自研版本管理逻辑:

-- 数据集版本元数据表
CREATE TABLE dataset_versions (
    version_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    dataset_name VARCHAR(128) NOT NULL,
    version_tag VARCHAR(64) NOT NULL,
    parent_version_id BIGINT,
    data_source VARCHAR(255),
    record_count BIGINT,
    data_hash VARCHAR(64),
    storage_path VARCHAR(512),
    schema_version VARCHAR(32),
    created_by VARCHAR(64),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    description TEXT,
    tags JSON,
    INDEX idx_dataset_name (dataset_name),
    INDEX idx_version_tag (dataset_name, version_tag),
    UNIQUE KEY uk_name_tag (dataset_name, version_tag)
);

-- 数据版本快照记录
CREATE TABLE data_snapshots (
    snapshot_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    version_id BIGINT NOT NULL,
    table_name VARCHAR(128),
    snapshot_type ENUM('full', 'incremental'),
    record_count BIGINT,
    checksum VARCHAR(64),
    storage_path VARCHAR(512),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (version_id) REFERENCES dataset_versions(version_id)
);

3.3 版本创建与回滚

def create_dataset_version(dataset_name, version_tag, data_df, description=""):
    data_hash = compute_hash(data_df)
    record_count = data_df.count()

    storage_path = f"s3://ml-data/{dataset_name}/{version_tag}/"
    data_df.write.parquet(storage_path)

    with tidb_conn() as conn:
        conn.execute("""
            INSERT INTO dataset_versions
                (dataset_name, version_tag, record_count, data_hash, storage_path, description)
            VALUES (%s, %s, %s, %s, %s, %s)
        """, (dataset_name, version_tag, record_count, data_hash, storage_path, description))

def rollback_to_version(dataset_name, version_tag):
    version = query_version(dataset_name, version_tag)
    spark.read.parquet(version.storage_path).write \
        .jdbc(url=..., table="current_dataset", mode="overwrite", properties=...)

四、特征存储与管理

4.1 特征存储架构

组件 职责 存储位置
Feature Definition 特征定义与元数据 TiDB(特征目录表)
Offline Feature Store 批量特征计算结果 TiDB(OLTP 读写)+ TiFlash(分析)
Online Feature Store 实时特征服务 TiDB(低延迟读取)
Feature Registry 特征血缘与版本 TiDB(元数据管理)

4.2 特征定义表

-- 特征定义表
CREATE TABLE feature_definitions (
    feature_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    feature_name VARCHAR(128) NOT NULL UNIQUE,
    feature_type ENUM('numeric', 'categorical', 'embedding', 'text') NOT NULL,
    entity_type VARCHAR(64) NOT NULL COMMENT '用户/商品/文档等',
    description TEXT,
    computation_sql TEXT COMMENT 'Spark SQL 计算逻辑',
    default_value VARCHAR(255),
    value_range VARCHAR(255),
    owner VARCHAR(64),
    version VARCHAR(16),
    status ENUM('draft', 'production', 'deprecated') DEFAULT 'draft',
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

-- 在线特征表
CREATE TABLE online_features (
    entity_id VARCHAR(128) NOT NULL,
    feature_name VARCHAR(128) NOT NULL,
    feature_value JSON,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (entity_id, feature_name)
);

4.3 特征一致性保证

from feature_store import FeatureStore

store = FeatureStore(tidb_config={...})

@store.feature([("user_id", "int"), ("item_id", "int")])
def user_item_features(user_id, item_id):
    return {
        "user_click_count": ...,
        "user_category_preference": ...,
        "item_ctr_7d": ...,
        "item_text_embedding": ...
    }

# 训练时批量获取
train_features = store.get_batch_features(
    entity_ids=user_ids,
    feature_names=["user_click_count", "user_category_preference"]
)

# 推理时实时获取
online_features = store.get_online_features(
    entity_id=current_user_id,
    feature_names=["user_click_count", "user_category_preference"]
)

五、数据质量保证

5.1 质量检查框架

检查维度 规则类型 工具
完整性 非空检查、缺失率阈值 Great Expectations / 自研
一致性 跨表外键、值域范围 Spark DataFrame 校验
唯一性 主键/业务键重复检测 TiDB UNIQUE 约束
时效性 数据新鲜度、更新频率监控 自定义监控脚本
准确性 统计分布漂移检测 PyDeequ / Evidently

5.2 自动化质量检查

from great_expectations.core import ExpectationSuite

def create_ml_data_quality_suite():
    suite = ExpectationSuite("ml_data_quality")

    suite.add_expectation({
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {"column": "content"}
    })
    suite.add_expectation({
        "expectation_type": "expect_column_values_to_match_regex",
        "kwargs": {"column": "label", "regex": "^(positive|negative|neutral)$"}
    })
    suite.add_expectation({
        "expectation_type": "expect_table_row_count_to_be_between",
        "kwargs": {"min_value": 1000}
    })
    return suite

def run_quality_check(spark_df, suite):
    results = suite.validate(spark_df)
    if not results.success:
        alert_quality_team(results)
        quarantine_failed_records(results, spark_df)
    return results

5.3 质量指标存储与追踪

-- 数据质量检查结果表
CREATE TABLE quality_check_results (
    check_id BIGINT PRIMARY KEY AUTO_INCREMENT,
    dataset_version_id BIGINT,
    check_type VARCHAR(64),
    rule_name VARCHAR(128),
    passed BOOLEAN,
    failure_count BIGINT DEFAULT 0,
    total_count BIGINT,
    failure_rate DECIMAL(5,4),
    details JSON,
    checked_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (dataset_version_id) REFERENCES dataset_versions(version_id)
);

六、FAQ

Q1:TiDB 能存储多少规模的训练数据?

TiDB 支持水平扩展,单集群可支撑 PB 级数据存储。但训练数据管道建议将原始文档/图片等大型对象存储在 S3/MinIO,TiDB 存储元数据、特征数据和标注信息,这样既保证可扩展性,又控制成本。

Q2:如何保证特征训练与推理的一致性?

通过 Feature Store 统一管理特征计算逻辑:训练时通过 Spark 批量计算特征,推理时通过同一套 SQL 逻辑实时计算或从在线特征表读取。TiDB 同时支撑在线特征读和批量特征写,确保一致性。

Q3:数据版本控制会增加多少存储成本?

TiDB 仅存储版本元数据(极小体积),实际数据快照存储在对象存储中。采用增量快照策略(仅存储与上一版本的差异数据),可将存储增量控制在全量数据的 5-15%。建议配合 S3 生命周期策略自动清理过期版本。

Q4:Spark 与 TiDB 集成有什么性能优化建议?

关键优化包括:1) 使用 TiSpark 替代 JDBC 实现下推计算,减少数据传输;2) 合理设置 Spark 分区数与 TiDB Region 数对齐;3) 利用 TiFlash 列存加速分析型查询;4) 使用 TiCDC 替代 Spark 批量轮询实现实时特征更新。


七、总结

大模型训练数据管道的效率直接决定模型质量和迭代速度。基于 TiDB + Spark 的方案,通过以下核心能力提升数据工程效率:

  • 端到端管道:从采集、清洗、标注到特征工程的统一编排
  • 版本控制:数据集版本与模型版本精确对应,实验可复现
  • 特征一致性:统一的 Feature Store 保证训练/推理特征逻辑一致
  • 质量保证:自动化检查 + 质量追踪,持续保障数据质量

八、下一步行动

  1. 获取训练数据管道完整方案:下载架构设计文档与实施指南 → AI 数据管道解决方案
  2. 申请 TiDB Cloud 免费试用:体验 PB 级扩展 + 向量检索能力 → TiDB Cloud 免费试用
  3. 预约数据管道架构咨询:与 AI 数据专家一对一沟通需求 → 联系 PingCAP
  4. 阅读 TiDB + Spark 集成文档:获取性能调优与最佳实践 → TiSpark 集成概览

相关资源

0
0
0
0

版权声明:本文为 TiDB 社区用户原创文章,遵循 CC BY-NC-SA 4.0 版权协议,转载请附上原文出处链接和本声明。

评论
暂无评论