Skip to main content
from fal.distributed import DistributedRunner, DistributedWorker

Classes

DistributedRunner

class fal.distributed.DistributedRunner
A class to launch and manage distributed workers.

Constructor Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| worker_cls | type[DistributedWorker] | `<class 'fal.distributed.worker.DistributedWorker'>` | - |
| world_size | int | 1 | - |
| master_addr | str | '127.0.0.1' | - |
| master_port | int | 29500 | - |
| worker_addr | str | '127.0.0.1' | - |
| worker_port | int | 54923 | - |
| timeout | int | 86400 | - |
| keepalive_payload | dict[str, Any] | {} | - |
| keepalive_interval | int \| float \| None | None | - |
| cwd | str \| Path \| None | None | - |
| set_device | Optional[bool] | None | - |

Class Variables

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| zmq_socket | Optional[Socket[Any]] | - | - |
| context | Optional[mp.ProcessContext] | - | - |
| keepalive_timer | Optional[KeepAliveTimer] | - | - |

Methods

close_zmq_socket

def close_zmq_socket(self) -> 'None'
Closes the ZeroMQ socket.

Returns: None

ensure_alive

def ensure_alive(self) -> 'None'
Ensures that the distributed worker processes are alive. If the processes are not alive, it raises an error.

Returns: None

gather_errors

def gather_errors(self) -> 'list[Exception]'
Gathers errors from the distributed worker processes. Call this method to collect any errors that occurred during execution.

Returns: list[Exception]

get_zmq_socket

def get_zmq_socket(self) -> 'Socket[Any]'
Returns a ZeroMQ socket. (This method takes no arguments, so the socket type is not caller-selectable.)

Returns: A ZeroMQ socket (`Socket[Any]`).

invoke

async def invoke(self, payload: 'dict[str, Any]' = {}, timeout: 'Optional[int]' = None) -> 'Any'
Invokes the distributed worker with the given payload.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| payload | dict[str, Any] | {} | The payload to send to the worker. |
| timeout | Optional[int] | None | The timeout for the overall operation. |

Returns: Any

is_alive

def is_alive(self) -> 'bool'
Checks whether the distributed worker processes are alive.

Returns: bool

keepalive

def keepalive(self, timeout: 'Optional[Union[int, float]]' = 60.0) -> 'None'
Sends the keepalive payload to the worker.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | int \| float \| None | 60.0 | - |

Returns: None

maybe_cancel_keepalive

def maybe_cancel_keepalive(self) -> 'None'
Cancels the keepalive timer if it is set.

Returns: None

maybe_reset_keepalive

def maybe_reset_keepalive(self) -> 'None'
Resets the keepalive timer if it is set.

Returns: None

maybe_start_keepalive

def maybe_start_keepalive(self) -> 'None'
Starts the keepalive timer if it is set.

Returns: None

run

def run(self, **kwargs: 'Any') -> 'None'
The main function that runs the distributed worker. This method is called by each worker process spawned by `torch.multiprocessing.spawn`. It must be synchronous.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| kwargs | Any | - | The arguments to pass to the worker. |

Returns: None

start

async def start(self, timeout: 'int' = 1800, **kwargs: 'Any') -> 'None'
Starts the distributed worker processes.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | int | 1800 | The timeout for the distributed processes. |
| kwargs | Any | - | - |

Returns: None

stop

async def stop(self, timeout: 'int' = 10) -> 'None'
Stops the distributed worker processes.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | int | 10 | The timeout for the distributed processes to stop. |

Returns: None

stream

def stream(self, payload: 'dict[str, Any]' = {}, timeout: 'Optional[int]' = None, streaming_timeout: 'Optional[int]' = None, as_text_events: 'bool' = False) -> 'AsyncIterator[Any]'
Streams the result from the distributed worker.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| payload | dict[str, Any] | {} | The payload to send to the worker. |
| timeout | Optional[int] | None | The timeout for the overall operation. |
| streaming_timeout | Optional[int] | None | The timeout between consecutive streamed results. |
| as_text_events | bool | False | Whether to yield results as text events. |

Returns: AsyncIterator[Any]

terminate

def terminate(self, timeout: 'Union[int, float]' = 10) -> 'None'
Terminates the distributed worker processes. This method should be called to clean up the worker processes.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | int \| float | 10 | - |

Returns: None

DistributedWorker

class fal.distributed.DistributedWorker
A base class for distributed workers.

Constructor Parameters

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| rank | int | 0 | - |
| world_size | int | 1 | - |

Class Variables

| Name | Type | Default | Description |
| --- | --- | --- | --- |
| queue | queue.Queue[bytes] | - | - |
| loop | asyncio.AbstractEventLoop | - | - |
| thread | threading.Thread | - | - |

Properties

| Name | Type | Description |
| --- | --- | --- |
| device | - | The device for the current worker. |
| running | bool | Whether the event loop is running. |

Methods

add_streaming_error

def add_streaming_error(self, error: 'Exception') -> 'None'
Add an error to the queue.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| error | Exception | - | The error to add to the queue. |

Returns: None

add_streaming_result

def add_streaming_result(self, result: 'Any', image_format: 'str' = 'jpeg', as_text_event: 'bool' = False) -> 'None'
Add a streaming result to the queue.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| result | Any | - | The result to add to the queue. |
| image_format | str | 'jpeg' | - |
| as_text_event | bool | False | - |

Returns: None

initialize

def initialize(self, **kwargs: 'Any') -> 'None'
Initialize the worker.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| kwargs | Any | - | - |

Returns: None

rank_print

def rank_print(self, message: 'str', debug: 'bool' = False) -> 'None'
Print a message with the rank of the current worker.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| message | str | - | The message to print. |
| debug | bool | False | Whether to print the message as a debug message. |

Returns: None

run_in_worker

def run_in_worker(self, func: 'Callable[..., Any]', *args: 'Any', **kwargs: 'Any') -> 'Future[Any]'
Run a function in the worker.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| func | Callable[..., Any] | - | - |
| args | Any | - | - |
| kwargs | Any | - | - |

Returns: Future[Any]

setup

def setup(self, **kwargs: 'Any') -> 'None'
Override this method to set up the worker. This method is called once per worker.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| kwargs | Any | - | - |

Returns: None

shutdown

def shutdown(self, timeout: 'Optional[Union[int, float]]' = None) -> 'None'
Shut down the event loop.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | int \| float \| None | None | The timeout for the shutdown. |

Returns: None

submit

def submit(self, coro: 'Coroutine[Any, Any, Any]') -> 'Future[Any]'
Submit a coroutine to the event loop.
| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| coro | Coroutine[Any, Any, Any] | - | The coroutine to submit to the event loop. |

Returns: Future[Any]

teardown

def teardown(self) -> 'None'
Override this method to tear down the worker. This method is called once per worker.

Returns: None