Skip to main content
Streaming allows you to send progressive results to clients as your endpoint processes a request. This is ideal for showing image generation previews, video frame updates, or any operation where users benefit from seeing incremental progress.
Streaming vs Realtime: Streaming (SSE) is one-way (server → client) and ideal for progressive output. For bidirectional communication where clients send multiple requests over a persistent connection, see Realtime Endpoints.

When to Use Streaming

| Feature | Streaming (SSE) | Realtime (WebSocket) |
| --- | --- | --- |
| Direction | One-way (server → client) | Bidirectional (client ↔ server) |
| Connection | New connection per request | Persistent, reusable |
| Latency | Higher (new connection each time) | Lower (connection reuse) |
| Best for | Progressive output, previews | Interactive apps, back-to-back requests |
| Protocol | JSON over SSE | Binary msgpack |

Use Streaming when:
  • You want to show progressive output (e.g., image generation previews, video updates)
  • Clients send a single request and receive multiple updates
  • You don’t need bidirectional communication
Use Realtime when:
  • Users send multiple requests in quick succession (e.g., interactive image editing)
  • You need the lowest possible latency between requests
  • You’re building interactive, back-and-forth experiences

Example: Streaming Intermediate Steps with SDXL

This example shows how to stream intermediate image previews during Stable Diffusion XL generation. It uses a TinyVAE for fast preview decoding and the pipeline’s callback system to capture progress at each step.
import fal
import json
import base64
import random
from io import BytesIO
from typing import Any
from queue import Queue

import torch
from pydantic import BaseModel, Field
from fastapi.responses import StreamingResponse


class StreamInput(BaseModel):
    """Request schema shared by the streaming and non-streaming endpoints."""

    prompt: str = Field(description="The prompt to generate an image from.")
    negative_prompt: str = Field(
        default="blurry, low quality",
        description="Concepts to steer the generation away from.",
    )
    num_inference_steps: int = Field(
        default=20,
        ge=1,
        le=50,
        description="Number of denoising steps to run.",
    )
    # The SDXL pipeline only accepts sizes that are positive multiples of 8;
    # validating here rejects bad requests before any GPU work starts.
    width: int = Field(
        default=1024,
        gt=0,
        multiple_of=8,
        description="Output width in pixels (must be a multiple of 8).",
    )
    height: int = Field(
        default=1024,
        gt=0,
        multiple_of=8,
        description="Output height in pixels (must be a multiple of 8).",
    )
    seed: int | None = Field(
        default=None,
        description="Random seed; a random one is chosen when omitted.",
    )


class StreamingSDXLApp(fal.App):
    """SDXL app exposing a streaming preview endpoint and a plain endpoint.

    ``/stream`` emits Server-Sent Events carrying JPEG previews decoded with
    a tiny VAE while the pipeline is denoising, followed by the final image.
    ``/`` runs the same pipeline and returns only the final image.
    """

    machine_type = "GPU-A100"

    requirements = [
        "accelerate==1.4.0",
        "diffusers==0.30.3",
        "torch==2.6.0+cu124",
        "transformers==4.47.1",
        "--extra-index-url",
        "https://download.pytorch.org/whl/cu124",
    ]

    def setup(self):
        """Load the SDXL pipeline and the preview VAE onto the GPU."""
        from diffusers import AutoencoderTiny, StableDiffusionXLPipeline

        # Load the main SDXL pipeline
        self.pipeline = StableDiffusionXLPipeline.from_pretrained(
            "stabilityai/stable-diffusion-xl-base-1.0",
            torch_dtype=torch.float16,
            variant="fp16",
        ).to("cuda")

        # TinyVAE for fast preview generation (much faster than full VAE)
        self.tiny_vae = AutoencoderTiny.from_pretrained(
            "madebyollin/taesdxl",
            torch_dtype=torch.float16,
        ).to("cuda")

    def _image_payload(self, image: Any, quality: int) -> dict[str, Any]:
        """Encode a PIL image as a JPEG data URI in the playground format.

        Args:
            image: PIL image to encode.
            quality: JPEG quality passed to ``image.save``.

        Returns:
            ``{"image": {"url": <data URI>, "content_type": "image/jpeg"}}``.
        """
        buffer = BytesIO()
        image.save(buffer, format="JPEG", quality=quality)
        encoded = base64.b64encode(buffer.getvalue()).decode()
        return {
            "image": {
                "url": f"data:image/jpeg;base64,{encoded}",
                "content_type": "image/jpeg",
            }
        }

    @fal.endpoint("/stream")
    def stream_image(self, input: StreamInput) -> StreamingResponse:
        """Generate an image and stream previews plus the final result as SSE.

        Args:
            input: Validated generation parameters.

        Returns:
            A ``text/event-stream`` response; every event is a JSON object of
            the form ``{"image": {"url": ..., "content_type": ...}}``.
        """
        seed = input.seed if input.seed is not None else random.randint(0, 2**32 - 1)
        generator = torch.Generator(device="cuda").manual_seed(seed)

        # Queue to pass events from callback to the streaming generator;
        # ``None`` is the end-of-stream sentinel.
        event_queue: Queue[dict[str, Any] | None] = Queue()

        def pipeline_callback(
            pipe: Any,
            step: int,
            timestep: int,
            callback_kwargs: dict[str, Any],
        ) -> dict[str, Any]:
            """Called after each inference step to stream preview."""
            # Only stream every 5 steps (0, 5, 10, ...) to reduce overhead
            if step % 5 != 0:
                return callback_kwargs

            latents = callback_kwargs["latents"]

            # Decode latents to image using TinyVAE (fast!)
            with torch.no_grad():
                decoded = self.tiny_vae.decode(
                    latents / self.tiny_vae.config.scaling_factor,
                    return_dict=False,
                )[0]
                preview = self.pipeline.image_processor.postprocess(
                    decoded, output_type="pil"
                )[0]

            # Lower JPEG quality keeps intermediate previews small
            event_queue.put(self._image_payload(preview, quality=70))

            return callback_kwargs

        def event_stream():
            import threading

            # Holds the exception raised by the worker thread, if any, so it
            # can be re-raised from the generator instead of being lost.
            failures: list[BaseException] = []

            def run_pipeline():
                try:
                    # Run the pipeline with our callback
                    result = self.pipeline(
                        prompt=input.prompt,
                        negative_prompt=input.negative_prompt,
                        width=input.width,
                        height=input.height,
                        num_inference_steps=input.num_inference_steps,
                        generator=generator,
                        callback_on_step_end=pipeline_callback,
                    )

                    # Send final result in the same format as the previews
                    event_queue.put(
                        self._image_payload(result.images[0], quality=95)
                    )
                except Exception as exc:
                    failures.append(exc)
                finally:
                    # Always signal completion; without this a pipeline error
                    # would leave the consumer blocked on get() forever.
                    event_queue.put(None)

            # Run pipeline in background thread
            thread = threading.Thread(target=run_pipeline)
            thread.start()

            # Yield events as they come in
            while True:
                event = event_queue.get()
                if event is None:
                    break
                yield f"data: {json.dumps(event)}\n\n"

            thread.join()

            # Surface a worker failure rather than ending the stream silently
            if failures:
                raise failures[0]

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
        )

    @fal.endpoint("/")
    def generate(self, input: StreamInput) -> dict[str, Any]:
        """Standard non-streaming endpoint.

        Args:
            input: Validated generation parameters.

        Returns:
            ``{"image": {"url": ..., "content_type": ...}, "seed": <int>}``.
            The image is a JPEG data URI so the response is JSON-serialisable
            and uses the same image format as the ``/stream`` events.
        """
        seed = input.seed if input.seed is not None else random.randint(0, 2**32 - 1)
        generator = torch.Generator(device="cuda").manual_seed(seed)

        result = self.pipeline(
            prompt=input.prompt,
            negative_prompt=input.negative_prompt,
            width=input.width,
            height=input.height,
            num_inference_steps=input.num_inference_steps,
            generator=generator,
        )

        payload = self._image_payload(result.images[0], quality=95)
        payload["seed"] = seed
        return payload

Example Details

This SDXL example demonstrates several techniques:
  • TinyVAE for previews: Uses madebyollin/taesdxl which is ~10x faster than the full VAE for decoding intermediate latents
  • Pipeline callback: Uses diffusers’ callback_on_step_end to capture progress at each denoising step
  • Throttled streaming: Only streams every 5 steps to balance responsiveness with overhead
  • Thread-safe queue: Uses a queue to safely pass events from the pipeline thread to the streaming generator
Playground Image Display Format: For images to display correctly in the fal playground, you must include both url and content_type:
{"image": {"url": "data:image/jpeg;base64,...", "content_type": "image/jpeg"}}
The playground uses content_type to determine how to render the result. Without it, the result will be displayed as raw JSON instead of an image. For multiple images, use an array:
{"images": [{"url": "...", "content_type": "image/jpeg"}, ...]}

Client-Side Usage

Endpoint Path Requirement: The fal_client.stream() (Python) and fal.stream() (JavaScript) functions automatically append /stream to your endpoint ID. This means your app must define a streaming endpoint at /stream using @fal.endpoint("/stream"). For example, calling fal_client.stream("your-username/your-app-name", ...) will connect to https://fal.run/your-username/your-app-name/stream.

Python

import fal_client

# Consume the events emitted by the app's /stream endpoint one at a time.
for event in fal_client.stream(
    "your-username/your-app-name",
    arguments={"prompt": "a beautiful sunset", "num_inference_steps": 20},
):
    # Each event contains {"image": {"url": "data:...", "content_type": "image/jpeg"}}
    image_url = event.get("image", {}).get("url")
    if image_url:
        # Plain string: there is nothing to interpolate, so no f-prefix
        print("Received image preview")

JavaScript

import { fal } from "@fal-ai/client";

// Request parameters forwarded to the app's streaming endpoint.
const input = {
  prompt: "a beautiful sunset",
  num_inference_steps: 20,
};

const stream = await fal.stream("your-username/your-app-name", { input });

for await (const update of stream) {
  // Every update has the shape {image: {url: "data:...", content_type: "image/jpeg"}}
  const previewUrl = update.image?.url;
  if (!previewUrl) {
    continue;
  }
  console.log("Received image preview");
  // To display the image, use previewUrl directly: it is a data URI
}

// Await the value returned by the stream's done() call.
const finalResult = await stream.done();

Key Points

  1. Use StreamingResponse from FastAPI with media_type="text/event-stream"
  2. Format events as SSE: yield f"data: {json.dumps(payload)}\n\n" (note the double newline)
  3. Include content_type: Use {"image": {"url": "data:...", "content_type": "image/jpeg"}} for playground display
  4. Throttle streaming: Don’t stream every intermediate result—balance responsiveness with overhead
  5. Use lower quality for previews: Save bandwidth with lower resolution or compression for intermediate results

Next Steps