Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

29
Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1

Transcript of Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Page 1: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Einführung in Big Data II

Kurt Stockinger

1

Page 2: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Lernziele

• Kennen der Details des MapReduce Algorithmus• Anwenden von MapReduce für Big Data Problemstellungen

• Kennen von Pig und Pig Latin• Verstehen des Unterschieds zu SQL

2

Page 3: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Wiederholung: MapReduce

3

Page 4: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Wörterzählen: Einfacher Pseudo Code

define wordCount as Map<String,long>; for each document in documentSet {

T = tokenize(document); for each token in T {

wordCount[token]++; }

} display(wordCount);

Funktioniert für “kleine” Datenmengen

4

Page 5: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Paralleles Programmieren

• Wie erweitert man den Code, sodass er auf mehreren Machinen parallel läuft?

5

Page 6: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Wörterzählen: MapReduce Pseudo Code

map(String input_key, String input_value):// input_key: document name // input_value: document contents for each word w in input_value:

EmitIntermediate (w, "1"); reduce(String output_key, Iterator intermediate_values):

// output_key: a word // output_values: a list of counts int result = 0; for each v in intermediate_values:

result += ParseInt(v);Emit (AsString(result));

6

Page 7: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Word Count with MR in Hadoop #1

package org.myorg;

import java.io.IOException; import java.util.*;

import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.*; import org.apache.hadoop.io.*; import org.apache.hadoop.mapred.*; import org.apache.hadoop.util.*;

public class WordCount {

public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

private final static IntWritable one = new IntWritable(1); private Text word = new Text(); 7

Page 8: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Word Count with MR in Hadoop #2

public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,

Reporter reporter) throws IOException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken());

output.collect(word, one); } } }

8

Page 9: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Word Count with MR in Hadoop #3

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text,

IntWritable> output, Reporter reporter) throws

IOException { int sum = 0; while (values.hasNext()) { sum += values.next().get(); } output.collect(key, new IntWritable(sum)); } }

9

Page 10: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Word Count with MR in Hadoop #4

public static void main(String[] args) throws Exception { JobConf conf = new JobConf(WordCount.class); conf.setJobName("wordcount");

conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class);

conf.setMapperClass(Map.class); conf.setCombinerClass(Reduce.class); conf.setReducerClass(Reduce.class);

conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class);

conf.setInputPath(new Path(args[0])); conf.setOutputPath(new Path(args[1]));

JobClient.runJob(conf); }

10

Page 11: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Details zum User Interface #1

• Mapper:• Maps tranformieren Input Records in temporäre Records (Output Pairs)• Zugriff auf Output Pairs mittles

OutputCollector.collect(WritableComparable,Writable).• Reporter wird verwendet, um den Programmfortschritt zu loggen• Alle temporären Werte für einen Output Key werden gruppiert und an

Reduce weitergereicht:• Output wird partitioniert: #Partitionen entspricht #Reduce Tasks des Jobs

• Lokale Aggregation mittels JobConf.setCombinerClass(Class):• Reduziert Netzwerkauslastung in Reduce-Phase

• #Maps?• Entspricht Grösse der Inputdaten, #Blocks und #Inputdaten• ~10-100 Maps/Knoten

11

Page 12: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Details zum User Interface #2

• Reducer:• Reduziert temporäre Werte zu kleinerer Anzahl.• #Reducer ist definiert mittels JobConf.setNumReduceTasks(int).• #Reduces?

• Zwischen 0.95 und 1.75 * #Knoten

12

Page 13: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Pig

13

Teilweise basierend auf Slides von Dr. Bill Howe, University of Washington

Page 14: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Pig / Pig Latin

• Pig ist eine high-level Plattform, um MapReduce Jobs auf Hadoop zu auszuführen:• MapReduce Jobs werden automatisch generiert• Gute Skalierbarkeit für Big Data

• Die Sprache der Plattform heisst Pig Latin:• Ähnlich wie SQL• Kein Java-basiertes “low-level” MapReduce notwendig• Aber:

• User Defined Functions (UDF) können in Java, JavaScript, Python, etc. geschrieben werden

• Pig wurde von Yahoo Research 2006 entwickelt• Seit 2007 ist es ein Apache Projekt

14

Page 15: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Datenmodell

• Atom:• Integer, string, etc

• Tuple:• Sequenz von Werten• Jeder Wert hat einen bestimmten Typ z.B. integer, string

• Bag:• Kollektion von Tuple• Können von unterschiedlichem Typ sein• Duplikate möglich (Unterschied zur Menge)

• Map:• „Dictionary“• String-Werte weisen auf einen beliebigen Wert

15

Page 16: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Beispiel

t = < 7, { <1,2>, <3,4>, <5,6>, <3,4>}, [‘Huber’:’Student’] >

16

f1: Atom f2: Bag f3: Map

Ausdruck Ergebnis$0 1

f2 Bag{ <1,2>, <3,4>, <5,6>, <3,4>}

f2.$0 Bag{ <1>, <3>, <5>, <3>}

f3#’Huber’ Atom “Student”

sum(f2.$0) 1+3+5+3 = 12

Page 17: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Wichtigste Operatoren

• LOAD/STORE:• Laden/Speichern von Daten

• FILTER: • Auslesen von Tupel (entspricht SQL-Select)

• FOREACH: • Bearbeiten von Spalten (entspricht SQL-Projekt)

• GROUP: • Gruppierung von Daten in eine einzige Relation

• COGROUP, inner Join, outer Join:• Gruppierung bzw. Join in zwei oder mehrere Relationen

• UNION:• Vereinigung von Relationen

• SPLIT:• Aufteilung von Relationen 17

Page 18: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

LOAD

• Annahme:• Datei ist ein Bag, d.h. jeder Datensatz wird als Tupel interpretiert

• Parsing mittles Funktion USING• Schemadefintion mittles AS

T = LOAD ‘file.txt’ USING PigStorage(‘\t’) AS (f1, f2, f3)

<9, 2, 3><4, 2, 2><1, 2, 3><9, 3, 2>

18

Page 19: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

FILTER

• Werte ausfiltern• Boolsche Operatoren möglich (AND, OR, etc)• Reguläre Ausdrücke

Y = FILTER T BY f1 == ‘9’;

T= <9, 2, 3> Y = <9, 2, 3><4, 2, 2> <9, 3, 2><1, 2, 3><9, 3, 2>

19

Page 20: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

GROUP

Daten gruppieren

X = GROUP T BY f1;

T= <9, 2, 3> Y = <1, {<1, 2, 3>}><4, 2, 2> <4, {<4, 2, 2>}><1, 2, 3> <9, {<9, 2, 3>, <9, 3, 2>}><9, 3, 2>

20

Name: “group” Name: “T”

Page 21: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

DISTINCT

• Duplikate eliminieren

X1 = DISTINCT T1

T1= <9, 2, 3> X1 = <9, 2, 3><4, 2, 2> <4, 2, 2><1, 2, 3> <1, 2, 3><9, 2, 3>

21

Page 22: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

FOREACH

• Jedes Tupel manipulieren

X = FOREACH T GENERATE f1, f2+f3

Y = GROUP T BY f1;Z = FOREACH Y GENERATE group, Y.($1,$2)

22

T = <9, 2, 3><4, 2, 2><1, 2, 3><9, 3, 2>

X = <9, 5><4, 4><1, 5><9, 5>

Z = <9, {<2, 3>, <3, 2>}><4, {<2 ,2>}><1, {<2, 3>}>

Projektion

Page 23: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

COGROUP

C = COGROUP T f1 BY $0, S BY $0;

23

T = <9, 2, 3><4, 2, 2><1, 2, 3><9, 3, 2>

S = <4, 1><5, 2><6, 3>

C = <1, {<1, 2, 3>}, {}> <4, {<4, 2, 2>}, {4,1}>

<5, {}, {<5, 2>} ><6, {}, {<6, 3>} ><9, {<9, 2, 3>, <9, 3, 2>}, {} >

Page 24: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

JOIN

C = JOIN T BY $0, S BY $1

24

T = <9, 2, 3><4, 2, 2><1, 2, 3><9, 3, 2>

S = <4, 1><5, 2><3, 9>

C = <9, 2, 3, 3><1, 2, 3, 4><9, 3, 2, 3>

Page 25: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Join-Optimierung

• Join-Operator ist nicht automatisch optimiert wie in einer Datenbank

• Replikation (Broadcase Join):• 1 Tabelle ist klein, andere ist gross• Repliziere kleine Tabelle, um Netzwerkbandbreite für Reduze-Phase zu

reduzieren

25

Page 26: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Pig Beispiel: Wörterzählen

input_lines = LOAD '/tmp/my-copy-of-all-pages-on-internet' AS (line:chararray);

-- Extract words from each line and put them into a pig bag -- datatype, then flatten the bag to get one word on each row words = FOREACH input_lines GENERATE FLATTEN(TOKENIZE(line)) AS word;

-- filter out any words that are just white spaces filtered_words = FILTER words BY word MATCHES '\\w+';

-- create a group for each word word_groups = GROUP filtered_words BY word;

-- count the entries in each group word_count = FOREACH word_groups GENERATE COUNT(filtered_words) AS count, group AS word;

-- order the records by count ordered_word_count = ORDER word_count BY count DESC; STORE ordered_word_count INTO '/tmp/number-of-words-on-internet'; 26

Page 27: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Pig vs. MapReduce

• MapReduce kombiniert drei primitive Operationen:• Datensätze prozessieren• Gruppierung• Prozessiere gruppierte Datensätze

• In Pig sind diese Operation• explizit• unabhängig• vollständig kombinierbar

• Pig hat zusätzliche primitive Operation für:• Filter• Projektion• Vereinigung zwei oder mehrerer Datensätze

27

Page 28: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Pig Latin vs. SQL

28

Pig Latin SQL

Prozedural (Sequenz von Schritten) Deklarativ (Sequenz von Constraints)

Schema-on-Read: Jeglicher Datensatz ohne bestimmtes Schema kann gelesen werden

Schema-on-Write: Vordefiniertes Datenschema

Bulk Lese- und Schreiboperationen, keine Indizes und Transaktionen

Bulk und Random Lese- und Schreibeoperationen, Indizes und Transaktionen

Lazy evaluation: Evaluierung eines Ausdrucks erst bei Verwendung

Eager evaluation

Verwendung für ETL Verwendung vor allem für Queries

Implementierung kann spezifiert werden (z.B. für Join)

User definiert Join, jedoch nicht welche Implementierung verwendet wird (Query Optimizer entscheidet)

Einfache Verwendung und Integration von User Defined Functions (UDF)

UDFs nicht so gut integriert wie logische Ausdrücke

Page 29: Zürcher Fachhochschule Einführung in Big Data II Kurt Stockinger 1.

Zürcher Fachhochschule

Pig Latin ist prozedural und erlaubt Pipelining und Checkpointing

Aufgabe: • Join der Sourcen users und clicks• Join mit geoinfo• Aggregation und Speicherung in

ValuableClicksPerDMA

Pig Latin:

Users = load 'users' as (name, age, ipaddr);

Clicks = load 'clicks' as (user, url, value);

ValuableClicks = filter Clicks by value > 0;

UserClicks = join Users by name, ValuableClicks by user;

Geoinfo = load 'geoinfo' as (ipaddr, dma);

UserGeo = join UserClicks by ipaddr, Geoinfo by ipaddr;

ByDMA = group UserGeo by dma;

ValuableClicksPerDMA = foreach ByDMA generate group, COUNT(UserGeo);

store ValuableClicksPerDMA into 'ValuableClicksPerDMA';

29

SQL:

insert into ValuableClicksPerDMA

select dma, count(*)

from geoinfo join (

select name, ipaddr from users join clicks on (users.name = clicks.user) where value > 0;

) using ipaddr group by dma;