Cross-Ensemble Delegation
AgentEnsemble v3.0 enables ensembles to delegate work to each other over WebSocket using two primitives: NetworkTask (full task delegation) and NetworkTool (remote tool call).
NetworkTask: “Hire a Department”
Section titled “NetworkTask: “Hire a Department””A NetworkTask delegates a full task to a remote ensemble. The remote ensemble runs its
complete pipeline — agent synthesis, ReAct loop, tools, review gates — and returns the
final output. The calling agent does not know or care about the internal process.
// Room service delegates meal preparation to the kitchen ensembleEnsemble roomService = Ensemble.builder() .chatLanguageModel(model) .task(Task.builder() .description("Handle guest room service request") .tools(NetworkTask.from("kitchen", "prepare-meal", registry)) .build()) .build();Default timeouts
Section titled “Default timeouts”| Setting | Default | Description |
|---|---|---|
| Connect timeout | 10 seconds | Time to establish WebSocket connection |
| Execution timeout | 30 minutes | Time to wait for task completion |
Custom timeout
Section titled “Custom timeout”NetworkTask.from("kitchen", "prepare-meal", Duration.ofMinutes(15), registry)NetworkTool: “Borrow a Tool”
Section titled “NetworkTool: “Borrow a Tool””A NetworkTool invokes a single tool on a remote ensemble. The calling agent retains
control of its reasoning loop — it just borrows a remote capability for one call.
Ensemble roomService = Ensemble.builder() .chatLanguageModel(model) .task(Task.builder() .description("Handle guest room service request") .tools( NetworkTool.from("kitchen", "check-inventory", registry), NetworkTool.from("kitchen", "dietary-check", registry)) .build()) .build();Default timeout
Section titled “Default timeout”Tool calls default to 30 seconds (vs. 30 minutes for tasks).
NetworkClientRegistry
Section titled “NetworkClientRegistry”Both NetworkTask and NetworkTool use a NetworkClientRegistry to manage WebSocket
connections. The registry lazily creates and caches connections by ensemble name.
NetworkConfig config = NetworkConfig.builder() .ensemble("kitchen", "ws://kitchen:7329/ws") .ensemble("maintenance", "ws://maintenance:7329/ws") .defaultConnectTimeout(Duration.ofSeconds(5)) .build();
try (NetworkClientRegistry registry = new NetworkClientRegistry(config)) { Ensemble roomService = Ensemble.builder() .chatLanguageModel(model) .task(Task.builder() .description("Handle room service request") .tools( NetworkTask.from("kitchen", "prepare-meal", registry), NetworkTool.from("kitchen", "check-inventory", registry), NetworkTask.from("maintenance", "repair-request", registry)) .build()) .build() .run();}Sharing Capabilities
Section titled “Sharing Capabilities”The remote ensemble must share the tasks and tools that other ensembles want to use:
Ensemble kitchen = Ensemble.builder() .chatLanguageModel(model) .task(Task.of("Manage kitchen operations")) .shareTask("prepare-meal", Task.builder() .description("Prepare a meal as specified") .expectedOutput("Confirmation with preparation details") .build()) .shareTool("check-inventory", inventoryTool) .shareTool("dietary-check", allergyCheckTool) .webDashboard(WebDashboard.builder().port(7329).build()) .build();
kitchen.start(7329);Error Handling
Section titled “Error Handling”Both NetworkTask and NetworkTool return ToolResult.failure() on errors — the calling
agent can adapt its behavior based on the error message:
| Scenario | Behavior |
|---|---|
| Remote ensemble unreachable | ToolResult.failure("Network error: ...") |
| Execution timeout | ToolResult.failure("Task '...' timed out after ...") |
| Remote task fails | ToolResult.failure(error message from remote) |
| Unknown task/tool name | ToolResult.failure("Unknown shared task: ...") |
| Remote ensemble draining | ToolResult.failure("Ensemble is DRAINING") |
Transparency
Section titled “Transparency”Both NetworkTask and NetworkTool implement AgentTool. An agent does not know whether
a tool is local or remote. The existing ReAct loop, tool executor, metrics, and tracing
all work unchanged.
Transport SPI
Section titled “Transport SPI”The transport layer is pluggable via the Transport SPI. The default is simple mode:
in-process queues with no external infrastructure.
// Transport for the kitchen ensemble (bound to its inbox)Transport kitchenTransport = Transport.websocket("kitchen");
// Another ensemble sends a work request to the kitchen's inboxkitchenTransport.send(workRequest);
// Kitchen receives work from its inbox (blocking)WorkRequest incoming = kitchenTransport.receive(Duration.ofSeconds(30));
// Kitchen processes the request and delivers a responsekitchenTransport.deliver(workResponse);Each transport instance is bound to an ensemble name that identifies its inbox.
Transport.websocket("kitchen") creates a transport whose send() and receive()
both operate on the "kitchen" inbox.
Simple mode
Section titled “Simple mode”Transport.websocket(ensembleName) creates a simple transport backed by in-process
LinkedBlockingQueue instances for request delivery and ConcurrentHashMap for response
storage. No external infrastructure is required.
This is suitable for local development and testing. It does not survive process restarts and does not support horizontal scaling.
Custom transports
Section titled “Custom transports”The SPI is open for custom implementations. Implement the Transport interface to
integrate with your messaging infrastructure:
public class MyCustomTransport implements Transport { @Override public void send(WorkRequest request) { /* ... */ } @Override public WorkRequest receive(Duration timeout) { /* ... */ } @Override public void deliver(WorkResponse response) { /* ... */ }}The companion RequestQueue and ResultStore SPIs provide finer-grained abstractions
for the request and response paths independently. Both include inMemory() factories
for development.
Durable mode (Redis)
Section titled “Durable mode (Redis)”For production deployments that need to survive pod restarts and support horizontal
scaling, use the Redis-backed transport from the agentensemble-transport-redis module:
implementation("net.agentensemble:agentensemble-transport-redis:${agentensembleVersion}")RedisClient redisClient = RedisClient.create("redis://localhost:6379");
Transport transport = Transport.durable( "kitchen", RedisRequestQueue.create(redisClient), RedisResultStore.create(redisClient));RedisRequestQueue uses Redis Streams with consumer groups for durable, at-least-once
delivery. RedisResultStore uses Redis key-value storage with TTL for automatic cleanup
and Pub/Sub for result notifications.
Consumer groups for horizontal scaling
Section titled “Consumer groups for horizontal scaling”When running multiple replicas, each replica uses a different consumer name so that Redis distributes messages across consumers:
String consumerName = InetAddress.getLocalHost().getHostName();
Transport transport = Transport.durable( "kitchen", RedisRequestQueue.create(redisClient, consumerName), RedisResultStore.create(redisClient));If a consumer crashes before acknowledging a message, the visibility timeout (default 5 minutes) expires and the message is automatically redelivered to a healthy consumer.
See Chapter 6: Durable Transport for a detailed explanation of the asymmetric routing pattern and consumer group semantics.
For Kafka-backed transport, see agentensemble-transport-kafka which provides
Kafka-backed RequestQueue, DeliveryHandler, and IngressSource implementations.
Priority Queue
Section titled “Priority Queue”The RequestQueue SPI includes a priority-aware implementation that orders requests by
priority level (CRITICAL > HIGH > NORMAL > LOW) with FIFO ordering within the same level.
Basic usage
Section titled “Basic usage”// Priority queue with aging disabledRequestQueue queue = RequestQueue.priority();queue.enqueue("kitchen", workRequest);WorkRequest next = queue.dequeue("kitchen", Duration.ofSeconds(30));Aging (starvation prevention)
Section titled “Aging (starvation prevention)”Low-priority requests can be promoted over time to prevent starvation. Configure
an AgingPolicy to control how frequently promotions occur:
// Promote unprocessed requests one priority level every 30 minutes// LOW -> NORMAL (30 min) -> HIGH (60 min) -> CRITICAL (90 min)RequestQueue queue = RequestQueue.priority(AgingPolicy.every(Duration.ofMinutes(30)));Use AgingPolicy.none() to disable aging (the default).
Queue status for task_accepted responses
Section titled “Queue status for task_accepted responses”The PriorityWorkQueue provides queue position and ETA for populating task_accepted
messages:
PriorityWorkQueue queue = RequestQueue.priority( AgingPolicy.every(Duration.ofMinutes(30)));
queue.enqueue("kitchen", workRequest);
QueueStatus status = queue.queueStatus("kitchen", workRequest.requestId());// status.queuePosition() -> 0 (next to be processed)// status.estimatedCompletion() -> PT30S (estimated time)Metrics
Section titled “Metrics”Pass a QueueMetrics callback to receive queue depth reports after each enqueue/dequeue:
QueueMetrics metrics = new QueueMetrics() { private final Map<String, AtomicInteger> depths = new ConcurrentHashMap<>();
@Override public void recordQueueDepth(String queueName, Priority priority, int depth) { String key = queueName + ":" + priority.name(); AtomicInteger gaugeValue = depths.computeIfAbsent(key, k -> { AtomicInteger value = new AtomicInteger(depth); Gauge.builder("agentensemble.queue.depth", value, AtomicInteger::get) .tag("ensemble", queueName) .tag("priority", priority.name()) .register(meterRegistry); return value; }); gaugeValue.set(depth); }};Request Modes
Section titled “Request Modes”By default, NetworkTask and NetworkTool block until the result arrives (AWAIT mode).
Two additional modes are available via the builder API:
Builder API
Section titled “Builder API”// AWAIT (default -- same as from())NetworkTask task = NetworkTask.builder() .ensembleName("kitchen") .taskName("prepare-meal") .clientRegistry(registry) .build();
// ASYNC -- submit and return immediatelyNetworkTask asyncTask = NetworkTask.builder() .ensembleName("kitchen") .taskName("prepare-meal") .clientRegistry(registry) .mode(RequestMode.ASYNC) .onComplete(result -> log.info("Meal ready: {}", result.getOutput())) .build();
// AWAIT_WITH_DEADLINE -- block up to 30 secondsNetworkTask deadlineTask = NetworkTask.builder() .ensembleName("kitchen") .taskName("prepare-meal") .clientRegistry(registry) .mode(RequestMode.AWAIT_WITH_DEADLINE) .deadline(Duration.ofSeconds(30)) .deadlineAction(DeadlineAction.CONTINUE_IN_BACKGROUND) .onComplete(result -> log.info("Background result: {}", result.getOutput())) .build();| Mode | Behavior |
|---|---|
AWAIT | Block until result (default) |
ASYNC | Return immediately; result via onComplete callback |
AWAIT_WITH_DEADLINE | Block up to deadline; on timeout, apply DeadlineAction |
Deadline Actions
Section titled “Deadline Actions”| Action | On timeout |
|---|---|
RETURN_TIMEOUT_ERROR | Return ToolResult.failure("Deadline exceeded") |
RETURN_PARTIAL | Return success with “continuing in background” message |
CONTINUE_IN_BACKGROUND | Return success + fire onComplete when result arrives |
Delivery Methods
Section titled “Delivery Methods”Work responses can be delivered via different transports using the DeliveryHandler SPI.
Built-in handlers
Section titled “Built-in handlers”| Method | Handler | Behavior |
|---|---|---|
WEBSOCKET | WebSocketDeliveryHandler | Send via WebSocket |
QUEUE | QueueDeliveryHandler | Write to a named queue |
TOPIC | KafkaTopicDelivery | Publish to Kafka topic (requires agentensemble-transport-kafka) |
WEBHOOK | WebhookDeliveryHandler | HTTP POST to a URL |
STORE | StoreDeliveryHandler | Write to ResultStore |
BROADCAST_CLAIM | BroadcastClaimDeliveryHandler | Broadcast to all replicas |
NONE | NoneDeliveryHandler | Fire and forget |
DeliveryRegistry
Section titled “DeliveryRegistry”DeliveryRegistry registry = DeliveryRegistry.withDefaults(ResultStore.inMemory());registry.register(new WebhookDeliveryHandler());registry.register(new QueueDeliveryHandler((queue, response) -> { /* write to queue */ }));
// Use with transportTransport transport = Transport.simple("kitchen", registry);Custom delivery handler
Section titled “Custom delivery handler”public class SlackDeliveryHandler implements DeliveryHandler { @Override public DeliveryMethod method() { return DeliveryMethod.WEBHOOK; }
@Override public void deliver(DeliverySpec spec, WorkResponse response) { // POST to Slack webhook at spec.address() }}Ingress Methods
Section titled “Ingress Methods”Work can arrive at an ensemble via multiple ingress sources simultaneously.
Built-in sources
Section titled “Built-in sources”| Source | Behavior |
|---|---|
HttpIngress | POST /api/work HTTP endpoint |
QueueIngress | Poll a RequestQueue |
WebSocketIngress | Receive via WebSocket |
KafkaTopicIngress | Subscribe to Kafka topic (requires agentensemble-transport-kafka) |
IngressCoordinator
Section titled “IngressCoordinator”IngressCoordinator ingress = IngressCoordinator.builder() .add(new HttpIngress(8080)) .add(new QueueIngress(RequestQueue.inMemory(), "kitchen")) .build();
Transport transport = Transport.websocket("kitchen");ingress.startAll(transport::send);
// Work submitted via HTTP or queue is routed to the transportIdempotency & Caching
Section titled “Idempotency & Caching”Idempotency
Section titled “Idempotency”The IdempotencyGuard prevents duplicate processing of the same request:
IdempotencyGuard guard = IdempotencyGuard.inMemory();ResultCache cache = ResultCache.inMemory();
// Wrap the request handler with caching supportRequestHandler cachedHandler = new CachingRequestHandler(baseHandler, guard, cache);When a WorkRequest arrives with a previously-seen requestId, the guard returns the
cached result instead of re-executing.
Result caching
Section titled “Result caching”The ResultCache caches results by semantic cacheKey:
WorkRequest request = new WorkRequest( "req-1", "frontend", "get-menu", null, Priority.NORMAL, null, null, null, CachePolicy.USE_CACHED, "menu-cache-key", Duration.ofMinutes(30)); // maxAge| Policy | Behavior |
|---|---|
USE_CACHED | Return cached result if valid; execute if miss |
FORCE_FRESH | Bypass cache; execute and update cache |
Scheduled Tasks
Section titled “Scheduled Tasks”Long-running ensembles can execute tasks on a schedule:
Ensemble kitchen = Ensemble.builder() .chatLanguageModel(model) .task(Task.of("Manage kitchen operations")) .scheduledTask(ScheduledTask.builder() .name("inventory-report") .task(Task.of("Check current inventory levels and generate report")) .schedule(Schedule.every(Duration.ofHours(1))) .broadcastTo("hotel.inventory") .build()) .broadcastHandler((topic, result) -> { log.info("Broadcast to {}: {}", topic, result); }) .webDashboard(WebDashboard.builder().port(7329).build()) .build();
kitchen.start(7329);Scheduled tasks automatically stop when the ensemble enters the DRAINING state.
Kafka Transport
Section titled “Kafka Transport”The agentensemble-transport-kafka module provides Kafka-backed implementations.
Dependency
Section titled “Dependency”Add agentensemble-transport-kafka to your project.
KafkaRequestQueue
Section titled “KafkaRequestQueue”KafkaTransportConfig config = KafkaTransportConfig.builder() .bootstrapServers("kafka:9092") .consumerGroupId("kitchen-ensemble") .build();
KafkaRequestQueue queue = KafkaRequestQueue.create(config);queue.enqueue("kitchen", workRequest);WorkRequest received = queue.dequeue("kitchen", Duration.ofSeconds(30));queue.acknowledge("kitchen", received.requestId());KafkaTopicDelivery
Section titled “KafkaTopicDelivery”KafkaTopicDelivery topicDelivery = new KafkaTopicDelivery(config);DeliveryRegistry registry = DeliveryRegistry.withDefaults(ResultStore.inMemory());registry.register(topicDelivery);KafkaTopicIngress
Section titled “KafkaTopicIngress”KafkaTopicIngress ingress = new KafkaTopicIngress(config, "work-requests");ingress.start(transport::send);