over 1 year ago

Akka Cluster 觀念有點特別,你必須要有類似自動發現機制的 Seed,但是Seed 本身就是 Actor,整個叢集自成一脈,說實在有好有壞,不過還好他也可以支援 Zookeeper,不過要評估一下未來會不會有異質服務加入,不然到時要改就累了

不過這邊先拿原生Seed的機制來練習。

大致上如下圖,幾點要注意一下
1.兩個Seed節點可以互相加入,就可以知道彼此所有路徑
2.工作結點可以加入其中一個Seed Node或是兩個都嘗試加入
3.Akka好像尚未實現自動路由機制,還是要在程式理面實做的樣子

Seed-Node

官方練習的範例我把Seed部分獨立出來

build.sbt
name := "akkaseed"

version := "1.0"

scalaVersion := "2.11.7"

scalacOptions in Compile ++= Seq("-encoding", "UTF-8", "-target:jvm-1.8", "-deprecation", "-feature", "-unchecked", "-Xlog-reflective-calls", "-Xlint")

javacOptions in Compile ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint:unchecked", "-Xlint:deprecation")

libraryDependencies ++= {
  val akkaVersion = "2.4.1"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe.akka" %% "akka-remote" % akkaVersion,
    "com.typesafe.akka" %% "akka-cluster" % akkaVersion,
    "com.typesafe.akka" %% "akka-cluster-metrics" % akkaVersion,
    "com.typesafe.akka" %% "akka-cluster-tools" % akkaVersion,
    "com.typesafe.akka" %% "akka-multi-node-testkit" % akkaVersion,
    "org.scalatest" %% "scalatest" % "2.2.1" % "test",
    "io.kamon" % "sigar-loader" % "1.6.6-rev002"
  )
}
application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
ClusterSeedListener.scala
package com.sam.seed

import akka.actor.{ActorLogging, Actor}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

class ClusterSeedListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system);

  // subscribe to cluster changes, re-subscribe when restart
  override def preStart(): Unit = {
    //#subscribe
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents, classOf[MemberEvent], classOf[UnreachableMember])
  };

  override def postStop(): Unit = cluster.unsubscribe(self);

  def receive = {
    case MemberUp(member) =>
      log.info("Member is Up: {}", member.address)

    case UnreachableMember(member) =>
      log.info("Member detected as unreachable: {}", member)

    case MemberRemoved(member, previousStatus) =>
      log.info("Member is Removed: {} after {}", member.address, previousStatus);

    case _: MemberEvent => // ignore
  }
}
ClusterSeedApp.scala
package com.sam.seed

import akka.actor.{Props, ActorSystem}
import com.typesafe.config.ConfigFactory

object ClusterSeedApp {
  def main(args: Array[String]): Unit = {
    if (args.isEmpty)
      startup(Seq("2551", "2552"))
    else
      startup(args)
  }

  def startup(ports: Seq[String]): Unit = {
    ports foreach { port =>
      // Override the configuration of the port
      val config = ConfigFactory.parseString("akka.remote.netty.tcp.port=" + port).
        withFallback(ConfigFactory.load())

      // Create an Akka system
      val system = ActorSystem("ClusterSystem", config)
      // Create an actor that handles cluster domain events
      system.actorOf(Props[ClusterSeedListener], name = "clusterListener")
    }
  }
}

起動訊息

[INFO] [03/11/2016 13:28:17.125] [main] [akka.remote.Remoting] Starting remoting
[INFO] [03/11/2016 13:28:17.674] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [03/11/2016 13:28:17.701] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Starting up...
[INFO] [03/11/2016 13:28:17.893] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [03/11/2016 13:28:17.893] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Started up successfully
[INFO] [03/11/2016 13:28:18.040] [main] [akka.remote.Remoting] Starting remoting
[INFO] [03/11/2016 13:28:18.069] [main] [akka.remote.Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [03/11/2016 13:28:18.070] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[INFO] [03/11/2016 13:28:18.074] [main] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully
no sigar-amd64-winnt.dll in java.library.path
org.hyperic.sigar.SigarException: no sigar-amd64-winnt.dll in java.library.path
三月 11, 2016 1:28:19 下午 kamon.sigar.SigarProvisioner provision
資訊: Sigar library provisioned: D:\workspace_idea\akkaseed\target\native\sigar-amd64-winnt.dll
三月 11, 2016 1:28:19 下午 kamon.sigar.SigarProvisioner provision
警告: Sigar library is already provisioned.
....
....
三月 11, 2016 1:28:19 下午 kamon.sigar.SigarProvisioner provision
資訊: Sigar library provisioned: D:\workspace_idea\akkaseed\target\native\sigar-amd64-winnt.dll
三月 11, 2016 1:28:19 下午 kamon.sigar.SigarProvisioner provision
警告: Sigar library is already provisioned.
[INFO] [03/11/2016 13:28:19.096] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Metrics collection has started successfully
[INFO] [03/11/2016 13:28:19.100] [ClusterSystem-akka.actor.default-dispatcher-23] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[INFO] [03/11/2016 13:28:19.255] [ClusterSystem-akka.actor.default-dispatcher-14] [akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1] Message [akka.cluster.InternalClusterAction$InitJoinNack] from Actor[akka.tcp://ClusterSystem@127.0.0.1:2552/system/cluster/core/daemon#1476353305] to Actor[akka://ClusterSystem/system/cluster/core/daemon/firstSeedNodeProcess-1#368132738] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
[INFO] [03/11/2016 13:28:19.263] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[INFO] [03/11/2016 13:28:19.281] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[INFO] [03/11/2016 13:28:19.291] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] [03/11/2016 13:28:24.321] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[INFO] [03/11/2016 13:28:24.500] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [03/11/2016 13:28:24.501] [ClusterSystem-akka.actor.default-dispatcher-2] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2551
[INFO] [03/11/2016 13:28:24.941] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
[INFO] [03/11/2016 13:28:24.943] [ClusterSystem-akka.actor.default-dispatcher-17] [akka.tcp://ClusterSystem@127.0.0.1:2551/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2552
[INFO] [03/11/2016 13:28:24.956] [ClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/clusterListener] Member is Up: akka.tcp://ClusterSystem@127.0.0.1:2552

仔細看一下你會看到兩個seed node互加的行為,也會看到個 Sigar library Excetion,不過這個沒關係先不用管,這類似要取得主機資訊的原生套件。

FrontEnd

build.sbt 跟 application.conf 跟前面一樣是,但 package 我調整了一下,等下要改造

這是定義Frontend跟Backend傳遞的中間訊息

TransformationMessages.scala
package com.sam.comm

final case class TransformationJob(text: String)
final case class TransformationResult(text: String)
final case class JobFailed(reason: String, job: TransformationJob)
case object BackendRegistration

這邊是Front真正的動作

TransformationFrontend.scala
package com.sam.front

import com.sam.comm.{BackendRegistration, JobFailed, TransformationJob}
import language.postfixOps
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.Terminated
import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import java.util.concurrent.atomic.AtomicInteger

//#frontend
class TransformationFrontend extends Actor {

  var backends = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  def receive = {
    case job: TransformationJob if backends.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later", job)

    case job: TransformationJob =>
      jobCounter += 1
      backends(jobCounter % backends.size) forward job

    case BackendRegistration if !backends.contains(sender()) =>
      context watch sender()
      backends = backends :+ sender()

    case Terminated(a) =>
      backends = backends.filterNot(_ == a)
  }
}
//#frontend
object TransformationFrontend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [frontend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    val frontend = system.actorOf(Props[TransformationFrontend], name = "frontend")
    
    // 模擬了測試功能定時固定對 frontend Actor 傳遞消息並接收結果
    val counter = new AtomicInteger
    import system.dispatcher
    system.scheduler.schedule(2.seconds, 2.seconds) {
      implicit val timeout = Timeout(5 seconds)
      (frontend ? TransformationJob("hello-" + counter.incrementAndGet())) onSuccess {
        case result => println(result)
      }
    }
  }
}

啟動部分就是分別啟動兩個隨機port服務

TransformationApp.scala
package com.sam.front

object TransformationApp {
  // starting 2 frontend nodes
  TransformationFrontend.main(Array.empty)
  TransformationFrontend.main(Array.empty)
}

BackEnd

後端部分的 build.sbt 、 application.conf 、 TransformationMessages.scala 都是一樣的

主要Actor

TransformationBackend.scala
package com.sam.back

import com.sam.comm.{TransformationJob, TransformationResult, BackendRegistration}

import language.postfixOps
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor.RootActorPath
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.Member
import akka.cluster.MemberStatus
import com.typesafe.config.ConfigFactory

//#backend
class TransformationBackend extends Actor {

  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case TransformationJob(text) => sender() ! TransformationResult(text.toUpperCase)
    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register
    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") ! BackendRegistration
}
//#backend

object TransformationBackend {
  def main(args: Array[String]): Unit = {
    // Override the configuration of the port when specified as program argument
    val port = if (args.isEmpty) "0" else args(0)
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port=$port").
      withFallback(ConfigFactory.parseString("akka.cluster.roles = [backend]")).
      withFallback(ConfigFactory.load())

    val system = ActorSystem("ClusterSystem", config)
    system.actorOf(Props[TransformationBackend], name = "backend")
  }
}

啟動

TransformationApp.scala
package com.sam.back

object TransformationApp {
  def main(args: Array[String]): Unit = {
    // starting 3 backend nodes
    TransformationBackend.main(Array.empty)
    TransformationBackend.main(Array.empty)
    TransformationBackend.main(Array.empty)
  }
}

測試

依序起動 SeedNode、FrontEnd、BackEnd後觀察log

FrontEndLog
[INFO] [03/11/2016 16:15:55.381] [ClusterSystem-akka.actor.default-dispatcher-14] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:62959] - Metrics collection has started successfully
[INFO] [03/11/2016 16:15:55.382] [ClusterSystem-akka.actor.default-dispatcher-18] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:62972] - Metrics collection has started successfully
[INFO] [03/11/2016 16:15:55.667] [ClusterSystem-akka.actor.default-dispatcher-19] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:62959] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
[INFO] [03/11/2016 16:15:55.667] [ClusterSystem-akka.actor.default-dispatcher-25] [akka.cluster.Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:62972] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
JobFailed(Service unavailable, try again later,TransformationJob(hello-1))
JobFailed(Service unavailable, try again later,TransformationJob(hello-1))
[WARN] [03/11/2016 16:15:59.192] [ClusterSystem-akka.remote.default-remote-dispatcher-5] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [com.sam.comm.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
TransformationResult(HELLO-2)
[WARN] [03/11/2016 16:15:59.323] [ClusterSystem-akka.remote.default-remote-dispatcher-8] [akka.serialization.Serialization(akka://ClusterSystem)] Using the default Java serializer for class [com.sam.comm.TransformationJob] which is not recommended because of performance implications. Use another serializer or disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
TransformationResult(HELLO-2)
TransformationResult(HELLO-3)
TransformationResult(HELLO-3)
TransformationResult(HELLO-4)

下一篇再加上Akka Http

← Setup Kubernetes 1.0 at CentOS use yum akka cluster with akka http →
 
comments powered by Disqus