JVM Concurrency auf der JavaLand am 8. März 2016

63
Javaland 2016 Concurrency-Modelle auf der JVM Lutz Hühnken @lutzhuehnken https://github.com/lutzh/concurrency_talk

Transcript of JVM Concurrency auf der JavaLand am 8. März 2016

Javaland 2016

Concurrency-Modelle auf der JVM

Lutz Hühnken @lutzhuehnken

https://github.com/lutzh/concurrency_talk

Nebenläufigkeit in der Literatur

Provokante These (bitte später beim Bier diskutieren):

Java braucht eine neue Concurrency - Standardbibliothek!

Warum reichen Threads nicht aus?

Problem 1: Effizienz (bzw. Mangel an derselben)

CPU

Threads

Arbeit

1 2 3 … 10.000

4-core CPU

Tausende Threads!

Jede Menge Arbeit!

Das ist, wo wir hinwollen:

Von

ThreadProzess

Prozess „Worker“ „Task“

Zu

Siehe auch https://en.wikipedia.org/wiki/Reactor_pattern

Wichtiger Aspekt: (blocking) I/O

Wenn jeder Thread nur jeweils einen Task hat:

(Dies ist eine Momentaufnahme, kein zeitlicher Ablauf)

Niemals blockieren!

Wenn ein Thread ein „Worker“ ist, der viele Tasks bearbeitet:

(Dies ist eine Momentaufnahme, kein zeitlicher Ablauf)

Effizienz-Problem: Gelöst!

• Nebenläufigkeit unterhalb des Thread-Levels (Tasks) • Asynchrone I/O

• Dies haben alle Modelle, die wir uns ansehen werden, gemeinsam!

• Das haben übrigens auch alle reaktiven* Frameworks gemeinsam!

* http://www.reactivemanifesto.org

Warum reichen Threads nicht aus?

Problem 1: Effizienz (bzw. Mangel an derselben)

Problem 2: Programmiermodell

They discard the most essential and appealing properties of sequential computation: understandability, predictability, and determinism. Threads, as a model of computation, are wildly nondeterministic, and the job of the programmer becomes one of pruning that nondeterminism.

Quelle: John Rose, Java VM Architect, JFokus, Stockholm, February 2015

Exkurs: Die Callback-Höllefs.readdir(source, function(err, files) { if (err) { console.log('Error finding files: ' + err) } else { files.forEach(function(filename, fileIndex) { console.log(filename) gm(source + filename).size(function(err, values) { if (err) { console.log('Error identifying file size: ' + err) } else { console.log(filename + ' : ' + values) aspect = (values.width / values.height) widths.forEach(function(width, widthIndex) { height = Math.round(width / aspect) console.log('resizing ' + filename + 'to ' + height + 'x' + height) this.resize(width, height).write(destination + 'w' + width + '_' + filename, function(err) { if (err) console.log('Error writing file: ' + err) }) }.bind(this)) } }) }) } })

Zugegeben: Das könnte man schöner schreiben

• Scala Futures / for-expressions • async / await (auch Scala) • JavaScript Promises (then.. then.. then)

• All das ist „syntactic sugar“ für Callbacks - was eine gute Sache ist!

• Aber lasst uns nach anderen Ansätzen für asynchrone Komposition schauen.

Also was interessiert uns?

KompositionZustand

Integration, I/O (async und sync)

Vorhersagbarkeit

Coroutines (Green Threads, User Mode Threads, IOC Threads, Fibers)

Quasar Fibershttp://www.paralleluniverse.co/quasar/

Channels Clojure core.asynchttps://github.com/clojure/core.async/

Event Loop vert.xhttp://vertx.io

Actor Model Akkahttp://akka.io

Concurrency - Modelle

Fibers

new Fiber<V>() { @Override protected V run() throws SuspendExecution,

InterruptedException { // code here }}.start();

sieht aus wie ein Thread, redet wie ein Thread

class RetrieveInfo extends Fiber<Void> { ... @Override public Void run() {

final CloseableHttpClient client = FiberHttpClientBuilder.create().build();

try { String response =

client.execute(new HttpGet(MessageFormat.format(url, searchTerm)), new BasicResponseHandler());

addResult(key, StringUtils.abbreviate(response, 1000));

} catch (IOException iox) { ...

Use like blocking

Need to aggregate results..

„fiber blocking“ async http client

private final Map<String, String> store = new ConcurrentHashMap<>();

public void define(final String searchTerm) { for (String currentKey : sources.keySet()) { new RetrieveInfo(currentKey, sources.get(currentKey), searchTerm).start(); } }

/** * This method is working on shared mutual state - this is what we look to avoid! */ void addResult(final String key, final String result) { store.put(key, result); ...Methode, um Ergebnis entgegenzunehmen - muss „locken" - wollen wir nicht!!

shared mutable state

Starte Web-Aufrufe parallel im Hintergrund

Fibers

• Größter Vorteil: Imperative Programmierung, wie wir es von Threads kennen.

• Größter Nachteil: Imperative Programmierung, wie wir es von Threads kennen!

class FooAsync extends FiberAsync<String, FooException> implements FooCompletion { @Override public void success(String result) { asyncCompleted(result); }

@Override public void failure(FooException exception) { asyncFailed(exception); }}

String op() { new FooAsync() { protected void requestAsync() { Foo.asyncOp(this); } }.run();}

Verwandelt callback APIs in fiber blocking APIs!

Fibers Zusammenfassung

• Thread-artiges Programmiermodell • Nette „Tricks“: Instrumentation,

Suspendable, Thread Interop • Fibers alleine sind ziemlich „low level“ • (Fast) 1:1 Ersatz für Threads

Channels

Channels

• Theoretische Basis: Communicating Sequential Processes (Tony Hoare 1978, https://en.wikipedia.org/wiki/Communicating_sequential_processes)

• Elementarer Bestandteil von „Go“ • Unser Beispiel: Clojure core.async

Channels

(def echo-chan (chan))

(go (println (<! echo-chan)))

(>!! echo-chan „ketchup")

; => true ; => ketchup

go block (wie ein Fiber)

>!! blocks thread

<! blocks go block

(def echo-buffer (chan 2)) (>!! echo-buffer "ketchup") ; => true (>!! echo-buffer "ketchup") ; => true (>!! echo-buffer "ketchup") ; blocks

buffered channel - async

(def responses-channel (chan 10))

(go (println (loop [values []] (if (= 3 (count values)) values (recur (conj values (<! responses-channel)))))))

(defn async-get [url result] (http/get url #(go (>! result (:body %)))))

(async-get "http:m-w.com..." responses-channel)(async-get "http:wiktionary.." responses-channel)(async-get "http:urbandictionary.." responses-channel)

callback puts result in channel

channel for responses

Aggregate results recursively

Channels

• Erlaubt Datenfluss-orientierte Programmierung, flexible Komposition (z.B. alts!)

• Reduktion: Immutability, sequentielles Konsumieren

• go block / thread Dualität erlaubt Integration mit async und sync Libraries

• Channels als Hüter des Zustandes?

Channels Zusammenfassung

Event Loop

Channels

• Event loop model - Vorbild node.js • „multi event loop“, auf der JVM • Polyglott - Java, Scala, Clojure, JRuby, JS.. • Keine Einheiten (Verticles) deployen, die

per JSON kommunizieren

vert.x

public class Receiver extends AbstractVerticle {

@Override public void start() throws Exception {

EventBus eb = vertx.eventBus();

eb.consumer("ping-address", message -> {

System.out.println("Received message: " + message.body()); // Now send back reply message.reply("pong!"); });

System.out.println("Receiver ready!"); }}

register on event bus

extend Verticle

connect

MessageConsumer<JsonObject> consumer = eventBus.consumer("definer.task");consumer.handler(message -> { httpClient.getNow(message.body().getString("host"), message.body().getString("path"), httpClientResponse -> { httpClientResponse.bodyHandler(buffer -> { eventBus.send("collector", new JsonObject() .put(...)

callback based async http

register

send result as message

Vertx vertx = Vertx.vertx();EventBus eventBus = vertx.eventBus();DeploymentOptions options = new DeploymentOptions().setInstances(sources.size());

vertx.deployVerticle("de.huehnken.concurrency.Task", options, msg -> { for (String currentKey : sources.keySet()) { eventBus.send("definer.task", new JsonObject() .put(..)

which one gets the message?

deploy multiple

private final Map<String, String> store = new HashMap<>();

@Overridepublic void start(Future<Void> startFuture) { EventBus eventBus = vertx.eventBus();

MessageConsumer<JsonObject> consumer = eventBus.consumer("collector"); consumer.handler(message -> { store.put(message.body().getString(„key"),

message.body().getString("definition"));

wait for results..

encapsultate state in Verticle

messages are usually JSON

Event Bus (vert.x)

Image from Jonas Bandi @jbandi

Channels

• (Sehr) lose Kopplung • Reduktion: Events und „Single Thread Illusion“ • Hybrides Thread-Modell erlaubt integration

synchroner APIs, und dem Event-Loop Arbeit woanders abzuladen.

• Zustands erfordert etwas Aufmerksamkeit • APIs sind sehr Callback-lastig, node.js inspiriert • Verteilt! Einheitliches Modell für lokale und

verteilte Nebenläufigkeit!

vert.x Summary

Actors

Channels

• The actor model [..] is a [model] that treats "actors" as the universal primitives of concurrent computation: in response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received. https://en.wikipedia.org/wiki/Actor_model

• Entwickelt von Carl Hewitt 1973 • Verbreitung durch Erlang

Akka

Actors (Akka)

Actors (Akka)

class Coordinator(searchTerm: String) extends Actor {

var results = Map[String, String]()

def receive = { case StartCommand => sources foreach { case (key, url) =>

context.actorOf(Task.props(key, self)) ! GetDefinition(url) } context.system.scheduler.scheduleOnce(5 seconds, self, Done)

case Result(key, definition) => results += key -> definition if (results.size == sources.size) { self ! Done } case Done =>

erzeuge Child Actors, sende Nachricht

Zustand, gekapselt

aggregiere Ergebnisse

class Task(key: String, coordinator: ActorRef) extends Actor {

def receive = { case GetDefinition(url) => http.singleRequest(HttpRequest(uri = url)) pipeTo self

case HttpResponse(StatusCodes.OK, headers, entity, _) => coordinator ! Result(key, entity.dataBytes.utf8String)

case HttpResponse(code, _, _, _) => // handle error

send result to aggregator

async call, translated to message

Actors (Akka)

Channels

• Reduktion: Nachrichten und „Single Thread Illusion“

• Messages (as opposed to events) • „Dispatcher“ erlauben Integration synchroner APIs

und das Abladen von Arbeit • pipeTo & ask zur Integration von asynchronen APIs • Verteilt! Einheitliches Modell für lokale und

verteilte Nebenläufigkeit! • Supervision! Fehlerbehandlung eingebaut!

Actors Summary

Concurrency-Modelle ZusammenfassungTasks (sub-thread level)

Asynchronous Messaging

Distribution (unified model)

Supervision

Fibers✔

Channels (core.async) ✔ ✔

Event Bus (vert.x) ✔ ✔ ✔

Aktoren (Akka) ✔ ✔ ✔ ✔

• Im Bereich Concurrency ist einiges los! • Threads are passé! • Sie sind die Basis auf OS- und JVM-Ebene, aber Entwickler

benötigen ein anderes, besseres Modell. • Alternativen sind vorhanden, auf der JVM, hier und heute. • Wenn eure Zeit knapp ist, und ihr euch nur ein anderes

Modell ansehen könnt, empfehle ich Akka.

Zusammenfassung

Der Kreis schließt sich - noch ein Buch

Vielen Dank

Lutz Hühnken @lutzhuehnken

https://github.com/lutzh/concurrency_talk

Aber ich will doch nur einen kleinen REST-Service bauen..

• Wenn du nicht auf dieser Ebene programmierst - kenne deine Frameworks!

• Halte Ausschau nach „reactive“ oder „async“ • Play! • Akka HTTP • Red Hat vert.x • (Pivotal Reactor? Rat pack?)

Achtung bei Benchmarks!

• Ungeeignet: CPU-lastige Benchmarks • Ungeeignet: Alles, was synchrone I/O

verwendet • Es geht nicht um die Geschwindigkeit bei

einzelnen Requests, sondern um Skalierbarkeit (Little’s Law, L=𝛌W)

Agents

(def x (agent 0))

(defn increment [c n] (+ c n))

(send x increment 5) ; @x -> 5

(send x increment 10) ; @x -> 15

„wrap“ a value in an agent

send the code to the agent to modify value!

a function

Agenten

• Agenten kapseln Zustand • Sende ausführbaren Code an Agenten, um

Zustand zu ändern • Interessantes Feature, besonders in der

Funktionalen Programmierung • Der Code wird asynchron ausgeführt, aber allein

ist es nicht wirklich ein Concurrency-Modell

Der Vollständigkeit halber

• Quasar bietet auch Channels, und sogar Actors

• Akka bietet auch Busse (lokal und verteilt), und Agenten.

• Vieles stammt von außerhalb der JVM-Welt (Go Channels, node.js, Erlang Actors)

Jetzt ist wirklich Schluss!

Lutz Hühnken @lutzhuehnken

https://github.com/lutzh/concurrency_talk

Image Credits

Cat on hot tin roof - from "McPhillamyActorBlog" - http://mcphillamy.com

Agents - from "Matrix Wiki" - http://matrix.wikia.com/wiki/Agent

Event Loop - http://www.freeimages.com/photo/big-looping-1522456

Channel - http://www.freeimages.com/photo/white-cliffs-of-dover-1256973

Fibers - http://www.freeimages.com/photo/fiber-optics-1503081