The flow is called by the user, or an existing flow run is executed in a new process.
See Flow.__call__ and prefect.engine.__main__ (python -m prefect.engine)
A synchronous function acts as an entrypoint to the engine.
The engine executes on a dedicated "global loop" thread. For asynchronous flow calls,
we return a coroutine from the entrypoint so the user can enter the engine without
blocking their event loop.
See enter_flow_run_engine_from_flow_call, enter_flow_run_engine_from_subprocess
The thread that calls the entrypoint waits until orchestration of the flow run completes.
This thread is referred to as the "user" thread and is usually the "main" thread.
The thread is not blocked while waiting; this allows the engine to send calls back
to the user thread from the global loop thread.
See wait_for_call_in_loop_thread and call_soon_in_waiting_thread
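For illustration, a minimal sketch (not from the source) of the two entry modes described above: a synchronous flow call blocks the user thread until the run finishes, while an asynchronous flow call returns a coroutine so the caller's event loop stays free. `return_state=True` is the public way to get the final State back.

```python
import asyncio
from prefect import flow

@flow
def sync_flow():
    return 1

@flow
async def async_flow():
    return 2

# Sync entry: blocks the calling ("user") thread until orchestration completes.
state = sync_flow(return_state=True)

# Async entry: the entrypoint returns a coroutine, so the user's event loop is
# not blocked while the engine runs on the global loop thread.
async def main():
    return await async_flow(return_state=True)

asyncio.run(main())
```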
The asynchronous engine branches depending on whether the flow run already exists
and whether there is a parent flow run in the current context.
See create_then_begin_flow_run, create_and_begin_subflow_run, and retrieve_flow_then_begin_flow_run
The asynchronous engine prepares for execution of the flow run.
This includes starting the task runner, preparing context, etc.
See begin_flow_run
The flow run is orchestrated through states, calling the user's function as necessary.
Generally the user's function is sent for execution on the user thread.
If the flow function cannot be safely executed on the user thread, e.g. it is
a synchronous child in an asynchronous parent, it will be scheduled on a worker
thread instead.
See orchestrate_flow_run, call_soon_in_waiting_thread, call_soon_in_new_thread
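As an illustration of the worker-thread fallback above, a sync subflow called from an async parent cannot run on the user thread (that thread's event loop belongs to the parent), so the engine schedules it on a worker thread. A sketch:

```python
from prefect import flow

@flow
def sync_child():
    # A synchronous child of an asynchronous parent; the engine runs this
    # function on a worker thread rather than the user thread.
    return "ran on a worker thread"

@flow
async def async_parent():
    return sync_child()
```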
The task is called or submitted by the user.
We require that this is always within a flow.
See Task.__call__ and Task.submit
A synchronous function acts as an entrypoint to the engine.
Unlike flow calls, this will not block until completion if submit was used.
See enter_task_run_engine
A future is created for the task call.
Creation of the task run and submission to the task runner is scheduled as a
background task so submission of many tasks can occur concurrently.
See create_task_run_future and create_task_run_then_submit
The engine branches depending on whether a future, state, or result is requested.
If a future is requested, it is returned immediately to the user thread.
Otherwise, the engine will wait for the task run to complete and return the final
state or result.
See get_task_call_return_value
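A sketch of the three return modes: `submit` returns a PrefectFuture immediately, while a plain call waits for the run and returns either the final state (with `return_state=True`) or the result.

```python
from prefect import flow, task

@task
def add(x, y):
    return x + y

@flow
def my_flow():
    future = add.submit(1, 2)             # future: returned immediately
    state = add(3, 4, return_state=True)  # state: waits for the run to finish
    result = add(5, 6)                    # result: waits and unwraps the value
    return future.result() + state.result() + result
```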
An engine function is submitted to the task runner.
The task runner will schedule this function for execution on a worker.
When executed, it will prepare for orchestration and wait for completion of the run.
See create_task_run_then_submit and begin_task_run
The task run is orchestrated through states, calling the user's function as necessary.
The user's function is always executed in a worker thread for isolation.
See orchestrate_task_run, call_soon_in_new_thread
Ideally, for local and sequential task runners we would send the task run to the
user thread as we do for flows. See #9855.
Begins execution of a flow run; blocks until completion of the flow run.

- Starts a task runner
- Determines the result storage block to use
- Orchestrates the flow run (runs the user function and generates tasks)
- Waits for tasks to complete / shuts down the task runner
- Sets a terminal state for the flow run

Note that the flow_run contains a parameters attribute which is the serialized
parameters sent to the backend, while the parameters argument here should be the
deserialized and validated dictionary of Python objects.
```python
async def begin_flow_run(
    flow: Flow,
    flow_run: FlowRun,
    parameters: Dict[str, Any],
    client: PrefectClient,
    user_thread: threading.Thread,
) -> State:
    """
    Begins execution of a flow run; blocks until completion of the flow run

    - Starts a task runner
    - Determines the result storage block to use
    - Orchestrates the flow run (runs the user function and generates tasks)
    - Waits for tasks to complete / shuts down the task runner
    - Sets a terminal state for the flow run

    Note that the `flow_run` contains a `parameters` attribute which is the serialized
    parameters sent to the backend while the `parameters` argument here should be the
    deserialized and validated dictionary of python objects.

    Returns:
        The final state of the run
    """
    logger = flow_run_logger(flow_run, flow)

    log_prints = should_log_prints(flow)
    flow_run_context = FlowRunContext.construct(log_prints=log_prints)

    async with AsyncExitStack() as stack:
        await stack.enter_async_context(
            report_flow_run_crashes(flow_run=flow_run, client=client, flow=flow)
        )

        # Create a task group for background tasks
        flow_run_context.background_tasks = await stack.enter_async_context(
            anyio.create_task_group()
        )

        # If the flow is async, we need to provide a portal so sync tasks can run
        flow_run_context.sync_portal = (
            stack.enter_context(start_blocking_portal()) if flow.isasync else None
        )

        task_runner = flow.task_runner.duplicate()
        if task_runner is NotImplemented:
            # Backwards compatibility; will not support concurrent flow runs
            task_runner = flow.task_runner
            logger.warning(
                f"Task runner {type(task_runner).__name__!r} does not implement the"
                " `duplicate` method and will fail if used for concurrent execution of"
                " the same flow."
            )

        logger.debug(
            f"Starting {type(flow.task_runner).__name__!r}; submitted tasks "
            f"will be run {CONCURRENCY_MESSAGES[flow.task_runner.concurrency_type]}..."
        )

        flow_run_context.task_runner = await stack.enter_async_context(
            task_runner.start()
        )

        flow_run_context.result_factory = await ResultFactory.from_flow(
            flow, client=client
        )

        if log_prints:
            stack.enter_context(patch_print())

        terminal_or_paused_state = await orchestrate_flow_run(
            flow,
            flow_run=flow_run,
            parameters=parameters,
            wait_for=None,
            client=client,
            partial_flow_run_context=flow_run_context,
            # Orchestration needs to be interruptible if it has a timeout
            interruptible=flow.timeout_seconds is not None,
            user_thread=user_thread,
        )

    if terminal_or_paused_state.is_paused():
        timeout = terminal_or_paused_state.state_details.pause_timeout
        msg = "Currently paused and suspending execution."
        if timeout:
            msg += f" Resume before {timeout.to_rfc3339_string()} to finish execution."
        logger.log(level=logging.INFO, msg=msg)
        await APILogHandler.aflush()

        return terminal_or_paused_state
    else:
        terminal_state = terminal_or_paused_state

    # If debugging, use the more complete `repr` than the usual `str` description
    display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)

    logger.log(
        level=logging.INFO if terminal_state.is_completed() else logging.ERROR,
        msg=f"Finished in state {display_state}",
    )

    # When a "root" flow run finishes, flush logs so we do not have to rely on handling
    # during interpreter shutdown
    await APILogHandler.aflush()

    return terminal_state
```
```python
async def begin_task_map(
    task: Task,
    flow_run_context: Optional[FlowRunContext],
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    return_type: EngineReturnType,
    task_runner: Optional[BaseTaskRunner],
    autonomous: bool = False,
) -> List[Union[PrefectFuture, Awaitable[PrefectFuture], TaskRun]]:
    """Async entrypoint for task mapping"""
    # We need to resolve some futures to map over their data, collect the upstream
    # links beforehand to retain relationship tracking.
    task_inputs = {
        k: await collect_task_run_inputs(v, max_depth=0) for k, v in parameters.items()
    }

    # Resolve the top-level parameters in order to get mappable data of a known length.
    # Nested parameters will be resolved in each mapped child where their relationships
    # will also be tracked.
    parameters = await resolve_inputs(parameters, max_depth=1)

    # Ensure that any parameters in kwargs are expanded before this check
    parameters = explode_variadic_parameter(task.fn, parameters)

    iterable_parameters = {}
    static_parameters = {}
    annotated_parameters = {}
    for key, val in parameters.items():
        if isinstance(val, (allow_failure, quote)):
            # Unwrap annotated parameters to determine if they are iterable
            annotated_parameters[key] = val
            val = val.unwrap()

        if isinstance(val, unmapped):
            static_parameters[key] = val.value
        elif isiterable(val):
            iterable_parameters[key] = list(val)
        else:
            static_parameters[key] = val

    if not len(iterable_parameters):
        raise MappingMissingIterable(
            "No iterable parameters were received. Parameters for map must "
            f"include at least one iterable. Parameters: {parameters}"
        )

    iterable_parameter_lengths = {
        key: len(val) for key, val in iterable_parameters.items()
    }
    lengths = set(iterable_parameter_lengths.values())
    if len(lengths) > 1:
        raise MappingLengthMismatch(
            "Received iterable parameters with different lengths. Parameters for map"
            f" must all be the same length. Got lengths: {iterable_parameter_lengths}"
        )

    map_length = list(lengths)[0]

    task_runs = []
    for i in range(map_length):
        call_parameters = {key: value[i] for key, value in iterable_parameters.items()}
        call_parameters.update(
            {key: value for key, value in static_parameters.items()}
        )

        # Add default values for parameters; these are skipped earlier since they
        # should not be mapped over
        for key, value in get_parameter_defaults(task.fn).items():
            call_parameters.setdefault(key, value)

        # Re-apply annotations to each key again
        for key, annotation in annotated_parameters.items():
            call_parameters[key] = annotation.rewrap(call_parameters[key])

        # Collapse any previously exploded kwargs
        call_parameters = collapse_variadic_parameters(task.fn, call_parameters)

        if autonomous:
            task_runs.append(
                await create_autonomous_task_run(
                    task=task,
                    parameters=call_parameters,
                )
            )
        else:
            task_runs.append(
                partial(
                    get_task_call_return_value,
                    task=task,
                    flow_run_context=flow_run_context,
                    parameters=call_parameters,
                    wait_for=wait_for,
                    return_type=return_type,
                    task_runner=task_runner,
                    extra_task_inputs=task_inputs,
                )
            )

    if autonomous:
        return task_runs

    # Maintain the order of the task runs when using the sequential task runner
    runner = task_runner if task_runner else flow_run_context.task_runner
    if runner.concurrency_type == TaskConcurrencyType.SEQUENTIAL:
        return [await task_run() for task_run in task_runs]

    return await gather(*task_runs)
```
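For reference, a small usage sketch of the mapping semantics implemented above: iterable parameters are mapped over element-wise, while `unmapped` marks a value as static rather than something to map over.

```python
from prefect import flow, task, unmapped

@task
def scale(x, factor):
    return x * factor

@flow
def mapped_flow():
    # One task run per element of the iterable; `factor` is held static.
    futures = scale.map([1, 2, 3], factor=unmapped(10))
    return [f.result() for f in futures]
```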
This function is intended for submission to the task runner.

This method may be called from a worker, so we ensure the settings context has been
entered. For example, with a runner that executes tasks in the same event loop, we
will likely not enter the context again because the current context already matches:

main thread:
--> Flow called with settings A
--> begin_task_run executes in the same event loop
--> Profile A matches and is not entered again

However, with execution in a remote environment, we need to ensure the settings for
the task run are respected by entering the context:

main thread:
--> Flow called with settings A
--> begin_task_run is scheduled on a remote worker; settings A is serialized

remote worker:
--> Remote worker imports Prefect (may not occur)
--> Global settings are loaded with defaults
--> begin_task_run executes on a different event loop than the flow
--> Current settings are not set or do not match, so settings A is entered
```python
async def begin_task_run(
    task: Task,
    task_run: TaskRun,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    result_factory: ResultFactory,
    log_prints: bool,
    settings: prefect.context.SettingsContext,
):
    """
    Entrypoint for task run execution.

    This function is intended for submission to the task runner.

    This method may be called from a worker, so we ensure the settings context has
    been entered. See the notes above for how the settings context is handled on the
    main thread versus a remote worker.
    """
    maybe_flow_run_context = prefect.context.FlowRunContext.get()

    async with AsyncExitStack() as stack:
        # The settings context may be null on a remote worker so we use the safe `.get`
        # method and compare it to the settings required for this task run
        if prefect.context.SettingsContext.get() != settings:
            stack.enter_context(settings)
            setup_logging()

        if maybe_flow_run_context:
            # Accessible if on a worker that is running in the same thread as the flow
            client = maybe_flow_run_context.client
            # Only run the task in an interruptible thread if it in the same thread as
            # the flow _and_ the flow run has a timeout attached. If the task is on a
            # worker, the flow run timeout will not be raised in the worker process.
            interruptible = maybe_flow_run_context.timeout_scope is not None
        else:
            # Otherwise, retrieve a new client
            client = await stack.enter_async_context(get_client())
            interruptible = False
            await stack.enter_async_context(anyio.create_task_group())

        await stack.enter_async_context(report_task_run_crashes(task_run, client))

        # TODO: Use the background tasks group to manage logging for this task
        if log_prints:
            stack.enter_context(patch_print())

        await check_api_reachable(
            client, f"Cannot orchestrate task run '{task_run.id}'"
        )
        try:
            state = await orchestrate_task_run(
                task=task,
                task_run=task_run,
                parameters=parameters,
                wait_for=wait_for,
                result_factory=result_factory,
                log_prints=log_prints,
                interruptible=interruptible,
                client=client,
            )

            if not maybe_flow_run_context:
                # When a task run finishes on a remote worker flush logs to prevent
                # loss if the process exits
                await APILogHandler.aflush()

        except Abort as abort:
            # Task run probably already completed, fetch its state
            task_run = await client.read_task_run(task_run.id)

            if task_run.state.is_final():
                task_run_logger(task_run).info(
                    f"Task run '{task_run.id}' already finished."
                )
            else:
                # TODO: This is a concerning case; we should determine when this occurs
                # 1. This can occur when the flow run is not in a running state
                task_run_logger(task_run).warning(
                    f"Task run '{task_run.id}' received abort during orchestration: "
                    f"{abort} Task run is in {task_run.state.type.value} state."
                )
            state = task_run.state

        except Pause:
            # A pause signal here should mean the flow run suspended, so we
            # should do the same. We'll look up the flow run's pause state to
            # try and reuse it, so we capture any data like timeouts.
            flow_run = await client.read_flow_run(task_run.flow_run_id)
            if flow_run.state and flow_run.state.is_paused():
                state = flow_run.state
            else:
                state = Suspended()

            task_run_logger(task_run).info(
                "Task run encountered a pause signal during orchestration."
            )

        return state
```
Async entrypoint for flow calls within a flow run
Subflows differ from parent flows in that they
- Resolve futures in passed parameters into values
- Create a dummy task for representation in the parent flow
- Retrieve default result storage from the parent flow rather than the server
```python
@inject_client
async def create_and_begin_subflow_run(
    flow: Flow,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    return_type: EngineReturnType,
    client: PrefectClient,
    user_thread: threading.Thread,
) -> Any:
    """
    Async entrypoint for flow calls within a flow run

    Subflows differ from parent flows in that they
    - Resolve futures in passed parameters into values
    - Create a dummy task for representation in the parent flow
    - Retrieve default result storage from the parent flow rather than the server

    Returns:
        The final state of the run
    """
    parent_flow_run_context = FlowRunContext.get()
    parent_logger = get_run_logger(parent_flow_run_context)
    log_prints = should_log_prints(flow)
    terminal_state = None

    parent_logger.debug(f"Resolving inputs to {flow.name!r}")
    task_inputs = {k: await collect_task_run_inputs(v) for k, v in parameters.items()}

    if wait_for:
        task_inputs["wait_for"] = await collect_task_run_inputs(wait_for)

    rerunning = (
        parent_flow_run_context.flow_run.run_count > 1
        if getattr(parent_flow_run_context, "flow_run", None)
        else False
    )

    # Generate a task in the parent flow run to represent the result of the subflow run
    dummy_task = Task(name=flow.name, fn=flow.fn, version=flow.version)
    parent_task_run = await client.create_task_run(
        task=dummy_task,
        flow_run_id=(
            parent_flow_run_context.flow_run.id
            if getattr(parent_flow_run_context, "flow_run", None)
            else None
        ),
        dynamic_key=_dynamic_key_for_task_run(parent_flow_run_context, dummy_task),
        task_inputs=task_inputs,
        state=Pending(),
    )

    # Resolve any task futures in the input
    parameters = await resolve_inputs(parameters)

    if parent_task_run.state.is_final() and not (
        rerunning and not parent_task_run.state.is_completed()
    ):
        # Retrieve the most recent flow run from the database
        flow_runs = await client.read_flow_runs(
            flow_run_filter=FlowRunFilter(
                parent_task_run_id={"any_": [parent_task_run.id]}
            ),
            sort=FlowRunSort.EXPECTED_START_TIME_ASC,
        )
        flow_run = flow_runs[-1]

        # Set up variables required downstream
        terminal_state = flow_run.state
        logger = flow_run_logger(flow_run, flow)
    else:
        flow_run = await client.create_flow_run(
            flow,
            parameters=flow.serialize_parameters(parameters),
            parent_task_run_id=parent_task_run.id,
            state=parent_task_run.state if not rerunning else Pending(),
            tags=TagsContext.get().current_tags,
        )

        parent_logger.info(
            f"Created subflow run {flow_run.name!r} for flow {flow.name!r}"
        )
        logger = flow_run_logger(flow_run, flow)
        ui_url = PREFECT_UI_URL.value()
        if ui_url:
            logger.info(
                f"View at {ui_url}/flow-runs/flow-run/{flow_run.id}",
                extra={"send_to_api": False},
            )

        result_factory = await ResultFactory.from_flow(
            flow, client=parent_flow_run_context.client
        )

        if flow.should_validate_parameters:
            try:
                parameters = flow.validate_parameters(parameters)
            except Exception:
                message = "Validation of flow parameters failed with error:"
                logger.exception(message)
                terminal_state = await propose_state(
                    client,
                    state=await exception_to_failed_state(
                        message=message, result_factory=result_factory
                    ),
                    flow_run_id=flow_run.id,
                )

        if terminal_state is None or not terminal_state.is_final():
            async with AsyncExitStack() as stack:
                await stack.enter_async_context(
                    report_flow_run_crashes(
                        flow_run=flow_run, client=client, flow=flow
                    )
                )

                task_runner = flow.task_runner.duplicate()
                if task_runner is NotImplemented:
                    # Backwards compatibility; will not support concurrent flow runs
                    task_runner = flow.task_runner
                    logger.warning(
                        f"Task runner {type(task_runner).__name__!r} does not implement"
                        " the `duplicate` method and will fail if used for concurrent"
                        " execution of the same flow."
                    )

                await stack.enter_async_context(task_runner.start())

                if log_prints:
                    stack.enter_context(patch_print())

                terminal_state = await orchestrate_flow_run(
                    flow,
                    flow_run=flow_run,
                    parameters=parameters,
                    wait_for=wait_for,
                    # If the parent flow run has a timeout, then this one needs to be
                    # interruptible as well
                    interruptible=parent_flow_run_context.timeout_scope is not None,
                    client=client,
                    partial_flow_run_context=FlowRunContext.construct(
                        sync_portal=parent_flow_run_context.sync_portal,
                        task_runner=task_runner,
                        background_tasks=parent_flow_run_context.background_tasks,
                        result_factory=result_factory,
                        log_prints=log_prints,
                    ),
                    user_thread=user_thread,
                )

    # Display the full state (including the result) if debugging
    display_state = repr(terminal_state) if PREFECT_DEBUG_MODE else str(terminal_state)
    logger.log(
        level=logging.INFO if terminal_state.is_completed() else logging.ERROR,
        msg=f"Finished in state {display_state}",
    )

    # Track the subflow state so the parent flow can use it to determine its final state
    parent_flow_run_context.flow_run_states.append(terminal_state)

    if return_type == "state":
        return terminal_state
    elif return_type == "result":
        return await terminal_state.result(fetch=True)
    else:
        raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
```
```python
async def create_autonomous_task_run(task: Task, parameters: Dict[str, Any]) -> TaskRun:
    """Create a task run in the API for an autonomous task submission and store
    the provided parameters using the existing result storage mechanism.
    """
    async with get_client() as client:
        state = Scheduled()
        if parameters:
            parameters_id = uuid4()
            state.state_details.task_parameters_id = parameters_id

            # TODO: Improve use of result storage for parameter storage / reference
            task.persist_result = True

            factory = await ResultFactory.from_autonomous_task(task, client=client)
            await factory.store_parameters(parameters_id, parameters)

        task_run = await client.create_task_run(
            task=task,
            flow_run_id=None,
            dynamic_key=f"{task.task_key}-{str(uuid4())[:NUM_CHARS_DYNAMIC_KEY]}",
            state=state,
        )

        engine_logger.debug(f"Submitted run of task {task.name!r} for execution")

        return task_run
```
```python
@inject_client
async def create_then_begin_flow_run(
    flow: Flow,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    return_type: EngineReturnType,
    client: PrefectClient,
    user_thread: threading.Thread,
) -> Any:
    """
    Async entrypoint for flow calls

    Creates the flow run in the backend, then enters the main flow run engine.
    """
    # TODO: Returns a `State` depending on `return_type` and we can add an overload to
    #       the function signature to clarify this eventually.
    await check_api_reachable(client, "Cannot create flow run")

    state = Pending()
    if flow.should_validate_parameters:
        try:
            parameters = flow.validate_parameters(parameters)
        except Exception:
            state = await exception_to_failed_state(
                message="Validation of flow parameters failed with error:"
            )

    flow_run = await client.create_flow_run(
        flow,
        # Send serialized parameters to the backend
        parameters=flow.serialize_parameters(parameters),
        state=state,
        tags=TagsContext.get().current_tags,
    )

    engine_logger.info(f"Created flow run {flow_run.name!r} for flow {flow.name!r}")

    logger = flow_run_logger(flow_run, flow)

    ui_url = PREFECT_UI_URL.value()
    if ui_url:
        logger.info(
            f"View at {ui_url}/flow-runs/flow-run/{flow_run.id}",
            extra={"send_to_api": False},
        )

    if state.is_failed():
        logger.error(state.message)
        engine_logger.info(
            f"Flow run {flow_run.name!r} received invalid parameters and is marked as"
            " failed."
        )
    else:
        state = await begin_flow_run(
            flow=flow,
            flow_run=flow_run,
            parameters=parameters,
            client=client,
            user_thread=user_thread,
        )

    if return_type == "state":
        return state
    elif return_type == "result":
        return await state.result(fetch=True)
    else:
        raise ValueError(f"Invalid return type for flow engine {return_type!r}.")
```
```python
def enter_flow_run_engine_from_flow_call(
    flow: Flow,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    return_type: EngineReturnType,
) -> Union[State, Awaitable[State]]:
    """
    Sync entrypoint for flow calls.

    This function does the heavy lifting of ensuring we can get into an async context
    for flow run execution with minimal overhead.
    """
    setup_logging()

    registry = PrefectObjectRegistry.get()
    if registry and registry.block_code_execution:
        engine_logger.warning(
            f"Script loading is in progress, flow {flow.name!r} will not be executed."
            " Consider updating the script to only call the flow if executed"
            f' directly:\n\n\tif __name__ == "__main__":\n\t\t{flow.fn.__name__}()'
        )
        return None

    parent_flow_run_context = FlowRunContext.get()
    is_subflow_run = parent_flow_run_context is not None

    if wait_for is not None and not is_subflow_run:
        raise ValueError("Only flows run as subflows can wait for dependencies.")

    begin_run = create_call(
        create_and_begin_subflow_run if is_subflow_run else create_then_begin_flow_run,
        flow=flow,
        parameters=parameters,
        wait_for=wait_for,
        return_type=return_type,
        client=parent_flow_run_context.client if is_subflow_run else None,
        user_thread=threading.current_thread(),
    )

    # On completion of root flows, wait for the global thread to ensure that
    # any work there is complete
    done_callbacks = (
        [create_call(wait_for_global_loop_exit)] if not is_subflow_run else None
    )

    # WARNING: You must define any context managers here to pass to our concurrency
    # api instead of entering them in here in the engine entrypoint. Otherwise, async
    # flows will not use the context as this function _exits_ to return an awaitable to
    # the user. Generally, you should enter contexts _within_ the async `begin_run`
    # instead but if you need to enter a context from the main thread you'll need to do
    # it here.
    contexts = [capture_sigterm()]

    if flow.isasync and (
        not is_subflow_run or (is_subflow_run and parent_flow_run_context.flow.isasync)
    ):
        # return a coro for the user to await if the flow is async
        # unless it is an async subflow called in a sync flow
        retval = from_async.wait_for_call_in_loop_thread(
            begin_run,
            done_callbacks=done_callbacks,
            contexts=contexts,
        )
    else:
        retval = from_sync.wait_for_call_in_loop_thread(
            begin_run,
            done_callbacks=done_callbacks,
            contexts=contexts,
        )

    return retval
```
Sync entrypoint for flow runs that have been submitted for execution by an agent
Differs from enter_flow_run_engine_from_flow_call in that we have a flow run id
but not a flow object. The flow must be retrieved before execution can begin.
Additionally, this assumes that the caller is always in a context without an event
loop as this should be called from a fresh process.
```python
def enter_flow_run_engine_from_subprocess(flow_run_id: UUID) -> State:
    """
    Sync entrypoint for flow runs that have been submitted for execution by an agent

    Differs from `enter_flow_run_engine_from_flow_call` in that we have a flow run id
    but not a flow object. The flow must be retrieved before execution can begin.

    Additionally, this assumes that the caller is always in a context without an event
    loop as this should be called from a fresh process.
    """
    # Ensure collections are imported and have the opportunity to register types before
    # loading the user code from the deployment
    prefect.plugins.load_prefect_collections()

    setup_logging()

    state = from_sync.wait_for_call_in_loop_thread(
        create_call(
            retrieve_flow_then_begin_flow_run,
            flow_run_id,
            user_thread=threading.current_thread(),
        ),
        contexts=[capture_sigterm()],
    )

    APILogHandler.flush()
    return state
```
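A hedged sketch of the hand-off this entrypoint supports: a fresh process (e.g. one started by an agent) supplies only a flow run id. The id below is a hypothetical placeholder.

```python
from uuid import UUID
from prefect.engine import enter_flow_run_engine_from_subprocess

# Blocks until the retrieved flow run reaches a terminal state.
final_state = enter_flow_run_engine_from_subprocess(
    UUID("00000000-0000-0000-0000-000000000000")  # hypothetical flow run id
)
```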
```python
def enter_task_run_engine(
    task: Task,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    return_type: EngineReturnType,
    task_runner: Optional[BaseTaskRunner],
    mapped: bool,
) -> Union[PrefectFuture, Awaitable[PrefectFuture], TaskRun]:
    """Sync entrypoint for task calls"""

    flow_run_context = FlowRunContext.get()

    if not flow_run_context:
        if return_type == "future" or mapped:
            raise RuntimeError(
                " If you meant to submit a background task, you need to set"
                " `prefect config set PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING=true`"
                " and use `your_task.submit()` instead of `your_task()`."
            )
        from prefect.task_engine import submit_autonomous_task_run_to_engine

        return submit_autonomous_task_run_to_engine(
            task=task,
            task_run=None,
            parameters=parameters,
            task_runner=task_runner,
            wait_for=wait_for,
            return_type=return_type,
            client=get_client(),
        )

    if flow_run_context.timeout_scope and flow_run_context.timeout_scope.cancel_called:
        raise TimeoutError("Flow run timed out")

    begin_run = create_call(
        begin_task_map if mapped else get_task_call_return_value,
        task=task,
        flow_run_context=flow_run_context,
        parameters=parameters,
        wait_for=wait_for,
        return_type=return_type,
        task_runner=task_runner,
    )

    if task.isasync and (
        flow_run_context.flow is None or flow_run_context.flow.isasync
    ):
        # return a coro for the user to await if an async task in an async flow
        return from_async.wait_for_call_in_loop_thread(begin_run)
    else:
        return from_sync.wait_for_call_in_loop_thread(begin_run)
```
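Per the error message above, submitting a task outside a flow requires the experimental task scheduling setting; a sketch of that path (which lands in `create_autonomous_task_run`), assuming `PREFECT_EXPERIMENTAL_ENABLE_TASK_SCHEDULING=true` has been set:

```python
from prefect import task

@task
def standalone(x):
    return x + 1

# Outside any flow: with task scheduling enabled, this creates an autonomous
# task run in the API instead of raising a RuntimeError.
standalone.submit(41)
```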
Since async flows are run directly in the main event loop, timeout behavior will
match that described by anyio. If the flow is awaiting something when the timeout
fires, it will return immediately; otherwise, it will exit at the next await. Sync
flows are run in a worker thread, which cannot be interrupted; the worker thread
will exit at the next task call. The worker thread also has access to the status of
the cancellation scope at FlowRunContext.timeout_scope.cancel_called, which allows
it to raise a TimeoutError to respect the timeout.
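A sketch of the sync-flow timeout behavior just described: the worker thread is not interrupted mid-call, but the run still finishes as TimedOut because the cancel scope is checked at the next task call.

```python
from time import sleep
from prefect import flow, task

@task
def busy():
    sleep(10)

@flow(timeout_seconds=2)
def flow_with_timeout():
    busy()   # runs to completion; worker threads cannot be interrupted
    busy()   # never starts: the timeout is observed at the next task call
```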
```python
async def orchestrate_flow_run(
    flow: Flow,
    flow_run: FlowRun,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    interruptible: bool,
    client: PrefectClient,
    partial_flow_run_context: FlowRunContext,
    user_thread: threading.Thread,
) -> State:
    """
    Executes a flow run.

    Note on flow timeouts:
        Since async flows are run directly in the main event loop, timeout behavior
        will match that described by anyio. If the flow is awaiting something, it
        will immediately return; otherwise, the next time it awaits it will exit.
        Sync flows are run in a worker thread, which cannot be interrupted. The
        worker thread will exit at the next task call. The worker thread also has
        access to the status of the cancellation scope at
        `FlowRunContext.timeout_scope.cancel_called` which allows it to raise a
        `TimeoutError` to respect the timeout.

    Returns:
        The final state of the run
    """
    logger = flow_run_logger(flow_run, flow)

    flow_run_context = None
    parent_flow_run_context = FlowRunContext.get()

    try:
        # Resolve futures in any non-data dependencies to ensure they are ready
        if wait_for is not None:
            await resolve_inputs({"wait_for": wait_for}, return_data=False)
    except UpstreamTaskError as upstream_exc:
        return await propose_state(
            client,
            Pending(name="NotReady", message=str(upstream_exc)),
            flow_run_id=flow_run.id,
            # if orchestrating a run already in a pending state, force orchestration to
            # update the state name
            force=flow_run.state.is_pending(),
        )

    state = await propose_state(client, Running(), flow_run_id=flow_run.id)

    # flag to ensure we only update the flow run name once
    run_name_set = False

    await _run_flow_hooks(flow=flow, flow_run=flow_run, state=state)

    while state.is_running():
        waited_for_task_runs = False

        # Update the flow run to the latest data
        flow_run = await client.read_flow_run(flow_run.id)
        try:
            with FlowRunContext(
                **{
                    **partial_flow_run_context.dict(),
                    **{
                        "flow_run": flow_run,
                        "flow": flow,
                        "client": client,
                        "parameters": parameters,
                    },
                }
            ) as flow_run_context:
                # update flow run name
                if not run_name_set and flow.flow_run_name:
                    flow_run_name = _resolve_custom_flow_run_name(
                        flow=flow, parameters=parameters
                    )
                    await client.update_flow_run(
                        flow_run_id=flow_run.id, name=flow_run_name
                    )
                    logger.extra["flow_run_name"] = flow_run_name
                    logger.debug(
                        f"Renamed flow run {flow_run.name!r} to {flow_run_name!r}"
                    )
                    flow_run.name = flow_run_name
                    run_name_set = True

                args, kwargs = parameters_to_args_kwargs(flow.fn, parameters)
                logger.debug(
                    f"Executing flow {flow.name!r} for flow run {flow_run.name!r}..."
                )

                if PREFECT_DEBUG_MODE:
                    logger.debug(f"Executing {call_repr(flow.fn, *args, **kwargs)}")
                else:
                    logger.debug(
                        "Beginning execution...", extra={"state_message": True}
                    )

                flow_call = create_call(flow.fn, *args, **kwargs)

                # This check for a parent call is needed for cases where the engine
                # was entered directly during testing
                parent_call = get_current_call()

                if parent_call and (
                    not parent_flow_run_context
                    or (
                        getattr(parent_flow_run_context, "flow", None)
                        and parent_flow_run_context.flow.isasync == flow.isasync
                    )
                ):
                    from_async.call_soon_in_waiting_thread(
                        flow_call,
                        thread=user_thread,
                        timeout=flow.timeout_seconds,
                    )
                else:
                    from_async.call_soon_in_new_thread(
                        flow_call, timeout=flow.timeout_seconds
                    )

                result = await flow_call.aresult()

                waited_for_task_runs = await wait_for_task_runs_and_report_crashes(
                    flow_run_context.task_run_futures, client=client
                )
        except PausedRun as exc:
            # could get raised either via utility or by returning Paused from a task
            # run; if a task run pauses, we set its state as the flow's state
            # to preserve reschedule and timeout behavior
            paused_flow_run = await client.read_flow_run(flow_run.id)
            if paused_flow_run.state.is_running():
                state = await propose_state(
                    client,
                    state=exc.state,
                    flow_run_id=flow_run.id,
                )

                return state
            paused_flow_run_state = paused_flow_run.state
            return paused_flow_run_state
        except CancelledError as exc:
            if not flow_call.timedout():
                # If the flow call was not cancelled by us; this is a crash
                raise
            # Construct a new exception as `TimeoutError`
            original = exc
            exc = TimeoutError()
            exc.__cause__ = original
            logger.exception("Encountered exception during execution:")
            terminal_state = await exception_to_failed_state(
                exc,
                message=f"Flow run exceeded timeout of {flow.timeout_seconds} seconds",
                result_factory=flow_run_context.result_factory,
                name="TimedOut",
            )
        except Exception:
            # Generic exception in user code
            logger.exception("Encountered exception during execution:")
            terminal_state = await exception_to_failed_state(
                message="Flow run encountered an exception.",
                result_factory=flow_run_context.result_factory,
            )
        else:
            if result is None:
                # All tasks and subflows are reference tasks if there is no return
                # value. If there are no tasks, use `None` instead of an empty iterable
                result = (
                    flow_run_context.task_run_futures
                    + flow_run_context.task_run_states
                    + flow_run_context.flow_run_states
                ) or None

            terminal_state = await return_value_to_state(
                await resolve_futures_to_states(result),
                result_factory=flow_run_context.result_factory,
            )

        if not waited_for_task_runs:
            # An exception occurred that prevented us from waiting for task runs to
            # complete. Ensure that we wait for them before proposing a final state
            # for the flow run.
            await wait_for_task_runs_and_report_crashes(
                flow_run_context.task_run_futures, client=client
            )

        # Before setting the flow run state, store state.data using
        # block storage and send the resulting data document to the Prefect API instead.
        # This prevents the pickled return value of flow runs
        # from being sent to the Prefect API and stored in the Prefect database.
        # state.data is left as is, otherwise we would have to load
        # the data from block storage again after storing.
        state = await propose_state(
            client,
            state=terminal_state,
            flow_run_id=flow_run.id,
        )

        await _run_flow_hooks(flow=flow, flow_run=flow_run, state=state)

        if state.type != terminal_state.type and PREFECT_DEBUG_MODE:
            logger.debug(
                (
                    f"Received new state {state} when proposing final state"
                    f" {terminal_state}"
                ),
                extra={"send_to_api": False},
            )

        if not state.is_final() and not state.is_paused():
            logger.info(
                (
                    f"Received non-final state {state.name!r} when proposing final"
                    f" state {terminal_state.name!r} and will attempt to run again..."
                ),
            )
            # Attempt to enter a running state again
            state = await propose_state(client, Running(), flow_run_id=flow_run.id)

    return state
```
This function should be submitted to a task runner. We must construct the context
here instead of receiving it already populated since we may be in a new environment.
Proposes a RUNNING state, then
- if accepted, the task user function will be run
- if rejected, the received state will be returned
When the user function is run, the result will be used to determine a final state
- if an exception is encountered, it is trapped and stored in a FAILED state
- otherwise, return_value_to_state is used to determine the state
If the final state is COMPLETED, we generate a cache key as specified by the task
The final state is then proposed
- if accepted, this is the final state and will be returned
- if rejected and a new final state is provided, it will be returned
- if rejected and a non-final state is provided, we will attempt to enter a RUNNING
state again
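Since the cache key mentioned above comes from the task's `cache_key_fn`, a brief sketch of configuring one (using the built-in `task_input_hash`):

```python
from datetime import timedelta
from prefect import task
from prefect.tasks import task_input_hash

@task(cache_key_fn=task_input_hash, cache_expiration=timedelta(hours=1))
def expensive(x):
    # On a COMPLETED final state, the engine attaches the cache key and
    # expiration to the state details; a later run with the same key is
    # resolved from the cache instead of re-executing.
    return x * 2
```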
```python
async def orchestrate_task_run(
    task: Task,
    task_run: TaskRun,
    parameters: Dict[str, Any],
    wait_for: Optional[Iterable[PrefectFuture]],
    result_factory: ResultFactory,
    log_prints: bool,
    interruptible: bool,
    client: PrefectClient,
) -> State:
    """
    Execute a task run

    This function should be submitted to a task runner. We must construct the context
    here instead of receiving it already populated since we may be in a new
    environment.

    Proposes a RUNNING state, then
    - if accepted, the task user function will be run
    - if rejected, the received state will be returned

    When the user function is run, the result will be used to determine a final state
    - if an exception is encountered, it is trapped and stored in a FAILED state
    - otherwise, `return_value_to_state` is used to determine the state

    If the final state is COMPLETED, we generate a cache key as specified by the task

    The final state is then proposed
    - if accepted, this is the final state and will be returned
    - if rejected and a new final state is provided, it will be returned
    - if rejected and a non-final state is provided, we will attempt to enter a
      RUNNING state again

    Returns:
        The final state of the run
    """
    flow_run_context = prefect.context.FlowRunContext.get()
    if flow_run_context:
        flow_run = flow_run_context.flow_run
    else:
        flow_run = await client.read_flow_run(task_run.flow_run_id)
    logger = task_run_logger(task_run, task=task, flow_run=flow_run)

    partial_task_run_context = TaskRunContext.construct(
        task_run=task_run,
        task=task,
        client=client,
        result_factory=result_factory,
        log_prints=log_prints,
    )
    task_introspection_start_time = time.perf_counter()
    try:
        # Resolve futures in parameters into data
        resolved_parameters = await resolve_inputs(parameters)
        # Resolve futures in any non-data dependencies to ensure they are ready
        await resolve_inputs({"wait_for": wait_for}, return_data=False)
    except UpstreamTaskError as upstream_exc:
        return await propose_state(
            client,
            Pending(name="NotReady", message=str(upstream_exc)),
            task_run_id=task_run.id,
            # if orchestrating a run already in a pending state, force orchestration to
            # update the state name
            force=task_run.state.is_pending(),
        )
    task_introspection_end_time = time.perf_counter()

    introspection_time = round(
        task_introspection_end_time - task_introspection_start_time, 3
    )
    threshold = PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD.value()
    if threshold and introspection_time > threshold:
        logger.warning(
            f"Task parameter introspection took {introspection_time} seconds, "
            f"exceeding `PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD` of {threshold}. "
            "Try wrapping large task parameters with "
            "`prefect.utilities.annotations.quote` for increased performance, "
            "e.g. `my_task(quote(param))`. To disable this message set "
            "`PREFECT_TASK_INTROSPECTION_WARN_THRESHOLD=0`."
        )

    # Generate the cache key to attach to proposed states
    # The cache key uses a TaskRunContext that does not include a `timeout_context`
    task_run_context = TaskRunContext(
        **partial_task_run_context.dict(), parameters=resolved_parameters
    )

    cache_key = (
        task.cache_key_fn(
            task_run_context,
            resolved_parameters,
        )
        if task.cache_key_fn
        else None
    )

    # Ignore the cached results for a cache key, default = false
    # Setting on task level overrules the Prefect setting (env var)
    refresh_cache = (
        task.refresh_cache
        if task.refresh_cache is not None
        else PREFECT_TASKS_REFRESH_CACHE.value()
    )

    # Emit an event to capture that the task run was in the `PENDING` state.
    last_event = emit_task_run_state_change_event(
        task_run=task_run, initial_state=None, validated_state=task_run.state
    )
    last_state = (
        Pending()
        if flow_run_context and flow_run_context.autonomous_task_run
        else task_run.state
    )

    # Completed states with persisted results should have result data. If it's missing,
    # this could be a manual state transition, so we should use the Unknown result type
    # to represent that we know we don't know the result.
    if (
        last_state
        and last_state.is_completed()
        and result_factory.persist_result
        and not last_state.data
    ):
        state = await propose_state(
            client,
            state=Completed(data=await UnknownResult.create()),
            task_run_id=task_run.id,
            force=True,
        )

    # Transition from `PENDING` -> `RUNNING`
    try:
        state = await propose_state(
            client,
            Running(
                state_details=StateDetails(
                    cache_key=cache_key, refresh_cache=refresh_cache
                )
            ),
            task_run_id=task_run.id,
        )
    except Pause as exc:
        # We shouldn't get a pause signal without a state, but if this happens,
        # just use a Paused state to assume an in-process pause.
        state = exc.state if exc.state else Paused()

        # If a flow submits tasks and then pauses, we may reach this point due
        # to concurrency timing because the tasks will try to transition after
        # the flow run has paused. Orchestration will send back a Paused state
        # for the task runs.
        if state.state_details.pause_reschedule:
            # If we're being asked to pause and reschedule, we should exit the
            # task and expect to be resumed later.
            raise

    if state.is_paused():
        BACKOFF_MAX = 10  # Seconds
        backoff_count = 0

        async def tick():
            nonlocal backoff_count
            if backoff_count < BACKOFF_MAX:
                backoff_count += 1
            interval = 1 + backoff_count + random.random() * backoff_count
            await anyio.sleep(interval)

        # Enter a loop to wait for the task run to be resumed, i.e.
        # become Pending, and then propose a Running state again.
        while True:
            await tick()

            # Propose a Running state again. We do this instead of reading the
            # task run because if the flow run times out, this lets
            # orchestration fail the task run.
            try:
                state = await propose_state(
                    client,
                    Running(
                        state_details=StateDetails(
                            cache_key=cache_key, refresh_cache=refresh_cache
                        )
                    ),
                    task_run_id=task_run.id,
                )
            except Pause as exc:
                if not exc.state:
                    continue

                if exc.state.state_details.pause_reschedule:
                    # If the pause state includes pause_reschedule, we should exit the
                    # task and expect to be resumed later. We've already checked for
                    # this above, but we check again here in case the state changed;
                    # e.g. the flow run suspended.
                    raise
                else:
                    # Propose a Running state again.
                    continue
            else:
                break

    # Emit an event to capture the result of proposing a `RUNNING` state.
    last_event = emit_task_run_state_change_event(
        task_run=task_run,
        initial_state=last_state,
        validated_state=state,
        follows=last_event,
    )
    last_state = state

    # flag to ensure we only update the task run name once
    run_name_set = False

    # Only run the task if we enter a `RUNNING` state
    while state.is_running():
        # Retrieve the latest metadata for the task run context
        task_run = await client.read_task_run(task_run.id)

        with task_run_context.copy(
            update={"task_run": task_run, "start_time": pendulum.now("UTC")}
        ):
            try:
                args, kwargs = parameters_to_args_kwargs(task.fn, resolved_parameters)
                # update task run name
                if not run_name_set and task.task_run_name:
                    task_run_name = _resolve_custom_task_run_name(
                        task=task, parameters=resolved_parameters
                    )
                    await client.set_task_run_name(
                        task_run_id=task_run.id, name=task_run_name
                    )
                    logger.extra["task_run_name"] = task_run_name
                    logger.debug(
                        f"Renamed task run {task_run.name!r} to {task_run_name!r}"
                    )
                    task_run.name = task_run_name
                    run_name_set = True

                if PREFECT_DEBUG_MODE.value():
                    logger.debug(f"Executing {call_repr(task.fn, *args, **kwargs)}")
                else:
                    logger.debug(
                        "Beginning execution...", extra={"state_message": True}
                    )

                call = from_async.call_soon_in_new_thread(
                    create_call(task.fn, *args, **kwargs),
                    timeout=task.timeout_seconds,
                )
                result = await call.aresult()

            except (CancelledError, asyncio.CancelledError) as exc:
                if not call.timedout():
                    # If the task call was not cancelled by us; this is a crash
                    raise
                # Construct a new exception as `TimeoutError`
                original = exc
                exc = TimeoutError()
                exc.__cause__ = original
                logger.exception("Encountered exception during execution:")
                terminal_state = await exception_to_failed_state(
                    exc,
                    message=(
                        f"Task run exceeded timeout of {task.timeout_seconds} seconds"
                    ),
                    result_factory=task_run_context.result_factory,
                    name="TimedOut",
                )
            except Exception as exc:
                logger.exception("Encountered exception during execution:")
                terminal_state = await exception_to_failed_state(
                    exc,
                    message="Task run encountered an exception",
                    result_factory=task_run_context.result_factory,
                )
            else:
                terminal_state = await return_value_to_state(
                    result,
                    result_factory=task_run_context.result_factory,
                )

                # for COMPLETED tasks, add the cache key and expiration
                if terminal_state.is_completed():
                    terminal_state.state_details.cache_expiration = (
                        (pendulum.now("utc") + task.cache_expiration)
                        if task.cache_expiration
                        else None
                    )
                    terminal_state.state_details.cache_key = cache_key

            if terminal_state.is_failed():
                # Defer to user to decide whether failure is retriable
                terminal_state.state_details.retriable = (
                    await _check_task_failure_retriable(task, task_run, terminal_state)
                )
            state = await propose_state(client, terminal_state, task_run_id=task_run.id)
            last_event = emit_task_run_state_change_event(
                task_run=task_run,
                initial_state=last_state,
                validated_state=state,
                follows=last_event,
            )
            last_state = state

            await _run_task_hooks(
                task=task,
                task_run=task_run,
                state=state,
            )

            if state.type != terminal_state.type and PREFECT_DEBUG_MODE:
                logger.debug(
                    (
                        f"Received new state {state} when proposing final state"
                        f" {terminal_state}"
                    ),
                    extra={"send_to_api": False},
                )

            if not state.is_final() and not state.is_paused():
                logger.info(
                    (
                        f"Received non-final state {state.name!r} when proposing final"
                        f" state {terminal_state.name!r} and will attempt to run"
                        " again..."
                    ),
                )
                # Attempt to enter a running state again
                state = await propose_state(client, Running(), task_run_id=task_run.id)
                last_event = emit_task_run_state_change_event(
                    task_run=task_run,
                    initial_state=last_state,
                    validated_state=state,
                    follows=last_event,
                )
                last_state = state

    # If debugging, use the more complete `repr` than the usual `str` description
    display_state = repr(state) if PREFECT_DEBUG_MODE else str(state)

    logger.log(
        level=logging.INFO if state.is_completed() else logging.ERROR,
        msg=f"Finished in state {display_state}",
    )

    return state
```
Pauses the current flow run by blocking execution until resumed.
When called within a flow run, execution will block and no downstream tasks will
run until the flow is resumed. Task runs that have already started will continue
running. A timeout parameter can be passed that will fail the flow run if it has not
been resumed within the specified time.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `flow_run_id` | `UUID` | A flow run id. If supplied, this function will attempt to pause the specified flow run outside of the flow run process. When paused, the flow run will continue execution until the NEXT task is orchestrated, at which point the flow will exit. Any tasks that have already started will run until completion. When resumed, the flow run will be rescheduled to finish execution. In order to pause a flow run in this way, the flow needs to have an associated deployment and results need to be configured with the `persist_results` option. | `None` |
| `timeout` | `int` | The number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming. | `3600` |
| `poll_interval` | `int` | The number of seconds between checking whether the flow has been resumed. Defaults to 10 seconds. | `10` |
| `reschedule` | `bool` | Flag that will reschedule the flow run if resumed. Instead of blocking execution, the flow will gracefully exit (with no result returned). To use this flag, a flow needs to have an associated deployment and results need to be configured with the `persist_results` option. | `False` |
| `key` | `str` | An optional key to prevent calling pauses more than once. This defaults to the number of pauses observed by the flow so far, and prevents pauses that use the "reschedule" option from running the same pause twice. A custom key can be supplied for custom pausing behavior. | `None` |
| `wait_for_input` | `Optional[Type[T]]` | A subclass of `RunInput` or any type supported by Pydantic. If provided when the flow pauses, the flow will wait for the input to be provided before resuming. If the flow is resumed without providing the input, the flow will fail. If the flow is resumed with the input, the flow will resume and the input will be loaded and returned from this function. | `None` |

Example:
```python
@task
def task_one():
    for i in range(3):
        sleep(1)


@flow
def my_flow():
    terminal_state = task_one.submit(return_state=True)
    if terminal_state.type == StateType.COMPLETED:
        print("Task one succeeded! Pausing flow run..")
        pause_flow_run(timeout=2)
    else:
        print("Task one failed. Skipping pause flow run..")
```
````python
@sync_compatible
@deprecated_parameter(
    "flow_run_id", start_date="Dec 2023", help="Use `suspend_flow_run` instead."
)
@deprecated_parameter(
    "reschedule",
    start_date="Dec 2023",
    when=lambda p: p is True,
    help="Use `suspend_flow_run` instead.",
)
@experimental_parameter(
    "wait_for_input", group="flow_run_input", when=lambda y: y is not None
)
async def pause_flow_run(
    wait_for_input: Optional[Type[T]] = None,
    flow_run_id: UUID = None,
    timeout: int = 3600,
    poll_interval: int = 10,
    reschedule: bool = False,
    key: str = None,
) -> Optional[T]:
    """
    Pauses the current flow run by blocking execution until resumed.

    When called within a flow run, execution will block and no downstream tasks will
    run until the flow is resumed. Task runs that have already started will continue
    running. A timeout parameter can be passed that will fail the flow run if it has
    not been resumed within the specified time.

    Args:
        flow_run_id: a flow run id. If supplied, this function will attempt to pause
            the specified flow run outside of the flow run process. When paused, the
            flow run will continue execution until the NEXT task is orchestrated, at
            which point the flow will exit. Any tasks that have already started will
            run until completion. When resumed, the flow run will be rescheduled to
            finish execution. In order to pause a flow run in this way, the flow needs
            to have an associated deployment and results need to be configured with
            the `persist_results` option.
        timeout: the number of seconds to wait for the flow to be resumed before
            failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds
            any configured flow-level timeout, the flow might fail even after resuming.
        poll_interval: The number of seconds between checking whether the flow has
            been resumed. Defaults to 10 seconds.
        reschedule: Flag that will reschedule the flow run if resumed. Instead of
            blocking execution, the flow will gracefully exit (with no result
            returned). To use this flag, a flow needs to have an associated deployment
            and results need to be configured with the `persist_results` option.
        key: An optional key to prevent calling pauses more than once. This defaults
            to the number of pauses observed by the flow so far, and prevents pauses
            that use the "reschedule" option from running the same pause twice. A
            custom key can be supplied for custom pausing behavior.
        wait_for_input: a subclass of `RunInput` or any type supported by Pydantic. If
            provided when the flow pauses, the flow will wait for the input to be
            provided before resuming. If the flow is resumed without providing the
            input, the flow will fail. If the flow is resumed with the input, the flow
            will resume and the input will be loaded and returned from this function.

    Example:
    ```python
    @task
    def task_one():
        for i in range(3):
            sleep(1)

    @flow
    def my_flow():
        terminal_state = task_one.submit(return_state=True)
        if terminal_state.type == StateType.COMPLETED:
            print("Task one succeeded! Pausing flow run..")
            pause_flow_run(timeout=2)
        else:
            print("Task one failed. Skipping pause flow run..")
    ```
    """
    if flow_run_id:
        if wait_for_input is not None:
            raise RuntimeError("Cannot wait for input when pausing out of process.")

        return await _out_of_process_pause(
            flow_run_id=flow_run_id,
            timeout=timeout,
            reschedule=reschedule,
            key=key,
        )
    else:
        return await _in_process_pause(
            timeout=timeout,
            poll_interval=poll_interval,
            reschedule=reschedule,
            key=key,
            wait_for_input=wait_for_input,
        )
````
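For completeness, a hedged sketch of the out-of-process branch above (deprecated in favor of `suspend_flow_run`): pausing a deployed flow run by id from another process, assuming the flow has a deployment and persisted results. The id is a hypothetical placeholder.

```python
from uuid import UUID
from prefect.engine import pause_flow_run

pause_flow_run(
    flow_run_id=UUID("00000000-0000-0000-0000-000000000000"),  # hypothetical id
    timeout=600,
)
```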
```python
@asynccontextmanager
async def report_flow_run_crashes(flow_run: FlowRun, client: PrefectClient, flow: Flow):
    """
    Detect flow run crashes during this context and update the run to a proper final
    state.

    This context _must_ reraise the exception to properly exit the run.
    """
    try:
        yield
    except (Abort, Pause):
        # Do not capture internal signals as crashes
        raise
    except BaseException as exc:
        state = await exception_to_crashed_state(exc)
        logger = flow_run_logger(flow_run)
        with anyio.CancelScope(shield=True):
            logger.error(f"Crash detected! {state.message}")
            logger.debug("Crash details:", exc_info=exc)
            flow_run_state = await propose_state(client, state, flow_run_id=flow_run.id)
            engine_logger.debug(
                f"Reported crashed flow run {flow_run.name!r} successfully!"
            )

            # Only `on_crashed` and `on_cancellation` flow run state change hooks can
            # be called here. We call the hooks after the state change proposal to
            # `CRASHED` is validated or rejected (if it is in a `CANCELLING` state).
            await _run_flow_hooks(
                flow=flow,
                flow_run=flow_run,
                state=flow_run_state,
            )

        # Reraise the exception
        raise
```
```python
@asynccontextmanager
async def report_task_run_crashes(task_run: TaskRun, client: PrefectClient):
    """
    Detect task run crashes during this context and update the run to a proper final
    state.

    This context _must_ reraise the exception to properly exit the run.
    """
    try:
        yield
    except (Abort, Pause):
        # Do not capture internal signals as crashes
        raise
    except BaseException as exc:
        state = await exception_to_crashed_state(exc)
        logger = task_run_logger(task_run)
        with anyio.CancelScope(shield=True):
            logger.error(f"Crash detected! {state.message}")
            logger.debug("Crash details:", exc_info=exc)
            await client.set_task_run_state(
                state=state,
                task_run_id=task_run.id,
                force=True,
            )
            engine_logger.debug(
                f"Reported crashed task run {task_run.name!r} successfully!"
            )

        # Reraise the exception
        raise
```
```python
@sync_compatible
async def resume_flow_run(flow_run_id, run_input: Optional[Dict] = None):
    """
    Resumes a paused flow.

    Args:
        flow_run_id: the flow_run_id to resume
        run_input: a dictionary of inputs to provide to the flow run.
    """
    client = get_client()
    async with client:
        flow_run = await client.read_flow_run(flow_run_id)

        if not flow_run.state.is_paused():
            raise NotPausedError("Cannot resume a run that isn't paused!")

        response = await client.resume_flow_run(flow_run_id, run_input=run_input)

        if response.status == SetStateStatus.REJECT:
            if response.state.type == StateType.FAILED:
                raise FlowPauseTimeout("Flow run can no longer be resumed.")
            else:
                raise RuntimeError(f"Cannot resume this run: {response.details.reason}")
```
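A short usage sketch for `resume_flow_run`, supplying run input for a flow paused with `wait_for_input` (the id and input keys are hypothetical placeholders):

```python
from uuid import UUID
from prefect.engine import resume_flow_run

resume_flow_run(
    UUID("00000000-0000-0000-0000-000000000000"),  # hypothetical flow run id
    run_input={"approved": True},                  # hypothetical run input
)
```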
```python
@inject_client
async def retrieve_flow_then_begin_flow_run(
    flow_run_id: UUID,
    client: PrefectClient,
    user_thread: threading.Thread,
) -> State:
    """
    Async entrypoint for flow runs that have been submitted for execution by an agent

    - Retrieves the deployment information
    - Loads the flow object using deployment information
    - Updates the flow run version
    """
    flow_run = await client.read_flow_run(flow_run_id)

    entrypoint = os.environ.get("PREFECT__FLOW_ENTRYPOINT")

    try:
        flow = (
            load_flow_from_entrypoint(entrypoint)
            if entrypoint
            else await load_flow_from_flow_run(flow_run, client=client)
        )
    except Exception:
        message = (
            "Flow could not be retrieved from"
            f" {'entrypoint' if entrypoint else 'deployment'}."
        )
        flow_run_logger(flow_run).exception(message)
        state = await exception_to_failed_state(message=message)
        await client.set_flow_run_state(
            state=state, flow_run_id=flow_run_id, force=True
        )
        return state

    # Update the flow run policy defaults to match settings on the flow
    # Note: Mutating the flow run object prevents us from performing another read
    #       operation if these properties are used by the client downstream
    if flow_run.empirical_policy.retry_delay is None:
        flow_run.empirical_policy.retry_delay = flow.retry_delay_seconds

    if flow_run.empirical_policy.retries is None:
        flow_run.empirical_policy.retries = flow.retries

    await client.update_flow_run(
        flow_run_id=flow_run_id,
        flow_version=flow.version,
        empirical_policy=flow_run.empirical_policy,
    )

    if flow.should_validate_parameters:
        failed_state = None
        try:
            parameters = flow.validate_parameters(flow_run.parameters)
        except Exception:
            message = "Validation of flow parameters failed with error: "
            flow_run_logger(flow_run).exception(message)
            failed_state = await exception_to_failed_state(message=message)

        if failed_state is not None:
            await propose_state(
                client,
                state=failed_state,
                flow_run_id=flow_run_id,
            )
            return failed_state
    else:
        parameters = flow_run.parameters

    # Ensure default values are populated
    parameters = {**get_parameter_defaults(flow.fn), **parameters}

    return await begin_flow_run(
        flow=flow,
        flow_run=flow_run,
        parameters=parameters,
        client=client,
        user_thread=user_thread,
    )
```
Suspends a flow run by stopping code execution until resumed.
When suspended, the flow run will continue execution until the NEXT task is
orchestrated, at which point the flow will exit. Any tasks that have
already started will run until completion. When resumed, the flow run will
be rescheduled to finish execution. In order to suspend a flow run in this
way, the flow needs to have an associated deployment and results need to be
configured with the persist_results option.
Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `flow_run_id` | `Optional[UUID]` | A flow run id. If supplied, this function will attempt to suspend the specified flow run. If not supplied, will attempt to suspend the current flow run. | `None` |
| `timeout` | `Optional[int]` | The number of seconds to wait for the flow to be resumed before failing. Defaults to 1 hour (3600 seconds). If the pause timeout exceeds any configured flow-level timeout, the flow might fail even after resuming. | `3600` |
| `key` | `Optional[str]` | An optional key to prevent calling suspend more than once. This defaults to a random string and prevents suspends from running the same suspend twice. A custom key can be supplied for custom suspending behavior. | `None` |
| `wait_for_input` | `Optional[Type[T]]` | A subclass of `RunInput` or any type supported by Pydantic. If provided when the flow suspends, the flow will remain suspended until receiving the input before resuming. If the flow is resumed without providing the input, the flow will fail. If the flow is resumed with the input, the flow will resume and the input will be loaded and returned from this function. | `None` |
```python
@sync_compatible
@inject_client
@experimental_parameter(
    "wait_for_input", group="flow_run_input", when=lambda y: y is not None
)
async def suspend_flow_run(
    wait_for_input: Optional[Type[T]] = None,
    flow_run_id: Optional[UUID] = None,
    timeout: Optional[int] = 3600,
    key: Optional[str] = None,
    client: PrefectClient = None,
) -> Optional[T]:
    """
    Suspends a flow run by stopping code execution until resumed.

    When suspended, the flow run will continue execution until the NEXT task is
    orchestrated, at which point the flow will exit. Any tasks that have
    already started will run until completion. When resumed, the flow run will
    be rescheduled to finish execution. In order to suspend a flow run in this
    way, the flow needs to have an associated deployment and results need to be
    configured with the `persist_results` option.

    Args:
        flow_run_id: a flow run id. If supplied, this function will attempt to
            suspend the specified flow run. If not supplied, will attempt to
            suspend the current flow run.
        timeout: the number of seconds to wait for the flow to be resumed before
            failing. Defaults to 1 hour (3600 seconds). If the pause timeout
            exceeds any configured flow-level timeout, the flow might fail even
            after resuming.
        key: An optional key to prevent calling suspend more than once. This
            defaults to a random string and prevents suspends from running the
            same suspend twice. A custom key can be supplied for custom
            suspending behavior.
        wait_for_input: a subclass of `RunInput` or any type supported by
            Pydantic. If provided when the flow suspends, the flow will remain
            suspended until receiving the input before resuming. If the flow is
            resumed without providing the input, the flow will fail. If the
            flow is resumed with the input, the flow will resume and the input
            will be loaded and returned from this function.
    """
    context = FlowRunContext.get()

    if flow_run_id is None:
        if TaskRunContext.get():
            raise RuntimeError("Cannot suspend task runs.")

        if context is None or context.flow_run is None:
            raise RuntimeError(
                "Flow runs can only be suspended from within a flow run."
            )

        logger = get_run_logger(context=context)
        logger.info(
            "Suspending flow run, execution will be rescheduled when this flow run is"
            " resumed."
        )
        flow_run_id = context.flow_run.id
        suspending_current_flow_run = True
        pause_counter = _observed_flow_pauses(context)
        pause_key = key or str(pause_counter)
    else:
        # Since we're suspending another flow run we need to generate a pause
        # key that won't conflict with whatever suspends/pauses that flow may
        # have. Since this method won't be called during that flow run it's
        # okay that this is non-deterministic.
        suspending_current_flow_run = False
        pause_key = key or str(uuid4())

    proposed_state = Suspended(timeout_seconds=timeout, pause_key=pause_key)

    if wait_for_input:
        wait_for_input = run_input_subclass_from_type(wait_for_input)
        run_input_keyset = keyset_from_paused_state(proposed_state)
        proposed_state.state_details.run_input_keyset = run_input_keyset

    try:
        state = await propose_state(
            client=client,
            state=proposed_state,
            flow_run_id=flow_run_id,
        )
    except Abort as exc:
        # Aborted requests mean the suspension is not allowed
        raise RuntimeError(f"Flow run cannot be suspended: {exc}")

    if state.is_running():
        # The orchestrator rejected the suspended state which means that this
        # suspend has happened before and the flow run has been resumed.
        if wait_for_input:
            # The flow run wanted input, so we need to load it and return it
            # to the user.
            return await wait_for_input.load(run_input_keyset)
        return

    if not state.is_paused():
        # If we receive anything but a PAUSED state, we are unable to continue
        raise RuntimeError(
            f"Flow run cannot be suspended. Received unexpected state from API: {state}"
        )

    if wait_for_input:
        await wait_for_input.save(run_input_keyset)

    if suspending_current_flow_run:
        # Exit this process so the run can be resubmitted later
        raise Pause()
```
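Finally, a hedged usage sketch for `suspend_flow_run` with typed input: the process exits on suspend, and when the (deployed) run is resumed, the rescheduled execution reaches the same call again and returns the loaded input.

```python
from prefect import flow
from prefect.engine import suspend_flow_run
from prefect.input import RunInput

class Approval(RunInput):
    approved: bool

@flow
def release_flow():
    # Exits this process with a Suspended state; on resume, the rescheduled
    # run loads and returns the saved input here.
    approval = suspend_flow_run(wait_for_input=Approval, timeout=600)
    if approval.approved:
        print("Approved; continuing the release.")
```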