
Running Agent Tasks as Temporal Activities

If you’re running Temporal in production, you’ve already solved the hard parts of long-running workflow orchestration: durable execution, activity retries, heartbeating, workflow history, and cross-service coordination. The question is how agent tasks fit into that model.

The obvious answer — run AgentEnsemble as a separate service and call it over HTTP from Temporal activities — introduces latency, network failure modes, and another process to operate. A less obvious answer is that the two systems don’t need to be separated at all.

The agentensemble-executor module lets you call AgentEnsemble tasks directly in-process from any Temporal activity. No HTTP server. No Temporal SDK dependency inside the library. Just a Java call.

The module provides two executors with different granularity:

  • TaskExecutor — one task = one external activity. Use when you want per-task Temporal retry, timeout, and heartbeat.
  • EnsembleExecutor — one ensemble = one external activity. Use for simpler pipelines; AgentEnsemble handles internal orchestration inside a single activity.

TaskExecutor is the recommended pattern when you want Temporal to own the retry and timeout semantics for individual AI steps. EnsembleExecutor is simpler when the pipeline is short and internal retry is not a concern.

A common concern when embedding long-running work inside a Temporal activity is heartbeating. If the activity doesn’t heartbeat within its configured heartbeat timeout, Temporal fails it with a heartbeat timeout and schedules a retry according to the activity’s retry policy.

HeartbeatEnsembleListener bridges EnsembleListener lifecycle events to any Consumer<Object>. Passing Temporal’s heartbeat method as the consumer is one line:

return executor.execute(request, Activity.getExecutionContext()::heartbeat);

The consumer fires on task_started, task_completed, tool_call, and llm_iteration_started — frequently enough that a 2-minute heartbeat window is generous for typical agent workloads. The heartbeat payload is a HeartbeatDetail record serializable by Temporal’s default Jackson DataConverter, so it’s visible in the Temporal UI and accessible via Activity.getLastHeartbeatDetails().
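To make the bridging concrete, here is a minimal self-contained sketch of the pattern — a simplified stand-in for HeartbeatEnsembleListener, not the library’s actual code, and the fields on the detail record shown here are assumptions. Each lifecycle event is forwarded to the consumer as one heartbeat payload, and a null consumer disables heartbeating:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the real HeartbeatDetail record.
record HeartbeatDetail(String event, String taskId) {}

// Sketch of the listener-to-consumer bridge: every lifecycle event
// becomes one call to the supplied Consumer<Object>.
class HeartbeatBridge {
    private final Consumer<Object> heartbeat;

    HeartbeatBridge(Consumer<Object> heartbeat) {
        this.heartbeat = heartbeat;
    }

    void onTaskStarted(String taskId)         { emit("task_started", taskId); }
    void onLlmIterationStarted(String taskId) { emit("llm_iteration_started", taskId); }
    void onToolCall(String taskId)            { emit("tool_call", taskId); }
    void onTaskCompleted(String taskId)       { emit("task_completed", taskId); }

    private void emit(String event, String taskId) {
        // Null consumer = no heartbeating (e.g. plain-thread usage).
        if (heartbeat != null) heartbeat.accept(new HeartbeatDetail(event, taskId));
    }
}

public class Demo {
    public static void main(String[] args) {
        List<Object> beats = new ArrayList<>();
        HeartbeatBridge bridge = new HeartbeatBridge(beats::add);
        bridge.onTaskStarted("research");
        bridge.onLlmIterationStarted("research");
        bridge.onToolCall("research");
        bridge.onTaskCompleted("research");
        System.out.println(beats.size()); // one heartbeat per event
    }
}
```

With Temporal, `beats::add` is simply replaced by `Activity.getExecutionContext()::heartbeat`.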

The recommended pattern wraps each AgentEnsemble task as a separate @ActivityMethod:

@ActivityInterface
public interface ResearchPipelineActivity {
    @ActivityMethod TaskResult research(TaskRequest request);
    @ActivityMethod TaskResult write(TaskRequest request);
}

public class ResearchPipelineActivityImpl implements ResearchPipelineActivity {
    private final TaskExecutor executor;

    /** Production constructor. */
    public ResearchPipelineActivityImpl() {
        this(new TaskExecutor(
            SimpleModelProvider.of(
                OpenAiChatModel.builder()
                    .apiKey(System.getenv("OPENAI_API_KEY"))
                    .modelName("gpt-4o-mini")
                    .build()),
            SimpleToolProvider.builder()
                .tool("web-search", new WebSearchTool(System.getenv("SEARCH_API_KEY")))
                .build()));
    }

    /** Package-private constructor for testing -- accepts a FakeTaskExecutor. */
    ResearchPipelineActivityImpl(TaskExecutor executor) {
        this.executor = executor;
    }

    @Override
    public TaskResult research(TaskRequest request) {
        return executor.execute(request, Activity.getExecutionContext()::heartbeat);
    }

    @Override
    public TaskResult write(TaskRequest request) {
        return executor.execute(request, Activity.getExecutionContext()::heartbeat);
    }
}

The workflow sequences activities and passes upstream outputs as context entries:

public class ResearchWorkflowImpl implements ResearchWorkflow {
    private final ResearchPipelineActivity activity =
        Workflow.newActivityStub(ResearchPipelineActivity.class,
            ActivityOptions.newBuilder()
                .setScheduleToCloseTimeout(Duration.ofMinutes(30))
                .setHeartbeatTimeout(Duration.ofMinutes(2))
                .setRetryOptions(RetryOptions.newBuilder()
                    .setMaximumAttempts(3)
                    .build())
                .build());

    @Override
    public String run(String topic) {
        TaskResult research = activity.research(
            TaskRequest.builder()
                .description("Research the latest developments in {topic}")
                .expectedOutput("A comprehensive, accurate research summary")
                .agent(AgentSpec.builder()
                    .role("Research Analyst")
                    .goal("Find accurate, up-to-date information on any topic")
                    .toolNames(List.of("web-search"))
                    .build())
                .inputs(Map.of("topic", topic))
                .build());
        TaskResult article = activity.write(
            TaskRequest.builder()
                .description("Write a blog post about {topic} using this research: {research}")
                .expectedOutput("A well-structured, engaging 500-word blog post")
                .agent(AgentSpec.of("Technical Writer", "Write clear, compelling content"))
                .context(Map.of("research", research.output()))
                .inputs(Map.of("topic", topic))
                .build());
        return article.output();
    }
}

Temporal handles sequencing, retry, and timeout. AgentEnsemble handles LLM calls, tool execution, and the ReAct loop. Each concern stays in the system designed for it.

Both executors ship with test doubles — FakeTaskExecutor and FakeEnsembleExecutor — that can be injected without any LLM calls:

FakeTaskExecutor fake = FakeTaskExecutor.builder()
    .whenDescriptionContains("Research", "AI is advancing rapidly in 2026.")
    .whenDescriptionContains("Write", "Article: AI reshapes every industry.")
    .build();
ResearchPipelineActivityImpl activity = new ResearchPipelineActivityImpl(fake);
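The matching rule this kind of fake applies can be sketched standalone — this is my illustration of substring dispatch, not the library’s source, and the first-registered-match-wins behavior is an assumption. `FakeMatcher` and its methods are hypothetical stand-ins:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative sketch of description-based dispatch: canned outputs are
// registered against substrings, and the first registered substring that
// appears in a task description selects the output.
class FakeMatcher {
    private final Map<String, String> rules = new LinkedHashMap<>();

    FakeMatcher when(String substring, String cannedOutput) {
        rules.put(substring, cannedOutput);
        return this;
    }

    String outputFor(String description) {
        for (Map.Entry<String, String> rule : rules.entrySet()) {
            if (description.contains(rule.getKey())) return rule.getValue();
        }
        throw new IllegalStateException("no canned output for: " + description);
    }
}

public class Demo {
    public static void main(String[] args) {
        FakeMatcher fake = new FakeMatcher()
            .when("Research", "AI is advancing rapidly in 2026.")
            .when("Write", "Article: AI reshapes every industry.");
        System.out.println(fake.outputFor("Research the latest developments in AI"));
    }
}
```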

Combined with Temporal’s TestWorkflowEnvironment, this lets you run the full workflow in fast deterministic tests:

@BeforeEach
void setUp() {
    testEnv = TestWorkflowEnvironment.newInstance();
    FakeTaskExecutor fake = FakeTaskExecutor.builder()
        .whenDescriptionContains("Research", "Research done: AI grows 40% YoY.")
        .whenDescriptionContains("Write", "Article: AI is reshaping every industry.")
        .build();
    Worker worker = testEnv.newWorker(TASK_QUEUE);
    worker.registerWorkflowImplementationTypes(ResearchWorkflowImpl.class);
    worker.registerActivitiesImplementations(new ResearchPipelineActivityImpl(fake));
    testEnv.start();
}

@Test
void run_sequencesResearchThenWrite_returnsArticleOutput() {
    ResearchWorkflow workflow = testEnv.newWorkflowStub(ResearchWorkflow.class,
        WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).build());
    String result = workflow.run("Artificial Intelligence");
    assertThat(result).isEqualTo("Article: AI is reshaping every industry.");
}

Models and tools are configured on the worker side and never serialized into workflow history. A modelName in a TaskRequest selects a specific model at request time:

ModelProvider models = SimpleModelProvider.builder()
    .model("gpt-4o-mini", cheapModel)
    .model("gpt-4o", premiumModel)
    .defaultModel(cheapModel)
    .build();

// In the workflow:
TaskRequest.builder()
    .description("Synthesize the final executive summary")
    .modelName("gpt-4o") // resolved by the worker's ModelProvider at run time
    .agent(AgentSpec.of("Executive Synthesizer", "Produce board-level summaries"))
    .build();
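The resolution rule is simple enough to sketch standalone. In this illustration string labels stand in for the actual chat-model objects, `ModelResolver` is a hypothetical stand-in for the provider, and falling back to the default for an unregistered name is my assumption about SimpleModelProvider’s behavior:

```java
import java.util.Map;

// Sketch of name-based model resolution: a registered name selects that
// model; a null (or, assumed here, unknown) name falls back to the default.
class ModelResolver {
    private final Map<String, String> models;
    private final String defaultModel;

    ModelResolver(Map<String, String> models, String defaultModel) {
        this.models = models;
        this.defaultModel = defaultModel;
    }

    String resolve(String modelName) {
        if (modelName == null) return defaultModel;
        return models.getOrDefault(modelName, defaultModel);
    }
}

public class Demo {
    public static void main(String[] args) {
        ModelResolver resolver = new ModelResolver(
            Map.of("gpt-4o-mini", "cheapModel", "gpt-4o", "premiumModel"),
            "cheapModel");
        System.out.println(resolver.resolve("gpt-4o")); // named model wins
        System.out.println(resolver.resolve(null));     // default fallback
    }
}
```

Because only the string `"gpt-4o"` crosses the workflow boundary, model objects never enter workflow history.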

The heartbeat consumer is a plain Consumer<Object>. The agentensemble-executor module has no Temporal SDK dependency. The same executors work with any external orchestrator:

  • AWS Step Functions — pass a heartbeat callback to a state machine activity poller
  • Kafka Streams — call execute() inside a Processor
  • Spring Batch — wrap in a Tasklet
  • Plain threads — pass null for no heartbeating
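The portability claim above reduces to one call-site shape, sketched here with a stub (`StubExecutor` is a stand-in for TaskExecutor, not the library’s code): any orchestrator’s callback, a logger, or null all satisfy the same `Consumer<Object>` parameter.

```java
import java.util.function.Consumer;

// Stand-in for TaskExecutor showing the orchestrator-agnostic hook:
// a plain Consumer<Object> that is invoked null-safely.
class StubExecutor {
    String execute(String request, Consumer<Object> heartbeat) {
        if (heartbeat != null) heartbeat.accept("task_started");
        // ... agent work would happen here ...
        return "done: " + request;
    }
}

public class Demo {
    public static void main(String[] args) {
        StubExecutor executor = new StubExecutor();
        // Temporal would pass Activity.getExecutionContext()::heartbeat;
        // a logger works anywhere:
        System.out.println(executor.execute("summarize", System.out::println));
        // Plain threads: null means no heartbeats are emitted.
        System.out.println(executor.execute("summarize", null));
    }
}
```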

Task-per-activity gives you more operational visibility — each task is a separate entry in the Temporal UI, with its own retry history and timeout. Ensemble-per-activity is simpler to write but treats the entire pipeline as a black box from Temporal’s perspective.

The deeper tradeoff is about where you want the orchestration intelligence to live. If your Temporal workflows are already sophisticated — routing between task types, branching on outcomes, passing context between many steps — then task-per-activity is the natural fit. If AgentEnsemble’s phase grouping, DAG parallelism, or phase review gates are doing the interesting coordination work, then ensemble-per-activity keeps that logic inside the framework and Temporal handles only the outer lifecycle.


The executor module is documented in the integration guide. Source is on GitHub.

I’d be interested in whether the two-mode design maps cleanly to your Temporal workflows, or whether there are integration patterns that don’t fit either executor.