Streaming Plattformen und die Qual der Wahl

Post on 23-Jan-2018

536 views 1 download

Transcript of Streaming Plattformen und die Qual der Wahl

Stream Processing Plattformen & die Qual der Wahl_

@matthiasniehoff

WebTechNight Karlsruhe

•Grundlegende Konzepte vermitteln

•Kriterien zur Auswahl vorstellen

•Ausgewählte Frameworks erläutern

•Unverbindliche Empfehlungen

Ziel des Vortrags_

2

Die Basics

3

Request/Response_

4

Client Server

Batch_

5

Client Server

Streaming_

6

Client Server

Warum Stream Processing?_

7

•Infinite and continuous data

Unendliche, kontinuierliche Daten_

8

•And some mind breaking Bulletpoints 1

•And some mind breaking Bulletpoints 2

•And some mind breaking Bulletpoints 3 • Or some great Sub-Bulletpoints 1 • Or some great Sub-Bulletpoints 2 • Or some great Sub-Bulletpoints 3

• And some mind breaking Bulletpoints 4 • And some mind breaking Bulletpoints 5

Geschwindigkeit & (Near) Real Time_

9

Erst Verarbeiten, dann speichern_

10

Persistenz

Query

Persistenzstream processing

stream processing

stream processing

Stream Processing Plattform?

11

–Tyler Akidau

“ ... an execution engine designed for unbounded data sets, and nothing more”

12

•Unbegrenzter Datenstrom

•Verteilt über verschiedene Knoten

•Kontinuierliche Verarbeitung, Aggregation und Analyse

•MapReduce ähnliches Verarbeitungsmodell

•In-Memory Verarbeitung

•Latenz im Bereich von Millisekunden oder Sekunden

•Skalieren durch Verteilen

•Häufig modelliert als DAG

Distributed Stream Processing_

13

• Eventzeit: • Zeitpunkt, an dem das Event aufgetreten ist

• Verarbeitungszeit: • Zeitpunkt, an dem das Event vom System beobachtet und

verarbeitet wurde

Eventzeit vs. Verarbeitungszeit_

14

Event

Verarbeitung

1 2 3 4 5 6 7 8 9t in Minuten

•Differenz ist nicht konstant • Ressourcen bedingt (CPU, Netzwerk,..) • Software bedingt (verteilte Systeme..) • Daten bedingt (Schlüsselverteilung, Varianzen in Daten selbst)

• Analyse nach Verarbeitungszeit • einfacher aber ggfs. zu ungenau

• Analyse nach Eventzeit • komplexer, dafür genauer

Eventzeit vs. Verarbeitungszeit_

15

•Nicht triviale Anwendungen benötigen meist einen State • z.b. Aggregationen über einen längeren / unendlichen Zeitraum • in-memory • interessant im Fehlerfall

State & Window Verarbeitung_

16

• Window als (zeitlich) begrenzter State • Tumbling Window • Sliding Window • Session Window

• Unterschiedliche Trigger • Zeit • Anzahl

Windowing & Sliding_

17

Tumbling Window_

18

Sliding Window_

19

Session Window_

20

Zeit

User 1

User 2 Inaktivität

Inaktivität

•Mit Verarbeitungszeit einfach

•Mit Eventzeit schwerer • Vollständigkeit (out of order Events) • Buffering

• Strategien bei Eventzeit Windows • Watermarks • Trigger • Akkumulation

• Mehr Informationen • https://www.oreilly.com/ideas/the-world-beyond-batch-

streaming-101 • http://www.vldb.org/pvldb/vol8/p1792-Akidau.pdf

Window Verarbeitung und Zeiten_

21

•Wann werden States materialisiert?

•Watermark: Wann habe ich alle Daten zusammen?

•Trigger: Zusätzliche Auslöser, sowohl früher als auch später

•Akkumulation: Wie werden die einzelne Teile zusammengefügt?

Exkurs: Watermarks, Trigger, Akkumulation_

22

input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2))).triggering(AtWatermark().withEarlyFirings(AtPeriod(Duration.standardMinutes(1))).withLateFirings(AtCount(1)))).withAllowedLateness(Duration.standardMinutes(1)))

.discardingFiredPanes()).apply(Sum.integersPerKey());

vgl. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

Die Kandidaten

23

•Gestartet 2010 durch BackType/Twitter, Apache seit 2013

•Pionier im Big Data / Stream Bereich

•Technologie der Lambda Architektur

•Low Level API

•Spouts und Bolts beschreiben eine Topologie

•Trident: High Level Erweiterung • Aggregationen • State & Window Operationen • Join, Merge, Group, ...

Apache Storm_

24

•Entwickelt bei LinkedIn, Open Source 2013

•Verfolgt den Log Ansatz von Kafka

•Ausgeführt auf YARN

•Geeignet für große States

•Erweiterbar über APIs

Apache Samza_

25

•Open Source (2010) & Apache Projekt (2013)

•Einheitliche Batch & Stream Verarbeitung

•Hohe Verbreitung

•RDD als Basis, DataFrames/DataSets als Optimierung

Apache Spark_

26

•2010 als Forschungsprojekt gestartet, seit 2014 Apache

•Low Latency Streaming und High Throughput Batch Processing

•Streaming First Ansatz

•Flexible States und Windows

Apache Flink_

27

•Einfache Bibliothek, ohne Laufzeitumgebung

•Benötigt Kafka (0.10)

•Nutzt Kafka Consumer Techniken • Reihenfolge, Partitionierung, Skalierung

• Quelle, Senke: Nur Kafka Topics

• Kafka Connect für andere Datenquellen/senken

• Grundidee: Streams ≈ Tabellen

Kafka Streams_

28

(key1, value1) key1 value1

key1 value3key2 value2

key1 value1key2 value2(key2, value2)

(key1, value3)

Die Analyse

29

•Engine

•Programming Model

•Skalierbarkeit

•Latenz

•Durchsatz

•Resilienz / Delivery Guarantees

•Community

Aspekte von Streaming Anwendungen_

30

Engine - Native Streaming_

31

Empfänger

Verarbeitung

Senke

geringe Latenz geringer Durchsatzflexibel Fehlertoleranz komplexer

Lastenverteilung komplexer

Engine - Microbatching_

32

Empfänger

Verarbeitung

Senke

Microbatches

hoher Durchsatz höhere Latenzeinfacher Fehlertolerant weniger Flexibel (z.B. Windows)

State Verarbeitung komplexer

•Operatoren und Quellen als Komponenten

•Eigene Komponenten

•manuelle Topologie Definition

Programmiermodell_

33

Komponentenbasiert

•High Level API

•Higher Order Functions

•Abstrakte Datentypen

•Fortgeschrittene Operationen inkludiert

•Eingebaute Optimierungen

Deklarativ

Word Count - Flink_

34

valenv=StreamExecutionEnvironment.getExecutionEnvironmentvaltext=env.socketTextStream("localhost",9999)

valcounts=text.flatMap(_.toLowerCase.split("\\W+")).filter(_.nonEmpty).map(_,1).groupBy(0).sum(1)

counts.print

env.execute("ScalaSocketStreamWordCount")

Stateful Operation

Word Count - Spark_

35

valsparkConf=newSparkConf().setAppName("StreamingWordCount")valssc=newStreamingContext(sparkConf,Seconds(1))ssc.checkpoint(".")

valmappingFunc=(key:String,value:Option[Int],state:State[Int])=>{valsum=value.getOrElse(0)+state.getOption.getOrElse(0)valoutput=(key,sum)state.update(sum)output}

valwordCountState=StateSpec.function(mappingFunc)

Word Count - Spark_

36

vallines=ssc.socketTextStream(args(0),args(1).toInt)valwords=lines.flatMap(_.split(""))valwordsWithCount=words.map(x=>(x,1))valstateDstream=wordsWithCount.mapWithState(wordCountState)stateDstream.print()ssc.start()ssc.awaitTermination()

•Spark 2.0 wurde releast • Dataframes für Streaming • Schneller, flexibler, ... • Preview Status

Spark 2.0_

37

Word Count - Storm_

38

TopologyBuilderbuilder=newTopologyBuilder();

builder.setSpout("spout",newRandomSentenceSpout(),5);

builder.setBolt("split",newSplitSentence(),8).shuffleGrouping(„spout");

builder.setBolt("count",newWordCount(),12).fieldsGrouping("split",newFields("word"));

Word Count - Storm_

39

Configconf=newConfig();conf.setMaxTaskParallelism(3);

LocalClustercluster=newLocalCluster();cluster.submitTopology("word-count",conf,builder.createTopology());Thread.sleep(10000);cluster.shutdown();

Word Count - Storm_

40

publicstaticclassWordCountextendsBaseBasicBolt{Map<String,Integer>counts=newHashMap<String,Integer>();publicvoidexecute(Tupletuple,BasicOutputCollectorcollector){Stringword=tuple.getString(0);Integercount=counts.get(word);if(count==null)count=0;count++;counts.put(word,count);collector.emit(newValues(word,count));}publicvoiddeclareOutputFields(OutputFieldsDeclarerdeclarer){declarer.declare(newFields("word","count"));}}

Word Count - Storm Trident_

41

TridentTopologytopology=newTridentTopology();TridentStatewordCounts=topology.newStream("spout1",spout).each(newFields("sentence"),newSplit(),newFields("word")).groupBy(newFields("word")).persistentAggregate(newMemoryMapState.Factory(),newCount(),newFields("count")).parallelismHint(6);

Trident

Word Count - Storm Trident_

42

publicclassSplitextendsBaseFunction{publicvoidexecute(TridentTupletuple,TridentCollectorcollector){Stringsentence=tuple.getString(0);for(Stringword:sentence.split("")){collector.emit(newValues(word));}}}

Trident

Word Count - Samza_

43

classWordCountTaskextendsStreamTaskwithInitableTask{

privatevarstore:CountStore=_

definit(config:Config,context:TaskContext){this.store=context.getStore("wordcount-store").asInstanceOf[KeyValueStore[String,Integer]]}

Word Count - Samza_

44

overridedefprocess(envelope:IncomingMessageEnvelope,collector:MessageCollector,coordinator:TaskCoordinator){

valwords=envelope.getMessage.asInstanceOf[String].split("")

words.foreach{key=>valcount:Integer=Option(store.get(key)).getOrElse(0)store.put(key,count+1)collector.send(newOutgoingMessageEnvelope(newSystemStream("kafka","wordcount"),(key,count)))}}

Word Count - Kafka Streams_

45

KStreamBuilderbuilder=newKStreamBuilder();KStream<String,String>source=builder.stream("streams-file-input");

KTable<String,Long>counts=source.flatMapValues(value->

Arrays.asList(value.toLowerCase().split("\\W+"))).map((key,value)->newKeyValue<>(value,value)).countByKey("Counts")

counts.to(Serdes.String(),Serdes.Long(),"streams-wordcount-output");

KafkaStreamsstreams=newKafkaStreams(builder,props);streams.start();

Stateful Operation

•Skalieren durch Partitionierung • Partitionieren der Daten • Partitionieren des Flows

Skalierbarkeit_

46

Spark - Cluster_

47

Flink - Cluster_

48

•Erneute Verarbeitung nicht einfach möglich

•Anfang und Ende schwer zu bestimmen

•State muss auch gesichert werden

•Verschiedene Ansätze • Record Ack • Micro Batching • Transactional Updates • Snapshots • Change Logs

Fault Tolerance_

49

Fault Tolerance - Storm_

50

Ack Ack

Ack

Ack

AckAck

•Fehlgeschlagene Microbatches werden wiederholt

•Batch Acknowledge statt Record Acknowledge

• Checkpoints für States

Fault Tolerance - Spark & Storm Trident_

51

•Transaktionale Updates auf Transaction Log

•Kafka als Transaction Log

Fault Tolerance - Samza_

52

partition 0

partition 1

partition 2

Checkpoint partition 0: offset .. partition 1: offset .. partition 2: offset ..

SamzaKafka

Kafka

•Distributed Checkpoints

Fault Tolerance - Flink_

53

•Basiert auf der neuen Consumer API

• State Changelog als Kafka Topic (mit Log Compaction)

Fault Tolerance - Kafka Streams_

54

•Maximal mögliche Garantien

•Beeinflussen Performance

•Nicht in jeder Kombination möglich (abhängig von Quelle)

• At-Least-Once + Idempotente Operationen häufig ausreichend

Zustellungsgarantien_

55

Trident

At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once* At-least-once

•Abhängig von der Runtime

•Höhere Latenz --> höherer Durchsatz gilt nur noch teilweise

Latenz & Durchsatz_

56

~50ms 500ms 30.000ms

Samza

Storm

Flink

Spark Streaming

Storm Trident

Custom

Kafka Streams

Latenz & Durchsatz_

57

https://yahooeng.tumblr.com/post/135321837876/benchmarking-streaming-computation-engines-at

Throughput

Late

nz

•Viele Variablen, Unparteiische Tests schwierig

•Latenz vs. Durchsatz

•Delivery Guarantees

•Fehlertoleranz

•Tuning

•Netzwerk, Daten Lokalität, Serialisierung

Performance_

58

•Native Eventzeitverarbeitung in Flink und Storm • Out-of-order Events • Watermarks • Trigger

• Eventzeit als Key in anderen Framework möglich • Keine out-of-order Events • Begrenzte Window Lebenszeit (Kafka)

Event- & Verarbeitungszeit_

59

finalStreamExecutionEnvironmentenv=StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

•Idee der „continuous application“

Queryable State_

60

state Query

stream processing

•In Notebooks (Zeppelin, Spark Notebook,...)

Queryable State_

61

dstream.foreachRDD(rdd=>rdd.toDF().registerAsTable("tweets"))selectcreatedAt,count(1)fromtweetsgroupbycreatedAtorderbycreatedAt

•Direkt in der Streaming Anwendung: Geplant/Umgesetzt • Spark (SPARK-16738) • Kafka Streams (KAFKA-3909, v0.10.1.0) • Flink (FLINK-3779, v1.2.0)

Laufzeitumgebung - Cluster vs. Bibliothek_

62

YARN

Das Ergebnis

63

Überblick_

64

Trident

Engine Nativ Microbatching Microbatching Nativ Nativ Nativ

Programmier-modell Komponentenbasiert Deklarativ Komp. basiert Deklarativ Beides

Durchsatz Mittel Gering Sehr Hoch Hoch Hoch Hoch

Latenz Sehr Gering Mittel Mittel Gering Gering Gering

Garantien At-least-once Exactly-once* Exactly-once* At-least-once Exactly-once* At-least-once

Eventzeit Handling Nein Nein Nein/Ja* Nein Ja Ja*

Reife Hoch Hoch Hoch Hoch Hoch Mittel

Community & Ökosystem Hoch Hoch Hoch Mittel Mittel Mittel

Laufzeit-umgebung Cluster Cluster Cluster Cluster Cluster Bibliothek

•bereits Spark Batch Anwendungen vorhanden sind

•komplexe Analysen ausgeführt werden sollen

•eine Hadoop Distribution vorhanden ist

•viele Umsysteme integriert werden

•eine große Community wichtig ist

•Scala kein Problem ist

•Latenz kein Kriterium ist

Spark Streaming wenn..._

65

•Sehr niedrige Latenz, niedriges Volumen

•At-Least Once Verarbeitung

•Zustandslose Verarbeitung

•Ggfs. Heron als Alternative

Storm für ..._

66

•Für Eventzeit Verarbeitung

•Für pures Streaming

•Sehr gute Konzepte

•Nutzen und Mitarbeit an einem jungen Projekt

Und Flink ..._

67

•Kafka omnipräsent ist

•Die Verarbeitung simpel ist

•Keine Streams korreliert werden

•Wenn das Deployment einfach sein soll

•Ein Scheduler vorhanden ist

Kafka Streams, wenn _

68

•Es bereits verwendet wird ;-)

Samza wenn ..._

69

•And some mind breaking Bulletpoints 1

•And some mind breaking Bulletpoints 2

•And some mind breaking Bulletpoints 3 • Or some great Sub-Bulletpoints 1 • Or some great Sub-Bulletpoints 2 • Or some great Sub-Bulletpoints 3

• And some mind breaking Bulletpoints 4 • And some mind breaking Bulletpoints 5

Weitere Aspekte & Tools_

70

•Apache Beam • High Level API für Streaming Runner

•Google Cloud Data Flow • Googles Cloud Streaming Framework; Beam Implementierung

•Apex • YARN basiert, statische Topologie, Änderungen zur Laufzeit

•Flume • Logfile Streaming insb. in HDFS

• Heron • Storm Nachfolger, API kompatibel, verbesserter Throughput &

Latency

Ein Satz zu_

71

72

•https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101

•https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102

•http://data-artisans.com/apache-flink-apache-kafka-streams/

•https://cloud.google.com/blog/big-data/2016/05/why-apache-beam-a-google-perspective

Leseempfehlungen_

73

Flink Workshop_

74

München: 13.12.2016 Hamburg: tba (Januar/Februar 2017)

https://www.codecentric.de/2016/10/19/apache-flink-workshops-on-tour/

75

Fragen?_

Vielen Dank!

Matthias Niehoff IT-Consultant

90

codecentric AG Gartenstraße 69b 76135 Karlsruhe, Germany

mobil: +49 (0) 172.1702676 matthias.niehoff@codecentric.de

www.codecentric.de blog.codecentric.de

matthiasniehoff

• Logfile: Linux Screenshots, Flickr

• Sensors, IT Network: Wikipedia

• Devices: Brad Forst, Flickr

• Speed: Rool Paap, Flickr

• Graph: Wikipedia

• Stateful Processing: data-artisans.com

• Window & Sliding Windows, Flink Übersicht, Flink Fault Tolerance: Apache Flink

• Storm Topologien: Apache Storm

• Spark Übersicht: Apache Spark

• Samza Übersicht: Apache Samza

• Unendliche Daten: https://i.ytimg.com/vi/9rE3kbGmP4w/maxresdefault.jpg

• Kafka Streams FT: http://image.slidesharecdn.com/kafkastreams-kafkasummitneha-160511194334/95/introducing-kafka-streams-largescale-stream-processing-with-kafka-neha-narkhede-37-638.jpg?cb=1462995961

Reference_

77

• http://de.slideshare.net/PetrZapletal1/distributed-realtime-stream-processing-why-and-how-20

• http://data-artisans.com/high-throughput-low-latency-and-exactly-once-stream-processing-with-apache-flink/

• https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101

• https://databaseline.files.wordpress.com/2016/03/apache-streaming-technologies3.png

• O Rly from @ThePracticalDev

• Pictures partly from Apache Flink and Apache Spark

Reference_

78