Skip to main content
from fal_client import (
    SyncClient,
    AsyncClient,
    RealtimeConnection,
    AsyncRealtimeConnection,
    Status,
    Queued,
    InProgress,
    Completed,
    SyncRequestHandle,
    AsyncRequestHandle,
    run,
    subscribe_async,
    subscribe,
    submit,
    stream,
    run_async,
    submit_async,
    stream_async,
    realtime,
    realtime_async,
    cancel,
    cancel_async,
    status,
    status_async,
    result,
    result_async,
    encode,
    encode_file,
    encode_image,
)

Classes

SyncClient

class fal_client.SyncClient

Constructor Parameters

NameTypeDefaultDescription
keystr | NoneNone-
default_timeoutfloat120.0-

Class Variables

NameTypeDefaultDescription
keystr | NoneNone-
default_timeoutfloat120.0-

Methods

cancel

def cancel(self, application: 'str', request_id: 'str') -> 'None'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: NoneType

get_handle

def get_handle(self, application: 'str', request_id: 'str') -> 'SyncRequestHandle'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: SyncRequestHandle

realtime

def realtime(self, application: 'str', *, max_buffering: 'int | None' = None, token_expiration: 'int' = 120) -> 'Iterator[RealtimeConnection]'
ParameterTypeDefaultDescription
applicationstr--
max_bufferingint | NoneNone-
token_expirationint120-
Returns: Iterator[RealtimeConnection]

result

def result(self, application: 'str', request_id: 'str') -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: dict[str, Any]

run

def run(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', timeout: 'float | None' = None, hint: 'str | None' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
Run an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return the result of the inference call directly.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
timeoutfloat | NoneNone-
hintstr | NoneNone-
headersdict[str, str]{}-
Returns: dict[str, Any]

status

def status(self, application: 'str', request_id: 'str', *, with_logs: 'bool' = False) -> 'Status'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
with_logsboolFalse-
Returns: Status

stream

def stream(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '/stream', timeout: 'float | None' = None) -> 'Iterator[dict[str, Any]]'
Stream the output of an application with the given arguments (which will be JSON serialized). This is only supported at a few select applications at the moment, so be sure to first consult with the documentation of individual applications to see if this is supported. The function will iterate over each event that is streamed from the server.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr'/stream'-
timeoutfloat | NoneNone-
Returns: Iterator[dict[str, Any]]

submit

def submit(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, webhook_url: 'str | None' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'SyncRequestHandle'
Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return a handle to the request that can be used to check the status and retrieve the result of the inference call when it is done.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
webhook_urlstr | NoneNone-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: SyncRequestHandle

subscribe

def subscribe(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, with_logs: 'bool' = False, on_enqueue: 'Optional[Callable[[str], None]]' = None, on_queue_update: 'Optional[Callable[[Status], None]]' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
with_logsboolFalse-
on_enqueueOptional[Callable[str, NoneType]]None-
on_queue_updateOptional[Callable[Status, NoneType]]None-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: dict[str, Any]

upload

def upload(self, data: 'str | bytes', content_type: 'str', file_name: 'str | None' = None) -> 'str'
Upload the given data blob to the CDN and return the access URL. The content type should be specified as the second argument. Use upload_file or upload_image for convenience.
ParameterTypeDefaultDescription
datastr | bytes--
content_typestr--
file_namestr | NoneNone-
Returns: str

upload_file

def upload_file(self, path: 'os.PathLike') -> 'str'
Upload a file from the local filesystem to the CDN and return the access URL.
ParameterTypeDefaultDescription
pathPathLike--
Returns: str

upload_image

def upload_image(self, image: 'Image.Image', format: 'str' = 'jpeg') -> 'str'
Upload a pillow image object to the CDN and return the access URL.
ParameterTypeDefaultDescription
imageImage.Image--
formatstr'jpeg'-
Returns: str

ws_connect

def ws_connect(self, application: 'str', *, path: 'str' = '', max_buffering: 'int | None' = None, token_expiration: 'int' = 120) -> "Iterator['Connection']"
ParameterTypeDefaultDescription
applicationstr--
pathstr''-
max_bufferingint | NoneNone-
token_expirationint120-

AsyncClient

class fal_client.AsyncClient

Constructor Parameters

NameTypeDefaultDescription
keystr | NoneNone-
default_timeoutfloat120.0-

Class Variables

NameTypeDefaultDescription
keystr | NoneNone-
default_timeoutfloat120.0-

Methods

cancel

async def cancel(self, application: 'str', request_id: 'str') -> 'None'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: NoneType

get_handle

def get_handle(self, application: 'str', request_id: 'str') -> 'AsyncRequestHandle'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: AsyncRequestHandle

realtime

def realtime(self, application: 'str', *, max_buffering: 'int | None' = None, token_expiration: 'int' = 120) -> 'AsyncIterator[AsyncRealtimeConnection]'
ParameterTypeDefaultDescription
applicationstr--
max_bufferingint | NoneNone-
token_expirationint120-
Returns: AsyncIterator[AsyncRealtimeConnection]

result

async def result(self, application: 'str', request_id: 'str') -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: dict[str, Any]

run

async def run(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', timeout: 'float | None' = None, hint: 'str | None' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
Run an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return the result of the inference call directly.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
timeoutfloat | NoneNone-
hintstr | NoneNone-
headersdict[str, str]{}-
Returns: dict[str, Any]

status

async def status(self, application: 'str', request_id: 'str', *, with_logs: 'bool' = False) -> 'Status'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
with_logsboolFalse-
Returns: Status

stream

def stream(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '/stream', timeout: 'float | None' = None) -> 'AsyncIterator[dict[str, Any]]'
Stream the output of an application with the given arguments (which will be JSON serialized). This is only supported at a few select applications at the moment, so be sure to first consult with the documentation of individual applications to see if this is supported. The function will iterate over each event that is streamed from the server.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr'/stream'-
timeoutfloat | NoneNone-
Returns: AsyncIterator[dict[str, Any]]

submit

async def submit(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, webhook_url: 'str | None' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'AsyncRequestHandle'
Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return a handle to the request that can be used to check the status and retrieve the result of the inference call when it is done.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
webhook_urlstr | NoneNone-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: AsyncRequestHandle

subscribe

async def subscribe(self, application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, with_logs: 'bool' = False, on_enqueue: 'Optional[Callable[[str], None]]' = None, on_queue_update: 'Optional[Callable[[Status], None]]' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
with_logsboolFalse-
on_enqueueOptional[Callable[str, NoneType]]None-
on_queue_updateOptional[Callable[Status, NoneType]]None-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: dict[str, Any]

upload

async def upload(self, data: 'str | bytes', content_type: 'str', file_name: 'str | None' = None) -> 'str'
Upload the given data blob to the CDN and return the access URL. The content type should be specified as the second argument. Use upload_file or upload_image for convenience.
ParameterTypeDefaultDescription
datastr | bytes--
content_typestr--
file_namestr | NoneNone-
Returns: str

upload_file

async def upload_file(self, path: 'os.PathLike') -> 'str'
Upload a file from the local filesystem to the CDN and return the access URL.
ParameterTypeDefaultDescription
pathPathLike--
Returns: str

upload_image

async def upload_image(self, image: 'Image.Image', format: 'str' = 'jpeg') -> 'str'
Upload a pillow image object to the CDN and return the access URL.
ParameterTypeDefaultDescription
imageImage.Image--
formatstr'jpeg'-
Returns: str

ws_connect

def ws_connect(self, application: 'str', *, path: 'str' = '', max_buffering: 'int | None' = None, token_expiration: 'int' = 120) -> "AsyncIterator['WebSocketClientProtocol']"
ParameterTypeDefaultDescription
applicationstr--
pathstr''-
max_bufferingint | NoneNone-
token_expirationint120-

RealtimeConnection

class fal_client.RealtimeConnection
Synchronous realtime connection wrapper.

Constructor Parameters

NameTypeDefaultDescription
_ws'Connection'--

Methods

close

def close(self) -> 'None'
Returns: NoneType

recv

def recv(self) -> 'dict[str, Any]'
Returns: dict[str, Any]

send

def send(self, arguments: 'dict[str, Any]') -> 'None'
ParameterTypeDefaultDescription
argumentsdict[str, Any]--
Returns: NoneType

AsyncRealtimeConnection

class fal_client.AsyncRealtimeConnection
Asynchronous realtime connection wrapper.

Constructor Parameters

NameTypeDefaultDescription
_ws'WebSocketClientProtocol'--

Methods

close

async def close(self) -> 'None'
Returns: NoneType

recv

async def recv(self) -> 'dict[str, Any]'
Returns: dict[str, Any]

send

async def send(self, arguments: 'dict[str, Any]') -> 'None'
ParameterTypeDefaultDescription
argumentsdict[str, Any]--
Returns: NoneType

Status

class fal_client.Status

Queued

class fal_client.Queued
Indicates the request is enqueued and waiting to be processed. The position field indicates the relative position in the queue (0-indexed).
Inherits from: Status

Constructor Parameters

NameTypeDefaultDescription
positionint--

Class Variables

NameTypeDefaultDescription
positionint--

InProgress

class fal_client.InProgress
Indicates the request is currently being processed. If the status operation was called with the with_logs parameter set to True, the logs field will be a list of log objects.
Inherits from: Status

Constructor Parameters

NameTypeDefaultDescription
logslist[dict[str, Any]] | None--

Class Variables

NameTypeDefaultDescription
logslist[dict[str, Any]] | None--

Completed

class fal_client.Completed
Indicates the request has been completed and the result can be gathered. The logs field will contain the logs if the status operation was called with the with_logs parameter set to True. Metrics might contain the inference time, and other internal metadata (number of tokens processed, etc.).
Inherits from: Status

Constructor Parameters

NameTypeDefaultDescription
logslist[dict[str, Any]] | None--
metricsdict[str, Any]--

Class Variables

NameTypeDefaultDescription
logslist[dict[str, Any]] | None--
metricsdict[str, Any]--

SyncRequestHandle

class fal_client.SyncRequestHandle
Inherits from: _BaseRequestHandle

Constructor Parameters

NameTypeDefaultDescription
request_idstr--
response_urlstr--
status_urlstr--
cancel_urlstr--
clientClient--

Class Variables

NameTypeDefaultDescription
clienthttpx.Client--

Methods

cancel

def cancel(self) -> 'None'
Cancel the request.
Returns: NoneType

from_request_id

def from_request_id(cls, client: 'httpx.Client', application: 'str', request_id: 'str') -> 'SyncRequestHandle'
ParameterTypeDefaultDescription
clientClient--
applicationstr--
request_idstr--
Returns: SyncRequestHandle

get

def get(self) -> 'AnyJSON'
Wait until the request is completed and return the result of the inference call.
Returns: dict[str, Any]

iter_events

def iter_events(self, *, with_logs: 'bool' = False, interval: 'float' = 0.1) -> 'Iterator[Status]'
Continuously poll for the status of the request and yield it at each interval till the request is completed. If with_logs is True, logs will be included in the response.
ParameterTypeDefaultDescription
with_logsboolFalse-
intervalfloat0.1-
Returns: Iterator[Status]

status

def status(self, *, with_logs: 'bool' = False) -> 'Status'
Returns the status of the request (which can be one of the following: Queued, InProgress, Completed). If with_logs is True, logs will be included for InProgress and Completed statuses.
ParameterTypeDefaultDescription
with_logsboolFalse-
Returns: Status

AsyncRequestHandle

class fal_client.AsyncRequestHandle
Inherits from: _BaseRequestHandle

Constructor Parameters

NameTypeDefaultDescription
request_idstr--
response_urlstr--
status_urlstr--
cancel_urlstr--
clientAsyncClient--

Class Variables

NameTypeDefaultDescription
clienthttpx.AsyncClient--

Methods

cancel

async def cancel(self) -> 'None'
Cancel the request.
Returns: NoneType

from_request_id

def from_request_id(cls, client: 'httpx.AsyncClient', application: 'str', request_id: 'str') -> 'AsyncRequestHandle'
ParameterTypeDefaultDescription
clientAsyncClient--
applicationstr--
request_idstr--
Returns: AsyncRequestHandle

get

async def get(self) -> 'AnyJSON'
Wait until the request is completed and return the result.
Returns: dict[str, Any]

iter_events

def iter_events(self, *, with_logs: 'bool' = False, interval: 'float' = 0.1) -> 'AsyncIterator[Status]'
Continuously poll for the status of the request and yield it at each interval till the request is completed. If with_logs is True, logs will be included in the response.
ParameterTypeDefaultDescription
with_logsboolFalse-
intervalfloat0.1-
Returns: AsyncIterator[Status]

status

async def status(self, *, with_logs: 'bool' = False) -> 'Status'
Returns the status of the request (which can be one of the following: Queued, InProgress, Completed). If with_logs is True, logs will be included for InProgress and Completed statuses.
ParameterTypeDefaultDescription
with_logsboolFalse-
Returns: Status

Functions

run

def run(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', timeout: 'float | None' = None, hint: 'str | None' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
Run an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return the result of the inference call directly.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
timeoutfloat | NoneNone-
hintstr | NoneNone-
headersdict[str, str]{}-
Returns: dict[str, Any]

subscribe_async

async def subscribe_async(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, with_logs: 'bool' = False, on_enqueue: 'Optional[Callable[[str], None]]' = None, on_queue_update: 'Optional[Callable[[Status], None]]' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
with_logsboolFalse-
on_enqueueOptional[Callable[str, NoneType]]None-
on_queue_updateOptional[Callable[Status, NoneType]]None-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: dict[str, Any]

subscribe

def subscribe(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, with_logs: 'bool' = False, on_enqueue: 'Optional[Callable[[str], None]]' = None, on_queue_update: 'Optional[Callable[[Status], None]]' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
with_logsboolFalse-
on_enqueueOptional[Callable[str, NoneType]]None-
on_queue_updateOptional[Callable[Status, NoneType]]None-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: dict[str, Any]

submit

def submit(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, webhook_url: 'str | None' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'SyncRequestHandle'
Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return a handle to the request that can be used to check the status and retrieve the result of the inference call when it is done.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
webhook_urlstr | NoneNone-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: SyncRequestHandle

stream

def stream(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '/stream', timeout: 'float | None' = None) -> 'Iterator[dict[str, Any]]'
Stream the output of an application with the given arguments (which will be JSON serialized). This is only supported at a few select applications at the moment, so be sure to first consult with the documentation of individual applications to see if this is supported. The function will iterate over each event that is streamed from the server.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr'/stream'-
timeoutfloat | NoneNone-
Returns: Iterator[dict[str, Any]]

run_async

async def run_async(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', timeout: 'float | None' = None, hint: 'str | None' = None, headers: 'dict[str, str]' = {}) -> 'AnyJSON'
Run an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return the result of the inference call directly.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
timeoutfloat | NoneNone-
hintstr | NoneNone-
headersdict[str, str]{}-
Returns: dict[str, Any]

submit_async

async def submit_async(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '', hint: 'str | None' = None, webhook_url: 'str | None' = None, priority: 'Optional[Priority]' = None, headers: 'dict[str, str]' = {}) -> 'AsyncRequestHandle'
Submit an application with the given arguments (which will be JSON serialized). The path parameter can be used to specify a subpath when applicable. This method will return a handle to the request that can be used to check the status and retrieve the result of the inference call when it is done.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr''-
hintstr | NoneNone-
webhook_urlstr | NoneNone-
priorityOptional[Literal[normal, low]]None-
headersdict[str, str]{}-
Returns: AsyncRequestHandle

stream_async

def stream_async(application: 'str', arguments: 'AnyJSON', *, path: 'str' = '/stream', timeout: 'float | None' = None) -> 'AsyncIterator[dict[str, Any]]'
Stream the output of an application with the given arguments (which will be JSON serialized). This is only supported at a few select applications at the moment, so be sure to first consult with the documentation of individual applications to see if this is supported. The function will iterate over each event that is streamed from the server.
ParameterTypeDefaultDescription
applicationstr--
argumentsdict[str, Any]--
pathstr'/stream'-
timeoutfloat | NoneNone-
Returns: AsyncIterator[dict[str, Any]]

realtime

def realtime(application: 'str', *, max_buffering: 'int | None' = None, token_expiration: 'int' = 120) -> 'Iterator[RealtimeConnection]'
ParameterTypeDefaultDescription
applicationstr--
max_bufferingint | NoneNone-
token_expirationint120-
Returns: Iterator[RealtimeConnection]

realtime_async

def realtime_async(application: 'str', *, max_buffering: 'int | None' = None, token_expiration: 'int' = 120) -> 'AsyncIterator[AsyncRealtimeConnection]'
ParameterTypeDefaultDescription
applicationstr--
max_bufferingint | NoneNone-
token_expirationint120-
Returns: AsyncIterator[AsyncRealtimeConnection]

cancel

def cancel(application: 'str', request_id: 'str') -> 'None'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: NoneType

cancel_async

async def cancel_async(application: 'str', request_id: 'str') -> 'None'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: NoneType

status

def status(application: 'str', request_id: 'str', *, with_logs: 'bool' = False) -> 'Status'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
with_logsboolFalse-
Returns: Status

status_async

async def status_async(application: 'str', request_id: 'str', *, with_logs: 'bool' = False) -> 'Status'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
with_logsboolFalse-
Returns: Status

result

def result(application: 'str', request_id: 'str') -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: dict[str, Any]

result_async

async def result_async(application: 'str', request_id: 'str') -> 'AnyJSON'
ParameterTypeDefaultDescription
applicationstr--
request_idstr--
Returns: dict[str, Any]

encode

def encode(data: 'str | bytes', content_type: 'str') -> 'str'
Encode the given data blob to a data URL with the specified content type.
ParameterTypeDefaultDescription
datastr | bytes--
content_typestr--
Returns: str

encode_file

def encode_file(path: 'os.PathLike') -> 'str'
Encode a file from the local filesystem to a data URL with the inferred content type.
ParameterTypeDefaultDescription
pathPathLike--
Returns: str

encode_image

def encode_image(image: 'Image.Image', format: 'str' = 'jpeg') -> 'str'
Encode a pillow image object to a data URL with the specified format.
ParameterTypeDefaultDescription
imageImage.Image--
formatstr'jpeg'-
Returns: str