Functional
datapizza.pipeline.functional_pipeline.FunctionalPipeline
Pipeline for executing a series of nodes with dependencies.
branch
Branch execution based on a condition.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`condition` | `Callable` | The condition to evaluate. | required |
`if_true` | `FunctionalPipeline` | The pipeline to execute if the condition is True. | required |
`if_false` | `FunctionalPipeline` | The pipeline to execute if the condition is False. | required |
`dependencies` | `list[Dependency]` | List of dependencies for the node. Defaults to None. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`FunctionalPipeline` | `FunctionalPipeline` | The pipeline instance. |
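
The branching idea can be sketched in plain Python. This is a hypothetical stand-in, not datapizza's implementation: here a "pipeline" is modelled as a function from a data dict to a result dict, and the condition is assumed to receive that same dict. The names `branch`, `summarize`, and `passthrough` are invented for illustration.

```python
from typing import Any, Callable, Dict

# Hypothetical model: a pipeline is a function from a data dict to a result dict.
Pipeline = Callable[[Dict[str, Any]], Dict[str, Any]]


def branch(
    condition: Callable[[Dict[str, Any]], bool],
    if_true: Pipeline,
    if_false: Pipeline,
) -> Pipeline:
    """Return a step that runs if_true or if_false depending on condition."""

    def step(data: Dict[str, Any]) -> Dict[str, Any]:
        # Evaluate the condition once, then delegate to the chosen pipeline.
        chosen = if_true if condition(data) else if_false
        return chosen(data)

    return step


def summarize(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"action": "summarize", "chars": len(data["text"])}


def passthrough(data: Dict[str, Any]) -> Dict[str, Any]:
    return {"action": "passthrough", "chars": len(data["text"])}


# Route long texts to one sub-pipeline and short texts to the other.
route = branch(lambda d: len(d["text"]) > 20, summarize, passthrough)
short_result = route({"text": "short"})
long_result = route({"text": "a much longer piece of text"})
```

Only one of the two sub-pipelines runs per call, which is the property the `if_true` and `if_false` parameters describe.
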
execute
Execute the pipeline and return the results.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`initial_data` | `dict[str, Any] \| None` | Dictionary where keys are node names and values are the data to be passed to those nodes when they execute. | `None` |
`context` | `dict \| None` | Dictionary where keys are node names and values are the data to be passed to those nodes when they execute. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`dict` | `dict[str, Any]` | The results of the pipeline. |
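
To illustrate how `initial_data` is keyed by node name, here is a minimal sketch in plain Python. It is not datapizza's executor. It assumes that each `initial_data` value is a dict of keyword arguments, that a dependency's result is passed under the dependency's own name, and that nodes are listed in dependency order. The `nodes` table format is hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Hypothetical node table: name -> (callable, names of nodes it depends on).
NodeTable = Dict[str, Tuple[Callable[..., Any], List[str]]]


def execute(
    nodes: NodeTable, initial_data: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """Run nodes in insertion order and return a dict of results by node name."""
    initial_data = initial_data or {}
    results: Dict[str, Any] = {}
    for name, (func, deps) in nodes.items():
        # Start from the data addressed to this node by name (assumed kwargs).
        kwargs = dict(initial_data.get(name, {}))
        # Add the results of the nodes this one depends on.
        for dep in deps:
            kwargs[dep] = results[dep]
        results[name] = func(**kwargs)
    return results


nodes: NodeTable = {
    "load": (lambda path: f"contents of {path}", []),
    "count": (lambda load: len(load.split()), ["load"]),
}
results = execute(nodes, initial_data={"load": {"path": "notes.txt"}})
```

The returned dict has one entry per node, which mirrors the `dict[str, Any]` return type documented above.
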
foreach
Execute a sub-pipeline for each item in a collection.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`name` | `str` | The name of the node. | required |
`do` | `PipelineComponent` | The sub-pipeline to execute for each item. | required |
`dependencies` | `list[Dependency]` | List of dependencies for the node. Defaults to None. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`FunctionalPipeline` | `FunctionalPipeline` | The pipeline instance. |
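
The per-item behaviour can be pictured with a small plain-Python sketch. It is a hypothetical model rather than the library's code: the sub-pipeline `do` is represented by an ordinary function, and collecting the per-item results into a list is an assumption.

```python
from typing import Any, Callable, Iterable, List


def foreach(do: Callable[[Any], Any]) -> Callable[[Iterable[Any]], List[Any]]:
    """Return a step that applies `do` to every item of a collection."""

    def step(items: Iterable[Any]) -> List[Any]:
        # Run the sub-pipeline once per item, keeping the input order.
        return [do(item) for item in items]

    return step


def clean(chunk: str) -> str:
    """Stand-in sub-pipeline: normalise a single text chunk."""
    return chunk.strip().lower()


clean_all = foreach(clean)
cleaned = clean_all(["  Alpha", "BETA  ", " Gamma "])
```
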
from_yaml
staticmethod
Constructs a FunctionalPipeline from a YAML configuration file. The YAML should contain 'modules' (optional) defining reusable components and 'pipeline' defining the sequence of steps.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`yaml_path` | `str` | Path to the YAML configuration file. | required |
Returns:
Type | Description |
---|---|
`FunctionalPipeline` | A configured FunctionalPipeline instance. |
Raises:
Type | Description |
---|---|
`ValueError` | If the YAML format is invalid, a module cannot be loaded, or a referenced node/condition name is not found. |
`KeyError` | If a required key is missing in the YAML structure. |
`FileNotFoundError` | If the yaml_path does not exist. |
`YAMLError` | If the YAML file cannot be parsed. |
`ImportError` | If a specified module cannot be imported. |
`AttributeError` | If a specified class/function is not found in the module. |
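
Because several distinct exceptions can be raised, callers may want to handle them in one place. The sketch below shows one possible pattern, with the loader injected as a parameter so the example runs without the library installed. `load_pipeline`, `CONFIG_ERRORS`, and `fake_loader` are hypothetical names. `YAMLError` is left out of the tuple only to keep the sketch free of third-party imports.

```python
from typing import Any, Callable, Optional, Tuple

# Built-in exception types from the Raises table above. YAMLError, which
# comes from the YAML parser, is omitted to avoid a third-party import.
CONFIG_ERRORS = (ValueError, KeyError, FileNotFoundError, ImportError, AttributeError)


def load_pipeline(
    loader: Callable[[str], Any], yaml_path: str
) -> Tuple[Optional[Any], Optional[str]]:
    """Call loader(yaml_path) and return (pipeline, None) or (None, error message)."""
    try:
        return loader(yaml_path), None
    except CONFIG_ERRORS as exc:
        # Report the exception type so the caller can tell the causes apart.
        return None, f"{type(exc).__name__}: {exc}"


def fake_loader(path: str) -> Any:
    """Stand-in loader that behaves as if the file were missing."""
    raise FileNotFoundError(path)


pipeline, error = load_pipeline(fake_loader, "missing.yaml")
```

In real code, the loader argument would presumably be `FunctionalPipeline.from_yaml` itself, since it is a static method that takes the path as its only documented parameter.
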
get
Get the result of a node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`name` | `str` | The name of the node. | required |
Returns:
Name | Type | Description |
---|---|---|
`FunctionalPipeline` | `FunctionalPipeline` | The pipeline instance. |
run
Add a node to the pipeline with optional dependencies.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`name` | `str` | The name of the node. | required |
`node` | `PipelineComponent` | The node to add. | required |
`dependencies` | `list[Dependency]` | List of dependencies for the node. Defaults to None. | `None` |
`kwargs` | `dict[str, Any]` | Additional keyword arguments to pass to the node. Defaults to None. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`FunctionalPipeline` | `FunctionalPipeline` | The pipeline instance. |
then
Add a node to execute after the previous node.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
`name` | `str` | The name of the node. | required |
`node` | `PipelineComponent` | The node to add. | required |
`target_key` | `str` | The key to store the result of the node in the previous node. | required |
`dependencies` | `list[Dependency]` | List of dependencies for the node. Defaults to None. | `None` |
`kwargs` | `dict[str, Any]` | Additional keyword arguments to pass to the node. Defaults to None. | `None` |
Returns:
Name | Type | Description |
---|---|---|
`FunctionalPipeline` | `FunctionalPipeline` | The pipeline instance. |
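
Since `run` and `then` both return the pipeline instance, calls can be chained. The toy class below sketches that fluent style in plain Python. `MiniPipeline` is a hypothetical stand-in, not datapizza's class, and it omits `dependencies`. It also assumes one reading of `target_key`: the previous node's result is passed to the new node as the keyword argument named by `target_key`.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple


class MiniPipeline:
    """Hypothetical stand-in illustrating fluent chaining."""

    def __init__(self) -> None:
        # Each step: (name, callable, source node name or None, target_key, kwargs)
        self._steps: List[Tuple[str, Callable[..., Any], Optional[str], Optional[str], Dict[str, Any]]] = []

    def run(self, name: str, node: Callable[..., Any],
            kwargs: Optional[Dict[str, Any]] = None) -> "MiniPipeline":
        self._steps.append((name, node, None, None, kwargs or {}))
        return self  # returning self is what enables chaining

    def then(self, name: str, node: Callable[..., Any], target_key: str,
             kwargs: Optional[Dict[str, Any]] = None) -> "MiniPipeline":
        # Assumes at least one node was added before; feeds on the latest one.
        previous = self._steps[-1][0]
        self._steps.append((name, node, previous, target_key, kwargs or {}))
        return self

    def execute(self) -> Dict[str, Any]:
        results: Dict[str, Any] = {}
        for name, node, source, target_key, kwargs in self._steps:
            call_kwargs = dict(kwargs)
            if source is not None and target_key is not None:
                # Assumed meaning of target_key: keyword name for the previous result.
                call_kwargs[target_key] = results[source]
            results[name] = node(**call_kwargs)
        return results


def make_words(text: str) -> List[str]:
    return text.split()


def count(items: List[str]) -> int:
    return len(items)


results = (
    MiniPipeline()
    .run("words", make_words, kwargs={"text": "to be or not"})
    .then("count", count, target_key="items")
    .execute()
)
```
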