Functional

datapizza.pipeline.functional_pipeline.FunctionalPipeline

Pipeline for executing a series of nodes with dependencies.

branch

branch(condition, if_true, if_false, dependencies=None)

Branch execution based on a condition.

Parameters:

condition (Callable, required): The condition to evaluate.
if_true (FunctionalPipeline, required): The pipeline to execute if the condition is True.
if_false (FunctionalPipeline, required): The pipeline to execute if the condition is False.
dependencies (list[Dependency], default None): List of dependencies for the node.

Returns:

FunctionalPipeline: The pipeline instance.
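
The control flow of branch can be sketched with plain functions. This is a minimal stand-in, not the datapizza implementation. It assumes the condition is called with the results accumulated so far and that each branch behaves like a function from results to updated results. The names run_branch, Results and SubPipeline, and the "text"/"route" keys, are hypothetical.

```python
from __future__ import annotations

from typing import Any, Callable

Results = dict[str, Any]
# Hypothetical stand-in for a sub-pipeline: takes the current results and
# returns updated results. This is NOT the datapizza FunctionalPipeline API.
SubPipeline = Callable[[Results], Results]


def run_branch(
    condition: Callable[[Results], bool],
    if_true: SubPipeline,
    if_false: SubPipeline,
    results: Results,
) -> Results:
    """Evaluate the condition once and run exactly one of the two sub-pipelines."""
    chosen = if_true if condition(results) else if_false
    return chosen(results)


# Usage: route short and long inputs to different sub-pipelines.
short_path: SubPipeline = lambda r: {**r, "route": "short"}
long_path: SubPipeline = lambda r: {**r, "route": "long"}
is_short = lambda r: len(r["text"]) < 10

print(run_branch(is_short, short_path, long_path, {"text": "hello"})["route"])  # short
print(run_branch(is_short, short_path, long_path, {"text": "a much longer input"})["route"])  # long
```

Only the chosen sub-pipeline runs; the other is never called, which is the main difference from running both and discarding one result.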

execute

execute(initial_data=None, context=None)

Execute the pipeline and return the results.

Parameters:

initial_data (dict[str, Any] | None, default None): Dictionary where keys are node names and values are the data to be passed to those nodes when they execute.
context (dict | None, default None): Optional context dictionary for the run.

Returns:

dict[str, Any]: The results of the pipeline.
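
The documented shape of initial_data (node name mapped to the data for that node) and of the returned results dictionary can be illustrated with a toy sequential executor. This sketch is not the library's execute: it assumes each node is a plain callable and that the data for a node is a dict of keyword arguments, and it ignores context and dependencies. The name toy_execute is hypothetical.

```python
from __future__ import annotations

from typing import Any, Callable


def toy_execute(
    nodes: list[tuple[str, Callable[..., Any]]],
    initial_data: dict[str, Any] | None = None,
) -> dict[str, Any]:
    """Run nodes in order; return a dict mapping each node name to its result."""
    initial_data = initial_data or {}
    results: dict[str, Any] = {}
    for name, node in nodes:
        # Assumption: the data for a node is a dict of keyword arguments.
        kwargs = initial_data.get(name, {})
        results[name] = node(**kwargs)
    return results


nodes = [
    ("upper", lambda text: text.upper()),
    ("count", lambda text: len(text)),
]
print(toy_execute(nodes, {"upper": {"text": "abc"}, "count": {"text": "abcd"}}))
# {'upper': 'ABC', 'count': 4}
```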

foreach

foreach(name, do, dependencies=None)

Execute a sub-pipeline for each item in a collection.

Parameters:

name (str, required): The name of the node.
do (PipelineComponent, required): The sub-pipeline to execute for each item.
dependencies (list[Dependency], default None): List of dependencies for the node.

Returns:

FunctionalPipeline: The pipeline instance.
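
A rough model of foreach: take a collection produced earlier in the pipeline, run the same sub-pipeline once per item, and collect the outputs under the node's name. The description above does not say where the collection comes from. This sketch assumes it is the result of a single upstream node. The names toy_foreach and source are hypothetical, and a plain function stands in for the sub-pipeline.

```python
from __future__ import annotations

from typing import Any, Callable


def toy_foreach(
    results: dict[str, Any],
    name: str,
    source: str,
    do: Callable[[Any], Any],
) -> dict[str, Any]:
    """Apply `do` to every item of results[source] and store the list under `name`."""
    items = results[source]
    # One sub-pipeline run per item; outputs keep the input order.
    results[name] = [do(item) for item in items]
    return results


results = {"chunks": ["alpha", "beta", "gamma"]}
toy_foreach(results, name="lengths", source="chunks", do=len)
print(results["lengths"])  # [5, 4, 5]
```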

from_yaml staticmethod

from_yaml(yaml_path)

Construct a FunctionalPipeline from a YAML configuration file. The file should contain an optional 'modules' section defining reusable components and a 'pipeline' section defining the sequence of steps.

Parameters:

yaml_path (str, required): Path to the YAML configuration file.

Returns:

FunctionalPipeline: A configured FunctionalPipeline instance.

Raises:

ValueError: If the YAML format is invalid, a module cannot be loaded, or a referenced node or condition name is not found.
KeyError: If a required key is missing from the YAML structure.
FileNotFoundError: If yaml_path does not exist.
YAMLError: If the YAML file cannot be parsed.
ImportError: If a specified module cannot be imported.
AttributeError: If a specified class or function is not found in the module.
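
Several of the exceptions listed above arise naturally from standard-library operations, which the following sketch makes concrete. It is not the library's loader. It starts from an already-parsed YAML mapping (parsing, for example with PyYAML, is where YAMLError would come from), assumes the 'pipeline' section is a list of steps, and uses the hypothetical helper names check_config and resolve. The step dictionary in the usage line is also hypothetical.

```python
from __future__ import annotations

import importlib
from typing import Any


def check_config(config: Any) -> list[Any]:
    """Validate the top-level structure of an already-parsed YAML document."""
    if not isinstance(config, dict):
        # A document that is not a mapping is an invalid format.
        raise ValueError("the YAML document must be a mapping")
    # 'modules' is optional, so its absence is not an error (not inspected here).
    # 'pipeline' is required: indexing raises KeyError when it is absent.
    steps = config["pipeline"]
    if not isinstance(steps, list):
        raise ValueError("'pipeline' must be a sequence of steps")
    return steps


def resolve(module_name: str, attr_name: str) -> Any:
    """Import a module, then fetch a class or function defined in it."""
    module = importlib.import_module(module_name)  # ImportError if missing
    return getattr(module, attr_name)  # AttributeError if not defined


print(check_config({"pipeline": [{"name": "step1"}]}))  # [{'name': 'step1'}]
print(resolve("math", "sqrt")(16.0))  # 4.0
```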

get

get(name)

Get the result of a node.

Parameters:

name (str, required): The name of the node.

Returns:

FunctionalPipeline: The pipeline instance (not the node's result itself).

run

run(name, node, dependencies=None, kwargs=None)

Add a node to the pipeline with optional dependencies.

Parameters:

name (str, required): The name of the node.
node (PipelineComponent, required): The node to add.
dependencies (list[Dependency], default None): List of dependencies for the node.
kwargs (dict[str, Any], default None): Additional keyword arguments to pass to the node.

Returns:

FunctionalPipeline: The pipeline instance.
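
The builder pattern behind run (register a node, then return the pipeline so calls can be chained) can be modelled in a few lines. The class below is a hypothetical toy, not FunctionalPipeline. ToyPipeline and the (node_name, target_key) tuples used in place of Dependency objects are assumptions, as is the reading that a dependency means "pass that node's result to this node as a keyword argument".

```python
from __future__ import annotations

from typing import Any, Callable


class ToyPipeline:
    """Hypothetical stand-in illustrating run(); not the datapizza class."""

    def __init__(self) -> None:
        # Each step: (name, node, [(dependency_name, target_key), ...], kwargs).
        self.steps: list[tuple[str, Callable[..., Any], list[tuple[str, str]], dict[str, Any]]] = []

    def run(self, name: str, node: Callable[..., Any],
            dependencies: list[tuple[str, str]] | None = None,
            kwargs: dict[str, Any] | None = None) -> ToyPipeline:
        # Record the step; nothing executes until execute() is called.
        self.steps.append((name, node, dependencies or [], kwargs or {}))
        return self  # returning self is what makes chaining possible

    def execute(self) -> dict[str, Any]:
        results: dict[str, Any] = {}
        for name, node, deps, kwargs in self.steps:
            call_kwargs = dict(kwargs)
            # Each dependency injects an earlier node's result as a keyword argument.
            for dep_name, target_key in deps:
                call_kwargs[target_key] = results[dep_name]
            results[name] = node(**call_kwargs)
        return results


pipeline = (
    ToyPipeline()
    .run("load", lambda path: f"contents of {path}", kwargs={"path": "a.txt"})
    .run("shout", lambda text, suffix: text.upper() + suffix,
         dependencies=[("load", "text")], kwargs={"suffix": "!"})
)
print(pipeline.execute())
# {'load': 'contents of a.txt', 'shout': 'CONTENTS OF A.TXT!'}
```

Deferring execution until execute() lets the whole graph be declared first, so later nodes can refer to earlier ones by name.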

then

then(
    name, node, target_key, dependencies=None, kwargs=None
)

Add a node to execute after the previous node.

Parameters:

name (str, required): The name of the node.
node (PipelineComponent, required): The node to add.
target_key (str, required): The key under which the previous node's result is passed to this node.
dependencies (list[Dependency], default None): List of dependencies for the node.
kwargs (dict[str, Any], default None): Additional keyword arguments to pass to the node.

Returns:

FunctionalPipeline: The pipeline instance.
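
One reading of target_key is that then feeds the previous node's result to the new node as the keyword argument named target_key. The toy chain below models only that behaviour. ToyChain is hypothetical, and this interpretation is an assumption rather than a statement about the library's internals.

```python
from __future__ import annotations

from typing import Any, Callable


class ToyChain:
    """Hypothetical model of then(); not the datapizza FunctionalPipeline."""

    def __init__(self) -> None:
        # Each step: (name, node, target_key or None, extra kwargs).
        self.steps: list[tuple[str, Callable[..., Any], str | None, dict[str, Any]]] = []

    def run(self, name: str, node: Callable[..., Any],
            kwargs: dict[str, Any] | None = None) -> ToyChain:
        self.steps.append((name, node, None, kwargs or {}))
        return self

    def then(self, name: str, node: Callable[..., Any], target_key: str,
             kwargs: dict[str, Any] | None = None) -> ToyChain:
        if not self.steps:
            raise ValueError("then() needs a previous node")
        self.steps.append((name, node, target_key, kwargs or {}))
        return self

    def execute(self) -> dict[str, Any]:
        results: dict[str, Any] = {}
        previous = None
        for name, node, target_key, kwargs in self.steps:
            call_kwargs = dict(kwargs)
            if target_key is not None:
                # Assumed semantics: the previous node's result arrives
                # as the keyword argument named by target_key.
                call_kwargs[target_key] = results[previous]
            results[name] = node(**call_kwargs)
            previous = name
        return results


chain = (
    ToyChain()
    .run("numbers", lambda: [3, 1, 2])
    .then("total", lambda values: sum(values), target_key="values")
    .then("doubled", lambda x: x * 2, target_key="x")
)
print(chain.execute())
# {'numbers': [3, 1, 2], 'total': 6, 'doubled': 12}
```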