Utilisation d’un “combiner” dans un job Hadoop MapReduce

Pour augmenter l’efficacité d’un job MapReduce, en plus du cache distribué, on peut s’aider de combiners.

Brièvement, dans un job MapReduce:

  • Lors de la phase Map, les mappers génèrent des paires de clé/valeur.
  • Lors de la phase shuffle/sort, ces paires sont réparties et ordonnées sur un ou plusieurs nœuds en fonction de la valeur de la clé .
  • Lors de la phase Reduce, un ou plusieurs reducers agrègent les paires de clé/valeur en fonction de la valeur de la clé.

Ces étapes sont expliquées plus en détail dans Hadoop MapReduce en 5 min.

Des paires de clé/valeur sont échangées lors de la phase shuffle/sort. Plus il y a des paires qui sont échangées entre des nœuds et moins le job sera exécuté de façon efficace. Utiliser des combiners peut aider à réduire les données échangées.

Explication du “combiner”

Le combiner est un composant logiciel qui va agréger les paires de clé/valeur à la sortie d’un mapper avant la phase shuffle/sort. Cette première agrégation permet de réduire le nombre de clé/valeur qui seront transmises à la phase shuffle/sort et par suite au reducer. Réduire le nombre de paires de clé/valeur transitant d’un nœud à l’autre permet d’optimiser l’exécution du job.

Concrètement, le combiner est placé juste après chaque mapper et est exécuté avant la phase shuffle/sort:


Exemple d’utilisation du “combiner”

En utilisant l’exemple du job Word count et en considérant le comptage des mots de la phase:

"Hadoop est un outil puissant toutefois cet outil  
a des limites qui le rendent moins puissant."

Dans le cas où le cluster est composé de 2 nœuds, le comptage de la phrase sera réparti sur 2 mappers. A la sortie des mappers, les paires de clé/valeur seront:

  • Nœud 1 après exécution du mapper: ("Hadoop"/1), ("est"/1), ("un"/1), ("outil"/1), ("puissant"/1), ("toutefois"/1), ("cet"/1), ("outil"/1)
  • Nœud 2 après exécution du mapper: ("a"/1), ("des"/1), ("limites"/1), ("qui"/1), ("le"/1), ("rendent"/1), ("moins"/1), ("puissant"/1)

Après exécution des combiners qui se trouvent à la sortie des mappers:

  • Nœud 1 après exécution du combiner: ("Hadoop"/1), ("est"/1), ("un"/1), ("outil"/2), ("puissant"/1), ("toutefois"/1), ("cet"/1)
  • Nœud 2 après exécution du combiner: ("a"/1), ("des"/1), ("limites"/1), ("qui"/1), ("le"/1), ("rendent"/1), ("moins"/1), ("puissant"/1)

On remarque que dans le cas du nœud 1, les paires avec la clé "outil" ont été agrégées.

Après la phase shuffle/sort:

  • Nœud 1: ("a"/1), ("cet"/1), ("des"/1), ("est"/1), ("Hadoop"/1), ("le"/1), ("limites"/1), ("moins"/1).
  • Nœud 2: ("outil"/2), ("puissant"/1), ("puissant"/1), ("qui"/1), ("rendent"/1), ("toutefois"/1), ("un"/1).

Les paires clé/valeur sont triées et réparties sur les 2 nœuds.
Dans le cas d’un seul reducer, toutes les clés sont adressées au seul reducer pour être agrégées. On obtient après exécution du reducer:

("a"/1), ("cet"/1), ("des"/1), ("est"/1), ("Hadoop"/1), ("le"/1), 
    ("limites"/1), ("moins"/1), ("outil"/2), ("puissant"/2), ("qui"/1), ("rendent"/1), 
    ("toutefois"/1), ("un"/1).

Le reducer n’a effectué qu’une seule agrégation: les paires avec la clé "puissant". S’il n’y avait pas de combiners, le reducer aurait dû agréger les paires correspondant aux clés "outil" et "puissant".

Le combiner permet donc de:

  • Réduire les données échangées entre nœud
  • Réduire le nombre de traitements effectués par le reducer.

Implémentation du “combiner”

Si on prend l’exemple du texte:
http://www.gutenberg.org/files/2600/2600-0.txt

Dans le cadre du comptage de mots, l’implémentation du combiner est la même que celle du reducer (l’implémentation du reducer se trouve dans l’article Exécuter un job Hadoop MapReduce avec .NET Core:

using System; 
  
namespace reducer 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            string currentWord = string.Empty; 
            int currentWordCount = 0; 
            string word = string.Empty; 
  
            string valueFromMapper = Console.ReadLine(); 
            while (valueFromMapper != null) 
            { 
                var splitLine = valueFromMapper.Split('\t'); 
                if (splitLine.Length == 2) 
                { 
                    word = splitLine[0]; 
                    int count; 
                    if (Int32.TryParse(splitLine[1], out count)) 
                    { 
                        if (currentWord == word) 
                        { 
                            currentWordCount += count; 
                        } 
                        else 
                        { 
                            if (!string.IsNullOrWhiteSpace(currentWord)) 
                            { 
                                Console.WriteLine($"{currentWord}\t{currentWordCount}"); 
                            } 
                            currentWord = word; 
                            currentWordCount = count; 
                        } 
                    } 
                } 
  
                valueFromMapper = Console.ReadLine(); 
            } 
  
            if (currentWord == word && !string.IsNullOrWhiteSpace(currentWord)) 
            { 
                Console.WriteLine($"{currentWord}\t{currentWordCount}"); 
            } 
        } 
    } 
}

Exécution du job MapReduce avec un “combiner”

Pour exécuter un job MapReduce avec Hadoop Streaming en fournissant un combiner, il faut, dans une premier temps, créer un projet console avec .Net Core:

% dotnet new console –n combiner

Après avoir modifié l’implémentation dans combiner/Program.cs:

% cd combiner  
% dotnet build  
% dotnet publish –c release

On copie ensuite les résultats de compilation dans le répertoire publish:

% cp bin/release/netcoreapp2.0/publish/* ../publish

On exécute ensuite le job MapReduce en fournissant l’argument correspondant au combiner:

% hadoop jar /usr/hadoop/hadoop-2.8.1/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar \ 
-files "publish" \ 
-mapper "dotnet publish/mapper.dll" \ 
-reducer "dotnet publish/reducer.dll" \ 
-combiner "dotnet publish/combiner.dll" \ 
-input /user/input/comparison/2600-0.txt \ 
-output /output_combiner

Le résumé de l’exécution est le suivant:

File System Counters 
        FILE: Number of bytes read=375438 
        FILE: Number of bytes written=1171439 
        FILE: Number of read operations=0 
        FILE: Number of large read operations=0 
        FILE: Number of write operations=0 
        HDFS: Number of bytes read=3363856 
        HDFS: Number of bytes written=227620 
        HDFS: Number of read operations=9 
        HDFS: Number of large read operations=0 
        HDFS: Number of write operations=2 
    Job Counters  
        Launched map tasks=2 
        Launched reduce tasks=1 
        Data-local map tasks=2 
        Total time spent by all maps in occupied slots (ms)=76630 
        Total time spent by all reduces in occupied slots (ms)=12702 
        Total time spent by all map tasks (ms)=38315 
        Total time spent by all reduce tasks (ms)=6351 
        Total vcore-milliseconds taken by all map tasks=38315 
        Total vcore-milliseconds taken by all reduce tasks=6351 
        Total megabyte-milliseconds taken by all map tasks=78469120 
        Total megabyte-milliseconds taken by all reduce tasks=13006848 
    Map-Reduce Framework 
        Map input records=66055 
        Map output records=566308 
        Map output bytes=4242981 
        Map output materialized bytes=375444
        Input split bytes=210 
        Combine input records=566308 
        Combine output records=29157 
        Reduce input groups=20470 
        Reduce shuffle bytes=375444 
        Reduce input records=29157 
        Reduce output records=20470 
        Spilled Records=58314 
        Shuffled Maps =2 
        Failed Shuffles=0 
        Merged Map outputs=2 
        GC time elapsed (ms)=323 
        CPU time spent (ms)=7770 
        Physical memory (bytes) snapshot=571236352 
        Virtual memory (bytes) snapshot=8277688320 
        Total committed heap usage (bytes)=398663680 
    Shuffle Errors 
        BAD_ID=0 
        CONNECTION=0 
        IO_ERROR=0 
        WRONG_LENGTH=0 
        WRONG_MAP=0 
        WRONG_REDUCE=0 
    File Input Format Counters  
        Bytes Read=3363646 
    File Output Format Counters  
        Bytes Written=227620

Comparaison entre l’exécution avec et sans “combiner”

Si on considère le résumé de l’exécution du job MapReduce word count sans combiner:

...
Map-Reduce Framework 
        Map input records=66055 
        Map output records=566308 
        Map output bytes=4242981 
        Map output materialized bytes=5375609
...

Sur la quantité de données échangées, on constate une diminution assez conséquente (l’unité utilisée est l’octet):

  • Sans combiner: Map output materialized bytes=5375609
  • Avec combiner: Map output materialized bytes=375444

Soit un gain de 93% ce qui est considérable.

Le temps d’exécution est le même avec ou sans combiner (50 sec).

Dans le cadre du job Word Count, on constate que la quantité de données échangées est considérablement réduite. Le temps d’exécution est, quant à lui, inchangé.

Le combiner est donc une option à prendre en compte lors de l’exécution d’une job MapReduce, en particulier si on cherche à limiter la quantité de données transitant d’un nœud à l’autre.

Leave a Reply