1. 项目概述:为什么我坚持用 SageMaker Pipelines 做数据流水线,而不是自己搭 Airflow 或写一堆 Shell 脚本
去年底帮一家做智能仓储的客户重构他们的数据处理链路时,我第一次在生产环境里把 SageMaker Pipelines 从“试试看”推到了“全量替换”。他们原来的方案是用 EC2 上跑的 Airflow + 自研 Python 处理脚本 + S3 触发 Lambda,整套系统维护起来像在修一辆边跑边散架的老爷车——每次加一个新数据源,就得改 DAG、调权限、查 Lambda 冷启动超时、核对 S3 事件通知是否漏发。最夸张的一次,因为某个上游 CSV 文件多了一列空格,整个 pipeline 卡在数据校验环节,没人知道是哪一步出的问题,排查花了整整两天。
SageMaker Pipelines 不是另一个“又一个 workflow 工具”,它是 AWS 把 MLOps 场景里最痛的几个点——可复现性、参数化、版本追踪、执行可观测性、与训练/部署服务天然打通——全揉进一个托管服务里的结果。它不强制你用 SageMaker Training 或 Processing,但只要你用,所有输入输出、参数、代码快照、执行日志、指标图谱,全在同一个控制台里点几下就能拉出来。我试过把一个包含 7 个处理节点、3 个模型训练任务、2 个模型评估环节的完整 ML 流水线,从本地 Jupyter Notebook 里调试好,打包成 Pipeline 定义,一键发布到 SageMaker Studio,整个过程不到 12 分钟。更关键的是,当业务方第二天早上说“能不能把昨天的数据重跑一遍,但把异常检测阈值从 0.8 改成 0.65”,我只需要在 Studio 界面里点开上次执行记录,点击“重新运行”,填入新参数,回车——5 分钟后新结果就进了 S3,旧结果原封不动存着,连 Git 提交都不用做。
这篇文章讲的不是“怎么照着文档跑通一个 Hello World”,而是我踩过至少 17 次坑、重写了 4 版 Pipeline 定义、在 3 个不同规模客户现场落地后,总结出来的真实可用的构建逻辑。它适合两类人:一类是已经用着 SageMaker 但还在用 Notebook 手动跑批处理的工程师,另一类是正被 Airflow 权限配置、Docker 镜像管理、跨账户 S3 访问折腾得睡不着觉的 MLOps 新手。核心关键词就三个:SageMaker Pipelines、数据流水线、可复现执行。后面所有内容,都围绕这三点展开,不讲虚的,只说我在控制台里点过、在 CloudWatch 里查过、在 S3 里翻过日志的真实操作。
2. 整体设计思路:Pipeline 不是流程图,而是“带状态的函数调用链”
很多人第一次看 SageMaker Pipelines 的文档,会下意识把它当成一个可视化版的 Airflow DAG。这是最大的认知偏差。Airflow 的 DAG 是调度逻辑的声明,而 SageMaker Pipeline 的Pipeline对象,本质是一个可序列化的、带输入输出契约的函数定义。它的每个 Step(比如ProcessingStep、TrainingStep)不是“要执行什么”,而是“这个步骤承诺提供什么输入、消耗什么参数、产出什么输出、失败时如何回滚”。理解这一点,才能避开 80% 的设计陷阱。
举个具体例子。我们有个客户做物流时效预测,原始数据来自 Kafka(通过 MSK 导入 S3)、ERP 系统导出的 CSV、以及高德地图 API 返回的实时路况 JSON。以前他们用 Airflow,每个数据源单独一个 DAG,靠外部时间戳或文件名约定来触发下游。问题来了:如果 ERP 数据晚到 2 小时,Kafka 数据先处理完了,下游训练就拿不到完整特征。SageMaker Pipelines 怎么解?我们定义了一个DataIngestionStep,它的输入是三个 S3 URI 参数(kafka_data_uri,erp_data_uri,traffic_data_uri),输出是一个统一的 Parquet 目录ingested_data_uri。Pipeline 的执行引擎不关心这些 URI 指向的文件是否存在,它只认契约——只要这三个参数传进来,它就去调用你指定的容器镜像,把参数塞进去执行。至于那个镜像里是先检查文件存在性、还是等 5 分钟重试、还是直接报错,那是你容器的事。Pipeline 只管“调用”和“记录”。
这种设计带来的第一个好处是彻底解耦。数据团队可以独立维护ingestion.py脚本,算法团队只管写train.py,双方约定好输入输出路径和参数名,Pipeline 就是那个严守契约的中间人。第二个好处是可测试性爆炸提升。你完全可以在本地用docker run -e KAFKA_DATA_URI=s3://test-bucket/kafka/2024-06-01/ ...启动容器,验证逻辑,再把镜像推到 ECR,最后在 Pipeline 定义里引用。不需要起一套 Airflow 开发环境,也不用 mock S3 事件。第三个好处,也是最容易被忽略的,是失败恢复粒度可控。Airflow 里一个 Task 失败,要么重跑整个 DAG,要么手动标记下游为成功。SageMaker Pipelines 里,如果你把数据清洗、特征工程、模型训练拆成三个独立 Step,那么当特征工程失败时,Pipeline 引擎会自动跳过已成功的清洗 Step,只重跑特征工程及其下游,清洗阶段的输出缓存(S3 中的中间 Parquet)直接复用。这背后是 Pipeline 引擎对每个 Step 输出的CacheConfig和DependsOn关系的精确管理,不是靠人工判断。
所以我的设计铁律第一条:每个 Step 必须有明确、不可变的输入输出契约,且契约内容必须能被 Pipeline 引擎解析(即 S3 URI、字符串、数字、布尔值)。像“读取最新分区”、“获取上一次执行的输出路径”这类动态逻辑,必须封装在你的容器脚本里,不能指望 Pipeline 帮你做。第二条:Step 之间尽量用 S3 URI 传递数据,而不是用 PipelineParameter 传大段 JSON。我见过有人把整个特征配置字典序列化成字符串传给 TrainingStep,结果 Pipeline 解析时卡住,因为 Parameter 有长度限制(10KB)。URI 是无限的,S3 是可靠的。第三条:永远为每个 Step 显式设置CacheConfig。默认是关闭的,意味着每次执行都重新跑。生产环境里,除非你明确需要每次都 fresh run,否则关掉缓存等于主动放弃 Pipeline 最大的价值之一。
3. 核心细节解析:从本地脚本到可复用 Pipeline Step 的四步转化
把一个能在本地跑通的 Python 脚本变成 Pipeline 里可信赖的 Step,不是简单改个入口函数名的事。我把它拆成四个必须完成的动作,少一个,上线后准出问题。
3.1 第一步:脚本改造——从“硬编码路径”到“参数驱动契约”
假设你有一个clean_data.py,本地跑是这样:
import pandas as pd df = pd.read_csv("/home/user/data/raw.csv") df = df.dropna() df.to_parquet("/home/user/data/cleaned.parquet")这玩意儿放进 Pipeline 里就是定时炸弹。Pipeline 不知道/home/user/data/在哪,也不知道raw.csv是哪个时间点的。改造的核心,是让它接受命令行参数,并严格遵循 SageMaker Processing Job 的约定。
SageMaker Processing Job 会自动挂载两个关键目录:
/opt/ml/processing/input/:所有输入数据(S3 URI)会被下载到这里,按input_name子目录组织/opt/ml/processing/output/:所有输出数据必须写到这里,Pipeline 会自动上传到你指定的 S3 输出 URI
所以改造后的clean_data.py应该长这样:
import argparse import pandas as pd import os def main(): parser = argparse.ArgumentParser() parser.add_argument("--input-data", type=str, help="S3 URI of input data") parser.add_argument("--output-data", type=str, help="S3 URI of output data") args = parser.parse_args() # SageMaker Processing 会自动把 --input-data 对应的 S3 文件下载到 /opt/ml/processing/input/ # 所以我们直接读这个本地路径 input_path = "/opt/ml/processing/input/" # 注意:这里不能直接用 args.input_data 去读 S3,Processing Job 已经帮你做了 files = os.listdir(input_path) if not files: raise ValueError(f"No files found in {input_path}") # 假设只有一个 CSV 文件 csv_file = [f for f in files if f.endswith('.csv')][0] df = pd.read_csv(os.path.join(input_path, csv_file)) # 清洗逻辑 df = df.dropna() # 输出必须写到 /opt/ml/processing/output/ 下,Processing Job 会自动上传 output_path = "/opt/ml/processing/output/" os.makedirs(output_path, exist_ok=True) df.to_parquet(os.path.join(output_path, "cleaned.parquet")) if __name__ == "__main__": main()关键点在于:脚本不碰 S3 SDK,不写死任何路径,只依赖 Processing Job 提供的挂载点和命令行参数。--input-data和--output-data这两个参数名,会在 Pipeline 定义里被引用,必须完全一致。我建议在脚本开头加个print(f"Input: {args.input_data}, Output: {args.output_data}"),方便调试时确认参数传进来了没。
3.2 第二步:Docker 镜像构建——轻量、确定、可审计
很多新手卡在镜像这步,不是因为不会写 Dockerfile,而是没想清楚“这个镜像到底要承载什么”。它不是你的开发环境,也不是一个全能 Python 环境,它应该是一个极简、专注、只做一件事的执行单元。
我们的clean_data.py需要 pandas,那就只装 pandas。别一股脑pip install sagemaker boto3 pandas scikit-learn全装上。原因有三:第一,镜像体积越大,启动越慢,Pipeline Step 的冷启动时间直线上升;第二,依赖越多,版本冲突风险越高,今天能跑,明天 pip 升级一个包就崩;第三,安全审计时,每个多余的包都是潜在攻击面。
一个生产可用的Dockerfile示例:
FROM public.ecr.aws/sagemaker/sagemaker-scikit-learn:1.2-1-cpu-py3 # 创建工作目录 WORKDIR /opt/ml/processing/code # 复制脚本(注意:只复制 clean_data.py,不复制其他无关文件) COPY clean_data.py . # 设置入口点,让容器启动时直接运行脚本 ENTRYPOINT ["python", "clean_data.py"]这里用了 AWS 官方的 Scikit-learn 镜像,它已经预装了 pandas、numpy 等基础科学计算库,且经过 AWS 安全加固。ENTRYPOINT设为["python", "clean_data.py"],意味着容器启动时,python clean_data.py就是唯一命令,后续 Pipeline 传入的--input-data等参数会自动追加到这个命令后面。千万别用CMD,它容易被覆盖。
构建并推送镜像的命令,我固定写在一个build.sh里,避免手敲出错:
#!/bin/bash IMAGE_NAME="data-cleaner" ACCOUNT_ID="123456789012" # 替换为你的 AWS 账户 ID REGION="us-east-1" # 登录 ECR aws ecr get-login-password --region $REGION | docker login --username AWS --password-stdin $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com # 构建 docker build -t $IMAGE_NAME . # 打标签并推送 docker tag $IMAGE_NAME:latest $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$IMAGE_NAME:latest docker push $ACCOUNT_ID.dkr.ecr.$REGION.amazonaws.com/$IMAGE_NAME:latest提示:ECR 仓库名必须小写字母、数字、连字符,且不能以连字符开头或结尾。我吃过亏,建了个叫
Data-Cleaner的仓库,推送时一直报错,改成>from sagemaker.processing import ProcessingInput, ProcessingOutput ... inputs = [ ProcessingInput( source="s3://my-bucket/raw/", destination="/opt/ml/processing/input/", # 必须和脚本里读的路径一致 input_name="raw_input" ) ] outputs = [ ProcessingOutput( output_name="cleaned_output", source="/opt/ml/processing/output/", # 必须和脚本里写的路径一致 destination="s3://my-bucket/processed/" ) ]如果
destination写成/input/或/opt/ml/input/,脚本就读不到文件;如果source写成/opt/ml/processing/outputs/,Processing Job 就找不到要上传的文件,整个 Step 会卡在“上传中”状态直到超时。3.4 第四步:PipelineParameter 注入——让流水线真正“活”起来
硬编码参数是 Pipeline 的死敌。想象一下,每次换数据日期,都要改 Python 脚本、重建镜像、更新 Pipeline 定义……这和没用 Pipeline 有什么区别?真正的灵活性,在于
PipelineParameter。SageMaker Pipelines 支持三种参数类型:
ParameterString、ParameterInteger、ParameterBoolean。它们的作用,是让 Pipeline 定义本身成为一个模板,而每次执行时传入的具体值,才是驱动它运转的燃料。比如,我们想让数据清洗 Step 能动态指定日期分区:
from sagemaker.workflow.parameters import ParameterString # 定义一个字符串参数,名为 "data_date",默认值是今天 data_date = ParameterString( name="DataDate", default_value="2024-06-01" ) # 在 ProcessingInput 的 source 里使用它 inputs = [ ProcessingInput( source=f"s3://my-bucket/raw/{data_date}/", # 注意:这里直接拼接 destination="/opt/ml/processing/input/", input_name="raw_input" ) ]关键点在于:
f"s3://my-bucket/raw/{data_date}/"这个字符串,data_date是一个ParameterString对象,不是普通字符串。SageMaker Pipeline SDK 会识别它,并在 Pipeline 执行时,用实际传入的值(比如"2024-06-02")去替换。你不能在脚本里用str(data_date),那会得到一个对象地址。更进一步,你可以用参数控制整个 Step 的开关。比如,有时你想跳过清洗,直接用上一次的清洗结果:
skip_cleaning = ParameterBoolean( name="SkipCleaning", default_value=False ) # 在 Pipeline 定义里,用 DependsOn 控制依赖关系 cleaning_step = ProcessingStep(...) training_step = TrainingStep(...) # 如果 skip_cleaning 为 True,则 training_step 不依赖 cleaning_step if skip_cleaning: training_step.depends_on = [] else: training_step.depends_on = [cleaning_step]注意:
depends_on是在 Pipeline 定义时静态决定的,不是运行时动态判断。所以这种开关逻辑,要在定义 Pipeline 时就写死分支。更灵活的做法,是把开关逻辑写在你的容器脚本里,用os.environ.get("SKIP_CLEANING")读取环境变量,由 Pipeline 通过environment参数传进去。4. 实操全流程:从零开始构建一个端到端数据流水线
现在,我们把前面所有细节串起来,构建一个真实的、可立即运行的端到端数据流水线。场景是:每天从 S3 读取原始销售数据(CSV),清洗后生成特征表(Parquet),再用 XGBoost 训练一个销量预测模型,最后评估模型在验证集上的 RMSE。整个流程在 SageMaker Studio 笔记本里完成。
4.1 环境准备与依赖安装
首先,确保你的 SageMaker Studio Domain 已创建,且用户配置文件(User Profile)关联的 ExecutionRole 有足够权限(至少包括
AmazonSageMakerFullAccess和对目标 S3 Bucket 的读写权限)。打开一个新笔记本,内核选conda_python3,然后安装必要库:!pip install sagemaker boto3 pandas numpy注意:不要用
!pip install --upgrade sagemaker,SageMaker Studio 环境里预装的sagemakerSDK 版本是经过严格测试的,升级可能导致兼容性问题。如果提示版本太低,优先考虑升级 Studio 内核,而不是 SDK。4.2 数据准备与 S3 结构规划
在 S3 上创建一个桶(比如
my-company-sales-data),并规划好以下前缀:
raw/2024-06-01/sales.csv:原始数据,格式为date,product_id,quantity,pricefeatures/:清洗后特征表的输出位置models/:训练模型的输出位置evaluation/:评估结果的输出位置上传一个测试用的
sales.csv,内容如下(10 行):date,product_id,quantity,price 2024-06-01,A001,10,99.99 2024-06-01,A002,5,199.99 2024-06-01,B001,20,49.99 ...4.3 编写并构建清洗脚本镜像
在 Studio 笔记本里,创建一个新文件
clean_data.py:import argparse import pandas as pd import os import numpy as np def main(): parser = argparse.ArgumentParser() parser.add_argument("--input-data", type=str, help="S3 URI of input data") parser.add_argument("--output-data", type=str, help="S3 URI of output data") parser.add_argument("--date", type=str, help="Data date for partitioning") args = parser.parse_args() print(f"Starting cleaning for date: {args.date}") # 读取输入 input_path = "/opt/ml/processing/input/" files = os.listdir(input_path) if not files: raise ValueError(f"No files found in {input_path}") csv_file = [f for f in files if f.endswith('.csv')][0] df = pd.read_csv(os.path.join(input_path, csv_file)) # 基础清洗 df = df.dropna(subset=['quantity', 'price']) df['revenue'] = df['quantity'] * df['price'] df['date'] = pd.to_datetime(df['date']) df['day_of_week'] = df['date'].dt.dayofweek df['is_weekend'] = (df['day_of_week'] >= 5).astype(int) # 输出 output_path = "/opt/ml/processing/output/" os.makedirs(output_path, exist_ok=True) # 按日期分区保存 output_file = os.path.join(output_path, f"features_{args.date}.parquet") df.to_parquet(output_file) print(f"Cleaning completed. Output saved to {output_file}") if __name__ == "__main__": main()然后,创建
Dockerfile:FROM public.ecr.aws/sagemaker/sagemaker-scikit-learn:1.2-1-cpu-py3 WORKDIR /opt/ml/processing/code COPY clean_data.py . ENTRYPOINT ["python", "clean_data.py"]在终端里执行
build.sh(内容见 3.2 节),确保镜像成功推送到 ECR。4.4 定义 Pipeline 并发布
现在,编写 Pipeline 定义。这段代码会创建一个完整的
Pipeline对象,并发布到 SageMaker:import boto3 from sagemaker.workflow.pipeline import Pipeline from sagemaker.workflow.steps import ProcessingStep, TrainingStep from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.sklearn.estimator import SKLearn from sagemaker.workflow.parameters import ParameterString, ParameterInteger from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.inputs import TrainingInput # 1. 定义参数 data_date = ParameterString( name="DataDate", default_value="2024-06-01" ) max_depth = ParameterInteger( name="MaxDepth", default_value=6 ) # 2. 定义清洗 Step sklearn_processor = SKLearnProcessor( framework_version="1.2-1", role="arn:aws:iam::123456789012:role/MySageMakerExecutionRole", # 替换为你的角色 ARN instance_type="ml.m5.xlarge", instance_count=1, base_job_name="data-cleaner" ) cleaning_step = ProcessingStep( name="CleanRawData", processor=sklearn_processor, inputs=[ ProcessingInput( source=f"s3://my-company-sales-data/raw/{data_date}/", destination="/opt/ml/processing/input/", input_name="raw_input" ) ], outputs=[ ProcessingOutput( output_name="cleaned_features", source="/opt/ml/processing/output/", destination=f"s3://my-company-sales-data/features/{data_date}/" ) ], code="s3://my-company-sales-data/code/clean_data.py", # 也可以把脚本放 S3,这里指向 S3 路径 job_arguments=["--date", data_date], # 传参给脚本 cache_config={"enabled": True, "size_in_gb": 1} # 启用缓存,1GB 足够 ) # 3. 定义训练 Step xgb_estimator = SKLearn( entry_point="train.py", # 训练脚本,需提前准备好 framework_version="1.2-1", role="arn:aws:iam::123456789012:role/MySageMakerExecutionRole", instance_type="ml.m5.xlarge", instance_count=1, hyperparameters={"max_depth": max_depth} ) training_step = TrainingStep( name="TrainXGBoostModel", estimator=xgb_estimator, inputs={ "train": TrainingInput( s3_data=f"s3://my-company-sales-data/features/{data_date}/", content_type="text/csv" ), "validation": TrainingInput( s3_data="s3://my-company-sales-data/features/validation/", content_type="text/csv" ) } ) # 4. 创建 Pipeline 对象 pipeline = Pipeline( name="SalesPredictionPipeline", parameters=[data_date, max_depth], steps=[cleaning_step, training_step] ) # 5. 发布 Pipeline pipeline.upsert(role_arn="arn:aws:iam::123456789012:role/MySageMakerExecutionRole") print(f"Pipeline {pipeline.name} created/updated successfully.")提示:
upsert()方法是幂等的。如果同名 Pipeline 已存在,它会更新定义;如果不存在,则创建。这让你可以反复迭代 Pipeline 定义,无需手动删除。4.5 在 Studio 中执行与监控
发布成功后,打开 SageMaker Studio 的Pipelines标签页。你应该能看到
SalesPredictionPipeline列表。点击它,进入详情页,点击右上角Execute。在执行配置弹窗里:
DataDate填2024-06-01MaxDepth填6- 其他保持默认
点击Execute。Pipeline 会立即开始运行。在详情页的Execution Graph标签页,你能看到两个 Step 的状态实时变化:
CleanRawData→TrainXGBoostModel。点击任意 Step,可以查看:
- Input/Output:确认传入的 S3 URI 是否正确
- Logs:跳转到 CloudWatch Logs,查看容器内 stdout/stderr
- Metrics:如果脚本里打了
print("Step completed"),这里能看到- Artifacts:链接到 S3 中的输出文件
整个执行过程,从点击到模型训练完成,通常在 5-10 分钟内(取决于数据量和实例大小)。完成后,你会在 S3 的
models/目录下看到训练好的模型.tar.gz文件,在evaluation/目录下看到evaluation.json。5. 常见问题与排查技巧实录:那些官方文档不会告诉你的坑
即使严格按照上面步骤操作,上线初期也大概率会遇到各种“意料之外”的问题。我把过去一年里客户现场和我自己踩过的坑,整理成一张速查表,并附上独家排查技巧。
问题现象 根本原因 排查技巧 解决方案 Step 卡在 "Starting" 状态超过 10 分钟 Processing Job 的 ExecutionRole 缺少 s3:GetObject权限,导致无法下载输入数据查看 CloudWatch Logs 中 /aws/sagemaker/ProcessingJobs下对应 Job 的stdout日志,搜索PermissionDenied或NoSuchKey检查 ExecutionRole 的 IAM 策略,确保 Resource字段精确匹配 S3 URI 的 bucket 和 prefix,不要用*Step 执行失败,日志显示 No module named 'pandas'Docker 镜像里没装 pandas,或者 Dockerfile的FROM镜像版本不匹配在本地用 docker run -it <your-image> python -c "import pandas"测试用 public.ecr.aws/sagemaker/sagemaker-scikit-learn:1.2-1-cpu-py3这个官方镜像,它自带 pandas。避免自己从python:3.8-slim从头装Pipeline 执行时提示 Parameter 'DataDate' is not defined在 Pipeline构造函数的parameters列表里,漏掉了data_date这个参数对象检查 Pipeline(..., parameters=[...])这一行,确认所有在 Step 中引用的ParameterXxx对象都在这个列表里把所有参数定义放在文件顶部,统一管理,避免遗漏 训练 Step 的 TrainingInput传入的 S3 URI 在日志里显示为空TrainingInput(s3_data=...)的s3_data是一个字符串,但你传入的是ParameterString对象,没有用f""拼接在 Notebook 里打印 f"s3://bucket/{data_date}/",看输出是不是s3://bucket/<sagemaker.workflow.parameters.ParameterString object at 0x...>/必须用 f"s3://bucket/{data_date}/",让 SDK 自动解析,不能手动str(data_date)Pipeline 执行成功,但 S3 输出目录里没有文件 脚本里写的输出路径和 ProcessingOutput.source不匹配,或者脚本没执行到to_parquet()就退出了查看 CloudWatch Logs 中 stderr,搜索Exception或Traceback;同时检查ProcessingOutput.source是否等于脚本里os.path.join("/opt/ml/processing/output/", ...)的路径在脚本开头加 print("Script started"),结尾加print("Script finished"),确认脚本是否完整执行除了这张表,还有几个我反复强调的实操心得:
心得一:永远先在本地用
docker run测试容器。不要等到 Pipeline 执行失败才去查。把你的镜像拉下来,用模拟的 S3 URI 挂载一个本地目录,直接运行:mkdir -p /tmp/input /tmp/output echo "date,product_id,quantity,price\n2024-06-01,A001,10,99.99" > /tmp/input/sales.csv docker run -v /tmp/input:/opt/ml/processing/input/ -v /tmp/output:/opt/ml/processing/output/ -e INPUT_DATA=/opt/ml/processing/input/ -e OUTPUT_DATA=/opt/ml/processing/output/ your-ecr-repo/data-cleaner:latest --date 2024-06-01 ls -l /tmp/output/ # 确认 cleaned.parquet 出来了没这一步能过滤掉 90% 的脚本和镜像问题。
心得二:Pipeline 的
cache_config不是万能的,要配合depends_on使用。比如,你有一个 Step 专门做数据采样,输出一个sampled.parquet。如果这个 Step 启用了缓存,那么当sampled.parquet已存在时,它会跳过执行。但如果下游的训练 Step 依赖这个采样结果,而你又修改了采样逻辑(比如改了随机种子),缓存就会导致训练用的是旧样本。这时,你应该在训练 Step 里显式设置depends_on=[sampling_step],并禁用采样 Step 的缓存,或者给采样 Step 加一个version参数,让缓存键包含版本号。心得三:日志是你的朋友,但要看对地方。SageMaker Pipeline 的日志分散在三个地方:1) Pipeline Execution 的
CloudWatch Logs(路径/aws/sagemaker/Pipelines/...),这里记录 Pipeline 引擎本身的调度日志;2) 每个 Step 对应的ProcessingJob或TrainingJob的CloudWatch Logs(路径/aws/sagemaker/ProcessingJobs/...),这里记录容器内脚本的 stdout/stderr;3) S3 中的output_data_config.s3_uri下的output.tar.gz里的output.log(如果有的话)。绝大多数问题,看第 2 个地方就够了。记住,ProcessingJob的日志组名,就是你在ProcessingStep里设置的base_job_name加上时间戳。心得四:不要迷信“全托管”。SageMaker Pipelines 确实托管了调度、日志、状态追踪,但它不托管你的代码逻辑、不托管你的数据质量、不托管你的模型漂移。我见过太多客户,Pipeline 每天准时跑通,日志全是绿色,但业务方反馈“预测不准了”。一查才发现,上游数据源格式变了,清洗脚本没适配,Pipeline 还是照常把脏数据喂给了模型。所以,务必在每个关键 Step 后加数据校验逻辑,比如在清洗脚本末尾,用
assert len(df) > 0和assert df['quantity'].min() > 0,让问题暴露在 Pipeline 内部,而不是等业务方投诉。最后再分享一个小技巧:当你需要快速验证 Pipeline 的某个 Step 是否能独立运行时,不要删掉其他 Step。直接在
Pipeline构造函数里,把steps列表只保留你要测试的那个 Step。比如,只想测清洗,就把steps=[cleaning_step]。Pipeline 引擎会忽略所有依赖,只执行这一个 Step。这比在本地搭环境快得多,也比在 Studio 里手动触发 Processing Job 更贴近真实 Pipeline 上下文。