RetryPolicy

The RetryPolicy class defines how failed atoms within a workflow are retried. It lets you set the maximum number of attempts, the initial delay between retries, the backoff multiplier, and the exception types that trigger a retry.

Flexible Application:

  • Can be applied at the workflow level (default for all atoms).
  • Can be overridden by individual atoms with a custom retry policy.

Parameters

Optional Parameters (Defaults Provided):

  • max_attempts (int): Maximum number of retry attempts.
    Default: 3
  • delay (float): Initial delay in seconds between retry attempts.
    Default: 1.0
  • backoff_factor (float): Multiplier applied to the delay after each failed attempt.
    Default: 2.0
  • retry_exceptions (tuple of exceptions): Exception types that trigger a retry. By default, any exception derived from Exception triggers a retry.
    Default: (Exception,)
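
To make the interaction between delay and backoff_factor concrete, the sketch below computes the waits between attempts. It rests on two assumptions that this page does not confirm: that max_attempts counts total attempts (including the first one), and that the wait before retry n is delay * backoff_factor ** (n - 1). The helper backoff_schedule is hypothetical and not part of preswald.

```python
def backoff_schedule(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
) -> list[float]:
    """Return the assumed wait (in seconds) before each retry."""
    # Assumption: one wait sits between each pair of consecutive attempts,
    # so max_attempts attempts produce max_attempts - 1 waits.
    return [delay * backoff_factor ** i for i in range(max_attempts - 1)]


print(backoff_schedule())        # [1.0, 2.0]
print(backoff_schedule(5, 0.5))  # [0.5, 1.0, 2.0, 4.0]
```

Under these assumptions the defaults add at most 3 seconds of waiting before the final attempt.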

Default Behavior

If no custom retry policy is provided, the defaults from the constructor signature apply:

def __init__(
    self,
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff_factor: float = 2.0,
    retry_exceptions: tuple = (Exception,),
):
    pass

Usage Examples

1. Applying a Retry Policy to a Workflow

Define a retry policy for the entire workflow. This policy will be applied to all atoms unless overridden at the atom level.

from preswald import Workflow, RetryPolicy

# Define a custom retry policy
custom_policy = RetryPolicy(
    max_attempts=3,
    delay=1.0,
    backoff_factor=2.0,
    retry_exceptions=(IOError, TimeoutError)
)

# Apply the policy to the workflow
workflow = Workflow(default_retry_policy=custom_policy)

2. Overriding a Retry Policy for a Specific Atom

Atoms can have their own retry policy, which overrides the workflow’s default policy.

@workflow.atom(retry_policy=RetryPolicy(max_attempts=5, delay=0.5))
def fetch_data():
    # Simulate a network operation
    raise IOError("Simulated failure")

3. Combining Workflow and Atom Policies

If both workflow-level and atom-level policies are specified, the atom’s policy takes precedence.
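
The precedence rule can be sketched with plain dictionaries standing in for RetryPolicy objects. The function resolve_policy is a hypothetical illustration, not preswald's implementation, and it assumes the atom's policy replaces the workflow default as a whole rather than being merged with it field by field.

```python
from typing import Optional


def resolve_policy(
    atom_policy: Optional[dict],
    workflow_policy: Optional[dict],
) -> Optional[dict]:
    """Pick the policy that applies to an atom: the atom-level one wins if set."""
    return atom_policy if atom_policy is not None else workflow_policy


# Stand-ins for RetryPolicy instances (hypothetical values).
workflow_default = {"max_attempts": 3, "delay": 1.0}
fetch_override = {"max_attempts": 5, "delay": 0.5}

# An atom without its own policy falls back to the workflow default.
print(resolve_policy(None, workflow_default))
# An atom with its own policy ignores the workflow default.
print(resolve_policy(fetch_override, workflow_default))
```

If replacement is indeed wholesale, any parameter omitted from an atom-level RetryPolicy takes the constructor default, not the workflow's value. For example, RetryPolicy(max_attempts=5, delay=0.5) would retry on (Exception,) even when the workflow policy lists only (IOError, TimeoutError).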


Key Features

  1. Customizable Retry Logic:

    • Adjust the number of attempts, delay, and backoff.
    • Target specific exceptions for retries.
  2. Exponential Backoff:

    • Space out retries by multiplying the delay by backoff_factor after each failed attempt, so a struggling dependency is not hit in rapid succession.
  3. Granular Control:

    • Use workflow-level defaults for simplicity.
    • Override defaults at the atom level for specific tasks.
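
The features above correspond to a retry loop along the following lines. This is a minimal, generic sketch of retry with exponential backoff, not preswald's actual implementation. The function run_with_retry and its sleep parameter are hypothetical, and the sketch assumes max_attempts counts total attempts.

```python
import time


def run_with_retry(func, max_attempts=3, delay=1.0, backoff_factor=2.0,
                   retry_exceptions=(Exception,), sleep=time.sleep):
    """Call func, retrying on the listed exceptions with exponential backoff."""
    wait = delay
    for attempt in range(1, max_attempts + 1):
        try:
            return func()
        except retry_exceptions:
            # Exceptions outside retry_exceptions never reach this branch
            # and propagate immediately.
            if attempt == max_attempts:
                raise  # attempts exhausted: surface the last error
            sleep(wait)
            wait *= backoff_factor  # grow the delay for the next retry


calls = []
waits = []


def flaky():
    """Fail twice with IOError, then succeed."""
    calls.append(1)
    if len(calls) < 3:
        raise IOError("transient failure")
    return "ok"


# Record the waits instead of sleeping so the example runs instantly.
result = run_with_retry(flaky, retry_exceptions=(IOError,), sleep=waits.append)
print(result, len(calls), waits)  # ok 3 [1.0, 2.0]
```

Passing sleep as a parameter is a design choice for this sketch only: it keeps the backoff logic testable without real delays.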

Example Workflow with Retry Policy

from preswald import Workflow, RetryPolicy

# Define a workflow-wide retry policy
workflow = Workflow(
    default_retry_policy=RetryPolicy(
        max_attempts=3,
        delay=1.0,
        backoff_factor=2.0,
        retry_exceptions=(IOError, TimeoutError)
    )
)

# No retry_policy given: this atom uses the workflow-wide default
@workflow.atom()
def load_data():
    raise IOError("Simulated failure")

# Atom-specific retry policy
@workflow.atom(retry_policy=RetryPolicy(max_attempts=5, delay=0.5))
def fetch_data():
    raise TimeoutError("Another simulated failure")

# Execute the workflow
try:
    workflow.execute()
except Exception as e:
    print(f"Workflow execution failed: {e}")

Why Use RetryPolicy?

  • Reliability: Automatically handle transient failures with retries.
  • Efficiency: Bound the number of retries with max_attempts and space them out with backoff, so failures cannot loop indefinitely.
  • Control: Fine-tune retry behavior for specific workflows or tasks.

Enhance your workflow resilience with RetryPolicy! 🔄