Introduction au Framework Akka cours Java
Introduction au Framework Akka cours Java
...
Exemple 1 : l’acteur de la semaine
Quelques remarques :
On n’est pas obligé de communiquer uniquement avec des strings
Notre Actors ne fait rien. C’est vrai qu’ils font peu, mais ici il ne fait rien !
Thread.sleep.
Propriétés des acteurs
ils sont réactifs, ils ne font qu’une chose à la fois, on ne peut voir rien à l’intérieur, ils sont disponibles et on peut les trouver facilement.
Components d’un acteur
ActorRef : on ne communique jamais avec l’actor directement. Il contacte le dispatcher pour mettre le message dans la mailbox.
Dispatcher : Met le message dans un thread
Mailbox : A l’execution, il enlève un ou plusieurs mails et les envoie à l’actor
Une fois que le message est dans la mailbox le sender est liberé
Communication : messages
Le message hérite de ANY
case class Gamma(g: String) case class Beta(b: String, g: Gamma) case class Alpha(b1: Beta, b2: Beta)
class MyActor extends Actor{
def receive = { case "Hello" =>
println("Hi")
case 42 => println("I don’t know the question.")
case s: String => println(s"You sent me a string: $s")
case Alpha(Beta(b1, Gamma(g1)), Beta(b2, Gamma(g2))) => println(s"beta1: $b1, beta2: $b2, gamma1: $g1, gamma2: $g2")
case _ =>
println("Huh?")
}
}
Communication : send
object MySequencedObject {
def doSomething(withThis: String){
// calcule une heure
}
}
MySequencedObject.doSomething("Hello !") println("Hi !")
Traditionnellement on ne voit "Hi !" que quand doSomething se termine (1h après).
Ce n’est pas le cas pour les Actors
case class DoSomething(withThis: String)
class MyConcurrentObject extends Actor { def receive ={
case DoSomething(withThis) =>
// calcule une heure
}
}
system.actorOf(Props[MyConcurrentObject]) ! DoSomething("With this") println("Hi !")
On voit "Hi!" tout suite. L’envoi d’un message est asynchrone
Création d’un Actor
import akka.actor.{Props, Actor, ActorSystem} class MyActor extends Actor { }
//la racine d’un group d’actors val system = ActorSystem("MyActors")
//pour parametrer la structure de l’actor
//e.g. le contexte d’evaluation val actorProps = Props[MyActor]
//retourne un actorRef val actor = system.actorOf(actorProps)
Exemple 2 : des lumières
Comment fonctionnent les messages?
def ! (message: Any)(implicit sender: ActorRef = null): Unit
Quelques remarques :
! est une fonction avec effet secondaire le message peut être n’importe quoi implicit ActorRef, ainsi on n’a pas à écrire le sender il y a un default value null
implicit val self: ActorRef Pour accéder au sender on a : def sender: ActorRef
Mais, attention sender est une méthode :
case SomeMessage =>
context.system.scheduleOnce(5 seconds) { sender ! DelayedResponse
}
case SomeMessage =>
val requestor = sender context.system.scheduleOnce(5 seconds) { requestor ! DelayedResponse
}
null sender et forwarding
null sender
Si on répond à un null sender on envoie dans un actor particulier appellé dead letter office dead letter office, est un actor simple de l’ActorSystem qui peut être accedé en utilisant la méthode deadLetters. Forwarding
Si A envoie à B et B forward à C, alors C voit A comme son sender et non B. rien à voir avec :
case msg @ SomeMessage => someOtherActor ! msg
c’est plus un fwd de téléphone que fwd d’email
def forward(message: Any)(implicit context: ActorContext) = tell(message, context.sender)
Exemple 3 : l’orchestre
ActorSystem
Tout Actor a un ActorSystem à la racine de sa hierarchie. L’ActorSystem :
n’est pas un Actor, garde une configuration, propose un scheduler, garde la dead letter office permet de localiser et manipuler d’autres actors
ActorSystem
User Guardian Actor est le parent de tout acteur.
Dead Letter Office est un actor comme un autre
System Guardian Actor le parent de tous les acteurs internes au système
Event Stream e.g. Log,
Settings e.g. Human-Optimized Config Object Notation (HOCON)
L’idée des Actors est d’avoir une petite foret avec de gross arbres Paths val system = ActorSystem("TheSystem") val a = system.actorOf(Props(
new Actor def receive = { case _ => } }),
"Actor")
println(a.path)
> "akka://TheSystem/user/Actor"
println(a.path.elements.mkString("/", "/", ""))
> "/user/Actor"
println()
> "Actor"
Exemple 4 : toute l’orchestre
Contexte
Le ActorContext permet :
Actor Creation : tout en structurant la hiérarchie des Actors.
System Access : à partir de n’importe quel point dans l’arbre.
Relationship Access : entre parent, fils, frères, etc.
State : access au sender par exemple.
autres fonctionnalités
Chercher les Actors actorSelection(path: Iterable[String]): ActorRef
A partir d’une collection e.g. List("/user", "/Concert", "/Conductor").
actorSelection(path: String): ActorRef
A partir d’un String e.g. "/../violon".
context.actorSelection("../*") ! msg
Exemple 4 : pi en distribué
… … …
Exceptions
Un acteur peut toujours signaler une exception. throw new Exception (" E R R O R !")
Tout acteur a un parent donc tout acteur a un superviseur, son parent.
Le parent attrape l’exception et fait un filtrage pour décider ce que l’on doit faire pour traiter le problème.
Stratégies de supervision
import akka . actor .{ ActorInitializationException ,
ActorKilledException
OneForOneStrategy }
import akka . actor . SupervisorStrategy . _
final val supervisorStrategy = OneForOneStrategy () {
case _ : ActorInitializationException = > Stop
case _ : ActorKilledException = > Stop
case _ : Exception = > Restart
case_ = > Escalate
}
OneForOneStrategy ou AllForOneStrategy
Decider : le case code
Directives du superviseur
On a 4 possibilités dans la fonction Decider :
Stop, après cela l’acteur (et ses enfants) n’existe plus.
Resume, on fait comme si rien ne s’était passé, on reprend.
Escalate, on ne peut prendre une décision donc on transmet le problème au parent.
Restart, on essaie de reconstituer une nouvelle instance de l’acteur et de retraiter le message. C’est l’option idéale mais la plus difficile à implémenter.
Le problème est de savoir si on fait de restart ou non sur les enfants.
Pas de réponse, mais il y a des heuristiques, par exemple on aime restart les enfants feuilles.
Supervision par le system
Que faire si on n’a pas un père ?
Par défaut on fait restart, mais on peut changer la stratégie
import akka . actor . SupervisorStrategyConfigurator
import akka . actor . SupervisorStrategy ._
import akka . actor . OneForOneStrategy
class UserGuardianStrategyConfigurator
extends SupervisorStrategyConfigurator {
def create (): SupervisorStrategy = {
OneForOneStrategy () {
case _ => Resume
}
}
}
On peut changer la variable de meta-classe guardiansupervisorstrategy
dynamiquement ou à la configuration.
akka {
actor {
guardiansupervisorstrategy = UserGuardianStrategyConfigurator
}
}
Que devient le message lors d’une exception ?
Tout dépend de la directive choisie :
Stop, il est perdu à jamais.
Resume, aussi.
Escalate, le père décidera.
Restart, on essaie de prendre une decision à l’aide de hooks preRestart et postRestart.
Et si on ne peut pas faire Restart
override val supervisorStrategy = OneForOneStrategy () {
case _ : NoDatabaseConnectionException = > Restart
case _ = > Escalate
}
On peut contrôler le nombre de restarts avec :
override val supervisorStrategy =
OneForOneStrategy (5 , 1. minute ) qui dit qu’on peut faire maximum 5 restarts par minute.
Mais comme savoir si un acteur est finalement mort ?
Comment gérer les acteurs distribués ?
Acteurs distribués
L’ActorSystem n’est pas conscient du fait qu’un sous ensemble de ses enfants puissent être dans des nodes différents.
libraryDependencies ++=
Seq ( " com . typesafe . akka " %% " akka - remote " % "2.1.0")
A part inclure le module akka-remote, on doit aussi modifier le dossier application.conf.
akka {
actor {
provider = " akka . remote . RemoteActorRefProvider "
\\ par defaut c ’ est akka . Local . RemoteActorRefProvider
}
remote {
enabled - transports = [" akka . remote . netty . tcp "]
netty . tcp {
hostname = "129.102.65.14"
port = 2552
}
}
}
