Was sind „Reaktive Streams“? - java-forum-stuttgart.de · 2 Agenda Verarbeitungsparadigmen...
Transcript of Was sind „Reaktive Streams“? - java-forum-stuttgart.de · 2 Agenda Verarbeitungsparadigmen...
Was sind „Reaktive Streams“?
Java Forum Stuttgart 2018
Jörg Hettel
www.hs-kl.de 2
Agenda
Verarbeitungsparadigmen Pull- versus Push-Verarbeitung
„Java 8“-Streams Concurrency-Model
Grundprinzipien der reaktive Programmierung
Reactive Streams Beispiel mit RxJava Concurrency-Model
Fazit
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 3
Pull versus Push
© Jörg Hettel, Hochschule Kaiserslautern
Fabrik Lager
Händler
push pull
www.hs-kl.de 4
Pull-Verarbeitung
// Datenquelle List<Bestellposition> bestellung = ...; // Iterator Iterator<Bestellposition> itr = bestellung.iterator(); while( itr.hasNext() ) { Bestellposition bestellPos = itr.next(); verarbeite(bestellPos); } // Foreach-Schreibweise for(Bestellposition bestellPos : bestellung ) { verarbeite(bestellPos); }
Beispiel: Iteration über eine Liste
Elemente werden der Reihe nach verarbeitet
Aktiver Zugriff auf jedes Element
Code entspricht dem Programmfluss
www.hs-kl.de 5
Push-Verarbeitung
// Datenquelle Button btn = new Button(); // Datenverarbeitung btn.setOnAction( event -> verarbeite(event) ); // Datenauslieferung (Irgendwo im Code ) btn.fireEvent( new ActionEvent() );
Beispiel: Event-Auslieferung
Event wird verarbeitet, wenn es ausgelöst wird
Code entspricht nicht mehr dem Programmfluss
Callback wird zu einem späteren Zeitpunkt aufgerufen
Consumer
„Java 8“-Streams
www.hs-kl.de 7
Java 8 Streams
„Java 8“-Streams entsprechen einer Iterationsabstraktion für Datensammlungen
Deklarative Beschreibung einer Pull-Verarbeitung
Interne Iteration anstatt äußere
Nur noch Beschreibung von dem was gemacht werden soll:
streamOfStrings.forEach( ... );
str -> System.out.println(str)
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 8
Arbeiten mit Streams
„Java 8“-Streams
Verarbeitung von Datensammlungen, wie z.B. Collections
Entspricht einer „Pipeline“-Verarbeitung
Daten-
quelle Ergebnis
Stream-
erzeugung Stream-
verarbeitung Stream-
Auswertung
• map( … )
• filter( … )
• flatMap( … )
• peek( … )
• distinct()
• sorted()
• ...
• forEach( … )
• findAny( … )
• collect( … )
• reduce( … )
• ...
• aus Collections
• aus Iteratoren
• aus Generatoren
• ...
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 9
Parallele Streams
Parallelisierung durch Fork/Join-Mechanismus
Daten-
quelle Ergebnis
Stream-
verarbeitung
Stream-
verarbeitung
Stream-
verarbeitung
Stream-
verarbeitung
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 10
Beispiel: „Mutable Reduction“
List<Kunde> kunden = ...;
List<String> seqToList = kunden.stream()
.filter( k -> k.getAlter() > 18 )
.map( k -> k.getName() )
.collect( Collectors.toList() );
List<String> parToList = kunden.parallelStream()
.filter( k -> k.getAlter() > 18 )
.map( k -> k.getName() )
.collect( Collectors.toList() );
www.hs-kl.de 11
Concurrency - Modell
Bei Parallel-Streams wird die Verarbeitung einer „Datensammlung“ parallelisiert.
Sammlung von Operationen (Lambda-Ausdrücken) wird durch die Anwendung einer terminalen Operation auf die einzelnen Datenelemente ausgelöst.
Kontrollfluss wird aufgespaltet und wieder zusammengeführt
Terminale Operationen sind blockierend
© Jörg Hettel, Hochschule Kaiserslautern
Datenverarbeitung Datenverarbeitung
Reaktive Programmierung
www.hs-kl.de 13
Motivationsbeispiel
Berechnung von (x + y)2
Darstellung als „Berechnungsgraph“
© Jörg Hettel, Hochschule Kaiserslautern
x y
x + y
( x + y )2
www.hs-kl.de 14
Beispiel für eine reaktive Verarbeitung
Kalkulationsprogramm:
© Jörg Hettel, Hochschule Kaiserslautern
Definition der
Verarbeitung
Zellen als Datenquellen
www.hs-kl.de 15
Datenflussorientierte Push-Verarbeitung
Daten werden in eine Verarbeitungspipeline „eingespeist“.
© Jörg Hettel, Hochschule Kaiserslautern
x y
x + y
( z )2
3 5
3 5
8
Consumer
64
www.hs-kl.de 16
Codebeispiel mit RxJava
// Datenquellen PublishSubject<Integer> sourceX = PublishSubject.create(); PublishSubject<Integer> sourceY = PublishSubject.create(); // Definition der Datenverarbeitung (Datenflussgraph) Observable.zip(sourceX, sourceY, (x,y) -> x + y ) .map( z -> z*z ) .subscribe( System.out::println ); // Eingabe der Daten sourceX.onNext(3); sourceY.onNext(5); sourceX.onNext(6); sourceY.onNext(4); sourceX.onComplete(); sourceY.onComplete();
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 17
Operatoren
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 18
Codebeispiel mit RxJava (asynchron)
// Datenquellen PublishSubject<Integer> sourceX = PublishSubject.create(); PublishSubject<Integer> sourceY = PublishSubject.create(); Observable<Integer> val1 = sourceX.observeOn(Schedulers.computation()); Observable<Integer> val2 = sourceY.observeOn(Schedulers.computation()); // Datenfluss Observable.zip(val1, val2, (x,y) -> x + y ) .map( z -> z*z ) .subscribe( System.out::println ); // Eingabe der Daten sourceX.onNext(3); sourceY.onNext(5); sourceX.onNext(6); sourceY.onNext(4); sourceX.onComplete(); sourceY.onComplete();
www.hs-kl.de 19 © Jörg Hettel, Hochschule Kaiserslautern
Bemerkung: ReactiveX
ReactiveX ist eine API für die asynchrone Programmierung
http://reactivex.io
ReactiveX kennt „beobachtbare“ Streams
Sogenannte Push-Streams
Das API ist eine Kombination von
Observer-Pattern
Iterator-Pattern
Funktionaler Programmierung
Es existieren zahlreiche „Portierungen“
RxJava, RxJS, RxScala, Rx.NET, …
www.hs-kl.de 20
Programmierparadigma bei dem:
der Datenfluss im Vordergrund steht
eine asynchrones Verarbeitung sehr einfach möglich ist
Oft das Standardverhalten
der Programmcode oft viel verständlicher ist
Keine Callback-Hölle
1 Salvaneschi, Guido and Amann, Sven and Proksch, Sebastian and Mezini, Mira: An Empirical Study on Program Comprehension with Reactive Programming, Proceedings of the 22nd ACM SIGSOFT International Symposium on Foundations of Software Engineering, 2014
© Jörg Hettel, Hochschule Kaiserslautern
Warum reaktive Programmierung?
1
Reactive Streams
www.hs-kl.de 22
Reactive Stream
Initiative zur Definition eines Standards für die asynchrone Verarbeitung von Streams (Datenflüssen) mit non-blocking back pressure
Mitglieder: Netflix, Oracle, Red Hat, Twitter, …
Back pressure: Consumer kann dem Producer mitteilen, wie viele Daten er haben möchte bzw. verarbeiten kann.
Initiative hat einen entsprechenden Standard verabschiedet
http://www.reactive-streams.org
Spezifikation besteht aus vier Interfaces und einem Spezifikationsdokument
Keine high order functions
Java 9 hat die vier Interfaces in der Flow-Klasse aufgenommen
© Jörg Hettel, Hochschule Kaiserslautern
Reactive Streams
www.hs-kl.de 23 © Jörg Hettel, Hochschule Kaiserslautern
Reactive-Stream-Support bei Java
www.hs-kl.de 25
Back Pressure
Subscriber „steuert“ die Auslieferung der Daten
Datenquelle (Publisher)
Subscriber onNext( .. ) { }
Fordert N Daten an
Datenauslieferung an onNext() wird
z.B. asynchron ausgeführt
www.hs-kl.de 26
Reactive Streams unterstützen „push“-Verarbeitung
Entsprechen „aktiven“ Datenquellen (Datenflussorientiert)
Synchrone und asynchrone Datenauslieferung möglich
Anwendungsfälle
Verarbeitung von Echtzeitdatenquellen (Sensoren, Roboter, etc.)
Monitor- und Analysewerkzeuge
Verarbeitung von Daten, die verzögert eintreffen
Übertragung großer/unendlicher Datenmengen
…
Reactive Streams
www.hs-kl.de 27
Java-Frameworks mit Reactive-Streams-Unterstützung
RxJava ab Version 2.0
Reactor (wird von Spring benutzt)
Vert.x
Akka-Streams
…
© Jörg Hettel, Hochschule Kaiserslautern
Reactive Streams Implementierungen
Anwendungsbeispiel mit
Reactive Streams
www.hs-kl.de 29
Beispiel: Sensor-Monitoring
Sensor A (Publisher)
Subscriber
Auslieferungen der Daten erfolgt in eigenen Threads
push
Subscriber
Subscriber Subscriber
Subscriber
Sensor B (Publisher)
push
www.hs-kl.de 30
Datenflussdefinition
// Datenquellen Flowable<Integer> sensorDataA = SensorDataPublisher.getDataSensorA(); Flowable<Integer> sensorDataB = SensorDataPublisher.getDataSensorB(); // Definition der Datenflüsse sensorDataA.subscribe(seriesAChart); sensorDataA.buffer(10, 1) .map(l -> (int) l.stream() .mapToInt(Integer::intValue) .average().getAsDouble()) .subscribe(seriesASlidingAverageChart); sensorDataB.subscribe(seriesBChart); sensorDataB.buffer(10, 1) .map(l -> (int) l.stream() .mapToInt(Integer::intValue) .average().getAsDouble()) .subscribe(seriesBSlidingAverageChart); Flowable.zip(sensorDataA, sensorDataB, (v1, v2) -> v1 + v2) .subscribe(value -> Platform.runLater(() -> repaintGradient(value)));
www.hs-kl.de 31
Beispiel: „DataChart als Subscriber“
public class SensorDataChart implements Subscriber<Integer> { ... @Override public void onNext(Integer sensorValue) { if (this.xValue >= 50) { this.xValue = 0; Platform.runLater( () -> this.series.getData().clear() ); } Platform.runLater( /* Plot (x,y) */ ); this.xValue++; } @Override public void onComplete() { ... } @Override public void onError(Throwable exce) { ... } @Override public void onSubscribe(Subscription subscription) { this.subscription = subscription; this.subscription.request(Long.MAX_VALUE); } }
www.hs-kl.de 32
Daten-
verarbeitung
Concurrency-Modell
Bei Reactive-Streams werden Datenflüsse parallelisiert
Design des Programms entlang der Datenflüsse
Datenfluss wird mit push-Operationen bearbeitet und einem Subscriber verarbeitet
Verarbeitung erfolgt asynchron (nicht-blockierend)
© Jörg Hettel, Hochschule Kaiserslautern
Daten-
verarbeitung
Daten-
quelle
Daten-
quelle
Subscriber
Subscriber
Reaktive Systeme
www.hs-kl.de 34
Das Reaktive Manifest
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 35
www.reactivemanifesto.org
Reaktive Systeme sind:
Antwortbereit (responsive): Das System antwortet unter allen Umständen zeitgerecht, solange dies überhaupt möglich ist.
Widerstandsfähig (resilient): Das System bleibt selbst bei Ausfällen von Hard- oder Software antwortbereit.
Elastisch (elastic): Das System bleibt auch unter sich ändernden Lastbedingungen antwortbereit.
Nachrichtenorientiert (message driven): Das System verwendet asynchrone Nachrichtenübermittlung zwischen seinen Komponenten zur Sicherstellung von deren Entkopplung und Isolation sowie zwecks Übermittlung von Fehlern an übergeordnete Komponenten.
© Jörg Hettel, Hochschule Kaiserslautern
www.hs-kl.de 36
Reaktive Systeme:
Systemarchitektur für verteilte Systeme
Entwicklungsframeworks
Z.B.: Akka, Vert.x
Reaktive Programmierung
Programmierparadigma für responsive Anwendungen
Asynchrone Datenverarbeitung
Geeignet für die Implementierung von „Komponenten“ eines reaktiven Systems
Jonas Bonér und Viktor Klang: Reactive Programming versus Reactive Programming, Lightbend, 2016
© Jörg Hettel, Hochschule Kaiserslautern
Reaktive Systeme vs. reaktive Programmierung
www.hs-kl.de 37
Take Home Message
„Java 8“-Streams ≠ Reactive Streams
„Java 8“-Streams realisieren eine blockierende parallele
Verarbeitung einer Datensammlung
Einsatzbereich: Beschleunigung der Bearbeitung einer Datensammlung innerhalb einer Anweisungsfolge (Kontrollfluss)
Reactive-Streams realisieren die parallele asynchrone
Verarbeitung von verschiedenen Datenströme
Einsatzbereich: Parallele (asynchrone) Verarbeitung von mehreren Datenquellen (Datenflüssen)
© Jörg Hettel, Hochschule Kaiserslautern
Jörg Hettel
Hochschule Kaiserslautern Campus Zweibrücken Fachbereich Informatik
eMail: [email protected]
Gibt es Fragen?