Chapter 4: Training Workloads: Jobs, Operators, and Frameworks

Learning Objectives

Pre-Assessment Quiz

1. What Kubernetes primitive guarantees that a specified number of Pods complete successfully before marking the workload done?

Deployment
Job
DaemonSet
StatefulSet

2. In an Indexed Job, how does each Pod know which shard of work to process?

The scheduler assigns GPU IDs to each Pod
Each Pod receives a unique JOB_COMPLETION_INDEX environment variable
The Job controller writes a ConfigMap with shard assignments
Pods negotiate shard ownership via leader election

3. What problem does gang scheduling solve in distributed training?

It ensures Pods are scheduled on nodes with the fastest GPUs
It distributes data shards evenly across workers
It prevents partial scheduling deadlock where some Pods wait idle for others
It automatically scales the number of training workers

4. Which environment variables does the Kubeflow Training Operator automatically inject into PyTorchJob Pods?

GPU_COUNT, NODE_ID, CLUSTER_SIZE
MASTER_ADDR, MASTER_PORT, WORLD_SIZE, RANK
TORCH_HOME, CUDA_VISIBLE_DEVICES, NCCL_SOCKET_IFNAME
TRAINING_EPOCHS, BATCH_SIZE, LEARNING_RATE

5. In data parallelism, what operation synchronizes gradients across all workers at the end of each training step?

Broadcast
Scatter
AllReduce
Gather

6. When should you use model parallelism instead of data parallelism?

When you want to train faster on a small dataset
When the model is too large to fit in a single GPU's VRAM
When you have only one GPU available
When the dataset is too large to fit on disk

7. What is the purpose of pipeline parallelism's micro-batch streaming?

To reduce the total number of training epochs needed
To overlap computation across pipeline stages, reducing idle time
To increase the size of each mini-batch for better convergence
To eliminate the need for gradient synchronization

8. In a distributed training checkpoint, why should only rank 0 write the checkpoint file?

Only rank 0 has access to the shared filesystem
To avoid race conditions from multiple workers writing simultaneously
Other ranks do not have the model weights in memory
Rank 0 is always the fastest worker

9. What does TorchElastic's minReplicas parameter control?

The minimum GPU memory required per worker
The minimum number of workers needed to continue training
The minimum number of training epochs
The minimum batch size per worker

10. In 3D hybrid parallelism, what are the three dimensions combined?

CPU parallelism, GPU parallelism, and network parallelism
Tensor parallelism, pipeline parallelism, and data parallelism
Forward parallelism, backward parallelism, and optimizer parallelism
Node parallelism, pod parallelism, and container parallelism

11. What signal does Kubernetes send to a Pod before eviction, allowing it to save an emergency checkpoint?

SIGKILL
SIGTERM
SIGHUP
SIGINT

12. What distinguishes the MPIJob Launcher/Worker pattern from the PyTorchJob Master/Worker pattern?

MPIJob uses GPUs on the launcher; PyTorchJob does not use a master GPU
The MPIJob launcher runs mpirun to coordinate but does no model computation itself
PyTorchJob workers cannot communicate with each other directly
MPIJob does not support fault tolerance or restarts

4.1 Kubernetes Job Primitives for Training

Before reaching for a specialized ML operator, native Kubernetes Job primitives handle a wide range of training workloads. The higher-level operators covered later are themselves built on top of these primitives.

Jobs and CronJobs

A Kubernetes Job creates one or more Pods and guarantees that a specified number of them complete successfully. For a single-node training run (e.g., nightly fine-tuning on a small model), a plain Job is sufficient.

Key Job parameters: backoffLimit controls retry count on failure; restartPolicy: OnFailure restarts failed containers in place.
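A minimal manifest for such a single-node run might look like this (the image name and command are placeholders):

```yaml
apiVersion: batch/v1
kind: Job
metadata:
  name: finetune-nightly
spec:
  backoffLimit: 3              # retry up to 3 times on failure
  template:
    spec:
      restartPolicy: OnFailure # restart failed containers in place
      containers:
      - name: trainer
        image: registry.example.com/trainer:latest  # placeholder image
        command: ["python", "train.py"]
        resources:
          limits:
            nvidia.com/gpu: 1
```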

A CronJob wraps a Job with a cron schedule expression, enabling recurring model refreshes (e.g., retraining a recommendation model every night from the latest data).
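A sketch of such a nightly retrain (schedule and image are illustrative):

```yaml
apiVersion: batch/v1
kind: CronJob
metadata:
  name: retrain-recs
spec:
  schedule: "0 2 * * *"        # every night at 02:00
  jobTemplate:
    spec:
      backoffLimit: 2
      template:
        spec:
          restartPolicy: OnFailure
          containers:
          - name: retrain
            image: registry.example.com/recs-trainer:latest  # placeholder
```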

Indexed Jobs for Parallel Work

Indexed Jobs fan out independent work across multiple Pods. Each Pod receives a unique JOB_COMPLETION_INDEX environment variable (0, 1, 2, ...) for selecting its data shard. This is ideal for hyperparameter sweeps and data-parallel preprocessing.

Analogy: Think of an Indexed Job as a restaurant kitchen where each chef is assigned a numbered ticket. Chef 0 handles order 0, Chef 1 handles order 1, and so on. The kitchen manager (Kubernetes) ensures every ticket is completed and re-assigns if a chef calls in sick.
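The shard-selection step inside each Pod can be sketched in a few lines of Python (the shard list and local-testing default are illustrative, not part of the Job API):

```python
import os

def select_shard(shards):
    """Pick this Pod's shard using the index injected by the Job controller."""
    index = int(os.environ["JOB_COMPLETION_INDEX"])  # 0, 1, 2, ...
    return shards[index]

if __name__ == "__main__":
    os.environ.setdefault("JOB_COMPLETION_INDEX", "0")  # local testing only
    shards = ["part-0.parquet", "part-1.parquet", "part-2.parquet"]
    print(select_shard(shards))
```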

Init Containers for Data Preparation

Init containers run to completion before any app containers start in the same Pod. They provide a clean mechanism for downloading datasets, decompressing archives, or validating checksums before training begins. The training container does not start until all init containers exit successfully.
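A sketch of the pattern (DATA_URL and the image names are placeholders):

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: train-with-prep
spec:
  restartPolicy: OnFailure
  initContainers:
  - name: fetch-data               # runs to completion before the trainer starts
    image: alpine:3.19
    env:
    - {name: DATA_URL, value: "https://example.com/train.tar.gz"}  # placeholder
    command: ["sh", "-c", "wget -O /data/train.tar.gz $DATA_URL && tar -xzf /data/train.tar.gz -C /data"]
    volumeMounts:
    - {name: data, mountPath: /data}
  containers:
  - name: trainer
    image: registry.example.com/trainer:latest  # placeholder
    volumeMounts:
    - {name: data, mountPath: /data}
  volumes:
  - {name: data, emptyDir: {}}
```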

Key Points

4.2 Kubeflow Training Operator

When you need tightly coupled distributed training where workers communicate gradient updates in real time, native Jobs become awkward. The Kubeflow Training Operator provides Kubernetes-native custom resources (CRDs) for the most popular deep learning frameworks.

Custom Resources: PyTorchJob, TFJob, MPIJob

Custom Resource | Framework     | Communication          | Use Case
PyTorchJob      | PyTorch       | NCCL / Gloo / MPI      | DDP, FSDP, tensor parallelism
TFJob           | TensorFlow    | gRPC (PS or AllReduce) | Classic TF distributed strategies
MPIJob          | Any MPI-based | MPI (OpenMPI/MPICH)    | Horovod, custom HPC workloads

When the Training Operator reconciles a PyTorchJob, it automatically injects MASTER_ADDR, MASTER_PORT, WORLD_SIZE, and RANK into every Pod. Your training code uses these directly through torch.distributed.init_process_group().
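A minimal PyTorchJob sketch (the image and replica counts are illustrative; the container must be named pytorch for the operator to inject the variables):

```yaml
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: ddp-example
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: pytorch
            image: registry.example.com/ddp-trainer:latest  # placeholder
            resources: {limits: {nvidia.com/gpu: 1}}
    Worker:
      replicas: 3
      restartPolicy: OnFailure
      template:
        spec:
          containers:
          - name: pytorch
            image: registry.example.com/ddp-trainer:latest  # placeholder
            resources: {limits: {nvidia.com/gpu: 1}}
```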

Worker Pod Topology

Master/Worker (PyTorchJob, TFJob): One master Pod coordinates the distributed rendezvous; workers connect back to it. The master is rank 0 in the process group.

Launcher/Worker (MPIJob): A lightweight launcher Pod runs mpirun to coordinate workers. The launcher performs no model computation; it only manages the MPI process topology.

Gang Scheduling with Volcano or Coscheduling

Gang scheduling treats all Pods in a job as an atomic unit: either all are scheduled simultaneously, or none are. This prevents partial scheduling deadlock where some workers sit idle consuming GPU resources while waiting for peers that cannot be placed.

Scheduler    | Key Feature
Volcano      | Full batch scheduler with queue management and fair-share policies
Coscheduling | Lightweight gang scheduling as a kube-scheduler plugin
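With Volcano, the all-or-nothing constraint is expressed through minAvailable on a Volcano Job (names and image are illustrative):

```yaml
apiVersion: batch.volcano.sh/v1alpha1
kind: Job
metadata:
  name: gang-train
spec:
  minAvailable: 4           # all-or-nothing: schedule only if all 4 Pods fit
  schedulerName: volcano
  tasks:
  - name: worker
    replicas: 4
    template:
      spec:
        restartPolicy: OnFailure
        containers:
        - name: trainer
          image: registry.example.com/trainer:latest  # placeholder
          resources: {limits: {nvidia.com/gpu: 1}}
```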

Gang Scheduling: All-or-Nothing Pod Placement

Figure: all-or-nothing Pod placement. Scenario 1, without gang scheduling: Node A has 2 GPU slots, one already occupied by another job, so Pod 0 lands on Node A while Pods 1 and 2 fill Node B (also 2 slots). Pod 3 stays Pending with no room anywhere, and the three scheduled Pods sit idle holding GPUs while they wait for it: a deadlock. Scenario 2, with gang scheduling: the Volcano gang scheduler checks minAvailable=4 against the 3 available slots, rejects the placement, and queues the job so no GPUs are wasted; once all 4 Pods can be placed at the same time, training begins.

Key Points

4.3 Distributed Training Strategies

Choosing the right parallelism strategy is one of the most consequential decisions in large-scale ML training. The wrong choice can waste GPU memory, saturate the network, or slow training by an order of magnitude.

Data Parallelism (DDP and Horovod)

Data parallelism is the most widely used strategy. Each worker holds a complete model copy and trains on a different data slice. After each step, all workers synchronize gradients via AllReduce so every worker ends up with identical averaged gradients.

Key considerations: use DistributedSampler for data sharding, scale learning rate linearly with world size (lr = base_lr * world_size), and prefer NCCL backend for GPU-to-GPU communication.
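The two bookkeeping rules above can be sketched in plain Python (a simplified stand-in for what DistributedSampler's round-robin sharding does, not the torch API itself):

```python
def scaled_lr(base_lr, world_size):
    """Linear learning-rate scaling: lr grows with the number of workers."""
    return base_lr * world_size

def shard_indices(num_samples, rank, world_size):
    """Round-robin assignment of sample indices across ranks,
    as DistributedSampler does (padding omitted for simplicity)."""
    return list(range(rank, num_samples, world_size))

print(scaled_lr(0.001, 8))                       # 8 workers -> 0.008
print(shard_indices(10, rank=1, world_size=4))   # [1, 5, 9]
```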

Model Parallelism and Tensor Parallelism

When a model is too large for a single GPU's VRAM, model parallelism distributes layers across devices. Each GPU processes the full mini-batch but only through its portion of the network.

Tensor parallelism is finer-grained: individual weight matrices are sharded across GPUs (e.g., Q/K/V projection matrices in transformer attention are column-partitioned).

FSDP (Fully Sharded Data Parallel) combines both: parameters, gradients, and optimizer states are all sharded across workers.

Pipeline Parallelism

Pipeline parallelism divides the model into sequential stages and streams multiple micro-batches through simultaneously. While Stage 1 processes micro-batch 2, Stage 2 processes micro-batch 1, overlapping computation to reduce idle "bubble" time.

DeepSpeed implements pipeline parallelism alongside ZeRO memory optimization (partitioning optimizer states, gradients, and parameters across ranks). Megatron-LM natively combines tensor and pipeline parallelism for models in the 10B-1T parameter range.

3D Hybrid Parallelism

For the largest models, 3D parallelism combines all three axes:

Dimension     | Strategy             | Scope
Intra-layer   | Tensor parallelism   | Within a node (NVLink)
Inter-layer   | Pipeline parallelism | Across nodes
Cross-replica | Data parallelism     | Across the cluster

Total GPU count = tensor degree x pipeline stages x data replicas (e.g., 8 x 4 x 16 = 512 GPUs).
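The degree arithmetic is worth making explicit; a tiny helper (names are illustrative) validates that the product matches the GPUs you actually have:

```python
def parallelism_layout(tensor_degree, pipeline_stages, data_replicas):
    """Total GPUs consumed by a 3D (tensor x pipeline x data) layout."""
    return tensor_degree * pipeline_stages * data_replicas

def check_layout(available_gpus, tensor_degree, pipeline_stages, data_replicas):
    """Fail fast if the layout does not exactly consume the cluster."""
    needed = parallelism_layout(tensor_degree, pipeline_stages, data_replicas)
    if needed != available_gpus:
        raise ValueError(f"layout needs {needed} GPUs, cluster has {available_gpus}")
    return needed

print(check_layout(512, tensor_degree=8, pipeline_stages=4, data_replicas=16))  # 512
```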

Distributed Training Strategies Compared

Figure: the three strategies side by side. Data parallel: each worker holds a full model copy and a different data shard; after forward and backward passes, AllReduce leaves identical gradients on every worker. Model parallel: the same data flows through different layers on different devices (e.g., GPU 0 holds layers 0-11, GPU 1 layers 12-23, GPU 2 layers 24-35), with activations passed between GPUs. Pipeline parallel: sequential stages stream micro-batches over time; while Stage 0 runs micro-batch 2, Stage 1 runs micro-batch 1, and the idle slots before each stage's first micro-batch form the pipeline "bubble". When to use each: data parallel when the model fits in one GPU's VRAM and you want to scale over more data; model parallel when the model is too large for a single GPU; pipeline parallel when a large model also needs throughput.

Key Points

4.4 Checkpointing and Fault Tolerance

A distributed training run on hundreds of GPUs can take days or weeks. Without checkpointing and fault tolerance, a single hardware failure can erase days of compute.

Periodic Checkpoint Saving

The foundation of fault tolerance is periodic checkpointing -- saving model weights, optimizer state, and training metadata to persistent shared storage. On restart, the job loads the latest checkpoint and resumes rather than starting from epoch 0.

Best practices: only rank 0 writes checkpoints (to avoid race conditions), maintain a latest.txt pointer file, and use ReadWriteMany PVCs (NFS/CephFS) or cloud object stores for shared access.

Asynchronous checkpointing reduces GPU stall time by copying state to CPU memory first (fast, non-blocking), then writing to storage in a background thread.
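The rank-0-writes pattern with a latest.txt pointer can be sketched framework-agnostically (the file layout is illustrative; in real code the state dict would come from the model and optimizer):

```python
import json
import os
import tempfile

def save_checkpoint(ckpt_dir, step, state, rank):
    """Only rank 0 writes; write-then-rename avoids readers seeing partial files."""
    if rank != 0:
        return None
    path = os.path.join(ckpt_dir, f"ckpt-{step}.json")
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        json.dump(state, f)
    os.replace(tmp, path)                      # atomic rename on POSIX
    with open(os.path.join(ckpt_dir, "latest.txt"), "w") as f:
        f.write(path)                          # pointer to newest checkpoint
    return path

def load_latest(ckpt_dir):
    """Resume path: follow latest.txt to the newest checkpoint."""
    with open(os.path.join(ckpt_dir, "latest.txt")) as f:
        path = f.read().strip()
    with open(path) as f:
        return json.load(f)

ckpt_dir = tempfile.mkdtemp()
save_checkpoint(ckpt_dir, 200, {"step": 200, "loss": 0.42}, rank=0)
print(load_latest(ckpt_dir)["step"])  # 200
```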

Automatic Restart and Recovery

Kubernetes provides basic restart via restartPolicy: OnFailure. For distributed jobs, a node failure typically kills the entire job because remaining workers cannot make progress without their peers. The Training Operator restarts all Pods, and each worker loads from the last checkpoint.

TorchFT enables sub-group recovery: when a node fails, only the affected replica group is replaced (the platform can preempt lower-priority workloads to free capacity) while the remaining groups keep training, avoiding a full-job restart.

For spot instances, set backoffLimit generously and implement a SIGTERM handler to save an emergency checkpoint before eviction.
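A minimal SIGTERM hook (emergency_checkpoint is a placeholder for your real save logic):

```python
import signal
import sys

def emergency_checkpoint():
    # Placeholder: flush model/optimizer state to shared storage here.
    print("emergency checkpoint saved")

def handle_sigterm(signum, frame):
    """Kubernetes sends SIGTERM, then waits terminationGracePeriodSeconds
    before SIGKILL -- use that window to save state."""
    emergency_checkpoint()
    sys.exit(0)

signal.signal(signal.SIGTERM, handle_sigterm)
```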

Elastic Training

Elastic training allows a job to continue when worker count changes mid-training. Workers can join or leave without a full restart, making spot instances viable for long training runs.

The TorchElastic Controller (TECK) supports minReplicas and maxReplicas bounds. When workers are added or removed, torchrun triggers a rendezvous round: all workers checkpoint, the updated group rejoins, and training continues with adjusted hyperparameters (e.g., learning rate scales linearly with world size).

Checkpoint and Recovery Timeline

Figure: checkpoint and recovery timeline. Training runs through steps 0-300, writing checkpoint CP1 at step 100 and CP2 at step 200 to a shared PVC. A node failure at step 300 loses the work since CP2 (steps 200-300). restartPolicy: OnFailure triggers Pod recreation; the Pods load CP2 and resume from step 200, then continue through CP3 (step 300) and CP4 (step 400) to completion. Lost work equals the steps since the last checkpoint.

Key Points

Post-Assessment Quiz

1. What Kubernetes primitive guarantees that a specified number of Pods complete successfully before marking the workload done?

Deployment
Job
DaemonSet
StatefulSet

2. In an Indexed Job, how does each Pod know which shard of work to process?

The scheduler assigns GPU IDs to each Pod
Each Pod receives a unique JOB_COMPLETION_INDEX environment variable
The Job controller writes a ConfigMap with shard assignments
Pods negotiate shard ownership via leader election

3. What problem does gang scheduling solve in distributed training?

It ensures Pods are scheduled on nodes with the fastest GPUs
It distributes data shards evenly across workers
It prevents partial scheduling deadlock where some Pods wait idle for others
It automatically scales the number of training workers

4. Which environment variables does the Kubeflow Training Operator automatically inject into PyTorchJob Pods?

GPU_COUNT, NODE_ID, CLUSTER_SIZE
MASTER_ADDR, MASTER_PORT, WORLD_SIZE, RANK
TORCH_HOME, CUDA_VISIBLE_DEVICES, NCCL_SOCKET_IFNAME
TRAINING_EPOCHS, BATCH_SIZE, LEARNING_RATE

5. In data parallelism, what operation synchronizes gradients across all workers at the end of each training step?

Broadcast
Scatter
AllReduce
Gather

6. When should you use model parallelism instead of data parallelism?

When you want to train faster on a small dataset
When the model is too large to fit in a single GPU's VRAM
When you have only one GPU available
When the dataset is too large to fit on disk

7. What is the purpose of pipeline parallelism's micro-batch streaming?

To reduce the total number of training epochs needed
To overlap computation across pipeline stages, reducing idle time
To increase the size of each mini-batch for better convergence
To eliminate the need for gradient synchronization

8. In a distributed training checkpoint, why should only rank 0 write the checkpoint file?

Only rank 0 has access to the shared filesystem
To avoid race conditions from multiple workers writing simultaneously
Other ranks do not have the model weights in memory
Rank 0 is always the fastest worker

9. What does TorchElastic's minReplicas parameter control?

The minimum GPU memory required per worker
The minimum number of workers needed to continue training
The minimum number of training epochs
The minimum batch size per worker

10. In 3D hybrid parallelism, what are the three dimensions combined?

CPU parallelism, GPU parallelism, and network parallelism
Tensor parallelism, pipeline parallelism, and data parallelism
Forward parallelism, backward parallelism, and optimizer parallelism
Node parallelism, pod parallelism, and container parallelism

11. What signal does Kubernetes send to a Pod before eviction, allowing it to save an emergency checkpoint?

SIGKILL
SIGTERM
SIGHUP
SIGINT

12. What distinguishes the MPIJob Launcher/Worker pattern from the PyTorchJob Master/Worker pattern?

MPIJob uses GPUs on the launcher; PyTorchJob does not use a master GPU
The MPIJob launcher runs mpirun to coordinate but does no model computation itself
PyTorchJob workers cannot communicate with each other directly
MPIJob does not support fault tolerance or restarts


Answer Explanations