Deterministic Orchestration

AgentEnsemble can orchestrate purely deterministic (non-AI) workflows with the same DAG execution, parallel phases, callbacks, guardrails, metrics, and review gates that AI ensembles use — but with zero LLM calls.


Every task in a deterministic pipeline has a handler — a Java lambda that receives a TaskHandlerContext and returns a ToolResult:

import net.agentensemble.Ensemble;
import net.agentensemble.Task;
import net.agentensemble.ensemble.EnsembleOutput;
import net.agentensemble.tool.ToolResult;

EnsembleOutput output = Ensemble.run(
    Task.builder()
        .description("Compute checksum of input data")
        .expectedOutput("SHA-256 hex string")
        .handler(ctx -> ToolResult.success(sha256(inputData)))
        .build(),
    Task.builder()
        .description("Store checksum to database")
        .expectedOutput("Storage confirmation")
        .handler(ctx -> {
            db.store(inputData, ctx.description());
            return ToolResult.success("stored");
        })
        .build()
);
System.out.println(output.getRaw()); // "stored"

Ensemble.run(Task...) requires all tasks to have handlers. If any task lacks one, a clear error message points to the offending task and suggests using Ensemble.run(ChatModel, Task...) for AI tasks.
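
The first handler above calls a sha256 helper that this page does not define. A minimal sketch of such a helper using only the JDK follows; the class name Checksums and the choice of UTF-8 string input are assumptions made for illustration, not part of AgentEnsemble.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper class; not part of AgentEnsemble.
final class Checksums {
    private Checksums() {}

    /** Returns the lowercase hex SHA-256 digest of the UTF-8 bytes of the input. */
    static String sha256(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                // %02x renders each byte as two hex digits, treating it as unsigned
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // SHA-256 is a required algorithm on every compliant JDK
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Because the helper is a pure function of its input, the handler that wraps it stays deterministic and is easy to unit test outside the ensemble.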


Use Task.builder().context(List.of(upstreamTask)) to declare a data dependency. Inside the downstream handler, ctx.contextOutputs() delivers the prior task’s output as a TaskOutput whose raw string is accessed via .getRaw():

Task fetchTask = Task.builder()
    .description("Fetch product catalogue from REST API")
    .expectedOutput("JSON product list")
    .handler(ctx -> {
        String json = httpClient.get("https://api.example.com/products");
        return ToolResult.success(json);
    })
    .build();

Task parseTask = Task.builder()
    .description("Parse product list into structured records")
    .expectedOutput("CSV of product records")
    .context(List.of(fetchTask))
    .handler(ctx -> {
        // Output of fetchTask, the only declared context dependency
        String json = ctx.contextOutputs().get(0).getRaw();
        String csv = jsonToCsv(json);
        return ToolResult.success(csv);
    })
    .build();

Task storeTask = Task.builder()
    .description("Write CSV to data warehouse")
    .expectedOutput("Row count written")
    .context(List.of(parseTask))
    .handler(ctx -> {
        // Output of parseTask
        String csv = ctx.contextOutputs().get(0).getRaw();
        int rows = warehouse.bulkInsert(csv);
        return ToolResult.success(rows + " rows inserted");
    })
    .build();

EnsembleOutput output = Ensemble.run(fetchTask, parseTask, storeTask);
System.out.println(output.getRaw()); // "1234 rows inserted"

The full output of every task is accessible in output.getTaskOutputs():

for (TaskOutput taskOutput : output.getTaskOutputs()) {
    System.out.printf("  %s -> %s%n",
        taskOutput.getTaskDescription(),
        taskOutput.getRaw());
}

A task can declare multiple context dependencies. The outputs are delivered in declaration order:

// clientA and clientB are placeholder names for your own service clients.
// They are named differently from the tasks so that each handler does not
// refer to the Task variable it is being assigned to.
Task serviceA = Task.builder()
    .description("Call service A")
    .handler(ctx -> ToolResult.success(clientA.getData()))
    .build();

Task serviceB = Task.builder()
    .description("Call service B")
    .handler(ctx -> ToolResult.success(clientB.getData()))
    .build();

Task aggregator = Task.builder()
    .description("Merge results from A and B")
    .context(List.of(serviceA, serviceB))
    .handler(ctx -> {
        String dataA = ctx.contextOutputs().get(0).getRaw(); // serviceA output
        String dataB = ctx.contextOutputs().get(1).getRaw(); // serviceB output
        return ToolResult.success(merge(dataA, dataB));
    })
    .build();

When aggregator is added to an ensemble alongside serviceA and serviceB, the framework infers a PARALLEL workflow automatically — serviceA and serviceB run concurrently, and aggregator starts only after both complete:

EnsembleOutput output = Ensemble.builder()
    .task(serviceA)
    .task(serviceB)
    .task(aggregator)
    .build()
    .run();
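
To make the fan-in scheduling concrete, the sketch below reproduces the same shape with the JDK's CompletableFuture: two independent producers run concurrently, and the merge step runs only once both have finished. This is an illustration of the behaviour described above, not AgentEnsemble's implementation; FanInSketch, fetchA, fetchB, and merge are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

// Plain-JDK illustration of a two-way fan-in; not AgentEnsemble code.
final class FanInSketch {
    private FanInSketch() {}

    // Hypothetical stand-ins for the two upstream service calls
    static String fetchA() { return "A-data"; }
    static String fetchB() { return "B-data"; }

    // Hypothetical stand-in for the aggregation step
    static String merge(String a, String b) { return a + "+" + b; }

    static String run() {
        // Both suppliers are submitted immediately and may run concurrently
        CompletableFuture<String> a = CompletableFuture.supplyAsync(FanInSketch::fetchA);
        CompletableFuture<String> b = CompletableFuture.supplyAsync(FanInSketch::fetchB);
        // thenCombine invokes merge only after both futures have completed,
        // passing the results in a fixed order (a first, then b)
        return a.thenCombine(b, FanInSketch::merge).join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The fixed argument order of the combine step mirrors the guarantee that context outputs arrive in declaration order, regardless of which upstream task finishes first.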

For tasks with no data dependency on each other, declare them in an ensemble with workflow(Workflow.PARALLEL):

EnsembleOutput output = Ensemble.builder()
    .task(Task.builder()
        .description("Send email notification")
        .handler(ctx -> { email.send(); return ToolResult.success("sent"); })
        .build())
    .task(Task.builder()
        .description("Post Slack message")
        .handler(ctx -> { slack.post(); return ToolResult.success("posted"); })
        .build())
    .task(Task.builder()
        .description("Update metrics dashboard")
        .handler(ctx -> { metrics.record(); return ToolResult.success("recorded"); })
        .build())
    .workflow(Workflow.PARALLEL)
    .build()
    .run();

All three tasks start immediately and run concurrently on virtual threads.


Use phases when the pipeline has distinct stages with named logical groupings:

Task extract = Task.builder().description("Extract").handler(ctx -> ...).build();
Task transform = Task.builder().description("Transform")
    .context(List.of(extract)).handler(ctx -> ...).build();
Task load = Task.builder().description("Load")
    .context(List.of(transform)).handler(ctx -> ...).build();

Phase extractPhase = Phase.of("extract", extract);
Phase transformPhase = Phase.builder()
    .name("transform")
    .task(transform)
    .after(extractPhase)
    .build();
Phase loadPhase = Phase.builder()
    .name("load")
    .task(load)
    .after(transformPhase)
    .build();

EnsembleOutput output = Ensemble.builder()
    .phase(extractPhase)
    .phase(transformPhase)
    .phase(loadPhase)
    .build()
    .run();

// Per-phase results are accessible by name
List<TaskOutput> extractResults = output.getPhaseOutputs().get("extract");
List<TaskOutput> transformResults = output.getPhaseOutputs().get("transform");
List<TaskOutput> loadResults = output.getPhaseOutputs().get("load");

Independent phases (with no after() dependency between them) run in parallel automatically.
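
One way to picture how after() dependencies translate into parallelism is to group phases into "waves": every phase in a wave has all of its predecessors in earlier waves, so the phases within a wave can run side by side. The sketch below computes such waves with a level-by-level topological sort over phase names. It is a conceptual model only; PhaseWaves and its map-based input are assumptions for illustration and say nothing about how AgentEnsemble schedules phases internally.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Conceptual model of phase scheduling; not AgentEnsemble code.
final class PhaseWaves {
    private PhaseWaves() {}

    /**
     * Groups phases into waves. The map goes from each phase name to the names
     * of the phases it must run after. Every referenced phase must also be a key.
     */
    static List<List<String>> waves(Map<String, List<String>> after) {
        // Mutable copy so satisfied dependencies can be removed as waves complete
        Map<String, List<String>> remaining = new LinkedHashMap<>();
        after.forEach((phase, deps) -> remaining.put(phase, new ArrayList<>(deps)));

        List<List<String>> result = new ArrayList<>();
        while (!remaining.isEmpty()) {
            // A phase is ready when none of its predecessors are still pending
            List<String> ready = new ArrayList<>();
            for (Map.Entry<String, List<String>> entry : remaining.entrySet()) {
                if (entry.getValue().isEmpty()) {
                    ready.add(entry.getKey());
                }
            }
            if (ready.isEmpty()) {
                throw new IllegalStateException("Cycle or unknown phase in dependencies");
            }
            ready.forEach(remaining::remove);
            remaining.values().forEach(deps -> deps.removeAll(ready));
            result.add(ready);
        }
        return result;
    }
}
```

For the extract, transform, load chain above, this yields three single-phase waves; adding a phase with no after() dependency would place it in the first wave alongside extract.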


All task lifecycle callbacks work on deterministic tasks exactly as they do for AI tasks:

EnsembleOutput output = Ensemble.builder()
    .task(fetchTask)
    .task(transformTask)
    .onTaskStart(e -> log.info("Starting: {}", e.taskDescription()))
    .onTaskComplete(e -> log.info("Completed: {} in {}",
        e.taskDescription(), e.duration()))
    .onTaskFailed(e -> log.error("Failed: {}", e.taskDescription(), e.cause()))
    .build()
    .run();

Input and output guardrails enforce pre/post conditions on handler tasks:

Task validate = Task.builder()
    .description("Validate incoming payload")
    .handler(ctx -> ToolResult.success(process(ctx.description())))
    .inputGuardrail(input -> {
        if (input.text().isBlank()) {
            return GuardrailResult.failure("Input must not be blank");
        }
        return GuardrailResult.success();
    })
    .outputGuardrail(output -> {
        if (output.text().length() > MAX_SIZE) {
            return GuardrailResult.failure("Output too large");
        }
        return GuardrailResult.success();
    })
    .build();
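
Conceptually, guardrails wrap the handler: the input check runs first, then the handler, then the output check. The plain-JDK sketch below models that ordering. All names here (GuardedHandler, Guardrail) are hypothetical, and the choice to signal a rejection by throwing an exception is an assumption for illustration; this page does not state how AgentEnsemble surfaces a guardrail failure.

```java
import java.util.Optional;
import java.util.function.Function;

// Conceptual model of pre/post conditions around a handler; not AgentEnsemble code.
final class GuardedHandler {
    private GuardedHandler() {}

    /** Returns an empty Optional to pass, or a failure reason to reject. */
    interface Guardrail extends Function<String, Optional<String>> {}

    static String run(String input,
                      Guardrail inputGuard,
                      Function<String, String> handler,
                      Guardrail outputGuard) {
        // Precondition: reject before the handler does any work
        Optional<String> inputFailure = inputGuard.apply(input);
        if (inputFailure.isPresent()) {
            throw new IllegalArgumentException("Input rejected: " + inputFailure.get());
        }
        String output = handler.apply(input);
        // Postcondition: reject results that violate the output contract
        Optional<String> outputFailure = outputGuard.apply(output);
        if (outputFailure.isPresent()) {
            throw new IllegalStateException("Output rejected: " + outputFailure.get());
        }
        return output;
    }
}
```

Keeping the checks separate from the handler means the handler body only has to deal with inputs that already satisfy the precondition.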

When a handler returns ToolResult.failure(...), the ensemble throws a TaskExecutionException. The exception contains all outputs from tasks that completed before the failure:

try {
    Ensemble.run(step1, step2, step3);
} catch (TaskExecutionException e) {
    log.error("Pipeline failed at task: {}", e.getTaskDescription());
    log.error("Completed before failure: {} tasks", e.getCompletedTaskOutputs().size());
}
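
The fail-fast contract described above (stop at the first failed step, but keep what already succeeded) can be modelled in a few lines of plain Java. The sketch below is a simplified sequential stand-in, not AgentEnsemble's implementation; FailFastPipeline, StepResult, and StepFailedException are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified model of fail-fast execution; not AgentEnsemble code.
final class FailFastPipeline {
    private FailFastPipeline() {}

    /** Hypothetical result type: a non-null error marks a failure. */
    static final class StepResult {
        final String output;
        final String error;

        private StepResult(String output, String error) {
            this.output = output;
            this.error = error;
        }

        static StepResult success(String output) { return new StepResult(output, null); }
        static StepResult failure(String error) { return new StepResult(null, error); }
        boolean failed() { return error != null; }
    }

    /** Carries the outputs of every step that completed before the failure. */
    static final class StepFailedException extends RuntimeException {
        private final List<String> completedOutputs;

        StepFailedException(String message, List<String> completedOutputs) {
            super(message);
            this.completedOutputs = List.copyOf(completedOutputs);
        }

        List<String> completedOutputs() { return completedOutputs; }
    }

    static List<String> run(List<Supplier<StepResult>> steps) {
        List<String> completed = new ArrayList<>();
        for (Supplier<StepResult> step : steps) {
            StepResult result = step.get();
            if (result.failed()) {
                // Later steps are never invoked; earlier outputs travel with the exception
                throw new StepFailedException(result.error, completed);
            }
            completed.add(result.output);
        }
        return completed;
    }
}
```

Attaching the partial results to the exception lets the caller log, persist, or resume from the last good step instead of discarding work that already succeeded.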

Deterministic and AI tasks compose freely in the same ensemble. Only tasks without a handler require a ChatModel:

Task fetchData = Task.builder()
    .description("Fetch raw user feedback from API")
    .handler(ctx -> ToolResult.success(api.fetchFeedback()))
    .build();

Task summarize = Task.builder()
    .description("Summarise the feedback into three bullet points")
    .expectedOutput("Three bullet point summary")
    .context(List.of(fetchData))
    .build(); // AI task -- will use the ensemble-level model

Task storeResult = Task.builder()
    .description("Store summarised feedback to database")
    .context(List.of(summarize))
    .handler(ctx -> {
        db.store(ctx.contextOutputs().get(0).getRaw());
        return ToolResult.success("stored");
    })
    .build();

EnsembleOutput output = Ensemble.builder()
    .chatLanguageModel(model) // required for the AI task only
    .task(fetchData)
    .task(summarize)
    .task(storeResult)
    .build()
    .run();

Builder reference for deterministic-only ensembles

| Method | Required? | Notes |
| --- | --- | --- |
| task(...).handler(...) | Yes (all tasks) | The functional handler to execute |
| chatLanguageModel(model) | No | Not needed when all tasks have handlers |
| workflow(Workflow.SEQUENTIAL) | No | Inferred as SEQUENTIAL when no context deps exist |
| workflow(Workflow.PARALLEL) | No | Use when tasks should run concurrently |
| onTaskStart(...) | No | Callback fired before each task |
| onTaskComplete(...) | No | Callback fired after each task succeeds |
| onTaskFailed(...) | No | Callback fired when a task fails |
| phase(...) | No | Use instead of task() for named workstreams |
| reviewHandler(...) | No | Human-in-the-loop review gates |