
Having played with Akka Cluster in the previous post, it's time to bring Akka Http into the mix.

Building a basic Akka Http service

First, bump the SBT version in the build configuration from 0.13.8 to the one below:

build.properties
sbt.version = 0.13.11

Otherwise the project import produces warnings like these:

04:30:08 PM SBT project import
            [warn] Multiple dependencies with the same organization/name but different versions. To avoid conflict, pick one version:
            [warn]  * org.scala-lang:scala-reflect:(2.11.7, 2.11.8)
            [warn]  * org.scala-lang.modules:scala-xml_2.11:(1.0.2, 1.0.4)

The dependencies:

build.sbt
name := "akkafront2"

version := "1.0"

scalaVersion := "2.11.8"

libraryDependencies ++= {
  val akkaV       = "2.4.2"
  val scalaTestV  = "2.2.6"
  Seq(
    "com.typesafe.akka" %% "akka-actor"                           % akkaV,
    "com.typesafe.akka" %% "akka-cluster"                         % akkaV,
    "com.typesafe.akka" %% "akka-cluster-metrics"                 % akkaV,
    "com.typesafe.akka" %% "akka-http-experimental"               % akkaV,
    "com.typesafe.akka" %% "akka-http-spray-json-experimental"    % akkaV,
    "org.scalatest"     %% "scalatest"                            % scalaTestV % "test"
  )
}
application.conf
akka {
  loglevel = DEBUG
}

http {
  interface = "0.0.0.0"
  port = 9000
}
TransformationMessages.scala
package com.sam.comm

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol

final case class Posts(text: String)

trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val postsFormat = jsonFormat1(Posts)
}
PostsOperations.scala
package com.sam.routes

import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import akka.http.scaladsl.server.Directives
import akka.stream.Materializer
import com.sam.comm._
import com.typesafe.config.Config
import scala.concurrent.ExecutionContextExecutor

/**
  * Created by SAM on 2016/3/29.
  */

trait PostsOperations extends Directives with JsonSupport {
  implicit val system: ActorSystem

  implicit def executor: ExecutionContextExecutor

  implicit val materializer: Materializer

  def config: Config

  val logger: LoggingAdapter

  lazy val postsRoutes = {
    getRoute ~ postRoute
  }

  private val getRoute =
    logRequestResult("posts-get") {
      path("api" / "v1" / "posts" / Segment) { postid =>
        get {
          complete {
            Map("postid" -> postid)
          }
        }
      }
    }

  private val postRoute =
    logRequestResult("posts-post") {
      path("api" / "v1" / "posts") {
          post {
            entity(as[Posts]) { posts =>
              complete {
                //reply with an object, serialized to JSON
                Map("posts.text" -> posts.text)
                //or reply with a plain string
                //s"Posts.text: ${posts.text}"
              }
            }
          }
      }
    }
}
RestInterface.scala
package com.sam.routes

/**
  * Created by SAM on 2016/3/29.
  */
  
trait RestInterface extends PostsOperations {
  val routes = {
    postsRoutes
  }
}
AkkaHttpMicroservice.scala
package com.sam.server

import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.sam.routes.RestInterface
import com.typesafe.config.ConfigFactory

/**
  * Created by SAM on 2016/3/29.
  */

object AkkaHttpMicroservice extends App with RestInterface {
  override implicit val system = ActorSystem()
  override implicit val executor = system.dispatcher
  override implicit val materializer = ActorMaterializer()

  override val config = ConfigFactory.load()
  override val logger = Logging(system, getClass)

  val port = config.getInt("http.port")
  val interface = config.getString("http.interface")

  val binding = Http().bindAndHandle(routes, interface, port)
  logger.info(s"Bound to port $port on interface $interface")
  binding onFailure {
    case ex: Exception =>
      logger.error(s"Failed to bind to $interface:$port!", ex)
  }
  sys.addShutdownHook(system.terminate())
}

That's all it takes for a minimal Akka Http example.

After startup:

[DEBUG] [03/29/2016 15:40:24.648] [main] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [03/29/2016 15:40:24.649] [main] [EventStream(akka://default)] Default Loggers started
[DEBUG] [03/29/2016 15:40:24.880] [main] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [03/29/2016 15:40:25.016] [main] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@5dafbe45
[INFO] [03/29/2016 15:40:29.169] [main] [AkkaHttpMicroservice$(akka://default)] Bound to port 9000 on interface 0.0.0.0
[DEBUG] [03/29/2016 15:40:29.242] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
//Test 1
curl -X GET -H "Content-Type: application/json" -H "Cache-Control: no-cache" "http://127.0.0.1:9000/api/v1/posts/ABC"

//Result
{
  "postid": "ABC"
}

//Server-side log
[DEBUG] [03/29/2016 17:23:11.825] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [03/29/2016 17:23:12.345] [default-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(default)] posts-get: Response for
  Request : HttpRequest(HttpMethod(GET),http://127.0.0.1:9000/api/v1/posts/ABC,List(Host: 127.0.0.1:9000, Connection: keep-alive, User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36, Cache-Control: no-cache, Postman-Token: dda4bb67-b788-faea-6537-7eb19caa5a36, Accept: */*, Accept-Encoding: gzip, deflate, sdch, Accept-Language: zh-TW, zh;q=0.8, en-US;q=0.6, en;q=0.4, Timeout-Access: <function1>),HttpEntity.Strict(application/json,),HttpProtocol(HTTP/1.1))
  Response: Complete(HttpResponse(200 OK,List(),HttpEntity.Strict(application/json,{
  "postid": "ABC"
}),HttpProtocol(HTTP/1.1)))
//Test 2
curl -X POST -H "Content-Type: application/json" -H "Cache-Control: no-cache" -d '{
  "text": "哩哩扣扣"
}' "http://127.0.0.1:9000/api/v1/posts"

//Result
{
  "posts.text": "哩哩扣扣"
}

//Server-side log
[DEBUG] [03/29/2016 17:29:40.702] [default-akka.actor.default-dispatcher-6] [akka.actor.ActorSystemImpl(default)] posts-post: Response for
  Request : HttpRequest(HttpMethod(POST),http://127.0.0.1:9000/api/v1/posts,List(Host: 127.0.0.1:9000, Connection: keep-alive, Cache-Control: no-cache, Origin: chrome-extension://fhbjgbiflinjbdggehcddcbncdddomop, User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36, Postman-Token: c56ff82a-af88-1941-f374-9c44234121d8, Accept: */*, Accept-Encoding: gzip, deflate, Accept-Language: zh-TW, zh;q=0.8, en-US;q=0.6, en;q=0.4, Timeout-Access: <function1>),HttpEntity.Strict(application/json,{
  "text": "哩哩扣扣"
}),HttpProtocol(HTTP/1.1))
  Response: Complete(HttpResponse(200 OK,List(),HttpEntity.Strict(application/json,{
  "posts.text": "哩哩扣扣"
}),HttpProtocol(HTTP/1.1)))
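
Before wiring in the cluster, the same routes can also be exercised in-process with akka-http's route TestKit instead of curl. The sketch below is not part of the original project: it assumes an extra test dependency such as "com.typesafe.akka" %% "akka-http-testkit" % akkaV in build.sbt (the exact artifact name has varied across 2.4.x releases) and follows the usual ScalatestRouteTest pattern against this first version of PostsOperations.

PostsOperationsSpec.scala
package com.sam.routes

import akka.event.NoLogging
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.testkit.ScalatestRouteTest
import com.sam.comm.Posts
import org.scalatest.{FlatSpec, Matchers}

//Hypothetical test, assuming the akka-http-testkit dependency is added to build.sbt.
class PostsOperationsSpec extends FlatSpec with Matchers with ScalatestRouteTest with PostsOperations {
  //ScalatestRouteTest already provides system, executor and materializer
  override def config = testConfig
  override val logger = NoLogging

  "GET /api/v1/posts/ABC" should "echo the postid back as JSON" in {
    Get("/api/v1/posts/ABC") ~> postsRoutes ~> check {
      status shouldBe StatusCodes.OK
      responseAs[String] should include ("ABC")
    }
  }

  "POST /api/v1/posts" should "echo the posted text back" in {
    Post("/api/v1/posts", Posts("hello")) ~> postsRoutes ~> check {
      status shouldBe StatusCodes.OK
      responseAs[String] should include ("hello")
    }
  }
}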

Turning it into the Akka Cluster FrontEnd

Changes to the configuration file

application.conf
http {
  interface = "0.0.0.0"
  port = 9000
}

akka {
  loglevel = DEBUG
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "0.0.0.0"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled = off

# Enable metrics extension in akka-cluster-metrics.
akka.extensions = ["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder = ${user.dir}/target/native

Add the message objects used for transport; both the frontend and the backend carry a copy of these.

TransformationMessages.scala
package com.sam.comm

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol

final case class Posts(text: String)

/**
  * Create a post
  * @param posts the post payload to create
  */
final case class PostsCreateJob(posts: Posts)

/**
  * Fetch a post
  * @param postid the id of the post to fetch
  */
final case class PostsGetJob(postid: String)

/**
  * Failure reply when no backend service is available
  * @param text the failure message
  */
final case class JobFailed(text: String)

/**
  * Defines JSON serialization and deserialization
  */
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val postsFormat = jsonFormat1(Posts)
}

/**
  * Message the backend uses to register its Posts service
  */
case object PostsActorRegister

The frontend Actor

PostsSender.scala
package com.sam.actor

import akka.actor._
import com.sam.comm._

class PostsSender extends Actor with ActorLogging {
  var postsActors = IndexedSeq.empty[ActorRef]
  var jobCounter = 0

  override def receive: Receive = {

    case job: PostsCreateJob if postsActors.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later")

    case job: PostsGetJob if postsActors.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later")

    case job: PostsCreateJob =>
      jobCounter += 1
      postsActors(jobCounter % postsActors.size) forward job

    case job: PostsGetJob =>
      jobCounter += 1
      postsActors(jobCounter % postsActors.size) forward job

    //a backend registers its service
    case PostsActorRegister if !postsActors.contains(sender()) =>
      context watch sender()
      postsActors = postsActors :+ sender()

    //a backend leaves the service
    case Terminated(a) =>
      postsActors = postsActors.filterNot(_ == a)
  }
}

Routes and how they talk to the Actor. Only a POST and a GET are implemented as an exercise; the backend replies with a ready-made Right or Left, so there isn't much else to handle apart from the JobFailed case when no backend service is available.

PostsOperations.scala
package com.sam.routes

import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorSystem}
import akka.event.LoggingAdapter
import akka.http.scaladsl.model.StatusCodes
import akka.pattern.ask
import akka.http.scaladsl.server.Directives
import akka.stream.Materializer
import akka.util.Timeout
import com.sam.comm._
import com.typesafe.config.Config
import scala.concurrent.{ExecutionContext, Future}

trait PostsOperations extends Directives with JsonSupport {
  implicit val system: ActorSystem

  implicit val executor: ExecutionContext

  implicit val materializer: Materializer

  def config: Config

  val logger: LoggingAdapter

  implicit def postsSender: ActorRef

  implicit val timeout = Timeout(5, TimeUnit.SECONDS)

  lazy val postsRoutes = {
    getRoute ~ postRoute
  }

  private val getRoute =
    path("api" / "v1" / "posts" / Segment) { postid =>
      get {
        logRequestResult("posts-get") {
          val reply: Future[Any] = postsSender ? PostsGetJob(postid)
          onSuccess(reply) { x =>
            x match {
              case Right(posts: Posts) => complete(StatusCodes.OK, posts)
              case Left(msg: String) => complete(StatusCodes.InternalServerError, Map("msg" -> msg))
              case JobFailed(msg: String) => complete(StatusCodes.InternalServerError, Map("msg" -> msg))
            }
          }
        }
      }
    }

  private val postRoute =
    path("api" / "v1" / "posts") {
      post {
        logRequestResult("posts-post") {
          entity(as[Posts]) { posts =>
            val reply: Future[Any] = postsSender ? PostsCreateJob(posts)
            onSuccess(reply) { x =>
              x match {
                case Right(postid: String) => complete(StatusCodes.Created, Map("postid" -> postid))
                case Left(msg: String) => complete(StatusCodes.InternalServerError, Map("msg" -> msg))
                case JobFailed(msg: String) => complete(StatusCodes.InternalServerError, Map("msg" -> msg))
              }
            }
          }
        }
      }
    }
}
RestInterface.scala
package com.sam.routes

/**
  * Manages which routes the service exposes
  */
trait RestInterface extends PostsOperations {
  val routes = {
    postsRoutes
  }
}

At startup

AkkaHttpMicroservice.scala
package com.sam.server

import akka.actor.{ActorSystem, Props}
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.sam.actor.PostsSender
import com.sam.routes.RestInterface
import com.typesafe.config.ConfigFactory

object AkkaHttpMicroservice extends App with RestInterface {

  override val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
    withFallback(ConfigFactory.load())

  override implicit val system = ActorSystem("ClusterSystem", config)
  override implicit val postsSender = system.actorOf(Props[PostsSender], name = "frontend")

  override implicit val executor = system.dispatcher
  override implicit val materializer = ActorMaterializer()

  override val logger = Logging(system, getClass)


  val port = config.getInt("http.port")
  val interface = config.getString("http.interface")

  val binding = Http().bindAndHandle(routes, interface, port)
  logger.info(s"Bound to port $port on interface $interface")
  binding onFailure {
    case ex: Exception =>
      logger.error(s"Failed to bind to $interface:$port!", ex)
  }
  sys.addShutdownHook(system.terminate())
}

Adding the Akka Cluster BackEnd

The configuration file

application.conf
akka {
  actor {
    provider = "akka.cluster.ClusterActorRefProvider"
  }
  remote {
    log-remote-lifecycle-events = off
    netty.tcp {
      hostname = "127.0.0.1"
      port = 0
    }
  }

  cluster {
    seed-nodes = [
      "akka.tcp://ClusterSystem@127.0.0.1:2551",
      "akka.tcp://ClusterSystem@127.0.0.1:2552"]

    auto-down-unreachable-after = 10s
  }
}

# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off

# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]

# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
TransformationMessages.scala
package com.sam.comm

import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol

final case class Posts(text: String)

/**
  * Create a post
  * @param posts the post payload to create
  */
final case class PostsCreateJob(posts: Posts)

/**
  * Fetch a post
  * @param postid the id of the post to fetch
  */
final case class PostsGetJob(postid: String)

/**
  * Defines JSON serialization and deserialization
  */
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  implicit val postsFormat = jsonFormat1(Posts)
}

/**
  * Message the backend uses to register its Posts service
  */
case object PostsActorRegister

The backend uses Hazelcast as its store.

HazelcastStore.scala
package com.sam.data

import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.AtomicInteger

import com.hazelcast.config.Config
import com.hazelcast.core.{Hazelcast, HazelcastInstance}
import com.sam.comm.Posts

/**
  * Created by SAM on 2016/3/30.
  */

object HazelcastStore {
  val config = new Config()
  val hInstance = Hazelcast.newHazelcastInstance(config)
  val counter = new AtomicInteger
}

trait HazelcastStore {
  /**
    * Returns the data store (the shared Hazelcast instance)
    *
    * @return the HazelcastInstance
    */
  def getInstance() = {
    HazelcastStore.hInstance
  }

  def getPosts(postid: String): Either[String, Posts] = {
    try {
      val map: ConcurrentMap[String, Posts] = HazelcastStore.hInstance.getMap[String, Posts]("posts")
      val posts: Posts = map.get(postid)
      if (posts != null) {
        Right(posts)
      } else {
        Left("Resource does not exist!")
      }
    } catch {
      case e: Exception => Left("Get resource failed!")
    }
  }

  def savePosts(posts: Posts): Either[String, String] = {
    var postid = ""
    try {
      val map: ConcurrentMap[String, Posts] = HazelcastStore.hInstance.getMap[String, Posts]("posts")
      postid = HazelcastStore.counter.incrementAndGet().toString
      map.put(postid, posts)
      Right(postid)
    } catch {
      case e: Exception => Left("Save failed!")
    }
  }
}
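
As a quick aside (not in the original post), here is a hypothetical standalone sketch that exercises the store's Either-based API directly; this is the same Right/Left shape the frontend routes pattern-match on.

HazelcastStoreDemo.scala
package com.sam.data

import com.sam.comm.Posts

//Hypothetical demo object, only to illustrate the Either results of HazelcastStore.
object HazelcastStoreDemo extends App with HazelcastStore {
  savePosts(Posts("hello")) match {
    case Right(postid) => println(s"saved as $postid, read back: ${getPosts(postid)}")
    case Left(msg)     => println(s"save failed: $msg")
  }
  //shut down the embedded Hazelcast node started by HazelcastStore
  getInstance().shutdown()
}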

The receiving Actor

PostsActor.scala
package com.sam.actor

import akka.actor.{Actor, RootActorPath}
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.cluster.ClusterEvent.{CurrentClusterState, MemberUp}
import com.sam.comm.{PostsActorRegister, PostsCreateJob, PostsGetJob}
import com.sam.data.HazelcastStore

class PostsActor extends Actor with HazelcastStore {
  val cluster = Cluster(context.system)

  // subscribe to cluster changes, MemberUp
  // re-subscribe when restart
  override def preStart(): Unit = cluster.subscribe(self, classOf[MemberUp])
  override def postStop(): Unit = cluster.unsubscribe(self)

  override def receive: Receive = {
    case createJob:PostsCreateJob =>
      println("get data " + createJob.posts)
      sender() ! savePosts(createJob.posts)

    case getJob:PostsGetJob =>
      sender() ! getPosts(getJob.postid)

    case state: CurrentClusterState =>
      state.members.filter(_.status == MemberStatus.Up) foreach register

    case MemberUp(m) => register(m)
  }

  def register(member: Member): Unit =
    if (member.hasRole("frontend"))
      context.actorSelection(RootActorPath(member.address) / "user" / "frontend") ! PostsActorRegister
}

The startup program

AkkaPostService.scala
package com.sam.server

import akka.actor.{ActorSystem, Props}
import com.sam.actor.PostsActor
import com.sam.data.HazelcastStore
import com.typesafe.config.ConfigFactory

/**
  * Created by SAM on 2016/3/30.
  */
object AkkaPostService extends App with HazelcastStore {
  val config = ConfigFactory.parseString("akka.cluster.roles = [backend]").
    withFallback(ConfigFactory.load())

  val system = ActorSystem("ClusterSystem", config)
  system.actorOf(Props[PostsActor], name = "backend")
  //init Hazelcast
  getInstance()

}

Automatic discovery

The seed-node part reuses the setup from the earlier akka cluster example post.
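
For completeness, a seed node can be launched the way the standard Akka cluster sample does it, by overriding the remoting port to match the seed-nodes list. The ClusterSeed object below is a hypothetical sketch and not part of this project's source.

ClusterSeed.scala
package com.sam.server

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

//Hypothetical seed launcher following the standard Akka cluster sample.
object ClusterSeed extends App {
  //start one ActorSystem per seed port (2551 and 2552 match seed-nodes in application.conf)
  val ports = if (args.isEmpty) Seq("2551", "2552") else args.toSeq
  ports foreach { port =>
    val config = ConfigFactory.parseString(s"akka.remote.netty.tcp.port = $port").
      withFallback(ConfigFactory.load())
    ActorSystem("ClusterSystem", config)
  }
}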

Testing

//Create data
curl -X POST -H "Content-Type: application/json" -H "Cache-Control: no-cache" -d '{
  "text": "哩哩扣扣"
}' "http://127.0.0.1:9000/api/v1/posts"

//Result when no backend is running
{
  "msg": "Service unavailable, try again later"
}

//Result with a backend running
{
  "postid": "1"
}
//Fetch data
curl -X GET -H "Content-Type: application/json" -H "Cache-Control: no-cache" "http://127.0.0.1:9000/api/v1/posts/1"

//Result
{
  "text": "哩哩扣扣"
}

The error below still shows up. Some reports say upgrading to 2.4.2 fixes it, but it is the same here even on 2.4.2, so it needs more digging.

[ERROR] [03/31/2016 07:33:16.174] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/user/StreamSupervisor-0/flow-2-0-unknown-operation] Error in stage [recover]: No elements passed in the last 1 minute. (akka.http.impl.engine.HttpConnectionTimeoutException)
[DEBUG] [03/31/2016 07:33:16.174] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/user/StreamSupervisor-0/flow-2-0-unknown-operation] Aborting tcp connection because of upstream failure: No elements passed in the last 1 minute.
akka.stream.impl.Timers$IdleTimeoutBidi$$anon$4.onTimer(Timers.scala:147)
akka.stream.stage.TimerGraphStageLogic.akka$stream$stage$TimerGraphStageLogic$$onInternalTimer(GraphStage.scala:1125)
akka.stream.stage.TimerGraphStageLogic$$anonfun$akka$stream$stage$TimerGraphStageLogic$$getTimerAsyncCallback$1.apply(GraphStage.scala:1114)
akka.stream.stage.TimerGraphStageLogic$$anonfun$akka$stream$stage$TimerGraphStageLogic$$getTimerAsyncCallback$1.apply(GraphStage.scala:1114)
akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:564)
akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:397)
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:547)
akka.actor.Actor$class.aroundReceive(Actor.scala:480)
akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:493)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
akka.actor.ActorCell.invoke(ActorCell.scala:495)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
akka.dispatch.Mailbox.run(Mailbox.scala:224)
akka.dispatch.Mailbox.exec(Mailbox.scala:234)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
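
For what it's worth, the exception type suggests this is the HTTP server's idle-timeout (60 s by default) tearing down a kept-alive connection rather than a real failure. If that guess is right and the log noise is unwanted, the timeout can be tuned in application.conf; this is an assumption on my part, not something verified in this project.

# Hypothetical tweak: lengthen the server idle-timeout that produces the
# "No elements passed in the last 1 minute" message (default is 60 s).
akka.http.server.idle-timeout = 5 minutes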

References
Akka Configuration
Akka学习笔记:ActorSystem(配置)
[翻译] AKKA笔记- ACTORSYSTEM (配置CONFIGURATION 与调度SCHEDULING)
Akka笔记之配置及调度
Akka in JAVA(四)
Akka Cluster原理与应用
Scala Either, Left And Right
Scala 初学者指南-类型 Either
【Scala】使用Option、Either和Try处理数据交互
No elements passed in the last 1 minute. #18982
Hazelcast
