If set, the container will be removed on completion. Otherwise,
the container will remain after exit for inspection.
command
Optional[List[str]]
A list of strings specifying the command to run in the container to
start the flow run. In most cases you should not override this.
env
Dict[str, Optional[str]]
Environment variables to set for the container.
image
str
An optional string specifying the tag of a Docker image to use.
Defaults to the Prefect image.
image_pull_policy
Optional[ImagePullPolicy]
Specifies if the image should be pulled. One of 'ALWAYS',
'NEVER', 'IF_NOT_PRESENT'.
image_registry
Optional[DockerRegistry]
A DockerRegistry block containing credentials to use if image is stored in a private
image registry.
labels
Dict[str, str]
An optional dictionary of labels, mapping name to value.
name
Optional[str]
An optional name for the container.
network_mode
Optional[str]
Set the network mode for the created container. Defaults to 'host'
if a local API url is detected, otherwise the Docker default of 'bridge' is
used. If 'networks' is set, this cannot be set.
networks
List[str]
An optional list of strings specifying Docker networks to connect the
container to.
stream_output
bool
If set, stream output from the container to local standard output.
volumes
List[str]
An optional list of volume mount strings in the format of
"local_path:container_path".
memswap_limit
Union[int, str]
Total memory (memory + swap), -1 to disable swap. Should only be
set if mem_limit is also set. If mem_limit is set, this defaults to
allowing the container to use as much swap as memory. For example, if
mem_limit is 300m and memswap_limit is not set, the container can use
600m in total of memory and swap.
mem_limit
Union[float, str]
Memory limit of the created container. Accepts float values to enforce
a limit in bytes or a string with a unit e.g. 100000b, 1000k, 128m, 1g.
If a string is given without a unit, bytes are assumed.
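Taken together, the attributes above map one-to-one onto keyword arguments of the block. A minimal sketch, assuming Prefect 2.x with Docker support installed; the image tag, volume paths, and memory values below are placeholders rather than defaults:

```python
from prefect.infrastructure import DockerContainer

# Illustrative values only -- none of these are required
container = DockerContainer(
    image="docker.io/prefecthq/prefect:2-latest",   # tag of the Docker image to run
    auto_remove=True,                                # remove the container on completion
    stream_output=True,                              # stream container output to stdout
    volumes=["/tmp/flows:/opt/flows"],               # "local_path:container_path"
    mem_limit="512m",                                # string with unit, or float bytes
    memswap_limit="1g",                              # only meaningful when mem_limit is set
    env={"EXTRA_PIP_PACKAGES": "pandas"},            # extra container environment variables
)
```

Per the volumes validator in the source below, each volume string must contain a ':' separating the local and container paths.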
If using a local API URL on Linux, we will update the network mode default to 'host'
to enable connectivity. If using another OS, or if an alternative network mode is set,
we will replace 'localhost' in the API URL with 'host.docker.internal'. Generally,
this will enable connectivity, but the API URL can be provided as an environment
variable to override this inference in more complex use cases.
Note that if 'host.docker.internal' is used in the API URL on Linux, the API must be bound
to 0.0.0.0 or the Docker IP address to allow connectivity. On macOS, this is not
necessary and the API is reachable while bound to localhost.
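For the override mentioned above, the API URL can be passed through the block's env field so no inference is needed; the URL below is a placeholder for your own API address:

```python
from prefect.infrastructure import DockerContainer

# Point the container at an API address reachable from inside Docker,
# bypassing the localhost -> host.docker.internal rewrite described above.
container = DockerContainer(
    env={"PREFECT_API_URL": "http://host.docker.internal:4200/api"},
)
```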
Source code in src/prefect/infrastructure/container.py
@deprecated_class(start_date="Mar 2024",help="Use the Docker worker from prefect-docker instead."" Refer to the upgrade guide for more information:"" https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.",)classDockerContainer(Infrastructure):""" Runs a command in a container. Requires a Docker Engine to be connectable. Docker settings will be retrieved from the environment. Click [here](https://docs.prefect.io/guides/deployment/docker) to see a tutorial. Attributes: auto_remove: If set, the container will be removed on completion. Otherwise, the container will remain after exit for inspection. command: A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this. env: Environment variables to set for the container. image: An optional string specifying the tag of a Docker image to use. Defaults to the Prefect image. image_pull_policy: Specifies if the image should be pulled. One of 'ALWAYS', 'NEVER', 'IF_NOT_PRESENT'. image_registry: A `DockerRegistry` block containing credentials to use if `image` is stored in a private image registry. labels: An optional dictionary of labels, mapping name to value. name: An optional name for the container. network_mode: Set the network mode for the created container. Defaults to 'host' if a local API url is detected, otherwise the Docker default of 'bridge' is used. If 'networks' is set, this cannot be set. networks: An optional list of strings specifying Docker networks to connect the container to. stream_output: If set, stream output from the container to local standard output. volumes: An optional list of volume mount strings in the format of "local_path:container_path". memswap_limit: Total memory (memory + swap), -1 to disable swap. Should only be set if `mem_limit` is also set. If `mem_limit` is set, this defaults to allowing the container to use as much swap as memory. For example, if `mem_limit` is 300m and `memswap_limit` is not set, the container can use 600m in total of memory and swap. mem_limit: Memory limit of the created container. Accepts float values to enforce a limit in bytes or a string with a unit e.g. 100000b, 1000k, 128m, 1g. If a string is given without a unit, bytes are assumed. privileged: Give extended privileges to this container. ## Connecting to a locally hosted Prefect API If using a local API URL on Linux, we will update the network mode default to 'host' to enable connectivity. If using another OS or an alternative network mode is used, we will replace 'localhost' in the API URL with 'host.docker.internal'. Generally, this will enable connectivity, but the API URL can be provided as an environment variable to override inference in more complex use-cases. Note, if using 'host.docker.internal' in the API URL on Linux, the API must be bound to 0.0.0.0 or the Docker IP address to allow connectivity. On macOS, this is not necessary and the API is connectable while bound to localhost. """type:Literal["docker-container"]=Field(default="docker-container",description="The type of infrastructure.")image:str=Field(default_factory=get_prefect_image_name,description="Tag of a Docker image to use. 
Defaults to the Prefect image.",)image_pull_policy:Optional[ImagePullPolicy]=Field(default=None,description="Specifies if the image should be pulled.")image_registry:Optional[DockerRegistry]=Nonenetworks:List[str]=Field(default_factory=list,description=("A list of strings specifying Docker networks to connect the container to."),)network_mode:Optional[str]=Field(default=None,description=("The network mode for the created container (e.g. host, bridge). If"" 'networks' is set, this cannot be set."),)auto_remove:bool=Field(default=False,description="If set, the container will be removed on completion.",)volumes:List[str]=Field(default_factory=list,description=("A list of volume mount strings in the format of"' "local_path:container_path".'),)stream_output:bool=Field(default=True,description=("If set, the output will be streamed from the container to local standard"" output."),)memswap_limit:Union[int,str]=Field(default=None,description=("Total memory (memory + swap), -1 to disable swap. Should only be ""set if `mem_limit` is also set. If `mem_limit` is set, this defaults to""allowing the container to use as much swap as memory. For example, if ""`mem_limit` is 300m and `memswap_limit` is not set, the container can use ""600m in total of memory and swap."),)mem_limit:Union[float,str]=Field(default=None,description=("Memory limit of the created container. Accepts float values to enforce ""a limit in bytes or a string with a unit e.g. 100000b, 1000k, 128m, 1g. ""If a string is given without a unit, bytes are assumed."),)privileged:bool=Field(default=False,description="Give extended privileges to this container.",)_block_type_name="Docker Container"_logo_url="https://cdn.sanity.io/images/3ugk85nk/production/14a315b79990200db7341e42553e23650b34bb96-250x250.png"_documentation_url="https://docs.prefect.io/api-ref/prefect/infrastructure/#prefect.infrastructure.DockerContainer"@validator("labels")defconvert_labels_to_docker_format(cls,labels:Dict[str,str]):labels=labelsor{}new_labels={}forname,valueinlabels.items():if"/"inname:namespace,key=name.split("/",maxsplit=1)new_namespace=".".join(reversed(namespace.split(".")))new_labels[f"{new_namespace}.{key}"]=valueelse:new_labels[name]=valuereturnnew_labels@validator("volumes")defcheck_volume_format(cls,volumes):forvolumeinvolumes:if":"notinvolume:raiseValueError("Invalid volume specification. 
"f"Expected format 'path:container_path', but got {volume!r}")returnvolumes@sync_compatibleasyncdefrun(self,task_status:Optional[anyio.abc.TaskStatus]=None,)->Optional[bool]:ifnotself.command:raiseValueError("Docker container cannot be run with empty command.")# The `docker` library uses requests instead of an async http library so it must# be run in a thread to avoid blocking the event loop.container=awaitrun_sync_in_worker_thread(self._create_and_start_container)container_pid=self._get_infrastructure_pid(container_id=container.id)# Mark as started and return the infrastructure idiftask_status:task_status.started(container_pid)# Monitor the containercontainer=awaitrun_sync_in_worker_thread(self._watch_container_safe,container)exit_code=container.attrs["State"].get("ExitCode")returnDockerContainerResult(status_code=exit_codeifexit_codeisnotNoneelse-1,identifier=container_pid,)asyncdefkill(self,infrastructure_pid:str,grace_seconds:int=30):docker_client=self._get_client()base_url,container_id=self._parse_infrastructure_pid(infrastructure_pid)ifdocker_client.api.base_url!=base_url:raiseInfrastructureNotAvailable("".join([(f"Unable to stop container {container_id!r}: the current"" Docker API "),(f"URL {docker_client.api.base_url!r} does not match the"" expected "),f"API base URL {base_url}.",]))try:container=docker_client.containers.get(container_id=container_id)exceptdocker.errors.NotFound:raiseInfrastructureNotFound(f"Unable to stop container {container_id!r}: The container was not"" found.")try:container.stop(timeout=grace_seconds)exceptException:raisedefpreview(self):# TODO: build and document a more sophisticated previewdocker_client=self._get_client()try:returnjson.dumps(self._build_container_settings(docker_client))finally:docker_client.close()asyncdefgenerate_work_pool_base_job_template(self):fromprefect.workers.utilitiesimport(get_default_base_job_template_for_infrastructure_type,)base_job_template=awaitget_default_base_job_template_for_infrastructure_type(self.get_corresponding_worker_type())ifbase_job_templateisNone:returnawaitsuper().generate_work_pool_base_job_template()forkey,valueinself.dict(exclude_unset=True,exclude_defaults=True).items():ifkey=="command":base_job_template["variables"]["properties"]["command"]["default"]=shlex.join(value)elifkey=="image_registry":self.logger.warning("Image registry blocks are not supported by Docker"" work pools. Please authenticate to your registry using"" the `docker login` command on your worker instances.")elifkeyin["type","block_type_slug","_block_document_id","_block_document_name","_is_anonymous",]:continueelifkey=="image_pull_policy":new_value=Noneifvalue==ImagePullPolicy.ALWAYS:new_value="Always"elifvalue==ImagePullPolicy.NEVER:new_value="Never"elifvalue==ImagePullPolicy.IF_NOT_PRESENT:new_value="IfNotPresent"base_job_template["variables"]["properties"][key]["default"]=new_valueelifkeyinbase_job_template["variables"]["properties"]:base_job_template["variables"]["properties"][key]["default"]=valueelse:self.logger.warning(f"Variable {key!r} is not supported by Docker work pools. Skipping.")returnbase_job_templatedefget_corresponding_worker_type(self):return"docker"def_get_infrastructure_pid(self,container_id:str)->str:"""Generates a Docker infrastructure_pid string in the form of `<docker_host_base_url>:<container_id>`. 
"""docker_client=self._get_client()base_url=docker_client.api.base_urldocker_client.close()returnf"{base_url}:{container_id}"def_parse_infrastructure_pid(self,infrastructure_pid:str)->Tuple[str,str]:"""Splits a Docker infrastructure_pid into its component parts"""# base_url can contain `:` so we only want the last item of the splitbase_url,container_id=infrastructure_pid.rsplit(":",1)returnbase_url,str(container_id)def_build_container_settings(self,docker_client:"DockerClient",)->Dict:network_mode=self._get_network_mode()returndict(image=self.image,network=self.networks[0]ifself.networkselseNone,network_mode=network_mode,command=self.command,environment=self._get_environment_variables(network_mode),auto_remove=self.auto_remove,labels={**CONTAINER_LABELS,**self.labels},extra_hosts=self._get_extra_hosts(docker_client),name=self._get_container_name(),volumes=self.volumes,mem_limit=self.mem_limit,memswap_limit=self.memswap_limit,privileged=self.privileged,)def_create_and_start_container(self)->"Container":ifself.image_registry:# If an image registry block was supplied, load an authenticated Docker# client from the block. Otherwise, use an unauthenticated client to# pull images from public registries.docker_client=self.image_registry.get_docker_client()else:docker_client=self._get_client()container_settings=self._build_container_settings(docker_client)ifself._should_pull_image(docker_client):self.logger.info(f"Pulling image {self.image!r}...")self._pull_image(docker_client)container=self._create_container(docker_client,**container_settings)# Add additional networks after the container is created; only one network can# be attached at creation timeiflen(self.networks)>1:fornetwork_nameinself.networks[1:]:network=docker_client.networks.get(network_name)network.connect(container)# Start the containercontainer.start()docker_client.close()returncontainerdef_get_image_and_tag(self)->Tuple[str,Optional[str]]:returnparse_image_tag(self.image)def_determine_image_pull_policy(self)->ImagePullPolicy:""" Determine the appropriate image pull policy. 1. If they specified an image pull policy, use that. 2. If they did not specify an image pull policy and gave us the "latest" tag, use ImagePullPolicy.always. 3. If they did not specify an image pull policy and did not specify a tag, use ImagePullPolicy.always. 4. If they did not specify an image pull policy and gave us a tag other than "latest", use ImagePullPolicy.if_not_present. This logic matches the behavior of Kubernetes. 
See:https://kubernetes.io/docs/concepts/containers/images/#imagepullpolicy-defaulting """ifnotself.image_pull_policy:_,tag=self._get_image_and_tag()iftag=="latest"ornottag:returnImagePullPolicy.ALWAYSreturnImagePullPolicy.IF_NOT_PRESENTreturnself.image_pull_policydef_get_network_mode(self)->Optional[str]:# User's value takes precedence; this may collide with the incompatible options# mentioned below.ifself.network_mode:ifsys.platform!="linux"andself.network_mode=="host":warnings.warn(f"{self.network_mode!r} network mode is not supported on platform "f"{sys.platform!r} and may not work as intended.")returnself.network_mode# Network mode is not compatible with networks or ports (we do not support ports# yet though)ifself.networks:returnNone# Check for a local API connectionapi_url=self.env.get("PREFECT_API_URL",PREFECT_API_URL.value())ifapi_url:try:_,netloc,_,_,_,_=urllib.parse.urlparse(api_url)exceptExceptionasexc:warnings.warn(f"Failed to parse host from API URL {api_url!r} with exception: "f"{exc}\nThe network mode will not be inferred.")returnNonehost=netloc.split(":")[0]# If using a locally hosted API, use a host network on linuxifsys.platform=="linux"and(host=="127.0.0.1"orhost=="localhost"):return"host"# Default to unsetreturnNonedef_should_pull_image(self,docker_client:"DockerClient")->bool:""" Decide whether we need to pull the Docker image. """image_pull_policy=self._determine_image_pull_policy()ifimage_pull_policyisImagePullPolicy.ALWAYS:returnTrueelifimage_pull_policyisImagePullPolicy.NEVER:returnFalseelifimage_pull_policyisImagePullPolicy.IF_NOT_PRESENT:try:# NOTE: images.get() wants the tag included with the image# name, while images.pull() wants them split.docker_client.images.get(self.image)exceptdocker.errors.ImageNotFound:self.logger.debug(f"Could not find Docker image locally: {self.image}")returnTruereturnFalsedef_pull_image(self,docker_client:"DockerClient"):""" Pull the image we're going to use to create the container. """image,tag=self._get_image_and_tag()returndocker_client.images.pull(image,tag)def_create_container(self,docker_client:"DockerClient",**kwargs)->"Container":""" Create a docker container with retries on name conflicts. If the container already exists with the given name, an incremented index is added. 
"""# Create the container with retries on name conflicts (with an incremented idx)index=0container=Nonename=original_name=kwargs.pop("name")whilenotcontainer:fromdocker.errorsimportAPIErrortry:display_name=repr(name)ifnameelse"with auto-generated name"self.logger.info(f"Creating Docker container {display_name}...")container=docker_client.containers.create(name=name,**kwargs)exceptAPIErrorasexc:if"Conflict"instr(exc)and"container name"instr(exc):self.logger.info(f"Docker container name {display_name} already exists; ""retrying...")index+=1name=f"{original_name}-{index}"else:raiseself.logger.info(f"Docker container {container.name!r} has status {container.status!r}")returncontainerdef_watch_container_safe(self,container:"Container")->"Container":# Monitor the container capturing the latest snapshot while capturing# not found errorsdocker_client=self._get_client()try:forlatest_containerinself._watch_container(docker_client,container.id):container=latest_containerexceptdocker.errors.NotFound:# The container was removed during watchingself.logger.warning(f"Docker container {container.name} was removed before we could wait ""for its completion.")finally:docker_client.close()returncontainerdef_watch_container(self,docker_client:"DockerClient",container_id:str)->Generator[None,None,"Container"]:container:"Container"=docker_client.containers.get(container_id)status=container.statusself.logger.info(f"Docker container {container.name!r} has status {container.status!r}")yieldcontainerifself.stream_output:try:forlogincontainer.logs(stream=True):log:bytesprint(log.decode().rstrip())exceptdocker.errors.APIErrorasexc:if"marked for removal"instr(exc):self.logger.warning(f"Docker container {container.name} was marked for removal"" before logs could be retrieved. Output will not be"" streamed. ")else:self.logger.exception("An unexpected Docker API error occurred while streaming"f" output from container {container.name}.")container.reload()ifcontainer.status!=status:self.logger.info(f"Docker container {container.name!r} has status"f" {container.status!r}")yieldcontainercontainer.wait()self.logger.info(f"Docker container {container.name!r} has status {container.status!r}")yieldcontainerdef_get_client(self):try:withwarnings.catch_warnings():# Silence warnings due to use of deprecated methods within dockerpy# See https://github.com/docker/docker-py/pull/2931warnings.filterwarnings("ignore",message="distutils Version classes are deprecated.*",category=DeprecationWarning,)docker_client=docker.from_env()exceptdocker.errors.DockerExceptionasexc:raiseRuntimeError("Could not connect to Docker.")fromexcreturndocker_clientdef_get_container_name(self)->Optional[str]:""" Generates a container name to match the configured name, ensuring it is Docker compatible. """# Must match `/?[a-zA-Z0-9][a-zA-Z0-9_.-]+` in the endifnotself.name:returnNonereturn(slugify(self.name,lowercase=False,# Docker does not limit length but URL limits apply eventually so# limit the length for safetymax_length=250,# Docker allows these characters for container namesregex_pattern=r"[^a-zA-Z0-9_.-]+",).lstrip(# Docker does not allow leading underscore, dash, or period"_-.")# Docker does not allow 0 character names so cast to null if the name is# empty after slufificationorNone)def_get_extra_hosts(self,docker_client)->Dict[str,str]:""" A host.docker.internal -> host-gateway mapping is necessary for communicating with the API on Linux machines. Docker Desktop on macOS will automatically already have this mapping. 
"""ifsys.platform=="linux"and(# Do not warn if the user has specified a host manually that does not use# a local address"PREFECT_API_URL"notinself.envorre.search(".*(localhost)|(127.0.0.1)|(host.docker.internal).*",self.env["PREFECT_API_URL"],)):user_version=packaging.version.parse(format_outlier_version_name(docker_client.version()["Version"]))required_version=packaging.version.parse("20.10.0")ifuser_version<required_version:warnings.warn("`host.docker.internal` could not be automatically resolved to"" your local ip address. This feature is not supported on Docker"f" Engine v{user_version}, upgrade to v{required_version}+ if you"" encounter issues.")return{}else:# Compatibility for linux -- https://github.com/docker/cli/issues/2290# Only supported by Docker v20.10.0+ which is our minimum recommend versionreturn{"host.docker.internal":"host-gateway"}def_get_environment_variables(self,network_mode):# If the API URL has been set by the base environment rather than the by the# user, update the value to ensure connectivity when using a bridge network by# updating local connections to use the docker internal host unless the# network mode is "host" where localhost is available already.env={**self._base_environment(),**self.env}if("PREFECT_API_URL"inenvand"PREFECT_API_URL"notinself.envandnetwork_mode!="host"):env["PREFECT_API_URL"]=(env["PREFECT_API_URL"].replace("localhost","host.docker.internal").replace("127.0.0.1","host.docker.internal"))# Drop null values allowing users to "unset" variablesreturn{key:valueforkey,valueinenv.items()ifvalueisnotNone}
@deprecated_class(start_date="Mar 2024",help="Use the `BaseWorker` class to create custom infrastructure integrations instead."" Refer to the upgrade guide for more information:"" https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.",)classInfrastructure(Block,abc.ABC):_block_schema_capabilities=["run-infrastructure"]type:strenv:Dict[str,Optional[str]]=pydantic.Field(default_factory=dict,title="Environment",description="Environment variables to set in the configured infrastructure.",)labels:Dict[str,str]=pydantic.Field(default_factory=dict,description="Labels applied to the infrastructure for metadata purposes.",)name:Optional[str]=pydantic.Field(default=None,description="Name applied to the infrastructure for identification.",)command:Optional[List[str]]=pydantic.Field(default=None,description="The command to run in the infrastructure.",)asyncdefgenerate_work_pool_base_job_template(self):ifself._block_document_idisNone:raiseBlockNotSavedError("Cannot publish as work pool, block has not been saved. Please call"" `.save()` on your block before publishing.")block_schema=self.__class__.schema()return{"job_configuration":{"block":"{{ block }}"},"variables":{"type":"object","properties":{"block":{"title":"Block","description":("The infrastructure block to use for job creation."),"allOf":[{"$ref":f"#/definitions/{self.__class__.__name__}"}],"default":{"$ref":{"block_document_id":str(self._block_document_id)}},}},"required":["block"],"definitions":{self.__class__.__name__:block_schema},},}defget_corresponding_worker_type(self):return"block"@sync_compatibleasyncdefpublish_as_work_pool(self,work_pool_name:Optional[str]=None):""" Creates a work pool configured to use the given block as the job creator. Used to migrate from a agents setup to a worker setup. Args: work_pool_name: The name to give to the created work pool. If not provided, the name of the current block will be used. """base_job_template=awaitself.generate_work_pool_base_job_template()work_pool_name=work_pool_nameorself._block_document_nameifwork_pool_nameisNone:raiseValueError("`work_pool_name` must be provided if the block has not been saved.")console=Console()try:asyncwithprefect.get_client()asclient:work_pool=awaitclient.create_work_pool(work_pool=WorkPoolCreate(name=work_pool_name,type=self.get_corresponding_worker_type(),base_job_template=base_job_template,))exceptObjectAlreadyExists:console.print((f"Work pool with name {work_pool_name!r} already exists, please use"" a different name."),style="red",)returnconsole.print(f"Work pool {work_pool.name} created!",style="green",)ifPREFECT_UI_URL:console.print("You see your new work pool in the UI at"f" {PREFECT_UI_URL.value()}/work-pools/work-pool/{work_pool.name}")deploy_script=("my_flow.deploy(work_pool_name='{work_pool.name}', image='my_image:tag')")ifnothasattr(self,"image"):deploy_script=("my_flow.from_source(source='https://github.com/org/repo.git',"f" entrypoint='flow.py:my_flow').deploy(work_pool_name='{work_pool.name}')")console.print("\nYou can deploy a flow to this work pool by calling"f" [blue].deploy[/]:\n\n\t{deploy_script}\n")console.print("\nTo start a worker to execute flow runs in this work pool run:\n")console.print(f"\t[blue]prefect worker start --pool {work_pool.name}[/]\n")@abc.abstractmethodasyncdefrun(self,task_status:anyio.abc.TaskStatus=None,)->InfrastructureResult:""" Run the infrastructure. If provided a `task_status`, the status will be reported as started when the infrastructure is successfully created. 
The status return value will be an identifier for the infrastructure. The call will then monitor the created infrastructure, returning a result at the end containing a status code indicating if the infrastructure exited cleanly or encountered an error. """# Note: implementations should include `sync_compatible`@abc.abstractmethoddefpreview(self)->str:""" View a preview of the infrastructure that would be run. """@propertydeflogger(self):returnget_logger(f"prefect.infrastructure.{self.type}")@propertydefis_using_a_runner(self):returnself.commandisnotNoneand"prefect flow-run execute"inshlex.join(self.command)@classmethoddef_base_environment(cls)->Dict[str,str]:""" Environment variables that should be passed to all created infrastructure. These values should be overridable with the `env` field. """returnget_current_settings().to_environment_variables(exclude_unset=True)defprepare_for_flow_run(self:Self,flow_run:"FlowRun",deployment:Optional["Deployment"]=None,flow:Optional["Flow"]=None,)->Self:""" Return an infrastructure block that is prepared to execute a flow run. """ifdeploymentisnotNone:deployment_labels=self._base_deployment_labels(deployment)else:deployment_labels={}ifflowisnotNone:flow_labels=self._base_flow_labels(flow)else:flow_labels={}returnself.copy(update={"env":{**self._base_flow_run_environment(flow_run),**self.env},"labels":{**self._base_flow_run_labels(flow_run),**deployment_labels,**flow_labels,**self.labels,},"name":self.nameorflow_run.name,"command":self.commandorself._base_flow_run_command(),})@staticmethoddef_base_flow_run_command()->List[str]:""" Generate a command for a flow run job. """ifexperiment_enabled("enhanced_cancellation"):if(PREFECT_EXPERIMENTAL_WARNandPREFECT_EXPERIMENTAL_WARN_ENHANCED_CANCELLATION):warnings.warn(EXPERIMENTAL_WARNING.format(feature="Enhanced flow run cancellation",group="enhanced_cancellation",help="",),ExperimentalFeature,stacklevel=3,)return["prefect","flow-run","execute"]return["python","-m","prefect.engine"]@staticmethoddef_base_flow_run_labels(flow_run:"FlowRun")->Dict[str,str]:""" Generate a dictionary of labels for a flow run job. """return{"prefect.io/flow-run-id":str(flow_run.id),"prefect.io/flow-run-name":flow_run.name,"prefect.io/version":prefect.__version__,}@staticmethoddef_base_flow_run_environment(flow_run:"FlowRun")->Dict[str,str]:""" Generate a dictionary of environment variables for a flow run job. """environment={}environment["PREFECT__FLOW_RUN_ID"]=str(flow_run.id)returnenvironment@staticmethoddef_base_deployment_labels(deployment:"Deployment")->Dict[str,str]:labels={"prefect.io/deployment-name":deployment.name,}ifdeployment.updatedisnotNone:labels["prefect.io/deployment-updated"]=deployment.updated.in_timezone("utc").to_iso8601_string()returnlabels@staticmethoddef_base_flow_labels(flow:"Flow")->Dict[str,str]:return{"prefect.io/flow-name":flow.name,}
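Among the helpers above, publish_as_work_pool converts a saved infrastructure block into a work pool for the worker-based model; a brief sketch with placeholder block and pool names:

```python
from prefect.infrastructure import DockerContainer

# The block must have been saved previously (or pass work_pool_name explicitly)
container = DockerContainer.load("my-docker-block")       # placeholder block name
container.publish_as_work_pool(work_pool_name="my-pool")  # creates a 'docker' typed work pool
```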
If provided a task_status, the status will be reported as started when the
infrastructure is successfully created. The status return value will be an
identifier for the infrastructure.
The call will then monitor the created infrastructure and return a result at
the end containing a status code that indicates whether the infrastructure exited
cleanly or encountered an error.
Source code in src/prefect/infrastructure/base.py
```python
@abc.abstractmethod
async def run(
    self,
    task_status: anyio.abc.TaskStatus = None,
) -> InfrastructureResult:
    """
    Run the infrastructure.

    If provided a `task_status`, the status will be reported as started when the
    infrastructure is successfully created. The status return value will be an
    identifier for the infrastructure.

    The call will then monitor the created infrastructure, returning a result at
    the end containing a status code indicating if the infrastructure exited
    cleanly or encountered an error.
    """
```
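To illustrate the contract, here is a minimal, hypothetical subclass sketch. The LocalEcho and LocalEchoResult names are invented for illustration, and the InfrastructureResult import is assumed to come from the base module shown above:

```python
from typing import Optional

import anyio

from prefect.infrastructure.base import Infrastructure, InfrastructureResult
from prefect.utilities.asyncutils import sync_compatible


class LocalEchoResult(InfrastructureResult):
    """Hypothetical result type; real blocks define their own (e.g. DockerContainerResult)."""


class LocalEcho(Infrastructure):
    """Hypothetical infrastructure that pretends to run its command locally."""

    type: str = "local-echo"

    @sync_compatible
    async def run(
        self, task_status: Optional[anyio.abc.TaskStatus] = None
    ) -> LocalEchoResult:
        identifier = "local-echo-1"  # would normally identify the created resource
        if task_status:
            task_status.started(identifier)  # report started with the identifier
        # ... create and monitor the real infrastructure here ...
        return LocalEchoResult(identifier=identifier, status_code=0)

    def preview(self) -> str:
        return f"Would run locally: {self.command}"
```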
@deprecated_class(start_date="Mar 2024",help="Use the KubernetesClusterConfig block from prefect-kubernetes instead.",)classKubernetesClusterConfig(Block):""" Stores configuration for interaction with Kubernetes clusters. See `from_file` for creation. Attributes: config: The entire loaded YAML contents of a kubectl config file context_name: The name of the kubectl context to use Example: Load a saved Kubernetes cluster config: ```python from prefect.blocks.kubernetes import KubernetesClusterConfig cluster_config_block = KubernetesClusterConfig.load("BLOCK_NAME") ``` """_block_type_name="Kubernetes Cluster Config"_logo_url="https://cdn.sanity.io/images/3ugk85nk/production/2d0b896006ad463b49c28aaac14f31e00e32cfab-250x250.png"_documentation_url="https://docs.prefect.io/api-ref/prefect/blocks/kubernetes/#prefect.blocks.kubernetes.KubernetesClusterConfig"config:Dict=Field(default=...,description="The entire contents of a kubectl config file.")context_name:str=Field(default=...,description="The name of the kubectl context to use.")@validator("config",pre=True)defparse_yaml_config(cls,value):returnvalidate_yaml(value)@classmethoddeffrom_file(cls:Type[Self],path:Path=None,context_name:str=None)->Self:""" Create a cluster config from the a Kubernetes config file. By default, the current context in the default Kubernetes config file will be used. An alternative file or context may be specified. The entire config file will be loaded and stored. """kube_config=kubernetes.config.kube_configpath=Path(pathorkube_config.KUBE_CONFIG_DEFAULT_LOCATION)path=path.expanduser().resolve()# Determine the contextexisting_contexts,current_context=kube_config.list_kube_config_contexts(config_file=str(path))context_names={ctx["name"]forctxinexisting_contexts}ifcontext_name:ifcontext_namenotincontext_names:raiseValueError(f"Context {context_name!r} not found. "f"Specify one of: {listrepr(context_names,sep=', ')}.")else:context_name=current_context["name"]# Load the entire config fileconfig_file_contents=path.read_text()config_dict=yaml.safe_load(config_file_contents)returncls(config=config_dict,context_name=context_name)defget_api_client(self)->"ApiClient":""" Returns a Kubernetes API client for this cluster config. """returnkubernetes.config.kube_config.new_client_from_config_dict(config_dict=self.config,context=self.context_name)defconfigure_client(self)->None:""" Activates this cluster configuration by loading the configuration into the Kubernetes Python client. After calling this, Kubernetes API clients can use this config's context. """kubernetes.config.kube_config.load_kube_config_from_dict(config_dict=self.config,context=self.context_name)
Activates this cluster configuration by loading the configuration into the
Kubernetes Python client. After calling this, Kubernetes API clients can use
this config's context.
Source code in src/prefect/blocks/kubernetes.py
```python
def configure_client(self) -> None:
    """
    Activates this cluster configuration by loading the configuration into the
    Kubernetes Python client. After calling this, Kubernetes API clients can use
    this config's context.
    """
    kubernetes.config.kube_config.load_kube_config_from_dict(
        config_dict=self.config, context=self.context_name
    )
```
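A short usage sketch (the block name and namespace are placeholders); after configure_client, ordinary Kubernetes API clients pick up this config's context:

```python
import kubernetes

from prefect.blocks.kubernetes import KubernetesClusterConfig

cluster_config = KubernetesClusterConfig.load("my-cluster-config")  # placeholder block name
cluster_config.configure_client()

# Clients created afterwards use the stored context
core_v1 = kubernetes.client.CoreV1Api()
for pod in core_v1.list_namespaced_pod("default").items:
    print(pod.metadata.name)
```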
```python
@classmethod
def from_file(cls: Type[Self], path: Path = None, context_name: str = None) -> Self:
    """
    Create a cluster config from a Kubernetes config file.

    By default, the current context in the default Kubernetes config file will be
    used.

    An alternative file or context may be specified.

    The entire config file will be loaded and stored.
    """
    kube_config = kubernetes.config.kube_config

    path = Path(path or kube_config.KUBE_CONFIG_DEFAULT_LOCATION)
    path = path.expanduser().resolve()

    # Determine the context
    existing_contexts, current_context = kube_config.list_kube_config_contexts(
        config_file=str(path)
    )
    context_names = {ctx["name"] for ctx in existing_contexts}
    if context_name:
        if context_name not in context_names:
            raise ValueError(
                f"Context {context_name!r} not found. "
                f"Specify one of: {listrepr(context_names, sep=', ')}."
            )
    else:
        context_name = current_context["name"]

    # Load the entire config file
    config_file_contents = path.read_text()
    config_dict = yaml.safe_load(config_file_contents)

    return cls(config=config_dict, context_name=context_name)
```
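For example, to capture the current context from a local kubeconfig and persist it as a block (the context and block names below are placeholders):

```python
from prefect.blocks.kubernetes import KubernetesClusterConfig

# Uses ~/.kube/config by default; pass `path` to point at another file
cluster_config = KubernetesClusterConfig.from_file(context_name="my-context")
cluster_config.save("my-cluster-config")  # persist the block for later `.load()` calls
```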
Returns a Kubernetes API client for this cluster config.
Source code in src/prefect/blocks/kubernetes.py
```python
def get_api_client(self) -> "ApiClient":
    """
    Returns a Kubernetes API client for this cluster config.
    """
    return kubernetes.config.kube_config.new_client_from_config_dict(
        config_dict=self.config, context=self.context_name
    )
```
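Unlike configure_client, get_api_client returns a standalone client without touching global configuration; a sketch with a placeholder block name:

```python
import kubernetes

from prefect.blocks.kubernetes import KubernetesClusterConfig

cluster_config = KubernetesClusterConfig.load("my-cluster-config")  # placeholder block name
api_client = cluster_config.get_api_client()

# Build typed API wrappers on top of the returned ApiClient
apps_v1 = kubernetes.client.AppsV1Api(api_client=api_client)
for deployment in apps_v1.list_namespaced_deployment("default").items:
    print(deployment.metadata.name)
```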
A list of strings specifying the command to run in the container to
start the flow run. In most cases you should not override this.
customizations
JsonPatch
A list of JSON 6902 patches to apply to the base Job manifest.
env
Dict[str, Optional[str]]
Environment variables to set for the container.
finished_job_ttl
Optional[int]
The number of seconds to retain jobs after completion. If set, finished jobs will
be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be
manually removed.
image
Optional[str]
An optional string specifying the image reference of a container image
to use for the job, for example, docker.io/prefecthq/prefect:2-latest. The
behavior is as described in https://kubernetes.io/docs/concepts/containers/images/#image-names.
Defaults to the Prefect image.
image_pull_policy
Optional[KubernetesImagePullPolicy]
The Kubernetes image pull policy to use for job containers.
job
KubernetesManifest
The base manifest for the Kubernetes Job.
job_watch_timeout_seconds
Optional[int]
Number of seconds to wait for the job to complete
before marking it as crashed. Defaults to None, which means no timeout will be enforced.
labels
Dict[str, str]
An optional dictionary of labels to add to the job.
name
Optional[str]
An optional name for the job.
namespace
Optional[str]
An optional string signifying the Kubernetes namespace to use.
pod_watch_timeout_seconds
int
Number of seconds to watch for pod creation before timing out (default 60).
service_account_name
Optional[str]
An optional string specifying which Kubernetes service account to use.
stream_output
bool
If set, stream output from the job to local standard output.
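These attributes map one-to-one onto keyword arguments of the block; a sketch with placeholder values, where the resources patch illustrates the customizations format (a list of RFC 6902 operations):

```python
from prefect.infrastructure import KubernetesJob

# Illustrative values only -- none of these are required
job = KubernetesJob(
    image="docker.io/prefecthq/prefect:2-latest",
    name="example-flow-run",             # placeholder; used for the generated Job name
    namespace="prefect",                 # 'default' is used when unset
    finished_job_ttl=300,                # let Kubernetes delete the Job 5 minutes after completion
    pod_watch_timeout_seconds=60,
    customizations=[
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/resources",
            "value": {"requests": {"memory": "512Mi", "cpu": "250m"}},
        }
    ],
)
print(job.preview())  # renders the resulting Job manifest as YAML
```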
Source code in src/prefect/infrastructure/kubernetes.py
@deprecated_class(start_date="Mar 2024",help="Use the Kubernetes worker from prefect-kubernetes instead."" Refer to the upgrade guide for more information:"" https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.",)classKubernetesJob(Infrastructure):""" Runs a command as a Kubernetes Job. For a guided tutorial, see [How to use Kubernetes with Prefect](https://medium.com/the-prefect-blog/how-to-use-kubernetes-with-prefect-419b2e8b8cb2/). For more information, including examples for customizing the resulting manifest, see [`KubernetesJob` infrastructure concepts](https://docs.prefect.io/concepts/infrastructure/#kubernetesjob). Attributes: cluster_config: An optional Kubernetes cluster config to use for this job. command: A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this. customizations: A list of JSON 6902 patches to apply to the base Job manifest. env: Environment variables to set for the container. finished_job_ttl: The number of seconds to retain jobs after completion. If set, finished jobs will be cleaned up by Kubernetes after the given delay. If None (default), jobs will need to be manually removed. image: An optional string specifying the image reference of a container image to use for the job, for example, docker.io/prefecthq/prefect:2-latest. The behavior is as described in https://kubernetes.io/docs/concepts/containers/images/#image-names. Defaults to the Prefect image. image_pull_policy: The Kubernetes image pull policy to use for job containers. job: The base manifest for the Kubernetes Job. job_watch_timeout_seconds: Number of seconds to wait for the job to complete before marking it as crashed. Defaults to `None`, which means no timeout will be enforced. labels: An optional dictionary of labels to add to the job. name: An optional name for the job. namespace: An optional string signifying the Kubernetes namespace to use. pod_watch_timeout_seconds: Number of seconds to watch for pod creation before timing out (default 60). service_account_name: An optional string specifying which Kubernetes service account to use. stream_output: If set, stream output from the job to local standard output. """_logo_url="https://cdn.sanity.io/images/3ugk85nk/production/2d0b896006ad463b49c28aaac14f31e00e32cfab-250x250.png"_documentation_url="https://docs.prefect.io/api-ref/prefect/infrastructure/#prefect.infrastructure.KubernetesJob"type:Literal["kubernetes-job"]=Field(default="kubernetes-job",description="The type of infrastructure.")# shortcuts for the most common user-serviceable settingsimage:Optional[str]=Field(default=None,description=("The image reference of a container image to use for the job, for example,"" `docker.io/prefecthq/prefect:2-latest`.The behavior is as described in"" the Kubernetes documentation and uses the latest version of Prefect by"" default, unless an image is already present in a provided job manifest."),)namespace:Optional[str]=Field(default=None,description=("The Kubernetes namespace to use for this job. 
Defaults to 'default' ""unless a namespace is already present in a provided job manifest."),)service_account_name:Optional[str]=Field(default=None,description="The Kubernetes service account to use for this job.")image_pull_policy:Optional[KubernetesImagePullPolicy]=Field(default=None,description="The Kubernetes image pull policy to use for job containers.",)# connection to a clustercluster_config:Optional[KubernetesClusterConfig]=Field(default=None,description="The Kubernetes cluster config to use for this job.")# settings allowing full customization of the Jobjob:KubernetesManifest=Field(default_factory=lambda:KubernetesJob.base_job_manifest(),description="The base manifest for the Kubernetes Job.",title="Base Job Manifest",)customizations:JsonPatch=Field(default_factory=lambda:JsonPatch([]),description="A list of JSON 6902 patches to apply to the base Job manifest.",)# controls the behavior of executionjob_watch_timeout_seconds:Optional[int]=Field(default=None,description=("Number of seconds to wait for the job to complete before marking it as"" crashed. Defaults to `None`, which means no timeout will be enforced."),)pod_watch_timeout_seconds:int=Field(default=60,description="Number of seconds to watch for pod creation before timing out.",)stream_output:bool=Field(default=True,description=("If set, output will be streamed from the job to local standard output."),)finished_job_ttl:Optional[int]=Field(default=None,description=("The number of seconds to retain jobs after completion. If set, finished"" jobs will be cleaned up by Kubernetes after the given delay. If None"" (default), jobs will need to be manually removed."),)# internal-use only right now_api_dns_name:Optional[str]=None# Replaces 'localhost' in API URL_block_type_name="Kubernetes Job"@validator("job")defensure_job_includes_all_required_components(cls,value:KubernetesManifest):returnvalidate_k8s_job_required_components(cls,value)@validator("job")defensure_job_has_compatible_values(cls,value:KubernetesManifest):returnvalidate_k8s_job_compatible_values(cls,value)@validator("customizations",pre=True)defcast_customizations_to_a_json_patch(cls,value:Union[List[Dict],JsonPatch,str])->JsonPatch:returncast_k8s_job_customizations(cls,value)@root_validatordefdefault_namespace(cls,values):returnset_default_namespace(values)@root_validatordefdefault_image(cls,values):returnset_default_image(values)# Support serialization of the 'JsonPatch' typeclassConfig:arbitrary_types_allowed=Truejson_encoders={JsonPatch:lambdap:p.patch}defdict(self,*args,**kwargs)->Dict:d=super().dict(*args,**kwargs)d["customizations"]=self.customizations.patchreturnd@classmethoddefbase_job_manifest(cls)->KubernetesManifest:"""Produces the bare minimum allowed Job manifest"""return{"apiVersion":"batch/v1","kind":"Job","metadata":{"labels":{}},"spec":{"template":{"spec":{"parallelism":1,"completions":1,"restartPolicy":"Never","containers":[{"name":"prefect-job","env":[],}],}}},}# Note that we're using the yaml package to load both YAML and JSON files below.# This works because YAML is a strict superset of JSON:## > The YAML 1.23 specification was published in 2009. Its primary focus was# > making YAML a strict superset of JSON. 
It also removed many of the problematic# > implicit typing recommendations.## https://yaml.org/spec/1.2.2/#12-yaml-history@classmethoddefjob_from_file(cls,filename:str)->KubernetesManifest:"""Load a Kubernetes Job manifest from a YAML or JSON file."""withopen(filename,"r",encoding="utf-8")asf:returnyaml.load(f,yaml.SafeLoader)@classmethoddefcustomize_from_file(cls,filename:str)->JsonPatch:"""Load an RFC 6902 JSON patch from a YAML or JSON file."""withopen(filename,"r",encoding="utf-8")asf:returnJsonPatch(yaml.load(f,yaml.SafeLoader))@sync_compatibleasyncdefrun(self,task_status:Optional[anyio.abc.TaskStatus]=None,)->KubernetesJobResult:ifnotself.command:raiseValueError("Kubernetes job cannot be run with empty command.")self._configure_kubernetes_library_client()manifest=self.build_job()job=awaitrun_sync_in_worker_thread(self._create_job,manifest)pid=awaitrun_sync_in_worker_thread(self._get_infrastructure_pid,job)# Indicate that the job has startediftask_statusisnotNone:task_status.started(pid)# Monitor the job until completionstatus_code=awaitrun_sync_in_worker_thread(self._watch_job,job.metadata.name)returnKubernetesJobResult(identifier=pid,status_code=status_code)asyncdefkill(self,infrastructure_pid:str,grace_seconds:int=30):self._configure_kubernetes_library_client()job_cluster_uid,job_namespace,job_name=self._parse_infrastructure_pid(infrastructure_pid)ifnotjob_namespace==self.namespace:raiseInfrastructureNotAvailable(f"Unable to kill job {job_name!r}: The job is running in namespace "f"{job_namespace!r} but this block is configured to use "f"{self.namespace!r}.")current_cluster_uid=self._get_cluster_uid()ifjob_cluster_uid!=current_cluster_uid:raiseInfrastructureNotAvailable(f"Unable to kill job {job_name!r}: The job is running on another ""cluster.")withself.get_batch_client()asbatch_client:try:batch_client.delete_namespaced_job(name=job_name,namespace=job_namespace,grace_period_seconds=grace_seconds,# Foreground propagation deletes dependent objects before deleting owner objects.# This ensures that the pods are cleaned up before the job is marked as deleted.# See: https://kubernetes.io/docs/concepts/architecture/garbage-collection/#foreground-deletionpropagation_policy="Foreground",)exceptkubernetes.client.exceptions.ApiExceptionasexc:ifexc.status==404:raiseInfrastructureNotFound(f"Unable to kill job {job_name!r}: The job was not found.")fromexcelse:raisedefpreview(self):returnyaml.dump(self.build_job())defget_corresponding_worker_type(self):return"kubernetes"asyncdefgenerate_work_pool_base_job_template(self):fromprefect.workers.utilitiesimport(get_default_base_job_template_for_infrastructure_type,)base_job_template=awaitget_default_base_job_template_for_infrastructure_type(self.get_corresponding_worker_type())assert(base_job_templateisnotNone),"Failed to retrieve default base job 
template."forkey,valueinself.dict(exclude_unset=True,exclude_defaults=True).items():ifkey=="command":base_job_template["variables"]["properties"]["command"]["default"]=shlex.join(value)elifkeyin["type","block_type_slug","_block_document_id","_block_document_name","_is_anonymous","job","customizations",]:continueelifkey=="image_pull_policy":base_job_template["variables"]["properties"]["image_pull_policy"]["default"]=value.valueelifkey=="cluster_config":base_job_template["variables"]["properties"]["cluster_config"]["default"]={"$ref":{"block_document_id":str(self.cluster_config._block_document_id)}}elifkeyinbase_job_template["variables"]["properties"]:base_job_template["variables"]["properties"][key]["default"]=valueelse:self.logger.warning(f"Variable {key!r} is not supported by Kubernetes work pools."" Skipping.")custom_job_manifest=self.dict(exclude_unset=True,exclude_defaults=True).get("job")ifcustom_job_manifest:job_manifest=self.build_job()else:job_manifest=copy.deepcopy(base_job_template["job_configuration"]["job_manifest"])job_manifest=self.customizations.apply(job_manifest)base_job_template["job_configuration"]["job_manifest"]=job_manifestreturnbase_job_templatedefbuild_job(self)->KubernetesManifest:"""Builds the Kubernetes Job Manifest"""job_manifest=copy.copy(self.job)job_manifest=self._shortcut_customizations().apply(job_manifest)job_manifest=self.customizations.apply(job_manifest)returnjob_manifest@contextmanagerdefget_batch_client(self)->Generator["BatchV1Api",None,None]:withkubernetes.client.ApiClient()asclient:try:yieldkubernetes.client.BatchV1Api(api_client=client)finally:client.rest_client.pool_manager.clear()@contextmanagerdefget_client(self)->Generator["CoreV1Api",None,None]:withkubernetes.client.ApiClient()asclient:try:yieldkubernetes.client.CoreV1Api(api_client=client)finally:client.rest_client.pool_manager.clear()def_get_infrastructure_pid(self,job:"V1Job")->str:""" Generates a Kubernetes infrastructure PID. The PID is in the format: "<cluster uid>:<namespace>:<job name>". """cluster_uid=self._get_cluster_uid()pid=f"{cluster_uid}:{self.namespace}:{job.metadata.name}"returnpiddef_parse_infrastructure_pid(self,infrastructure_pid:str)->Tuple[str,str,str]:""" Parse a Kubernetes infrastructure PID into its component parts. Returns a cluster UID, namespace, and job name. """cluster_uid,namespace,job_name=infrastructure_pid.split(":",2)returncluster_uid,namespace,job_namedef_get_cluster_uid(self)->str:""" Gets a unique id for the current cluster being used. There is no real unique identifier for a cluster. However, the `kube-system` namespace is immutable and has a persistence UID that we use instead. PREFECT_KUBERNETES_CLUSTER_UID can be set in cases where the `kube-system` namespace cannot be read e.g. when a cluster role cannot be created. If set, this variable will be used and we will not attempt to read the `kube-system` namespace. See https://github.com/kubernetes/kubernetes/issues/44954 """# Default to an environment variableenv_cluster_uid=os.environ.get("PREFECT_KUBERNETES_CLUSTER_UID")ifenv_cluster_uid:returnenv_cluster_uid# Read the UID from the cluster namespacewithself.get_client()asclient:namespace=client.read_namespace("kube-system")cluster_uid=namespace.metadata.uidreturncluster_uiddef_configure_kubernetes_library_client(self)->None:""" Set the correct kubernetes client configuration. WARNING: This action is not threadsafe and may override the configuration specified by another `KubernetesJob` instance. 
"""# TODO: Investigate returning a configured client so calls on other threads# will not invalidate the config needed here# if a k8s cluster block is provided to the flow runner, use thatifself.cluster_config:self.cluster_config.configure_client()else:# If no block specified, try to load Kubernetes configuration within a cluster. If that doesn't# work, try to load the configuration from the local environment, allowing# any further ConfigExceptions to bubble up.try:kubernetes.config.load_incluster_config()exceptkubernetes.config.ConfigException:kubernetes.config.load_kube_config()def_shortcut_customizations(self)->JsonPatch:"""Produces the JSON 6902 patch for the most commonly used customizations, like image and namespace, which we offer as top-level parameters (with sensible default values)"""shortcuts=[]ifself.namespace:shortcuts.append({"op":"add","path":"/metadata/namespace","value":self.namespace,})ifself.image:shortcuts.append({"op":"add","path":"/spec/template/spec/containers/0/image","value":self.image,})shortcuts+=[{"op":"add","path":(f"/metadata/labels/{self._slugify_label_key(key).replace('/','~1',1)}"),"value":self._slugify_label_value(value),}forkey,valueinself.labels.items()]shortcuts+=[{"op":"add","path":"/spec/template/spec/containers/0/env/-","value":{"name":key,"value":value},}forkey,valueinself._get_environment_variables().items()]ifself.image_pull_policy:shortcuts.append({"op":"add","path":"/spec/template/spec/containers/0/imagePullPolicy","value":self.image_pull_policy.value,})ifself.service_account_name:shortcuts.append({"op":"add","path":"/spec/template/spec/serviceAccountName","value":self.service_account_name,})ifself.finished_job_ttlisnotNone:shortcuts.append({"op":"add","path":"/spec/ttlSecondsAfterFinished","value":self.finished_job_ttl,})ifself.command:shortcuts.append({"op":"add","path":"/spec/template/spec/containers/0/args","value":self.command,})ifself.name:shortcuts.append({"op":"add","path":"/metadata/generateName","value":self._slugify_name(self.name)+"-",})else:# Generate name is requiredshortcuts.append({"op":"add","path":"/metadata/generateName","value":("prefect-job-"# We generate a name using a hash of the primary job settings+stable_hash(*self.command,*self.env.keys(),*[vforvinself.env.values()ifvisnotNone],)+"-"),})returnJsonPatch(shortcuts)def_get_job(self,job_id:str)->Optional["V1Job"]:withself.get_batch_client()asbatch_client:try:job=batch_client.read_namespaced_job(job_id,self.namespace)exceptkubernetes.client.exceptions.ApiException:self.logger.error(f"Job {job_id!r} was removed.",exc_info=True)returnNonereturnjobdef_get_job_pod(self,job_name:str)->"V1Pod":"""Get the first running pod for a job."""# Wait until we find a running pod for the job# if `pod_watch_timeout_seconds` is None, no timeout will be enforcedwatch=kubernetes.watch.Watch()self.logger.debug(f"Job {job_name!r}: Starting watch for pod start...")last_phase=Nonewithself.get_client()asclient:foreventinwatch.stream(func=client.list_namespaced_pod,namespace=self.namespace,label_selector=f"job-name={job_name}",timeout_seconds=self.pod_watch_timeout_seconds,):phase=event["object"].status.phaseifphase!=last_phase:self.logger.info(f"Job {job_name!r}: Pod has status {phase!r}.")ifphase!="Pending":watch.stop()returnevent["object"]last_phase=phaseself.logger.error(f"Job {job_name!r}: Pod never started.")def_watch_job(self,job_name:str)->int:""" Watch a job. Return the final status code of the first container. 
"""self.logger.debug(f"Job {job_name!r}: Monitoring job...")job=self._get_job(job_name)ifnotjob:return-1pod=self._get_job_pod(job_name)ifnotpod:return-1# Calculate the deadline before streaming outputdeadline=((time.monotonic()+self.job_watch_timeout_seconds)ifself.job_watch_timeout_secondsisnotNoneelseNone)ifself.stream_output:withself.get_client()asclient:logs=client.read_namespaced_pod_log(pod.metadata.name,self.namespace,follow=True,_preload_content=False,container="prefect-job",)try:forloginlogs.stream():print(log.decode().rstrip())# Check if we have passed the deadline and should stop streaming# logsremaining_time=(deadline-time.monotonic()ifdeadlineelseNone)ifdeadlineandremaining_time<=0:breakexceptException:self.logger.warning(("Error occurred while streaming logs - ""Job will continue to run but logs will ""no longer be streamed to stdout."),exc_info=True,)withself.get_batch_client()asbatch_client:# Check if the job is completed before beginning a watchjob=batch_client.read_namespaced_job(name=job_name,namespace=self.namespace)completed=job.status.completion_timeisnotNonewhilenotcompleted:remaining_time=(math.ceil(deadline-time.monotonic())ifdeadlineelseNone)ifdeadlineandremaining_time<=0:self.logger.error(f"Job {job_name!r}: Job did not complete within "f"timeout of {self.job_watch_timeout_seconds}s.")return-1watch=kubernetes.watch.Watch()# The kubernetes library will disable retries if the timeout kwarg is# present regardless of the value so we do not pass it unless given# https://github.com/kubernetes-client/python/blob/84f5fea2a3e4b161917aa597bf5e5a1d95e24f5a/kubernetes/base/watch/watch.py#LL160timeout_seconds=({"timeout_seconds":remaining_time}ifdeadlineelse{})foreventinwatch.stream(func=batch_client.list_namespaced_job,field_selector=f"metadata.name={job_name}",namespace=self.namespace,**timeout_seconds,):ifevent["type"]=="DELETED":self.logger.error(f"Job {job_name!r}: Job has been deleted.")completed=Trueelifevent["object"].status.completion_time:ifnotevent["object"].status.succeeded:# Job failed, exit while loop and return pod exit codeself.logger.error(f"Job {job_name!r}: Job failed.")completed=True# Check if the job has reached its backoff limit# and stop watching if it haselif(event["object"].spec.backoff_limitisnotNoneandevent["object"].status.failedisnotNoneandevent["object"].status.failed>event["object"].spec.backoff_limit):self.logger.error(f"Job {job_name!r}: Job reached backoff limit.")completed=True# If the job has no backoff limit, check if it has failed# and stop watching if it haselif(notevent["object"].spec.backoff_limitandevent["object"].status.failed):completed=Trueifcompleted:watch.stop()breakwithself.get_client()ascore_client:# Get all pods for the jobpods=core_client.list_namespaced_pod(namespace=self.namespace,label_selector=f"job-name={job_name}")# Get the status for only the most recently used podpods.items.sort(key=lambdapod:pod.metadata.creation_timestamp,reverse=True)most_recent_pod=pods.items[0]ifpods.itemselseNonefirst_container_status=(most_recent_pod.status.container_statuses[0]ifmost_recent_podelseNone)ifnotfirst_container_status:self.logger.error(f"Job {job_name!r}: No pods found for job.")return-1# In some cases, such as spot instance evictions, the pod will be forcibly# terminated and not report a status correctly.elif(first_container_status.stateisNoneorfirst_container_status.state.terminatedisNoneorfirst_container_status.state.terminated.exit_codeisNone):self.logger.error(f"Could not determine exit code for {job_name!r}.""Exit code will 
be reported as -1.""First container status info did not report an exit code."f"First container info: {first_container_status}.")return-1returnfirst_container_status.state.terminated.exit_codedef_create_job(self,job_manifest:KubernetesManifest)->"V1Job":""" Given a Kubernetes Job Manifest, create the Job on the configured Kubernetes cluster and return its name. """withself.get_batch_client()asbatch_client:job=batch_client.create_namespaced_job(self.namespace,job_manifest)returnjobdef_slugify_name(self,name:str)->str:""" Slugify text for use as a name. Keeps only alphanumeric characters and dashes, and caps the length of the slug at 45 chars. The 45 character length allows room for the k8s utility "generateName" to generate a unique name from the slug while keeping the total length of a name below 63 characters, which is the limit for e.g. label names that follow RFC 1123 (hostnames) and RFC 1035 (domain names). Args: name: The name of the job Returns: the slugified job name """slug=slugify(name,max_length=45,# Leave enough space for generateNameregex_pattern=r"[^a-zA-Z0-9-]+",)# TODO: Handle the case that the name is an empty string after being# slugified.returnslugdef_slugify_label_key(self,key:str)->str:""" Slugify text for use as a label key. Keys are composed of an optional prefix and name, separated by a slash (/). Keeps only alphanumeric characters, dashes, underscores, and periods. Limits the length of the label prefix to 253 characters. Limits the length of the label name to 63 characters. See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set Args: key: The label key Returns: The slugified label key """if"/"inkey:prefix,name=key.split("/",maxsplit=1)else:prefix=Nonename=keyname_slug=(slugify(name,max_length=63,regex_pattern=r"[^a-zA-Z0-9-_.]+").strip("_-."# Must start or end with alphanumeric characters)orname)# Fallback to the original if we end up with an empty slug, this will allow# Kubernetes to throw the validation errorifprefix:prefix_slug=(slugify(prefix,max_length=253,regex_pattern=r"[^a-zA-Z0-9-\.]+",).strip("_-.")# Must start or end with alphanumeric charactersorprefix)returnf"{prefix_slug}/{name_slug}"returnname_slugdef_slugify_label_value(self,value:str)->str:""" Slugify text for use as a label value. Keeps only alphanumeric characters, dashes, underscores, and periods. Limits the total length of label text to below 63 characters. See https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set Args: value: The text for the label Returns: The slugified value """slug=(slugify(value,max_length=63,regex_pattern=r"[^a-zA-Z0-9-_\.]+").strip("_-."# Must start or end with alphanumeric characters)orvalue)# Fallback to the original if we end up with an empty slug, this will allow# Kubernetes to throw the validation errorreturnslugdef_get_environment_variables(self):# If the API URL has been set by the base environment rather than the by the# user, update the value to ensure connectivity when using a bridge network by# updating local connections to use the internal hostenv={**self._base_environment(),**self.env}if("PREFECT_API_URL"inenvand"PREFECT_API_URL"notinself.envandself._api_dns_name):env["PREFECT_API_URL"]=(env["PREFECT_API_URL"].replace("localhost",self._api_dns_name).replace("127.0.0.1",self._api_dns_name))# Drop null values allowing users to "unset" variablesreturn{key:valueforkey,valueinenv.items()ifvalueisnotNone}
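For illustration, a minimal sketch of the name slugification above, calling python-slugify with the same arguments as _slugify_name; the input string and the indicated output are hypothetical:

from slugify import slugify

# Keep only alphanumerics and dashes and cap the slug at 45 characters so the
# Kubernetes "generateName" suffix still fits under the 63-character limit.
name = "My Flow Run! (retry #2)"  # hypothetical job name
slug = slugify(name, max_length=45, regex_pattern=r"[^a-zA-Z0-9-]+")
print(slug)  # expected to be something like "my-flow-run-retry-2"
generate_name = slug + "-"  # value used for metadata.generateName in the Job manifest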
    @classmethod
    def base_job_manifest(cls) -> KubernetesManifest:
        """Produces the bare minimum allowed Job manifest"""
        return {
            "apiVersion": "batch/v1",
            "kind": "Job",
            "metadata": {"labels": {}},
            "spec": {
                "template": {
                    "spec": {
                        "parallelism": 1,
                        "completions": 1,
                        "restartPolicy": "Never",
                        "containers": [
                            {
                                "name": "prefect-job",
                                "env": [],
                            }
                        ],
                    }
                }
            },
        }
Source code in src/prefect/infrastructure/kubernetes.py
    def build_job(self) -> KubernetesManifest:
        """Builds the Kubernetes Job Manifest"""
        job_manifest = copy.copy(self.job)
        job_manifest = self._shortcut_customizations().apply(job_manifest)
        job_manifest = self.customizations.apply(job_manifest)
        return job_manifest
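build_job applies the shortcut patch and then any user-supplied customizations to a copy of the job manifest. As a rough sketch of the same RFC 6902 mechanics, the standalone jsonpatch package can apply shortcut-style operations to the bare manifest shown above; the namespace, image, and API URL values here are purely illustrative, not defaults:

from jsonpatch import JsonPatch

base = {
    "apiVersion": "batch/v1",
    "kind": "Job",
    "metadata": {"labels": {}},
    "spec": {
        "template": {
            "spec": {
                "parallelism": 1,
                "completions": 1,
                "restartPolicy": "Never",
                "containers": [{"name": "prefect-job", "env": []}],
            }
        }
    },
}

patch = JsonPatch(
    [
        # Set the namespace and container image, and append an environment variable
        {"op": "add", "path": "/metadata/namespace", "value": "prefect"},
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/image",
            "value": "prefecthq/prefect:2-latest",  # illustrative image tag
        },
        {
            "op": "add",
            "path": "/spec/template/spec/containers/0/env/-",
            "value": {"name": "PREFECT_API_URL", "value": "http://host.docker.internal:4200/api"},
        },
    ]
)

customized = patch.apply(base)
print(customized["metadata"]["namespace"])  # "prefect"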
Load an RFC 6902 JSON patch from a YAML or JSON file.
Source code in src/prefect/infrastructure/kubernetes.py
    @classmethod
    def customize_from_file(cls, filename: str) -> JsonPatch:
        """Load an RFC 6902 JSON patch from a YAML or JSON file."""
        with open(filename, "r", encoding="utf-8") as f:
            return JsonPatch(yaml.load(f, yaml.SafeLoader))
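A hedged usage sketch: assuming a hypothetical patch.yaml that holds a list of RFC 6902 operations, the classmethod loads it into a JsonPatch that can then be supplied as the block's customizations (the file name and contents are illustrative):

# patch.yaml (hypothetical):
# - op: add
#   path: /spec/template/spec/containers/0/resources
#   value:
#     limits:
#       memory: 512Mi

customizations = KubernetesJob.customize_from_file("patch.yaml")
job = KubernetesJob(customizations=customizations)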
Load a Kubernetes Job manifest from a YAML or JSON file.
Source code in src/prefect/infrastructure/kubernetes.py
    @classmethod
    def job_from_file(cls, filename: str) -> KubernetesManifest:
        """Load a Kubernetes Job manifest from a YAML or JSON file."""
        with open(filename, "r", encoding="utf-8") as f:
            return yaml.load(f, yaml.SafeLoader)
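Similarly, a hedged sketch for loading a full manifest; job.yaml is hypothetical and should contain at least the fields produced by base_job_manifest above:

manifest = KubernetesJob.job_from_file("job.yaml")
job = KubernetesJob(job=manifest)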
Current environment variables and Prefect settings will be included in the created
process. Configured environment variables will override any current environment
variables.
Attributes:
Name
Type
Description
command
A list of strings specifying the command to run in the container to
start the flow run. In most cases you should not override this.
env
Environment variables to set for the new process.
labels
Labels for the process. Labels are for metadata purposes only and
cannot be attached to the process itself.
name
A name for the process. For display purposes only.
stream_output
bool
Whether to stream output to local stdout.
working_dir
Union[str, Path, None]
Working directory where the process should be opened. If not set,
a tmp directory will be used.
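The environment handling described above can be sketched as follows; merge_environment and the example variables are illustrative only, showing the precedence of OS environment, then Prefect base settings, then configured env, with None used to unset a variable:

import os

def merge_environment(base_settings: dict, configured: dict) -> dict:
    # Later mappings win: OS environment < Prefect settings < configured env
    env = {**os.environ, **base_settings, **configured}
    # Drop null values so users can "unset" inherited variables
    return {key: value for key, value in env.items() if value is not None}

merged = merge_environment(
    {"PREFECT_API_URL": "http://127.0.0.1:4200/api"},  # illustrative base setting
    {"MY_FLAG": "1", "UNWANTED_VAR": None},            # illustrative configured env
)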
Source code in src/prefect/infrastructure/process.py
@deprecated_class(start_date="Mar 2024",help="Use the process worker instead."" Refer to the upgrade guide for more information:"" https://docs.prefect.io/latest/guides/upgrade-guide-agents-to-workers/.",)classProcess(Infrastructure):""" Run a command in a new process. Current environment variables and Prefect settings will be included in the created process. Configured environment variables will override any current environment variables. Attributes: command: A list of strings specifying the command to run in the container to start the flow run. In most cases you should not override this. env: Environment variables to set for the new process. labels: Labels for the process. Labels are for metadata purposes only and cannot be attached to the process itself. name: A name for the process. For display purposes only. stream_output: Whether to stream output to local stdout. working_dir: Working directory where the process should be opened. If not set, a tmp directory will be used. """_logo_url="https://cdn.sanity.io/images/3ugk85nk/production/356e6766a91baf20e1d08bbe16e8b5aaef4d8643-48x48.png"_documentation_url="https://docs.prefect.io/concepts/infrastructure/#process"type:Literal["process"]=Field(default="process",description="The type of infrastructure.")stream_output:bool=Field(default=True,description=("If set, output will be streamed from the process to local standard output."),)working_dir:Union[str,Path,None]=Field(default=None,description=("If set, the process will open within the specified path as the working"" directory. Otherwise, a temporary directory will be created."),)# Underlying accepted types are str, bytes, PathLike[str], None@sync_compatibleasyncdefrun(self,task_status:anyio.abc.TaskStatus=None,)->"ProcessResult":ifnotself.command:raiseValueError("Process cannot be run with empty command.")display_name=f" {self.name!r}"ifself.nameelse""# Open a subprocess to execute the flow runself.logger.info(f"Opening process{display_name}...")working_dir_ctx=(tempfile.TemporaryDirectory(suffix="prefect")ifnotself.working_direlsecontextlib.nullcontext(self.working_dir))withworking_dir_ctxasworking_dir:self.logger.debug(f"Process{display_name} running command: {' '.join(self.command)} in"f" {working_dir}")# We must add creationflags to a dict so it is only passed as a function# parameter on Windows, because the presence of creationflags causes# errors on Unix even if set to Nonekwargs:Dict[str,object]={}ifsys.platform=="win32":kwargs["creationflags"]=subprocess.CREATE_NEW_PROCESS_GROUPprocess=awaitrun_process(self.command,stream_output=self.stream_output,task_status=task_status,task_status_handler=_infrastructure_pid_from_process,env=self._get_environment_variables(),cwd=working_dir,**kwargs,)# Use the pid for display if no name was givendisplay_name=display_nameorf" {process.pid}"ifprocess.returncode:help_message=Noneifprocess.returncode==-9:help_message=("This indicates that the process exited due to a SIGKILL signal. ""Typically, this is either caused by manual cancellation or ""high memory usage causing the operating system to ""terminate the process.")ifprocess.returncode==-15:help_message=("This indicates that the process exited due to a SIGTERM signal. ""Typically, this is caused by manual cancellation.")elifprocess.returncode==247:help_message=("This indicates that the process was terminated due to high ""memory usage.")elif(sys.platform=="win32"andprocess.returncode==STATUS_CONTROL_C_EXIT):help_message=("Process was terminated due to a Ctrl+C or Ctrl+Break signal. 
""Typically, this is caused by manual cancellation.")self.logger.error(f"Process{display_name} exited with status code: {process.returncode}"+(f"; {help_message}"ifhelp_messageelse""))else:self.logger.info(f"Process{display_name} exited cleanly.")returnProcessResult(status_code=process.returncode,identifier=str(process.pid))asyncdefkill(self,infrastructure_pid:str,grace_seconds:int=30):hostname,pid=_parse_infrastructure_pid(infrastructure_pid)ifhostname!=socket.gethostname():raiseInfrastructureNotAvailable(f"Unable to kill process {pid!r}: The process is running on a different"f" host {hostname!r}.")# In a non-windows environment first send a SIGTERM, then, after# `grace_seconds` seconds have passed subsequent send SIGKILL. In# Windows we use CTRL_BREAK_EVENT as SIGTERM is useless:# https://bugs.python.org/issue26350ifsys.platform=="win32":try:os.kill(pid,signal.CTRL_BREAK_EVENT)except(ProcessLookupError,WindowsError):raiseInfrastructureNotFound(f"Unable to kill process {pid!r}: The process was not found.")else:try:os.kill(pid,signal.SIGTERM)exceptProcessLookupError:raiseInfrastructureNotFound(f"Unable to kill process {pid!r}: The process was not found.")# Throttle how often we check if the process is still alive to keep# from making too many system calls in a short period of time.check_interval=max(grace_seconds/10,1)withanyio.move_on_after(grace_seconds):whileTrue:awaitanyio.sleep(check_interval)# Detect if the process is still alive. If not do an early# return as the process respected the SIGTERM from above.try:os.kill(pid,0)exceptProcessLookupError:returntry:os.kill(pid,signal.SIGKILL)exceptOSError:# We shouldn't ever end up here, but it's possible that the# process ended right after the check above.returndefpreview(self):environment=self._get_environment_variables(include_os_environ=False)return" \\\n".join([f"{key}={value}"forkey,valueinenvironment.items()]+[" ".join(self.command)])def_get_environment_variables(self,include_os_environ:bool=True):os_environ=os.environifinclude_os_environelse{}# The base environment must override the current environment or# the Prefect settings context may not be respectedenv={**os_environ,**self._base_environment(),**self.env}# Drop null values allowing users to "unset" variablesreturn{key:valueforkey,valueinenv.items()ifvalueisnotNone}def_base_flow_run_command(self):return[get_sys_executable(),"-m","prefect.engine"]defget_corresponding_worker_type(self):return"process"asyncdefgenerate_work_pool_base_job_template(self):fromprefect.workers.utilitiesimport(get_default_base_job_template_for_infrastructure_type,)base_job_template=awaitget_default_base_job_template_for_infrastructure_type(self.get_corresponding_worker_type(),)assert(base_job_templateisnotNone),"Failed to generate default base job template for Process worker."forkey,valueinself.dict(exclude_unset=True,exclude_defaults=True).items():ifkey=="command":base_job_template["variables"]["properties"]["command"]["default"]=shlex.join(value)elifkeyin["type","block_type_slug","_block_document_id","_block_document_name","_is_anonymous",]:continueelifkeyinbase_job_template["variables"]["properties"]:base_job_template["variables"]["properties"][key]["default"]=valueelse:self.logger.warning(f"Variable {key!r} is not supported by Process work pools."" Skipping.")returnbase_job_template