Quelques patterns pour attendre la fin de l’exécution d’une tâche*

*: Le terme “Tâche” est utilisé içi au sens large et non au sens de la Task Parallel Library (TPL).

Lorsque plusieurs threads sont utilisés et qu’il est nécessaire de synchroniser certains de ces threads, on peut être amener à implémenter des mécanismes d’attente. Ces mécanismes permettent ainsi:

  • D’éviter un accès concurrent à une section critique,
  • D’attendre le résultat d’un autre thread (rendre un algorithme synchrone),
  • De ralentir un tempo d’horloge trop rapide pour, par exemple, implémenter des algorithmes de “conflation”.

Plusieurs solutions sont possibles en .NET. Toutefois il peut être utile que ces solutions permettent d’implémenter certaines fonctionnalités comme:

  • Un “timeout”: au délà d’un certain temps d’attente, le thread reprendra son exécution.
  • L’annulation de l’attente: dans le cas où on souhaite terminer prématurément l’exécution d’un thread, il faut pouvoir annuler l’attente.

Mécanisme de signalement

ManualResetEvent

La méthode la plus directe est l’utilisation de l’objet ManualResetEvent ou des autres objets du même type comme AutoResetEvent ou ManualResetEventSlim.

Ces objets permettent de placer directement des threads en attente et de signaler la fin de l’attente.

Les principales différences entre ces objets sont:

  • AutoResetEvent: c’est l’objet le plus simple. Il est possible de mettre un thread en attente en exécutant WaitOne(). Ce thread sera débloqué si un autre thread exécute Set(). Cet objet n’autorise le déblocage que d’un seul thread à la fois. Si plusieurs threads attendent en exécutant Wait
  • ManualResetEvent: par rapport à AutoResetEvent, cet objet nécessite d’exécuter Reset() après avoir exécuté Set() pour qu’un thread puisse attendre à nouveau en exécutant WaitOne(). Si Reset() n’est pas exécuté, les appels suivants à WaitOne() ne seront pas bloquants. Contrairement à AutoResetEvent, si plusieurs threads sont en attente en exécutant WaitOne(), un seul appel à Set() débloque tous les threads.
  • ManualResetEventSlim: le comportement de cet objet est identique à ManualResetEvent à la différence qu’il ne permet pas de synchroniser des threads dans des processus différents. Toutefois il est plus performant que ManualResetEvent et AutoResetEvent.

On peut résumer brievement les performances et l’utilisation entre processus de ces objets dans le tableau suivant:

Objet Utilisation dans des processus différents Ordre de grandeur du temps d’un appel à cet objet *
AutoResetEvent Oui 1000 ns
ManualResetEvent Oui 1000 ns
ManualResetEventSlim Non 40 ns (25 fois plus rapide)

*: Les temps sont indiqués principalement pour montrer les différences en terme d’ordre de grandeur entre les différents objets.

En utilisant ManualResetEvent, on peut implémenter un mécanisme d’attente de la façon suivante:

public class ManualResetEventUsage  
{  
  private readonly ManualResetEventSlim waitingEvent = new ManualResetEventSlim(false); 
 
  public ManualResetEventUsage() 
  { } 
 
  public bool Wait(TimeSpan timeout) 
  { 
    Console.WriteLine("Waiting..."); 
    bool signaled = this.waitingEvent.Wait(timeout); 
    Console.WriteLine("Unlocked"); 
 
    return signaled; 
  } 
 
  public void Signal() 
  {  
    Console.WriteLine("Signaling..."); 
    this.waitingEvent.Set(); 
    Console.WriteLine("Signaled."); 
  } 
} 
 
class Test 
{  
  static void Main(string[] args) 
  {  
    var resetEventUsage = new ManualResetEventUsage(); 
 
    Task t = Task.Factory.StartNew(() => { resetEventUsage.Wait(TimeSpan.FromMilliseconds(1000)); }); 
    Thread.Sleep(200); 
 
    resetEventUsage.Signal(); 
    t.Wait(); 
  } 
}

AutoResetEvent, ManualResetEvent et ManualResetEvent permettent de préciser un timeout quand un appel est fait à WaitOne().

Plus de détails à propos de ces objets sur MSDN: ManualResetEvent, AutoResetEvent et ManualResetEventSlim.

Monitor

Contrairement au "lock" classique, Monitor permet d’indiquer un timeout ce qui permet d’implémenter facilement un mécanisme d’attente.

En exécutant Monitor.Wait(locker), on peut placer un thread en attente. L’objet “locker” est libéré pendant l’attente. Pour débloquer un thread en attente, il faut exécuter Monitor.Pulse(locker). Dans le cas où plusieurs threads sont en attente, on peut les libérer en exécutant Monitor.PulseAll(locker).

L’implémentation d’un mécanisme d’attente avec Monitor n’est pas trivial. On peut distinguer 2 types de files d’attente:

  • La file d’attente des threads prêts à s’exécuter: ce sont des threads qui sont en attente de l’acquisition d’un "lock".
  • La file d’attente des threads attendants: ce sont les threads ayant appelés Monitor.Wait() et qui sont dans l’attente d’une exécution de Monitor.Pulse().

Ces deux files d’attentes peuvent mener à des comportements inattendus. Lorsque Monitor.Pulse() est exécuté, le premier thread dans la file d’attente des threads attendants est placé dans la file d’attente des threads prêts. Toutefois d’autres threads peuvent être déjà présents dans cette file d’attente et faire l’acquisition du "lock" avant que le thread libéré ne puisse en faire l’acquisition. Le thread libéré n’aura donc pas accès au "lock" et ne s’exécutera pas.

La solution consiste à utiliser une boucle "while" pour permettre de replacer le thread libéré dans la file d’attente des threads attendants s’il n’a pas réussi à faire l’acquisition du "lock".

On peut implémenter le mécanisme d’attente de la façon suivante:

public class MonitorUsage  
{  
  private readonly object locker = new object(); 
  private bool block = true; 
 
  public MonitorUsage() 
  { } 
 
  public bool Wait(TimeSpan timeout) 
  { 
    Console.WriteLine("Waiting..."); 
    bool timeoutExpired = false; 
    lock (this.locker) 
    { 
      while (this.block && !timeoutExpired) 
      {  
        timeoutExpired = !Monitor.Wait(this.locker, timeout); 
      } 
         
      this.block = true; 
    } 
    Console.WriteLine("Unlocked"); 
 
    return timeoutExpired; 
  } 
 
  public void Signal() 
  {  
    Console.WriteLine("Signaling..."); 
    lock (this.locker) 
    { 
      this.block = false; 
      Monitor.Pulse(this.locker); 
    } 
    Console.WriteLine("Signaled."); 
  } 
} 
 
class Test 
{  
  static void Main(string[] args) 
  {  
    var monitorUsage = new MonitorUsage(); 
 
    Task t = Task.Factory
      .StartNew(() => { monitorUsage.Wait(TimeSpan.FromMilliseconds(1000)); }); 
    Thread.Sleep(200); 
 
    monitorUsage.Signal(); 
    t.Wait(); 
  } 
}
Attention à l’utilisation de Monitor

Quand on utilise Monitor.Wait() et Monitor.Pulse(), le pattern suivant ne suffit pas:

// thread A
lock (key) Monitor.Wait(key);

// thread B
lock (key) Monitor.Pulse(key);

Il faut plutôt utiliser un pattern semblable à celui utilisé dans l’exemple précédent car il permet de résoudre 2 problèmes:

  • La “race condition” qui peut se produire si on exécute Signal() avant Wait(). Dans ce cas, block sera à “false” et le boucle while ne s’exécutera pas ainsi que l’appel à Monitor.Wait().
  • Le problème du thread libéré qui ne peut pas faire l’acquisition du "lock". Si le thread libéré est ajouté à la file d’attente des threads prêts et qu’un autre thread fait l’acquisition du lock alors block sera à “true”. Ainsi le thread libéré ne sortira pas de la boucle "while" et exécutera à nouveau Monitor.Wait() et donc sera replacé dans la file d’attente des threads attendants.

Plus de détails sur Monitor sur MSDN.

Avec la Task Parallel Library (TPL)

Il existe de multiples implémentations de mécanismes d’attente avec des "Task".

Mécanisme de signalement avec une Task

La solution la plus directe consiste à utiliser Task.Wait() et TaskCompletionSource. Task.Wait() est bloquant et permet de renseigner un timeout. Le gros intérêt de cette méthode est de bénéficier de la gestion de l’annulation et des exceptions avec TPL.

Eviter d’utiliser ce pattern

Cet exemple permet de faire lien avec le mécanisme de signalement mais il n’est pas très adapté quand on utilise des Task.

On peut implémenter un mécanisme d’attente de cette façon:

public class TaskUsage  
{  
  private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(); 
  private readonly Task canceledTask; 
  private readonly Task exceptionOccuredTask; 
 
  public TaskUsage() 
  {  
      this.canceledTask = this.taskCompletionSource.Task.ContinueWith( 
          tsk => Console.WriteLine("Canceled"), 
          TaskContinuationOptions.OnlyOnCanceled); 
      this.exceptionOccuredTask = this.taskCompletionSource.Task.ContinueWith( 
          tsk => Console.WriteLine("Exception occured: " + tsk.Exception.ToString()), 
          TaskContinuationOptions.OnlyOnFaulted); 
  } 
 
  public bool Wait(TimeSpan timeout) 
  { 
    Console.WriteLine("Waiting..."); 
    bool executedWithinTimeout = this.taskCompletionSource.Task.Wait(timeout) 
    Console.WriteLine("Unlocked"); 
 
    return executedWithinTimeout; 
  } 
 
  public void Signal() 
  {  
    Console.WriteLine("Signaling..."); 
    this.taskCompletionSource.SetResult(true); 
    Console.WriteLine("Signaled."); 
  } 
} 
 
class Test 
{  
  static void Main(string[] args) 
  {  
    var taskUsage = new TaskUsage(); 
 
    Task t = Task.Factory
      .StartNew(() => { taskUsage.Wait(TimeSpan.FromMilliseconds(1000)); }); 
    Thread.Sleep(200); 
 
    monitorUsage.Signal(); 
    t.Wait(); 
  } 
}

Continuation

Le mécanisme de signalement comme précédemment n’est pas vraiment adapté quand on utilise des “tasks” au sens TPL. Le pattern le plus direct pour attendre la fin de l’exécution d’une tâche est la “continuation”.

Une “continuation” permet de déclencher une tâche lorsqu’une autre tâche est terminée. Si cette autre tâche provoque une exception ou ne se termine pas, la “continuation” peut ne pas s’exécuter suivant les options indiquées à la définition de la “continuation”.

L’intérêt de cette méthode est la simplicité de son implémentation notamment en ce qui concerne la gestion des exceptions et des annulations.

Une “continuation” simple qui s’exécute en récupérant le résultat de la première tâche et qui ne s’exécute que si la première tâche se termine s’implémente de la façon suivante:

Task<bool> t = Task<bool>.Factory.StartNew(() =>  
  { 
    Console.WriteLine("Beginning first task execution..."); 
    // Making other processes 
    // ...
    Console.WriteLine("First ended"); 
  }); 
t.ContinueWith(tsk =>  
  { 
    Console.WriteLine("Result from first task: ", tsk.Result); 
  }, TaskContinuationOptions.OnlyOnRanToCompletion);  
t.Wait();

Il est possible d’implémenter des mécanismes de “continuation” quand la première tâche provoque une exception ou si elle est annulée: Utilisation des “Task” en 5 min.

Continuation + TaskCompletionSource

Dans le cas où la première tâche à exécuter ne contient pas de code particulier, on peut utiliser l’objet TaskCompletionSource et définir une continuation sur la task de la TaskCompletionSource. Pour déclencher la “continuation”, il suffit d’affecter un résultat à la task de la TaskCompletionSource.

L’intérêt de cette méthode est qu’il est possible de définir la “continuation” bien avant son exécution même dans le cas où on a pas d’implémentation particulière pour la première tâche. Le déclenchement de la “continuation” est rapide.

Ce type de mécanisme peut s’implémenter de la façon suivante:

public class TaskCompletionSourceUsage  
{  
  private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(); 
 
  public TaskCompletionSourceUsage(Action<Task> actionToExecute) 
  {  
    this.taskCompletionSource.Task.ContinueWith(actionToExecute); 
  } 
 
  public void Signal() 
  {  
    Console.WriteLine("Signaling..."); 
    this.taskCompletionSource.SetResult(true); 
    Console.WriteLine("Signaled."); 
  } 
} 
 
class Test 
{  
  static void Main(string[] args) 
  {  
    var taskCompletionSourceUsage = new TaskCompletionSourceUsage(task =>  
      {  
        Console.WriteLine("Executing task..."); 
        // Making some processes 
        // ... 
        Console.WriteLine("Task ended"); 
      }); 
 
    // Making other processes 
    // ... 

    // Trigger the execution of the task 
    taskCompletionSourceUsage.Signal(); 
  } 
}

Task retardée (ajout d’un “timeout”)

Il est possible d’ajouter un espèce de “timeout” à l’implémentation précédente. Si la “continuation” n’a pas été exécuté dans l’intervalle du “timeout”, on peut déclencher un autre type de traitement.

Task retardée avec un “Timer” (Framework 4.0)

L’intérêt d’utiliser un Timer est de permettre de rendre cette technique compatible avec le Framework .NET 4.0.

Ce type de mécanisme peut s’implémenter de la façon suivante:

using System;
using System.Threading;

public class TimerUsage  
{  
  private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(); 
  private bool resultAlreadySet = false; 
  private readonly object resultSync = new object(); 
  private readonly Timer timer; 
 
  public TimerUsage(int timeout, Action<bool> actionToExecute) 
  {  
    this.taskCompletionSource.Task.ContinueWith(completionSourceTask =>  
      {  
        this.ExecuteTask(completionSourceTask, actionToExecute); 
      }, TaskContinuationOptions.OnlyOnRanToCompletion); 
    this.timer = new Timer(state => this.SetTaskCompletionSourceResult(false)); 
    this.timer.Change(timeout, -1); 
  } 
 
  public void Signal() 
  {  
    Console.WriteLine("Signaling..."); 
    this.SetTaskCompletionSourceResult(true); 
    Console.WriteLine("Signaled."); 
  } 
 
  private void ExecuteTask(Task completionSourceTask, Action<bool> actionToExecute) 
  {  
    actionToExecute(completionSourceTask.Result); 
  } 
 
  private void SetTaskCompletionSourceResult(bool result) 
  {  
    if (!this.resultAlreadySet) 
    { 
      lock (this.resultSync) 
      {  
        if (!this.resultAlreadySet) 
        { 
          this.taskCompletionSource.SetResult(result); 
          this.resultAlreadySet = true; 
        } 
      } 
    } 
  } 
} 
 
class Test 
{  
  static void Main(string[] args) 
  {  
    var timerUsage = new TimerUsage(1000, result =>  
      {  
        if (result) 
        {  
          Console.WriteLine("Task executed within timeout"); 
        } 
        else 
        {  
          Console.WriteLine("Timeout");
        } 
      }); 
 
    // Making other processes 
    // ... 
 
    // Trigger the execution of the task 
    timerUsage.Signal(); 
  } 
}

SetTaskCompletionSourceResult permet d’éviter d’affecter le résultat 2 fois à la TaskCompletionSource.

Task retardée en utilisant Task.Delay() (Framework > 4.0)

Il suffit de remplacer le Timer avec une instruction Task.Delay(). A partir de l’exemple précédent la classe TimerUsage devient:

public class TaskDelayUsage  
{  
  private readonly TaskCompletionSource<bool> taskCompletionSource = new TaskCompletionSource<bool>(); 
  private bool resultAlreadySet = false; 
  private readonly object resultSync = new object(); 
  private readonly Task delayedTask; 
 
  public TaskDelayUsage(int timeout, Action<bool> actionToExecute) 
  {  
    this.taskCompletionSource.Task.ContinueWith(completionSourceTask =>  
      {  
        this.ExecuteTask(completionSourceTask, actionToExecute); 
      }, TaskContinuationOptions.OnlyOnRanToCompletion); 
 
    this.delayedTask = Task.Delay(timeout) 
      .ContinueWith(task => this.SetTaskCompletionSourceResult(false)) 
      .Start(); 
  } 
 
  public void Signal() 
  {  
    Console.WriteLine("Signaling..."); 
    this.SetTaskCompletionSourceResult(true); 
    Console.WriteLine("Signaled."); 
  } 
 
  private void ExecuteTask(Task completionSourceTask, Action<bool> actionToExecute) 
  {  
    actionToExecute(completionSourceTask.Result); 
  } 
 
  private void SetTaskCompletionSourceResult(bool result) 
  {  
    if (!this.resultAlreadySet) 
    { 
      lock (this.resultSync) 
      {  
        if (!this.resultAlreadySet) 
        { 
          this.taskCompletionSource.SetResult(result); 
          this.resultAlreadySet = true; 
        } 
      } 
    } 
  } 
}

Liste de patterns non exhaustive

La liste des patterns présentés précédemment est loin d’être exhaustive. Le but est juste de référencer quelques méthodes de façon à donner des idées d’implémentation.
Toutefois, on peut dire que dans la majorité des cas, il est préférable d’utiliser TPL et de privilégier des mécanismes de “continuation”. Les mécanismes de signalement sont à utiliser avec plus de réserve ou pour les tests unitaires.

Leave a Reply