Workflow
A Workflow coordinates multiple agents through an explicitly ordered pipeline. Unlike Graph (which the framework executes) or Swarm (which is autonomous), a Workflow gives you imperative control over every step — useful for complex processes with error recovery, conditional branches, or parallel execution.
When to use Workflow
| Pattern | Best for |
|---|---|
| Workflow | Distinct sequential stages, conditional logic, step-level error recovery |
| Graph | Static dependency graph managed by the framework |
| Swarm | Autonomous collaboration with no fixed execution order |
Sequential pipeline
The simplest workflow — each agent's output feeds the next:
```python
from elsai import Agent

researcher = Agent(
    name="researcher",
    system_prompt="You are a research specialist. Find accurate facts about the given topic.",
)
analyst = Agent(
    name="analyst",
    system_prompt="You analyse research data and extract key insights.",
)
writer = Agent(
    name="writer",
    system_prompt="You write polished, reader-friendly reports.",
)


def run_pipeline(topic: str) -> str:
    research = researcher(f"Research: {topic}")
    analysis = analyst(f"Analyse this research:\n\n{research}")
    report = writer(f"Write a report based on this analysis:\n\n{analysis}")
    return str(report)


print(run_pipeline("the impact of AI on healthcare"))
```

Parallel execution
Run independent steps concurrently, then merge results:
```python
import concurrent.futures

from elsai import Agent

researcher = Agent(name="researcher", system_prompt="Research the given topic thoroughly.")
data_agent = Agent(name="data", system_prompt="Find statistics and figures.")
analyst = Agent(name="analyst", system_prompt="Synthesise research and data into insights.")
writer = Agent(name="writer", system_prompt="Write a polished report.")


def run_parallel_pipeline(topic: str) -> str:
    # Step 1: gather in parallel
    with concurrent.futures.ThreadPoolExecutor() as pool:
        fut_research = pool.submit(researcher, f"Research: {topic}")
        fut_data = pool.submit(data_agent, f"Find statistics on: {topic}")
        research = str(fut_research.result())
        data = str(fut_data.result())

    # Step 2: synthesise sequentially
    insights = analyst(f"Synthesise:\nResearch: {research}\nData: {data}")
    return str(writer(f"Write a report:\n{insights}"))
```

Workflow tool
For task-level tracking and dependency management, use the built-in workflow tool:
```python
from elsai import Agent
from elsai.tools.workflow import workflow

agent = Agent(tools=[workflow])

# Define tasks with dependencies
agent.tool.workflow(
    action="create",
    workflow_id="data_pipeline",
    tasks=[
        {
            "task_id": "extract",
            "description": "Extract raw data from the source",
            "priority": 1,
        },
        {
            "task_id": "transform",
            "description": "Clean and transform the data",
            "dependencies": ["extract"],
        },
        {
            "task_id": "load",
            "description": "Load into the data warehouse",
            "dependencies": ["transform"],
        },
        {
            "task_id": "report",
            "description": "Generate a summary report",
            "dependencies": ["transform"],  # runs in parallel with load
        },
    ],
)

agent.tool.workflow(action="start", workflow_id="data_pipeline")
```

Passing context between steps
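A step can only receive upstream results once its dependencies have finished, so it helps to see which order the `data_pipeline` definition above implies. The sketch below only illustrates those dependency semantics using Python's standard-library `graphlib`. It is not how the workflow tool is implemented, and the `ready_batches` helper is a hypothetical name introduced for this example.

```python
from graphlib import TopologicalSorter

# Dependencies copied from the data_pipeline task list: task -> prerequisites.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"transform"},
}


def ready_batches(deps: dict[str, set[str]]) -> list[list[str]]:
    """Group tasks into batches; tasks in the same batch have no
    unmet dependencies and could run concurrently."""
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        batch = sorted(sorter.get_ready())  # sorted only for stable output
        batches.append(batch)
        sorter.done(*batch)  # mark the whole batch finished
    return batches


print(ready_batches(dependencies))
# [['extract'], ['transform'], ['load', 'report']]
```

The last batch contains both `load` and `report`, which matches the comment in the task list: both depend only on `transform`, so neither has to wait for the other.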
Build each step's prompt from upstream results:
```python
def build_prompt(task_id: str, tasks: dict, results: dict) -> str:
    """Return the task description, prefixed with any available dependency outputs."""
    prior = []
    for dep_id in tasks[task_id].get("dependencies", []):
        # Dependencies without a recorded result are skipped.
        if dep_id in results:
            prior.append(f"Output from {dep_id}:\n{results[dep_id]}")

    prompt = tasks[task_id]["description"]
    if prior:
        prompt = "Prior step results:\n\n" + "\n\n".join(prior) + "\n\nTask:\n" + prompt
    return prompt
```

Workflow vs Graph
```python
# Graph: the framework manages dependency resolution and execution
from elsai.multiagent import Graph

graph = Graph()
graph.add_node(researcher)
graph.add_node(writer)
graph.add_edge(researcher, writer)
result = graph("Topic")

# Workflow: you write the execution logic explicitly
research = researcher("Topic")
result = writer(f"Write about:\n{research}")
```

Choose Graph when you want the framework to manage dependencies and concurrency automatically. Choose Workflow when you need conditional branches, custom error handling, or fine-grained control over what happens at each step.
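Conditional branching is the kind of logic that is awkward to express as a static graph but is ordinary Python in a Workflow. The following is a minimal sketch, not a framework feature: `triage`, `tech_writer`, and `general_writer` are hypothetical stub functions standing in for `Agent` instances so that the example runs on its own. With real agents, each call would have the same shape, `str(agent(prompt))`.

```python
from typing import Callable

Step = Callable[[str], str]


# Hypothetical stand-ins for Agent instances, so the sketch runs without the framework.
def triage(prompt: str) -> str:
    return "technical" if "error" in prompt.lower() else "general"


def tech_writer(prompt: str) -> str:
    return f"[technical answer] {prompt}"


def general_writer(prompt: str) -> str:
    return f"[general answer] {prompt}"


def run_branching_pipeline(question: str) -> str:
    # Step 1: classify the request and normalise the label.
    category = str(triage(question)).strip().lower()

    # Step 2: choose the next step from the classification,
    # falling back to the general writer for unknown labels.
    branches: dict[str, Step] = {"technical": tech_writer}
    handler = branches.get(category, general_writer)
    return str(handler(question))


print(run_branching_pipeline("Why do I get an error on import?"))
print(run_branching_pipeline("What is a workflow?"))
```

Normalising the classifier's output and defaulting to a general branch keeps the pipeline from failing when the label is not one you anticipated.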
Error recovery
Because you control execution, you can retry individual steps:
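```python
import time

from elsai import Agent


def run_with_retry(agent: Agent, prompt: str, retries: int = 3) -> str:
    """Call the agent up to `retries` times, backing off exponentially between attempts."""
    for attempt in range(retries - 1):
        try:
            return str(agent(prompt))
        except Exception:
            time.sleep(2 ** attempt)  # wait 1s, 2s, 4s, ... before the next attempt
    # Final attempt: let any exception propagate to the caller.
    return str(agent(prompt))


# `researcher` is the Agent defined in the earlier examples.
topic = "the impact of AI on healthcare"
research = run_with_retry(researcher, f"Research: {topic}")
```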
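Retrying is not the only recovery option that step-level control allows. When a step keeps failing, the pipeline can also switch to an alternative step so that later stages still receive input. The sketch below assumes a hypothetical `run_with_fallback` helper and uses stub functions (`flaky_researcher`, `cached_summary`) in place of real agents. None of these names come from the framework.

```python
from typing import Callable

Step = Callable[[str], str]


def run_with_fallback(primary: Step, fallback: Step, prompt: str) -> tuple[str, str]:
    """Run the primary step; if it raises, run the fallback instead.

    Returns a (source, output) pair so callers can tell which step answered.
    """
    try:
        return "primary", str(primary(prompt))
    except Exception:
        return "fallback", str(fallback(prompt))


# Hypothetical stand-ins for Agent instances.
def flaky_researcher(prompt: str) -> str:
    raise TimeoutError("model unavailable")


def cached_summary(prompt: str) -> str:
    return f"(cached) notes for: {prompt}"


source, text = run_with_fallback(
    flaky_researcher, cached_summary, "Research: AI in healthcare"
)
print(source, text)
```

Returning the source alongside the output lets downstream steps, or your logs, record that a degraded path was taken.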