Utilisation du “distributed cache” dans un job Hadoop MapReduce

Lors de l’exécution d’un job MapReduce, il peut être nécessaire de consulter des données régulièrement de façon à les utiliser pour les traitements effectués par le mapper ou le reducer. Par exemple, on peut avoir la nécessité de mettre en place un dictionnaire et y accéder pendant l’exécution du mapper et du reducer.

Hadoop possède une fonctionnalité qui permet de répondre à ce besoin avec le cache distribué (i.e. distributed cache). Ce cache est accessible à partir du système de fichier distribué HDFS ce qui permet à tous les jobs qui s’exécutent de pouvoir y accéder. Il suffit de lire les fichiers sur HDFS.

Le gros intérêt du cache distribué est d’avoir des fichiers de travail consultables par les jobs:

  • Pour réduire la quantité de traitements effectués par les jobs,
  • Pour limiter la quantité de données transmises entre les mappers et le reducer,
  • Pour éventuellement utiliser les résultats d’autres jobs.

Réduire la quantité de traitement ou limiter la quantité de données transitant dans le réseau sont des facteurs qui permettent de rendre l’exécution d’un job MapReduce plus efficace.

Cet article va montrer un exemple d’utilisation du cache distribué avec Hadoop Streaming et une implémentation du mapper et du reducer en C# avec .NET Core.

Explication du cache distribué

Comme indiqué plus haut, le cache distribué (i.e. distributed cache) sont des fichiers accessibles par les jobs sur HDFS. On y accède à partir du mapper ou du reducer en les lisant directement sur le système de fichiers HDFS. Par exemple si un fichier du cache se trouve sur HDFS avec le path suivant:

refdata/distributed_cache_file.txt

On peut y accéder dans l’implémentation du mapper et du reducer en exécutant simplement:

var cacheFileLines = System.IO.File.ReadAllLines("refdata/distributed_cache_file.txt")

Dans l’article Hadoop MapReduce en 5 min, on avait expliqué le fonctionnement d’un job MapReduce en indiquant que les données traitées par les mappers étaient envoyées au reducer. Plus il y a des données à transmettre et plus l’exécution du job sera lente puisque ces données transitent à travers le réseau. On peut donc utiliser le cache distribué pour réduire la quantité de données transitant dans le réseau entre les mappers et le reducer.


Dans le schéma précédent, les flèches noires indiquent la transmission des données à travers le réseau.

Exemple d’utilisation du cache distribué

On se propose d’illustrer l’utilisation du cache distribué avec le job Word count. Le but est de réduire la quantité de données entre les mappers et le reducer. Les données transmises entre les différents jobs correspondent aux mots. Ainsi si on réduit la taille des mots, on peut réduire la quantité de données transmises.

Une méthode pour réduire la taille des mots est de considérer un dictionnaire comportant pour chaque mot un code. Si la taille de ce code est inférieure à celle du mot, on peut réduire la taille du mot.

Par exemple, si on considère le comptage des mots de la phrase:

"Hadoop est un outil puissant toutefois cet outil a des limites 
    qui le rendent moins puissant"

Normalement, à la sortie du mapper, on obtient les paires clé/valeur suivantes:

("Hadoop"/1), ("est"/1), ("un"/1), ("outil"/1), ("puissant"/1), ("toutefois"/1), 
    ("cet"/1), ("outil"/1), ("a"/1), ("des"/1), ("limites"/1), ("qui"/1), ("le"/1), 
    ("rendent"/1), ("moins"/1), ("puissant"/1)

Si on affecte un code unique à chacun de ces mots:

hadoop      1 
est         2 
un          3 
outil       4 
puissant    5 
toutefois   6 
cet         7 
a           8 
des         9 
limites     A 
qui         B 
le          C 
rendent     D 
moins       E

On remarque que pour la plupart des mots, le code est plus court en nombre de lettres que le mot. Ainsi, en utilisant le code pour désigner le mot en tant que clé dans le job MapReduce, on économise des lettres lors de la transmission des paires clé/valeur entre les mappers et le reducer.
Dans cet exemple les paires de clé/valeur transmises au reducer seraient:

("1"/1), ("2"/1), ("3"/1), ("4"/1), ("5"/1), ("6"/1), ("7"/1), ("4"/1), ("8"/1), 
    ("9"/1), ("A"/1), ("B"/1), ("C"/1), ("D"/1), ("E"/1), ("5"/1)

Si on prends l’exemple du texte:
http://www.gutenberg.org/files/2600/2600-0.txt

On peut créer un dictionnaire pour affecter à chaque mot un code dont la taille sera réduite par rapport à la taille du mot.
Le code doit être unique pour chaque mot.

Ainsi:

  • Chaque mapper lit le dictionnaire et génère des paires clé/valeur avec code/1 au lieu de mot/1.
  • Le reducer lit aussi le dictionnaire pour compter le nombre d’occurrences du code (qui est le même que celui du mot correspondant) et ensuite affecter pour chaque code le mot qui correspond.

La sortie du reducer est donc la même que si on n’utilise pas de dictionnaire. L’échange de données entre les mappers et le reducer ne comporte que des paires code/1 dont la taille est plus faible que les paires mot/1.

Génération du dictionnaire de mots

La 1ère étape consiste à générer des codes uniques pour chaque mot. On affecte à chaque mot un nombre décimal et on convertit ce nombre décimal vers un nombre hexadécimal pour en réduire la taille.

Dans l’exemple évoqué précédemment, on effectue ce traitement:

hadoop      1 
est         2 
un          3 
outil       4 
puissant    5 
toutefois   6 
cet         7 
a           8 
des         9 
limites     A 
qui         B 
le          C 
rendent     D 
moins       E

Si on va plus loin, et au lieu de convertir en hexadécimal, on utilise davantage la table de caractères ASCII en considérant les caractères du code 48 à 122:

  • Les chiffres de 0 à 9 du code ASCII 48 à 57.
  • Les lettres majuscules de A à Z du code ASCII 65 à 90.
  • Les caractères “[“, “\”, “]”, “^”, “_” et “`” du code ASCII 91 à 96.
  • Les lettres minuscules de a à z du code ASCII 97 à 122.

Sources: http://pedagogie.ac-limoges.fr/sti_si/accueil/FichesConnaissances/Sequence2SSi/co/Code_ASCII.html

Avec ces caractères, le code comporte moins de lettres que les nombres en hexadécimal.

Pour convertir le nombre décimal du mot vers la suite de caractères qui sera le code, on propose donc l’algorithme suivant:

public static string ConvertIntToCode(int number) 
{ 
    if (number < 1) return "0"; 
    int codeAsInt = number; 
    string code = string.Empty; 
    while (number > 0) 
    { 
        codeAsInt = number % 68; 
        if (codeAsInt < 10) 
            code = code.Insert(0, Convert.ToChar(codeAsInt + 48).ToString()); 
        else 
            code = code.Insert(0, Convert.ToChar(codeAsInt + 55).ToString()); 
        number /= 68; 
    } 
    return code; 
}

Le nombre 68 dans l’algorithme est utilisé parce qu’on considère les caractères de la table ASCII du code 65 à 122.

Pour générer le dictionnaire, on utilise les mots provenant d’un précédent comptage de façon à avoir un exemple de listes de mots.
Par exemple si on considère une liste de mots provenant d’un précédent comptage:

...
perdere            1 
peremptorily       3 
peremptory         2 
perfect           18 
perfecting         2 
perfection        10 
perfectly         24 
perform           16 
performance       10 
performances       2 
performed         24 
performer          1 
performing         8 
perfume            4
...

En exécutant l’algorithme suivant:

static void Main(string[] args) 
{ 
    string filePath = "part-00000"; 
    string outputFilePath = "word_codes.txt"; 
    int counter = 0; 
    using (StreamReader reader = new StreamReader(filePath)) 
    { 
        using (StreamWriter writer = new StreamWriter(outputFilePath, false)) 
        { 
            string line; 
            while ((line = reader.ReadLine()) != null) 
            { 
                string word = line.Split('\t', StringSplitOptions.RemoveEmptyEntries)
                  .FirstOrDefault(); 
                if (string.IsNullOrEmpty(word) || word.Length < 3) continue; 
                 
                writer.WriteLine($"{word}\t{ConvertIntToCode(counter)}"); 
                counter++; 
            }     
        } 
    } 
} 

On obtient un dictionnaire avec un code unique pour chaque mot:

...
perdere         2mI 
peremptorily    2mJ 
peremptory      2mK 
perfect         2mL 
perfecting      2mM 
perfection      2mN 
perfectly       2mO 
perform         2mP 
performance     2mQ 
performances    2mR 
performed       2mS 
performer       2mT 
performing      2mU 
perfume         2mV
...

Adaptation du “mapper” pour utiliser le dictionnaire

Comme indiqué plus haut, on modifie le mapper pour lire le dictionnaire et affecter un code à chaque mot trouvé. Le mapper génère des paires code/1 au lieu de génèrer des paires mot/1.

On modifie l’implémentation du mapper dans l’article: Exécuter un job Hadoop MapReduce avec .NET Core.

Après modification, le mapper devient:

using System; 
using System.IO; 
using System.Text; 
using System.Collections.Generic; 
  
namespace mapper 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            Dictionary<string, string> wordCounts = GetCachedWordCodes(); 
  
            var fileLine = Console.ReadLine(); 
            while (fileLine != null) 
            { 
                var words = fileLine.Split(' '); 
                foreach (var word in words) 
                { 
                    if (string.IsNullOrWhiteSpace(word)) continue; 
  
                    var wordToWrite = StripPunctuation(word.Trim().ToLower()); 
                    string wordCode; 
                    if (wordCounts.TryGetValue(wordToWrite, out wordCode)) 
                    { 
                        wordToWrite = wordCode; 
                    } 
  
                    Console.WriteLine($"{wordToWrite}\t1"); 
                } 
  
                fileLine = Console.ReadLine(); 
            } 
        } 
  
        // Permet de lire le dictionnaire se trouvant dans le cache distribué
        public static Dictionary<string, string> GetCachedWordCodes() 
        { 
            Dictionary<string, string> wordCounts = new Dictionary<string, string>(); 
  
            var lines = File.ReadAllLines("refdata/word_codes.txt"); 
            foreach (var line in lines) 
            { 
                var splitLine = line.Split('\t', StringSplitOptions.RemoveEmptyEntries); 
                if (splitLine.Length < 2) continue; 
  
                string word = splitLine[0]; 
                string wordCode = splitLine[1]; 
  
                wordCounts[word] = wordCode; 
            } 
  
            return wordCounts; 
        } 
  
        // Supprime la ponctuation 
        public static string StripPunctuation(string inputString) 
        { 
            var outputString = new StringBuilder(); 
            foreach (char character in inputString) 
            { 
                if (!char.IsPunctuation(character)) 
                    outputString.Append(character); 
            } 
            return outputString.ToString(); 
        } 
    } 
}

Dans cet algorithme, File.ReadAllLines("refdata/word_codes.txt") permet de lire le dictionnaire qui se trouve dans le cache distribué sur HDFS.

Exécution du job MapReduce sans adaptation du “reducer”

Si on exécute à cette étape le job MapReduce sur adapter le reducer, on obtiendra la liste de codes avec le nombre d’occurrences.

Il faut placer le dictionnaire word_codes.txt obtenu à l’étape précédente dans le répertoire refdata sur HDFS.

En compilant le mapper et en le plaçant dans le répertoire mapper_distributed_cache:

% dotnet new console –n mapper_distributed_cache

Après avoir modifié l’implémentation dans mapper_distributed_cache/Program.cs:

% cd mapper_distributed_cache 
% dotnet build 
% dotnet publish –c release

On copie ensuite les résultats de compilation dans le répertoire publish:

% cp bin/release/netcoreapp2.0/publish/* ../publish

On exécute ensuite le job MapReduce:

hadoop jar /usr/hadoop/hadoop-2.8.1/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar \ 
-files "publish,refdata" \ 
-mapper "dotnet publish/mapper_distributed_cache.dll" \ 
-reducer "dotnet publish/reducer.dll" \ 
-input /user/input/comparison/2600-0.txt  \ 
-output /output_distributedcache_mapper

On obtient une liste de codes avec les occurrences des mots:

...
2mi    1 
2mj    2 
2mk    3 
2ml    4 
2mm    1 
2mn    1 
2mo    8 
2mp    1 
2mq   24 
2mr    1 
2ms    2 
2mt    1 
2mu    1 
2mv    8
...

Le résumé de l’exécution est le suivant:

    File System Counters 
        FILE: Number of bytes read=4311878 
        FILE: Number of bytes written=9044253 
        FILE: Number of read operations=0 
        FILE: Number of large read operations=0 
        FILE: Number of write operations=0 
        HDFS: Number of bytes read=3363856 
        HDFS: Number of bytes written=122053 
        HDFS: Number of read operations=9 
        HDFS: Number of large read operations=0 
        HDFS: Number of write operations=2 
    Job Counters  
        Launched map tasks=2 
        Launched reduce tasks=1 
        Data-local map tasks=2 
        Total time spent by all maps in occupied slots (ms)=78664 
        Total time spent by all reduces in occupied slots (ms)=15192 
        Total time spent by all map tasks (ms)=39332 
        Total time spent by all reduce tasks (ms)=7596 
        Total vcore-milliseconds taken by all map tasks=39332 
        Total vcore-milliseconds taken by all reduce tasks=7596 
        Total megabyte-milliseconds taken by all map tasks=80551936 
        Total megabyte-milliseconds taken by all reduce tasks=15556608 
    Map-Reduce Framework 
        Map input records=66055 
        Map output records=566308 
        Map output bytes=3179256 
        Map output materialized bytes=4311884 
        Input split bytes=210 
        Combine input records=0 
        Combine output records=0 
        Reduce input groups=20311 
        Reduce shuffle bytes=4311884 
        Reduce input records=566308 
        Reduce output records=20310 
        Spilled Records=1132616 
        Shuffled Maps =2 
        Failed Shuffles=0 
        Merged Map outputs=2 
        GC time elapsed (ms)=328 
        CPU time spent (ms)=9270 
        Physical memory (bytes) snapshot=572051456 
        Virtual memory (bytes) snapshot=8277377024 
        Total committed heap usage (bytes)=398663680 
    Shuffle Errors 
        BAD_ID=0 
        CONNECTION=0 
        IO_ERROR=0 
        WRONG_LENGTH=0 
        WRONG_MAP=0 
        WRONG_REDUCE=0 
    File Input Format Counters  
        Bytes Read=3363646 
    File Output Format Counters  
        Bytes Written=122053

Adaptation du “reducer”

On adapte le reducer pour lire le dictionnaire, et affecter le mot correspondant à chaque code trouvé. L’implémentation de départ du reducer est celle se trouvant dans l’article Exécuter un job Hadoop MapReduce avec .NET Core.

Après adaptation, le reducer devient:

using System; 
using System.Collections.Generic; 
using System.IO; 
  
namespace reducer 
{ 
    class Program 
    { 
        static void Main(string[] args) 
        { 
            Dictionary<string, string> word_codes = GetCachedWordCodes(); 
  
            string currentWord = string.Empty; 
            int currentWordCount = 0; 
            string word = string.Empty; 
  
            string valueFromMapper = Console.ReadLine(); 
            while (valueFromMapper != null) 
            { 
                var splitLine = valueFromMapper.Split('\t'); 
                if (splitLine.Length == 2) 
                { 
                    word = splitLine[0]; 
                    int count; 
                    if (Int32.TryParse(splitLine[1], out count)) 
                    { 
                        if (currentWord == word) 
                        { 
                            currentWordCount += count; 
                        } 
                        else 
                        { 
                            if (!string.IsNullOrWhiteSpace(currentWord)) 
                            { 
                                string wordToWrite = currentWord; 
                                word_codes.TryGetValue(currentWord, out wordToWrite); 
  
                                Console.WriteLine($"{wordToWrite}\t{currentWordCount}"); 
                            } 
                            currentWord = word; 
                            currentWordCount = count; 
                        } 
                    } 
                } 
  
                valueFromMapper = Console.ReadLine(); 
            } 
  
            if (currentWord == word && !string.IsNullOrWhiteSpace(currentWord)) 
            { 
                string wordToWrite = currentWord; 
                word_codes.TryGetValue(currentWord, out wordToWrite); 
  
                Console.WriteLine($"{wordToWrite}\t{currentWordCount}"); 
            } 
        } 
  
        // Permet de lire le dictionnaire se trouvant dans le cache distribué
        public static Dictionary<string, string> GetCachedWordCodes() 
        { 
            Dictionary<string, string> wordCounts = new Dictionary<string, string>(); 
  
            var lines = File.ReadAllLines("refdata/word_codes.txt"); 
            foreach (var line in lines) 
            { 
                var splitLine = line.Split('\t', StringSplitOptions.RemoveEmptyEntries); 
                if (splitLine.Length < 2) continue; 
  
                string word = splitLine[0]; 
                string wordCode = splitLine[1]; 
  
                wordCounts[wordCode] = word; 
            } 
  
            return wordCounts; 
        } 
    } 
}

De la même façon que le mapper, File.ReadAllLines("refdata/word_codes.txt") permet de lire le dictionnaire qui se trouve dans le cache distribué sur HDFS.

Exécution du job MapReduce avec le “reducer” adapté

On compile le reducer et on le place dans le répertoire reducer_distributed_cache:

% dotnet new console –n reducer_distributed_cache

Après avoir modifié l’implémentation dans reducer_distributed_cache/Program.cs:

% cd reducer_distributed_cache 
% dotnet build 
% dotnet publish –c release

On copie les résultats de compilation dans le répertoire publish:

% cp bin/release/netcoreapp2.0/publish/* ../publish

On exécute ensuite le job MapReduce:

hadoop jar /usr/hadoop/hadoop-2.8.1/share/hadoop/tools/lib/hadoop-streaming-2.8.1.jar \ 
-files "publish,refdata" \ 
-mapper "dotnet publish/mapper_distributed_cache.dll" \ 
-reducer "dotnet publish/reducer_distributed_cache.dll" \ 
-input /user/input/comparison/2600-0.txt  \ 
-output /output_distributedcache 

On obtient une liste de mots avec leur occurrence comme si on n’avait pas utilisé de cache distribué:

...
perdere          1 
peremptorily     3 
peremptory       2 
perfect         18 
perfecting       2 
perfection      10 
perfectly       24 
perform         16 
performance     10 
performances     2 
performed       24 
performer        1 
performing       8 
perfume          4 
...

Le résumé de l’exécution est le suivant:

    File System Counters 
        FILE: Number of bytes read=4311878 
        FILE: Number of bytes written=9044253 
        FILE: Number of read operations=0 
        FILE: Number of large read operations=0 
        FILE: Number of write operations=0 
        HDFS: Number of bytes read=3363856 
        HDFS: Number of bytes written=122053 
        HDFS: Number of read operations=9 
        HDFS: Number of large read operations=0 
        HDFS: Number of write operations=2 
    Job Counters  
        Launched map tasks=2 
        Launched reduce tasks=1 
        Data-local map tasks=2 
        Total time spent by all maps in occupied slots (ms)=78664 
        Total time spent by all reduces in occupied slots (ms)=15192 
        Total time spent by all map tasks (ms)=39332 
        Total time spent by all reduce tasks (ms)=7596 
        Total vcore-milliseconds taken by all map tasks=39332 
        Total vcore-milliseconds taken by all reduce tasks=7596 
        Total megabyte-milliseconds taken by all map tasks=80551936 
        Total megabyte-milliseconds taken by all reduce tasks=15556608 
    Map-Reduce Framework 
        Map input records=66055 
        Map output records=566308 
        Map output bytes=3179256 
        Map output materialized bytes=4311884 
        Input split bytes=210 
        Combine input records=0 
        Combine output records=0 
        Reduce input groups=20311 
        Reduce shuffle bytes=4311884 
        Reduce input records=566308 
        Reduce output records=20310 
        Spilled Records=1132616 
        Shuffled Maps =2 
        Failed Shuffles=0 
        Merged Map outputs=2 
        GC time elapsed (ms)=328 
        CPU time spent (ms)=9270 
        Physical memory (bytes) snapshot=572051456 
        Virtual memory (bytes) snapshot=8277377024 
        Total committed heap usage (bytes)=398663680 
    Shuffle Errors 
        BAD_ID=0 
        CONNECTION=0 
        IO_ERROR=0 
        WRONG_LENGTH=0 
        WRONG_MAP=0 
        WRONG_REDUCE=0 
    File Input Format Counters  
        Bytes Read=3363646 
    File Output Format Counters  
        Bytes Written=122053 

Comparaison entre l’exécution avec et sans cache distribué

L’intérêt de l’utilisation du cache distribué n’est pas dans le résultat obtenu puisqu’il est le même, mais dans le temps et la quantité de données échangées entre les mappers et le reducer.

Ainsi si on considère le résumé de l’exécution du job MapReduce word count sans cache distribué:

    File System Counters 
        FILE: Number of bytes read=5375603 
        FILE: Number of bytes written=11170764 
        FILE: Number of read operations=0 
        FILE: Number of large read operations=0 
        FILE: Number of write operations=0 
        HDFS: Number of bytes read=3363856 
        HDFS: Number of bytes written=227620 
        HDFS: Number of read operations=9 
        HDFS: Number of large read operations=0 
        HDFS: Number of write operations=2 
    Job Counters  
        Killed map tasks=1 
        Launched map tasks=2 
        Launched reduce tasks=1 
        Data-local map tasks=2 
        Total time spent by all maps in occupied slots (ms)=68848 
        Total time spent by all reduces in occupied slots (ms)=15542 
        Total time spent by all map tasks (ms)=34424 
        Total time spent by all reduce tasks (ms)=7771 
        Total vcore-milliseconds taken by all map tasks=34424 
        Total vcore-milliseconds taken by all reduce tasks=7771 
        Total megabyte-milliseconds taken by all map tasks=70500352 
        Total megabyte-milliseconds taken by all reduce tasks=15915008 
    Map-Reduce Framework 
        Map input records=66055 
        Map output records=566308 
        Map output bytes=4242981 
        Map output materialized bytes=5375609 
        Input split bytes=210 
        Combine input records=0 
        Combine output records=0 
        Reduce input groups=20471 
        Reduce shuffle bytes=5375609 
        Reduce input records=566308 
        Reduce output records=20470 
        Spilled Records=1132616 
        Shuffled Maps =2 
        Failed Shuffles=0 
        Merged Map outputs=2 
        GC time elapsed (ms)=327 
        CPU time spent (ms)=8530 
        Physical memory (bytes) snapshot=574074880 
        Virtual memory (bytes) snapshot=8277377024 
        Total committed heap usage (bytes)=398663680 
    Shuffle Errors 
        BAD_ID=0 
        CONNECTION=0 
        IO_ERROR=0 
        WRONG_LENGTH=0 
        WRONG_MAP=0 
        WRONG_REDUCE=0 
    File Input Format Counters  
        Bytes Read=3363646 
    File Output Format Counters  
        Bytes Written=227620 

Sur la quantité de données échangées, on constate une légère diminution:

  • Sans cache distribué: Map output materialized bytes=5375609
  • Avec cache distribué: Map output materialized bytes=4311884

Soit un gain de 20% par rapport à l’utilisation sans cache distribué.

Au niveau du temps d’exécution:

  • Sans cache distribué: 49 sec
  • Avec cache distribué: 53 sec

On constate une légère pénalité dans le temps d’exécution qui peut s’expliquer par les traitements supplémentaires effectués pour faire la conversion mot → code dans les mappers et code → mot dans le reducer.

Dans cet exemple, le temps d’exécution n’a pas été réduit toutefois, suivant la quantité de données transitant dans le réseau, on peut imaginer que dans certains cas, le cache distribué permet de limiter les échanges de données.

Références

Leave a Reply