
Using AgentEnsemble as a Pure Workflow Engine: Orchestration Without AI

Most of the conversation around agent frameworks assumes that orchestration is synonymous with LLM calls. Pick a framework, wire up your model, define your agents. That framing is understandable, but it leaves something useful on the table.

The harder problem for many Java teams isn’t choosing a model. It’s building pipelines that are observable, composable, and consistent — whether or not any given step involves an LLM. A pipeline that mixes REST API calls, data transformations, and a few AI-backed steps shouldn’t need two separate orchestration systems.

AgentEnsemble is not exclusively an AI orchestrator. The same DAG execution engine, parallel phase runner, callbacks, and review gates that coordinate LLM-backed agents also work for purely deterministic Java functions. No ChatModel required.

This post covers the deterministic-only orchestration pattern: what the API looks like, how data flows between tasks, and when this pattern is worth reaching for.


The Static Factory for Deterministic Pipelines


For pipelines with no LLM steps, the entry point is a static factory method that takes no model parameter:

Ensemble.run(task1, task2, task3);

If any task passed to this method lacks a handler, the framework throws at startup. This keeps the contract explicit: deterministic tasks must always declare how they execute.
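The check itself is easy to picture. A minimal sketch of what such startup validation might look like, with simplified stand-in types (this is a hypothetical illustration, not AgentEnsemble's actual internals):

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch: fail fast at startup if any task lacks a handler.
// Task here is a simplified stand-in, not the library's real class.
public class HandlerValidation {
    record Task(String description, Function<String, String> handler) {}

    static void validateAll(List<Task> tasks) {
        for (Task t : tasks) {
            if (t.handler() == null) {
                // Throwing before execution keeps the contract explicit:
                // a deterministic pipeline never discovers a missing handler mid-run.
                throw new IllegalStateException(
                    "Task '" + t.description() + "' has no handler");
            }
        }
    }

    public static void main(String[] args) {
        Task ok = new Task("extract-records", ctx -> "42 records");
        Task broken = new Task("transform-records", null);
        try {
            validateAll(List.of(ok, broken));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
            // prints: Task 'transform-records' has no handler
        }
    }
}
```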

For more complex setups — phases, callbacks, guardrails — you use the builder:

Ensemble.builder()
    .phases(List.of(ingestion, processing, output))
    .onTaskComplete(event -> log.info("Completed: {}", event.taskId()))
    .build()
    .run();

The builder path accepts a chatModel, but it isn’t required. If all tasks have handlers, the ensemble runs without one.


A deterministic task declares its behavior via a handler lambda. The handler receives a TaskHandlerContext and returns a ToolResult:

Task extract = Task.builder()
    .description("extract-records")
    .handler(ctx -> ToolResult.success("42 records extracted"))
    .build();

ToolResult.success(String content) wraps the output. ToolResult.failure(String reason) triggers task failure handling. The string content becomes the task’s raw output, accessible downstream.
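Conceptually, ToolResult is a success flag plus a string payload. A minimal stand-in showing that contract (hypothetical; the library's real type may carry more state):

```java
// Hypothetical stand-in for ToolResult: a success flag plus a string payload.
// success(...) and failure(...) differ only in the flag; the string carries
// either the task's raw output or the failure reason.
public record ToolResult(boolean success, String content) {
    public static ToolResult success(String content) {
        return new ToolResult(true, content);
    }
    public static ToolResult failure(String reason) {
        return new ToolResult(false, reason);
    }

    public static void main(String[] args) {
        System.out.println(success("42 records extracted"));
        // prints: ToolResult[success=true, content=42 records extracted]
        System.out.println(failure("input was empty"));
        // prints: ToolResult[success=false, content=input was empty]
    }
}
```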

For tasks that need to fail conditionally:

Task validate = Task.builder()
    .description("validate-input")
    .handler(ctx -> {
        String input = ctx.contextOutputs().get(0).getRaw();
        if (input.isBlank()) {
            return ToolResult.failure("input was empty");
        }
        return ToolResult.success("valid: " + input.length() + " chars");
    })
    .build();

Downstream tasks declare upstream dependencies via .context(). The framework resolves outputs in declaration order and delivers them through ctx.contextOutputs():

Task extract = Task.builder()
    .description("extract-records")
    .handler(ctx -> ToolResult.success("42 records"))
    .build();

Task transform = Task.builder()
    .description("transform-records")
    .context(List.of(extract))
    .handler(ctx -> {
        String raw = ctx.contextOutputs().get(0).getRaw();
        return ToolResult.success("transformed: " + raw);
    })
    .build();

Task load = Task.builder()
    .description("load-records")
    .context(List.of(transform))
    .handler(ctx -> {
        String data = ctx.contextOutputs().get(0).getRaw();
        // write to database
        return ToolResult.success("loaded: " + data);
    })
    .build();

Ensemble.run(extract, transform, load);

Multiple upstream outputs are accessible by index in the order they were declared:

Task merge = Task.builder()
    .description("merge-results")
    .context(List.of(fetchA, fetchB, fetchC))
    .handler(ctx -> {
        List<String> results = ctx.contextOutputs().stream()
            .map(TaskOutput::getRaw)
            .toList();
        return ToolResult.success(String.join(", ", results));
    })
    .build();

Independent tasks run concurrently on virtual threads, with no explicit configuration: if tasks declare no context() dependency on each other, the executor schedules them in parallel:

Task fetchA = Task.builder()
    .description("fetch-region-a")
    .handler(ctx -> ToolResult.success("region-a: 120 records"))
    .build();

Task fetchB = Task.builder()
    .description("fetch-region-b")
    .handler(ctx -> ToolResult.success("region-b: 98 records"))
    .build();

Task fetchC = Task.builder()
    .description("fetch-region-c")
    .handler(ctx -> ToolResult.success("region-c: 77 records"))
    .build();

Task merge = Task.builder()
    .description("merge-all-regions")
    .context(List.of(fetchA, fetchB, fetchC))
    .handler(ctx -> {
        List<String> outputs = ctx.contextOutputs().stream()
            .map(TaskOutput::getRaw)
            .toList();
        return ToolResult.success(String.join("; ", outputs));
    })
    .build();

Ensemble.run(fetchA, fetchB, fetchC, merge);

The DAG is inferred from context() declarations. merge waits for all three fetch tasks. The three fetches run concurrently. No explicit thread management required.
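Conceptually, the executor resolves this the way any DAG scheduler does: nodes with no edges between them are dispatched together, and a dependent node runs only after all of its inputs complete. A rough plain-Java sketch of that behavior (not AgentEnsemble's internals) using a virtual-thread executor, assuming Java 21+:

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Rough sketch of the scheduling idea (not AgentEnsemble's internals):
// independent fetches are submitted together on virtual threads; the
// merge step runs only after every fetch has completed.
public class DagSketch {
    static String mergeRegions(List<Callable<String>> fetches) throws Exception {
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            // invokeAll blocks until every independent fetch has finished --
            // exactly the join point the merge node needs. Results come back
            // in submission order regardless of completion order.
            List<Future<String>> done = exec.invokeAll(fetches);
            StringBuilder merged = new StringBuilder();
            for (Future<String> f : done) {
                if (!merged.isEmpty()) merged.append("; ");
                merged.append(f.get());
            }
            return merged.toString();
        }
    }

    public static void main(String[] args) throws Exception {
        String out = mergeRegions(List.of(
            () -> "region-a: 120 records",
            () -> "region-b: 98 records",
            () -> "region-c: 77 records"));
        System.out.println(out);
        // prints: region-a: 120 records; region-b: 98 records; region-c: 77 records
    }
}
```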


For larger pipelines with distinct stages, phases provide explicit sequencing and named output access:

Task ingest = Task.builder()
    .description("ingest-raw-data")
    .handler(ctx -> ToolResult.success("1000 rows ingested"))
    .build();

Task schemaCheck = Task.builder()
    .description("validate-schema")
    .handler(ctx -> ToolResult.success("schema valid"))
    .build();

Phase ingestion = Phase.of("ingestion", ingest, schemaCheck);

Task normalize = Task.builder()
    .description("normalize-records")
    .handler(ctx -> ToolResult.success("normalized"))
    .build();

Task enrich = Task.builder()
    .description("enrich-with-reference-data")
    .handler(ctx -> ToolResult.success("enriched"))
    .build();

Phase processing = Phase.builder()
    .name("processing")
    .tasks(List.of(normalize, enrich))
    .after(ingestion)
    .build();

Task writeOutput = Task.builder()
    .description("write-to-data-warehouse")
    .handler(ctx -> ToolResult.success("written"))
    .build();

Phase output = Phase.builder()
    .name("output")
    .tasks(List.of(writeOutput))
    .after(processing)
    .build();

EnsembleOutput result = Ensemble.builder()
    .phases(List.of(ingestion, processing, output))
    .build()
    .run();

Phase outputs are accessible by name:

PhaseOutput ingestionOut = result.getPhaseOutputs().get("ingestion");
String ingestedData = ingestionOut.taskOutputs().get(0).getRaw();

Within a phase, task parallelism follows the same DAG rules. normalize and enrich run concurrently if they don’t depend on each other.


The same onTaskStart, onTaskComplete, and onTaskFailed callbacks work for deterministic tasks:

Ensemble.builder()
    .phases(List.of(ingestion, processing, output))
    .onTaskStart(e -> log.info("[START] {}", e.taskId()))
    .onTaskComplete(e -> log.info("[DONE] {} -> {}", e.taskId(), e.output().getRaw()))
    .onTaskFailed(e -> log.error("[FAIL] {} -> {}", e.taskId(), e.error().getMessage()))
    .build()
    .run();

Metrics via Micrometer and OpenTelemetry tracing are equally available. Deterministic tasks participate in the same span hierarchy as AI tasks. If your ensemble mixes both, traces show the full picture.


Deterministic and AI-backed tasks compose freely within the same pipeline:

Task fetchData = Task.builder()
    .description("fetch-customer-records")
    .handler(ctx -> ToolResult.success(fetchFromDatabase()))
    .build();

Task analyzeData = Task.builder()
    .description("identify key themes and concerns from these customer records")
    .chatModel(model)
    .context(List.of(fetchData))
    .build();

Task writeReport = Task.builder()
    .description("format-as-json-report")
    .context(List.of(analyzeData))
    .handler(ctx -> {
        String analysis = ctx.contextOutputs().get(0).getRaw();
        return ToolResult.success(toJson(analysis));
    })
    .build();

Ensemble.builder()
    .tasks(List.of(fetchData, analyzeData, writeReport))
    .chatModel(model)
    .build()
    .run();

The chatModel on the ensemble is only used for tasks that don’t have a handler. Deterministic tasks bypass the LLM entirely.
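The dispatch rule is easy to state: handler present, run it; handler absent, fall back to the ensemble-level model. A hypothetical sketch of that decision with simplified stand-in types (not the library's actual code):

```java
import java.util.function.Function;

// Hypothetical sketch of per-task dispatch (simplified stand-in types,
// not the real AgentEnsemble classes): a handler, if present, wins;
// only handler-less tasks ever reach the chat model.
public class DispatchSketch {
    record Task(String description, Function<String, String> handler) {}

    // Stand-in for a chat model call
    interface ChatModel { String generate(String prompt); }

    static String execute(Task task, ChatModel ensembleModel, String context) {
        if (task.handler() != null) {
            // Deterministic path: the LLM is never consulted.
            return task.handler().apply(context);
        }
        // AI path: the task's description becomes the prompt,
        // with upstream context appended.
        return ensembleModel.generate(task.description() + "\n" + context);
    }

    public static void main(String[] args) {
        ChatModel fakeModel = prompt -> "model-answer";
        Task deterministic = new Task("format-as-json-report",
            in -> "{\"data\":\"" + in + "\"}");
        Task aiBacked = new Task("summarize these records", null);

        System.out.println(execute(deterministic, fakeModel, "x"));
        // prints: {"data":"x"}
        System.out.println(execute(aiBacked, fakeModel, "x"));
        // prints: model-answer
    }
}
```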


Deterministic-only orchestration in AgentEnsemble is useful when you need:

  • Multi-step pipelines with DAG dependencies — sequential or parallel, inferred from context declarations
  • Observable step-level execution — callbacks, metrics, and tracing without external tooling
  • Mixed pipelines — some steps are AI-backed, some are not; one orchestration model covers both
  • Phase-level sequencing — named stages with explicit ordering and structured output access
  • Review gates on deterministic steps — the PhaseReview API works regardless of whether the tasks inside are AI or handler-based

It is not a replacement for a dedicated workflow engine like Temporal if you need durable execution, long-running workflows with days or weeks of persistence, or replays across process restarts. For those requirements, Temporal or a similar system is the right choice.

For pipelines that live within a single process, run in seconds or minutes, and need consistent observability without adding infrastructure, this pattern removes a lot of plumbing.


No persistence: if the process dies mid-run, the pipeline restarts from the beginning. There is no checkpoint mechanism.

No scheduling: the orchestration layer has no built-in cron or trigger. External schedulers (Quartz, Spring Scheduler, Kubernetes CronJobs) are needed if the pipeline runs on a schedule.

No distributed execution: all tasks run in-process on virtual threads. Cross-JVM task distribution is not supported.

These are acceptable tradeoffs for many internal pipelines. They are not appropriate for workflows that span hours or need durable state across process restarts.


Guide: Deterministic Orchestration | GitHub | Example source

AgentEnsemble is open-source under the MIT license.