L’intérêt de Parallel LINQ (PLINQ) est de proposer plusieurs méthodes pour regrouper (“merger”) le résultat d’un traitement parallèle.
Simplement en lançant un traitement parallèle en utilisant AsParallel()
, on bénéficiera de méthodes pour, par exemple:
- Ranger le résultat de chaque traitement dans une structure de données,
- Agréger les résultats sous forme d’un entier ou d’un double pour obtenir une moyenne,
- Agréger tous les résultats en utilisant une opération commutative et associative.
Certaines opérations sont similaires à celles de Linq:
Select
pour effectuer une projection,Where
pour filtrer,OrderBy
pour ordonner.
1. Comparaison des performances
Performances différentes entre IEnumerable<T> et IList<T>
2. Exemple simple
Notation “Query”
Notation “fluent”
3. D’autres alternatives possibles
Rangement dans une structure classique indexée
Rangement dans un structure de données autorisant les appels concurrents
4. Maintenir l’ordre des éléments
5. Limiter le degré de parallélisme
6. Agréger le résultat d’une exécution parallèle en utilisant PLINQ
Agréger le résultat dans une structure de données
Agréger le résultat dans une valeur unique
Agréger les résultats en utilisant ForAll()
7. Sortie du traitement parallèle
CancellationToken
Sortie par exception
1. Comparaison des performances
L’exécution d’un traitement en parallèle n’est pas forcément plus performant qu’un traitement séquentiel (cf. Parallel.For et Parallel.ForEach en 5 min). Un traitement parallèle occasionne:
- Des changements de contexte entre threads,
- Une synchronisation des threads pour accéder à des objets partagés.
Il faut comparer des exécutions avec une implémentation séquentielle et son équivalent en parallèle de façon à être sûr du gain apporté par un processeur multi-coeur.
Certaines instructions Linq autorisent la parallélisation mais sont généralement couteuses en performance:
Join, GroupBy, GroupJoin, Distinct, Union, Intersect et Except
Pour ces instructions, l’exécution séquentielle s’avère plus rapide que l’exécution en parallèle. Il convient donc de comparer le 2 types d’implémentation pour privilégier celle qui est la plus efficace.
Dans certains cas, suivant les opérateurs qui sont utilisés, une instruction PLINQ ne sera pas forcément exécutée en parallèle. Elle peut être exécutée séquentiellement. On peut forcer l’exécution parallèle en ajoutant:
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
Performances différentes entre IEnumerable<T> et IList<T>
Pour utiliser PLINQ, il suffit que la structure satisfait au moins IEnumerable<T>
. Toutefois, comme pour Parallel.For()
et Parallel.ForEach()
, les performances de PLINQ sont différentes suivant si la structure d’entrée satisfait IList<T>
ou seulement IEnumerable<T>
.
En effet:
- Si la structure satisfait
IList<T>
: généralement ces structures permettent de prévoir plus facilement le partitionement des threads utilisés pour le traitement car, la liste est indexée et le nombre d’éléments de cette structure est connu. Le partitionnement des threads pour cette structure se fait par intervalle d’index (“range partitioning”) c’est-à-dire qu’un intervalle d’élements est attribué à chaque thread. - Pour les structures ne satisfaisant que
IEnumerable<T>
: il est plus difficile de prévoir le partitionnement des threads et de fait, le traitement est généralement moins performant. Le partitionnement des threads pour cette structure se fait par bloc (“chunk partitioning”) c’est-à-dire que plusieurs éléments sont traités pour chaque threads.
Par défaut, si la structure satisfait IList<T>
, un partitionnement par intervalle d’index sera appliqué. Même si dans la plupart des cas, ce type de partitionnement est plus performant, il peut s’avérer plus lent dans certain cas. On peut donc imposer un partitionnement par bloc en utilisant un Partitioner
.
2. Exemple simple
L’instruction de base pour utiliser PLINQ est AsParallel()
. Comme pour linq, 2 notations sont possibles.
Notation “Query”
On peut utiliser les mots clés select, from, where,
etc… (notation similaire à Linq):
using System;
using System.Linq;
//...
var source = Enumerable.Range(100, 20000);
var parallelQuery = from num in source.AsParallel()
where num % 10 == 0
select num;
int[] result = parallelQuery.ToArray();
Notation “fluent”
On peut obtenir l’équivalent avec la notation “fluent”:
int[] result = Enumerable.Range(100, 20000)
.AsParallel()
.Where(n => n % 10 == 0)
.Select(n => n)
.ToArray();
3. D’autres alternatives possibles
Comme pour la parallélisation, regrouper le résultat de traitement parallèle induit un coût en performance. Il est possible d’éviter d’utiliser PLINQ.
Rangement dans une structure classique indexée
Si on effectue une exécution parallèle en utilisant Parallel.For()
, on connaît précisemment le nombre d’itérations. On peut donc prévoir de ranger le résultat dans une structure dont la taille est fixe:
var source = Enumerable.Range(1, 10000);
var result = new int?[10000];
Parallel.For(0, 10000, i =>
{
if (source[i] % 2 == 0)
{
result[i] = i;
}
else
{
result[i] = null;
}
});
result = result.Where(n => n.HasValue()).ToArray();
Rangement dans un structure de données autorisant les appels concurrents
Pour ranger le résultat d’une boucle parallèle, on peut utiliser une structure du namespace System.Collections.Concurrent
comme ConcurrentBag
, ConcurrentStack
, ConcurrentQueue
ou ConcurrentDictionary
. Ces structures permettent de minimiser les contentions entre threads. Certaines d’entre elles dans certaines conditions n’utilisent pas de “lock”.
Par exemple, si on utilise ConcurrentBag
pour ranger le résultat:
var source = Enumerable.Range(1, 10000);
ConcurrentBag result = new ConcurrentBag<int>();
Parallel.For(0, 10000, i =>
{
if (source[i] % 2 == 0)
{
result.Add(i);
}
});
4. Maintenir l’ordre des éléments
Lorsqu’un traitement est parallélisé, pour améliorer le partitionnement des threads qui effectuent le traitement (cf. partionnement dans Parallel.For et Parallel.ForEach en 5 min), on ne garantit pas de maintenir le même ordre que la structure de données en entrée.
Pour préserver l’ordre des éléments, on peut utiliser AsOrdered()
.
Par exemple:
int[] result = Enumerable.Range(100, 20000)
.AsParallel()
.AsOrdered()
.Where(n => n % 10 == 0)
.Select(n => n)
.ToArray();
Il est possible de supprimer la contrainte de l’ordre à un endroit du pipeline après l’avoir imposé. Par exemple, on peut vouloir garantir l’ordre pour certains traitements et supprimer cette contrainte pour d’autres traitements.
Pour supprimer la contrainte d’ordre après avoir utilisé AsOrdered()
, on peut utiliser AsUnordered()
, par exemple:
var result = Enumerable.Range(100, 20000)
.AsParallel()
.AsOrdered() // Ordre garanti
.Traitement1()
.AsUnordered() // Ordre non garanti
.Traitement2();
5. Limiter le degré de parallélisme
Pour éviter de paralléliser le traitement sur tous les coeurs du processeur, on peut imposer le degré de parallélisme. Par défaut, ce nombre est égal au nombre de coeurs du processeur. Ce nombre ne peut dépasser le nombre de coeurs du processeur.
Pour imposer un degré de parallélisme, on peut utiliser l’instruction WithDegreeOfParallelism()
.
Par exemple:
int[] result = Enumerable.Range(100, 20000)
.AsParallel()
.WithDegreeOfParallelism(2)
.Where(n => n % 10 == 0)
.Select(n => n)
.ToArray();
6. Agréger le résultat d’une exécution parallèle en utilisant PLINQ
Lorsqu’on utilise AsParallel()
pour paralléliser le traitement, le type de retour est ParallelQuery<T>
. Les fonctions disponibles pour agréger le résultat sont celles de cette classe. Par défaut, les éléments en sortie de AsParallel()
peuvent être dans un ordre différent des éléments en entrée.
Plus de détails sur MSDN.
Agréger le résultat dans une structure de données
ParallelQuery<T>
permet de regrouper les résultats directement dans une structure de données en utilisant:
ToArray()
pour regrouper dans un tableau,ToList()
pour une liste,ToDictionary()
pour un dictionaire ouToLookUp()
pour regrouper dans une structureLookUp<TKey, TElement>
. Cette structure permet de stocker une collection de clés. Chaque valeur de clé permet d’atteindre une ou plusieurs valeurs.
Par exemple, pour regrouper le résultat directement dans une liste:
IEnumerable<T> input = ... :
var result = input.AsParallel()
.Select(item => Compute(item))
.ToList();
Agréger le résultat dans une valeur unique
ParallelQuery<T>
permet d’utiliser quelques méthodes pour regrouper les résultats dans une valeur unique avec:
Average()
pour calculer la moyenne d’entiers, de décimaux ou de doubles obtenus à partir des résultats du traitement parallèle.Min()
ouMax()
pour obtenir respectivement le minimum ou le maximum des nombres entiers, décimaux ou doubles obtenus à partir des résultats du traitement parallèle.Aggregate()
pour effectuer un traitement d’agrégation plus personnalisé.
Par exemple, en utilisant Average()
:
IEnumerable<int> input = Enumerable.Range(3, 100000-3);
double average = input.AsParallel()
.Where(n => n % 2 == 0)
.Average(n => Math.Sqrt(Convert.ToDouble(n));
Agréger les résultats en utilisant ForAll()
ForAll()
permet d’appliquer une Action<T>
à tous les résultats du traitement parallélisé.
Par exemple:
List<InputData> input = ...;
input.AsParallel()
.Select(i => new OutputData(i))
.ForAll(o => ProcessOutput(o));
7. Sortie du traitement parallèle
La sortie d’une instruction PLINQ peut se faire en utilisant une CancellationToken
dans le cas où elle est prévue. L’exécution peut aussi être interrompue si une exception survient.
CancellationToken
Pour interrompre une instruction PLINQ volontairement, on peut utiliser une CancellationToken
en entourant l’instruction d’un try...catch
:
var cancellationSource = new CancellationTokenSource();
try
{
int[] result = Enumerable.Range(100, 20000)
.AsParallel()
.WithCancellation (cancelSource.Token)
.Where(...)
}
catch (OperationCanceledException)
{
Console.WriteLine ("Query canceled");
}
On peut annuler l’exécution de l’instruction PLINQ à partir d’un autre thread si on exécute:
cancellationSource.Cancel()
Sortie par exception
Une exception interrompt l’exécution, toutefois dans certains cas elle ne sera pas propagée tel quel à l’extérieur de l’instruction PLINQ:
- Si l’exécution de l’instruction PLINQ est séquentielle: l’exception sera propagée avec le même type. Par attraper l’exception, on peut utiliser une clause
try...catch
avec le type de l’exception. - Si l’exécution est parallélisée: l’exception sera encapsulée dans une exception de type
AggregateException
. Pour attraper l’exception à l’extérieur de l’instruction PLINQ, il faut utiliser une clausetry...catch
avec le typeAggregateException
.
Plus de détails sur MSDN.
8. ParallelMergeOptions
Par défaut, le regroupement des itérations du traitement parallèle se fait en utilisant un buffer. L’objectif de ce buffer est d’améliorer les performances du traitement parallèle en regroupant les itérations avec un équilibre entre:
- Une faible latence entre le début du traitement et la génération du résultat et
- De bonne performance globale en terme de consommation des ressources.
On peut, toutefois, changer l’option par défaut de regroupement en utilisant WithMergeOptions
et l’énumérable ParallelMergeOptions
:
AutoBuffered
: valeur par défaut permettant un équilibre entre faible latence et bonne performance.NotBuffered
: pas de buffer. Le résultat est généré dès qu’il est calculé.FullyBuffered
: les résultats sont rangés systématiquement dans un buffer pour qu’ils soient délivrés d’un coup en fin de traitement.
Par exemple, pour utiliser WithMergeOptions:
int[] result = Enumerable.Range(100, 20000)
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Where(...)
OrderBy
et Reverse
imposent FullyBuffered
Lorsqu’on utilise les fonctions OrderBy
et Reverse
, l’option ParallelMergeOptions
est FullyBuffered
.
Plus de détails sur MSDN.
9. Partitioner
Comme indiqué plus haut, la classe System.Concurrent.Collections.Partitioner
permet de forcer le partitionnement par bloc d’une structure satisfaisant IList<T>
.
Par exemple:
IList<int> input = ... ;
var parallelQuery = Partitioner.Create (input, true)
.AsParallel()
.Where (...)
La surcharge Partitioner.Create(input, true)
permet de renseigner avec le booléen la façon dont la partitionnement des threads est effectué suivant la charge:
true
: le partitionnement est dynamique donc il convient bien pour un partitionnement par bloc.false
: le partitionnement est statique donc pour appliquer un partitionnement par intervalle d’index.
Plus de détails sur la classe Partitioner
sur MSDN.
- The Parallel class dans Threading in C# de Joe Albahari: http://www.albahari.com/threading/part5.aspx#_PLINQ
- Parallel LINQ (PLINQ): https://msdn.microsoft.com/fr-fr/library/dd460688%28v=vs.110%29.aspx
- Patterns for Parallel Programming: Understanding and Applying Parallel Patterns with the .NET Framework 4: https://www.microsoft.com/en-us/download/details.aspx?id=19222
- Problems with AsParallel: http://indexoutofrange.com/Problems_with_AsParallel/