MapReduceEnsemble Guide

MapReduceEnsemble solves the context window overflow problem that arises when using Workflow.PARALLEL to fan out to a large number of agents. It supports two reduction strategies:

  • Static (chunkSize): the DAG is pre-built at build() time with a fixed group size. Use when N and average output sizes are predictable.
  • Adaptive (targetTokenBudget): the DAG is built level-by-level at runtime based on actual output token counts. Use when output sizes vary or context window overflow is a hard constraint.

In the v2.0.0 task-first paradigm, agents are optional. You declare what work needs to be done, and the framework synthesises agents automatically from each task description using the configured AgentSynthesizer (default: template-based derivation, no extra LLM call).

For the simplest cases, MapReduceEnsemble.of() builds and runs a map-reduce in a single call:

EnsembleOutput output = MapReduceEnsemble.of(
    model,
    List.of("Risotto", "Duck Breast", "Salmon"),
    "Prepare a detailed recipe for",
    "Combine these individual recipes into a cohesive dinner menu");

Each map task gets a description of mapDescription + ": " + item.toString(). The reduce task uses reduceDescription and receives all map outputs as context. Static mode with chunkSize=5 is used. The model is used to synthesise all agents automatically.

For more control over task configuration without declaring agents:

EnsembleOutput output = MapReduceEnsemble.<OrderItem>builder()
    .chatLanguageModel(model) // LLM for all synthesised agents
    .items(order.getItems())
    // Map phase: one task per item -- agent synthesised from description
    .mapTask(item -> Task.builder()
        .description("Prepare a detailed recipe for: " + item.getDish()
            + ". Dietary requirements: " + item.getDietaryNotes())
        .expectedOutput("Recipe with ingredients, preparation steps, and timing")
        .outputType(DishResult.class) // structured output supported
        .build())
    // Reduce phase: one task per chunk -- agent synthesised from description
    .reduceTask(chunkTasks -> Task.builder()
        .description("Consolidate these dish preparations into a coordinated sub-plan. "
            + "Note timing dependencies and shared mise en place.")
        .expectedOutput("A coordinated sub-plan")
        .context(chunkTasks) // wire context explicitly
        .build())
    .chunkSize(3)
    .build()
    .run();

Agents are synthesised at run() time from each task’s description. The synthesised agent inherits any tools, chatLanguageModel, and maxIterations declared on the task.

Tools can be declared directly on tasks without defining an agent:

.mapTask(item -> Task.builder()
    .description("Research and summarise: " + item.getTopic())
    .expectedOutput("A research summary with key findings")
    .tools(List.of(webSearchTool, calculatorTool)) // tools on the task
    .chatLanguageModel(researchModel) // per-task LLM override
    .build())

The framework applies the task-level tools and LLM to the synthesised agent before execution.

Task-first (v2.0.0) vs agent-first (power-user):

  • Agent declaration: task-first requires none; agent-first is explicit per item / per group.
  • Agent persona control: task-first via the description; agent-first gives full control (role, goal, background).
  • Required fields: task-first needs items, mapTask(Function), reduceTask(Function), chatLanguageModel; agent-first needs items, mapAgent, mapTask(BiFunction), reduceAgent, reduceTask(BiFunction).
  • When to use: task-first for most use cases; agent-first when precise persona matters.

The two styles are mutually exclusive per phase. You cannot combine mapTask(Function) with mapAgent in the same builder.


With a standard parallel workflow, the aggregation task receives all N outputs as context:

// All N outputs -> one aggregator: context = N * avg_output_size
Task aggregate = Task.builder()
    .context(allMapTasks) // grows without bound
    ...
    .build();

For small N this is fine. But when N grows (e.g. 20+ items) or each map output is verbose, the aggregator’s context can exceed the model’s limit — or degrade silently as context quality drops with excessively long inputs.

MapReduceEnsemble solves this with a tree-reduction:

N=7, chunkSize=3:

Map level:      [M1] [M2] [M3]   [M4] [M5] [M6]   [M7]
                  \    |    /      \    |    /      |
Reduce L1:          [R-0]            [R-1]        [R-2]
                       \               |           /
Final reduce:                    [Final]

Each reducer receives at most chunkSize inputs. Tree depth is O(log_K(N)).


record DishResult(String dish, List<String> ingredients, int prepMinutes, String plating) {}

EnsembleOutput output = MapReduceEnsemble.<OrderItem>builder()
    .items(order.getItems()) // items to fan out over
    // Map phase: one agent + task per item
    .mapAgent(item -> Agent.builder()
        .role(item.getDish() + " Chef")
        .goal("Prepare " + item.getDish())
        .llm(model)
        .build())
    .mapTask((item, agent) -> Task.builder()
        .description("Execute the recipe for: " + item.getDish())
        .expectedOutput("Recipe with ingredients, steps, and timing")
        .agent(agent)
        .outputType(DishResult.class)
        .build())
    // Reduce phase: consolidate groups of chunkSize map outputs
    .reduceAgent(() -> Agent.builder()
        .role("Sub-Chef")
        .goal("Consolidate dish preparations")
        .llm(model)
        .build())
    .reduceTask((agent, chunkTasks) -> Task.builder()
        .description("Consolidate these dish preparations.")
        .expectedOutput("Consolidated plan")
        .agent(agent)
        .context(chunkTasks) // wire context explicitly
        .build())
    .chunkSize(3)
    .build()
    .run();

Task-first builder fields:

  • items (List<T>): input items. Must not be null or empty.
  • mapTask (Function<T, Task>): factory called once per item. The agent is synthesised automatically. Mutually exclusive with mapAgent + mapTask(BiFunction).
  • reduceTask (Function<List<Task>, Task>): factory called once per reduce group. Must wire .context(chunkTasks). Mutually exclusive with reduceAgent + reduceTask(BiFunction).
  • chatLanguageModel (ChatModel): LLM for synthesised agents. Required when using task-first factories unless each task carries its own chatLanguageModel.
Agent-first builder fields:

  • items (List<T>): input items. Must not be null or empty.
  • mapAgent (Function<T, Agent>): factory called once per item to create the map-phase agent. Must be paired with mapTask(BiFunction).
  • mapTask (BiFunction<T, Agent, Task>): factory called once per item. Receives the item and the agent from mapAgent. Must be paired with mapAgent.
  • reduceAgent (Supplier<Agent>): factory called once per reduce group. Must be paired with reduceTask(BiFunction).
  • reduceTask (BiFunction<Agent, List<Task>, Task>): factory called once per group. Receives the agent and the upstream tasks. Must call .context(chunkTasks). Must be paired with reduceAgent.
Common builder fields:

  • chunkSize (int, default 5): maximum number of upstream tasks per reduce group. Must be >= 2.
  • verbose (boolean, default false): elevates execution logging to INFO level.
  • listener (EnsembleListener): register event listeners (repeatable).
  • captureMode (CaptureMode, default OFF): data collection depth.
  • parallelErrorStrategy (ParallelErrorStrategy, default FAIL_FAST): how to handle failures in map or reduce tasks.
  • costConfiguration (CostConfiguration, default null): optional per-token cost rates.
  • traceExporter (ExecutionTraceExporter, default null): optional trace exporter.
  • toolExecutor (Executor, default virtual-thread): executor for parallel tool calls.
  • toolMetrics (ToolMetrics, default NoOpToolMetrics): metrics backend for tool execution.
  • input / inputs (Map<String,String>, default {}): template variable inputs.
Methods:

  • build() -> MapReduceEnsemble<T>: validates configuration, builds the DAG, returns a ready instance.
  • run() -> EnsembleOutput: executes and returns the final output.
  • run(Map<String,String>) -> EnsembleOutput: runs with additional template variable overrides.
  • toEnsemble() -> Ensemble: returns the pre-built inner Ensemble for devtools inspection.

Given N items and chunkSize K:

  1. Create N map agents and tasks (no context, all independent).
  2. If N <= K: final reduce gets context = all N map tasks directly (2 levels total).
  3. If N > K: partition map tasks into groups of at most K. Create one reduce agent + task per group. If the resulting reduce level has more than K tasks, partition again and repeat. When the level has <= K tasks, create the final reduce task.

Tree depth: O(log_K(N)).

N    K    Levels                               Total tasks
1    any  2  (1 map + 1 final)                 2
5    5    2  (5 map + 1 final)                 6
7    3    3  (7 map + 3 L1 + 1 final)          11
25   5    3  (25 map + 5 L1 + 1 final)         31
26   5    4  (26 map + 6 L1 + 2 L2 + 1 final)  35
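The counts in this table follow mechanically from the partitioning rule. The sketch below (a hypothetical helper, not part of the library) reproduces the level and task totals from N and K:

```java
import java.util.Arrays;

public class TreeShape {
    // Returns {levels, totalTasks} for N items and chunkSize K, following the
    // static build algorithm: one map level, then repeated partitioning into
    // groups of at most K, then one final reduce.
    static int[] shape(int n, int k) {
        int levels = 1;   // the map level
        int total = n;    // N map tasks
        int current = n;
        while (current > k) {
            current = (current + k - 1) / k; // ceil(current / k) reduce tasks
            total += current;
            levels++;
        }
        levels++;         // the final reduce
        total++;
        return new int[]{levels, total};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(shape(7, 3)));   // [3, 11]
        System.out.println(Arrays.toString(shape(26, 5)));  // [4, 35]
    }
}
```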

chunkSize controls how many map outputs each reducer reads. Trade-offs:

  • Larger chunkSize: fewer reduce levels (shallower tree), but each reducer has more context. Use when outputs are compact and the model handles larger context well.
  • Smaller chunkSize: more reduce levels, each with less context per reducer. Use when map outputs are verbose or the model has a limited context window.

A rule of thumb: estimate chunkSize * avg_output_tokens and ensure this stays well within your model’s context limit. For a model with 128K tokens and ~500 token outputs, chunkSize=50 gives comfortable margin; for 2K token outputs, chunkSize=10-20 is safer.
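The rule of thumb can be turned into a back-of-envelope calculation. In this sketch the 20% budget fraction is an illustrative safety margin, not a library default, and the helper is hypothetical:

```java
public class ChunkSizing {
    // Largest chunkSize whose reducer input (chunkSize * avgOutputTokens)
    // stays within a chosen fraction of the model's context window.
    static int maxChunkSize(int contextWindowTokens, double budgetFraction, int avgOutputTokens) {
        return Math.max(2, (int) (contextWindowTokens * budgetFraction) / avgOutputTokens);
    }

    public static void main(String[] args) {
        // 128K-token model, ~500-token outputs, 20% margin
        System.out.println(maxChunkSize(128_000, 0.2, 500));   // 51
        // ~2K-token outputs under the same margin
        System.out.println(maxChunkSize(128_000, 0.2, 2000));  // 12
    }
}
```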


The reduce task factory receives chunkTasks — the list of upstream Task objects. You must wire these as context on the returned task. The framework does not mutate the returned task:

.reduceTask((agent, chunkTasks) -> Task.builder()
    .description("Consolidate these preparations.")
    .expectedOutput("Consolidated plan")
    .agent(agent)
    .context(chunkTasks) // required -- without this, the inner Ensemble will throw ValidationException
    .build())

Structured output works naturally in map tasks. The LLM produces JSON that is parsed into the target type. Reduce tasks receive the structured output as their context:

record RecipeResult(String dish, List<String> ingredients, int prepMinutes) {}

.mapTask((item, agent) -> Task.builder()
    .description("Prepare recipe for " + item.name())
    .expectedOutput("Recipe result as JSON")
    .agent(agent)
    .outputType(RecipeResult.class) // structured output
    .build())

Map and reduce tasks are executed with Workflow.PARALLEL internally. The parallelErrorStrategy field (default: FAIL_FAST) controls failure behavior:

  • FAIL_FAST (default): first failure throws TaskExecutionException and stops all remaining tasks in that level.
  • CONTINUE_ON_ERROR: failed tasks are skipped; tasks that depend on them are also skipped. If any task fails, ParallelExecutionException is thrown at the end, carrying completed outputs and failure details.

Call toEnsemble() to access the pre-built inner Ensemble for devtools inspection or DAG export before execution:

MapReduceEnsemble<OrderItem> mre = MapReduceEnsemble.<OrderItem>builder()
    ...
    .build();

// Inspect structure
System.out.printf("DAG: %d agents, %d tasks%n",
    mre.toEnsemble().getAgents().size(),
    mre.toEnsemble().getTasks().size());

// Export enriched DAG with map/reduce node metadata for agentensemble-viz
DagModel dag = DagExporter.build(mre); // includes nodeType, mapReduceLevel, mapReduceMode
dag.toJson(Path.of("./traces/kitchen.dag.json"));

The DagExporter.build(MapReduceEnsemble) overload enriches each task node with:

  • nodeType: "map", "reduce", or "final-reduce"
  • mapReduceLevel: 0 for map, 1+ for reduce levels
  • mapReduceMode: "STATIC" on the DagModel

agentensemble-viz renders these with distinct badges (MAP, REDUCE Ln, AGGREGATE).


Instead of a fixed chunkSize, adaptive mode measures actual output token counts after each level and bins them into groups that collectively fit within targetTokenBudget. This eliminates the need to guess output sizes upfront.

STEP 1: Run N map tasks in parallel.
STEP 2: Estimate total output tokens.
        - If total <= targetTokenBudget: run one final reduce, done.
        - Else: go to step 3.
STEP 3: Bin-pack outputs (first-fit-decreasing) into groups of <= targetTokenBudget.
        Run one reduce per bin in parallel.
STEP 4: Repeat from step 2 with reduce outputs, until within budget
        or maxReduceLevels is reached.
STEP 5: Final reduce.
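Step 3's grouping can be sketched as plain first-fit-decreasing bin-packing over token counts. This is a hypothetical standalone helper illustrating the algorithm; the library's internal implementation may differ in detail:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class BinPack {
    // First-fit-decreasing: sort token counts descending, then place each
    // into the first bin with room left; open a new bin (= one reduce task)
    // when none fits. Each bin's total stays <= budget.
    static List<List<Integer>> firstFitDecreasing(List<Integer> tokenCounts, int budget) {
        List<Integer> sorted = new ArrayList<>(tokenCounts);
        sorted.sort(Comparator.reverseOrder());
        List<List<Integer>> bins = new ArrayList<>();
        List<Integer> binTotals = new ArrayList<>();
        for (int tokens : sorted) {
            boolean placed = false;
            for (int i = 0; i < bins.size(); i++) {
                if (binTotals.get(i) + tokens <= budget) {
                    bins.get(i).add(tokens);
                    binTotals.set(i, binTotals.get(i) + tokens);
                    placed = true;
                    break;
                }
            }
            if (!placed) {
                List<Integer> bin = new ArrayList<>();
                bin.add(tokens);
                bins.add(bin);
                binTotals.add(tokens);
            }
        }
        return bins;
    }

    public static void main(String[] args) {
        // Outputs of 900, 300, 700, 400 tokens with a 1000-token budget
        System.out.println(firstFitDecreasing(List.of(900, 300, 700, 400), 1000));
        // -> [[900], [700, 300], [400]]
    }
}
```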
EnsembleOutput output = MapReduceEnsemble.<OrderItem>builder()
    .items(order.getItems())
    .mapAgent(item -> Agent.builder()
        .role(item.getDish() + " Chef")
        .goal("Prepare " + item.getDish())
        .llm(model)
        .build())
    .mapTask((item, agent) -> Task.builder()
        .description("Execute recipe for: " + item.getDish())
        .expectedOutput("Recipe with ingredients, steps, and timing")
        .agent(agent)
        .build())
    .reduceAgent(() -> Agent.builder()
        .role("Sub-Chef")
        .goal("Consolidate dish preparations")
        .llm(model)
        .build())
    .reduceTask((agent, chunkTasks) -> Task.builder()
        .description("Consolidate these dish preparations.")
        .expectedOutput("Consolidated plan")
        .agent(agent)
        .context(chunkTasks) // same as static mode -- wire context explicitly
        .build())
    // Adaptive strategy: keep reducing until total context < 8000 tokens
    .targetTokenBudget(8_000)
    .maxReduceLevels(10) // safety valve (default: 10)
    .build()
    .run();

Or derive the budget from the model’s context window:

.contextWindowSize(128_000) // model context window in tokens
.budgetRatio(0.5)           // use at most 50% -> budget = 64_000 tokens

Adaptive-mode builder fields:

  • targetTokenBudget (int): token limit per reduce group. Must be > 0. Mutually exclusive with chunkSize.
  • contextWindowSize (int): convenience field; derives targetTokenBudget = contextWindowSize * budgetRatio. Must be set together with budgetRatio.
  • budgetRatio (double, default 0.5): fraction of the context window for reduce input. Range: (0.0, 1.0]. Must be set together with contextWindowSize.
  • maxReduceLevels (int, default 10): maximum adaptive reduce levels before the final reduce is forced. Must be >= 1.
  • tokenEstimator (Function<String, Integer>, default built-in): custom token estimator. Overrides the heuristic fallback when the LLM provider does not return token counts.

The adaptive executor determines token counts using a three-tier strategy:

  1. Provider count (highest priority): TaskOutput.getMetrics().getOutputTokens() when the LLM provider returns a non-negative value.
  2. Custom estimator: the tokenEstimator function, if provided.
  3. Heuristic fallback: rawOutput.length() / 4. A WARN is logged when this is used.
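The three tiers amount to a simple fallback chain. A minimal sketch, with hypothetical parameter names (providerTokens, estimator) that are illustrative rather than the library's API:

```java
import java.util.OptionalInt;
import java.util.function.Function;

public class TokenCount {
    // Fallback chain: provider-reported count, then a custom estimator,
    // then the length/4 heuristic (which the framework logs at WARN).
    static int countTokens(OptionalInt providerTokens,
                           Function<String, Integer> estimator,
                           String rawOutput) {
        if (providerTokens.isPresent() && providerTokens.getAsInt() >= 0) {
            return providerTokens.getAsInt();      // 1. provider count
        }
        if (estimator != null) {
            return estimator.apply(rawOutput);     // 2. custom estimator
        }
        return rawOutput.length() / 4;             // 3. heuristic fallback
    }

    public static void main(String[] args) {
        System.out.println(countTokens(OptionalInt.of(42), null, "abcdefgh"));  // 42
        System.out.println(countTokens(OptionalInt.empty(), null, "abcdefgh")); // 2
    }
}
```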

For accurate bin-packing, prefer using a model that returns token usage metadata. If the provider does not, supply a custom estimator using a tokenizer library:

.tokenEstimator(text -> myTokenizer.countTokens(text))

In adaptive mode, toEnsemble() throws UnsupportedOperationException because the DAG shape is not known until runtime. Instead, inspect the aggregated ExecutionTrace after execution, or use DagExporter.build(output.getTrace()) for a post-execution DAG:

EnsembleOutput output = mre.run();

// Post-execution DAG export for visualization
DagModel dag = DagExporter.build(output.getTrace());
dag.toJson(Path.of("./traces/adaptive-run.dag.json"));

// Per-level timing breakdown
output.getTrace().getMapReduceLevels().forEach(level ->
    System.out.printf("Level %d: %d tasks, duration=%s%n",
        level.getLevel(), level.getTaskCount(), level.getDuration()));

The aggregated ExecutionTrace from an adaptive run has:

  • workflow = "MAP_REDUCE_ADAPTIVE"
  • mapReduceLevels: list of per-level summaries (level index, task count, duration)
  • All TaskTrace objects annotated with mapReduceLevel (int) and nodeType (String)
  • ExecutionMetrics summed across all levels

Scenario                                              Recommended strategy
N is known, output sizes are predictable              Static (chunkSize)
Want to inspect/export the DAG before running         Static (toEnsemble() works)
Same inputs always produce the same tree shape        Static (deterministic)
Output sizes vary significantly across agents         Adaptive (targetTokenBudget)
Context window overflow is a hard constraint          Adaptive (measures actual sizes)
LLM provider returns token usage metadata             Adaptive (most accurate)

Short-circuit optimization (adaptive mode)


When the total input is small enough to process directly, spawning N separate map tasks is unnecessary overhead. The short-circuit optimization detects this case before any LLM call and routes execution through a single direct task instead of the full map-reduce pipeline.

Before the map phase runs, the framework estimates total input size:

estimated_input_tokens = sum(inputEstimator(item).length() / 4 for item in items)

IF estimated_input_tokens <= targetTokenBudget
   AND directAgent is configured
   AND directTask is configured:
    --> SHORT-CIRCUIT: run single direct task with all items
    --> Return EnsembleOutput (single task, single LLM call)
ELSE:
    --> Run normal map-reduce pipeline
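The decision itself is plain arithmetic over the estimated representations. A minimal standalone sketch, assuming the length/4 heuristic described earlier (the helper and its names are hypothetical, not the library's API):

```java
import java.util.List;
import java.util.function.Function;

public class ShortCircuit {
    // Sum the estimated token count of each item's compact representation
    // and compare against the budget. The boundary is inclusive.
    static <T> boolean shouldShortCircuit(List<T> items,
                                          Function<T, String> inputEstimator,
                                          int targetTokenBudget) {
        long estimated = items.stream()
            .mapToLong(item -> inputEstimator.apply(item).length() / 4)
            .sum();
        return estimated <= targetTokenBudget;
    }

    public static void main(String[] args) {
        List<String> smallOrder = List.of("Risotto x2", "Salmon x1");
        // A tiny order fits easily inside a 64K budget -> direct task
        System.out.println(shouldShortCircuit(smallOrder, s -> s, 64_000)); // true
    }
}
```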

The check is purely pre-execution and costs nothing when directAgent/directTask are not configured (backwards-compatible opt-in).

Add directAgent and directTask to an adaptive-mode builder:

EnsembleOutput output = MapReduceEnsemble.<OrderItem>builder()
    .items(order.getItems())
    // Standard map + reduce config (used when input is too large for direct processing)
    .mapAgent(item -> Agent.builder()
        .role(item.getDish() + " Chef")
        .goal("Prepare " + item.getDish())
        .llm(model)
        .build())
    .mapTask((item, agent) -> Task.builder()
        .description("Execute recipe for: " + item.getDish())
        .expectedOutput("Recipe with steps and timing")
        .agent(agent)
        .build())
    .reduceAgent(() -> Agent.builder()
        .role("Sub-Chef")
        .goal("Consolidate preparations")
        .llm(model)
        .build())
    .reduceTask((agent, chunkTasks) -> Task.builder()
        .description("Consolidate these preparations.")
        .expectedOutput("Consolidated plan")
        .agent(agent)
        .context(chunkTasks)
        .build())
    // Short-circuit: if total input fits in budget, skip map-reduce entirely
    .directAgent(() -> Agent.builder()
        .role("Head Chef")
        .goal("Handle the entire order directly")
        .llm(model)
        .build())
    .directTask((agent, allItems) -> {
        String allDishes = allItems.stream()
            .map(OrderItem::getDish)
            .collect(Collectors.joining(", "));
        return Task.builder()
            .description("Plan the complete meal for: " + allDishes)
            .expectedOutput("Complete meal plan with all dishes")
            .agent(agent)
            .build();
    })
    .contextWindowSize(128_000)
    .budgetRatio(0.5) // targetTokenBudget = 64_000
    .build()
    .run();

When the input is small, output.getTaskOutputs() has exactly one entry and the execution trace shows a single node with nodeType = "direct". When the input is large, the normal map-reduce pipeline runs and the direct factories are ignored.

By default, each item is converted to a string via toString() and the token count is estimated as text.length() / 4. When toString() is verbose (e.g., a full Java object dump) or not representative of the actual context cost, supply a compact representation:

.inputEstimator(item -> item.getDish() + " " + item.getQuantity())

inputEstimator can be set independently of directAgent/directTask — it has no pairing constraint.

Constraints:

  • directAgent and directTask must both be set or both be null: setting one without the other throws ValidationException at build() time.
  • Not available in static mode (chunkSize): ValidationException at build() time if either is set. Short-circuit requires a token budget to evaluate.
  • The boundary is inclusive: short-circuit fires when estimated <= targetTokenBudget (not strictly less than).

When to configure short-circuit:

  • Orders / batches vary from 1 to 100+ items: configure short-circuit to handle small batches cheaply.
  • Input items are known to be small at design time: omit short-circuit; just run with a large targetTokenBudget.
  • A single-agent prompt for all items would exceed the context window: short-circuit will not fire; the normal pipeline runs.
  • Items have compact IDs or summaries but a verbose toString(): use inputEstimator for accurate estimation.

Feature                    Workflow.PARALLEL (manual)     MapReduceEnsemble
Aggregator context size    N * avg_output (unbounded)     chunkSize * avg_output (bounded)
DAG construction           Manual, error-prone            Automatic, O(log_K(N)) depth
toEnsemble() / devtools    Standard Ensemble              Enriched with map-reduce metadata
Structured output          Supported                      Supported
Error handling             parallelErrorStrategy          Same parallelErrorStrategy
Template variables         .input() / run(Map)            Same API

Use plain Workflow.PARALLEL when N is small (e.g. < 10) and output sizes are compact. Use MapReduceEnsemble when N is large, outputs are verbose, or context window overflow is a concern.