class Workflow

The Workflow class provides a framework for building stateful workflows with selective recomputation and intelligent caching. This is particularly useful for tasks like data loading, cleaning, and analysis, where intermediate results can be cached to avoid unnecessary re-execution.


Key Features

  1. Stateful Workflow Object: Maintains the state of computations, allowing efficient and reusable workflows.
  2. Caching: Intelligent caching avoids redundant computations for tasks that haven’t changed.
  3. Retry Policy: Optional retry policies for handling failures, with a default policy provided.
  4. Selective Recomputation: Rerun only affected parts of the workflow when changes occur.

Workflow Structure

Atoms

Atoms are individual, reusable units of computation. These are functions decorated with @workflow.atom(). Atoms can:

  • Have dependencies (other atoms they rely on).
  • Be cached to prevent redundant execution.

Functions

def atom(
        self,
        dependencies: Optional[List[str]] = None,
        retry_policy: Optional[RetryPolicy] = None,
        force_recompute: bool = False,
    ):

A decorator used to define an atom (a node in the workflow).

Parameters:

  • dependencies (list, optional): Names of other atoms this atom depends on.
  • RetryPolicy (optional): Specify a retry policy for this atom. If not provided, the default retry policy is used.
  • force_recompute (bool, optional): Whether to force computation of this atom even if it is unchanged.

def execute(
        self, recompute_atoms: Optional[Set[str]] = None
    ) -> Dict[str, AtomResult]:

Executes all atoms in the workflow, respecting dependencies and caching.

Parameters:

  • recompute_atoms (set, optional): Names of atoms to force recomputation, bypassing the cache.

Returns:

  • results (dict): A dictionary mapping atom names to their results.

Common Use Cases

  • Data Loading and Cleaning: Load and preprocess data in stages, caching results to avoid reloading.
  • Selective Execution: Rerun only the parts of the workflow affected by changes to interactive elements or inputs.
  • Intermediate Results: Inspect cached intermediate results for debugging or analysis.

Example Workflow

from preswald import Workflow
import pandas as pd

# Create a workflow instance
workflow = Workflow()

# Define an atom for loading data
@workflow.atom()
def load_data():
    # This atom has no dependencies
    return pd.read_csv("data.csv")

# Define an atom for cleaning data, dependent on load_data
@workflow.atom(dependencies=['load_data'])
def clean_data(load_data):
    # Automatically passes the output of load_data as an argument
    return load_data.dropna()

# Define an atom for analysis, dependent on clean_data
@workflow.atom(dependencies=['clean_data'])
def analyze_data(clean_data):
    # Uses the cleaned data from the previous atom
    return clean_data.describe()

# Execute the workflow
results = workflow.execute()

# Access the results
print(results)

Output Behavior

  1. Results: A dictionary mapping atom names to their outputs:
    {
        "load_data": <DataFrame>,
        "clean_data": <Cleaned DataFrame>,
        "analyze_data": <Analysis Results>
    }
    
  2. Selective Execution: If the script is rerun without changes, only affected atoms are recomputed.

Reactive Runtime: Dependency-Aware Selective Execution

Preswald’s Workflow engine uses a reactive runtime that selectively reruns only the atoms affected by state changes — skipping any computation whose inputs haven’t changed. This improves performance and ensures fast, responsive apps.

The dependency graph that powers this system can be built in two ways:


1. Manual dependency declaration

You can explicitly declare dependencies using the dependencies argument:

@workflow.atom()
def task_a():
    return text("A")

@workflow.atom(dependencies=['task_a'])
def task_b(task_a):
    return text("B")

workflow.execute()

In this example:

  • task_b depends on task_a even though it doesn’t use its output.
  • This ensures that task_a always runs before task_b.

Manual dependencies are useful when:

  • You need explicit control of execution order
  • Atoms produce side effects or state changes rather than return values
  • Dependencies cannot be inferred from function parameters

2. Automatic dependency inference

If an atom accepts another atom’s name as a parameter, the dependency is inferred automatically:

@workflow.atom()
def select_value():
    return slider("Pick a number", min_val=0, max_val=10, default=1)

@workflow.atom()
def show_value(select_value):
    return text(f"You picked: {select_value}")

workflow.execute()

In this example, show_value automatically depends on select_value.


No matter how you define dependencies, the reactive runtime ensures that only atoms whose inputs have changed and their downstream dependents are recomputed.


3. Automatic Atom Lifting of Top-Level Code

Preswald automatically transforms top-level statements into workflow atoms — including UI components, assignments, and expressions — even if you don’t explicitly decorate them.

This enables you to write plain Python code without worrying about workflow setup. Preswald will:

  • Detect dependencies across statements
  • Assign stable component_ids to UI components
  • Automatically recompute only the parts of your script that need to update

What Gets Lifted

The following kinds of top-level statements are automatically lifted:

  1. UI component calls These are calls to built-in components like slider(), text(), table(), etc.

    val = slider("Pick a number", min_val=0, max_val=10)
    text(f"You picked: {val}")
    
  2. Producer assignments If a top level assignment depends on reactive inputs, it’s lifted into an atom.

    base = slider("Base", min_val=1, max_val=10)
    double = base * 2
    
  3. Consumer expressions Expressions that use reactive inputs (including logging, print statements, or side-effecting code) are also wrapped in atoms.

    logger.info(f"Double is: {double}")
    
Assignment Limitations

Preswald currently supports only simple name based assignments when lifting producers. The following are supported:

x = y + 1               # simple assignments
x += y + 1              # augmented assignments
a, b = my_func(z)       # tuple/list unpacking

The following patterns are not yet supported and will raise errors during transformation:

  • Attribute assignments
obj.attr = 42  # Not supported
  • Subscript assignments
data["key"] = "value"  # Not supported
  • Starred unpacking
a, *rest = items  # Not supported

By supporting full script lifting and dependency tracking, Preswald allows you to write interactive data apps with clean, minimal Python; no manual decorators required.


If you run the app with debug logging enabled, you’ll see the transformed source in your logs:

2025-05-05 05:50:56,327 - preswald.engine.transformers.reactive_runtime - INFO - Transformed source code:
from preswald import get_df, text, slider, table
from preswald import get_workflow

@workflow.atom(name='_auto_atom_b4f77a63')
def _auto_atom_b4f77a63():
    return slider('Pick a number', min_val=0, max_val=10, default=1, component_id='slider-b4f77a63')

@workflow.atom(name='_auto_atom_d93912c6', dependencies=['_auto_atom_b4f77a63'])
def _auto_atom_d93912c6(param0):
    return table(df, limit=param0, component_id='table-d93912c6')

workflow = get_workflow()
df = get_df('iris_csv')
workflow.execute()

This automatic lifting enables full reactivity while keeping your scripts clean and minimal.


Why Use Workflow?

  • Efficiency: Avoids redundant computation with caching.
  • Modularity: Break down workflows into reusable, testable atoms.
  • Flexibility: Supports retry policies and selective execution.

Streamline your data workflows with Workflow! 🚀

class Workflow

The Workflow class provides a framework for building stateful workflows with selective recomputation and intelligent caching. This is particularly useful for tasks like data loading, cleaning, and analysis, where intermediate results can be cached to avoid unnecessary re-execution.


Key Features

  1. Stateful Workflow Object: Maintains the state of computations, allowing efficient and reusable workflows.
  2. Caching: Intelligent caching avoids redundant computations for tasks that haven’t changed.
  3. Retry Policy: Optional retry policies for handling failures, with a default policy provided.
  4. Selective Recomputation: Rerun only affected parts of the workflow when changes occur.

Workflow Structure

Atoms

Atoms are individual, reusable units of computation. These are functions decorated with @workflow.atom(). Atoms can:

  • Have dependencies (other atoms they rely on).
  • Be cached to prevent redundant execution.

Functions

def atom(
        self,
        dependencies: Optional[List[str]] = None,
        retry_policy: Optional[RetryPolicy] = None,
        force_recompute: bool = False,
    ):

A decorator used to define an atom (a node in the workflow).

Parameters:

  • dependencies (list, optional): Names of other atoms this atom depends on.
  • RetryPolicy (optional): Specify a retry policy for this atom. If not provided, the default retry policy is used.
  • force_recompute (bool, optional): Whether to force computation of this atom even if it is unchanged.

def execute(
        self, recompute_atoms: Optional[Set[str]] = None
    ) -> Dict[str, AtomResult]:

Executes all atoms in the workflow, respecting dependencies and caching.

Parameters:

  • recompute_atoms (set, optional): Names of atoms to force recomputation, bypassing the cache.

Returns:

  • results (dict): A dictionary mapping atom names to their results.

Common Use Cases

  • Data Loading and Cleaning: Load and preprocess data in stages, caching results to avoid reloading.
  • Selective Execution: Rerun only the parts of the workflow affected by changes to interactive elements or inputs.
  • Intermediate Results: Inspect cached intermediate results for debugging or analysis.

Example Workflow

from preswald import Workflow
import pandas as pd

# Create a workflow instance
workflow = Workflow()

# Define an atom for loading data
@workflow.atom()
def load_data():
    # This atom has no dependencies
    return pd.read_csv("data.csv")

# Define an atom for cleaning data, dependent on load_data
@workflow.atom(dependencies=['load_data'])
def clean_data(load_data):
    # Automatically passes the output of load_data as an argument
    return load_data.dropna()

# Define an atom for analysis, dependent on clean_data
@workflow.atom(dependencies=['clean_data'])
def analyze_data(clean_data):
    # Uses the cleaned data from the previous atom
    return clean_data.describe()

# Execute the workflow
results = workflow.execute()

# Access the results
print(results)

Output Behavior

  1. Results: A dictionary mapping atom names to their outputs:
    {
        "load_data": <DataFrame>,
        "clean_data": <Cleaned DataFrame>,
        "analyze_data": <Analysis Results>
    }
    
  2. Selective Execution: If the script is rerun without changes, only affected atoms are recomputed.

Reactive Runtime: Dependency-Aware Selective Execution

Preswald’s Workflow engine uses a reactive runtime that selectively reruns only the atoms affected by state changes — skipping any computation whose inputs haven’t changed. This improves performance and ensures fast, responsive apps.

The dependency graph that powers this system can be built in two ways:


1. Manual dependency declaration

You can explicitly declare dependencies using the dependencies argument:

@workflow.atom()
def task_a():
    return text("A")

@workflow.atom(dependencies=['task_a'])
def task_b(task_a):
    return text("B")

workflow.execute()

In this example:

  • task_b depends on task_a even though it doesn’t use its output.
  • This ensures that task_a always runs before task_b.

Manual dependencies are useful when:

  • You need explicit control of execution order
  • Atoms produce side effects or state changes rather than return values
  • Dependencies cannot be inferred from function parameters

2. Automatic dependency inference

If an atom accepts another atom’s name as a parameter, the dependency is inferred automatically:

@workflow.atom()
def select_value():
    return slider("Pick a number", min_val=0, max_val=10, default=1)

@workflow.atom()
def show_value(select_value):
    return text(f"You picked: {select_value}")

workflow.execute()

In this example, show_value automatically depends on select_value.


No matter how you define dependencies, the reactive runtime ensures that only atoms whose inputs have changed and their downstream dependents are recomputed.


3. Automatic Atom Lifting of Top-Level Code

Preswald automatically transforms top-level statements into workflow atoms — including UI components, assignments, and expressions — even if you don’t explicitly decorate them.

This enables you to write plain Python code without worrying about workflow setup. Preswald will:

  • Detect dependencies across statements
  • Assign stable component_ids to UI components
  • Automatically recompute only the parts of your script that need to update

What Gets Lifted

The following kinds of top-level statements are automatically lifted:

  1. UI component calls These are calls to built-in components like slider(), text(), table(), etc.

    val = slider("Pick a number", min_val=0, max_val=10)
    text(f"You picked: {val}")
    
  2. Producer assignments If a top level assignment depends on reactive inputs, it’s lifted into an atom.

    base = slider("Base", min_val=1, max_val=10)
    double = base * 2
    
  3. Consumer expressions Expressions that use reactive inputs (including logging, print statements, or side-effecting code) are also wrapped in atoms.

    logger.info(f"Double is: {double}")
    
Assignment Limitations

Preswald currently supports only simple name based assignments when lifting producers. The following are supported:

x = y + 1               # simple assignments
x += y + 1              # augmented assignments
a, b = my_func(z)       # tuple/list unpacking

The following patterns are not yet supported and will raise errors during transformation:

  • Attribute assignments
obj.attr = 42  # Not supported
  • Subscript assignments
data["key"] = "value"  # Not supported
  • Starred unpacking
a, *rest = items  # Not supported

By supporting full script lifting and dependency tracking, Preswald allows you to write interactive data apps with clean, minimal Python; no manual decorators required.


If you run the app with debug logging enabled, you’ll see the transformed source in your logs:

2025-05-05 05:50:56,327 - preswald.engine.transformers.reactive_runtime - INFO - Transformed source code:
from preswald import get_df, text, slider, table
from preswald import get_workflow

@workflow.atom(name='_auto_atom_b4f77a63')
def _auto_atom_b4f77a63():
    return slider('Pick a number', min_val=0, max_val=10, default=1, component_id='slider-b4f77a63')

@workflow.atom(name='_auto_atom_d93912c6', dependencies=['_auto_atom_b4f77a63'])
def _auto_atom_d93912c6(param0):
    return table(df, limit=param0, component_id='table-d93912c6')

workflow = get_workflow()
df = get_df('iris_csv')
workflow.execute()

This automatic lifting enables full reactivity while keeping your scripts clean and minimal.


Why Use Workflow?

  • Efficiency: Avoids redundant computation with caching.
  • Modularity: Break down workflows into reusable, testable atoms.
  • Flexibility: Supports retry policies and selective execution.

Streamline your data workflows with Workflow! 🚀