This class is deprecated as of March 2024 and will not be available after September 2024.
It has been replaced by AzureBlobStorageContainer from the prefect-azure package, which
offers enhanced functionality and a better user experience.
Store data as a file on Azure Datalake and Azure Blob Storage.
@deprecated_class(
    start_date="Mar 2024",
    help="Use the `AzureBlobStorageContainer` block from prefect-azure instead.",
)
class Azure(WritableFileSystem, WritableDeploymentStorage):
    """
    DEPRECATION WARNING:

    This class is deprecated as of March 2024 and will not be available after September 2024.
    It has been replaced by `AzureBlobStorageContainer` from the `prefect-azure` package, which
    offers enhanced functionality and a better user experience.

    Store data as a file on Azure Datalake and Azure Blob Storage.

    Example:
        Load stored Azure config:
        ```python
        from prefect.filesystems import Azure

        az_block = Azure.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Azure"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/54e3fa7e00197a4fbd1d82ed62494cb58d08c96a-250x250.png"
    _documentation_url = "https://docs.prefect.io/concepts/filesystems/#azure"

    bucket_path: str = Field(
        default=...,
        description="An Azure storage bucket path.",
        examples=["my-bucket/a-directory-within"],
    )
    azure_storage_connection_string: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage connection string",
        description=(
            "Equivalent to the AZURE_STORAGE_CONNECTION_STRING environment variable."
        ),
    )
    azure_storage_account_name: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage account name",
        description=(
            "Equivalent to the AZURE_STORAGE_ACCOUNT_NAME environment variable."
        ),
    )
    azure_storage_account_key: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage account key",
        description="Equivalent to the AZURE_STORAGE_ACCOUNT_KEY environment variable.",
    )
    azure_storage_tenant_id: Optional[SecretStr] = Field(
        None,
        title="Azure storage tenant ID",
        description="Equivalent to the AZURE_TENANT_ID environment variable.",
    )
    azure_storage_client_id: Optional[SecretStr] = Field(
        None,
        title="Azure storage client ID",
        description="Equivalent to the AZURE_CLIENT_ID environment variable.",
    )
    azure_storage_client_secret: Optional[SecretStr] = Field(
        None,
        title="Azure storage client secret",
        description="Equivalent to the AZURE_CLIENT_SECRET environment variable.",
    )
    azure_storage_anon: bool = Field(
        default=True,
        title="Azure storage anonymous connection",
        description=(
            "Set the 'anon' flag for ADLFS. This should be False for systems that"
            " require ADLFS to use DefaultAzureCredentials."
        ),
    )
    azure_storage_container: Optional[SecretStr] = Field(
        default=None,
        title="Azure storage container",
        description=(
            "Blob Container in Azure Storage Account. If set the 'bucket_path' will"
            " be interpreted using the following URL format:"
            "'az://<container>@<storage_account>.dfs.core.windows.net/<bucket_path>'."
        ),
    )

    # Holds the most recently built RemoteFileSystem; reassigned on every
    # access of the `filesystem` property.
    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """
        Build the `az://` URL that this block reads from and writes to.

        Returns:
            `az://<container>@<account>.dfs.core.windows.net/<bucket_path>` when
            a container is configured, otherwise `az://<bucket_path>`.

        Raises:
            ValueError: If `azure_storage_container` is set but
                `azure_storage_account_name` is not; the container URL format
                cannot be built without the account name.
        """
        if self.azure_storage_container:
            # The container URL embeds the account name, so fail with a clear
            # message instead of an AttributeError on `None`.
            if self.azure_storage_account_name is None:
                raise ValueError(
                    "'azure_storage_account_name' must be set when"
                    " 'azure_storage_container' is provided."
                )
            return (
                f"az://{self.azure_storage_container.get_secret_value()}"
                f"@{self.azure_storage_account_name.get_secret_value()}"
                f".dfs.core.windows.net/{self.bucket_path}"
            )
        return f"az://{self.bucket_path}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build a `RemoteFileSystem` configured from this block's credentials.

        Only credentials that are set (truthy) are forwarded; the `anon` flag
        is always forwarded. A new `RemoteFileSystem` is created on each access.
        """
        settings = {}
        if self.azure_storage_connection_string:
            settings[
                "connection_string"
            ] = self.azure_storage_connection_string.get_secret_value()
        if self.azure_storage_account_name:
            settings[
                "account_name"
            ] = self.azure_storage_account_name.get_secret_value()
        if self.azure_storage_account_key:
            settings["account_key"] = self.azure_storage_account_key.get_secret_value()
        if self.azure_storage_tenant_id:
            settings["tenant_id"] = self.azure_storage_tenant_id.get_secret_value()
        if self.azure_storage_client_id:
            settings["client_id"] = self.azure_storage_client_id.get_secret_value()
        if self.azure_storage_client_secret:
            settings[
                "client_secret"
            ] = self.azure_storage_client_secret.get_secret_value()
        settings["anon"] = self.azure_storage_anon
        self._remote_file_system = RemoteFileSystem(
            basepath=self.basepath, settings=settings
        )
        return self._remote_file_system

    # NOTE(review): annotated `-> bytes`, but the value is simply whatever the
    # delegated `RemoteFileSystem.get_directory` returns - confirm annotation.
    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> bytes:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        return await self.filesystem.put_directory(
            local_path=local_path, to_path=to_path, ignore_file=ignore_file
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read the content at `path` via the underlying remote file system."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` via the underlying remote file system."""
        return await self.filesystem.write_path(path=path, content=content)
Downloads a directory from a given remote path to a local directory.
Defaults to downloading the entire contents of the block's basepath to the current working directory.
Source code in src/prefect/filesystems.py
749750751752753754755756757758759760
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> bytes:
    """
    Download a remote directory into a local directory.

    With no arguments, the entire contents of the block's basepath are
    downloaded to the current working directory.

    Args:
        from_path: Remote path to download from; forwarded unchanged.
        local_path: Local destination directory; forwarded unchanged.

    Returns:
        Whatever the underlying file system's `get_directory` call returns.
    """
    remote = self.filesystem
    outcome = await remote.get_directory(from_path=from_path, local_path=local_path)
    return outcome
Uploads a directory from a given local path to a remote directory.
Defaults to uploading the entire contents of the current working directory to the block's basepath.
Source code in src/prefect/filesystems.py
762763764765766767768769770771772773774775776
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Upload a local directory to a remote directory.

    With no arguments, the entire contents of the current working directory
    are uploaded to the block's basepath.

    Args:
        local_path: Local directory to upload; forwarded unchanged.
        to_path: Remote destination; forwarded unchanged.
        ignore_file: Path to an ignore file; forwarded unchanged.

    Returns:
        Whatever the underlying file system's `put_directory` call returns.
    """
    forwarded = {
        "local_path": local_path,
        "to_path": to_path,
        "ignore_file": ignore_file,
    }
    remote = self.filesystem
    return await remote.put_directory(**forwarded)
This class is deprecated as of March 2024 and will not be available after September 2024.
It has been replaced by GcsBucket from the prefect-gcp package, which offers enhanced functionality
and a better user experience.
Store data as a file on Google Cloud Storage.
@deprecated_class(
    start_date="Mar 2024", help="Use the `GcsBucket` block from prefect-gcp instead."
)
class GCS(WritableFileSystem, WritableDeploymentStorage):
    """
    DEPRECATION WARNING:

    This class is deprecated as of March 2024 and will not be available after September 2024.
    It has been replaced by `GcsBucket` from the `prefect-gcp` package, which offers enhanced functionality
    and a better user experience.

    Store data as a file on Google Cloud Storage.

    Example:
        Load stored GCS config:
        ```python
        from prefect.filesystems import GCS

        gcs_block = GCS.load("BLOCK_NAME")
        ```
    """

    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/422d13bb838cf247eb2b2cf229ce6a2e717d601b-256x256.png"
    _documentation_url = "https://docs.prefect.io/concepts/filesystems/#gcs"

    bucket_path: str = Field(
        default=...,
        description="A GCS bucket path.",
        examples=["my-bucket/a-directory-within"],
    )
    service_account_info: Optional[SecretStr] = Field(
        default=None,
        description="The contents of a service account keyfile as a JSON string.",
    )
    # NOTE(review): `project` is declared but is not forwarded to the settings
    # built by the `filesystem` property - confirm whether that is intended.
    project: Optional[str] = Field(
        default=None,
        description=(
            "The project the GCS bucket resides in. If not provided, the project will"
            " be inferred from the credentials or environment."
        ),
    )

    @property
    def basepath(self) -> str:
        """Return the `gcs://` URL for this block's bucket path."""
        return f"gcs://{self.bucket_path}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build a `RemoteFileSystem` for this bucket.

        When `service_account_info` is set, it is parsed as JSON and passed as
        the `token` setting. A new `RemoteFileSystem` is created on each access.

        Raises:
            ValueError: If `service_account_info` is not valid JSON. The
                original `json.JSONDecodeError` is attached as the cause.
        """
        settings = {}
        if self.service_account_info:
            try:
                settings["token"] = json.loads(
                    self.service_account_info.get_secret_value()
                )
            except json.JSONDecodeError as exc:
                # Chain explicitly so the parse position stays in the traceback.
                raise ValueError(
                    "Unable to load provided service_account_info. Please make sure"
                    " that the provided value is a valid JSON string."
                ) from exc
        # Reuse the `basepath` property so the URL format lives in one place.
        return RemoteFileSystem(basepath=self.basepath, settings=settings)

    # NOTE(review): annotated `-> bytes`, but the value is simply whatever the
    # delegated `RemoteFileSystem.get_directory` returns - confirm annotation.
    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> bytes:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        return await self.filesystem.put_directory(
            local_path=local_path, to_path=to_path, ignore_file=ignore_file
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read the content at `path` via the underlying remote file system."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` via the underlying remote file system."""
        return await self.filesystem.write_path(path=path, content=content)
Downloads a directory from a given remote path to a local directory.
Defaults to downloading the entire contents of the block's basepath to the current working directory.
Source code in src/prefect/filesystems.py
588589590591592593594595596597598599
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> bytes:
    """
    Download a remote directory into a local directory.

    When called without arguments, the whole of the block's basepath is
    downloaded into the current working directory.

    Args:
        from_path: Remote path to download from; passed through as given.
        local_path: Local destination directory; passed through as given.

    Returns:
        The result of the underlying file system's `get_directory` call.
    """
    downloader = self.filesystem.get_directory
    return await downloader(from_path=from_path, local_path=local_path)
Uploads a directory from a given local path to a remote directory.
Defaults to uploading the entire contents of the current working directory to the block's basepath.
Source code in src/prefect/filesystems.py
601602603604605606607608609610611612613614615
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Upload a local directory to a remote directory.

    When called without arguments, the whole current working directory is
    uploaded to the block's basepath.

    Args:
        local_path: Local directory to upload; passed through as given.
        to_path: Remote destination; passed through as given.
        ignore_file: Path to an ignore file; passed through as given.

    Returns:
        The result of the underlying file system's `put_directory` call.
    """
    uploader = self.filesystem.put_directory
    uploaded = await uploader(
        local_path=local_path, to_path=to_path, ignore_file=ignore_file
    )
    return uploaded
@deprecated_class(
    start_date="Mar 2024",
    help="Use the `GitHubRepository` block from prefect-github instead.",
)
class GitHub(ReadableDeploymentStorage):
    """
    DEPRECATION WARNING:

    This class is deprecated as of March 2024 and will not be available after September 2024.
    It has been replaced by `GitHubRepository` from the `prefect-github` package, which offers
    enhanced functionality and a better user experience.

    Interact with files stored on GitHub repositories.
    """

    _block_type_name = "GitHub"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/41971cfecfea5f79ff334164f06ecb34d1038dd4-250x250.png"
    _documentation_url = "https://docs.prefect.io/concepts/filesystems/#github"

    repository: str = Field(
        default=...,
        description=(
            "The URL of a GitHub repository to read from, in either HTTPS or SSH"
            " format."
        ),
    )
    reference: Optional[str] = Field(
        default=None,
        description="An optional reference to pin to; can be a branch name or tag.",
    )
    # NOTE(review): this field passes `name=` where sibling blocks pass
    # `title=`; left unchanged because it may affect the generated schema.
    access_token: Optional[SecretStr] = Field(
        name="Personal Access Token",
        default=None,
        description=(
            "A GitHub Personal Access Token (PAT) with repo scope."
            " To use a fine-grained PAT, provide '{username}:{PAT}' as the value."
        ),
    )
    include_git_objects: bool = Field(
        default=True,
        description=(
            "Whether to include git objects when copying the repo contents to a"
            " directory."
        ),
    )

    @validator("access_token")
    def _ensure_credentials_go_with_https(cls, v: str, values: dict) -> str:
        """Delegate access-token validation to `validate_github_access_token`."""
        return validate_github_access_token(v, values)

    def _create_repo_url(self) -> str:
        """Format the URL provided to the `git clone` command.

        For private repos: https://<oauth-key>@github.com/<username>/<repo>.git
        All other repos should be the same as `self.repository`.
        """
        url_components = urllib.parse.urlparse(self.repository)
        if url_components.scheme == "https" and self.access_token is not None:
            # Embed the token ahead of the host so git authenticates with it.
            updated_components = url_components._replace(
                netloc=f"{self.access_token.get_secret_value()}@{url_components.netloc}"
            )
            full_url = urllib.parse.urlunparse(updated_components)
        else:
            full_url = self.repository

        return full_url

    @staticmethod
    def _get_paths(
        dst_dir: Union[str, None], src_dir: str, sub_directory: Optional[str]
    ) -> Tuple[str, str]:
        """Returns the fully formed paths for GitHubRepository contents in the form
        (content_source, content_destination).

        Args:
            dst_dir: Destination directory; the absolute current directory is
                used when None.
            src_dir: Directory holding the cloned repository.
            sub_directory: Optional subdirectory appended to both paths when
                truthy.
        """
        if dst_dir is None:
            content_destination = Path(".").absolute()
        else:
            content_destination = Path(dst_dir)

        content_source = Path(src_dir)

        if sub_directory:
            content_destination = content_destination.joinpath(sub_directory)
            content_source = content_source.joinpath(sub_directory)

        return str(content_source), str(content_destination)

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Clones a GitHub project specified in `from_path` to the provided `local_path`;
        defaults to cloning the repository reference configured on the Block to the
        present working directory.

        Args:
            from_path: If provided, interpreted as a subdirectory of the underlying
                repository that will be copied to the provided local path.
            local_path: A local path to clone to; defaults to present working directory.

        Raises:
            OSError: If `git clone` exits with a non-zero return code. The
                configured access token is masked in the reported output.
        """
        # CONSTRUCT COMMAND
        cmd = ["git", "clone", self._create_repo_url()]
        if self.reference:
            cmd += ["-b", self.reference]

        # Limit git history
        cmd += ["--depth", "1"]

        # Clone to a temporary directory and move the subdirectory over
        with TemporaryDirectory(suffix="prefect") as tmp_dir:
            cmd.append(tmp_dir)

            err_stream = io.StringIO()
            out_stream = io.StringIO()
            process = await run_process(cmd, stream_output=(out_stream, err_stream))
            if process.returncode != 0:
                err_stream.seek(0)
                error_output = err_stream.read()
                # The clone URL may carry the access token; mask it so the
                # secret is not exposed through the exception message or logs.
                if self.access_token is not None:
                    token = self.access_token.get_secret_value()
                    if token:
                        error_output = error_output.replace(token, "********")
                raise OSError(f"Failed to pull from remote:\n{error_output}")

            content_source, content_destination = self._get_paths(
                dst_dir=local_path, src_dir=tmp_dir, sub_directory=from_path
            )

            ignore_func = None
            if not self.include_git_objects:
                ignore_func = ignore_patterns(".git")

            copytree(
                src=content_source,
                dst=content_destination,
                dirs_exist_ok=True,
                ignore=ignore_func,
            )
Clones a GitHub project specified in from_path to the provided local_path;
defaults to cloning the repository reference configured on the Block to the
present working directory.
Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| from_path | Optional[str] | If provided, interpreted as a subdirectory of the underlying repository that will be copied to the provided local path. | None |
| local_path | Optional[str] | A local path to clone to; defaults to present working directory. | None |
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Clones a GitHub project specified in `from_path` to the provided `local_path`;
    defaults to cloning the repository reference configured on the Block to the
    present working directory.

    Args:
        from_path: If provided, interpreted as a subdirectory of the underlying
            repository that will be copied to the provided local path.
        local_path: A local path to clone to; defaults to present working directory.

    Raises:
        OSError: If `git clone` exits with a non-zero return code. The
            configured access token is masked in the reported output.
    """
    # CONSTRUCT COMMAND
    cmd = ["git", "clone", self._create_repo_url()]
    if self.reference:
        cmd += ["-b", self.reference]

    # Limit git history
    cmd += ["--depth", "1"]

    # Clone to a temporary directory and move the subdirectory over
    with TemporaryDirectory(suffix="prefect") as tmp_dir:
        cmd.append(tmp_dir)

        err_stream = io.StringIO()
        out_stream = io.StringIO()
        process = await run_process(cmd, stream_output=(out_stream, err_stream))
        if process.returncode != 0:
            err_stream.seek(0)
            error_output = err_stream.read()
            # The clone URL may carry the access token; mask it so the
            # secret is not exposed through the exception message or logs.
            if self.access_token is not None:
                token = self.access_token.get_secret_value()
                if token:
                    error_output = error_output.replace(token, "********")
            raise OSError(f"Failed to pull from remote:\n{error_output}")

        content_source, content_destination = self._get_paths(
            dst_dir=local_path, src_dir=tmp_dir, sub_directory=from_path
        )

        ignore_func = None
        if not self.include_git_objects:
            ignore_func = ignore_patterns(".git")

        copytree(
            src=content_source,
            dst=content_destination,
            dirs_exist_ok=True,
            ignore=ignore_func,
        )
class LocalFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a local file system.

    Example:
        Load stored local file system config:
        ```python
        from prefect.filesystems import LocalFileSystem

        local_file_system_block = LocalFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Local File System"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/ad39089fa66d273b943394a68f003f7a19aa850e-48x48.png"
    _documentation_url = (
        "https://docs.prefect.io/concepts/filesystems/#local-filesystem"
    )

    basepath: Optional[str] = Field(
        default=None, description="Default local path for this block to write to."
    )

    @validator("basepath", pre=True)
    def cast_pathlib(cls, value):
        """Pass the raw `basepath` value through `stringify_path` before validation."""
        return stringify_path(value)

    def _resolve_path(self, path: Optional[str]) -> Path:
        """
        Resolve `path` against the block's base path.

        Args:
            path: A relative or absolute path, or None to get the base path.

        Returns:
            The resolved base path when `path` is None; the base path joined
            with `path` when it is relative; the resolved `path` when absolute.

        Raises:
            ValueError: If an absolute `path` lies outside the base path.
        """
        # Only resolve the base path at runtime, default to the current directory
        basepath = (
            Path(self.basepath).expanduser().resolve()
            if self.basepath
            else Path(".").resolve()
        )

        # Determine the path to access relative to the base path, ensuring that paths
        # outside of the base path are off limits
        if path is None:
            return basepath

        path: Path = Path(path).expanduser()

        if not path.is_absolute():
            # NOTE(review): relative paths are joined without being resolved,
            # so `..` segments are not caught by the containment check below.
            path = basepath / path
        else:
            path = path.resolve()
            if basepath not in path.parents and (basepath != path):
                raise ValueError(
                    f"Provided path {path} is outside of the base path {basepath}."
                )

        return path

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the block's basepath to the current working directory.
        If no basepath is configured, the current working directory is used as the source.
        """
        if not from_path:
            # `basepath` is optional; fall back to the current directory rather
            # than failing on `Path(None)`, mirroring `_resolve_path`.
            from_path = Path(self.basepath or ".").expanduser().resolve()
        else:
            from_path = self._resolve_path(from_path)

        if not local_path:
            local_path = Path(".").resolve()
        else:
            local_path = Path(local_path).resolve()

        if from_path == local_path:
            # If the paths are the same there is no need to copy
            # and we avoid shutil.copytree raising an error
            return

        # .prefectignore exists in the original location, not the current location which
        # is most likely temporary
        if (from_path / Path(".prefectignore")).exists():
            ignore_func = await self._get_ignore_func(
                local_path=from_path, ignore_file=from_path / Path(".prefectignore")
            )
        else:
            ignore_func = None

        copytree(from_path, local_path, dirs_exist_ok=True, ignore=ignore_func)

    async def _get_ignore_func(self, local_path: str, ignore_file: str):
        """
        Build a `copytree`-compatible ignore callback from an ignore file.

        The patterns in `ignore_file` are passed to `filter_files` to compute
        the set of included paths under `local_path`; the returned callback
        reports every entry whose path relative to `local_path` is not in it.
        """
        with open(ignore_file, "r") as f:
            patterns = f.readlines()

        included_files = filter_files(root=local_path, ignore_patterns=patterns)

        def ignore_func(directory, files):
            relative_path = Path(directory).relative_to(local_path)

            files_to_ignore = [
                f for f in files if str(relative_path / f) not in included_files
            ]
            return files_to_ignore

        return ignore_func

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> None:
        """
        Copies a directory from one place to another on the local filesystem.

        Defaults to copying the entire contents of the current working directory to the block's basepath.
        An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.
        """
        destination_path = self._resolve_path(to_path)

        if not local_path:
            local_path = Path(".").absolute()

        if ignore_file:
            ignore_func = await self._get_ignore_func(
                local_path=local_path, ignore_file=ignore_file
            )
        else:
            ignore_func = None

        # Compare resolved Path objects: a `str` never equals a `Path`, so a
        # string `local_path` would otherwise slip past this guard and make
        # copytree copy a directory onto itself.
        if Path(local_path).resolve() == Path(destination_path).resolve():
            return

        copytree(
            src=local_path,
            dst=destination_path,
            ignore=ignore_func,
            dirs_exist_ok=True,
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """
        Read the file at `path` (resolved against the base path) as bytes.

        Raises:
            ValueError: If the resolved path does not exist or is not a file.
        """
        path: Path = self._resolve_path(path)

        # Check if the path exists
        if not path.exists():
            raise ValueError(f"Path {path} does not exist.")

        # Validate that its a file
        if not path.is_file():
            raise ValueError(f"Path {path} is not a file.")

        async with await anyio.open_file(str(path), mode="rb") as f:
            content = await f.read()

        return content

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Write `content` to `path` (resolved against the base path).

        Parent directories are created as needed.

        Returns:
            The resolved path as a string.

        Raises:
            ValueError: If the resolved path exists and is not a file.
        """
        path: Path = self._resolve_path(path)

        # Construct the path if it does not exist
        path.parent.mkdir(exist_ok=True, parents=True)

        # Check if the file already exists
        if path.exists() and not path.is_file():
            raise ValueError(f"Path {path} already exists and is not a file.")

        async with await anyio.open_file(path, mode="wb") as f:
            await f.write(content)
        # Leave path stringify to the OS
        return str(path)
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Copies a directory from one place to another on the local filesystem.

    Defaults to copying the entire contents of the block's basepath to the current working directory.
    If no basepath is configured, the current working directory is used as the source.

    Args:
        from_path: Source directory, resolved via `_resolve_path`; when falsy
            the block's basepath (or the current directory) is used.
        local_path: Destination directory; defaults to the current directory.
    """
    if not from_path:
        # `basepath` is optional; fall back to the current directory rather
        # than failing on `Path(None)`.
        from_path = Path(self.basepath or ".").expanduser().resolve()
    else:
        from_path = self._resolve_path(from_path)

    if not local_path:
        local_path = Path(".").resolve()
    else:
        local_path = Path(local_path).resolve()

    if from_path == local_path:
        # If the paths are the same there is no need to copy
        # and we avoid shutil.copytree raising an error
        return

    # .prefectignore exists in the original location, not the current location which
    # is most likely temporary
    ignore_file = from_path / Path(".prefectignore")
    if ignore_file.exists():
        ignore_func = await self._get_ignore_func(
            local_path=from_path, ignore_file=ignore_file
        )
    else:
        ignore_func = None

    copytree(from_path, local_path, dirs_exist_ok=True, ignore=ignore_func)
Copies a directory from one place to another on the local filesystem.
Defaults to copying the entire contents of the current working directory to the block's basepath.
An ignore_file path may be provided that can include gitignore style expressions for filepaths to ignore.
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> None:
    """
    Copies a directory from one place to another on the local filesystem.

    Defaults to copying the entire contents of the current working directory to the block's basepath.
    An `ignore_file` path may be provided that can include gitignore style expressions for filepaths to ignore.

    Args:
        local_path: Source directory; defaults to the current directory.
        to_path: Destination, resolved via `_resolve_path`.
        ignore_file: Optional path to a file of ignore patterns.
    """
    destination_path = self._resolve_path(to_path)

    if not local_path:
        local_path = Path(".").absolute()

    if ignore_file:
        ignore_func = await self._get_ignore_func(
            local_path=local_path, ignore_file=ignore_file
        )
    else:
        ignore_func = None

    # Compare resolved Path objects: a `str` never equals a `Path`, so a
    # string `local_path` would otherwise slip past this guard and make
    # copytree copy a directory onto itself.
    if Path(local_path).resolve() == Path(destination_path).resolve():
        return

    copytree(
        src=local_path,
        dst=destination_path,
        ignore=ignore_func,
        dirs_exist_ok=True,
    )
Supports any remote file system supported by fsspec. The file system is specified
using a protocol. For example, "s3://my-bucket/my-folder/" will use S3.
class RemoteFileSystem(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a remote file system.

    Supports any remote file system supported by `fsspec`. The file system is specified
    using a protocol. For example, "s3://my-bucket/my-folder/" will use S3.

    Example:
        Load stored remote file system config:
        ```python
        from prefect.filesystems import RemoteFileSystem

        remote_file_system_block = RemoteFileSystem.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "Remote File System"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/e86b41bc0f9c99ba9489abeee83433b43d5c9365-48x48.png"
    _documentation_url = (
        "https://docs.prefect.io/concepts/filesystems/#remote-file-system"
    )

    basepath: str = Field(
        default=...,
        description="Default path for this block to write to.",
        examples=["s3://my-bucket/my-folder/"],
    )
    settings: Dict[str, Any] = Field(
        default_factory=dict,
        description="Additional settings to pass through to fsspec.",
    )

    # Cache for the configured fsspec file system used for access
    _filesystem: fsspec.AbstractFileSystem = None

    @validator("basepath")
    def check_basepath(cls, value):
        """Delegate `basepath` validation to `validate_basepath`."""
        return validate_basepath(value)

    def _resolve_path(self, path: str) -> str:
        """
        Join `path` onto the block's base path after validating it.

        Raises:
            ValueError: If `path` carries a scheme different from the base
                path's, or carries a network location that differs from the
                base path's or a URL path that does not start with the base
                path's URL path.
        """
        base_scheme, base_netloc, base_urlpath, _, _ = urllib.parse.urlsplit(
            self.basepath
        )
        scheme, netloc, urlpath, _, _ = urllib.parse.urlsplit(path)

        # Confirm that absolute paths are valid
        if scheme:
            if scheme != base_scheme:
                raise ValueError(
                    f"Path {path!r} with scheme {scheme!r} must use the same scheme as"
                    f" the base path {base_scheme!r}."
                )

        if netloc:
            if (netloc != base_netloc) or not urlpath.startswith(base_urlpath):
                raise ValueError(
                    f"Path {path!r} is outside of the base path {self.basepath!r}."
                )

        return f"{self.basepath.rstrip('/')}/{urlpath.lstrip('/')}"

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> None:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        if from_path is None:
            from_path = str(self.basepath)
        else:
            from_path = self._resolve_path(from_path)

        if local_path is None:
            local_path = Path(".").absolute()

        # validate that from_path has a trailing slash for proper fsspec behavior across versions
        if not from_path.endswith("/"):
            from_path += "/"

        return self.filesystem.get(from_path, local_path, recursive=True)

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
        overwrite: bool = True,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.

        Args:
            local_path: Local directory to upload; defaults to ".".
            to_path: Remote destination, resolved against the base path.
            ignore_file: Optional path to a file of ignore patterns.
            overwrite: When True, `overwrite=True` is passed to `put_file`.

        Returns:
            The number of files uploaded (directories are not counted).
        """
        if to_path is None:
            to_path = str(self.basepath)
        else:
            to_path = self._resolve_path(to_path)

        if local_path is None:
            local_path = "."

        included_files = None
        if ignore_file:
            with open(ignore_file, "r") as f:
                ignore_patterns = f.readlines()

            included_files = filter_files(
                local_path, ignore_patterns, include_dirs=True
            )

        counter = 0
        for f in Path(local_path).rglob("*"):
            relative_path = f.relative_to(local_path)
            # Test against None, not truthiness: an ignore file that excludes
            # everything yields an empty collection, which must skip every
            # file rather than disable filtering.
            if included_files is not None and str(relative_path) not in included_files:
                continue

            if to_path.endswith("/"):
                fpath = to_path + relative_path.as_posix()
            else:
                fpath = to_path + "/" + relative_path.as_posix()

            if f.is_dir():
                pass
            else:
                f = f.as_posix()
                if overwrite:
                    self.filesystem.put_file(f, fpath, overwrite=True)
                else:
                    self.filesystem.put_file(f, fpath)

                counter += 1

        return counter

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read the content at `path` (resolved against the base path) as bytes."""
        path = self._resolve_path(path)

        with self.filesystem.open(path, "rb") as file:
            content = await run_sync_in_worker_thread(file.read)

        return content

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """
        Write `content` to `path` (resolved against the base path).

        The containing directory is created first.

        Returns:
            The resolved path that was written.
        """
        path = self._resolve_path(path)
        dirpath = path[: path.rindex("/")]

        self.filesystem.makedirs(dirpath, exist_ok=True)

        with self.filesystem.open(path, "wb") as file:
            await run_sync_in_worker_thread(file.write, content)
        return path

    @property
    def filesystem(self) -> fsspec.AbstractFileSystem:
        """
        Return the fsspec file system for the base path's scheme.

        The instance is created on first access with `self.settings` and
        cached for later accesses.

        Raises:
            RuntimeError: If creating the file system raises `ImportError`.
        """
        if not self._filesystem:
            scheme, _, _, _, _ = urllib.parse.urlsplit(self.basepath)

            try:
                self._filesystem = fsspec.filesystem(scheme, **self.settings)
            except ImportError as exc:
                # The path is a remote file system that uses a lib that is not installed
                raise RuntimeError(
                    f"File system created with scheme {scheme!r} from base path "
                    f"{self.basepath!r} could not be created. "
                    "You are likely missing a Python module required to use the given "
                    "storage protocol."
                ) from exc

        return self._filesystem
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> None:
    """
    Download a remote directory into a local directory.

    With no arguments, the entire contents of the block's basepath are
    downloaded to the current working directory.

    Args:
        from_path: Remote path, resolved against the base path; the base
            path itself is used when None.
        local_path: Local destination; the absolute current directory is
            used when None.

    Returns:
        The result of the file system's recursive `get` call.
    """
    source = str(self.basepath) if from_path is None else self._resolve_path(from_path)
    destination = Path(".").absolute() if local_path is None else local_path

    # A trailing slash keeps fsspec behavior consistent across versions.
    if not source.endswith("/"):
        source = source + "/"

    return self.filesystem.get(source, destination, recursive=True)
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
    overwrite: bool = True,
) -> int:
    """
    Uploads a directory from a given local path to a remote directory.

    Defaults to uploading the entire contents of the current working directory to the block's basepath.

    Args:
        local_path: Local directory to upload; defaults to ".".
        to_path: Remote destination, resolved against the base path.
        ignore_file: Optional path to a file of ignore patterns.
        overwrite: When True, `overwrite=True` is passed to `put_file`.

    Returns:
        The number of files uploaded (directories are not counted).
    """
    if to_path is None:
        to_path = str(self.basepath)
    else:
        to_path = self._resolve_path(to_path)

    if local_path is None:
        local_path = "."

    included_files = None
    if ignore_file:
        with open(ignore_file, "r") as f:
            ignore_patterns = f.readlines()

        included_files = filter_files(local_path, ignore_patterns, include_dirs=True)

    counter = 0
    for f in Path(local_path).rglob("*"):
        relative_path = f.relative_to(local_path)
        # Test against None, not truthiness: an ignore file that excludes
        # everything yields an empty collection, which must skip every file
        # rather than disable filtering.
        if included_files is not None and str(relative_path) not in included_files:
            continue

        if to_path.endswith("/"):
            fpath = to_path + relative_path.as_posix()
        else:
            fpath = to_path + "/" + relative_path.as_posix()

        if f.is_dir():
            pass
        else:
            f = f.as_posix()
            if overwrite:
                self.filesystem.put_file(f, fpath, overwrite=True)
            else:
                self.filesystem.put_file(f, fpath)

            counter += 1

    return counter
This class is deprecated as of March 2024 and will not be available after September 2024.
It has been replaced by S3Bucket from the prefect-aws package, which offers enhanced functionality
and a better user experience.
@deprecated_class(
    start_date="Mar 2024", help="Use the `S3Bucket` block from prefect-aws instead."
)
class S3(WritableFileSystem, WritableDeploymentStorage):
    """
    DEPRECATION WARNING:

    This class is deprecated as of March 2024 and will not be available after September 2024.
    It has been replaced by `S3Bucket` from the `prefect-aws` package, which offers enhanced functionality
    and a better user experience.

    Store data as a file on AWS S3.

    Example:
        Load stored S3 config:
        ```python
        from prefect.filesystems import S3

        s3_block = S3.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "S3"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/d74b16fe84ce626345adf235a47008fea2869a60-225x225.png"
    _documentation_url = "https://docs.prefect.io/concepts/filesystems/#s3"

    bucket_path: str = Field(
        default=...,
        description="An S3 bucket path.",
        examples=["my-bucket/a-directory-within"],
    )
    aws_access_key_id: Optional[SecretStr] = Field(
        default=None,
        title="AWS Access Key ID",
        description="Equivalent to the AWS_ACCESS_KEY_ID environment variable.",
        examples=["AKIAIOSFODNN7EXAMPLE"],
    )
    aws_secret_access_key: Optional[SecretStr] = Field(
        default=None,
        title="AWS Secret Access Key",
        description="Equivalent to the AWS_SECRET_ACCESS_KEY environment variable.",
        examples=["wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"],
    )

    # Holds the most recently built RemoteFileSystem; reassigned on every
    # access of the `filesystem` property.
    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """Return the `s3://` URL for this block's bucket path."""
        return f"s3://{self.bucket_path}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build a `RemoteFileSystem` configured from this block's credentials.

        The access key id and secret access key are forwarded as the `key`
        and `secret` settings only when set (truthy). A new `RemoteFileSystem`
        is created on each access.
        """
        settings = {}
        if self.aws_access_key_id:
            settings["key"] = self.aws_access_key_id.get_secret_value()
        if self.aws_secret_access_key:
            settings["secret"] = self.aws_secret_access_key.get_secret_value()
        # Reuse the `basepath` property so the URL format lives in one place.
        self._remote_file_system = RemoteFileSystem(
            basepath=self.basepath, settings=settings
        )
        return self._remote_file_system

    # NOTE(review): annotated `-> bytes`, but the value is simply whatever the
    # delegated `RemoteFileSystem.get_directory` returns - confirm annotation.
    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> bytes:
        """
        Downloads a directory from a given remote path to a local directory.

        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.

        Defaults to uploading the entire contents of the current working directory to the block's basepath.
        """
        return await self.filesystem.put_directory(
            local_path=local_path, to_path=to_path, ignore_file=ignore_file
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read the content at `path` via the underlying remote file system."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` via the underlying remote file system."""
        return await self.filesystem.write_path(path=path, content=content)
Downloads a directory from a given remote path to a local directory.
Defaults to downloading the entire contents of the block's basepath to the current working directory.
Source code in src/prefect/filesystems.py
487488489490491492493494495496497498
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> bytes:
    """
    Downloads a directory from a given remote path to a local directory.

    Both arguments are forwarded unchanged to the underlying filesystem.
    Defaults to downloading the entire contents of the block's basepath to the current working directory.
    """
    fs = self.filesystem
    result = await fs.get_directory(from_path=from_path, local_path=local_path)
    return result
Uploads a directory from a given local path to a remote directory.
Defaults to uploading the entire contents of the current working directory to the block's basepath.
Source code in src/prefect/filesystems.py
500 501 502 503 504 505 506 507 508 509 510 511 512 513 514
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Uploads a directory from a given local path to a remote directory.

    All arguments are forwarded unchanged to the underlying filesystem.
    Defaults to uploading the entire contents of the current working directory to the block's basepath.
    """
    fs = self.filesystem
    result = await fs.put_directory(
        local_path=local_path, to_path=to_path, ignore_file=ignore_file
    )
    return result
class SMB(WritableFileSystem, WritableDeploymentStorage):
    """
    Store data as a file on a SMB share.

    Example:
        Load stored SMB config:

        ```python
        from prefect.filesystems import SMB
        smb_block = SMB.load("BLOCK_NAME")
        ```
    """

    _block_type_name = "SMB"
    _logo_url = "https://cdn.sanity.io/images/3ugk85nk/production/3f624663f7beb97d011d011bffd51ecf6c499efc-195x195.png"
    _documentation_url = "https://docs.prefect.io/concepts/filesystems/#smb"

    # Required: share name followed by the path inside the share.
    share_path: str = Field(
        default=...,
        description="SMB target (requires <SHARE>, followed by <PATH>).",
        examples=["/SHARE/dir/subdir"],
    )
    # Optional credentials; each is only forwarded to the filesystem when set.
    smb_username: Optional[SecretStr] = Field(
        default=None,
        title="SMB Username",
        description="Username with access to the target SMB SHARE.",
    )
    smb_password: Optional[SecretStr] = Field(
        default=None, title="SMB Password", description="Password for SMB access."
    )
    # Required server address. The keyword was previously misspelled `tile`,
    # which left this field without a title.
    smb_host: str = Field(
        default=...,
        title="SMB server/hostname",
        description="SMB server/hostname.",
    )
    smb_port: Optional[int] = Field(
        default=None, title="SMB port", description="SMB port (default: 445)."
    )

    # Holds the RemoteFileSystem most recently built by `filesystem`.
    _remote_file_system: RemoteFileSystem = None

    @property
    def basepath(self) -> str:
        """
        The `smb://` URL for this share.

        A trailing slash on the host and a leading slash on the share path
        are stripped so the two are joined by exactly one `/`.
        """
        return f"smb://{self.smb_host.rstrip('/')}/{self.share_path.lstrip('/')}"

    @property
    def filesystem(self) -> RemoteFileSystem:
        """
        Build a `RemoteFileSystem` for this share.

        A new instance is constructed on every access, stored on
        `_remote_file_system`, and returned. Only values that are set
        (truthy) are included in the settings passed to it.
        """
        settings = {}
        if self.smb_username:
            settings["username"] = self.smb_username.get_secret_value()
        if self.smb_password:
            settings["password"] = self.smb_password.get_secret_value()
        if self.smb_host:
            settings["host"] = self.smb_host
        if self.smb_port:
            settings["port"] = self.smb_port
        self._remote_file_system = RemoteFileSystem(
            basepath=f"smb://{self.smb_host.rstrip('/')}/{self.share_path.lstrip('/')}",
            settings=settings,
        )
        return self._remote_file_system

    @sync_compatible
    async def get_directory(
        self, from_path: Optional[str] = None, local_path: Optional[str] = None
    ) -> bytes:
        """
        Downloads a directory from a given remote path to a local directory.
        Defaults to downloading the entire contents of the block's basepath to the current working directory.
        """
        return await self.filesystem.get_directory(
            from_path=from_path, local_path=local_path
        )

    @sync_compatible
    async def put_directory(
        self,
        local_path: Optional[str] = None,
        to_path: Optional[str] = None,
        ignore_file: Optional[str] = None,
    ) -> int:
        """
        Uploads a directory from a given local path to a remote directory.
        Defaults to uploading the entire contents of the current working directory to the block's basepath.

        The upload is always requested with `overwrite=False`.
        """
        return await self.filesystem.put_directory(
            local_path=local_path,
            to_path=to_path,
            ignore_file=ignore_file,
            overwrite=False,
        )

    @sync_compatible
    async def read_path(self, path: str) -> bytes:
        """Read `path` through the underlying remote filesystem and return its result."""
        return await self.filesystem.read_path(path)

    @sync_compatible
    async def write_path(self, path: str, content: bytes) -> str:
        """Write `content` to `path` through the underlying remote filesystem and return its result."""
        return await self.filesystem.write_path(path=path, content=content)
Downloads a directory from a given remote path to a local directory.
Defaults to downloading the entire contents of the block's basepath to the current working directory.
Source code in src/prefect/filesystems.py
847 848 849 850 851 852 853 854 855 856 857
@sync_compatible
async def get_directory(
    self, from_path: Optional[str] = None, local_path: Optional[str] = None
) -> bytes:
    """
    Downloads a directory from a given remote path to a local directory.

    Both arguments are forwarded unchanged to the underlying filesystem.
    Defaults to downloading the entire contents of the block's basepath to the current working directory.
    """
    fs = self.filesystem
    result = await fs.get_directory(from_path=from_path, local_path=local_path)
    return result
Uploads a directory from a given local path to a remote directory.
Defaults to uploading the entire contents of the current working directory to the block's basepath.
@sync_compatible
async def put_directory(
    self,
    local_path: Optional[str] = None,
    to_path: Optional[str] = None,
    ignore_file: Optional[str] = None,
) -> int:
    """
    Uploads a directory from a given local path to a remote directory.

    Defaults to uploading the entire contents of the current working directory to the block's basepath.
    The upload is always requested with `overwrite=False`.
    """
    fs = self.filesystem
    result = await fs.put_directory(
        local_path=local_path,
        to_path=to_path,
        ignore_file=ignore_file,
        overwrite=False,
    )
    return result