Read and Write Data to and from Cloud Provider Storage
Writing data to cloud-based storage and reading data from that storage is a common task in data engineering.
In this guide we'll learn how to use Prefect to move data to and from AWS S3, Azure Blob Storage, and Google Cloud Storage (GCS).
Create a storage bucket in the cloud provider account.
Ensure the bucket is publicly accessible or create a user or service account with the appropriate permissions to fetch and write data to the bucket.
If the bucket is private, there are several options to authenticate:

- At deployment runtime, ensure the runtime environment is authenticated.
- Create a block with configuration details and reference it when creating the storage block.
If saving credential details in a block, we can use a credentials block specific to the cloud provider or a more generic Secret block.
We can create blocks via the UI or Python code.
Below we'll use Python code to create a credentials block for our cloud provider.
Credentials safety
Reminder: don't store credential values in public locations such as public git platform repositories.
In the examples below we use environment variables to store credential values.
For GCP, we recommend specifying the service account key file contents as a string, rather than a path to the file, because that file might not be available in your production environments.
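As a minimal sketch, we might create one credentials block per provider as shown below. The environment variable names and the AWS and GCP block names are illustrative assumptions; the Azure block name matches the one loaded in the flows later in this guide.

```python
import json
import os

from prefect_aws import AwsCredentials
from prefect_azure import AzureBlobStorageCredentials
from prefect_gcp import GcpCredentials

# AWS: an access key pair, read from environment variables
# (block name "my-aws-creds-block" is an illustrative assumption)
AwsCredentials(
    aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
    aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
).save("my-aws-creds-block")

# Azure: a single connection string encodes the account and key
AzureBlobStorageCredentials(
    connection_string=os.environ["AZURE_STORAGE_CONNECTION_STRING"],
).save("my-azure-creds-block")

# GCP: pass the service account key file *contents* (not a path),
# so the block works in environments where the file isn't present
# (block name "my-gcp-creds-block" is an illustrative assumption)
GcpCredentials(
    service_account_info=json.loads(os.environ["GCP_SERVICE_ACCOUNT_KEY"]),
).save("my-gcp-creds-block")
```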
Next, let's create a storage block for the chosen cloud provider. Again, we can use Python code or the UI; in this example we'll use Python code.
Note that the S3Bucket block is not the same as the S3 block that ships with Prefect.
The S3Bucket block we use in this example is part of the prefect-aws library and provides additional functionality.
We'll reference the credentials block created above.
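A minimal sketch of creating and saving the bucket block, assuming an AwsCredentials block saved as my-aws-creds-block (as above) and a hypothetical bucket named my-bucket:

```python
from prefect_aws import AwsCredentials
from prefect_aws.s3 import S3Bucket

# Bind the bucket to the saved credentials and store it as a block
S3Bucket(
    bucket_name="my-bucket",  # assumed bucket name
    credentials=AwsCredentials.load("my-aws-creds-block"),
).save("my-s3-bucket-block")
```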
Note that the AzureBlobStorageCredentials block is not the same as the Azure block that ships with Prefect.
The AzureBlobStorageCredentials block we use in this example is part of the prefect-azure library and provides additional functionality.
Azure Blob Storage doesn't require a separate storage block; the connection string used in the AzureBlobStorageCredentials block encodes all the information needed.
Note that the GcsBucket block is not the same as the GCS block that ships with Prefect.
The GcsBucket block is part of the prefect-gcp library and provides additional functionality.
We'll use it here, referencing the credentials block created above.
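A minimal sketch of creating and saving the bucket block, assuming a GcpCredentials block saved as my-gcp-creds-block (as above) and a hypothetical bucket named my-bucket:

```python
from prefect_gcp import GcpCredentials
from prefect_gcp.cloud_storage import GcsBucket

# Bind the bucket to the saved credentials and store it as a block
GcsBucket(
    bucket="my-bucket",  # assumed bucket name
    gcp_credentials=GcpCredentials.load("my-gcp-creds-block"),
).save("my-gcs-bucket-block")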
Use your new block inside a flow to write data to your cloud provider.
```python
from pathlib import Path

from prefect import flow
from prefect_aws.s3 import S3Bucket


@flow()
def upload_to_s3():
    """Flow function to upload data"""
    path = Path("my_path_to/my_file.parquet")
    aws_block = S3Bucket.load("my-s3-bucket-block")
    aws_block.upload_from_path(from_path=path, to_path=path)


if __name__ == "__main__":
    upload_to_s3()
```
```python
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_upload


@flow
def upload_to_azure():
    """Flow function to upload data"""
    blob_storage_credentials = AzureBlobStorageCredentials.load(
        name="my-azure-creds-block"
    )

    with open("my_path_to/my_file.parquet", "rb") as f:
        blob_storage_upload(
            data=f.read(),
            container="my_container",
            blob="my_path_to/my_file.parquet",
            blob_storage_credentials=blob_storage_credentials,
        )


if __name__ == "__main__":
    upload_to_azure()
```
```python
from pathlib import Path

from prefect import flow
from prefect_gcp.cloud_storage import GcsBucket


@flow()
def upload_to_gcs():
    """Flow function to upload data"""
    path = Path("my_path_to/my_file.parquet")
    gcs_block = GcsBucket.load("my-gcs-bucket-block")
    gcs_block.upload_from_path(from_path=path, to_path=path)


if __name__ == "__main__":
    upload_to_gcs()
```
Use your block to read data from your cloud provider inside a flow.
```python
from prefect import flow
from prefect_aws import S3Bucket


@flow
def download_from_s3():
    """Flow function to download data"""
    s3_block = S3Bucket.load("my-s3-bucket-block")
    s3_block.get_directory(
        from_path="my_path_to/my_file.parquet",
        local_path="my_path_to/my_file.parquet",
    )


if __name__ == "__main__":
    download_from_s3()
```
```python
from prefect import flow
from prefect_azure import AzureBlobStorageCredentials
from prefect_azure.blob_storage import blob_storage_download


@flow
def download_from_azure():
    """Flow function to download data"""
    blob_storage_credentials = AzureBlobStorageCredentials.load(
        name="my-azure-creds-block"
    )
    blob_storage_download(
        blob="my_path_to/my_file.parquet",
        container="my_container",
        blob_storage_credentials=blob_storage_credentials,
    )


if __name__ == "__main__":
    download_from_azure()
```
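A GCP download example isn't shown above; a minimal sketch mirroring the S3 example, assuming the GcsBucket block saved earlier, could look like this:

```python
from prefect import flow
from prefect_gcp.cloud_storage import GcsBucket


@flow
def download_from_gcs():
    """Flow function to download data"""
    gcs_block = GcsBucket.load("my-gcs-bucket-block")
    gcs_block.get_directory(
        from_path="my_path_to/my_file.parquet",
        local_path="my_path_to/my_file.parquet",
    )


if __name__ == "__main__":
    download_from_gcs()
```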
Check out the prefect-aws, prefect-azure, and prefect-gcp docs to see additional methods for interacting with cloud storage providers.
Each library also contains blocks for interacting with other cloud-provider services.