
External Workflow Integration (Temporal, Step Functions, etc.)

The agentensemble-executor module lets you call AgentEnsemble directly in-process from any external workflow engine — no HTTP server, no network hop, no Temporal SDK dependency required inside this library.

Two execution modes are available:

| Class | Granularity | Best for |
|---|---|---|
| `TaskExecutor` | One task = one external activity | Temporal workflows where each AgentEnsemble task is a separate activity with its own retry policy, timeout, and heartbeat |
| `EnsembleExecutor` | One ensemble = one external activity | Simpler pipelines where AgentEnsemble's internal orchestration handles the full run inside a single activity |

Two test doubles are also provided so your Temporal activities can be tested without a real LLM:

| Class | Extends |
|---|---|
| `FakeTaskExecutor` | `TaskExecutor` |
| `FakeEnsembleExecutor` | `EnsembleExecutor` |

Heartbeats still work when AgentEnsemble runs inside an activity. `HeartbeatEnsembleListener` bridges `EnsembleListener` lifecycle events to any `Consumer<Object>`, so you can pass Temporal's heartbeat method as the consumer:

```java
executor.execute(request, Activity.getExecutionContext()::heartbeat);
```

The consumer receives a HeartbeatDetail record for each event:

| eventType | When fired |
|---|---|
| `task_started` | Agent begins executing a task |
| `task_completed` | Agent finishes a task successfully |
| `task_failed` | A task fails with an exception |
| `tool_call` | Agent invokes a tool within the ReAct loop |
| `iteration_started` | A new ReAct iteration begins (LLM call pending) |
| `iteration_completed` | A ReAct iteration finishes (LLM response received) |

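`HeartbeatDetail` is a plain Java record, so Temporal's default Jackson-based DataConverter can serialize it. Temporal keeps only the most recent heartbeat details of a running activity (shown in the Temporal UI) rather than recording every heartbeat in workflow history. A retried attempt can read them with `Activity.getExecutionContext().getHeartbeatDetails(HeartbeatDetail.class)`, and if the activity ultimately fails with a heartbeat timeout, the workflow receives the last details through the resulting `TimeoutFailure`.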
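To make the bridging idea concrete, here is a minimal, self-contained sketch of a listener that turns lifecycle callbacks into detail records and hands each one to a `Consumer<Object>`. `HeartbeatDetailSketch`, `HeartbeatBridgeSketch`, and their fields and method names are hypothetical stand-ins, not AgentEnsemble's `HeartbeatDetail` or `EnsembleListener` API; the sketch only shows the shape of such an adapter.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for HeartbeatDetail; the real record's fields may differ. */
record HeartbeatDetailSketch(String eventType, String taskDescription, long timestampMillis) {}

/**
 * Hypothetical adapter in the spirit of HeartbeatEnsembleListener: each lifecycle
 * callback becomes one detail record passed to the heartbeat consumer.
 */
final class HeartbeatBridgeSketch {
    private final Consumer<Object> heartbeat;

    HeartbeatBridgeSketch(Consumer<Object> heartbeat) {
        // A null consumer means "no heartbeating", like passing null to the executors.
        this.heartbeat = heartbeat != null ? heartbeat : detail -> { };
    }

    void onTaskStarted(String description) {
        emit("task_started", description);
    }

    void onToolCall(String description) {
        emit("tool_call", description);
    }

    void onTaskCompleted(String description) {
        emit("task_completed", description);
    }

    void onTaskFailed(String description) {
        emit("task_failed", description);
    }

    // Wraps the event in a serializable record and forwards it to the consumer.
    private void emit(String eventType, String description) {
        heartbeat.accept(new HeartbeatDetailSketch(eventType, description, System.currentTimeMillis()));
    }
}
```

In a Temporal activity the consumer would be `Activity.getExecutionContext()::heartbeat`; in a unit test, a list's `add` method can stand in for it.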


```kotlin
// build.gradle.kts (in your Temporal worker project)
// Reads agentEnsembleVersion from gradle.properties (or -PagentEnsembleVersion=...)
val agentEnsembleVersion: String by project

dependencies {
    implementation("net.agentensemble:agentensemble-executor:$agentEnsembleVersion")
    // Optional: whichever tool modules the agents need
    implementation("net.agentensemble:agentensemble-tools-datetime:$agentEnsembleVersion")
}
```

Full Temporal Integration: Task-per-Activity


This is the recommended Temporal pattern. Each TaskRequest maps to one Temporal activity. The Temporal workflow sequences activities and passes upstream outputs as context entries.

1. Define the Activity interface (in your Temporal project)

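```java
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;
import net.agentensemble.executor.TaskRequest;
import net.agentensemble.executor.TaskResult;

@ActivityInterface
public interface ResearchPipelineActivity {
    @ActivityMethod
    TaskResult research(TaskRequest request);
    @ActivityMethod
    TaskResult write(TaskRequest request);
}
```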

Next, implement the activity. Accept `TaskExecutor` by its concrete type: because `FakeTaskExecutor` extends it, tests can inject the fake without any additional interface.

```java
import io.temporal.activity.Activity;
import net.agentensemble.executor.TaskRequest;
import net.agentensemble.executor.TaskResult;

public class ResearchPipelineActivityImpl implements ResearchPipelineActivity {
    private final TaskExecutor executor;

    /** Production constructor: models and tools live on the worker, never in workflow history. */
    public ResearchPipelineActivityImpl() {
        this(new TaskExecutor(
                SimpleModelProvider.of(
                        OpenAiChatModel.builder()
                                .apiKey(System.getenv("OPENAI_API_KEY"))
                                .modelName("gpt-4o-mini")
                                .build()),
                SimpleToolProvider.builder()
                        .tool("web-search", new WebSearchTool(System.getenv("SEARCH_API_KEY")))
                        .tool("datetime", new DateTimeTool())
                        .build()));
    }

    /** Package-private constructor for testing: accepts FakeTaskExecutor. */
    ResearchPipelineActivityImpl(TaskExecutor executor) {
        this.executor = executor;
    }

    @Override
    public TaskResult research(TaskRequest request) {
        // heartbeat() keeps the activity alive during long LLM / tool-call chains
        return executor.execute(request, Activity.getExecutionContext()::heartbeat);
    }

    @Override
    public TaskResult write(TaskRequest request) {
        return executor.execute(request, Activity.getExecutionContext()::heartbeat);
    }
}
```

The workflow sequences the activities, passes upstream outputs as context entries, and delegates all retry and timeout handling to Temporal's standard activity options.

```java
import io.temporal.activity.ActivityOptions;
import io.temporal.common.RetryOptions;
import io.temporal.workflow.Workflow;
import io.temporal.workflow.WorkflowInterface;
import io.temporal.workflow.WorkflowMethod;
import java.time.Duration;
import java.util.List;
import java.util.Map;

@WorkflowInterface
public interface ResearchWorkflow {
    @WorkflowMethod
    String run(String topic);
}

public class ResearchWorkflowImpl implements ResearchWorkflow {
    private static final ActivityOptions ACTIVITY_OPTIONS = ActivityOptions.newBuilder()
            // Up to 30 minutes per activity, across all retry attempts (LLM chains can be slow)
            .setScheduleToCloseTimeout(Duration.ofMinutes(30))
            // Temporal times out the attempt (and retries per RetryOptions) if no heartbeat
            // arrives within 2 minutes. HeartbeatEnsembleListener fires on every ReAct
            // iteration and tool call, so this window is generous for typical agent workloads.
            .setHeartbeatTimeout(Duration.ofMinutes(2))
            .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
            .build();

    private final ResearchPipelineActivity activity =
            Workflow.newActivityStub(ResearchPipelineActivity.class, ACTIVITY_OPTIONS);

    @Override
    public String run(String topic) {
        // Activity 1: Research
        TaskResult research = activity.research(
                TaskRequest.builder()
                        .description("Research the latest developments in {topic}")
                        .expectedOutput("A comprehensive, accurate research summary")
                        .agent(AgentSpec.builder()
                                .role("Research Analyst")
                                .goal("Find accurate, up-to-date information on any topic")
                                .toolNames(List.of("web-search", "datetime"))
                                .build())
                        .inputs(Map.of("topic", topic))
                        .build());

        // Activity 2: Write -- injects research output as a template variable {research}
        TaskResult article = activity.write(
                TaskRequest.builder()
                        .description("Write a blog post about {topic} using this research: {research}")
                        .expectedOutput("A well-structured, engaging 500-word blog post")
                        .agent(AgentSpec.of("Technical Writer", "Write clear, compelling content"))
                        .context(Map.of("research", research.output())) // <-- upstream output
                        .inputs(Map.of("topic", topic))
                        .build());
        return article.output();
    }
}
```
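Register both implementations on a worker, then start the workflow:

```java
import io.temporal.client.WorkflowClient;
import io.temporal.client.WorkflowOptions;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.util.UUID;

// Connects to a Temporal server running locally
WorkflowServiceStubs service = WorkflowServiceStubs.newLocalServiceStubs();
WorkflowClient client = WorkflowClient.newInstance(service);
WorkerFactory factory = WorkerFactory.newInstance(client);
Worker worker = factory.newWorker("research-task-queue");
worker.registerWorkflowImplementationTypes(ResearchWorkflowImpl.class);
worker.registerActivitiesImplementations(new ResearchPipelineActivityImpl());
factory.start();

ResearchWorkflow workflow = client.newWorkflowStub(
        ResearchWorkflow.class,
        WorkflowOptions.newBuilder()
                .setTaskQueue("research-task-queue")
                .setWorkflowId("research-" + UUID.randomUUID())
                .build());
// Blocks until the workflow completes
String result = workflow.run("Artificial Intelligence");
System.out.println(result);
```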

Use `FakeTaskExecutor` to test the activity implementation and the full workflow (via `TestWorkflowEnvironment`) without making any LLM calls.

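```java
import io.temporal.testing.TestActivityEnvironment;
import net.agentensemble.executor.FakeTaskExecutor;
import net.agentensemble.executor.TaskRequest;
import net.agentensemble.executor.TaskResult;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;

class ResearchActivityTest {
    // The activity calls Activity.getExecutionContext(), which only works inside a running
    // activity, so the test invokes it through Temporal's TestActivityEnvironment.
    private TestActivityEnvironment testEnv;

    @BeforeEach
    void setUp() {
        testEnv = TestActivityEnvironment.newInstance();
    }

    @AfterEach
    void tearDown() {
        testEnv.close();
    }

    @Test
    void research_callsExecutorWithCorrectRequest_returnsResult() {
        // Arrange
        FakeTaskExecutor fake = FakeTaskExecutor.builder()
                .whenDescriptionContains("Research", "AI is advancing rapidly in 2026.")
                .defaultOutput("Unexpected request")
                .build();
        testEnv.registerActivitiesImplementations(new ResearchPipelineActivityImpl(fake));
        ResearchPipelineActivity activity = testEnv.newActivityStub(ResearchPipelineActivity.class);
        TaskRequest request = TaskRequest.builder()
                .description("Research the latest developments in {topic}")
                .expectedOutput("A research summary")
                .inputs(Map.of("topic", "AI"))
                .build();
        // Act
        TaskResult result = activity.research(request);
        // Assert
        assertThat(result.output()).isEqualTo("AI is advancing rapidly in 2026.");
        assertThat(result.isComplete()).isTrue();
    }
}
```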

Integration-testing the full Temporal workflow


Use Temporal's `TestWorkflowEnvironment` to run the real workflow and activity code against a fake executor: no LLM, no network, and fast, deterministic tests.

```java
import io.temporal.client.WorkflowOptions;
import io.temporal.testing.TestWorkflowEnvironment;
import io.temporal.worker.Worker;
import net.agentensemble.executor.FakeTaskExecutor;
import net.agentensemble.executor.TaskRequest;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

class ResearchWorkflowTest {
    private static final String TASK_QUEUE = "test-queue";
    private TestWorkflowEnvironment testEnv;
    private FakeTaskExecutor executor;

    @BeforeEach
    void setUp() {
        testEnv = TestWorkflowEnvironment.newInstance();
        // One fake serves both activities. It is wrapped in a Mockito spy so tests can also
        // capture the TaskRequests it receives (spying a final class needs Mockito's inline mock maker).
        executor = spy(FakeTaskExecutor.builder()
                // "Write" is listed first: rules are first-match-wins, and the write request
                // may contain the word "Research" once the research output is substituted.
                .whenDescriptionContains("Write", "Article: AI is reshaping every industry.")
                .whenDescriptionContains("Research", "Research done: AI grows 40% YoY.")
                .build());
        Worker worker = testEnv.newWorker(TASK_QUEUE);
        worker.registerWorkflowImplementationTypes(ResearchWorkflowImpl.class);
        worker.registerActivitiesImplementations(new ResearchPipelineActivityImpl(executor));
        testEnv.start();
    }

    @AfterEach
    void tearDown() {
        testEnv.close();
    }

    private ResearchWorkflow newWorkflowStub() {
        return testEnv.getWorkflowClient().newWorkflowStub(
                ResearchWorkflow.class,
                WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
    }

    @Test
    void run_sequencesResearchThenWrite_returnsArticleOutput() {
        String result = newWorkflowStub().run("Artificial Intelligence");
        assertThat(result).isEqualTo("Article: AI is reshaping every industry.");
    }

    @Test
    void run_passesResearchContextToWriteActivity() {
        newWorkflowStub().run("Artificial Intelligence");
        // Capture both requests; the second one (index 1) is the write task.
        ArgumentCaptor<TaskRequest> captor = ArgumentCaptor.forClass(TaskRequest.class);
        verify(executor, times(2)).execute(captor.capture(), any());
        assertThat(captor.getAllValues().get(1).getContext())
                .containsEntry("research", "Research done: AI grows 40% YoY.");
    }
}
```
`FakeTaskExecutor` can be configured with:

| Method | Description |
|---|---|
| `FakeTaskExecutor.alwaysReturns(String)` | Returns the same output for every `execute()` call |
| `builder().whenDescriptionContains(substring, output)` | Returns `output` when the request's description contains `substring`; the first matching rule wins |
| `builder().whenDescription(Predicate<String>, output)` | Returns `output` when the predicate returns true for the description |
| `builder().whenAgentRole(role, output)` | Returns `output` when `request.getAgent().getRole()` matches `role` |
| `builder().defaultOutput(String)` | Output returned when no rule matches (default: `"Fake task output."`) |

FakeEnsembleExecutor has the same API. For multi-task requests, each task is matched independently; the final task’s output becomes EnsembleResult.finalOutput().
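The first-match-wins rule and the multi-task behavior described above can be sketched in plain Java. `CannedOutputsSketch` and its methods are hypothetical and only mirror the documented semantics (ordered rules, a `"Fake task output."` default, the last task's output as the final output); they are not the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Hypothetical sketch of ordered, first-match-wins canned outputs. */
final class CannedOutputsSketch {
    private record Rule(Predicate<String> matches, String output) {}

    private final List<Rule> rules = new ArrayList<>();
    private String defaultOutput = "Fake task output.";

    CannedOutputsSketch whenDescriptionContains(String substring, String output) {
        return whenDescription(description -> description.contains(substring), output);
    }

    CannedOutputsSketch whenDescription(Predicate<String> predicate, String output) {
        rules.add(new Rule(predicate, output));
        return this;
    }

    CannedOutputsSketch defaultOutput(String output) {
        this.defaultOutput = output;
        return this;
    }

    /** Rules are checked in registration order; the first match wins. */
    String outputFor(String description) {
        for (Rule rule : rules) {
            if (rule.matches().test(description)) {
                return rule.output();
            }
        }
        return defaultOutput;
    }

    /**
     * Multi-task run: each task is matched independently and the last task's output is
     * the final output (an empty list yields the default output in this sketch).
     */
    String finalOutputFor(List<String> taskDescriptions) {
        String last = defaultOutput;
        for (String description : taskDescriptions) {
            last = outputFor(description);
        }
        return last;
    }
}
```

Because rules are checked in order, register the more specific rule first when one description could match several substrings.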


Upstream task outputs become template variables in downstream tasks:

```java
// Activity 1: Research returns "AI is growing fast."
TaskResult research = activity.research(
        TaskRequest.builder()
                .description("Research {topic}")
                .expectedOutput("A research summary")
                .inputs(Map.of("topic", topic))
                .build());

// Activity 2: Write -- {research} resolves to the upstream output
TaskResult article = activity.write(
        TaskRequest.builder()
                .description("Write about {topic} using: {research}")
                .expectedOutput("A blog post")
                .context(Map.of("research", research.output())) // key = template variable name
                .inputs(Map.of("topic", topic))
                .build());
```

Context entries and explicit inputs are merged. Explicit inputs() take precedence over context() when both share a key.
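A minimal sketch of that merge rule, plus substitution of `{name}` placeholders, assuming plain string values. `TemplateSketch` is hypothetical and is not AgentEnsemble's template engine; in particular, leaving unknown placeholders untouched is a choice made for this sketch, and the real resolver may reject them instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical sketch: merge context and inputs, then resolve {name} placeholders. */
final class TemplateSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\{(\\w+)\\}");

    static Map<String, String> merge(Map<String, String> context, Map<String, String> inputs) {
        Map<String, String> merged = new HashMap<>(context);
        merged.putAll(inputs); // explicit inputs override context entries with the same key
        return merged;
    }

    static String resolve(String template, Map<String, String> variables) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            // Unknown placeholders are left as-is in this sketch.
            String value = variables.getOrDefault(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }
}
```

With context `{research: "AI is growing fast.", topic: "from context"}` and inputs `{topic: "AI"}`, the write template resolves using `topic = "AI"`, because inputs win.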


Use EnsembleExecutor when you want AgentEnsemble to handle a full pipeline inside a single Temporal activity:

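```java
import io.temporal.activity.Activity;
import io.temporal.activity.ActivityInterface;
import io.temporal.activity.ActivityMethod;

@ActivityInterface
public interface PipelineActivity {
    @ActivityMethod
    EnsembleResult run(EnsembleRequest request);
}

public class PipelineActivityImpl implements PipelineActivity {
    private final EnsembleExecutor executor;
    // buildModel() must be a static factory you provide: it is called inside this(...).
    public PipelineActivityImpl() {
        this(new EnsembleExecutor(SimpleModelProvider.of(buildModel())));
    }
    /** Package-private constructor for testing: accepts FakeEnsembleExecutor. */
    PipelineActivityImpl(EnsembleExecutor executor) {
        this.executor = executor;
    }
    @Override
    public EnsembleResult run(EnsembleRequest request) {
        return executor.execute(request, Activity.getExecutionContext()::heartbeat);
    }
}
```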

In tests, inject FakeEnsembleExecutor:

```java
FakeEnsembleExecutor fake = FakeEnsembleExecutor.builder()
        .whenDescriptionContains("Research", "Research output.")
        .whenDescriptionContains("Write", "Article output.")
        .build();
PipelineActivityImpl activity = new PipelineActivityImpl(fake);
```

Models and tools are configured on the worker side and are never serialized into workflow history. Use modelName in a TaskRequest to select a specific model at request time:

```java
// Worker side: register named models. cheapModel and premiumModel are chat-model
// instances you build yourself (e.g. with OpenAiChatModel.builder() as above).
ModelProvider models = SimpleModelProvider.builder()
        .model("gpt-4o-mini", cheapModel)
        .model("gpt-4o", premiumModel)
        .defaultModel(cheapModel)
        .build();

// Workflow code -- selecting a named model for a specific task:
TaskRequest.builder()
        .description("Synthesize the final executive summary")
        .expectedOutput("A crisp one-page summary")
        .modelName("gpt-4o") // resolved by the worker's ModelProvider at run time
        .agent(AgentSpec.of("Executive Synthesizer", "Produce board-level summaries"))
        .build();
```
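The worker-side lookup can be pictured as a name-to-model map with a default. `ModelRegistrySketch` is a hypothetical sketch: the type parameter `M` stands in for whatever chat-model type the worker uses, and failing fast on an unknown name is this sketch's choice, not documented library behavior. Only the string passed to `modelName` needs to cross the workflow boundary.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical sketch of name-based model lookup with a default model. */
final class ModelRegistrySketch<M> {
    private final Map<String, M> byName = new HashMap<>();
    private M defaultModel;

    ModelRegistrySketch<M> model(String name, M model) {
        byName.put(name, Objects.requireNonNull(model));
        return this;
    }

    ModelRegistrySketch<M> defaultModel(M model) {
        this.defaultModel = model;
        return this;
    }

    /** A null name selects the default; an unknown name fails fast instead of silently falling back. */
    M resolve(String modelName) {
        if (modelName == null) {
            if (defaultModel == null) {
                throw new IllegalStateException("No default model configured");
            }
            return defaultModel;
        }
        M model = byName.get(modelName);
        if (model == null) {
            throw new IllegalArgumentException("Unknown model: " + modelName);
        }
        return model;
    }
}
```

Failing fast surfaces a typo in workflow code as an activity failure instead of quietly running on the cheaper default model.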

The agentensemble-executor module has no Temporal SDK dependency; the heartbeat consumer is a plain `Consumer<Object>`. The same executors work with:

  • Temporal: `Activity.getExecutionContext()::heartbeat`
  • AWS Step Functions: pass a callback that sends SendTaskHeartbeat for the task token your activity poller received
  • Kafka Streams: call `execute()` inside a `Processor`
  • Spring Batch: wrap the call in a `Tasklet`
  • Plain threads: pass `null` to disable heartbeating
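Outside Temporal, you may not want to forward every listener event to the engine's heartbeat API. Here is a minimal sketch of a rate-limiting `Consumer<Object>` wrapper. `ThrottledHeartbeat` and its constructor parameters are hypothetical, and the injectable clock exists only to make the behavior testable.

```java
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/** Hypothetical wrapper: forwards at most one heartbeat per minIntervalMillis. */
final class ThrottledHeartbeat implements Consumer<Object> {
    private final Consumer<Object> delegate;
    private final long minIntervalMillis;
    private final LongSupplier clockMillis;
    private long lastSentMillis;
    private boolean sentAny;

    ThrottledHeartbeat(Consumer<Object> delegate, long minIntervalMillis, LongSupplier clockMillis) {
        this.delegate = delegate;
        this.minIntervalMillis = minIntervalMillis;
        this.clockMillis = clockMillis;
    }

    // synchronized in case listener events arrive from more than one thread
    @Override
    public synchronized void accept(Object detail) {
        long now = clockMillis.getAsLong();
        // Always forward the first event; afterwards, drop events inside the interval.
        if (!sentAny || now - lastSentMillis >= minIntervalMillis) {
            sentAny = true;
            lastSentMillis = now;
            delegate.accept(detail);
        }
    }
}
```

In production you would pass `System::currentTimeMillis` as the clock. For Step Functions, the delegate could call the AWS SDK's SendTaskHeartbeat with the task token obtained from GetActivityTask.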