Parfois il nous arrive d'avoir besoin de lancer des taches en parallèles afin d'accélérer le traitement d'une action. Ces taches s'exécuteront de manière indépendante dans des threads différents. Ces threads seront gérés par votre application principale. Ce processus de parallélisation (multithreading) est largement faciliter par springboot. Nous allons donc voir dans ce tutoriel, la parallélisation des taches à l'aide de springboot.
1. Pourquoi paralléliser les taches avec springboot (multithreading)
Comme nous l'avons déjà vu dans l'un de nos tutoriels, springboot vous simplifiera la mis en place et le prototypage de votre application en vous permettant de gérer aisément les taches parallèles, les pools de threads etc...
Vous aurez la possibilité de déclarer via une méthode l'asynchornisme d'une tache, et soit attendre ou non la fin de toutes les taches parallèles. La méthode principale d'exécution de l'application sera l'orchestrateur des taches que vous déciderez d'exécuter en parallèle. Et tout ce ci sans effort, ou plutôt avec le moins d'efforts possible du fait de l'utilisation du framework spring.
L'un des éléments critiques auquel il faut faire attention lorsqu'on exécute des taches parallèles, est l'arrêt complet des taches ou des pools d'exécution en cas d'erreur. Et ne vous inquiétez pas sur ces aspects, spring les gère très bien à votre place.
2. Les dépendances
Le projet ci dessous crée, l'a été via SpringInitializr, sans dépendances particulière autre que springboot, car créer des taches parallèles est présent nativement dans le framework spring.
Dépendances Gradle
plugins {
id 'java'
id 'org.springframework.boot' version '3.3.2'
id 'io.spring.dependency-management' version '1.1.6'
}
group = 'com.javatutorialshub.asynctasks'
version = '0.0.1-SNAPSHOT'
java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}
repositories {
mavenCentral()
}
dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}
tasks.named('test') {
useJUnitPlatform()
}
Dépendances Maven
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.javatutorialshub</groupId>
<artifactId>asynctasks</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>asynctasks</name>
<description>asynctasks</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
3. Comment créer des taches parallèles (async) dans une application springboot
La première des choses consiste à activer l'asynchronisme sur l'application à l'aide de l'annotation @EnableAsync comme suit:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class AsyncTasksApplication {
public static void main(String[] args) {
SpringApplication.run(AsyncTasksApplication.class, args);
}
}
La seconde étape consiste à créer un composant (@Component) ou un service (@Service) avec une ou plusieurs méthodes que l'on souhaite exécuter en parallèle. Ces méthodes doivent nécessairement être annotées avec l'annotation @Async:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
@Component
public class LongTaskComputer {
private static final Logger logger = LoggerFactory.getLogger(LongTaskComputer.class);
@Async
public void compute(){
logger.info("Starting long task");
try {
Thread.sleep(2000);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
logger.info("Ending long task");
}
}
Attention: Lorsque vous invoquez une méthode @Async dans la meme classe où elle est définie, elle pert automatiquement son caractère asynchrone. En d'autres termes l'annotation @Async est sans effet et donc la méthode n'est pas exécutée de manière parallèle.
A titre d'exemple, nous pouvons donc utiliser cette méthode asynchrone dans la méthode main comme suit:
import com.javatutorialshub.asynctasks.tasks.LongTaskComputer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
public class AsyncTasksApplication {
private static final Logger logger = LoggerFactory.getLogger(AsyncTasksApplication.class);
public static void main(String[] args) {
ConfigurableApplicationContext appContext = SpringApplication.run(AsyncTasksApplication.class, args);
logger.info("-----> Start the main");
LongTaskComputer longTaskComputer = appContext.getBean(LongTaskComputer.class);
longTaskComputer.compute();
logger.info("-----> End the main");
}
}
A l'exécution de cette classe main, nous constatons dans les logs que la méthode principale se termine bien avant la méthode compute du LongTaskComputer et de plus les id de thread sont bien différents: main d'un coté et task-1 de l'autre.
23:35:33.696 [main] INFO c.j.asynctasks.AsyncTasksApplication -- -----> Start the main
23:35:33.698 [main] INFO c.j.asynctasks.AsyncTasksApplication -- -----> End the main
23:35:33.698 [task-1] INFO c.j.a.tasks.LongTaskComputer -- Starting long task
23:35:35.704 [task-1] INFO c.j.a.tasks.LongTaskComputer -- Ending long task
4. Comment attendre la fin de l'exécution de plusieurs taches parallèles
Imaginons que nous ayons une application qui doit lire un fichier csv, puis un fichier json. Ensuite, merger les deux fichiers lus pour enfin écrire le fichier résultant du merge au format json.
Lorsque vous souhaitez attendre la fin de l'exécution d'une tache parallèle, la première chose à faire est de retourner un Future en réponse de la méthode async comme suit:
import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;
import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
@Component
public class CardDataLoader {
private static final Logger logger = LoggerFactory.getLogger(CardDataLoader.class);
@Async
public CompletableFuture<Collection<Card>> loadAll(String cardFilePath) throws CardDataLoaderException {
try {
Card[] cards = new Gson().fromJson(new FileReader(cardFilePath), Card[].class);
return CompletableFuture.completedFuture(Arrays.stream(cards).toList());
} catch (IOException e) {
logger.warn("unable to load card data when trying to load json file: {}", cardFilePath);
throw new CardDataLoaderException(e);
}
}
}
Si la méthode est sensée ne rien retourner mais que vous souhaitez attendre la fin de son exécution, alors elle doit se présenter comme ceci:
@Component
public class CardDataLoader {
private static final Logger logger = LoggerFactory.getLogger(CardDataLoader.class);
@Async
public CompletableFuture<Void> loadAll(String cardFilePath) throws CardDataLoaderException {
try {
....
return return CompletableFuture.completedFuture(null);;
} catch (IOException e) {
logger.warn("unable to load card data when trying to load json file: {}", cardFilePath);
throw new CardDataLoaderException(e);
}
}
}
Et dans une application spring, nous préfèrerons retourner des CompletableFuture, car beaucoup plus faciles à manipuler.
Ensuite en second, il vous faut invoker toutes les méthodes parallèles (@Async) et utiliser CompletableFuture.allOf(....) pour attendre la fin de toutes les taches parallèles.
En dernière étape, il faut collecter les résultats de chaque taches parallèles en invoquant la méthode get de chaque Future comme suit:
import com.google.gson.GsonBuilder;
import com.javatutorialshub.asynctasks.card.Card;
import com.javatutorialshub.asynctasks.card.CardDataLoader;
import com.javatutorialshub.asynctasks.card.CardDataLoaderException;
import com.javatutorialshub.asynctasks.user.User;
import com.javatutorialshub.asynctasks.user.UserDataLoader;
import com.javatutorialshub.asynctasks.user.UserDataLoaderException;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.io.FileWriter;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;
@Service
@RequiredArgsConstructor
public class UserDataMergerService {
private static final Logger logger = LoggerFactory.getLogger(UserDataMergerService.class);
private final UserDataLoader userDataLoader;
private final CardDataLoader cardDataLoader;
public void merge(String userDataFilePath, String cardDataFile, String outputResultFileName) throws UserDataMergerException {
try {
CompletableFuture<Collection<User>> userDataFuture = userDataLoader.loadAll(userDataFilePath);
CompletableFuture<Collection<Card>> cardFuture = cardDataLoader.loadAll(cardDataFile);
CompletableFuture.allOf(userDataFuture, cardFuture);
Collection<User> users = userDataFuture.get();
Collection<Card> cards = cardFuture.get();
Map<String, Card> cardMap = cards.stream().collect(Collectors.toMap(Card::userId,
Function.identity(), (k1, k2) -> k1));
Collection<MergedUser> mergedUsers = users.stream().map(u -> new MergedUser(u,
cardMap.get(u.id()))).toList();
new GsonBuilder().setPrettyPrinting().create().toJson(mergedUsers.toArray(new MergedUser[0]),
MergedUser[].class, new FileWriter(outputResultFileName));
} catch (UserDataLoaderException | CardDataLoaderException e) {
throw new UserDataMergerException(e);
} catch (Throwable e) {
logger.warn("unable to merge user data in a resulting file: {}",
outputResultFileName);
throw new UserDataMergerException(e);
} finally {
logger.info("--> End merging file data");
}
}
}
Mis de bout en bout, notre application s'articule dans sa méthode main comme ci dessous :
import com.javatutorialshub.asynctasks.merge.UserDataMergerException;
import com.javatutorialshub.asynctasks.merge.UserDataMergerService;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;
@SpringBootApplication
@EnableAsync
@RequiredArgsConstructor
public class AsyncTasksApplication implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AsyncTasksApplication.class);
private final UserDataMergerService userDataMergerService;
@Value("${user.data.file.path}")
private String userDataFilePath;
@Value("${card.data.file.path}")
private String cardDataFilePath;
@Value("${output.file.path}")
private String outputFilePath;
public static void main(String[] args) {
SpringApplication.run(AsyncTasksApplication.class, args);
}
@Override
public void run(String... args) throws Exception {
try {
userDataMergerService.merge(userDataFilePath, cardDataFilePath, outputFilePath);
} catch (UserDataMergerException e) {
logger.error(e.getMessage(), e);
System.exit(-1);
}
}
}
5. Configuration du ThreadPool pour les taches parallèles
Lorsqu'on exécute plusieurs taches en parallèle il faut prendre garde à ne pas saturer la machine d'exécution. Meme si le parallélisme est sensé faire gagner en performance vos applications et traitements, si vous ne contrôlez pas le nombre de taches parallèles qui tournent au même moment sur votre machine, celle ci peut saturer. Les performances de votre application vont alors se dégrader. A l'inverse si votre machine est très puissante et qu'elle peut encore supporter des taches en parallèles, et que vous ne le faites pas, alors vous la sous utilisez.
Pour résoudre ce problème, les taches parallèles (thread) sont mis dans une queue d'exécution et dépiler au fur et à mesure que les capacités de la machine augmentent ou diminuent.
Pour gérer la queue d'exécution des taches parallèles, spring vous met à disposition un bean configurable destiné à cet effet.
@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("javatutorialshub-async-");
executor.initialize();
return executor;
}
Dans cette configuration nous indiquons que la taille maximale du pool est de 10 et que le nombre de tache parallèles pouvant s'exécuter en meme temps est de 5. Lorsqu'une tache parallèle se termine, elle est automatiquement remplacée par une autre en attente dans le pool.
Le code source de ce tutoriel est présent sur github à cette adresse: asynctasks on github
Ne pas hésiter à commenter ou à poser une question ou à demander de l'aide autours de java et des technologies connexes. Nous nous ferons un plaisir de vous répondre.
Commentaires
Enregistrer un commentaire