Skip to main content

Parallelization Workflow

To use this agent, add the Maven dependency to your project, replacing ${agentic-patterns.version} with the latest version.

Latest version: Maven Central Version

<dependency>
<groupId>com.javaaidev.agenticpatterns</groupId>
<artifactId>parallelization-workflow</artifactId>
<version>${agentic-patterns.version}</version>
</dependency>

See Javadoc here: javadoc

ParallelizationWorkflow

ParallelizationWorkflow executes a list of subtasks in parallel. Subtasks are just other TaskExecutionAgents. Each subtask must have a unique id. This id is used to get its execution result.

ResponseAssembler

ResponseAssembler assembles the final response from the execution results of subtasks.

DefaultResponseAssembler is an implementation of ResponseAssembler which uses an LLM to assemble the response. It's an TaskExecutionAgent.

Builder

The builder of ParallelizationWorkflow provides two approaches to add subtasks.

The first approach is using the addSubtask method. When adding a subtask, you should provide parameters shown below.

ParameterTypeDescription
taskIdStringTask id
subtaskTaskExecutionAgentTask agent
requestTransformerFunctionTransform the request into task input

The second approach is to provide a Function<Request, List<SubtaskCreationRequest<Request>>> which creates a list of subtasks from the request. SubtaskCreationRequest encapsulates taskId, task, and requestTransformer used in the addSubtask method.

Subtasks added by these two approaches are merged.

Example

The example is an agent to write articles about algorithms. Each article has code examples written in different programming languages. A parallelization workflow agent runs parallel subtasks to generate code examples, then generates an article using these code examples.

The ParallelizationWorkflow is created using a builder.

In the subtasksCreator method of the builder, each language is mapped to a SubtaskCreationRequest.

  • Task id is the language.
  • Task is a new TaskExecutionAgent created using a builder.
  • Request transformer converts the AlgorithmArticleGenerationRequest to a SampleCodeGenerationRequest used by the task.

In the responseAssembler method, DefaultResponseAssembler is used to generate the final result. In the prompt template, there are two variables, algorithm and code_sample.

  • Value of algorithm comes from the request.
  • Value of code_sample comes from subtask execution results.
ParallelizationWorkflow created using a builder
package com.javaaidev.agenticpatterns.examples.parallelizationworkflow;

import com.javaaidev.agenticpatterns.core.AgentUtils;
import com.javaaidev.agenticpatterns.parallelizationworkflow.DefaultResponseAssembler;
import com.javaaidev.agenticpatterns.parallelizationworkflow.ParallelizationWorkflow;
import com.javaaidev.agenticpatterns.parallelizationworkflow.SubtaskCreationRequest;
import com.javaaidev.agenticpatterns.taskexecution.TaskExecutionAgent;
import io.micrometer.observation.ObservationRegistry;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.springframework.ai.chat.client.ChatClient;
import org.springframework.ai.chat.client.advisor.SimpleLoggerAdvisor;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.util.CollectionUtils;

@Configuration
public class AlgorithmArticleGenerationConfiguration {

@Bean
@Qualifier("algorithmArticleGenerationWorkflow")
public ParallelizationWorkflow<AlgorithmArticleGenerationRequest, AlgorithmArticleGenerationResponse> algorithmArticleGenerationWorkflow(
ChatClient.Builder chatClientBuilder,
SimpleLoggerAdvisor simpleLoggerAdvisor,
ObservationRegistry observationRegistry
) {
var chatClient = chatClientBuilder.defaultAdvisors(simpleLoggerAdvisor).build();
return ParallelizationWorkflow.<AlgorithmArticleGenerationRequest, AlgorithmArticleGenerationResponse>builder()
.subtasksCreator(request -> {
var languages = AgentUtils.safeGet(request,
AlgorithmArticleGenerationRequest::languages, List.<String>of());
if (CollectionUtils.isEmpty(languages)) {
return List.of();
}
return languages.stream().map(
language -> new SubtaskCreationRequest<>(language,
TaskExecutionAgent.<SampleCodeGenerationRequest, AlgorithmArticleGenerationResponse>defaultBuilder()
.promptTemplate("""
Write {language} code to meet the requirement.
{description}
"""
)
.responseType(SampleCodeGenerationResponse.class)
.name("CodeGeneration_" + language)
.chatClient(chatClient)
.observationRegistry(observationRegistry)
.build(),
(AlgorithmArticleGenerationRequest req) -> new SampleCodeGenerationRequest(
language,
"Implement algorithm " + req.algorithm()))).toList();
})
.responseAssembler(
DefaultResponseAssembler.<AlgorithmArticleGenerationRequest, AlgorithmArticleGenerationResponse>builder()
.promptTemplate(
"""
Goal: Write an article about {algorithm}.

Requirements:
- Start with a brief introduction.
- Include only sample code listed below.
- Output the article in markdown.

{sample_code}
""")
.responseType(AlgorithmArticleGenerationResponse.class)
.promptTemplateContextProvider(input -> {
var request = input.request();
var results = input.results();
var algorithm = AgentUtils.safeGet(request,
AlgorithmArticleGenerationRequest::algorithm, "");
var sampleCode = results.allSuccessfulResults().entrySet().stream()
.map(entry -> """
Language: %s
Code:
%s
""".formatted(entry.getKey(),
((SampleCodeGenerationResponse) entry.getValue()).code()))
.collect(Collectors.joining("==========\n", "\n----------\n", "=========="));
return Map.of("algorithm", algorithm, "sample_code", sampleCode);
})
.name("AlgorithmArticleGeneration")
.chatClient(chatClient)
.observationRegistry(observationRegistry)
.build())
.name("algorithmArticleGenerationWorkflow")
.observationRegistry(observationRegistry)
.build();
}
}

We can expose a REST API to test this agent.

REST controller
package com.javaaidev.agenticpatterns.examples.parallelizationworkflow;

import com.javaaidev.agenticpatterns.parallelizationworkflow.ParallelizationWorkflow;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/algorithm_article")
public class AlgorithmArticleGenerationController {

private final ParallelizationWorkflow<AlgorithmArticleGenerationRequest, AlgorithmArticleGenerationResponse> workflow;

public AlgorithmArticleGenerationController(
@Qualifier("algorithmArticleGenerationWorkflow") ParallelizationWorkflow<AlgorithmArticleGenerationRequest, AlgorithmArticleGenerationResponse> workflow) {
this.workflow = workflow;
}


@PostMapping
public AlgorithmArticleGenerationResponse generateAlgorithmArticle(
@RequestBody AlgorithmArticleGenerationRequest request) {
return workflow.execute(request);
}
}