Depuis l’apparition du Big Data, les méthodes, architectures et outils de traitement de gros volumes de données n’ont cessé d’émerger : MapReduce, Hadoop, Spark, etc.
MapReduce, créé par le géant Google, va être très vite adopté comme framework pour faire les opérations de calculs distribués et de parallélisation.
Dans cet article, nous allons à la découverte de MapReduce : du concept à son utilisation. Puis, je présenterai une de ses extensions et un exemple succinct en Python.
Qu’est-ce que MapReduce ?
MapReduce ou Hadoop MapReduce est un modèle de programmation qui sert à calculer de gros volumes de données en parallélisant les calculs sur différents nœuds d’un cluster. On parle de calculs distribués.
Plus concrètement, MapReduce consiste, de base, en deux fonctions :
Map()
pour distribuer le travail sur les nœuds du cluster ;Reduce()
pour agréger le résultat de chaque nœud en un unique résultat.
Ce framework fonctionne exclusivement sur le concept de (clé, valeur)
.
La donnée en entrée du modèle sera découpée en des portions de données. Ces dernières seront intégrées parallèlement dans des mappers. Cette étape consiste à partitionner les données.
En sortie de chaque mapper, chaque portion de données sous la forme (clé, valeur)
sera trié. Nous sommes à une étape implicite du paradigme : ShuffleSort()
.
Dans notre module de mélange-tri, nous allons trier rapidement (QuickSort) chaque portion de données en fonction de chaque clé
. Puis, on les regroupe ensemble dans des combiners en fonction de la valeur de chaque clé
.
Les données en sortie de chaque combiner seront mélangées, calculées avec la fonction de mapping renseignée (par exemple len()
pour connaître la longueur de listes) et triées de nouveau à destination de mergers pour être regroupées. À cette étape, les données sont calculées concrètement puis fusionnées en fonction de la valeur de leurs clés.
Les différentes portions de données triées, calculées et fusionnées sont envoyées parallèlement dans des reducers où un tri est effectué en entrée en fonction de chaque valeur
du couple (clé, valeur)
. Nous sommes à la fin de notre modèle : les données de chaque reducer sont regroupées et nos données sont dans le format désiré.
L’image ci-dessous donne plus de détails sur l’architecture de fonctionnement de Hadoop MapReduce.
Apache Spark : une extension de MapReduce
Apache Spark est un framework open source de calcul distribué. Il a été créé plusieurs années après Hadoop MapReduce. Son architecture de base se fonde sur des objets appelés Resilient Distributed Dataset (RDD) qui forment une collection d’éléments partitionnés entre les nœuds d’un cluster pour être calculés parallèlement.
Grâce aux RDDs, Spark va surpasser une des limites de MapReduce. En effet, avec MapReduce, dans notre pipeline, on partitionne, trie, mélange, fusionne et réduit les données en un seul résultat. Pour effectuer des opérations plus complexes comme celles de Machine Learning, le caractère acyclique du modèle de MapReduce est une grande limite.
Spark va permettre d’avoir une itération de l’opération MapReduce avec les RDDs afin d’améliorer le temps de calcul, faire des calculs complexes (de NLP par exemple) et de distribuer le stockage des données.
Les fonctionnalités de RDD
La documentation sur les RDDs Apache Spark est disponible ici.
Le langage de programmation de base dans lequel a été écrit Spark est Scala. Mais il est disponible également en R et Python.
Les différents codes qui seront présentés ci-dessous sont en Python.
En Python, il existe un module pour pouvoir utiliser Spark et Hadoop : le module pyspark
.
Pour installer correctement Spark et pyspark, je vous conseille de suivre ce lien.
Pour pouvoir utiliser RDD :
# S'assurer d'avoir correctement installé pyspark et Spark
# Importer quelques classes Spark pour initialiser notre environnement
from pyspark import SparkContext, SparkConf
# Initialiser Spark
conf = SparkConf()
sc = SparkContext(conf=conf)
- L’objet
SparkContext sc
spécifie à Spark comment accéder au cluster. - Quant à
SparkConf conf
, il va contenir l’information de votre application (ici nous n’en avons pas). Dans certains, on initialiseconf
comme ceci :conf = SparkConf().setAppName(appName).setMaster(master)
Cela permet de renseigner l’application et le cluster maître.
Nous pouvons créer maintenant un RDD.
# Créer un jeu de données
data = list(range(10)) ## [0, 1, 2, 3, 4, 5, 6, 7, 9]
# Créer un RDD
rdd = sc.parallelize(data)
print(type(rdd), '\t', rdd.collect())
[Output]
<class 'pyspark.rdd.RDD'> [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
🎉🎊 Félicitations
À première vue, au niveau du résultat, il n’y a pas de grande différence entre notre RDD et notre liste data
. D’abord, remarquons les fonctions parallelize()
et collect()
. La première de l’objet sc
permet de créer un RDD. Et la dernière, permet d’afficher le contenu de l’objet rdd
de type pyspark.rdd.RDD
. C’est le type de « base ».
Nous pouvons également créer un RDD de chaînes de caractères : parallelize()
est notre ami 😎.
# Créer un jeu de données
data = ['Ledatascientist', 'le', 'top', 'des', 'blogs', 'français', 'en', 'Data', 'Science']
# Créer un RDD
rdd = sc.parallelize(data, numSlices=10).glom()
print(type(rdd), '\t', rdd.collect())
[Output]
<class 'pyspark.rdd.RDD'> [[], ['Ledatascientist'], ['le'], ['top'], ['des'], ['blogs'], ['français'], ['en'], ['Data'], ['Science']]
La fonction parallelize()
propose le paramètre numSlices
qui est utilisé par la fonction glom()
pour créer un RDD avec le nombre de partitions. Chaque partition est dans une liste.
Dans notre cas, on a la première partition vide, car la longueur de la liste data
est de 9.
Dans la suite, nous voulons calculer le nombre d’occurrences des mots dans un texte quelconque. Nous pourrons ainsi voir quelques fonctions de RDD.
Application
Téléchargez n’importe quel texte et enregistrez-le au format TXT.
Dans notre cas, nous créerons un RDD à partir d’un fichier example.txt
qui contient notre article sur les bases du Machine Learning.
# Créer un RDD à partir d'un fichier TXT
rdd = sc.textFile("./example.txt")
# Affichage du premier élément de notre RDD
print(type(rdd), '\n', rdd.first())
[Output]
<class 'pyspark.rdd.RDD'>
Machine Learning ou apprentissage automatique est l’un des champs de l’intelligence artificielle (IA). Ce domaine apparu tout au début des années 1960 va devenir de nos jours très incontournable au XXIe siècle pour la résolution de problèmes concrets et complexes.
La fonction first()
à la différence de collect()
permet d’afficher le premier élément d’un RDD.
Maintenant, nous allons faire du MapReduce avec RDD.
Afin de calculer le nombre d’occurrences des mots de notre texte, nous allons créer une fonction qui servira pour le mapping.
def process(sentence):
"""
Supprime les ponctuations d'une phrase, la splitte en liste
et retourne la liste des mots (en miniscule) dont la longueur est supérieure à 3.
Parameter
----------
sentence: str
Une phrase.
Returns
--------
words: list
La liste des mots de la phrase "sentence" dont la longueur est supérieure à 3.
"""
## Suppression des ponctuations
for p in "`!()-_[]{};:’,./?\"":
sentence = sentence.replace(p, " ")
## Split de la phrase
list_ = sentence.split()
## Liste des mots de longueur supérieure à 3
words = [word.lower() for word in list_ if len(word) > 3]
## Sortie de notre fonction
return words
On va créer un nouveau RDD qui sera la liste de tous les mots du texte dont la longueur est supérieure à 3.
On évitera d’avoir par exemple les déterminants (le, les, de, à, …) dans notre RDD.
# Créer le nouveau RDD
rdd_processed = rdd.flatMap(process)
# Affichage des quinze premiers éléments de notre RDD
rdd_processed.take(15) ## take(1) == first()
[Output]
['machine',
'learning',
'apprentissage',
'automatique',
'champs',
'intelligence',
'artificielle',
'domaine',
'apparu',
'tout',
'début',
'années',
'1960',
'devenir',
'jours']
flatMap 🤔
On aurait pu utiliser la fonction map()
de RDD mais le résultat aurait été différent en sortie. En effet, flatMap()
permet non seulement de faire un mapping en utilisant la fonction process()
, elle retourne une liste de mots. Alors que map()
aurait retourné une liste de listes de mots.
N’hésitez pas à essayer vous-mêmes 🤠.
Assez patienté, on fait le MapReduce maintenant 🤗
# Map - Reduce - Sort
rdd_final = rdd_processed.map(lambda word: (word, 1))\
.reduceByKey(lambda a, b: a+b)\
.sortBy(lambda x: -x[1])
# Afficher les cinq premiers éléments
rdd_final.take(5)
[Output]
[('machine', 17),
('apprentissage', 13),
('learning', 12),
('supervisé', 10),
('régression', 9)]
Succès 🎊🤓 Mais expliquons tout ça 😎
- Premièrement, grâce à la fonction
map()
, qui prend en paramètre une fonction, on crée un RDD où chaque élément est de la forme (clé, valeur). La clé sera le mot et la valeur 1. - Puis avec
reduceByKey()
, on va faire une opération reduce sur les clés. Cela permet de faire la somme des valeurs de même clé. Donc, d’obtenir l’occurrence de chaque clé (mot). - Finalement, on utilise la fonction
sortBy()
pour trier le résultat en fonction des valeurs de manière descendante-x[1]
.
Dans le cas où on voudrait trier de manière ascendante par clé, on peut fairex[0]
ou utiliser la fonctionsortByKey()
.
Et comme on l’a vu plus haut, la fonction take(n)
pour afficher les n
mots les plus récurrents de notre texte.
C’était facile et sympa 😊
Le code se trouve sur GitHub.
Conclusion
Dans cet article, nous avons vu l’un des paradigmes les plus populaires du Big Data. Grâce à Apache Spark, MapReduce permet la résolution de plusieurs problèmes complexes de NLP, de Machine Learning.
Essayez de vous familiariser avec ses objets tels que RDD, les accumulateurs, etc.
J’espère que vous avez aimé cet article. N’hésitez pas à laisser des commentaires 😊
Vous voulez publier sur ledatascientist.com ? C’est par ici.