Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not...

45
Verteilte Algorithmen TI5005 Th. Letschert TH Mittelhessen Gießen University of Applied Sciences Netzwerk-Programmierung mit Scala / Akka – Klassische Netzwerk-Programmierung in Scala – akka.io – Entfernte Aktoren – Entfernte Kommunikation als Pysical- und Link-Layer

Transcript of Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not...

Page 1: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Verteilte Algorithmen TI5005Th. Letschert

TH Mittelhessen Gießen

University of Applied Sciences

Netzwerk-Programmierung mit Scala / Akka– Klassische Netzwerk-Programmierung in Scala– akka.io

– Entfernte Aktoren – Entfernte Kommunikation als Pysical- und Link-Layer

Page 2: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 2

Java-Netzwerk-Programmierung

Klassische Java-Netzwerk-I/OIn Scala steht der komplette Satz Java-Klassen zur Verfügung

Dieser kann für Netzwerk-I/O eingesetzt werden.

TCP-KommunikationBeispiel einfache Echo-Anwendung (1: Server)

import java.io.{ BufferedReader, IOException, InputStreamReader, PrintWriter}import java.net.{Socket, ServerSocket}import scala.util.{Try, Success, Failure}

object EchoServer_App extends App { val ECHO_PORT = 4713; val serverSocket : ServerSocket = new ServerSocket(ECHO_PORT); for (i <- 1 to 10) { val sock = serverSocket.accept(); val pw = new PrintWriter(sock.getOutputStream()); val stream = new InputStreamReader(sock.getInputStream()); val reader = new BufferedReader(stream); val msg = reader.readLine(); val response = "Echo: "+ msg; pw.println(response); pw.flush(); reader.close(); pw.close(); sock.close(); } serverSocket.close()}

Page 3: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 3

Java-Netzwerk-Programmierung

TCP-KommunikationBeispiel einfache Echo-Anwendung (2: Client)

import java.net.Socketimport java.io.{BufferedReader, InputStreamReader, PrintWriter}

object EchoClientApp extends App { val ECHO_PORT = 4713;

val socket = new Socket("127.0.0.1", ECHO_PORT) val pw = new PrintWriter(socket.getOutputStream()); val stream = new InputStreamReader(socket.getInputStream()); val reader = new BufferedReader(stream);

pw.println("Hallo"); pw.flush();

println("Client awaits echo"); val response = reader.readLine(); println("Received from Server: "+ response);

reader.close(); pw.close(); socket.close(); }

Page 4: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 4

Java-Netzwerk-Programmierung

UDP-Kommunikation Beispiel einfache Echo-Anwendung

import java.net.{DatagramPacket, DatagramSocket, InetAddress}

object EchoServer_App extends App { val ECHO_PORT = 4713; val dtgrmSocket = new DatagramSocket(ECHO_PORT) val buf = new Array[Byte](256) val rcvpkt = new DatagramPacket(buf, buf.length);

while (true) { dtgrmSocket.receive(rcvpkt); println("Server hat Daten empfangen: " + new String(rcvpkt.getData(), 0, rcvpkt.getLength())); val clientAdr = rcvpkt.getAddress(); val clientPort = rcvpkt.getPort(); println("Von Rechner: " + clientAdr.getHostAddress() + " an Port: "+clientPort); val sndpkt = new DatagramPacket(buf, buf.length, clientAdr, clientPort); dtgrmSocket.send(sndpkt); } }

import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketException}

object EchoClient_App extends App {

val ECHO_PORT = 4713; val host = "127.0.0.1" val dtgrmSocket = new DatagramSocket() val serverAddress = InetAddress.getByName(host); val msg = "Hallo wer da?"; val buf = msg.getBytes(); val pkt = new DatagramPacket(buf, buf.length, serverAddress, ECHO_PORT); dtgrmSocket.send(pkt); dtgrmSocket.receive(pkt); println("Client hat empfangen: " + new String(pkt.getData())); }

Server

Client

Page 5: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 5

Java-Netzwerk-Programmierung

NIO In Scala können natürlich auch alle modernen Kommunikations-Mittel von Java verwendet werden

– NIO (ab Java 1.4) : Channel, Selector – NIO.2 (ab Java 7) : Asynchrone I/O

NIO / NIO.2 ist schwer direkt nutzbar

meist nutzt man (low level) Frameworks, die NIO-Features zugänglich machen (z.B. Netty [http://netty.io/] )

Page 6: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 6

Aktoren und entfernte Kommunikation

Aktoren und entfernte Kommunikation Aktoren und Netzwerk-Kommunikation können auf verschiedene Arten kombiniert werden:

– Lokal: Aktor; Entfernt: Thread-basierte Kommunikationfür die entfernte Kommunikation werden klassische Thread-basierte Kommunikations-Mechanismen verwendet

Lokal kommen bei Bedarf Aktoren zum Einsatz

Interaktion Aktor <~> Thread muss realisiert werden

– Lokale und Entfernte AktorenEinheitliches Modell mit Kommunikation von lokalen Aktoren und Aktoren auf anderen Knoten

Geeignet für Peer-to-Peer-Anwendungen mit Ortstransparenz der Aktoren

Basis des Akka-Clusterings (in Entwicklung)

wenig geeignet für Client-Server-Anwendungen

– Akka.ioKommunikation im Client-Server-Stil

mit Unterstützung der Interaktion von Aktoren und dem Kommunikationssystem

Page 7: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 7

akka.io

akk.io ÜbersichtPackage akka.io mit Akka Version 2.2 (2013) eingeführt

– Scala-Framework

– Basiert auf / entspricht dem Modul spray.io des Spray-Frameworks [http://spray.io/] (gemeinsame Entwicklung von Spray und Typesafe)

– nutzt speziell speziell den Netzwerkcode von Spray

der auf NIO- / NIO.2-Features basiert

– ist vergleichbar mit Netty

Prinzip– ein Manager-Actor kontrolliert die IO

– die Anwendung kommuniziert mit dem Manager über Kommando-NachrichtenVerbindungs-Management (connect, … close) undNachrichten-Transfer (read, write)

Page 8: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 8

akka.io

TCP-Beispiel ein einfacher Echo-Server (1)

import akka.actor.{ Actor, ActorRef, Props, ActorSystem}import akka.io.{ IO, Tcp }import akka.util.ByteStringimport java.net.InetSocketAddress

class ConnectionHandler extends Actor { … }

class Server(managerActor: ActorRef) extends Actor { … }

object Server_App extends App {

implicit val system = ActorSystem("TCPEchoSystem")

val managerActor = IO(Tcp) val server = system.actorOf(Props(classOf[Server], managerActor), "server") println("starting server as actor " + server) }

Ein Manager-Actor für TCP wird erzeugt. Er dient als Schnittstelle zu Kommunikations-Diensten.

Ein Server wird erzeugt, er erhält eine Referenz auf den Manager-Actor um über diesen seine Kommunikation abwickeln zu können.

Der Server nimmt Verbindungen an

Der ConnectionHandler bedient Verbindungen

Page 9: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 9

akka.io

TCP-Beispiel ein einfacher Echo-Server (2)

class Server(managerActor: ActorRef) extends Actor {

val handler = self //The Bind message is send to the TCP manager actor in order to bind to a listening socket. //handler: The actor which will receive all incoming connection requests in the form of Connected messages managerActor ! Tcp.Bind(handler, new InetSocketAddress("127.0.0.1", 4711))

// The TCP manager will reply either with a CommandFailed, // or it will spawn an internal actor representing the new connection. // This new actor will then send a Connected message to the original sender of the Connect message. def receive = { case Tcp.CommandFailed(_: Tcp.Bind) => context stop self //The actor sending the Bind message will receive a Bound message signalling //that the server is ready to accept incoming connections; //this message also contains the InetSocketAddress to which the socket //was actually bound (i.e. resolved IP address and correct port number). case b @ Tcp.Bound(localAddress) => println("Server ready") // The connection actor sends this message either to the sender of a Connect command (for outbound) // or (here) to the handler for incoming connections designated in the Bind message. // The connection is characterized by the remoteAddress and localAddress TCP endpoints. case c @ Tcp.Connected(remote, local) => println("Server accepted connection from " + remote + ", at " + local) val connectionHandler = context.actorOf(Props[ConnectionHandler]) // sender: internal actor representing the new connection val connection = sender() // In order to activate the new connection a Register message // must be sent to the connection actor, informing that one about // who shall receive data from the socket connection ! Tcp.Register(connectionHandler) } }

Alle Kommentare sind weitgehend aus der API-Doku oder dem Tutorial übernommen.

Page 10: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 10

akka.io

TCP-Beispiel ein einfacher Echo-Server (3)

Ein ConnectionHandler nimmt Daten an und sendet sie zurück. … Solange bis die Verbindung geschlossen wird.

class ConnectionHandler extends Actor {

def receive = { case Tcp.Received(data) => println("Server received " + data + " from " + sender()) // send back answer: sender() ! Tcp.Write(data) case Tcp.PeerClosed => println("Connection closed") context stop self case x: Any => println("Server received " + x) }

}

Page 11: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 11

akka.io

TCP-Beispiel ein einfacher Echo-Client (1)

import akka.actor.{ Actor, ActorRef, Props, ActorSystem}import akka.io.{ IO, Tcp }import akka.util.ByteStringimport java.net.InetSocketAddressimport akka.actor.ActorDSL._

class Client(managerActor: ActorRef) extends Actor { …}

object Client_App extends App { implicit val system = ActorSystem("TCPEchoSystem")

val managerActor = IO(Tcp)

val client = system.actorOf(Props(classOf[Client], managerActor), "client") for (i <- 1 to 10) { val v = scala.io.StdIn.readLine() client ! Some(v) } Thread.sleep(500) client ! "close" Thread.sleep(500) system.shutdown()}

Ein Manager-Actor für TCP wird erzeugt. Er dient als Schnittstelle zu Kommunikations-Diensten.

Ein Client wird erzeugt, er erhält eine Referenz auf den Manager-Actor um über diesen seine Kommunikation abwickeln zu können.

Page 12: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 12

akka.io

TCP-Beispiel ein einfacher Echo-Client (2)

class Client(managerActor: ActorRef) extends Actor { //The Connect message is sent to the TCP manager actor which will spawn an internal actor managerActor ! Tcp.Connect(new InetSocketAddress("127.0.0.1", 4711)) def receive = { case Tcp.CommandFailed(_: Tcp.Connect) => println("connect failed") context stop self // The connection actor sends this message either to the sender of a Connect command // (here, for outbound connections) or to the handler for incoming connections designated in the Bind message. // The connection is characterized by the remoteAddress and localAddress TCP endpoints. case c @ Tcp.Connected(remote, local) =>

println("connected to " + remote + " at " + local) // sender: internal actor representing the new connection val connection = sender() // This message must be sent to a TCP connection actor after receiving the Connected message. // The connection will not read any data from the socket until this message is received, because // this message defines the actor which will receive all inbound data. connection ! Tcp.Register(self) // register at connection-actor context become { // change behavior

nächste Folie: Verhalten des Actors nach Verbindungsaufbau

} }}

Alle Kommentare sind weitgehend aus der API-Doku oder dem Tutorial übernommen.

Page 13: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 13

akka.io

TCP-Beispiel ein einfacher Echo-Client (3)

class Client(managerActor: ActorRef) extends Actor { … def receive = { … case c @ Tcp.Connected(remote, local) =>

context become { // change behavior

case Tcp.CommandFailed(w: Tcp.Write) => println("write failed")

case Tcp.Received(data) => println("client received from connection : " + data)

case "close" => connection ! Tcp.Close case _: Tcp.ConnectionClosed => println("connection closed") context stop self

// application msg case msg @ Some(x) => println("client received user msg " + msg) println("... and sends to manager " + ByteString(x.toString + "\n") + " to " + connection) connection ! Tcp.Write(ByteString(x.toString + "\n")) // data from network case data: ByteString => println("Client actor received: " + data) // other msg case x: Any => println("handler received: " + x) } … }}

Page 14: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 14

akka.io

UDP-Beispiel ein einfacher Echo-Server

import akka.actor.{Actor, ActorRef, Props, ActorSystem}import akka.io.{IO, Udp}import akka.util.ByteStringimport java.net.InetSocketAddress

class Server(localAdr: InetSocketAddress) extends Actor { import context.system val managerActor = IO(Udp) ! Udp.Bind(self, localAdr) def receive = { case Udp.Bound(local) => context become { case Udp.Received(data, remote) => val client = sender() println("Server received " + data + " from " + remote) client ! Udp.Send(data, remote) case Udp.Unbind => val client = sender() client ! Udp.Unbind case Udp.Unbound => context.stop(self) } } }

object Server_App extends App { val PORT = 4712 val system = ActorSystem("UDPEchoSystem") val server = system.actorOf(Props(classOf[Server], new InetSocketAddress("127.0.0.1", PORT)), "server") println("starting server as actor " + server) }

Page 15: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 15

akka.io

UDP-Beispiel ein einfacher Echo-Client (1)

import akka.actor.{Actor, ActorRef, Props, ActorSystem, PoisonPill}import akka.io.{IO, Udp}import akka.util.ByteStringimport java.net.InetSocketAddress

class Client(localAdr: InetSocketAddress, remoteAdr: InetSocketAddress) extends Actor { … siehe nächste Folie … }

object Client_App extends App { val MyPORT = 4711 val RemotePORT = 4712

val system = ActorSystem("UDPEchoSystem") val client = system.actorOf( Props(classOf[Client], new InetSocketAddress("127.0.0.1", MyPORT), new InetSocketAddress("127.0.0.1", RemotePORT)), "client") Thread.sleep(500) for (i <- 1 to 10) { val v = scala.io.StdIn.readLine() client ! "Msg Nr. " + v } Thread.sleep(500) client ! PoisonPill Thread.sleep(500) system.shutdown()

}

Page 16: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 16

akka.io

UDP-Beispiel ein einfacher Echo-Client (2)

class Client(localAdr: InetSocketAddress, remoteAdr: InetSocketAddress) extends Actor { import context.system IO(Udp) ! Udp.Bind(self, localAdr) def receive = { case Udp.Bound(local) =>

val server = sender() context become { case msg: String => println("client sends " + msg) server ! Udp.Send(ByteString(msg), remoteAdr) case Udp.Received(data, remote) => println("Client received " + data + " from " + remote) } } }

Page 17: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 17

akka.io

Bytestring akka.io basiert auf NIO

NIO nutzt java.nio.ByteBuffer

– sehr low-level und schwierig zu nutzen

akka.util.ByteString

– Schnittstelle zwischen Anwendung und ByteBuffer

Page 18: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 18

akka.io

Serialisierung – Serialisierung ist ein zentrales Thema aller verteilten Anwendung

– Java Serialisierung (Serializable, DataInput- DataOutputStream, ObjectInput-, ObjectOutputStream...)

– Scala Serialisierung auf Basis von Java-Features

– Diverse Serialisierungs-Frameworks (z.B. Protocol Buffers https://code.google.com/p/protobuf/)

– Akka: erweiterbare konfigurierbare Serialisierung

Page 19: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 19

akka.io

Beispiel: einfache Serialisierung von / nach ByteString abstract class Msgcase class StringMsg(str: String) extends Msgcase class NumberMsg(d1: Double, d2: Double) extends Msg zu übertragende Nachrichten

object Msg { implicit val byteOrder = java.nio.ByteOrder.BIG_ENDIAN

val STRING_MSG = 0 val NUMBER_MSG = 1 def decode(msg: ByteString): Msg = { val byteIter: ByteIterator = msg.iterator val msgType: Int = byteIter.getInt

msgType match { case STRING_MSG => StringMsg(getString(byteIter)) case NUMBER_MSG => NumberMsg(byteIter.getDouble, byteIter.getDouble); } }

def encode(msg: Msg) : ByteString = msg match { case StringMsg(str) => val strBytes = str.getBytes val strBytesLen = strBytes.length new ByteStringBuilder().putInt(STRING_MSG).putInt(strBytesLen).putBytes(str.getBytes).result

case NumberMsg(d1, d2) => new ByteStringBuilder().putInt(NUMBER_MSG).putDoubles(Array[Double](d1, d2)).result } def getString(iter: ByteIterator): String = { val length = iter.getInt val bytes = new Array[Byte](length) iter getBytes bytes ByteString(bytes).utf8String } }

1 | Double | Double

0 | Len | Char | … | Char

Übertragungsformat für 2 Sorten von

Nachrichten

Deserialisierung mit einem ByteIterator

Serialisierung mit einem ByteStringBuilder

object SerialTest_App extends App { println(Msg.decode(Msg.encode(StringMsg("Hallo")))) println(Msg.decode(Msg.encode(NumberMsg(1.5, 2.5))))}

StringMsg(Hallo)NumberMsg(1.5,2.5)

Page 20: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 20

Entfernte Aktoren

Beispiel: Sender und Empfänger

Empfänger-Prozess:127.0.0.1:4711

Sender-Prozess:127.0.0.1:2525

Actor-System: DistSystem

ReceiverActor ForwarderActor

create create

Msg

Msg

JVM JVM

Page 21: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 21

Entfernte Aktoren

Beispiel: Empfänger

import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}

class ReceiverActor extends Actor { def receive = { case msg: String => println(s"Actor ${context.self} received $msg from $sender") case _ => println("Received non-string msg ") }}

object Receiver_App extends App { val config = ConfigFactory. parseString( """akka { loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 4711 } log-sent-messages = on log-received-messages = on } }""")

val system = ActorSystem("DistSystem",config)

val receiverActor = system.actorOf(Props[ReceiverActor], name = "receiverActor")

println(s" ${receiverActor.path} is ready")}

ReceiverActor

create

Dieser Aktor wird von einem entfernten Aktor angesprochen werden. Diese Tatsache ist für den Aktor selbst völlig transparent.

Das Aktorsystem muss konfiguriert werden, um entfernte Aktoren nutzen zu können. Die Konfiguration wird in der Regel in einer Konfigurationsdatei erfolgen. Hier wird sie der Übersicht halber programmatisch realisiert.

Der Empfänger ist ein eigenständiger Prozess

Page 22: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 22

Entfernte Aktoren

Beispiel: Sender (1)

import akka.actor.{Actor, ActorSystem, ActorSelection, ActorRef, Props}import akka.util.Timeoutimport scala.concurrent.duration._import scala.concurrent.{Await, Future}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}

case object SendCmd

class ForwarderActor(remoteReceiverActor: ActorRef) extends Actor { def receive = { case SendCmd => remoteReceiverActor ! "Hello from remote sender actor!" }}

object Sender_App extends App { val config = ConfigFactory. parseString( """akka { loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] log-sent-messages = on log-received-messages = on netty.tcp { hostname = "127.0.0.1" port = 2526 } } }""") … nächste Folie … }

ForwarderActor

createMsg

Dieser Aktor leitet die Nachrichten des Threads an den entfernten Aktor weiter. Die Tastsache, dass die Nachricht an einen entfernten Aktor geht ist auch hier völlig irrelevant.

Auch auf der Sendeseite muss die Verwendung entfernter Aktoren konfiguriert werden.

Der Sender ist ein eigenständiger Prozess

Page 23: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 23

Entfernte Aktoren

Beispiel: Sender (2)

object Sender_App extends App { val config = ... implicit val timeout = Timeout(3 seconds) val system = ActorSystem("DistSystem", config) // create actor selection: a path to a remote actor val receiverActorS : ActorSelection = system.actorSelection("akka.tcp://[email protected]:4711/user/receiverActor") println("receiverActor selection = " + receiverActorS) // resolve actor selection to actor path val futureActorR : Future[ActorRef] = receiverActorS.resolveOne val serverActorR : ActorRef = Await.result(futureActorR , 2.seconds) println("Remote receiverActorR resolved to: " + serverActorR)

// send to actor selection without resolving it to a actor ref // resolving will be done automatically receiverActorS ! "Greeting from sender" // create local actor via constructor with remote ActorRef parameter val forwarderActor = system.actorOf(Props( classOf[ForwarderActor], serverActorR), name = "remoteReceiverActor")

// send to local actor forwarderActor ! SendCmd

}

ActorSelection: Referenz auf einen Unterbaum der Aktorhierachie.

resolveOne: Suche die ActorReference die zur ActorSelection passt.

Sende direkt an eine ActorSelection

Erzeuge Forwarder übergib dabei Referenz auf entfernten Aktor

Page 24: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 24

Entfernte Aktoren

Beispiel: Entfernten Aktor erzeugen

Empfänger-Prozess:127.0.0.1:4711

Sender-Prozess:127.0.0.1:2525

Actor-System: DistSystem

ReceiverActor ForwarderActor

create

Msg

Msgcreate

JVM JVM

Page 25: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 25

Entfernte Aktoren

Beispiel: Entfernten Aktor erzeugen / Empfänger

import akka.actor.{Actor, ActorSystem, Props}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}

class ReceiverActor extends Actor { def receive = { case msg: String => println(s"Actor ${context.self} received $msg from $sender") case _ => println("Received non-string msg ") }}

object Receiver_App extends App { val config = ConfigFactory. parseString( """akka { //loglevel = "DEBUG" actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] netty.tcp { hostname = "127.0.0.1" port = 4711 } log-sent-messages = on log-received-messages = on } }""")

val system = ActorSystem("DistSystem",config) //val receiverActor = system.actorOf(Props[ReceiverActor], name = "receiverActor") println("receiver process is ready")}

Empfänger-Aktor: Ein Aktor dieser Klasse wird von entfernt (von einem anderen Prozess) erzeugt werden.

Hier wird nur das Aktorsystem erzeugt und die Klasse der Aktoren bereit gestellt.

Page 26: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 26

Entfernte Aktoren

Beispiel: Entfernten Aktor erzeugen / Sender

import akka.actor.{Actor, ActorSystem, ActorSelection, ActorRef, Props, AddressFromURIString, Deploy}import akka.util.Timeoutimport scala.concurrent.duration._import scala.concurrent.{Await, Future}import com.typesafe.config.{ConfigFactory, ConfigValueFactory}import akka.remote.RemoteScope

case object SendCmd

class ForwarderActor(remoteReceiverActor: ActorRef) extends Actor { def receive = { case SendCmd => remoteReceiverActor ! "Hello from remote sender actor!" }}

object Sender_App extends App { val config = ConfigFactory.parseString( """akka { actor { provider = "akka.remote.RemoteActorRefProvider" } remote { enabled-transports = ["akka.remote.netty.tcp"] log-sent-messages = on log-received-messages = on netty.tcp { hostname = "127.0.0.1" port = 2526 } }}""") val system = ActorSystem("DistSystem", config) val address = AddressFromURIString("akka.tcp://[email protected]:4711") val receiverActorR : ActorRef = system.actorOf(Props[ReceiverActor].withDeploy(Deploy(scope = RemoteScope(address))))

val forwarderActor = system.actorOf(Props(classOf[ForwarderActor], receiverActorR), name = "remoteReceiverActor")

forwarderActor ! SendCmd}

Die Klasse ReceiverActor muss hier in diesem Prozess auf dem Klassenpfad zur Verfügung stehen und ebenfalls auf dem Klassenpfad der entfernten Anwendung auf der der Aktor erzeugt werden wird. Akka überträgt (im Gegensatz zu RMI) keinen Klassencode!

Erzeugung eines Aktors auf einem entfernten Prozess.

Page 27: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 27

Protokoll-Implementierungen

ProtokolleVerteilte Algorithmen werden als Protokolle implementiert

In der Regel sind in einem verteilten System mehrere Protokolle gleichzeitig aktiv

Oft interagieren die Protokolle

Schichtenstruktur: Typisches Interaktionsmuster von ProtokollenEin Protokoll nutzt die Dienste, die von einem anderen bereit gestellt werden

OSI Konzept der Schichten

Page 28: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 28

Protokoll-Implementierung

Physical-Layer / Link-Layer, VerbindungsschichtUnterste Schichten eines Protokollstacks nach OSI und TCP/IP

Stellen den elementarsten Kommunikationsdienst bereit:

Kommunikation von direkt (auf Hardware-Ebne) verbundenen Rechnern

Protokolle der Verbindungsschicht

– verbindungslos (z.B. Ethernet)

oder verbindungs-orientiert (z.B. PPP)

– mit Broadcast (für Broadcast-Medien: Ethernet-HW)

oder ohne Broadcast (für Punkt-zu-Punkt-Medien: Kabel, Modem, etc.)

Page 29: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 29

Protokoll-Implementierung

Beispiel: Emulation Physical-LayerAufgabe des Physical-Layers

– Bereitstellung von Netzwerkgeräten (Sender/Empfänger)

– für ihre Nutzer: Netzwerktreiber / Protokoll-Implementierungen

Emulation Netzwerk-GerätAufgabe des Geräts

– Senderoutine für Bytes

– Registrierung einer Callback-Routine für den Empfang von Bytes

trait NetworkDevice { // low level send def sendWire(msg: Array[Byte]) // register callback routine for this device def register(cB: Array[Byte] => Unit) : Unit = { callBack = cB } // callback routine for this device var callBack : Array[Byte] => Unit = { (bytes: Array[Byte]) => { throw new NoSuchMethodException } }

}

Page 30: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 30

Protokoll-Implementierung

Emulation Netzwerk-GerätBeispiel akk.io via UDP als „Netzwerk-Gerät

import akka.actor.{Actor, ActorRef, Props, ActorSystem}import akka.io.{IO, Udp}import akka.util.{ByteString, ByteIterator}import java.net.InetSocketAddress

class UdpProxy(localAdr: InetSocketAddress, remoteAdr: InetSocketAddress, udpDevice: UDPNetworkDevice) extends Actor { import context.system IO(Udp) ! Udp.Bind(self, localAdr) def receive = { case Udp.Bound(local) => val partner = sender() context become { case Udp.Received(data, remote) => val byteIter: ByteIterator = data.iterator val bytes: Array[Byte] = data.toArray[Byte] udpDevice.callBack(bytes) case msg: Array[Byte] => partner ! Udp.Send(ByteString(msg), remoteAdr) } } }

class UDPNetworkDevice(myPort: Int, remotePort: Int)(implicit system: ActorSystem) extends NetworkDevice { val proxy = system.actorOf( Props(classOf[UdpProxy], new InetSocketAddress("127.0.0.1", myPort), new InetSocketAddress("127.0.0.1", remotePort), this), "udpInterface") override def sendWire(msg: Array[Byte]) : Unit = { proxy ! msg }}

Page 31: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 31

Protokoll-Implementierung

Protokoll-Handler– Implementierung einer Protokoll-Instanz

– wird an ein Netzwerk-Gerät gebunden

trait Codec[T] { // encode msg as byte-array def encode(msg: T): Array[Byte] // decode msg from byte-array def decode(bytes: Array[Byte]): T} Codec: Protokoll-spezifische Serialisierung

abstract class NetworkDriver[MSG_T](val codec: Codec[MSG_T]) {

// low level routine for frame transfer; will be set when connected to a device var sendWire: Array[Byte] => Unit = null

// used to encode and transfer msgs; to be used by derived classes final def sendMsg(msg: MSG_T) : Unit = { val bytes : Array[Byte] = codec.encode(msg) sendWire(bytes) } // called when a msg arrives; to be defined by derived classes def acceptMsg(msg: MSG_T) : Unit}

Treiber sind Ableitungen dieses Traits. Ein Treiber muss die Empfangsroutine definieren und kann Daten via sendWire senden. Ein Netzwerktreiber stellt die unterste Ebne einer Protokollimplementierung dar.

NetworkDriver

sendWire

konkreterNetworkDriver

acceptMsg

wird abhängig

vom Netzwerk-

Gerät gesetzt

wird abhängig

vom Protokoll definiert

Page 32: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 32

Protokoll-Implementierung

Emulation Physical-Layer– Protokoll-Handler

– werden an Netzwerk-Geräte gebunden

DriverDriver DriverCodec Codec Codec

Physical-Layer

class PhysicalLayer { // all devices; devices are identified by symbols var devices : Map[Symbol, NetworkDevice] = Map[Symbol, NetworkDevice]() // install a new device; a device is always wired def installNetworkDevice(deviceId: Symbol, networkDevice: NetworkDevice): Unit = { devices = devices + (deviceId -> networkDevice) } def getLinkIds : Set[Symbol] = devices.keySet // install a handler at a device; the handler will accept all incoming msgs def installDriver[MSG_T](deviceId: Symbol, driver: NetworkDriver[MSG_T]): Unit = { val networkDevice = devices(deviceId) // install receive method of handler as callback of network device // the handler will get decoded bytes networkDevice.register( { (bytes: Array[Byte]) => driver.acceptMsg(driver.codec.decode(bytes)) }) // set low-level send routine driver.sendWire = networkDevice.sendWire } }

Page 33: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 33

Protokoll-Implementierung

Protokoll-HandlerBeispiel: Echo-Protokoll – 1 : Nachrichten und deren (De-) Serialisierung

abstract class EchoMsgcase class EchoRequest(msg: String) extends EchoMsgcase class EchoReply(msg: String) extends EchoMsg

object EchoCodec extends Codec[EchoMsg] { val ECHO_REQUEST: Byte = 0 val ECHO_REPLY: Byte = 1 def encode(msg: EchoMsg): Array[Byte] = { msg match { case EchoRequest(str) => val strBytes : Array[Byte]= str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) result(0) = ECHO_REQUEST strBytes.copyToArray(result, 1) result case EchoReply(str) => val strBytes = str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) strBytes.copyToArray(result, 1) result(0) = ECHO_REPLY result } }

def decode(bytes: Array[Byte]): EchoMsg = { bytes(0) match { case ECHO_REQUEST => EchoRequest(new String(bytes, 1, bytes.length-1)) case ECHO_REPLY => EchoReply(new String(bytes, 1, bytes.length-1)) } }}

Page 34: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 34

Protokoll-Implementierung

Protokoll-HandlerBeispiel: Echo-Protokoll – 2 : Protokoll-Handler Client-Seite

EchoClient

send

Codec

acceptMsg

Device

sendMsg

class EchoClient extends NetworkDriver[EchoMsg](EchoCodec) {

def send(str: String) { sendMsg(EchoRequest(str)) } def acceptMsg(msg: EchoMsg) : Unit = { msg match { case EchoReply(msg) => println("Client received reply " + msg) case _ => throw new Exception("unexpected Msg!") } }

}

Page 35: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 35

Protokoll-Implementierung

Protokoll-HandlerBeispiel: Echo-Protokoll – 3 : Protokoll-Handler Server-Seite

EchoServer

Codec

acceptMsg

Device

sendMsg

class EchoServer extends NetworkDriver[EchoMsg](EchoCodec) {

def acceptMsg(msg: EchoMsg) : Unit = { msg match { case EchoRequest(msg) => println("server received " + msg) sendMsg(EchoReply(msg)) case _ => throw new Exception("unexpected Msg!") } }

}

Page 36: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 36

Protokoll-Implementierung

Beispiel: Echo-Protokoll auf Physical-Layer / Test

import akka.actor.{ Actor, ActorRef, Props, ActorSystem}

class StringCodec extends Codec[String] { def encode(msg: String): Array[Byte] = msg.getBytes def decode(ba: Array[Byte]): String = new String(ba)}

object UDPNode_1 extends App { val myPort = 4711 val remotePort = 4712 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayer extends PhysicalLayer val stringCodec = new StringCodec val echoServer = new EchoServer physicalLayer.installNetworkDevice('A, udpDevice) physicalLayer.installDriver('A, echoServer)}

import akka.actor.{ Actor, ActorRef, Props, ActorSystem}

object UDPNode_2 extends App { val myPort = 4712 val remotePort = 4711 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayer extends PhysicalLayer val stringCodec = new StringCodec val echoClient = new EchoClient physicalLayer.installNetworkDevice('B, udpDevice) physicalLayer.installDriver('B, echoClient) Thread.sleep(500) echoClient.send("HALLO 1") echoClient.send("HALLO 2")}

Page 37: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 37

Protokoll-Instanzen

Protokoll und Protokoll-Instanz – Auf einem Knoten ist in der Regel mehr als ein Protokoll aktiv

– Ein Protokoll kann zudem in mehr als einer Instanz gleichzeitig abgewickelt werden

– Verbindungsorientierte Protokolle: mehrere gleichzeitige Verbindungen

– Verbindungslose Protokolle die in mehreren Instanzen gleichzeitig abgewickelt werdenBeispiel: Broadcast

Mehrere Knoten können gleichzeitig an mehreren Broadcast beteiligt seinbei denen sie eventuell unterschiedliche Rollen spielen.

– Mehrere Protokolle Zuordnung der Nachrichten zu Automaten unterschiedlicher Protokolle

an Hand einer Protokoll-ID (Schicht-2 <=> Schicht-3)

– Mehrere Instanzen eines Protokolls Zuordnung der Nachrichten zu unterschiedlichen Instanzen des gleichen Automaten

an Hand einer SAP-Nummer („Port“)

Page 38: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 38

Protokoll-Instanzen

Protokoll-Instanz

Protokoll Automat(Monitor)

cmd

Protokoll Automat(Monitor) Protokoll

Automat

cmdcmd

acceptPkt

sendWire

Empfangs-Prozess

resend

connect

erzeuge

receive

zustellen

registrieren

Beispiel: Protokoll-Instanzen bei einem

Verbindungsorientieten Protokoll

ServiceAccessPointsSAPs

Page 39: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 39

Protokoll-Instanzen

Beispiel: Emulation einer Link-Layer-Implementierung Aufgabe des Link-Layers

– Allgemein: Bereitstellung einer Punkt-zu-Punkt-Kommunikation zwischen Netz-Knoten (Rechner) die direkt über ein Medium verbunden sind

– Protokoll-Dispatching: Installation von Protokoll-Handlern für das Behandeln von Nachrichten die an zu einem bestimmten Protokoll gehören

– Adressierung auf dem Link-Layer: Knoten-Identitäten festlegen / verwalten Namen / Adresse der lokalen Station feststellen / definieren Namen / Adressen der direkt erreichbaren Stationen feststellen / definieren

DriverDriver DriverCodec Codec Codec

Physical-Layer

DriverDriver DriverCodec Codec Codec

Link-LayerNode BNode A

sendTo(B, msg) (A, msg) = reciveFrom()

B A

Page 40: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 40

Protokoll-Instanzen

Beispiel: Emulation einer Link-Layer-Implementierung Übersicht

object LinkLayer {

type NodeId = Int type DeviceId = Symbol type ProtocolId = Int case class Frame(protocolID: ProtocolId, payload: Array[Byte])}

abstract class LinkLayer(val localAddr: NodeId) { protected val physicalLayer: PhysicalLayer …

def sendTo(protId: ProtocolId, dest: NodeId, payload: Array[Byte]) { … } def connectPhysical(deviceId: DeviceId, remoteAdr: NodeId) : Unit = { …. } def registerProtocolHandler[MSG_T <: Any](handler: ProtocolHandler[MSG_T]) : Unit = { …. }}

Begleiter-Objekt: definiert Adresstypen und und Rahmenformat

Die Klasse ist abstrakt in Bezug auf den Physical-Layer mit dem der LL verbunden ist.

Sende eine Nachricht an einen bestimmten Knoten.

Verknüpfe ein Netzwerk-Gerät mit dem dem Namen der Station, die über dieses Gerät erreichbar ist.

Registriere einen Handler, der für die Nachrichten eines bestimmten Protokolls zuständig ist.

Page 41: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 41

Protokoll-Instanzen

Beispiel: Emulation einer Link-Layer-Implementierung Protocol-Handler und deren Registrierung

abstract class ProtocolHandler[MSG_T <: Any](val protId: ProtocolId, val codec: Codec[MSG_T]) {

protected val ll: LinkLayer protected final def sendMsg(dest: NodeId, msg: MSG_T) : Unit = { val bytes : Array[Byte] = codec.encode(msg) ll.sendTo(protId, dest, bytes) } def acceptMsgFrom(msg: MSG_T, from: NodeId) : Unit}

Die Klasse ist abstrakt in Bezug auf den Link-Layer mit dem sie verbunden ist.

sendMsg wird von abgeleiteten Klassen zum Senden genutzt.

acceptMsgFrom wird von abgeleiteten Klassen definiert.

abstract class LinkLayer(val localAddr: NodeId) { …

// all installed protocol handlers, protocol handlers are identified by protocol ids private var handlers : Map[ProtocolId, ProtocolHandler[Any]] = Map[ProtocolId, ProtocolHandler[Any]]() def registerProtocolHandler[MSG_T <: Any](handler: ProtocolHandler[MSG_T]) : Unit = { handlers = handlers + (handler.protId -> handler.asInstanceOf[ProtocolHandler[Any]]) }

… }

Ein Protocol-Handler wird in einer map unter ihrer Prokollnummer gespeichert. Nachrichten werden vom Dispatcher an Hand der Nummer zugestellt.

Page 42: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 42

Protokoll-Instanzen

Beispiel: Emulation einer Link-Layer-Implementierung Verbindung mit Netzwerkgeräten

abstract class LinkLayer(val localAddr: NodeId) { …

private var deviceToNode: Map[DeviceId, NodeId] = Map[DeviceId, NodeId]() private var nodeToDevice: Map[NodeId, DeviceId] = Map[NodeId, DeviceId]() def connectPhysical(deviceId: DeviceId, remoteAdr: NodeId) : Unit = { physicalLayer.installDriver( deviceId, new NetworkDriver[Frame](FrameCodec) { val device = physicalLayer.devices(deviceId)

override def acceptMsg(msg: Frame) : Unit = msg match { case Frame(protocolId, payload) => // dispatch to handler for protocolId handlers(protocolId).acceptMsgFrom(handlers(protocolId).codec.decode(payload), deviceToNode(deviceId)) case _ => throw new Exception("unexpected Msg!")

} }) deviceToNode = deviceToNode + (deviceId -> remoteAdr) nodeToDevice = nodeToDevice + (remoteAdr -> deviceId) }

…}

object FrameCodec extends Codec[Frame]{ def encode(msg: Frame): Array[Byte] = msg match { case Frame(protocolID: ProtocolId, payload: Array[Byte]) => val payloadLen = payload.length val result = new Array[Byte](payloadLen+1) result(0) = protocolID.toByte payload.copyToArray(result, 1) result } def decode(ba: Array[Byte]): Frame = Frame(ba(0).toInt, ba.slice(1, ba.length))}

Codec für Rahmen

Installation eines Netzwerktreibers auf dem Gerät das mit dem verbundenen Knoten kommuniziert und Nachrichten an Hand ihrer Protokoll-Id dispatcht.Der Absender ist der mit dem Gerät assoziierte Knoten.

Page 43: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 43

Protokoll-Instanzen

Beispiel: Emulation einer Link-Layer-Implementierung Echo-Client und Server als Protocol-Handler

abstract class EchoClient extends ProtocolHandler[EchoMsg](1, EchoCodec) { val remoteId : NodeId override def acceptMsgFrom(msg: EchoMsg, from: NodeId) : Unit = { msg match { case EchoReply(str) => println(s"Client received $str"); case x: Any => println("Client received msg with wrong format " + msg) } } def sendUserMsg(str: String) { sendMsg(remoteId, EchoRequest(str)) } }

abstract class EchoServer extends ProtocolHandler[EchoMsg](1, EchoCodec) { val remoteId : NodeId override def acceptMsgFrom(msg: EchoMsg, from: NodeId) : Unit = { println(s"Server received $msg from $from") msg match { case EchoRequest(str) => sendMsg(from, EchoReply("ECHO "+ str)); case x: Any => println("Server received msg with wrong format " + msg) } } }

Page 44: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 44

Protokoll-Instanzen

Beispiel: Emulation einer Link-Layer-Implementierung Echo-Nachrichten und ihr Codec

abstract class EchoMsgcase class EchoRequest(msg: String) extends EchoMsgcase class EchoReply(msg: String) extends EchoMsg

object EchoCodec extends Codec[EchoMsg] { val ECHO_REQUEST: Byte = 0 val ECHO_REPLY: Byte = 1 def encode(msg: EchoMsg): Array[Byte] = { msg match { case EchoRequest(str) => val strBytes : Array[Byte]= str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) result(0) = ECHO_REQUEST strBytes.copyToArray(result, 1) result case EchoReply(str) => val strBytes = str.getBytes val strBytesLen = strBytes.length val result = new Array[Byte](strBytesLen+1) strBytes.copyToArray(result, 1) result(0) = ECHO_REPLY result } } def decode(bytes: Array[Byte]): EchoMsg = { bytes(0) match { case ECHO_REQUEST => EchoRequest(new String(bytes, 1, bytes.length-1)) case ECHO_REPLY => EchoReply(new String(bytes, 1, bytes.length-1)) } }}

Page 45: Verteilte Algorithmen TI5005hg51/Veranstaltungen/VA-1415/Folien/v… · // The connection will not read any data from the socket until this message is received, because // this message

Seite 45

Protokoll-Instanzen

Beispiel: Emulation einer Link-Layer-Implementierung Test

object LLEchoServer extends App { val myPort = 4711 val remotePort = 4712 val localNodeName = 1 val partnerNodeName = 2 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayerHere extends PhysicalLayer physicalLayerHere.installNetworkDevice('A, udpDevice) object linkLayer extends LinkLayer(localNodeName) { val physicalLayer = physicalLayerHere } linkLayer.connectPhysical('A, partnerNodeName)

object EchoServerInst extends EchoServer { val remoteId : NodeId = partnerNodeName val ll : LinkLayer = linkLayer } linkLayer.registerProtocolHandler(EchoServerInst) }

object LLEchoClient extends App { val myPort = 4712 val remotePort = 4711 val localNodeName = 2 val partnerNodeName = 1 implicit val system = ActorSystem("UDPSystem") val udpDevice = new UDPNetworkDevice(myPort, remotePort) object physicalLayerHere extends PhysicalLayer physicalLayerHere.installNetworkDevice('B, udpDevice) object linkLayer extends LinkLayer(localNodeName) { val physicalLayer = physicalLayerHere } linkLayer.connectPhysical('B, partnerNodeName)

object EchoClientInst extends EchoClient { val remoteId : NodeId = partnerNodeName val ll : LinkLayer = linkLayer } linkLayer.registerProtocolHandler(EchoClientInst) Thread.sleep(1000) for (i <- 1 to 10) { EchoClientInst.sendUserMsg("Hallo Nr " + i) }

}