Streaming allows you to send intermediate results from your distributed workers back to the client in real-time. This is particularly useful for long-running operations like image generation, video creation, or model training where users benefit from seeing progress updates.
For a complete working example of streaming with multi-GPU inference, see the Parallel SDXL Tutorial.

How Streaming Works

With fal.distributed, workers can push intermediate results back to the client while a request is still running: a worker calls add_streaming_result(), the DistributedRunner forwards those payloads through your streaming endpoint as Server-Sent Events, and the client consumes them as they arrive.

Basic Streaming Example

1. Stream from Workers

In your DistributedWorker, use add_streaming_result() to send intermediate results:
from fal.distributed import DistributedWorker
import torch.distributed as dist  # used for collective ops in the multi-GPU examples below

class StreamingWorker(DistributedWorker):
    def __call__(self, prompt: str, steps: int = 20):
        result = None
        for step in range(steps):
            # Do some processing
            result = self.model.step(prompt)

            # Only rank 0 streams to avoid duplicates
            if self.rank == 0:
                self.add_streaming_result({
                    "step": step,
                    "progress": (step + 1) / steps,
                    "message": f"Processing step {step + 1}/{steps}"
                }, as_text_event=True)

        # Return the final result
        return {"output": result}
Key points:
  • add_streaming_result(): Sends data to the client
  • as_text_event=True: Formats the data as Server-Sent Events (SSE); the wire format is sketched below
  • Only rank 0 should stream to avoid duplicate messages
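For reference, a payload sent with as_text_event=True reaches the client framed as a Server-Sent Event. The snippet below is illustrative only (fal.distributed handles the serialization for you); it shows roughly the framing that the client code in step 3 parses:
import json

# Illustrative only: roughly the on-the-wire shape of one streamed event
payload = {"step": 0, "progress": 0.05, "message": "Processing step 1/20"}
sse_event = "data: " + json.dumps(payload) + "\n\n"  # "data: <json>" followed by a blank line
print(sse_event)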

2. Create Streaming Endpoint

Define an endpoint that returns a StreamingResponse:
import fal
from fal.distributed import DistributedRunner
from fastapi.responses import StreamingResponse
from pydantic import BaseModel

class MyRequest(BaseModel):
    prompt: str
    steps: int = 20

class MyApp(fal.App):
    num_gpus = 2
    
    def setup(self):
        self.runner = DistributedRunner(
            worker_cls=StreamingWorker,
            world_size=self.num_gpus,
        )
    
    @fal.endpoint("/stream")
    async def stream(self, request: MyRequest) -> StreamingResponse:
        """Endpoint that streams results"""
        return StreamingResponse(
            self.runner.stream(
                request.dict(),
                as_text_events=True,
            ),
            media_type="text/event-stream",
        )

3. Consume Stream from Client

JavaScript/TypeScript:
const response = await fetch('https://your-app.fal.run/stream', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({ prompt: "A sunset", steps: 20 })
});

const reader = response.body.getReader();
const decoder = new TextDecoder();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  
  const text = decoder.decode(value);
  const events = text.split('\n\n');
  
  for (const event of events) {
    if (event.startsWith('data: ')) {
      const data = JSON.parse(event.slice(6));
      console.log(`Step ${data.step}: ${data.progress * 100}%`);
    }
  }
}
Python:
import fal_client

for event in fal_client.stream(
    "username/app-name",
    arguments={"prompt": "A sunset", "steps": 20}
    # path="/stream"  # Optional: defaults to "/stream", change if your endpoint uses a different path
):
    print(f"Step {event['step']}: {event['progress'] * 100}%")
If your endpoint uses a path other than /stream, specify it with the path parameter to match your @fal.endpoint() decorator.
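For example, a call against a hypothetical /generate/stream route (the route name here is made up; use whatever your decorator declares) might look like:
import fal_client

# Hypothetical route: assumes the endpoint was declared as @fal.endpoint("/generate/stream")
for event in fal_client.stream(
    "username/app-name",
    arguments={"prompt": "A sunset", "steps": 20},
    path="/generate/stream",
):
    print(event)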

Advanced: Streaming with Gather

Stream intermediate results from all GPUs and combine them:
import torch
import torch.distributed as dist
from fal.distributed import DistributedWorker

class MultiGPUStreamingWorker(DistributedWorker):
    def __call__(self, prompt: str, num_steps: int = 20):
        final_result = None
        for step in range(0, num_steps, 5):  # Stream every 5 steps
            # Generate intermediate result on this GPU
            intermediate = self.model.step(prompt)

            # Gather from all workers
            if self.rank == 0:
                gather_list = [
                    torch.zeros_like(intermediate, device=self.device)
                    for _ in range(self.world_size)
                ]
            else:
                gather_list = None

            dist.gather(intermediate, gather_list, dst=0)

            # Only rank 0 streams the combined result
            if self.rank == 0:
                combined = self.combine_results(gather_list)
                self.add_streaming_result({
                    "step": step,
                    "preview": combined,
                    "num_gpus": self.world_size,
                }, as_text_event=True)
                final_result = combined

            # Synchronize before next step
            dist.barrier()

        return {"final": final_result}
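combine_results() above is not part of fal.distributed; it stands in for whatever combination logic fits your model. A minimal sketch, assuming each gathered tensor is one image-like (C, H, W) tensor per GPU, could simply tile them side by side:
import torch

def combine_results(self, gather_list):
    # Hypothetical helper (would live on MultiGPUStreamingWorker): assumes every
    # gathered tensor has the same (C, H, W) shape.
    preview = torch.cat(gather_list, dim=-1)  # concatenate along width: one strip per GPU
    # Move off the GPU; in practice you would encode this as an image (see
    # "Handle Images Efficiently" below) before putting it in the streamed payload.
    return preview.detach().cpu()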

Best Practices

1. Stream Only from Rank 0

Avoid duplicate messages by only streaming from the main worker:
if self.rank == 0:
    self.add_streaming_result(data, as_text_event=True)

2. Throttle Stream Frequency

Don’t stream on every iteration - use intervals:
if step % 5 == 0:  # Every 5 steps
    self.add_streaming_result(...)

3. Use Synchronization

Synchronize workers after streaming to maintain consistency:
if self.rank == 0:
    self.add_streaming_result(data, as_text_event=True)

dist.barrier()  # Wait for all workers

4. Keep Payloads Small

Stream minimal data for responsiveness:
# Good: Small progress updates
self.add_streaming_result({
    "step": step,
    "progress": 0.5,
})

# Avoid: Large data in every update
# self.add_streaming_result({"large_array": [...]})

5. Handle Images Efficiently

For streaming images, use base64 encoding:
import base64
import io

# Convert PIL image to base64
buffer = io.BytesIO()
image.save(buffer, format="JPEG")
image_b64 = base64.b64encode(buffer.getvalue()).decode()

self.add_streaming_result({
    "preview": f"data:image/jpeg;base64,{image_b64}"
}, as_text_event=True)
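Previews rarely need full resolution. Downscaling before encoding keeps each event small (see "Keep Payloads Small" above); a minimal sketch, assuming image is a PIL.Image as in the snippet above:
import base64
import io

# Work on a copy so the full-resolution image is left untouched
preview = image.copy()
preview.thumbnail((256, 256))  # downscale in place, preserving aspect ratio

buffer = io.BytesIO()
preview.save(buffer, format="JPEG", quality=80)
preview_b64 = base64.b64encode(buffer.getvalue()).decode()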

Complete Example

See the Multi-GPU Inference Tutorial for a complete working example with streaming, including:
  • Real-time preview generation
  • Progress updates every 5 steps
  • Gathering results from multiple GPUs
  • Progressive blur effects during generation

Next Steps
