This tutorial shows how to run multi-GPU training on fal with the `fal.distributed` module. We’ll build a production-ready Flux LoRA training service that uses Distributed Data Parallel (DDP) for efficient multi-GPU training with real-time progress streaming.
For a comprehensive overview of multi-GPU parallelism strategies including DDP and when to use them, see the Multi-GPU Workloads Overview.
Two-App Architecture
This tutorial demonstrates a microservices architecture with two separate apps communicating with each other:
- Preprocessor App (`flux-preprocessor`): Handles image preprocessing, captioning, and VAE/text encoding across multiple GPUs
- Training App (`flux-training`): Handles DDP training with the preprocessed data
Architectural Note: While this workflow could be implemented as a single app, we’re using two separate apps to demonstrate how to orchestrate multiple ML models across microservices. This pattern showcases:
- Independent scaling: Scale preprocessing and training separately based on demand
- Service isolation: Each model stays warm independently (no cold starts)
- Flexible infrastructure: Different GPU types/counts for different workloads
- Inter-service communication: Apps communicate via `fal_client` API calls
Important: For this tutorial, you need to run the preprocessor app first to get its app name (username/uuid format from the `fal run` output), then pass that app name to the training app.
🚀 Try this Example
View the complete source code on GitHub, or clone the repository, then:
Step 1: Run the preprocessor app (in terminal 1).
Step 2: Run the training app (in terminal 2).
Step 3: Submit a training request.
Before you run, make sure you have:
- Authenticated with fal: `fal auth login`
- Activated your virtual environment (recommended): `python -m venv venv && source venv/bin/activate`
- A ZIP file containing training images (10-30 images recommended)
Key Features
Microservices Architecture:
- Two independent apps communicate via API calls (demonstrates orchestrating multiple ML models)
- Preprocessor app runs separately for image preprocessing, captioning, and encoding
- Training app calls the preprocessor via `fal_client.submit()` for clean service separation
DDP Training:
- Each GPU has a full copy of the model wrapped in DDP
- Each GPU processes different batches with automatic gradient synchronization
- Only Rank 0 saves the final LoRA checkpoint
Architecture Overview
This training system demonstrates a microservices architecture with two separate apps communicating via API calls.
What is DDP (Distributed Data Parallel)?
DDP is a data parallelism strategy where:
- Each GPU has a full model copy: All workers have identical model parameters
- Each GPU processes different data: Training data is split across GPUs
- Gradients are synchronized: After backward pass, gradients are averaged across all GPUs
- Parameters stay in sync: All GPUs update with the same averaged gradients
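To make these points concrete, here is a minimal, generic PyTorch DDP sketch (plain `torch.distributed`, not the fal worker classes used later). It assumes the usual process-group environment variables are set and that the dataset yields `(input, target)` pairs; the model and loss are placeholders.

```python
import torch
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import DataLoader, DistributedSampler

def train_ddp(rank: int, world_size: int, model: torch.nn.Module, dataset):
    # Each process (one per GPU) joins the same process group.
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    # Full model copy on every GPU, wrapped in DDP.
    model = model.to(f"cuda:{rank}")
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = torch.optim.AdamW(ddp_model.parameters(), lr=1e-4)

    # Each GPU sees a different shard of the data.
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank)
    loader = DataLoader(dataset, batch_size=4, sampler=sampler)

    for inputs, targets in loader:
        optimizer.zero_grad()
        outputs = ddp_model(inputs.to(rank))
        loss = torch.nn.functional.mse_loss(outputs, targets.to(rank))  # placeholder loss
        loss.backward()   # gradients are averaged across all GPUs here
        optimizer.step()  # every rank applies the same averaged update

    dist.destroy_process_group()
```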
Code Walkthrough
We’ll walk through the code in the order you’d build it: first the preprocessor (which you can test independently), then the training app.
Part 1: Preprocessor App
The preprocessor runs on separate GPUs and handles image preparation. Let’s start with the app definition (sketched after the attribute list below).
Preprocessor App Configuration
- `machine_type = "GPU-H100"`: Specifies the hardware your app runs on. Here we’re using H100 GPUs for fast preprocessing.
- `num_gpus = 2`: Requests 2 GPUs per runner. Each GPU will process different images in parallel.
- `keep_alive = 300`: Keeps the runner warm for 5 minutes after the last request. Avoids cold starts for subsequent requests.
- `min_concurrency = 0`: Allows runners to scale down to zero when idle (saves costs).
- `max_concurrency = 2`: Allows up to 2 concurrent requests. Additional requests will queue.
- `requirements`: Python packages to install on the runner. Always pin versions for reproducibility.
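Putting the attributes above together, the app class looks roughly like this. The class name and the package version pins are illustrative; follow the repository code for the exact values and for whether each option is set as a class attribute or a class keyword argument.

```python
import fal

class FluxPreprocessorApp(fal.App):
    machine_type = "GPU-H100"   # H100s for fast preprocessing
    num_gpus = 2                # two GPU workers per runner
    keep_alive = 300            # stay warm for 5 minutes after the last request
    min_concurrency = 0         # scale to zero when idle
    max_concurrency = 2         # queue anything beyond 2 concurrent requests
    requirements = [
        # Illustrative pins; always pin versions for reproducibility.
        "torch==2.4.0",
        "diffusers==0.30.0",
        "transformers==4.44.0",
        "huggingface_hub==0.24.0",
    ]
```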
The Setup Function
The `setup()` function runs once when each runner starts. It’s where you load models, download weights, and initialize resources:
- `/data/` directory: This is a persistent, shared volume attached to your runner. Files stored here persist across requests and are shared between the main process and all GPU workers. Perfect for model weights that you don’t want to re-download on every request.
- `snapshot_download(..., local_dir="/data/flux_weights")`: Downloads the Flux model from Hugging Face into the persistent `/data/` volume. The first runner downloads it once, then subsequent runners (and requests) reuse the cached files.
- `DistributedRunner(worker_cls, world_size)`: Creates a runner that orchestrates multiple GPU workers for parallel processing. See API Reference →
  - `worker_cls=FluxPreprocessorWorker`: Your custom worker class that inherits from `DistributedWorker`
  - `world_size=self.num_gpus`: Creates one worker process per GPU (2 workers for 2 GPUs)
- `await self.runner.start(model_path=model_path)`: Starts all GPU worker processes and initializes them. See API Reference →
  - Each worker runs its own `setup()` method to load models onto its assigned GPU
  - The `model_path` keyword argument is passed to each worker’s `setup()` method
  - The call waits for all workers to signal “READY” before returning

Once `setup()` completes, your app is ready to handle requests. The runner stays warm (based on `keep_alive`), so subsequent requests skip this expensive setup.
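A sketch of what this looks like in code, continuing the app class from above. The Hugging Face model id is an assumption, and `FluxPreprocessorWorker` is the worker class implemented in the worker section below.

```python
import fal
from fal.distributed import DistributedRunner
from huggingface_hub import snapshot_download

class FluxPreprocessorApp(fal.App):
    # ... configuration attributes from the previous snippet ...

    async def setup(self):
        # Download Flux weights into the persistent /data/ volume once;
        # later runners and requests reuse the cached files.
        model_path = snapshot_download(
            "black-forest-labs/FLUX.1-dev",      # assumed model id
            local_dir="/data/flux_weights",
        )

        # One worker process per GPU, each running FluxPreprocessorWorker.
        self.runner = DistributedRunner(
            worker_cls=FluxPreprocessorWorker,   # defined in the worker section below
            world_size=self.num_gpus,
        )
        # Starts every worker, forwards model_path to each worker's setup(),
        # and returns once all workers have signaled READY.
        await self.runner.start(model_path=model_path)
```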
Preprocessor Endpoint
The main endpoint handles the full preprocessing pipeline. Because the preprocessor is started on its own with `fal run flux-preprocessor`, you can test it independently:
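For example, from Python (the app id comes from the `fal run` output; the argument names are illustrative and should match the preprocessor’s request model in the repository):

```python
import fal_client

result = fal_client.subscribe(
    "username/preprocessor-app-id",   # username/uuid printed by `fal run flux-preprocessor`
    arguments={
        "images_data_url": "https://example.com/training_images.zip",  # illustrative field
        "trigger_word": "TOK",                                          # illustrative field
    },
)
print(result)
```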
Preprocessor Worker Implementation
The worker runs on each GPU and handles the actual preprocessing. Here’s the implementation:
- `setup(model_path, **kwargs)`: Called once during `runner.start()` to load models. The `model_path` passed to `runner.start()` is received here. API Reference →
- `self.device`: Each worker loads models onto its assigned GPU using `self.device`. Worker 0 uses `cuda:0`, worker 1 uses `cuda:1`, etc.
- `self.rank` and `self.world_size`: Used to split work. Each worker processes every Nth image where N = `world_size`, starting at `rank`.
  - Worker 0 (rank=0): processes images [0, 2, 4, 6, …]
  - Worker 1 (rank=1): processes images [1, 3, 5, 7, …]
- `__call__(**kwargs)`: Receives the payload from `runner.invoke()` as keyword arguments. Processes data and returns results. API Reference →
- `self.rank_print()`: Prints with a rank prefix for debugging. API Reference →
- Only rank 0 returns data: The final result is assembled on rank 0 and returned. Other workers return an empty dict.
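Here is a condensed sketch of such a worker. It only shows VAE encoding with the round-robin split and the rank 0 gather; the real worker also handles captioning and text encoding, and the payload format is an assumption.

```python
import torch
import torch.distributed as dist
from diffusers import AutoencoderKL
from fal.distributed import DistributedWorker

class FluxPreprocessorWorker(DistributedWorker):
    def setup(self, model_path: str, **kwargs):
        # model_path is the value forwarded from runner.start(model_path=...).
        # Each worker loads onto its own GPU: worker 0 -> cuda:0, worker 1 -> cuda:1, ...
        self.vae = AutoencoderKL.from_pretrained(model_path, subfolder="vae").to(self.device)
        self.rank_print("VAE loaded")

    @torch.no_grad()
    def __call__(self, **kwargs):
        # kwargs is the payload dict passed to runner.invoke().
        images = kwargs["images"]   # assumed: list of (C, H, W) image tensors

        # Round-robin split: every world_size-th image, starting at this worker's rank.
        local = {}
        for idx in range(self.rank, len(images), self.world_size):
            pixels = images[idx].unsqueeze(0).to(self.device)
            local[idx] = self.vae.encode(pixels).latent_dist.sample().cpu()
        self.rank_print(f"encoded {len(local)} images")

        # Collect every worker's partial results on rank 0.
        gathered = [None] * self.world_size if self.rank == 0 else None
        dist.gather_object(local, gathered, dst=0)

        # Only rank 0 returns data; the other workers return an empty dict.
        if self.rank != 0:
            return {}
        latents = {k: v for part in gathered for k, v in part.items()}
        return {"latents": latents}
```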
Part 2: Training App
Now that preprocessing works, let’s build the training app.
Training App Setup
The training app setup follows the same pattern as the preprocessor: create a `DistributedRunner`, then call `start()` to initialize all workers. See the DistributedRunner API Reference for details.
Training Endpoint with Preprocessing
The main endpoint calls the preprocessor, then runs training:
- Calls the preprocessor app via `fal_client.subscribe()`
- Downloads the preprocessed `.pt` file
- Passes it to the training worker via `self.runner.invoke()`
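A sketch of that orchestration is shown below (using the async variant of `fal_client.subscribe()`). The request fields, the `preprocessed_url` result key, and the exact awaiting of `runner.invoke()` are assumptions; the repository code is the reference.

```python
import fal
import fal_client
import requests
from pydantic import BaseModel

class TrainRequest(BaseModel):         # illustrative request model
    preprocessor_app: str              # "username/uuid" printed by `fal run flux-preprocessor`
    images_data_url: str
    steps: int = 1000

class FluxTrainingApp(fal.App):
    # ... configuration attributes and setup() omitted ...

    @fal.endpoint("/")
    async def train(self, request: TrainRequest) -> dict:
        # 1. Call the preprocessor app supplied by the caller.
        pre = await fal_client.subscribe_async(
            request.preprocessor_app,
            arguments={"images_data_url": request.images_data_url},
        )

        # 2. Download the preprocessed .pt file it produced.
        local_path = "/tmp/preprocessed.pt"
        with open(local_path, "wb") as f:
            f.write(requests.get(pre["preprocessed_url"]).content)

        # 3. Hand the file to the DDP training workers.
        return await self.runner.invoke({"data_path": local_path, "steps": request.steps})
```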
Part 3: Training Worker Implementation
The worker implements the actual DDP training logic. Here’s the key setup method:
- `setup(**kwargs)`: Called once per worker during `runner.start()`. This is where you load models, download weights, and initialize resources. See API Reference →
- `self.device`: The CUDA device for this worker (`cuda:0`, `cuda:1`, etc.). Always load your model with `.to(self.device)`.
- `self.rank`: Worker ID (0 to world_size-1). Useful for rank-specific operations like saving checkpoints only on rank 0.
- `self.rank_print()`: Prints messages with the rank prefix for easy debugging. See API Reference →
In this app, `setup()` does the following (a sketch follows this list):
- Load the Flux transformer on each GPU using `self.device`
- Add LoRA adapters to specific layers
- Freeze the base model, only train the LoRA parameters
- Wrap with DDP for gradient synchronization
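A condensed sketch of that setup, using `diffusers` and `peft`; the LoRA hyperparameters and target modules are illustrative.

```python
import torch
from diffusers import FluxTransformer2DModel
from peft import LoraConfig
from torch.nn.parallel import DistributedDataParallel as DDP
from fal.distributed import DistributedWorker

class FluxTrainingWorker(DistributedWorker):
    def setup(self, model_path: str, **kwargs):
        # Load the Flux transformer onto this worker's GPU.
        transformer = FluxTransformer2DModel.from_pretrained(
            model_path, subfolder="transformer", torch_dtype=torch.bfloat16
        ).to(self.device)

        # Freeze the base model; only the LoRA adapters will train.
        transformer.requires_grad_(False)

        # Add LoRA adapters to the attention projections (illustrative config).
        lora_config = LoraConfig(
            r=16,
            lora_alpha=16,
            target_modules=["to_q", "to_k", "to_v", "to_out.0"],
        )
        transformer.add_adapter(lora_config)

        # Wrap with DDP so gradients are synchronized across GPUs.
        self.model = DDP(transformer, device_ids=[self.rank])
        trainable = [p for p in self.model.parameters() if p.requires_grad]
        self.optimizer = torch.optim.AdamW(trainable, lr=1e-4)
        self.rank_print("training worker ready")
```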
Training Loop with Data Distribution
The `__call__()` method is called for each training request and implements the actual training loop. It receives the payload dict from `runner.invoke()` as keyword arguments. See API Reference →
- Data loading: Only rank 0 loads, then broadcasts to all workers
- Different batches per GPU: Each GPU gets different `local_indices`
- Automatic gradient sync: DDP handles this during `loss.backward()`
- Rank 0 saves: Only one worker saves the checkpoint to avoid conflicts
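A sketch of the loop, continuing the worker above. `self.training_step()` stands in for the actual Flux flow-matching loss, and the payload format and checkpoint path are assumptions.

```python
import torch
import torch.distributed as dist
from fal.distributed import DistributedWorker

class FluxTrainingWorker(DistributedWorker):   # continues the sketch above
    def __call__(self, **kwargs):
        steps = kwargs.get("steps", 1000)

        # Only rank 0 reads the preprocessed file, then the data is broadcast
        # so every worker starts from an identical copy.
        payload = [torch.load(kwargs["data_path"]) if self.rank == 0 else None]
        dist.broadcast_object_list(payload, src=0)
        latents = payload[0]["latents"]        # assumed: list of latent tensors

        for step in range(steps):
            # All ranks build the same shuffled order (same seed), then each rank
            # keeps only its own slice, so every GPU gets different local_indices.
            gen = torch.Generator().manual_seed(step)
            perm = torch.randperm(len(latents), generator=gen)
            local_indices = perm[self.rank :: self.world_size][:4]
            batch = torch.stack([latents[int(i)] for i in local_indices]).to(self.device)

            self.optimizer.zero_grad()
            loss = self.training_step(batch)   # placeholder: forward pass through self.model
            loss.backward()                    # DDP averages gradients across all GPUs here
            self.optimizer.step()

            if step % 50 == 0:
                self.rank_print(f"step {step}: loss={loss.item():.4f}")

        # Only rank 0 writes the LoRA checkpoint, so workers never race on the file.
        if self.rank == 0:
            ckpt_path = "/data/flux_lora.pt"   # illustrative path
            torch.save(self.model.module.state_dict(), ckpt_path)
            return {"checkpoint_path": ckpt_path}
        return {}
```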
Using the Application
After running `fal run` or `fal deploy` for each app, you’ll see URLs like `https://fal.ai/dashboard/sdk/username/app-id/`. You can:
- Test in the Playground: Click the URL or visit it in your browser to open the interactive playground and test your app
- View on Dashboard: Visit fal.ai/dashboard to see all your apps, monitor usage, and manage deployments
Important: You must provide the `preprocessor_app` parameter with the app name (username/uuid format)! Get it from the `fal run flux-preprocessor` output.
Test in the Playground
After deploying both apps, you can test them directly in the browser:
- Preprocessor App: Open `https://fal.ai/dashboard/sdk/username/preprocessor-app-id/` to test image preprocessing
- Training App: Open `https://fal.ai/dashboard/sdk/username/training-app-id/` to submit training jobs with a UI
Call from Code
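From Python, you can submit a training job with `fal_client` and stream progress as it runs. The app ids and every argument name except `preprocessor_app` are illustrative; match the training app’s request model from the repository.

```python
import fal_client

handle = fal_client.submit(
    "username/training-app-id",          # from the `fal run` / `fal deploy` output
    arguments={
        "preprocessor_app": "username/preprocessor-app-id",            # required
        "images_data_url": "https://example.com/training_images.zip",  # illustrative field
        "steps": 1000,                                                  # illustrative field
    },
)

# Stream queue and log events while training runs, then fetch the final result.
for event in handle.iter_events(with_logs=True):
    print(event)
print(handle.get())
```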
For other languages (JavaScript, TypeScript, etc.) and advanced client usage, see the Client Libraries documentation.
DDP Best Practices
1. Synchronization Barriers
Use barriers when all GPUs need to wait.
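For example, inside a worker (the class name is illustrative):

```python
import torch.distributed as dist
from fal.distributed import DistributedWorker

class BarrierExample(DistributedWorker):
    def __call__(self, **kwargs):
        # ... per-rank work that may finish at different times ...
        dist.barrier()   # no rank continues until every rank has reached this line
        # ... work that assumes the previous phase is finished on every GPU ...
        return {}
```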
2. Rank-specific Operations
Only perform I/O on rank 0 to avoid conflicts.
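For example (again a sketch; the helper name is illustrative):

```python
import torch
import torch.distributed as dist
from fal.distributed import DistributedWorker

class RankZeroSaveExample(DistributedWorker):
    def save_checkpoint(self, state_dict: dict, path: str):
        # Only rank 0 touches the filesystem; otherwise every worker would race
        # to write the same file.
        if self.rank == 0:
            torch.save(state_dict, path)
        dist.barrier()   # make sure the file exists before any rank tries to read it
```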
Next Steps
- Multi-GPU Workloads Overview: Learn about other training strategies (Pipeline Parallelism, FSDP, etc.)
- Streaming with Multi-GPU: Deep dive into streaming progress updates
- Deploy Multi-GPU Inference: Learn about data parallelism for inference