class AutoEnum(str, Enum):
    """
    A string-valued enum whose member values are derived from the member names.

    Because each value is generated from the name it is assigned to, renaming a
    member can never leave a stale value behind. Members are also `str`
    instances, so they can be JSON-serialized without extra handling.

    See https://docs.python.org/3/library/enum.html#using-automatic-values

    Example:
        ```python
        class MyEnum(AutoEnum):
            RED = AutoEnum.auto() # equivalent to RED = 'RED'
            BLUE = AutoEnum.auto() # equivalent to BLUE = 'BLUE'
        ```
    """

    def _generate_next_value_(name, start, count, last_values):
        # Hook used by `enum.auto()`: the member's own name becomes its value
        return name

    @staticmethod
    def auto():
        """
        Return `enum.auto()` so that using `AutoEnum` does not require a
        second import
        """
        return auto()

    def __repr__(self) -> str:
        enum_name = type(self).__name__
        return f"{enum_name}.{self.value}"
A special exception used to stop recursive visits in visit_collection.
When raised, the expression is returned without modification and recursive visits
in that path will end.
Source code in src/prefect/utilities/collections.py, lines 217–223
class StopVisiting(BaseException):
    """
    Signal raised by a visitor to halt recursion in `visit_collection`.

    When it is raised, the expression being visited is returned without
    modification and no further recursive visits are made along that path.
    """
def batched_iterable(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    """
    Lazily split an iterable into consecutive tuples of at most `size` items.

    Args:
        iterable (Iterable): The iterable to draw items from
        size (int): The maximum number of items per batch

    Yields:
        tuple: The next batch; the final batch may hold fewer than `size` items
    """
    source = iter(iterable)
    # The two-argument form of `iter` keeps calling the lambda until it returns
    # the sentinel `()`, i.e. until there is nothing left to slice off.
    yield from iter(lambda: tuple(itertools.islice(source, size)), ())
def _iter_flat_items(
    dct: Dict[KT, Any], parent: Tuple[KT, ...]
) -> Iterator[Tuple[Tuple[KT, ...], Any]]:
    """Yield a `(key_path, value)` pair for every leaf of a nested dict, depth-first."""
    for key, value in dct.items():
        key_path = parent + (key,)
        # Only non-empty dicts are descended into; an empty dict is kept as a leaf
        if isinstance(value, dict) and value:
            yield from _iter_flat_items(value, key_path)
        else:
            yield key_path, value


def dict_to_flatdict(
    dct: Dict[KT, Union[Any, Dict[KT, Any]]], _parent: Optional[Tuple[KT, ...]] = None
) -> Dict[Tuple[KT, ...], Any]:
    """Converts a (nested) dictionary to a flattened representation.

    Each key of the flat dict will be a CompoundKey tuple containing the "chain of
    keys" for the corresponding value.

    Only the outermost dictionary's type is instantiated for the result. Nested
    dictionaries are walked in place, so nested dict subclasses whose constructor
    does not accept a list of pairs (e.g. `defaultdict`) are flattened correctly.

    Args:
        dct (dict): The dictionary to flatten
        _parent (Tuple, optional): The current parent for recursion; prefixed to
            every key of the result

    Returns:
        A flattened dict of the same type as dct
    """
    typ = cast("Type[Dict[Tuple[KT, ...], Any]]", type(dct))
    return typ(list(_iter_flat_items(dct, _parent or ())))
def extract_instances(
    objects: Iterable,
    types: Union[Type[T], Tuple[Type[T], ...]] = object,
) -> Union[List[T], Dict[Type[T], T]]:
    """
    Group the given objects by which of the requested types they are instances of.

    Args:
        objects: An iterable of objects
        types: A type or tuple of types to extract, defaults to all objects

    Returns:
        If a single type is given: a list of instances of that type
        If a tuple of types is given: a mapping of type to a list of instances
    """
    requested = ensure_iterable(types)

    # Keys are the requested types themselves rather than `type(obj)`, so an
    # object is filed under every requested type it is an instance of.
    grouped = defaultdict(list)
    for obj in objects:
        for matched in (t for t in requested if isinstance(obj, t)):
            grouped[matched].append(obj)

    if len(requested) == 1:
        return grouped[requested[0]]

    return grouped
def flatdict_to_dict(
    dct: Dict[Tuple[KT, ...], VT],
) -> Dict[KT, Union[VT, Dict[KT, VT]]]:
    """Rebuilds a nested dictionary from its flattened representation.

    Args:
        dct (dict): The dictionary to be nested. Each key should be a tuple of keys
            as generated by `dict_to_flatdict`

    Returns
        A nested dict of the same type as dct
    """
    dict_type = type(dct)
    nested = cast("Dict[KT, Union[VT, Dict[KT, VT]]]", dict_type())

    for path, leaf in dct.items():
        node = nested
        # Walk down to the parent of the leaf; `setdefault` reuses a level that
        # an earlier key path has already created.
        for step in path[:-1]:
            node = node.setdefault(step, dict_type())  # type: ignore
        node[path[-1]] = leaf

    return nested
Fetch a value from a nested dictionary or list using a sequence of keys.
This function fetches a value from a deeply nested structure
of dictionaries and lists using either a dot-separated string or a list
of keys. If a requested key does not exist, the function returns the
provided default value.
Parameters:
Name
Type
Description
Default
dct
Dict
The nested dictionary or list from which to fetch the value.
required
keys
Union[str, List[str]]
The sequence of keys to use for access. Can be a
dot-separated string or a list of keys. List indices can be included
in the sequence as either integer keys or as string indices in square
brackets.
required
default
Any
The default value to return if the requested key path does not
exist. Defaults to None.
None
Returns:
Type
Description
Any
The fetched value if the key exists, or the default value if it does not.
def get_from_dict(dct: Dict, keys: Union[str, List[str]], default: Any = None) -> Any:
    """
    Fetch a value from a nested dictionary or list using a sequence of keys.

    This function fetches a value from a deeply nested structure of
    dictionaries and lists using either a dot-separated string or a list
    of keys. If a requested key does not exist, the function returns the
    provided default value.

    Keys that look like integers are first tried as integers (so they can
    index lists or match integer dict keys). If that lookup raises `KeyError`,
    the original, unconverted key is tried, so dictionaries keyed by
    digit strings such as `{'1': ...}` remain reachable.

    Args:
        dct: The nested dictionary or list from which to fetch the value.
        keys: The sequence of keys to use for access. Can be a
            dot-separated string or a list of keys. List indices can be included
            in the sequence as either integer keys or as string indices in square
            brackets.
        default: The default value to return if the requested key path does not
            exist. Defaults to None.

    Returns:
        The fetched value if the key exists, or the default value if it does not.

    Examples:
        >>> get_from_dict({'a': {'b': {'c': [1, 2, 3, 4]}}}, 'a.b.c[1]')
        2
        >>> get_from_dict({'a': {'b': [0, {'c': [1, 2]}]}}, ['a', 'b', 1, 'c', 1])
        2
        >>> get_from_dict({'a': {'b': [0, {'c': [1, 2]}]}}, 'a.b.1.c.2', 'default')
        'default'
        >>> get_from_dict({'a': {'1': 'x'}}, 'a.1')
        'x'
    """
    if isinstance(keys, str):
        # Normalise "a.b[1]" to "a.b.1" so brackets and dots are handled alike
        keys = keys.replace("[", ".").replace("]", "").split(".")

    current = dct
    try:
        for key in keys:
            try:
                # Try to cast to int to handle list indices
                lookup = int(key)
            except ValueError:
                # If it's not an int, use the key as-is for dict lookup
                lookup = key

            try:
                current = current[lookup]
            except KeyError:
                if lookup is key:
                    # Nothing was converted, so there is no alternative to try
                    raise
                # The mapping may be keyed by the digit string itself
                current = current[key]
        return current
    except (TypeError, KeyError, IndexError):
        return default
Return a boolean indicating if an object is iterable.
Excludes types that are iterable but typically used as singletons:
- str
- bytes
- IO objects
Source code in src/prefect/utilities/collections.py, lines 138–152
def isiterable(obj: Any) -> bool:
    """
    Report whether an object should be treated as an iterable collection.

    An object qualifies when `iter(obj)` succeeds, unless it is one of the
    iterable types that are typically used as singletons:

    - str
    - bytes
    - IO objects
    """
    singleton_like = (str, bytes, io.IOBase)
    try:
        iter(obj)
    except TypeError:
        return False
    return not isinstance(obj, singleton_like)
def remove_nested_keys(keys_to_remove: List[Hashable], obj):
    """
    Recursively copy a dictionary, dropping every key that matches an entry in
    `keys_to_remove`. Anything that is not a dictionary is returned unchanged.

    Args:
        keys_to_remove: A list of keys to remove from obj
        obj: The object to remove keys from.

    Returns:
        `obj` without keys matching an entry in `keys_to_remove` if `obj` is a
            dictionary. `obj` if `obj` is not a dictionary.
    """
    if not isinstance(obj, dict):
        return obj

    pruned = {}
    for name, child in obj.items():
        if name in keys_to_remove:
            continue
        # Values are processed recursively; non-dict values come back as-is
        pruned[name] = remove_nested_keys(keys_to_remove, child)
    return pruned
This function visits every element of an arbitrary Python collection. If an element
is a Python collection, it will be visited recursively. If an element is not a
collection, visit_fn will be called with the element. The return value of
visit_fn can be used to alter the element if return_data is set.
Note that when using return_data a copy of each collection is created to avoid
mutating the original object. This may have significant performance penalties and
should only be used if you intend to transform the collection.
Supported types:
- List
- Tuple
- Set
- Dict (note: keys are also visited recursively)
- Dataclass
- Pydantic model
- Prefect annotations
Parameters:
Name
Type
Description
Default
expr
Any
a Python object or expression
required
visit_fn
Callable[[Any], Any]
a function that will be called with expr itself and with every element
visited recursively. It is called synchronously; its result is not awaited.
required
return_data
bool
if True, a copy of expr containing data modified
by visit_fn will be returned. This is slower than return_data=False
(the default).
False
max_depth
int
Controls the depth of recursive visitation. If set to zero, no
recursion will occur. If set to a positive integer N, visitation will only
descend to N layers deep. If set to any negative integer, no limit will be
enforced and recursion will continue until terminal items are reached. By
default, recursion is unlimited.
-1
context
Optional[dict]
An optional dictionary. If passed, the context will be sent to each
call to the visit_fn. The context can be mutated by each visitor and will
be available for later visits to expressions at the given depth. Values
will not be available "up" a level from a given expression.
The context will be automatically populated with an 'annotation' key when
visiting collections within a BaseAnnotation type. This requires the
caller to pass context={} and will not be activated by default.
None
remove_annotations
bool
If set, annotations will be replaced by their contents. By
default, annotations are preserved but their contents are visited.
False
Source code in src/prefect/utilities/collections.py
def visit_collection(
    expr,
    visit_fn: Callable[[Any], Any],
    return_data: bool = False,
    max_depth: int = -1,
    context: Optional[dict] = None,
    remove_annotations: bool = False,
):
    """
    Visit an arbitrary Python object and, recursively, the contents of any
    supported collection it contains.

    `visit_fn` is first called with `expr` itself. If `expr` is one of the
    supported collection types, each of its children is then visited through a
    recursive call to this function, so `visit_fn` is called for collections as
    well as for their elements. The return value of `visit_fn` can be used to
    alter the element if `return_data` is set.

    If `visit_fn` raises `StopVisiting`, the expression is kept without
    modification and no recursion is performed below it.

    Note that when using `return_data` a copy of each collection is created to
    avoid mutating the original object. This may have significant performance
    penalties and should only be used if you intend to transform the collection.

    Supported types:
    - List
    - Tuple
    - Set
    - Dict (note: keys are also visited recursively)
    - Dataclass
    - Pydantic model
    - Prefect annotations

    Lists, tuples, sets and dicts are matched on their exact type. Iterators are
    treated like lists: they are iterated here and, when `return_data` is set,
    rebuilt as a list.

    Args:
        expr (Any): a Python object or expression
        visit_fn (Callable[[Any], Any]): a function that is called with every
            visited expression. It is called synchronously (its result is not
            awaited). When `context` is not None it is called as
            `visit_fn(expr, context)`, otherwise as `visit_fn(expr)`.
        return_data (bool): if `True`, a copy of `expr` containing data modified
            by `visit_fn` will be returned. This is slower than `return_data=False`
            (the default).
        max_depth: Controls the depth of recursive visitation. If set to zero, no
            recursion will occur. If set to a positive integer N, visitation will only
            descend to N layers deep. If set to any negative integer, no limit will be
            enforced and recursion will continue until terminal items are reached. By
            default, recursion is unlimited.
        context: An optional dictionary. If passed, the context will be sent to each
            call to the `visit_fn`. The context can be mutated by each visitor and will
            be available for later visits to expressions at the given depth. Values
            will not be available "up" a level from a given expression.

            The context will be automatically populated with an 'annotation' key when
            visiting collections within a `BaseAnnotation` type. This requires the
            caller to pass `context={}` and will not be activated by default.
        remove_annotations: If set, annotations will be replaced by their contents. By
            default, annotations are preserved but their contents are visited.

    Returns:
        The (possibly transformed) copy of `expr` when `return_data` is `True`.
        When `return_data` is `False` the result is `None`, except that a `Mock`
        object reaching the collection dispatch is returned as-is.
    """

    def visit_nested(expr):
        # Utility for a recursive call, preserving options and updating the depth.
        return visit_collection(
            expr,
            visit_fn=visit_fn,
            return_data=return_data,
            remove_annotations=remove_annotations,
            max_depth=max_depth - 1,
            # Copy the context on nested calls so it does not "propagate up"
            context=context.copy() if context is not None else None,
        )

    def visit_expression(expr):
        # The context is only passed to the visitor when the caller supplied one
        if context is not None:
            return visit_fn(expr, context)
        else:
            return visit_fn(expr)

    # Visit every expression
    try:
        result = visit_expression(expr)
    except StopVisiting:
        # Forcing the depth to zero makes the early return below fire, which ends
        # recursion on this path and leaves the expression untouched
        max_depth = 0
        result = expr

    if return_data:
        # Only mutate the expression while returning data, otherwise it could be null
        expr = result

    # Then, visit every child of the expression recursively

    # If we have reached the maximum depth, do not perform any recursion
    if max_depth == 0:
        return result if return_data else None

    # Get the expression type; treat iterators like lists
    typ = list if isinstance(expr, IteratorABC) and isiterable(expr) else type(expr)
    typ = cast(type, typ)  # mypy treats this as 'object' otherwise and complains

    # Then visit every item in the expression if it is a collection
    if isinstance(expr, Mock):
        # Do not attempt to recurse into mock objects
        result = expr

    elif isinstance(expr, BaseAnnotation):
        if context is not None:
            # Recorded before the nested call so the copied context carries it down
            context["annotation"] = expr
        value = visit_nested(expr.unwrap())

        if remove_annotations:
            result = value if return_data else None
        else:
            result = expr.rewrap(value) if return_data else None

    elif typ in (list, tuple, set):
        items = [visit_nested(o) for o in expr]
        result = typ(items) if return_data else None

    elif typ in (dict, OrderedDict):
        assert isinstance(expr, (dict, OrderedDict))  # typecheck assertion
        items = [(visit_nested(k), visit_nested(v)) for k, v in expr.items()]
        result = typ(items) if return_data else None

    elif is_dataclass(expr) and not isinstance(expr, type):
        values = [visit_nested(getattr(expr, f.name)) for f in fields(expr)]
        items = {field.name: value for field, value in zip(fields(expr), values)}
        result = typ(**items) if return_data else None

    elif isinstance(expr, pydantic.BaseModel):
        # NOTE: This implementation *does not* traverse private attributes
        # Pydantic does not expose extras in `__fields__` so we use `__fields_set__`
        # as well to get all of the relevant attributes
        # Check for presence of attrs even if they're in the field set due to pydantic#4916
        model_fields = {
            f for f in expr.__fields_set__.union(expr.__fields__) if hasattr(expr, f)
        }
        items = [visit_nested(getattr(expr, key)) for key in model_fields]

        if return_data:
            # Collect fields with aliases so reconstruction can use the correct field name
            aliases = {
                key: value.alias
                for key, value in expr.__fields__.items()
                if value.has_alias
            }

            model_instance = typ(
                **{
                    aliases.get(key) or key: value
                    for key, value in zip(model_fields, items)
                }
            )

            # Private attributes are not included in `__fields_set__` but we do not want
            # to drop them from the model so we restore them after constructing a new
            # model
            for attr in expr.__private_attributes__:
                # Use `object.__setattr__` to avoid errors on immutable models
                object.__setattr__(model_instance, attr, getattr(expr, attr))

            # Preserve data about which fields were explicitly set on the original model
            object.__setattr__(model_instance, "__fields_set__", expr.__fields_set__)
            result = model_instance
        else:
            result = None

    else:
        # Not a supported collection: keep whatever the visitor produced
        result = result if return_data else None

    return result