Distributed Training with Kubeflow

Connect to the cluster via kubectl

Distributed training with Kubeflow requires installing the Kubeflow CRD extensions and the operator, and jobs are submitted as YAML manifests. You therefore need a prepared cluster and a working kubectl connection to it before anything else.

For the detailed steps, see here.
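Before proceeding, it is worth confirming that kubectl can actually reach the cluster. These are standard connectivity checks, independent of Kubeflow:

# Both commands should return cluster information rather than a connection error
kubectl cluster-info
kubectl get nodes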

Install the Training Operator

Run the installation:

kubectl apply --force-conflicts --server-side -k "https://ghfast.top/github.com/kubeflow/training-operator.git/manifests/overlays/standalone?ref=v1.8.0"
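The ghfast.top prefix is a mirror used to speed up GitHub access. If your environment can reach GitHub directly, the same manifests should install from the upstream URL:

kubectl apply --force-conflicts --server-side -k "github.com/kubeflow/training-operator.git/manifests/overlays/standalone?ref=v1.8.0"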

Check that the Training Operator has started successfully:

kubectl get po -n kubeflow --watch
NAME                                 READY   STATUS    RESTARTS       AGE
training-operator-78f4df6758-sdxgk   1/1     Running   1 (2m4s ago)   2m8s

# Note: wait for the READY field to become 1/1; this takes about 2 minutes
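Instead of watching manually, you can block until the Deployment reports Available. This assumes the default deployment name training-operator, consistent with the pod name shown above:

kubectl wait --for=condition=Available deployment/training-operator -n kubeflow --timeout=3m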

Verify that the CRDs exist:

kubectl get crd | grep -i kubeflow
mpijobs.kubeflow.org                        2025-09-17T05:05:08Z
mxjobs.kubeflow.org                         2025-09-17T05:05:09Z
paddlejobs.kubeflow.org                     2025-09-17T05:05:11Z
pytorchjobs.kubeflow.org                    2025-09-17T05:05:13Z
tfjobs.kubeflow.org                         2025-09-17T05:05:15Z
xgboostjobs.kubeflow.org                    2025-09-17T05:05:17Z
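With the CRDs registered, you can also confirm that the API server serves the PyTorchJob schema, which is handy when writing the manifest below:

kubectl explain pytorchjob.spec.pytorchReplicaSpecs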

Prepare the code

Save the following code as pytorch_cpu_demo.py, prepare a shared storage volume (assumed here to be named t256g), and upload the file to the volume's root directory (one way to do this is sketched after the script below).

Note: the job manifest below mounts this volume at /data and launches /data/pytorch_cpu_demo.py, so the script must sit at the volume's root.

# pytorch_cpu_demo.py

import torch

# torchrun sets MASTER_ADDR, MASTER_PORT, RANK, and WORLD_SIZE in the
# environment; init_method="env://" reads them to build the process group.
torch.distributed.init_process_group(init_method="env://")
rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
print(f"rank {rank} world_size {world_size}")

# Every rank contributes tensor([1]); all_reduce sums across ranks,
# so each rank ends up holding tensor([world_size]).
a = torch.tensor([1])
torch.distributed.all_reduce(a)
print(f"rank {rank} world_size {world_size} result {a}")

# Synchronize all ranks before exiting.
torch.distributed.barrier()
print(f"rank {rank} world_size {world_size}")
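A minimal sketch for uploading the script to the volume, using a temporary helper pod that mounts the t256g claim; the pod name pvc-uploader and the busybox image are illustrative choices, not part of the original setup:

cat <<'EOF' | kubectl apply -f -
apiVersion: v1
kind: Pod
metadata:
  name: pvc-uploader
spec:
  containers:
    - name: uploader
      image: busybox
      command: ["sleep", "3600"]
      volumeMounts:
        - mountPath: /data
          name: my-storage
  volumes:
    - name: my-storage
      persistentVolumeClaim:
        claimName: t256g
EOF

# Wait for the pod, copy the script to the volume root, then clean up
kubectl wait --for=condition=Ready pod/pvc-uploader --timeout=2m
kubectl cp pytorch_cpu_demo.py pvc-uploader:/data/pytorch_cpu_demo.py
kubectl delete pod pvc-uploader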

Launch the training

Prepare the YAML file:

---
# pytorch_cpu_demo.yaml
apiVersion: "kubeflow.org/v1"
kind: PyTorchJob
metadata:
  name: torchrun-cpu
spec:
  nprocPerNode: "1"
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              resources:
                limits:
                  cpu: "1"
                  memory: "2Gi"
              image: registry-cn-huabei1-internal.ebcloud.com/ebsys/pytorch:2.5.1-cuda12.2-python3.10-ubuntu22.04-v09
              volumeMounts:
                - mountPath: /data
                  name: my-storage
              command:
                - "torchrun"
                - "/data/pytorch_cpu_demo.py"
          volumes:
            - name: my-storage
              persistentVolumeClaim:
                claimName: t256g
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
            - name: pytorch
              resources:
                limits:
                  cpu: "1"
                  memory: "2Gi"
              image: registry-cn-huabei1-internal.ebcloud.com/ebsys/pytorch:2.5.1-cuda12.2-python3.10-ubuntu22.04-v09
              volumeMounts:
                - mountPath: /data
                  name: my-storage
              command:
                - "torchrun"
                - "/data/pytorch_cpu_demo.py"
          volumes:
            - name: my-storage
              persistentVolumeClaim:
                claimName: t256g
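This manifest starts 1 master and 3 worker pods, each running a single process (nprocPerNode: "1"), so the job's world size is 4. Before submitting, the manifest can be validated against the live API server with a standard kubectl dry run:

kubectl apply --dry-run=server -f pytorch_cpu_demo.yaml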

Submit the training job:

# Submit the training job
kubectl apply -f pytorch_cpu_demo.yaml
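After submission, the operator creates one pod per replica. Assuming the job-name label that training-operator v1 applies to its pods, they can be listed with:

kubectl get po -l training.kubeflow.org/job-name=torchrun-cpu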

View results and clean up

# Check the training job's progress
kubectl get pytorchjob
NAME           STATE     AGE
torchrun-cpu   Created   5s
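If the job stays in Created or a pod stays Pending, describing the job surfaces its conditions and events:

kubectl describe pytorchjob torchrun-cpu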

# View the logs
kubectl logs -f torchrun-cpu-master-0
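Each of the 4 ranks contributes tensor([1]) to the all_reduce, so every rank should report tensor([4]). The master's log should therefore contain lines roughly like the following (interleaving may vary):

rank 0 world_size 4
rank 0 world_size 4 result tensor([4])
rank 0 world_size 4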

# Delete the training job
kubectl delete pytorchjob torchrun-cpu