Accéder au contenu principal

Découvrez comment SpringBoot facilite la parallélisation des tâches

Parfois il nous arrive d'avoir besoin de lancer des taches en parallèles afin d'accélérer le traitement d'une action. Ces taches s'exécuteront de manière indépendante dans des threads différents. Ces threads seront gérés par votre application principale. Ce processus de parallélisation (multithreading) est largement faciliter par springboot. Nous allons donc voir dans ce tutoriel, la parallélisation des taches à l'aide de springboot.


1.  Pourquoi paralléliser les taches avec springboot (multithreading)

Comme nous l'avons déjà vu dans l'un de nos tutoriels, springboot vous simplifiera la mis en place et le prototypage de votre application en vous permettant de gérer aisément les taches parallèles, les pools de threads etc...

Vous aurez la possibilité de déclarer via une méthode l'asynchornisme d'une tache, et soit attendre ou non la fin de toutes les taches parallèles. La méthode principale d'exécution de l'application sera l'orchestrateur des taches que vous déciderez d'exécuter en parallèle. Et tout ce ci sans effort, ou plutôt avec le moins d'efforts possible du fait de l'utilisation du framework spring.

L'un des éléments critiques auquel il faut faire attention lorsqu'on exécute des taches parallèles, est l'arrêt complet des taches ou des pools d'exécution en cas d'erreur. Et ne vous inquiétez pas sur ces aspects, spring les gère très bien à votre place.


2. Les dépendances

Le projet ci dessous crée, l'a été via SpringInitializr, sans dépendances particulière autre que springboot, car créer des taches parallèles est présent nativement dans le framework spring.

Dépendances Gradle

plugins {
id 'java'
id 'org.springframework.boot' version '3.3.2'
id 'io.spring.dependency-management' version '1.1.6'
}

group = 'com.javatutorialshub.asynctasks'
version = '0.0.1-SNAPSHOT'

java {
toolchain {
languageVersion = JavaLanguageVersion.of(17)
}
}

repositories {
mavenCentral()
}

dependencies {
implementation 'org.springframework.boot:spring-boot-starter'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testRuntimeOnly 'org.junit.platform:junit-platform-launcher'
}

tasks.named('test') {
useJUnitPlatform()
}

Dépendances Maven

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>3.3.2</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.javatutorialshub</groupId>
<artifactId>asynctasks</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>asynctasks</name>
<description>asynctasks</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>


3. Comment créer des taches parallèles (async) dans une application springboot

La première des choses consiste à activer l'asynchronisme sur l'application à l'aide de l'annotation @EnableAsync comme suit:

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class AsyncTasksApplication {

public static void main(String[] args) {
SpringApplication.run(AsyncTasksApplication.class, args);
}

}

La seconde étape consiste à créer un composant  (@Component) ou un service (@Service) avec une ou plusieurs méthodes que l'on souhaite exécuter en parallèle. Ces méthodes doivent nécessairement être annotées avec l'annotation @Async:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

@Component
public class LongTaskComputer {

private static final Logger logger = LoggerFactory.getLogger(LongTaskComputer.class);

@Async
public void compute(){
logger.info("Starting long task");
try {
Thread.sleep(2000);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
logger.info("Ending long task");
}
}

Attention: Lorsque vous invoquez une méthode @Async dans la meme classe où elle est définie, elle pert automatiquement son caractère asynchrone. En d'autres termes l'annotation @Async est sans effet et donc la méthode n'est pas exécutée de manière parallèle.

A titre d'exemple, nous pouvons donc utiliser cette méthode asynchrone dans la méthode main comme suit:

import com.javatutorialshub.asynctasks.tasks.LongTaskComputer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class AsyncTasksApplication {
private static final Logger logger = LoggerFactory.getLogger(AsyncTasksApplication.class);

public static void main(String[] args) {
ConfigurableApplicationContext appContext = SpringApplication.run(AsyncTasksApplication.class, args);
logger.info("-----> Start the main");
LongTaskComputer longTaskComputer = appContext.getBean(LongTaskComputer.class);
longTaskComputer.compute();
logger.info("-----> End the main");
}

}

A l'exécution de cette classe main, nous constatons dans les logs que la méthode principale se termine bien avant la méthode compute du  LongTaskComputer et de plus les id de thread sont bien différents: main d'un coté et task-1 de l'autre.

23:35:33.696 [main] INFO c.j.asynctasks.AsyncTasksApplication -- -----> Start the main
23:35:33.698 [main] INFO c.j.asynctasks.AsyncTasksApplication -- -----> End the main
23:35:33.698 [task-1] INFO c.j.a.tasks.LongTaskComputer -- Starting long task
23:35:35.704 [task-1] INFO c.j.a.tasks.LongTaskComputer -- Ending long task


4. Comment attendre la fin de l'exécution de plusieurs taches parallèles

Imaginons que nous ayons une application qui doit lire un fichier csv, puis un fichier json. Ensuite, merger les deux fichiers lus pour enfin écrire le fichier résultant du merge au format json.

Lorsque vous souhaitez attendre la fin de l'exécution d'une tache parallèle, la première chose à faire est de retourner un Future en réponse de la méthode async comme suit:

import com.google.gson.Gson;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Component;

import java.io.FileReader;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;

@Component
public class CardDataLoader {
private static final Logger logger = LoggerFactory.getLogger(CardDataLoader.class);

@Async
public CompletableFuture<Collection<Card>> loadAll(String cardFilePath) throws CardDataLoaderException {
try {
Card[] cards = new Gson().fromJson(new FileReader(cardFilePath), Card[].class);
return CompletableFuture.completedFuture(Arrays.stream(cards).toList());
} catch (IOException e) {
logger.warn("unable to load card data when trying to load json file: {}", cardFilePath);
throw new CardDataLoaderException(e);
}
}
}

Si la méthode est sensée ne rien retourner mais que vous souhaitez attendre la fin de son exécution, alors elle doit se présenter comme ceci:

@Component
public class CardDataLoader {
private static final Logger logger = LoggerFactory.getLogger(CardDataLoader.class);

@Async
public CompletableFuture<Void> loadAll(String cardFilePath) throws CardDataLoaderException {
try {
....
return return CompletableFuture.completedFuture(null);;
} catch (IOException e) {
logger.warn("unable to load card data when trying to load json file: {}", cardFilePath);
throw new CardDataLoaderException(e);
}
}
}

Et dans une application spring, nous préfèrerons retourner des CompletableFuture, car beaucoup plus faciles à manipuler.

Ensuite en second, il vous faut invoker toutes les méthodes parallèles (@Async) et utiliser CompletableFuture.allOf(....) pour attendre la fin de toutes les taches parallèles.

En dernière étape, il faut collecter les résultats de chaque taches parallèles en invoquant la méthode get de chaque Future comme suit:

import com.google.gson.GsonBuilder;
import com.javatutorialshub.asynctasks.card.Card;
import com.javatutorialshub.asynctasks.card.CardDataLoader;
import com.javatutorialshub.asynctasks.card.CardDataLoaderException;
import com.javatutorialshub.asynctasks.user.User;
import com.javatutorialshub.asynctasks.user.UserDataLoader;
import com.javatutorialshub.asynctasks.user.UserDataLoaderException;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.io.FileWriter;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

@Service
@RequiredArgsConstructor
public class UserDataMergerService {
private static final Logger logger = LoggerFactory.getLogger(UserDataMergerService.class);

private final UserDataLoader userDataLoader;
private final CardDataLoader cardDataLoader;

public void merge(String userDataFilePath, String cardDataFile, String outputResultFileName) throws UserDataMergerException {
try {
CompletableFuture<Collection<User>> userDataFuture = userDataLoader.loadAll(userDataFilePath);
CompletableFuture<Collection<Card>> cardFuture = cardDataLoader.loadAll(cardDataFile);

CompletableFuture.allOf(userDataFuture, cardFuture);

Collection<User> users = userDataFuture.get();
Collection<Card> cards = cardFuture.get();

Map<String, Card> cardMap = cards.stream().collect(Collectors.toMap(Card::userId,
                Function.identity(), (k1, k2) -> k1));
Collection<MergedUser> mergedUsers = users.stream().map(u -> new MergedUser(u,
                cardMap.get(u.id()))).toList();
new GsonBuilder().setPrettyPrinting().create().toJson(mergedUsers.toArray(new MergedUser[0]),
MergedUser[].class, new FileWriter(outputResultFileName));

} catch (UserDataLoaderException | CardDataLoaderException e) {
throw new UserDataMergerException(e);
} catch (Throwable e) {
logger.warn("unable to merge user data in a resulting file: {}",
                outputResultFileName);
throw new UserDataMergerException(e);
} finally {
logger.info("--> End merging file data");
}
}
}


Mis de bout en bout, notre application s'articule dans sa méthode main comme ci dessous :

import com.javatutorialshub.asynctasks.merge.UserDataMergerException;
import com.javatutorialshub.asynctasks.merge.UserDataMergerService;
import lombok.RequiredArgsConstructor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
@RequiredArgsConstructor
public class AsyncTasksApplication implements CommandLineRunner {
private static final Logger logger = LoggerFactory.getLogger(AsyncTasksApplication.class);

private final UserDataMergerService userDataMergerService;
@Value("${user.data.file.path}")
private String userDataFilePath;
@Value("${card.data.file.path}")
private String cardDataFilePath;
@Value("${output.file.path}")
private String outputFilePath;

public static void main(String[] args) {
SpringApplication.run(AsyncTasksApplication.class, args);
}

@Override
public void run(String... args) throws Exception {
try {
userDataMergerService.merge(userDataFilePath, cardDataFilePath, outputFilePath);
} catch (UserDataMergerException e) {
logger.error(e.getMessage(), e);
System.exit(-1);
}
}
}

5. Configuration du ThreadPool pour les taches parallèles

Lorsqu'on exécute plusieurs taches en parallèle il faut prendre garde à ne pas saturer la machine d'exécution. Meme si le parallélisme est sensé faire gagner en performance vos applications et traitements, si vous ne contrôlez pas le nombre de taches parallèles qui tournent au même moment sur votre machine, celle ci peut saturer. Les performances de votre application vont alors se dégrader. A l'inverse si votre machine est très puissante et qu'elle peut encore supporter des taches en parallèles, et que vous ne le faites pas,  alors vous la sous utilisez.

Pour résoudre ce problème, les taches parallèles (thread) sont mis dans une queue d'exécution et dépiler au fur et à mesure que les capacités de la machine augmentent ou diminuent.

Pour gérer la queue d'exécution des taches parallèles, spring vous met à disposition un bean configurable destiné à cet effet.

@Bean
public Executor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(500);
executor.setThreadNamePrefix("javatutorialshub-async-");
executor.initialize();
return executor;
}

Dans cette configuration nous indiquons que la taille maximale du pool est de 10 et que le nombre de tache parallèles pouvant s'exécuter en meme temps est de 5. Lorsqu'une tache parallèle se termine, elle est automatiquement remplacée par une autre en attente dans le pool.

Le code source de ce tutoriel est présent sur github à cette adresse: asynctasks on github

Ne pas hésiter à commenter ou à poser une question ou à demander de l'aide autours de java et des technologies connexes. Nous nous ferons un plaisir de vous répondre.

Commentaires

Posts les plus consultés de ce blog

Comment valider les données utilisateur dans votre application Java : Guide pratique avec Jakarta Bean Validation

Lorsque vous développez une application qui interagit avec un utilisateur, en lui demandant de saisir des données, il est primordial de vérifier et valider ces données avant tout traitement. Et ceci pour plusieurs raisons: vous aurez la possibilité d'éviter des traitements inutiles si les données reçues ne sont pas conformes aux données attendues et aussi cela permet de protéger votre application en cas d'attaque orchestrée par des hackers. Dans le langage java, il existe une spécification qui vous permet de valider tout type de données transmis dans un bean: Java Bean Validation (ou plus récemment: Jakarta Bean Validation). Dans ce tutoriel, au travers d'une petite application de gestion d'une bibliothèque de quartier, nous allons voir ensemble comment utiliser Jakarta Bean Validation. 1. Pourquoi Jakarta Bean Validation L'objectif de Jakarta Bean Validation est de standardiser la validation des champs au travers de specification (JSR 380). Aussi cette standardisat...

Tout ce que vous devez savoir sur l'utilisation des collections en Java

Une collection est une structure de données qui permet à tout développeur java de stocker des données en mémoire et facilement accessible. Les collections peuvent dans certains cas être vues comme des tableaux de données évolués car ces dernières proposent d'innombrables méthodes d'accès afin de faciliter l'accès et la manipulation des données. 1. Qu'est qu'une collection Dans le langage Java, une collection est un objet qui implémente l'interface de base java.util.Collection. Une collection n'est pas une Map. Une collection est un objet dont la structure sou-jacente est un tableau à accès indexé. Il existe plusieurs catégories de collection. 2. Les List Il s'agit du type de collection le plus utilisé. Aucune contrainte sur les objets, aucune contrainte sur la taille de la liste. De plus les objets contenues dans une telle collection sont accessibles directement par un index. L'interface java.util.List est implémentée par plusieurs classes telles que...