Stream Processing Plattformen & die Qual der Wahl · And some mind breaking Bulletpoints 1 • And...
Transcript of Stream Processing Plattformen & die Qual der Wahl · And some mind breaking Bulletpoints 1 • And...
Stream Processing Plattformen & die Qual der Wahl_
Matthias Niehoff
Die Basics
2
Warum Stream Processing?_
3
•Infinite and continuous data
Unendliche, kontinuierliche Daten_
4
•And some mind breaking Bulletpoints 1
•And some mind breaking Bulletpoints 2
•And some mind breaking Bulletpoints 3 • Or some great Sub-Bulletpoints 1 • Or some great Sub-Bulletpoints 2 • Or some great Sub-Bulletpoints 3
• And some mind breaking Bulletpoints 4 • And some mind breaking Bulletpoints 5
Geschwindigkeit & Real Time_
5
Erst Verarbeiten, dann speichern_
6
Persistenz
Query
Persistenzstream processing
stream processing
stream processing
•Unbegrenzter Datenstrom
•Kontinuierliche Verarbeitung, Aggregation und Analyse
•MapReduce ähnliches Verarbeitungsmodell
•In-Memory Verarbeitung
•Latenz im Bereich von Millisekunden oder Sekunden
•Skalieren durch Verteilen
•Häufig modelliert als DAG
Distributed Stream Processing_
7
• Eventzeit: • Zeitpunkt, an dem das Event aufgetreten ist
• Verarbeitungszeit: • Zeitpunkt, an dem das Event vom System beobachtet wurde
Eventzeit vs. Verarbeitungszeit_
8
Event
Verarbeitung
1 2 3 4 5 6 7 8 9t in Minuten
•Differenz ist nicht nur != 0
•Differenz schwankt stark • Ressourcen bedingt (CPU, Netzwerk,..) • Software bedingt (verteilte Systeme..) • Daten bedingt (Schlüsselverteilung, Varianzen in Daten selbst)
• Analyse nach Verarbeitungszeit • einfacher aber ggfs. zu ungenau
• Analyse nach Eventzeit • komplexer, dafür genauer
Eventzeit vs. Verarbeitungszeit_
9
•Nicht triviale Anwendungen benötigen meist einen State • z.b. Aggregationen über einen längeren / unendlichen Zeitraum • (input, state) -> (output, state’) • gespeichert in Memory • interessant im Fehlerfall
State & Window Verarbeitung_
10
• Window als (zeitlich) begrenzter State • Tumbling Window • Sliding Window • Session Window
• Unterschiedliche Trigger • Zeit • Anzahl
Windowing & Sliding_
11
Tumbling Window_
12
Sliding Window_
13
Session Window_
14
Zeit
User 1
User 2 Inaktivität
Inaktivität
•Mit Verarbeitungszeit einfach
•Mit Eventzeit schwerer • Vollständigkeit (out of order Events) • Buffering
• Strategien bei Eventzeit Windows • Watermarks • Trigger • Akkumulation
• Mehr Informationen • https://www.oreilly.com/ideas/the-world-beyond-batch-
streaming-101 • http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf
Window Verarbeitung und Zeiten_
15
Die Kandidaten
16
•Gestartet 2010 durch BackType/Twitter, Apache seit 2013
•Pionier im Big Data / Stream Bereich
•Technologie der Lambda Architektur
•Low Level API
•Spouts und Bolts beschreiben eine Topologie
•Trident: High Level Erweiterung auf Storm Basis • Aggregationen • State & Window Operationen • Join, Merge, Group, --
Apache Storm_
17
•Open Source (2010) & Apache Projekt (2013)
•Einheitliche Batch & Stream Verarbeitung
•Breite Akzeptanz
•RDD als Basis
Apache Spark_
18
•Entwickelt bei LinkedIn, Open Source 2013
•Verfolgt den Log Ansatz von Kafka
•Ausgeführt auf YARN
•Geeignet für große States
•Erweiterbar über APIs
Apache Samza_
19
•Gestartet 2008 als europäisches Forschungsprojekt
•Low Latency Streaming und High Throughput Batch Processing
•Flexible States und Windows
•Streaming First Ansatz
Apache Flink_
20
Die Analyse
21
•Runtime
•Programming Model
•Skalierbarkeit
•Latenz
•Durchsatz
•Resilienz / Delivery Guarantees
•Reife
•Community
Aspekte von Streaming Anwendungen_
22
Runtime - Native Streaming_
23
Empfänger
Verarbeitung
Senke
geringe Latenz geringer Durchsatzflexibel Fehlertoleranz komplexer
Lastenverteilung komplexer
Runtime - Microbatching_
24
Empfänger
Verarbeitung
Senke
Microbatches
hoher Durchsatz höhere Latenzeinfacher Fehlertolerant weniger Flexibel (z.B. Windows)
State Verarbeitung komplexer
•Operatoren und Quellen als Komponenten
•Eigene Komponenten
•manuelle Topologie Definition
Programmiermodell_
25
Komponentenbasiert
•High Level API
•Higher Order Functions
•Abstrakte Datentypen
•Fortgeschrittene Operationen inkludiert
•Eingebaute Optimierungen
Deklarativ
Word Count - Flink_
26
valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaltext=env.socketTextStream("localhost",9999)
valcounts=text.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map(_,1).groupBy(0).sum(1)
counts.print
env.execute("ScalaSocketStreamWordCount")
Word Count - Spark_
27
valsparkConf=newSparkConf().setAppName("StreamingWordCount")valssc=newStreamingContext(sparkConf,Seconds(1))ssc.checkpoint(".")
valmappingFunc=(key:String,value:Option[Int],state:State[Int])=>{valsum=value.getOrElse(0)+state.getOption.getOrElse(0)valoutput=(key,sum)state.update(sum)output}
valwordCountState=StateSpec.function(mappingFunc)
Word Count - Spark_
28
vallines=ssc.socketTextStream(args(0),args(1).toInt)valwords=lines.flatMap(_.split(""))valwordsWithCount=words.map(x=>(x,1))valstateDstream=wordsWithCount.mapWithState(wordCountState)stateDstream.print()ssc.start()ssc.awaitTermination()
Word Count - Storm_
29
TopologyBuilderbuilder=newTopologyBuilder();
builder.setSpout("spout",newRandomSentenceSpout(),5);
builder.setBolt("split",newSplitSentence(),8).shuffleGrouping(„spout");
builder.setBolt("count",newWordCount(),12).fieldsGrouping("split",newFields("word"));
Word Count - Storm_
30
Configconf=newConfig();conf.setMaxTaskParallelism(3);
LocalClustercluster=newLocalCluster();cluster.submitTopology("word-count",conf,builder.createTopology());Thread.sleep(10000);cluster.shutdown();
Word Count - Storm_
31
publicstaticclassWordCountextendsBaseBasicBolt{Map<String,Integer>counts=newHashMap<String,Integer>();publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){Stringword=tuple.getString(0);Integercount=counts.get(word);if(count==null)count=0;count++;counts.put(word,count);collector.emit(newValues(word,count));}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word","count"));}}
Word Count - Storm Trident_
32
TridentTopologytopology=newTridentTopology();TridentStatewordCounts=topology.newStream("spout1",spout).each(newFields("sentence"),newSplit(),newFields("word")).groupBy(newFields("word")).persistentAggregate(newMemoryMapState.Factory(),newCount(),newFields("count")).parallelismHint(6);
Trident
Word Count - Storm Trident_
33
publicclassSplitextendsBaseFunction{publicvoidexecute(TridentTupletuple,TridentCollectorcollector){Stringsentence=tuple.getString(0);for(Stringword:sentence.split("")){collector.emit(newValues(word));}}}
Trident
Word Count - Samza_
34
classWordCountTaskextendsStreamTaskwithInitableTask{
privatevarstore:CountStore=_
definit(config:Config,context:TaskContext){this.store=context.getStore("wordcount-store").asInstanceOf[KeyValueStore[String,Integer]]}
Word Count - Samza_
35
overridedefprocess(envelope:IncomingMessageEnvelope,collector:MessageCollector,coordinator:TaskCoordinator){
valwords=envelope.getMessage.asInstanceOf[String].split("")
words.foreach{key=>valcount:Integer=Option(store.get(key)).getOrElse(0)store.put(key,count+1)collector.send(newOutgoingMessageEnvelope(newSystemStream("kafka","wordcount"),(key,count)))}}
•Maximal mögliche Garantien
•Beeinflussen Performance
•Nicht in jeder Kombination möglich (abhängig von Quelle)
Zustellungsgarantien_
36
Trident
At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once*
•Abhängig von der Runtime
•Höhere Latenz --> höherer Durchsatz
Latenz & Durchsatz_
37
~50ms 500ms 30.000ms
Samza
Storm
Flink
Spark Streaming
Storm Trident
Custom
Latenz & Durchsatz_
38
https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at
Throughput
Late
nz
•Viele Variablen, Unparteiische Tests schwierig
•Latenz vs. Durchsatz
•Delivery Guarantees
•Fehlertoleranz
•Tuning
•Netzwerk, Daten Lokalität, Serialisierung
Performance_
39
•Skalieren durch Partitionierung • Partitionieren der Daten • Partitionieren des Flows
Skalierbarkeit_
40
•Erneute Verarbeitung nicht einfach möglich
•Anfang und Ende schwer zu bestimmen
•State muss auch gesichert werden
•Verschiedene Ansätze • Record Ack • Micro Batching • Transactional Updates • Snapshots
Fault Tolerance_
41
Fault Tolerance - Storm_
42
Ack Ack
Ack
Ack
AckAck
•Fehlgeschlagene Microbatches werden wiederholt
•Batch Acknowledge statt Record Acknowledge
• Checkpoints für States
Fault Tolerance - Spark & Storm Trident_
43
•Transaktionale Updates auf Transaction Log
•Kafka als Transaction Log
Fault Tolerance - Samza_
44
partition 0
partition 1
partition 2
Checkpoint partition 0: offset .. partition 1: offset .. partition 2: offset ..
SamzaKafka
Kafka
•Distributed Checkpoints
Fault Tolerance - Flink_
45
•Native Eventzeitverarbeitung nur in Flink • Out-of-order Events • Watermarks • Trigger
• Eventzeit als Key in anderen Framework möglich • Keine out-of-order Events
Event- & Verarbeitungszeit_
46
finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Das Ergebnis
47
Überblick_
48
Trident
Runtime Nativ Microbatching Microbatching Nativ Nativ
Programmier-modell Komponentenbasiert Deklarativ Komponenten
basiert Deklarativ
Durchsatz Gering Mittel Hoch Hoch Hoch
Latenz Gering Mittel Mittel Gering Gering
Garantien At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once*
Eventzeit Handling Nein Nein Nein Nein Ja
Reife & Community Hoch Hoch Hoch Mittel Mittel
•bereits Spark Batch Anwendungen vorhanden sind
•viele Umsysteme integriert werden
•eine große Community wichtig ist
•Scala kein Problem ist
•Latenz kein Kriterium ist
Spark wenn..._
49
•Sehr niedrige Latenz, niedriges Volumen
•At-Least Once Verarbeitung
•Zustandslose Verarbeitung
•Ggfs. Heron als Alternative
Storm für ..._
50
•Kafka ist omnipräsent
•Große States
•Kein Exactly Once
•Kafka Streams als Alternative
Samza wenn ..._
51
•Für Eventzeit Verarbeitung
•Für pures Streaming
•Sehr gute Konzepte
•Etwas weniger Umsysteme
•Nutzen und Mitarbeit an einem jungen Projekt
Und Flink ..._
52
•Apache Beam • High Level API für Streaming Runner
•Google Cloud Data Flow • Googles Cloud Streaming Framework; Beam Implementierung
•Apex • YARN based direct-streaming with checkpointing
•Flume • Logfile Streaming insb. in HDFS
•Kafka Streams • Streaming integriert in Kafka ab 0.10, einfache Anwendungen
• Heron • Storm Nachfolger, API kompatibel, verbesserter Throughput &
Latency
Ein Satz zu_
53
Questions?
Matthias Niehoff, IT-Consultant
90
codecentric AG Zeppelinstraße 2 76185 Karlsruhe, Germany
mobil: +49 (0) 172.1702676 [email protected]
www.codecentric.de blog.codecentric.de
matthiasniehoff
• Logfile: Linux Screenshots, Flickr
• Sensors, IT Network: Wikipedia
• Devices: Brad Forst, Flickr
• Speed: Rool Paap, Flickr
• Graph: Wikipedia
• Stateful Processing: data-artisans.com
• Window & Sliding Windows, Flink Übersicht, Flink Fault Tolerance: Apache Flink
• Storm Topologien: Apche Storm
• Spark Übersicht: Apache Spark
• Samza Übersicht: Apache Samza
• Unendliche Daten: https://i.ytimg.com/vi/9rE3kbGmP4w/maxresdefault.jpg
Picture Reference_
55