PLINQ en 5 min

L’intérêt de Parallel LINQ (PLINQ) est de proposer plusieurs méthodes pour regrouper (“merger”) le résultat d’un traitement parallèle.

Simplement en lançant un traitement parallèle en utilisant AsParallel(), on bénéficiera de méthodes pour, par exemple:

  • Ranger le résultat de chaque traitement dans une structure de données,
  • Agréger les résultats sous forme d’un entier ou d’un double pour obtenir une moyenne,
  • Agréger tous les résultats en utilisant une opération commutative et associative.

Certaines opérations sont similaires à celles de Linq:

  • Select pour effectuer une projection,
  • Where pour filtrer,
  • OrderBy pour ordonner.

1. Comparaison des performances

L’exécution d’un traitement en parallèle n’est pas forcément plus performant qu’un traitement séquentiel (cf. Parallel.For et Parallel.ForEach en 5 min). Un traitement parallèle occasionne:

  • Des changements de contexte entre threads,
  • Une synchronisation des threads pour accéder à des objets partagés.

Il faut comparer des exécutions avec une implémentation séquentielle et son équivalent en parallèle de façon à être sûr du gain apporté par un processeur multi-coeur.

Certaines opérateurs dégradent les performances de PLINQ

Certaines instructions Linq autorisent la parallélisation mais sont généralement couteuses en performance:

Join, GroupBy, GroupJoin, Distinct, Union, Intersect et Except

Pour ces instructions, l’exécution séquentielle s’avère plus rapide que l’exécution en parallèle. Il convient donc de comparer le 2 types d’implémentation pour privilégier celle qui est la plus efficace.

Dans certains cas, suivant les opérateurs qui sont utilisés, une instruction PLINQ ne sera pas forcément exécutée en parallèle. Elle peut être exécutée séquentiellement. On peut forcer l’exécution parallèle en ajoutant:

.WithExecutionMode(ParallelExecutionMode.ForceParallelism)

Performances différentes entre IEnumerable<T> et IList<T>

Pour utiliser PLINQ, il suffit que la structure satisfait au moins IEnumerable<T>. Toutefois, comme pour Parallel.For() et Parallel.ForEach(), les performances de PLINQ sont différentes suivant si la structure d’entrée satisfait IList<T> ou seulement IEnumerable<T>.

En effet:

  • Si la structure satisfait IList<T>: généralement ces structures permettent de prévoir plus facilement le partitionement des threads utilisés pour le traitement car, la liste est indexée et le nombre d’éléments de cette structure est connu. Le partitionnement des threads pour cette structure se fait par intervalle d’index (“range partitioning”) c’est-à-dire qu’un intervalle d’élements est attribué à chaque thread.
  • Pour les structures ne satisfaisant que IEnumerable<T>: il est plus difficile de prévoir le partitionnement des threads et de fait, le traitement est généralement moins performant. Le partitionnement des threads pour cette structure se fait par bloc (“chunk partitioning”) c’est-à-dire que plusieurs éléments sont traités pour chaque threads.

Par défaut, si la structure satisfait IList<T>, un partitionnement par intervalle d’index sera appliqué. Même si dans la plupart des cas, ce type de partitionnement est plus performant, il peut s’avérer plus lent dans certain cas. On peut donc imposer un partitionnement par bloc en utilisant un Partitioner.

2. Exemple simple

L’instruction de base pour utiliser PLINQ est AsParallel(). Comme pour linq, 2 notations sont possibles.

Notation “Query”

On peut utiliser les mots clés select, from, where, etc… (notation similaire à Linq):

using System; 
using System.Linq; 
//... 
 
var source = Enumerable.Range(100, 20000); 
 
var parallelQuery = from num in source.AsParallel() 
                    where num % 10 == 0 
                    select num; 
 
int[] result = parallelQuery.ToArray();

Notation “fluent”

On peut obtenir l’équivalent avec la notation “fluent”:

int[] result = Enumerable.Range(100, 20000) 
  .AsParallel() 
  .Where(n => n % 10 == 0) 
  .Select(n => n) 
  .ToArray();

3. D’autres alternatives possibles

Comme pour la parallélisation, regrouper le résultat de traitement parallèle induit un coût en performance. Il est possible d’éviter d’utiliser PLINQ.

Rangement dans une structure classique indexée

Si on effectue une exécution parallèle en utilisant Parallel.For(), on connaît précisemment le nombre d’itérations. On peut donc prévoir de ranger le résultat dans une structure dont la taille est fixe:

var source = Enumerable.Range(1, 10000); 
var result = new int?[10000]; 
 
Parallel.For(0, 10000, i => 
{ 
  if (source[i] % 2 == 0) 
  { 
    result[i] = i; 
  } 
  else 
  { 
    result[i] = null; 
  } 
}); 
 
result = result.Where(n => n.HasValue()).ToArray(); 

Rangement dans un structure de données autorisant les appels concurrents

Pour ranger le résultat d’une boucle parallèle, on peut utiliser une structure du namespace System.Collections.Concurrent comme ConcurrentBag, ConcurrentStack, ConcurrentQueue ou ConcurrentDictionary. Ces structures permettent de minimiser les contentions entre threads. Certaines d’entre elles dans certaines conditions n’utilisent pas de “lock”.

Par exemple, si on utilise ConcurrentBag pour ranger le résultat:

var source = Enumerable.Range(1, 10000); 
ConcurrentBag result = new ConcurrentBag<int>(); 
 
Parallel.For(0, 10000, i => 
{ 
  if (source[i] % 2 == 0) 
  { 
    result.Add(i); 
  } 
});

4. Maintenir l’ordre des éléments

Lorsqu’un traitement est parallélisé, pour améliorer le partitionnement des threads qui effectuent le traitement (cf. partionnement dans Parallel.For et Parallel.ForEach en 5 min), on ne garantit pas de maintenir le même ordre que la structure de données en entrée.

Pour préserver l’ordre des éléments, on peut utiliser AsOrdered().

Par exemple:

int[] result = Enumerable.Range(100, 20000) 
  .AsParallel() 
  .AsOrdered() 
  .Where(n => n % 10 == 0) 
  .Select(n => n) 
  .ToArray();

Il est possible de supprimer la contrainte de l’ordre à un endroit du pipeline après l’avoir imposé. Par exemple, on peut vouloir garantir l’ordre pour certains traitements et supprimer cette contrainte pour d’autres traitements.

Pour supprimer la contrainte d’ordre après avoir utilisé AsOrdered(), on peut utiliser AsUnordered(), par exemple:

var result = Enumerable.Range(100, 20000)  
  .AsParallel()  
  .AsOrdered() // Ordre garanti 
  .Traitement1()  
  .AsUnordered() // Ordre non garanti 
  .Traitement2();

5. Limiter le degré de parallélisme

Pour éviter de paralléliser le traitement sur tous les coeurs du processeur, on peut imposer le degré de parallélisme. Par défaut, ce nombre est égal au nombre de coeurs du processeur. Ce nombre ne peut dépasser le nombre de coeurs du processeur.

Pour imposer un degré de parallélisme, on peut utiliser l’instruction WithDegreeOfParallelism().

Par exemple:

int[] result = Enumerable.Range(100, 20000)  
  .AsParallel()  
  .WithDegreeOfParallelism(2) 
  .Where(n => n % 10 == 0)  
  .Select(n => n)  
  .ToArray();

6. Agréger le résultat d’une exécution parallèle en utilisant PLINQ

Lorsqu’on utilise AsParallel() pour paralléliser le traitement, le type de retour est ParallelQuery<T>. Les fonctions disponibles pour agréger le résultat sont celles de cette classe. Par défaut, les éléments en sortie de AsParallel() peuvent être dans un ordre différent des éléments en entrée.

Plus de détails sur MSDN.

Agréger le résultat dans une structure de données

ParallelQuery<T> permet de regrouper les résultats directement dans une structure de données en utilisant:

  • ToArray() pour regrouper dans un tableau,
  • ToList() pour une liste,
  • ToDictionary() pour un dictionaire ou
  • ToLookUp() pour regrouper dans une structure LookUp<TKey, TElement>. Cette structure permet de stocker une collection de clés. Chaque valeur de clé permet d’atteindre une ou plusieurs valeurs.

Par exemple, pour regrouper le résultat directement dans une liste:

IEnumerable<T> input = ... : 
 
var result = input.AsParallel() 
  .Select(item => Compute(item)) 
  .ToList(); 

Agréger le résultat dans une valeur unique

ParallelQuery<T> permet d’utiliser quelques méthodes pour regrouper les résultats dans une valeur unique avec:

  • Average() pour calculer la moyenne d’entiers, de décimaux ou de doubles obtenus à partir des résultats du traitement parallèle.
  • Min() ou Max() pour obtenir respectivement le minimum ou le maximum des nombres entiers, décimaux ou doubles obtenus à partir des résultats du traitement parallèle.
  • Aggregate() pour effectuer un traitement d’agrégation plus personnalisé.

Par exemple, en utilisant Average():

IEnumerable<int> input = Enumerable.Range(3, 100000-3); 
 
double average = input.AsParallel() 
  .Where(n => n % 2 == 0) 
  .Average(n => Math.Sqrt(Convert.ToDouble(n)); 

Agréger les résultats en utilisant ForAll()

ForAll() permet d’appliquer une Action<T> à tous les résultats du traitement parallélisé.

Par exemple:

List<InputData> input = ...; 
input.AsParallel() 
  .Select(i => new OutputData(i)) 
  .ForAll(o => ProcessOutput(o));

7. Sortie du traitement parallèle

La sortie d’une instruction PLINQ peut se faire en utilisant une CancellationToken dans le cas où elle est prévue. L’exécution peut aussi être interrompue si une exception survient.

CancellationToken

Pour interrompre une instruction PLINQ volontairement, on peut utiliser une CancellationToken en entourant l’instruction d’un try...catch:

var cancellationSource = new CancellationTokenSource();  
 
try  
{ 
  int[] result = Enumerable.Range(100, 20000) 
    .AsParallel() 
    .WithCancellation (cancelSource.Token) 
    .Where(...) 
} 
catch (OperationCanceledException) 
{ 
  Console.WriteLine ("Query canceled"); 
}

On peut annuler l’exécution de l’instruction PLINQ à partir d’un autre thread si on exécute:

cancellationSource.Cancel()

Sortie par exception

Une exception interrompt l’exécution, toutefois dans certains cas elle ne sera pas propagée tel quel à l’extérieur de l’instruction PLINQ:

  • Si l’exécution de l’instruction PLINQ est séquentielle: l’exception sera propagée avec le même type. Par attraper l’exception, on peut utiliser une clause try...catch avec le type de l’exception.
  • Si l’exécution est parallélisée: l’exception sera encapsulée dans une exception de type AggregateException. Pour attraper l’exception à l’extérieur de l’instruction PLINQ, il faut utiliser une clause try...catch avec le type AggregateException.

Plus de détails sur MSDN.

8. ParallelMergeOptions

Par défaut, le regroupement des itérations du traitement parallèle se fait en utilisant un buffer. L’objectif de ce buffer est d’améliorer les performances du traitement parallèle en regroupant les itérations avec un équilibre entre:

  • Une faible latence entre le début du traitement et la génération du résultat et
  • De bonne performance globale en terme de consommation des ressources.

On peut, toutefois, changer l’option par défaut de regroupement en utilisant WithMergeOptions et l’énumérable ParallelMergeOptions:

  • AutoBuffered: valeur par défaut permettant un équilibre entre faible latence et bonne performance.
  • NotBuffered: pas de buffer. Le résultat est généré dès qu’il est calculé.
  • FullyBuffered: les résultats sont rangés systématiquement dans un buffer pour qu’ils soient délivrés d’un coup en fin de traitement.

Par exemple, pour utiliser WithMergeOptions:

int[] result = Enumerable.Range(100, 20000) 
  .AsParallel() 
  .WithMergeOptions(ParallelMergeOptions.NotBuffered) 
  .Where(...)
Les opérateurs OrderBy et Reverse imposent FullyBuffered

Lorsqu’on utilise les fonctions OrderBy et Reverse, l’option ParallelMergeOptions est FullyBuffered.

Plus de détails sur MSDN.

9. Partitioner

Comme indiqué plus haut, la classe System.Concurrent.Collections.Partitioner permet de forcer le partitionnement par bloc d’une structure satisfaisant IList<T>.

Par exemple:

IList<int> input = ... ; 
var parallelQuery = Partitioner.Create (input, true) 
  .AsParallel() 
  .Where (...)

La surcharge Partitioner.Create(input, true) permet de renseigner avec le booléen la façon dont la partitionnement des threads est effectué suivant la charge:

  • true: le partitionnement est dynamique donc il convient bien pour un partitionnement par bloc.
  • false: le partitionnement est statique donc pour appliquer un partitionnement par intervalle d’index.

Plus de détails sur la classe Partitioner sur MSDN.

Références

Leave a Reply