Support de formation sur Apache Spark et Map Reduce méhtodes et pratique
Introduction à Map Reduce et à Apache Spark
Mohamed-Amine Baazizi – email:
pré
bdle/start
Organisation de BDLE
• Partie 1 : BD multidimensionnelles
• Partie 2 : MR et traitements sur Spark
– Cours 4 : introduction MR et Spark
– Cours 5 : algèbre Spark
– Cours 6 à 8 : modèle d’exécution de Spark
• Partie 3 : Systèmes large échelle
• Partie 4 : Données graphes
Phénomène Big data
•? Nouvelles applications
– Essor du web, indexer large volume de données
– Réseaux sociaux
– Données scientifiques, capteurs, LOD
•? Analyses de plus en plus complexes
– Changement climatique
– Comportment des utilisateurs
– Sécurité
Caractérsitiques du big data : 5 V
Faire face au phénomène Big data
•? Evolution des architectures
– Processeurs multi-cores, calcul sur GPU ou procésseur dédiés FPGA
– Grappe de machine, cloud
•? Evolution des logiciels
– Optimisation pour nouvelles architectures (HPC)
– Parallélisation des algorithmes
Open Source Big Data Landscape
Architecture type pour le big data
Architecture type pour le big data
•? Cluster = ensemble de lames connectés via un switch •? Lame = 8-64 unités connectées via gigabit •? Unité = CPUs +RAM+Ddur
Cluster ppX : 6 nœuds, 2 CPU 10 cœurs - 64Go Ram – 2 DDurs 1To raid
Tirer profit des nouvelles architectures matérielles
• Niveau calcul :
– Diviser en plusieurs petits calculs à synchronisation, gestion des pannes
• Niveau données:
– Répliquer les données sur plusieurs unités à gestion de la cohérence et des pannes
Comment tirer profit des nouvelles architectures matérielles?
•? Map Reduce = paradigme + éco-système • Paradigme
– Spécification de tâches de calculs
– Deux primitives : Map et Reduce
•? Eco-système
– Gestion de la tolérance aux pannes
– Gestion de la réplication
• Inspiré du fonctionnel
• Rappels fonctions d’ordre supérieur
–? Map
• Entrée = une fonction f, une liste L=[e1,e2,…,en]
• Résultat = Map (f, L)=[f(e1), f(e2),…, f(en)]
Exemple f(x)=x/2 L=[12,4,12,3] Map (f, L)=[6,2,6,1.5]
–? Reduce (appelé parfois Fold ou Aggregate)
• Entrée = un opérateur binaire ?, une liste L=[e1,e2,…,en]
• Résultat = Reduce (?, L)=?(e1, ?(e2 ,…, ?(en-1,en)…)
Exemple ? = ‘+’ Reduce (‘+’, L)=+(12, +(4, +(12, 3)))=31
Map-Reduce = Généralisation prog. fonctionnelle
Map (f, L)=[f(e1), f(e2),…, f(en)]
Reduce (?, L)=?(e1, ?(e2 ,…, ?(en-1,en)…) Désormais, f génère une liste de paires (clé, val) f : Dval è (Dcle, Dval) où
• Dval valeurs de type quelconque (tuples, strings,…)
• Dcleclés (souvent numérique ou strings)
Exemple
f : Nè(N,R+) avec f(x)=(x,x2-1)
pour L=[9,4,1] on obtient Map (f,L)=[(9,80),(4,15),(1,0)]
Map-Reduce = Généralisation prog. fonctionnelle
Map(f, L)=[f(e1), f(e2),…, f(en)]
Reduce (?, L)=?(e1, ?(e2 ,…, ?(en-1,en)…)
Désormais, entrée = liste (clé,[liste-val]) où clé est unique!
Sortie = liste (clé, ?([liste-val]))
Exemple
L=[(3,[0.5, 0.3, 0.2]) , (4,[2.0]) ]
Reduce (‘+’, L)=[(3,1), (4,2.0)]
Workflow Map-Reduce
• Map :
– Entrée : [e1,..,en]
– Résultat : [(k1 ,v1),.., (kn ,wn)]
• Regroupement des valeurs par clé
– produit des paires (clé,<liste-valeurs>)
• Reduce (une paire à la fois) :
– Entrée : (k1, [v1,.., vn])
– Résultat = (k, ?([v1 ,.., vn]))
• Eventuellement, enchainer exécutions
Workflow Map-Reduce
• Map :
– Entrée : [e1,..,en]
– Résultat : [(k1 ,v1),.., (kn ,wn)]
• Regroupement des valeurs par clé
– Résultat : (k1, [v1,.., vn]), (k2, [w1,.., wn]),…
• Reduce (une paire à la fois) :
– Entrée : (k1, [v1,.., vn])
– Résultat = (k, ?([v1 ,.., vn]))
• Eventuellement, enchainer exécutions
Exemple Map-Reduce
• Entrée : n-uplets (station, annee, mois, temp, dept)
• Résultat : annee, Max(temp)
7,2010,04,27,75 (2010,27) 2008, 28
12,2009,01,31,78 (2009,31) (2008, [28]) (2008, 28) 2009, 31
41,2009,03,25,95 (2009,25) (2009, [25, 31]) (2009, 31) 2010, 32 2,2008,04,28,76 (2008,28) Shuffle (2010,[27,32]) (2010, 32)
7,2010,02,32,91 (2010,32)
Tri et regroupement
Entrée par clé Résultat
Workflow Map-Reduce
légende Tasks Adapté de [Ullman]
Exécution Map-Reduce
• Map
– Génère les paires (cle,val) à partir de sa parition
– Applique une fonction de hachage h sur cle
– Stocke (cle,val) sur le bucket local h(cle)
• Nœud superviseur (master)
– Fusionne les bucket h(cle) de chaque map
• Reduce
– Applique ?
Exécution Map-Reduce
h(cle)=cle mod 2
Exécution Map-Reduce :
performances
1.? Nombre de reduce task
•? 1 reduce task = 1 bucket •? 1bucket = 1 fichier è Limiter le nombre de reduce task
Exécution Map-Reduce :
performances
2.? Combine
•? Les map et reduce s’exécutent sur des unités différentes
è transfert des résultats intermédiaires
Exécution Map-Combine-Reduce
Exécution Map-Reduce :
performances
2.? Combine
•? Les map et reduce s’exécutent sur des unités différentes
è transfert des résultats intermédiaires
è ? doit être commutatif et associatif
Exécution Map-Reduce
• Prog MR = 1 master + n workers
• Master :
– Créer les map et les reduce tasks
– Les affecter à des workers
– Superviser l’exécution
• Progression, stockage des résultats intermédiaires, relance des workers ayant échoué
• Worker
– En charge d’un map ou d’un reduce Rappel Archi. Cluster jamais les deux en même temps
Exécution Map-Reduce
Tiré de [Ullman]
Présentaion de Spark Motivation de Spark
•? Supporter des traitements itératifs efficacement
– Applications émergentes tels que PageRank, clustering par nature itératives
– Systèmes du style Hadoop matérialisent les résultats intermédiaires à performances dégradées
•? Solution
–? Les données doivent résider en mémoire-centrale et être partagées à mémoire distribuée
Architecture Spark
Architecture Spark
•? Application = driver + exécuteurs •? Driver = programme qui lance et coordonne plusieurs tâches sur le cluster •? Exécuteurs = processus indépendants qui réalisent les tâches de calcul • SparkContext
– objet Java qui permet de se connecter au cluster
– fournit des méthodes pour créer des RDD
Fonctionnement de Spark
• Resilient Distributed Datasets (RDDs)
– Structures accessibles en lecture seule
– Stockage distribué en mémoire centrale
– Restriction aux opérations sur gros granules
• Transformations de la structure en entier vs MAJ valeurs atomiques qui nécessite propagation replicats
–? Journalisation pour assurer la tolérance aux fautes
•? Possibilité de rejouer les transformations vs checkpointing
Fonctionnement des RDD
1. Création
– Chargement données depuis SGF distribué/local
– Transformation d’une RDD existante
Note : RDD est une séquence d’enregistrements
2. Transformations
– map : applique une fonction à chaque élément
– filter : restreint aux éléments selon condition –? join : combine deux RDD sur la base des clés *
(*) Les RDD en entrée doivent être des séquences de paires (clé,valeur)
Fonctionnement des RDD
3. Actions
– collect : retourne les éléments
– count : comptes les éléments
– save : écrit les données sur le SF
4. Paramétrage du stockage en mémoire
– persist : force le maintien en mémoire
– unpersist : force l’écriture sur disque
•? Notes :
– par défaut, les RDD sont persistantes en mémoire
– Si manque d’espace alors écriture sur disque
– Possibilité d’attribuer des priorités
Illustration d’une RDD
On considère une chaîne de traitements classique
1. Chargement depuis stockage (local ou hdfs)
2. Application d’un filtre simple 3.? Cardinalité du résultat de 2
4.? Paramétrage de la persistance
1 lines=spark.textFile("")
2 data=lines.filter(_.contains( "word"))
3 data.count
4 data.persist()
Illustration d’une RDD
On considère une chaîne de traitements classique
1. Chargement depuis stockage (local ou hdfs)
2. Application d’un filtre simple 3.? Cardinalité du résultat de 2
4.? Paramétrage de la persistance
1 lines=spark.textFile("")
2 data=lines.filter(_.contains( "word"))
3 data.count
4 data.persist()
Lazy evaluation
Construire les RDDs seulement si action (mode pipelined) Exemple : lines n’est construit qu’à la ligne 3 è Chargement sélectif de
API Spark
• Documentation
• Plusieurs langages hôtes
– Java
– Scala (langage fonctionnel sur JVM)
– Python
• Choix pour ce cours = Scala (Scalable Language)
– Documentation
– Tutoriel
• Langage orienté-objet et fonctionnel à la fois
– Orienté objet : valeur à objet, opération à méthode
Ex: l’expression 1+2 signifie l’invocation de ‘+ ’ sur des objets de la classe Int –? Fonctionnel :
1. Les fonctions se comportent comme des valeurs : peuvent être retournées ou passées comme arguments
2. Les structures de données sont immuables (immutable) : les méthodes n’ont pas d’effet de bord, elles associent des valeurs résultats à des valeurs en entrée
• Immuabilité des données
• Variables vs valeurs
//1- déclarons une valeur n scala> val n=1+10 n: Int = 11 //2-essayons de la modifier scala> n=n+1
<console>:12: error: reassignment to val n=n+1
^
//3- déclarons une variable scala> var m=10 m: Int = 10 //4- idem que 2 scala> m=m+1 m: Int = 11
val n’autorise pas de lier une variable plus d’une fois
•? Inférence de types
scala> var a=1 a: Int = 1 scala> var a="abc" a: String = abc scala> var a=Set(1,2,3)
a: [Int] = Set(1, 2, 3)
scala> a+=4
res1: [Int] = Set(1, 2, 3, 4) scala> a+="a"
<console>:9: error: type mismatch; found : String
required: [Int]
a+="a"
^
• Fonctions
• Itérations avec for-each
– style de programmation impérative
– méthode associé à un tableau (ou liste, ou ensemble)
– prend en entrée une fonction, souvent print
//déclarer une liste et l’iniXaliser
scala> var l=List(1,2,3)
l: List[Int] = List(1, 2, 3)
//imprimer chaque élément de la liste scala> l.foreach(x=>print(x))
123
//syntaxe équivalente scala> l.foreach(print)
123
• Tableaux
– Collections d’objets typés
– Initialisation directe ou avec apply() –? Mise à jour directe ou avec update()
scala> val b=Array.apply("1","2","3") //IniXalisaXon avec apply b: Array[String] = Array(1, 2, 3) scala> b(0)="33’’ //mise à jour directe
scala> b.update(1,"22 ’’) //mise à jour avec en uXlisant update
• Listes et ensembles
– Collections d’objets typés immuables
– Initialisation directe
– Mise à jour impossible
scala> val da=List(1,2,3) //iniXalisaXon directe da: List[Int] = List(1, 2, 3) scala> da(2) //accès indexéres53: Int = 3
scala> da(0)=1 //tentaXve de mise à jour
<console>:9: error: value update is not a member of List[Int] da(0)=1
^
• Opérations sur les listes
– Concaténation avec :::, ajout en tête avec ::
– Inverser l’ordre d’une liste reverse()
– Et plein d’autres méthodes (cf Annexe A)
scala> val l1=List(1,2,3) l1: List[Int] = List(1, 2, 3) scala> val l2=List(4,5) l2: List[Int] = List(4, 5) scala> l1:::l2 res44: List[Int] = List(1, 2, 3, 4, 5) scala> (6::l2) res47: List[Int] = List(6, 4, 5) scala> val l1bis=1::2::3::Nil l1bis: List[Int] = List(1, 2, 3)
//deviner la sorX de ce9e instrucXon scala> l1:::(6::l2.reverse).reverse
• Tuples
– Différents types pour chaque élément (12, ‘22’, <1,2,3>)
– Accès indexé avec ._index où index commence par 1
//creaXon d’un tuple complexe scala> val co=(12, "text", List(1,2,3)) co: (Int, String, List[Int]) = (12,text,List(1, 2, 3)) scala> co._0
<console>:9: error: value _0 is not a member of (Int,
String, List[Int]) scala> co._1 res56: Int = 12 scala> co._3
res58: List[Int] = List(1, 2, 3)
• Tableaux imbriqués dans des tuples
– Rappel : les élément des tableaux peuvent changer
– Un tableau imbriqué dans un tuple est une référence
• Les tableaux associatifs (Map)
– Associer à chaque entrée un élément –? Extension avec +
scala> var capital = Map("US" -> "Washington", "France" -> "Paris") capital: [String,String] = Map(US ->
Washington, France -> Paris) scala> capital("US") res2: String = Washington scala> capital += ("Japan" -> "Tokyo")
• Fonctions d’ordre supérieur Map et Reduce
– Fonctionnement : déjà vu
– Notation abrégée
scala> List(1, 2, 3) map (z=>z+1) //est équivalent à la ligne suivante scala> List(1, 2, 3).map (_ + 1)
res71: List[Int] = List(2, 3, 4)
//rappel: capital désigne Map(US -> Washington, France -> Paris) scala> (z=>(z._1.length))
res77: scala.collecXon.immutable.Iterable[Int] = List(2, 6)
scala> capital.reduce((a,b) => if(a._1.length>b._1.length) a else b)
res7: (String, String) = (France,Paris) scala> capital+=("Algeria"->"Algiers")
scala> capital.reduce((a,b) => if(a._1.length>b._1.length) a else b) res10: (String, String) = (Algeria,Algiers)
•? Plein d’autres fonctionnalités (consulter références) •? But de ce cours : utiliser Scala sous Spark
Scala sous Spark : fonctions
Figure Xrée de [Spark]
Scala sous Spark : illustration MR
Préparation des données
scala> val lines=sc.textFile("")
lines: [String] … 7,2010,04,27,75
scala> lines.count 12,2009,01,31,78
res3: Long = 5 412,2008,2009,04,,03,2825,76 ,95 scala> lines.collect 7,2010,02,32,91
res4: Array[String] = Array(7,2010,04,27,75,
12,2009,01,31,7, ….
Préparation des données
scala> val lines=sc.textFile("")
lines: [String] … 7,2010,04,27,75
scala> lines.count 12,2009,01,31,78 res3: Long = 5 412,2008,2009,04,,03,2825,76 ,95 scala> lines.collect 7,2010,02,32,91
res4: Array[String] = Array(7,2010,04,27,75,
12,2009,01,31,7, ….
•? Map (f:T?U)
scala> (x=>x.split(",")).collect
res8: Array[Array[String]] = Array(Array(7, 2010, 04, 27, 75), Array(12, 2009, 01, 31,
7),
scala> (x=>x.split(",")).map(x=>(x(1),x(3))).collect res12: Array[(String, String)] = Array((2010,27), (2009,31), …
Préparation des données
scala> val lines=sc.textFile("")
lines: [String] … 7,2010,04,27,75
scala> lines.count 12,2009,01,31,78 res3: Long = 5 412,2008,2009,04,,03,2825,76 ,95 scala> lines.collect 7,2010,02,32,91
res4: Array[String] = Array(7,2010,04,27,75,
12,2009,01,31,7, ….
• Map (f:T?U)
scala> (x=>x.split(",")).collect
res8: Array[Array[String]] = Array(Array(7, 2010, 04, 27, 75), Array(12, 2009, 01, 31,
7),
scala> (x=>x.split(",")).map(x=>(x(1),x(3))).collect res12: Array[(String, String)] = Array((2010,27), (2009,31), …
• Map (f:T?U)
scala> (x=>x.split(",")).collect
res8: Array[Array[String]] = Array(Array(7, 2010, 04, 27, 75), Array(12, 2009, 01, 31,
7),
scala> (x=>x.split(",")).map(x=>(x(1),x(3))).collect res12: Array[(String, String)] = Array((2010,27), (2009,31), …
• ReduceByKey (f:(V,V)? V )
//converXr l’entrée en enXer pour pouvoir uXliser ReduceByKey! scala> val (x=>x.split(",")).map(x=>(x(1).toInt,x(3)))
v: [(Int, String)] = MappedRDD[7] at map at <console>:14 scala> val max=v.reduceByKey((a,b)=>if (a>b)a else b).take(10) max: Array[(Int, String)] = Array((2010,32), (2008,28), (2009,31))
• ReduceByKey (f:(V,V)? V )
//converXr l’entrée en enXer pour pouvoir uXliser ReduceByKey! scala> val (x=>x.split(",")).map(x=>(x(1).toInt,x(3)))
v: [(Int, String)] = MappedRDD[7] at map at <console>:14 scala> val max=v.reduceByKey((a,b)=>if (a>b)a else b).take(10) max: Array[(Int, String)] = Array((2010,32), (2008,28), (2009,31))
• Comportement du ReduceByKey
scala> val max=v.reduceByKey((a,b)=>a).take(10) max: Array[(Int, String)] = Array((2010,27), (2008,28), (2009,31))
scala> val max=v.reduceByKey((a,b)=>b).take(10) max: Array[(Int, String)] = Array((2010,32), (2008,28), (2009,25))
