[컴] akka 사용

scala / react /

 

Akka 에서 Actor

Guardian Actor

  • ActorSystem 으로 message 를 보내면, root Actor 로 message 가 가게 된다.
  • ActorSystem이 생성될 때 user guardian actor 가 생성된다.
  • 대체로 app 의 초기과정, 여러 subsystems 을 spawn 하고, 그들의 lifecycle 을 감시하는 역할을 한다.
val system: ActorSystem[HelloWorldMain.SayHello] =
  ActorSystem(HelloWorldMain(), "hello")

system ! HelloWorldMain.SayHello("World")
system ! HelloWorldMain.SayHello("Akka")

Behaviors.setup, Behaviors.receiveMessage

  • Behaviors.receiveMessage 는 Behaviors.receive 의 간략한 버전이다. context 가 이미 얻어진 상황에서 사용하면 된다.
  • behavior instance 의 생성이 Behaviors.receive 는 actor 가 실행되기 전이라도 behavior instance 를 만들지만, Behaviors.setup 은 actor 가 실행되기 전까지 behavior instance 의 생성이 지연된다.
  • ActorContext 를 사용하고 싶으면, Behaviors.setup { context => 을 사용
  • ActorContextspawn 을 이용해서 child actor 를 생성
    • replyTo : 아래 코드에서는 message 를 받으면, HelloWorldBot behavior 에 의해 정의된 child actor 를 생성하게 된다.
  • Behaviors.receive 는 context 를 같이 넘겨준다.
    • Behaviors.setup, Behaviors.receiveMessage 를 따로 쓰거나, Behavior.setup을 사용안한다면, Behaviors.receiveMessage 에서는 context 를 얻을 수 없다.
    • 그 때 receive 를 이용해서 처리할 수 있다.
object HelloWorldMain {

  final case class SayHello(name: String)

  def apply(): Behavior[SayHello] =
    Behaviors.setup { context =>
      // dispatcher 생성
      val dispatcherPath = "akka.actor.default-blocking-io-dispatcher"
      val props = DispatcherSelector.fromConfig(dispatcherPath)

      val greeter = context.spawn(HelloWorld(), "greeter", props)

      Behaviors.receiveMessage { message =>
        val replyTo = context.spawn(HelloWorldBot(max = 3), message.name)
        greeter ! HelloWorld.Greet(message.name, replyTo)
        Behaviors.same
      }
    }

}

call stack 

주기적으로 Mailbox.run() 에서 SystemMessage 들을 처리하고, 그 다음 Mailbox 를 처리하게 된다.

ForkJoinWorkerThread.run()
ForkJoinPool.runWorker(ForkJoinPool$WorkQueue)
ForkJoinPool.scan(ForkJoinPool$WorkQueue,int)
ForkJoinPool$WorkQueue.topLevelExec(ForkJoinTask,ForkJoinPool$WorkQueue,int)
ForkJoinTask.doExec()
Mailbox.exec()
Mailbox.run()
Mailbox.processMailbox(int,long)
ActorCell.invoke(Envelope)
ActorCell.receiveMessage(Object)
ActorAdapter.aroundReceive(PartialFunction,Object)
ActorAdapter.handleMessage(Object)
Behavior$.interpretMessage(Behavior,TypedActorContext,Object)
Behavior$.interpret(Behavior,TypedActorContext,Object,boolean)
BehaviorImpl$ReceiveMessageBehavior.receive(TypedActorContext,Object)
1028522957.apply(Object) (Unknown Source:-1)
Helloer$.$anonfun$apply$2(ActorContext,Helloer$SayHello)
object Helloer {
  final case class SayHello(name: String)
  def apply(): Behavior[SayHello] =
    Behaviors.setup { context =>
      Behaviors.receiveMessage { message =>
        context.log.info("Helloer {} for {}", "gogo", message)
        Behaviors.same
      }
    }
}

object Behaviors {
  def receive[T](onMessage: (ActorContext[T], T) => Behavior[T]): Receive[T] = new ReceiveImpl(onMessage)
  def receiveMessage[T](onMessage: T => Behavior[T]): Receive[T] = new ReceiveMessageImpl(onMessage)


@InternalApi
private[akka] final class ReceiveMessageImpl[T](onMessage: T => Behavior[T])
  extends BehaviorImpl.ReceiveMessageBehavior[T](onMessage)
  with Receive[T] {

override def receiveSignal(onSignal: PartialFunction[(ActorContext[T], Signal), Behavior[T]]): Behavior[T] =
  new BehaviorImpl.ReceiveMessageBehavior[T](onMessage, onSignal)
}


class ReceiveMessageBehavior[T](
      val onMessage: T => Behavior[T],
      onSignal: PartialFunction[(SAC[T], Signal), Behavior[T]] =
        BehaviorImpl.unhandledSignal.asInstanceOf[PartialFunction[(SAC[T], Signal), Behavior[T]]])
      extends ExtensibleBehavior[T] {

    override def receive(ctx: AC[T], msg: T) = onMessage(msg)
    ...
}

Behavior.same

  • Behaviors.same 은 다음 message 를 처리할 때도 현재 함수를 사용하겠다고 하는 것인데, 아래 code 를 보면, 어느정도 의미를 파악할 수 있다. oop style 과 functional style 를 비교해보고, case Incrementcast GetValue 를 비교 확인해 보자.

source from: Style guide • Akka Documentation

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors

object Counter {
  sealed trait Command
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Value]) extends Command
  final case class Value(n: Int)

  def apply(): Behavior[Command] =
    counter(0)

  private def counter(n: Int): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Increment =>
          val newValue = n + 1
          context.log.debug("Incremented counter to [{}]", newValue)
          counter(newValue)
        case GetValue(replyTo) =>
          replyTo ! Value(n)
          Behaviors.same
      }
    }
}

////////////////////////////////////////////////////////
// OOP style
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AbstractBehavior

object Counter {
  sealed trait Command
  case object Increment extends Command
  final case class GetValue(replyTo: ActorRef[Value]) extends Command
  final case class Value(n: Int)

  def apply(): Behavior[Command] = {
    Behaviors.setup(context => new Counter(context))
  }
}

class Counter(context: ActorContext[Counter.Command]) extends AbstractBehavior[Counter.Command](context) {
  import Counter._

  private var n = 0

  override def onMessage(msg: Command): Behavior[Counter.Command] = {
    msg match {
      case Increment =>
        n += 1
        context.log.debug("Incremented counter to [{}]", n)
        this
      case GetValue(replyTo) =>
        replyTo ! Value(n)
        this
    }
  }
}

watch

  • 만약 actor 의 terminate 하는 것을 알고 싶다면, watch 를 사용할 수 있다.
  • actor 는 다른 actor 의 terminate 을 관찰할 수 있다.
  • watch 를 사용하면, Terminated 신호를 받게 된다.
  • 아래 같은 경우 Gabller Actor 가 생성되고, Gabbler 를 관찰하게 된다.(watch)
  • Gabbler 가 종료되면, receiveSignal 이 호출된다.
object Main {
  def apply(): Behavior[NotUsed] =
    Behaviors.setup { context =>
      val gabblerRef = context.spawn(Gabbler(), "gabbler")
      context.watch(gabblerRef)

      Behaviors.receiveSignal {
        case (context, Terminated(ref)) =>
          context.log.info("Job stopped: {}", ref.path.name)
          Behaviors.same
          // Behaviors.stopped
      }
    }

akka example : Hello world

//#full-example
package com.schat

import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

//#main-class
object App {
  final case class SayHello(name: String)

  object RootBehavior {
    def apply(): Behavior[SayHello] =
      Behaviors.setup[SayHello] { context =>

        Behaviors.receiveMessage { message =>
          context.log.info("Greeting {} for {}", "namh", message)
          Behaviors.same
        }
      }
  }

  def main(args: Array[String]): Unit = {
    //#actor-system
    val rb = RootBehavior()
    val rooActor: ActorSystem[SayHello] = ActorSystem(rb, "Schat")

    //#main-send-messages
    rooActor ! SayHello("Charles")

  }

}
//#main-class
//#full-example

See also

  1. 쿠...sal: [컴] akka 를 이용한 chat server
  2. 쿠...sal: [컴] 정리 - akka in action 2014 

 

Reference

  1. Actor lifecycle • Akka Documentation

댓글 없음:

댓글 쓰기