scala / react /
Akka 에서 Actor
Guardian Actor
- ActorSystem 으로 message 를 보내면, root Actor 로 message 가 가게 된다.
- ActorSystem이 생성될 때 user guardian actor 가 생성된다.
- 대체로 app 의 초기과정, 여러 subsystems 을 spawn 하고, 그들의 lifecycle 을 감시하는 역할을 한다.
val system: ActorSystem[HelloWorldMain.SayHello] =
ActorSystem(HelloWorldMain(), "hello")
system ! HelloWorldMain.SayHello("World")
system ! HelloWorldMain.SayHello("Akka")
Behaviors.setup
, Behaviors.receiveMessage
- Behaviors.receiveMessage 는 Behaviors.receive 의 간략한 버전이다. context 가 이미 얻어진 상황에서 사용하면 된다.
- behavior instance 의 생성이 Behaviors.receive 는 actor 가 실행되기 전이라도 behavior instance 를 만들지만, Behaviors.setup 은 actor 가 실행되기 전까지 behavior instance 의 생성이 지연된다.
ActorContext
를 사용하고 싶으면,Behaviors.setup { context =>
을 사용-
ActorContext
의spawn
을 이용해서 child actor 를 생성-
replyTo
: 아래 코드에서는 message 를 받으면,HelloWorldBot
behavior 에 의해 정의된 child actor 를 생성하게 된다.
-
-
Behaviors.receive
는 context 를 같이 넘겨준다.-
Behaviors.setup
,Behaviors.receiveMessage
를 따로 쓰거나,Behavior.setup
을 사용안한다면,Behaviors.receiveMessage
에서는 context 를 얻을 수 없다. - 그 때 receive 를 이용해서 처리할 수 있다.
-
object HelloWorldMain {
final case class SayHello(name: String)
def apply(): Behavior[SayHello] =
Behaviors.setup { context =>
// dispatcher 생성
val dispatcherPath = "akka.actor.default-blocking-io-dispatcher"
val props = DispatcherSelector.fromConfig(dispatcherPath)
val greeter = context.spawn(HelloWorld(), "greeter", props)
Behaviors.receiveMessage { message =>
val replyTo = context.spawn(HelloWorldBot(max = 3), message.name)
greeter ! HelloWorld.Greet(message.name, replyTo)
Behaviors.same
}
}
}
call stack
주기적으로 Mailbox.run() 에서 SystemMessage 들을 처리하고, 그 다음 Mailbox 를 처리하게 된다.
ForkJoinWorkerThread.run()
ForkJoinPool.runWorker(ForkJoinPool$WorkQueue)
ForkJoinPool.scan(ForkJoinPool$WorkQueue,int)
ForkJoinPool$WorkQueue.topLevelExec(ForkJoinTask,ForkJoinPool$WorkQueue,int)
ForkJoinTask.doExec()
Mailbox.exec()
Mailbox.run()
Mailbox.processMailbox(int,long)
ActorCell.invoke(Envelope)
ActorCell.receiveMessage(Object)
ActorAdapter.aroundReceive(PartialFunction,Object)
ActorAdapter.handleMessage(Object)
Behavior$.interpretMessage(Behavior,TypedActorContext,Object)
Behavior$.interpret(Behavior,TypedActorContext,Object,boolean)
BehaviorImpl$ReceiveMessageBehavior.receive(TypedActorContext,Object)
1028522957.apply(Object) (Unknown Source:-1)
Helloer$.$anonfun$apply$2(ActorContext,Helloer$SayHello)
object Helloer {
final case class SayHello(name: String)
def apply(): Behavior[SayHello] =
Behaviors.setup { context =>
Behaviors.receiveMessage { message =>
context.log.info("Helloer {} for {}", "gogo", message)
Behaviors.same
}
}
}
object Behaviors {
def receive[T](onMessage: (ActorContext[T], T) => Behavior[T]): Receive[T] = new ReceiveImpl(onMessage)
def receiveMessage[T](onMessage: T => Behavior[T]): Receive[T] = new ReceiveMessageImpl(onMessage)
@InternalApi
private[akka] final class ReceiveMessageImpl[T](onMessage: T => Behavior[T])
extends BehaviorImpl.ReceiveMessageBehavior[T](onMessage)
with Receive[T] {
override def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T] =
new BehaviorImpl.ReceiveMessageBehavior[T](onMessage, onSignal)
}
class ReceiveMessageBehavior[T](
val onMessage: T => Behavior[T],
onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] =
BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
extends ExtensibleBehavior[T] {
override def receive(ctx: AC[T], msg: T) = onMessage(msg)
...
}
Behavior.same
-
Behaviors.same
은 다음 message 를 처리할 때도 현재 함수를 사용하겠다고 하는 것인데, 아래 code 를 보면, 어느정도 의미를 파악할 수 있다. oop style 과 functional style 를 비교해보고,case Increment
와cast GetValue
를 비교 확인해 보자.
source from: Style guide • Akka Documentation
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
object Counter {
sealed trait Command
case object Increment extends Command
final case class GetValue(replyTo: ActorRef[Value]) extends Command
final case class Value(n: Int)
def apply(): Behavior[Command] =
counter(0)
private def counter(n: Int): Behavior[Command] =
Behaviors.receive { (context, message) =>
message match {
case Increment =>
val newValue = n + 1
context.log.debug("Incremented counter to [{}]", newValue)
counter(newValue)
case GetValue(replyTo) =>
replyTo ! Value(n)
Behaviors.same
}
}
}
////////////////////////////////////////////////////////
// OOP style
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AbstractBehavior
object Counter {
sealed trait Command
case object Increment extends Command
final case class GetValue(replyTo: ActorRef[Value]) extends Command
final case class Value(n: Int)
def apply(): Behavior[Command] = {
Behaviors.setup(context => new Counter(context))
}
}
class Counter(context: ActorContext[Counter.Command]) extends AbstractBehavior[Counter.Command](context) {
import Counter._
private var n = 0
override def onMessage(msg: Command): Behavior[Counter.Command] = {
msg match {
case Increment =>
n += 1
context.log.debug("Incremented counter to [{}]", n)
this
case GetValue(replyTo) =>
replyTo ! Value(n)
this
}
}
}
watch
- 만약 actor 의 terminate 하는 것을 알고 싶다면,
watch
를 사용할 수 있다. - actor 는 다른 actor 의 terminate 을 관찰할 수 있다.
-
watch
를 사용하면,Terminated
신호를 받게 된다. - 아래 같은 경우 Gabller Actor 가 생성되고, Gabbler 를 관찰하게 된다.(
watch
) - Gabbler 가 종료되면,
receiveSignal
이 호출된다.
object Main {
def apply(): Behavior[NotUsed] =
Behaviors.setup { context =>
val gabblerRef = context.spawn(Gabbler(), "gabbler")
context.watch(gabblerRef)
Behaviors.receiveSignal {
case (context, Terminated(ref)) =>
context.log.info("Job stopped: {}", ref.path.name)
Behaviors.same
// Behaviors.stopped
}
}
akka example : Hello world
//#full-example package com.schat import akka.actor.typed.ActorSystem import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.Behaviors //#main-class object App { final case class SayHello(name: String) object RootBehavior { def apply(): Behavior[SayHello] = Behaviors.setup[SayHello] { context => Behaviors.receiveMessage { message => context.log.info("Greeting {} for {}", "namh", message) Behaviors.same } } } def main(args: Array[String]): Unit = { //#actor-system val rb = RootBehavior() val rooActor: ActorSystem[SayHello] = ActorSystem(rb, "Schat") //#main-send-messages rooActor ! SayHello("Charles") } } //#main-class //#full-example
See also
댓글 없음:
댓글 쓰기