
Cross-Ensemble Delegation

AgentEnsemble v3.0 enables ensembles to delegate work to each other over WebSocket using two primitives: NetworkTask (full task delegation) and NetworkTool (remote tool call).

A NetworkTask delegates a full task to a remote ensemble. The remote ensemble runs its complete pipeline — agent synthesis, ReAct loop, tools, review gates — and returns the final output. The calling agent does not know or care about the internal process.

// Room service delegates meal preparation to the kitchen ensemble
Ensemble roomService = Ensemble.builder()
    .chatLanguageModel(model)
    .task(Task.builder()
        .description("Handle guest room service request")
        .tools(NetworkTask.from("kitchen", "prepare-meal", registry))
        .build())
    .build();
Two timeouts govern a delegation:

| Setting | Default | Description |
| --- | --- | --- |
| Connect timeout | 10 seconds | Time to establish the WebSocket connection |
| Execution timeout | 30 minutes | Time to wait for task completion |

Pass a Duration to override the execution timeout for a single delegation:

NetworkTask.from("kitchen", "prepare-meal",
    Duration.ofMinutes(15), registry)

A NetworkTool invokes a single tool on a remote ensemble. The calling agent retains control of its reasoning loop — it just borrows a remote capability for one call.

Ensemble roomService = Ensemble.builder()
    .chatLanguageModel(model)
    .task(Task.builder()
        .description("Handle guest room service request")
        .tools(
            NetworkTool.from("kitchen", "check-inventory", registry),
            NetworkTool.from("kitchen", "dietary-check", registry))
        .build())
    .build();

Tool calls default to a 30-second execution timeout (vs. 30 minutes for tasks).
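The same per-call override presumably applies to tools. Assuming NetworkTool.from offers the same Duration overload that NetworkTask.from does, a slow remote tool can be given extra headroom:

```java
// Sketch: allow a slow inventory lookup 2 minutes instead of the 30-second
// default (assumes NetworkTool.from mirrors NetworkTask.from's Duration overload)
NetworkTool.from("kitchen", "check-inventory",
    Duration.ofMinutes(2), registry)
```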

Both NetworkTask and NetworkTool use a NetworkClientRegistry to manage WebSocket connections. The registry lazily creates and caches connections by ensemble name.

NetworkConfig config = NetworkConfig.builder()
    .ensemble("kitchen", "ws://kitchen:7329/ws")
    .ensemble("maintenance", "ws://maintenance:7329/ws")
    .defaultConnectTimeout(Duration.ofSeconds(5))
    .build();

try (NetworkClientRegistry registry = new NetworkClientRegistry(config)) {
    Ensemble roomService = Ensemble.builder()
        .chatLanguageModel(model)
        .task(Task.builder()
            .description("Handle room service request")
            .tools(
                NetworkTask.from("kitchen", "prepare-meal", registry),
                NetworkTool.from("kitchen", "check-inventory", registry),
                NetworkTask.from("maintenance", "repair-request", registry))
            .build())
        .build()
        .run();
}

The remote ensemble must share the tasks and tools that other ensembles want to use:

Ensemble kitchen = Ensemble.builder()
    .chatLanguageModel(model)
    .task(Task.of("Manage kitchen operations"))
    .shareTask("prepare-meal", Task.builder()
        .description("Prepare a meal as specified")
        .expectedOutput("Confirmation with preparation details")
        .build())
    .shareTool("check-inventory", inventoryTool)
    .shareTool("dietary-check", allergyCheckTool)
    .webDashboard(WebDashboard.builder().port(7329).build())
    .build();

kitchen.start(7329);

Both NetworkTask and NetworkTool return ToolResult.failure() on errors — the calling agent can adapt its behavior based on the error message:

| Scenario | Behavior |
| --- | --- |
| Remote ensemble unreachable | ToolResult.failure("Network error: ...") |
| Execution timeout | ToolResult.failure("Task '...' timed out after ...") |
| Remote task fails | ToolResult.failure(error message from remote) |
| Unknown task/tool name | ToolResult.failure("Unknown shared task: ...") |
| Remote ensemble draining | ToolResult.failure("Ensemble is DRAINING") |

Both NetworkTask and NetworkTool implement AgentTool. An agent does not know whether a tool is local or remote. The existing ReAct loop, tool executor, metrics, and tracing all work unchanged.
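In practice this means local and remote tools can be mixed freely in one tool list. A minimal sketch, where localMenuTool stands in for any ordinary AgentTool implementation:

```java
// The agent sees two AgentTools; one happens to execute remotely
Task task = Task.builder()
    .description("Handle guest room service request")
    .tools(
        localMenuTool, // hypothetical local AgentTool
        NetworkTool.from("kitchen", "check-inventory", registry))
    .build();
```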

The transport layer is pluggable via the Transport SPI. The default is simple mode: in-process queues with no external infrastructure.

// Transport for the kitchen ensemble (bound to its inbox)
Transport kitchenTransport = Transport.websocket("kitchen");
// Another ensemble sends a work request to the kitchen's inbox
kitchenTransport.send(workRequest);
// Kitchen receives work from its inbox (blocking)
WorkRequest incoming = kitchenTransport.receive(Duration.ofSeconds(30));
// Kitchen processes the request and delivers a response
kitchenTransport.deliver(workResponse);

Each transport instance is bound to an ensemble name that identifies its inbox. Transport.websocket("kitchen") creates a transport whose send() and receive() both operate on the "kitchen" inbox.

Transport.websocket(ensembleName) creates a simple transport backed by in-process LinkedBlockingQueue instances for request delivery and ConcurrentHashMap for response storage. No external infrastructure is required.

This is suitable for local development and testing. It does not survive process restarts and does not support horizontal scaling.

The SPI is open for custom implementations. Implement the Transport interface to integrate with your messaging infrastructure:

public class MyCustomTransport implements Transport {
    @Override public void send(WorkRequest request) { /* ... */ }
    @Override public WorkRequest receive(Duration timeout) { /* ... */ }
    @Override public void deliver(WorkResponse response) { /* ... */ }
}

The companion RequestQueue and ResultStore SPIs provide finer-grained abstractions for the request and response paths independently. Both include inMemory() factories for development.
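For example, assuming Transport.durable accepts any RequestQueue and ResultStore implementation, the in-memory factories can exercise the durable code path without external infrastructure:

```java
// Durable-style transport wired to in-memory SPIs; nothing here
// survives a process restart, so use this for local testing only
Transport transport = Transport.durable(
    "kitchen",
    RequestQueue.inMemory(),
    ResultStore.inMemory());
```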

For production deployments that need to survive pod restarts and support horizontal scaling, use the Redis-backed transport from the agentensemble-transport-redis module:

implementation("net.agentensemble:agentensemble-transport-redis:${agentensembleVersion}")

RedisClient redisClient = RedisClient.create("redis://localhost:6379");
Transport transport = Transport.durable(
    "kitchen",
    RedisRequestQueue.create(redisClient),
    RedisResultStore.create(redisClient));

RedisRequestQueue uses Redis Streams with consumer groups for durable, at-least-once delivery. RedisResultStore uses Redis key-value storage with TTL for automatic cleanup and Pub/Sub for result notifications.

When running multiple replicas, each replica uses a different consumer name so that Redis distributes messages across consumers:

String consumerName = InetAddress.getLocalHost().getHostName();
Transport transport = Transport.durable(
    "kitchen",
    RedisRequestQueue.create(redisClient, consumerName),
    RedisResultStore.create(redisClient));

If a consumer crashes before acknowledging a message, the visibility timeout (default 5 minutes) expires and the message is automatically redelivered to a healthy consumer.

See Chapter 6: Durable Transport for a detailed explanation of the asymmetric routing pattern and consumer group semantics.

For Kafka-backed transport, see agentensemble-transport-kafka which provides Kafka-backed RequestQueue, DeliveryHandler, and IngressSource implementations.

The RequestQueue SPI includes a priority-aware implementation that orders requests by priority level (CRITICAL > HIGH > NORMAL > LOW) with FIFO ordering within the same level.

// Priority queue with aging disabled
RequestQueue queue = RequestQueue.priority();
queue.enqueue("kitchen", workRequest);
WorkRequest next = queue.dequeue("kitchen", Duration.ofSeconds(30));

Low-priority requests can be promoted over time to prevent starvation. Configure an AgingPolicy to control how frequently promotions occur:

// Promote unprocessed requests one priority level every 30 minutes
// LOW -> NORMAL (30 min) -> HIGH (60 min) -> CRITICAL (90 min)
RequestQueue queue = RequestQueue.priority(AgingPolicy.every(Duration.ofMinutes(30)));

Use AgingPolicy.none() to disable aging (the default).

The PriorityWorkQueue provides queue position and ETA for populating task_accepted messages:

PriorityWorkQueue queue = RequestQueue.priority(
    AgingPolicy.every(Duration.ofMinutes(30)));
queue.enqueue("kitchen", workRequest);
QueueStatus status = queue.queueStatus("kitchen", workRequest.requestId());
// status.queuePosition() -> 0 (next to be processed)
// status.estimatedCompletion() -> PT30S (estimated time)

Pass a QueueMetrics callback to receive queue depth reports after each enqueue/dequeue:

QueueMetrics metrics = new QueueMetrics() {
    private final Map<String, AtomicInteger> depths = new ConcurrentHashMap<>();

    @Override
    public void recordQueueDepth(String queueName, Priority priority, int depth) {
        String key = queueName + ":" + priority.name();
        AtomicInteger gaugeValue = depths.computeIfAbsent(key, k -> {
            AtomicInteger value = new AtomicInteger(depth);
            Gauge.builder("agentensemble.queue.depth", value, AtomicInteger::get)
                .tag("ensemble", queueName)
                .tag("priority", priority.name())
                .register(meterRegistry);
            return value;
        });
        gaugeValue.set(depth);
    }
};
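The snippet above only defines the callback; how it is attached is not shown here. If the priority() factory accepts a QueueMetrics alongside the aging policy (an assumption), wiring it up might look like:

```java
// Hypothetical overload: priority(AgingPolicy, QueueMetrics)
RequestQueue queue = RequestQueue.priority(
    AgingPolicy.every(Duration.ofMinutes(30)), metrics);
```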

By default, NetworkTask and NetworkTool block until the result arrives (AWAIT mode). Two additional modes are available via the builder API:

// AWAIT (default -- same as from())
NetworkTask task = NetworkTask.builder()
    .ensembleName("kitchen")
    .taskName("prepare-meal")
    .clientRegistry(registry)
    .build();

// ASYNC -- submit and return immediately
NetworkTask asyncTask = NetworkTask.builder()
    .ensembleName("kitchen")
    .taskName("prepare-meal")
    .clientRegistry(registry)
    .mode(RequestMode.ASYNC)
    .onComplete(result -> log.info("Meal ready: {}", result.getOutput()))
    .build();

// AWAIT_WITH_DEADLINE -- block up to 30 seconds
NetworkTask deadlineTask = NetworkTask.builder()
    .ensembleName("kitchen")
    .taskName("prepare-meal")
    .clientRegistry(registry)
    .mode(RequestMode.AWAIT_WITH_DEADLINE)
    .deadline(Duration.ofSeconds(30))
    .deadlineAction(DeadlineAction.CONTINUE_IN_BACKGROUND)
    .onComplete(result -> log.info("Background result: {}", result.getOutput()))
    .build();
| Mode | Behavior |
| --- | --- |
| AWAIT | Block until result (default) |
| ASYNC | Return immediately; result via onComplete callback |
| AWAIT_WITH_DEADLINE | Block up to deadline; on timeout, apply DeadlineAction |

When the deadline expires, the configured DeadlineAction determines the result:

| Action | On timeout |
| --- | --- |
| RETURN_TIMEOUT_ERROR | Return ToolResult.failure("Deadline exceeded") |
| RETURN_PARTIAL | Return success with "continuing in background" message |
| CONTINUE_IN_BACKGROUND | Return success + fire onComplete when result arrives |

Work responses can be delivered via different transports using the DeliveryHandler SPI.

| Method | Handler | Behavior |
| --- | --- | --- |
| WEBSOCKET | WebSocketDeliveryHandler | Send via WebSocket |
| QUEUE | QueueDeliveryHandler | Write to a named queue |
| TOPIC | KafkaTopicDelivery | Publish to Kafka topic (requires agentensemble-transport-kafka) |
| WEBHOOK | WebhookDeliveryHandler | HTTP POST to a URL |
| STORE | StoreDeliveryHandler | Write to ResultStore |
| BROADCAST_CLAIM | BroadcastClaimDeliveryHandler | Broadcast to all replicas |
| NONE | NoneDeliveryHandler | Fire and forget |
DeliveryRegistry registry = DeliveryRegistry.withDefaults(ResultStore.inMemory());
registry.register(new WebhookDeliveryHandler());
registry.register(new QueueDeliveryHandler((queue, response) -> { /* write to queue */ }));

// Use with transport
Transport transport = Transport.simple("kitchen", registry);

public class SlackDeliveryHandler implements DeliveryHandler {
    @Override
    public DeliveryMethod method() { return DeliveryMethod.WEBHOOK; }

    @Override
    public void deliver(DeliverySpec spec, WorkResponse response) {
        // POST to Slack webhook at spec.address()
    }
}

Work can arrive at an ensemble via multiple ingress sources simultaneously.

| Source | Behavior |
| --- | --- |
| HttpIngress | POST /api/work HTTP endpoint |
| QueueIngress | Poll a RequestQueue |
| WebSocketIngress | Receive via WebSocket |
| KafkaTopicIngress | Subscribe to Kafka topic (requires agentensemble-transport-kafka) |
IngressCoordinator ingress = IngressCoordinator.builder()
    .add(new HttpIngress(8080))
    .add(new QueueIngress(RequestQueue.inMemory(), "kitchen"))
    .build();

Transport transport = Transport.websocket("kitchen");
ingress.startAll(transport::send);
// Work submitted via HTTP or queue is routed to the transport

The IdempotencyGuard prevents duplicate processing of the same request:

IdempotencyGuard guard = IdempotencyGuard.inMemory();
ResultCache cache = ResultCache.inMemory();
// Wrap the request handler with caching support
RequestHandler cachedHandler = new CachingRequestHandler(baseHandler, guard, cache);

When a WorkRequest arrives with a previously-seen requestId, the guard returns the cached result instead of re-executing.
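A sketch of the resulting behavior, assuming RequestHandler exposes a handle(WorkRequest) method (the method name is an assumption):

```java
WorkResponse first  = cachedHandler.handle(request); // executes the task
WorkResponse replay = cachedHandler.handle(request); // same requestId: returns cached result
```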

The ResultCache caches results by semantic cacheKey:

WorkRequest request = new WorkRequest(
    "req-1", "frontend", "get-menu", null,
    Priority.NORMAL, null, null, null,
    CachePolicy.USE_CACHED, "menu-cache-key",
    Duration.ofMinutes(30)); // maxAge

| Policy | Behavior |
| --- | --- |
| USE_CACHED | Return cached result if valid; execute if miss |
| FORCE_FRESH | Bypass cache; execute and update cache |
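A FORCE_FRESH request uses the same constructor shape, for example to regenerate the menu and overwrite the cached entry:

```java
// Bypass the cache, execute, and refresh the "menu-cache-key" entry
WorkRequest fresh = new WorkRequest(
    "req-2", "frontend", "get-menu", null,
    Priority.NORMAL, null, null, null,
    CachePolicy.FORCE_FRESH, "menu-cache-key",
    Duration.ofMinutes(30));
```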

Long-running ensembles can execute tasks on a schedule:

Ensemble kitchen = Ensemble.builder()
    .chatLanguageModel(model)
    .task(Task.of("Manage kitchen operations"))
    .scheduledTask(ScheduledTask.builder()
        .name("inventory-report")
        .task(Task.of("Check current inventory levels and generate report"))
        .schedule(Schedule.every(Duration.ofHours(1)))
        .broadcastTo("hotel.inventory")
        .build())
    .broadcastHandler((topic, result) -> {
        log.info("Broadcast to {}: {}", topic, result);
    })
    .webDashboard(WebDashboard.builder().port(7329).build())
    .build();

kitchen.start(7329);

Scheduled tasks automatically stop when the ensemble enters the DRAINING state.

The agentensemble-transport-kafka module provides Kafka-backed implementations. Add it to your project dependencies, then wire up the queue, delivery, and ingress components:

KafkaTransportConfig config = KafkaTransportConfig.builder()
    .bootstrapServers("kafka:9092")
    .consumerGroupId("kitchen-ensemble")
    .build();

KafkaRequestQueue queue = KafkaRequestQueue.create(config);
queue.enqueue("kitchen", workRequest);
WorkRequest received = queue.dequeue("kitchen", Duration.ofSeconds(30));
queue.acknowledge("kitchen", received.requestId());

KafkaTopicDelivery topicDelivery = new KafkaTopicDelivery(config);
DeliveryRegistry registry = DeliveryRegistry.withDefaults(ResultStore.inMemory());
registry.register(topicDelivery);

KafkaTopicIngress ingress = new KafkaTopicIngress(config, "work-requests");
ingress.start(transport::send);