almost 7 years ago
前面玩了一下 Akka Cluster,現在再把 Akka Http 加進來整合。
建立基本的 Akka Http 服務
首先 SBT 組態要改一下,從 0.13.8 改成下面
sbt.version = 0.13.11
不然會發生警告如下
下午 04:30:08 SBT project import
[warn] Multiple dependencies with the same organization/name but different versions. To avoid conflict, pick one version:
[warn] * org.scala-lang:scala-reflect:(2.11.7, 2.11.8)
[warn] * org.scala-lang.modules:scala-xml_2.11:(1.0.2, 1.0.4)
依賴的部分
// Project coordinates.
name := "akkafront2"
version := "1.0"
scalaVersion := "2.11.8"

// All Akka modules share one version, so the module IDs are derived from a list of
// artifact names; ScalaTest is appended as a test-scoped dependency.
libraryDependencies ++= {
  val akkaVersion = "2.4.2"
  val scalaTestVersion = "2.2.6"
  val akkaModules = Seq(
    "akka-actor",
    "akka-cluster",
    "akka-cluster-metrics",
    "akka-http-experimental",
    "akka-http-spray-json-experimental"
  ).map(artifact => "com.typesafe.akka" %% artifact % akkaVersion)
  akkaModules :+ ("org.scalatest" %% "scalatest" % scalaTestVersion % "test")
}
# Akka settings: log at DEBUG level.
akka {
loglevel = DEBUG
}
# HTTP endpoint settings; the server reads http.interface and http.port at startup
# and passes them to the HTTP bind call.
http {
interface = "0.0.0.0"
port = 9000
}
package com.sam.comm
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol
/** Payload of a post; it carries only the post text and is (de)serialized as JSON by the routes. */
final case class Posts(text: String)
/**
 * Mix-in that brings the spray-json marshalling support and the JSON format
 * for [[Posts]] into implicit scope.
 */
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  // The type is spelled out (fully qualified, so no extra import is needed):
  // implicit definitions should not rely on type inference.
  implicit val postsFormat: spray.json.RootJsonFormat[Posts] = jsonFormat1(Posts)
}
package com.sam.routes
import akka.actor.ActorSystem
import akka.event.LoggingAdapter
import akka.http.scaladsl.server.Directives
import akka.stream.Materializer
import com.sam.comm._
import com.typesafe.config.Config
import scala.concurrent.ExecutionContextExecutor
/**
 * Created by SAM on 2016/3/29.
 *
 * Routes for the posts resource. This first version talks to no backend:
 * both routes simply echo their input back as a JSON object.
 */
trait PostsOperations extends Directives with JsonSupport {
  implicit val system: ActorSystem
  implicit def executor: ExecutionContextExecutor
  implicit val materializer: Materializer
  def config: Config
  val logger: LoggingAdapter

  /** GET and POST routes combined; built lazily on first access. */
  lazy val postsRoutes = getRoute ~ postRoute

  // GET /api/v1/posts/{postid} -> {"postid": "<postid>"}
  private val getRoute =
    logRequestResult("posts-get") {
      (path("api" / "v1" / "posts" / Segment) & get) { id =>
        complete(Map("postid" -> id))
      }
    }

  // POST /api/v1/posts with a JSON Posts body -> {"posts.text": "<text>"}
  private val postRoute =
    logRequestResult("posts-post") {
      (path("api" / "v1" / "posts") & post & entity(as[Posts])) { received =>
        // Reply with an object that is serialized to JSON.
        complete(Map("posts.text" -> received.text))
        // Alternative: reply with a plain string instead.
        //s"Posts.text: ${posts.text}"
      }
    }
}
package com.sam.routes
/**
 * Created by SAM on 2016/3/29.
 *
 * Collects the routes served by the application; currently only the posts routes.
 */
trait RestInterface extends PostsOperations {
  val routes = postsRoutes
}
package com.sam.server
import akka.actor.ActorSystem
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.sam.routes.RestInterface
import com.typesafe.config.ConfigFactory
/**
 * Created by SAM on 2016/3/29.
 *
 * Entry point of the stand-alone HTTP service: creates the actor system, reads
 * `http.interface` / `http.port` from the configuration and binds the routes.
 */
object AkkaHttpMicroservice extends App with RestInterface {
  import scala.util.{Failure, Success}

  override implicit val system = ActorSystem()
  override implicit val executor = system.dispatcher
  override implicit val materializer = ActorMaterializer()
  override val config = ConfigFactory.load()
  override val logger = Logging(system, getClass)

  val port = config.getInt("http.port")
  val interface = config.getString("http.interface")

  val binding = Http().bindAndHandle(routes, interface, port)

  // Report the outcome only once the bind future has completed, so that
  // "Bound to port" is never logged for a bind that actually failed.
  binding.onComplete {
    case Success(_) =>
      logger.info(s"Bound to port $port on interface $interface")
    case Failure(ex) =>
      // The cause must be the first argument: error(String, Any) is the
      // template overload and would not log the exception.
      logger.error(ex, s"Failed to bind to $interface:$port!")
  }

  // Terminate the actor system when the JVM shuts down.
  sys.addShutdownHook(system.terminate())
}
以上就是一個最基本的 Akka Http 範例
啟動後
[DEBUG] [03/29/2016 15:40:24.648] [main] [EventStream(akka://default)] logger log1-Logging$DefaultLogger started
[DEBUG] [03/29/2016 15:40:24.649] [main] [EventStream(akka://default)] Default Loggers started
[DEBUG] [03/29/2016 15:40:24.880] [main] [AkkaSSLConfig(akka://default)] Initializing AkkaSSLConfig extension...
[DEBUG] [03/29/2016 15:40:25.016] [main] [AkkaSSLConfig(akka://default)] buildHostnameVerifier: created hostname verifier: com.typesafe.sslconfig.ssl.DefaultHostnameVerifier@5dafbe45
[INFO] [03/29/2016 15:40:29.169] [main] [AkkaHttpMicroservice$(akka://default)] Bound to port 9000 on interface 0.0.0.0
[DEBUG] [03/29/2016 15:40:29.242] [default-akka.actor.default-dispatcher-2] [akka://default/system/IO-TCP/selectors/$a/0] Successfully bound to /0:0:0:0:0:0:0:0:9000
//測試1
curl -X GET -H "Content-Type: application/json" -H "Cache-Control: no-cache" "http://127.0.0.1:9000/api/v1/posts/ABC"
//結果
{
"postid": "ABC"
}
//程式端的log
[DEBUG] [03/29/2016 17:23:11.825] [default-akka.actor.default-dispatcher-3] [akka://default/system/IO-TCP/selectors/$a/0] New connection accepted
[DEBUG] [03/29/2016 17:23:12.345] [default-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(default)] posts-get: Response for
Request : HttpRequest(HttpMethod(GET),http://127.0.0.1:9000/api/v1/posts/ABC,List(Host: 127.0.0.1:9000, Connection: keep-alive, User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36, Cache-Control: no-cache, Postman-Token: dda4bb67-b788-faea-6537-7eb19caa5a36, Accept: */*, Accept-Encoding: gzip, deflate, sdch, Accept-Language: zh-TW, zh;q=0.8, en-US;q=0.6, en;q=0.4, Timeout-Access: <function1>),HttpEntity.Strict(application/json,),HttpProtocol(HTTP/1.1))
Response: Complete(HttpResponse(200 OK,List(),HttpEntity.Strict(application/json,{
"postid": "ABC"
}),HttpProtocol(HTTP/1.1)))
//測試2
curl -X POST -H "Content-Type: application/json" -H "Cache-Control: no-cache" -d '{
"text": "哩哩扣扣"
}' "http://127.0.0.1:9000/api/v1/posts"
//結果
{
"posts.text": "哩哩扣扣"
}
//程式端的log
[DEBUG] [03/29/2016 17:29:40.702] [default-akka.actor.default-dispatcher-6] [akka.actor.ActorSystemImpl(default)] posts-post: Response for
Request : HttpRequest(HttpMethod(POST),http://127.0.0.1:9000/api/v1/posts,List(Host: 127.0.0.1:9000, Connection: keep-alive, Cache-Control: no-cache, Origin: chrome-extension://fhbjgbiflinjbdggehcddcbncdddomop, User-Agent: Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36, Postman-Token: c56ff82a-af88-1941-f374-9c44234121d8, Accept: */*, Accept-Encoding: gzip, deflate, Accept-Language: zh-TW, zh;q=0.8, en-US;q=0.6, en;q=0.4, Timeout-Access: <function1>),HttpEntity.Strict(application/json,{
"text": "哩哩扣扣"
}),HttpProtocol(HTTP/1.1))
Response: Complete(HttpResponse(200 OK,List(),HttpEntity.Strict(application/json,{
"posts.text": "哩哩扣扣"
}),HttpProtocol(HTTP/1.1)))
改成 Akka Cluster 的 FrontEnd
設定檔的修改
# HTTP endpoint settings; the server reads http.interface and http.port at startup.
http {
interface = "0.0.0.0"
port = 9000
}
akka {
loglevel = DEBUG
actor {
# Use the cluster actor reference provider so this node can take part in the cluster.
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
# NOTE(review): the backend configuration uses 127.0.0.1 here; confirm that 0.0.0.0
# is an address other cluster nodes can use to reach this node.
hostname = "0.0.0.0"
# NOTE(review): port 0 presumably lets a free port be chosen at startup; confirm.
port = 0
}
}
cluster {
# Seed nodes of the "ClusterSystem" actor system (the name the frontend and backend
# pass to ActorSystem).
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
# NOTE(review): unreachable members are presumably marked down automatically after
# 10 seconds; confirm against the Akka Cluster documentation.
auto-down-unreachable-after = 10s
}
}
# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled = off
# Enable metrics extension in akka-cluster-metrics.
akka.extensions = ["akka.cluster.metrics.ClusterMetricsExtension"]
# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder = ${user.dir}/target/native
增加傳輸要用的物件,這部分前後端都會有
package com.sam.comm
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol
/** Payload of a post; it carries only the post text. */
final case class Posts(text: String)
/**
 * Request to create a post.
 * @param posts the post to store
 */
final case class PostsCreateJob(posts: Posts)
/**
 * Request to fetch a post.
 * @param postid identifier of the post to look up
 */
final case class PostsGetJob(postid: String)
/**
 * Failure reply sent by the frontend actor when no backend service is registered.
 * @param text description of the failure
 */
final case class JobFailed(text: String)
/**
 * Defines JSON serialization and deserialization for the transfer objects.
 */
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  // The type is spelled out (fully qualified, so no extra import is needed):
  // implicit definitions should not rely on type inference.
  implicit val postsFormat: spray.json.RootJsonFormat[Posts] = jsonFormat1(Posts)
}
/**
 * Message a backend actor sends to the frontend actor to register itself
 * as a provider of the posts service.
 */
case object PostsActorRegister
前端用Actor
package com.sam.actor
import akka.actor._
import com.sam.comm._
/**
 * Frontend actor that distributes post jobs round-robin over the backend
 * actors that have registered themselves with [[PostsActorRegister]].
 *
 * While no backend is registered, every job is answered with [[JobFailed]].
 */
class PostsSender extends Actor with ActorLogging {
  /** Currently registered backend actors. */
  var postsActors = IndexedSeq.empty[ActorRef]
  /** Number of jobs forwarded so far; drives the round-robin selection. */
  var jobCounter = 0

  override def receive: Receive = {
    // No backend available: fail fast instead of letting the caller time out.
    case _: PostsCreateJob | _: PostsGetJob if postsActors.isEmpty =>
      sender() ! JobFailed("Service unavailable, try again later")

    case job @ (_: PostsCreateJob | _: PostsGetJob) =>
      jobCounter += 1
      // floorMod keeps the index non-negative even after jobCounter wraps
      // around Int.MaxValue; a plain % would produce a negative index then.
      val target = Math.floorMod(jobCounter, postsActors.size)
      postsActors(target) forward job

    // A backend registers its service; watch it so its termination is noticed.
    case PostsActorRegister if !postsActors.contains(sender()) =>
      context watch sender()
      postsActors = postsActors :+ sender()

    // A backend has stopped; remove it from the rotation.
    case Terminated(a) =>
      postsActors = postsActors.filterNot(_ == a)
  }
}
路徑跟 Actor 操作:只做了 Post 跟 Get 兩個操作當練習。後端會回傳包好的 Right 或 Left,所以前端不用考慮太多;另外還要處理沒有後端服務時的 JobFailed 狀況。
package com.sam.routes
import java.util.concurrent.TimeUnit
import akka.actor.{ActorRef, ActorSystem}
import akka.event.LoggingAdapter
import akka.http.scaladsl.model.StatusCodes
import akka.pattern.ask
import akka.http.scaladsl.server.Directives
import akka.stream.Materializer
import akka.util.Timeout
import com.sam.comm._
import com.typesafe.config.Config
import scala.concurrent.{ExecutionContext, Future}
/**
 * Routes for the posts resource. Each request is turned into a job message that is
 * sent to `postsSender` with the ask pattern (5 second timeout); the reply decides
 * the HTTP response:
 *
 *  - `Right(...)`      -> success (200 for GET, 201 for POST)
 *  - `Left(msg)`       -> 500 with `{"msg": msg}`
 *  - `JobFailed(msg)`  -> 500 with `{"msg": msg}`
 *  - anything else     -> 500 with a generic message (logged as an error)
 */
trait PostsOperations extends Directives with JsonSupport {
  implicit val system: ActorSystem
  implicit val executor: ExecutionContext
  implicit val materializer: Materializer
  def config: Config
  val logger: LoggingAdapter
  /** Actor that forwards jobs to the backend. */
  implicit def postsSender: ActorRef
  /** Timeout applied to every ask sent to `postsSender`. */
  implicit val timeout: Timeout = Timeout(5, TimeUnit.SECONDS)

  lazy val postsRoutes = {
    getRoute ~ postRoute
  }

  /** Completes the request with a 500 response carrying `msg` as JSON. */
  private def internalError(msg: String): akka.http.scaladsl.server.Route =
    complete(StatusCodes.InternalServerError, Map("msg" -> msg))

  /**
   * Maps every non-success reply to a response. The final case makes the match
   * total, so an unexpected reply no longer surfaces as a `MatchError`.
   */
  private def failureReply(reply: Any): akka.http.scaladsl.server.Route = reply match {
    case Left(msg: String) => internalError(msg)
    case JobFailed(msg: String) => internalError(msg)
    case unexpected =>
      logger.error("Unexpected reply from posts backend: {}", unexpected)
      internalError("Unexpected reply from backend")
  }

  // GET /api/v1/posts/{postid}
  private val getRoute =
    path("api" / "v1" / "posts" / Segment) { postid =>
      get {
        logRequestResult("posts-get") {
          val reply: Future[Any] = postsSender ? PostsGetJob(postid)
          onSuccess(reply) {
            case Right(posts: Posts) => complete(StatusCodes.OK, posts)
            case other => failureReply(other)
          }
        }
      }
    }

  // POST /api/v1/posts with a JSON Posts body
  private val postRoute =
    path("api" / "v1" / "posts") {
      post {
        logRequestResult("posts-post") {
          entity(as[Posts]) { posts =>
            val reply: Future[Any] = postsSender ? PostsCreateJob(posts)
            onSuccess(reply) {
              case Right(postid: String) => complete(StatusCodes.Created, Map("postid" -> postid))
              case other => failureReply(other)
            }
          }
        }
      }
    }
}
package com.sam.routes
/**
 * Manages which routes are offered by the service; currently only the posts routes.
 */
trait RestInterface extends PostsOperations {
  val routes = postsRoutes
}
啟動的時候
package com.sam.server
import akka.actor.{ActorSystem, Props}
import akka.event.Logging
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import com.sam.actor.PostsSender
import com.sam.routes.RestInterface
import com.typesafe.config.ConfigFactory
/**
 * Entry point of the cluster frontend: joins the "ClusterSystem" actor system with
 * the `frontend` role, starts the `PostsSender` actor under the name "frontend"
 * and binds the HTTP routes to `http.interface` / `http.port`.
 */
object AkkaHttpMicroservice extends App with RestInterface {
  import scala.util.{Failure, Success}

  // The role is layered on top of the regular configuration.
  override val config = ConfigFactory.parseString("akka.cluster.roles = [frontend]").
    withFallback(ConfigFactory.load())
  override implicit val system = ActorSystem("ClusterSystem", config)
  // Backends look this actor up by the path /user/frontend to register themselves.
  override implicit val postsSender = system.actorOf(Props[PostsSender], name = "frontend")
  override implicit val executor = system.dispatcher
  override implicit val materializer = ActorMaterializer()
  override val logger = Logging(system, getClass)

  val port = config.getInt("http.port")
  val interface = config.getString("http.interface")

  val binding = Http().bindAndHandle(routes, interface, port)

  // Report the outcome only once the bind future has completed, so that
  // "Bound to port" is never logged for a bind that actually failed.
  binding.onComplete {
    case Success(_) =>
      logger.info(s"Bound to port $port on interface $interface")
    case Failure(ex) =>
      // The cause must be the first argument: error(String, Any) is the
      // template overload and would not log the exception.
      logger.error(ex, s"Failed to bind to $interface:$port!")
  }

  // Terminate the actor system when the JVM shuts down.
  sys.addShutdownHook(system.terminate())
}
增加 Akka Cluster 的 BackEnd
設定檔
# Backend node configuration.
akka {
actor {
# Use the cluster actor reference provider so this node can take part in the cluster.
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
# NOTE(review): port 0 presumably lets a free port be chosen at startup; confirm.
port = 0
}
}
cluster {
# Seed nodes of the "ClusterSystem" actor system (the name the backend passes to ActorSystem).
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
# NOTE(review): unreachable members are presumably marked down automatically after
# 10 seconds; confirm against the Akka Cluster documentation.
auto-down-unreachable-after = 10s
}
}
# Disable legacy metrics in akka-cluster.
akka.cluster.metrics.enabled=off
# Enable metrics extension in akka-cluster-metrics.
akka.extensions=["akka.cluster.metrics.ClusterMetricsExtension"]
# Sigar native library extract location during tests.
# Note: use per-jvm-instance folder when running multiple jvm on one host.
akka.cluster.metrics.native-library-extract-folder=${user.dir}/target/native
package com.sam.comm
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol
/** Payload of a post; it carries only the post text. */
final case class Posts(text: String)
/**
 * Request to create a post.
 * @param posts the post to store
 */
final case class PostsCreateJob(posts: Posts)
/**
 * Request to fetch a post.
 * @param postid identifier of the post to look up
 */
final case class PostsGetJob(postid: String)
/**
 * Defines JSON serialization and deserialization for the transfer objects.
 */
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
  // The type is spelled out (fully qualified, so no extra import is needed):
  // implicit definitions should not rely on type inference.
  implicit val postsFormat: spray.json.RootJsonFormat[Posts] = jsonFormat1(Posts)
}
/**
 * Message the backend actor sends to a frontend node to register itself
 * as a provider of the posts service.
 */
case object PostsActorRegister
後端用 Hazelcast 當儲存
package com.sam.data
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.atomic.AtomicInteger
import com.hazelcast.config.Config
import com.hazelcast.core.{Hazelcast, HazelcastInstance}
import com.sam.comm.Posts
/**
 * Created by SAM on 2016/3/30.
 *
 * Process-wide holder of the Hazelcast instance and of the counter used to
 * generate post ids.
 */
object HazelcastStore {
  /** Default Hazelcast configuration. */
  val config: Config = new Config()

  /** Hazelcast instance created from `config` when this object is first accessed. */
  val hInstance: HazelcastInstance = Hazelcast.newHazelcastInstance(config)

  /** Source of post ids, starting from zero in every JVM. */
  val counter: AtomicInteger = new AtomicInteger()
}
/**
 * Storage operations for posts, backed by the Hazelcast map named "posts".
 *
 * Both operations report failures as `Left(message)` instead of throwing.
 */
trait HazelcastStore {
  import scala.util.control.NonFatal

  /**
   * Returns the shared Hazelcast instance (accessing it also triggers its creation).
   *
   * @return the Hazelcast instance held by the companion object
   */
  def getInstance(): HazelcastInstance = HazelcastStore.hInstance

  /** The Hazelcast map that holds all posts, keyed by post id. */
  private def postsMap: ConcurrentMap[String, Posts] =
    HazelcastStore.hInstance.getMap[String, Posts]("posts")

  /**
   * Looks up a post by id.
   *
   * @param postid id of the post
   * @return `Right(post)` when found, `Left(message)` when the id is unknown
   *         or the lookup failed
   */
  def getPosts(postid: String): Either[String, Posts] =
    try {
      // Map.get returns null for a missing key; Option(...) turns that into None.
      Option(postsMap.get(postid)).toRight("Resource does not exist!")
    } catch {
      case NonFatal(_) => Left("Get resource failed!")
    }

  /**
   * Stores a post under a newly generated id.
   *
   * NOTE(review): ids come from a per-JVM counter; with several backend nodes
   * sharing one map the ids would presumably collide. Confirm and consider a
   * cluster-wide id source.
   *
   * @param posts the post to store
   * @return `Right(postid)` on success, `Left(message)` when saving failed
   */
  def savePosts(posts: Posts): Either[String, String] =
    try {
      val postid = HazelcastStore.counter.incrementAndGet().toString
      postsMap.put(postid, posts)
      Right(postid)
    } catch {
      case NonFatal(_) => Left("Save failed!")
    }
}
接收的 Actor
package com.sam.actor
import akka.actor.{Actor, RootActorPath}
import akka.cluster.{Cluster, Member, MemberStatus}
import akka.cluster.ClusterEvent.{CurrentClusterState, MemberUp}
import com.sam.comm.{PostsActorRegister, PostsCreateJob, PostsGetJob}
import com.sam.data.HazelcastStore
/**
 * Backend actor that serves post jobs from the Hazelcast store and registers
 * itself with every cluster member that has the "frontend" role.
 */
class PostsActor extends Actor with HazelcastStore {
  val cluster = Cluster(context.system)

  // Subscribe to MemberUp cluster events; this runs again when the actor restarts.
  override def preStart(): Unit = {
    cluster.subscribe(self, classOf[MemberUp])
  }

  override def postStop(): Unit = {
    cluster.unsubscribe(self)
  }

  override def receive: Receive = {
    case PostsCreateJob(posts) =>
      println("get data " + posts)
      sender() ! savePosts(posts)

    case PostsGetJob(postid) =>
      sender() ! getPosts(postid)

    // Cluster state snapshot: register with every member that is already up.
    case snapshot: CurrentClusterState =>
      for (member <- snapshot.members if member.status == MemberStatus.Up) register(member)

    case MemberUp(member) =>
      register(member)
  }

  /** Sends [[PostsActorRegister]] to the "frontend" actor of `member` if it has the frontend role. */
  def register(member: Member): Unit = {
    if (member.hasRole("frontend")) {
      val frontend = context.actorSelection(RootActorPath(member.address) / "user" / "frontend")
      frontend ! PostsActorRegister
    }
  }
}
啟動程式
package com.sam.server
import akka.actor.{ActorSystem, Props}
import com.sam.actor.PostsActor
import com.sam.data.HazelcastStore
import com.typesafe.config.ConfigFactory
/**
 * Created by SAM on 2016/3/30.
 *
 * Entry point of the cluster backend: joins "ClusterSystem" with the `backend`
 * role, starts the posts actor and initializes Hazelcast.
 */
object AkkaPostService extends App with HazelcastStore {
  val config = {
    // The role is layered on top of the regular configuration.
    val roleOverride = ConfigFactory.parseString("akka.cluster.roles = [backend]")
    roleOverride.withFallback(ConfigFactory.load())
  }

  val system = ActorSystem("ClusterSystem", config)
  system.actorOf(Props[PostsActor], name = "backend")

  // Accessing the instance initializes Hazelcast.
  getInstance()
}
自動發現
Seed部分沿用 akka cluster example 這篇練習
測試
//建立資料
curl -X POST -H "Content-Type: application/json" -H "Cache-Control: no-cache" -d '{
"text": "哩哩扣扣"
}' "http://127.0.0.1:9000/api/v1/posts"
//沒有後端結果
{
"msg": "Service unavailable, try again later"
}
//正確結果
{
"postid": "1"
}
//取得資料
curl -X GET -H "Content-Type: application/json" -H "Cache-Control: no-cache" "http://127.0.0.1:9000/api/v1/posts/1"
結果
{
"text": "哩哩扣扣"
}
目前還會出現以下錯誤。有人說升級到 2.4.2 就可以解決,不過升級後問題依舊,需要再查查看。
[ERROR] [03/31/2016 07:33:16.174] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/user/StreamSupervisor-0/flow-2-0-unknown-operation] Error in stage [recover]: No elements passed in the last 1 minute. (akka.http.impl.engine.HttpConnectionTimeoutException)
[DEBUG] [03/31/2016 07:33:16.174] [ClusterSystem-akka.actor.default-dispatcher-24] [akka://ClusterSystem/user/StreamSupervisor-0/flow-2-0-unknown-operation] Aborting tcp connection because of upstream failure: No elements passed in the last 1 minute.
akka.stream.impl.Timers$IdleTimeoutBidi$$anon$4.onTimer(Timers.scala:147)
akka.stream.stage.TimerGraphStageLogic.akka$stream$stage$TimerGraphStageLogic$$onInternalTimer(GraphStage.scala:1125)
akka.stream.stage.TimerGraphStageLogic$$anonfun$akka$stream$stage$TimerGraphStageLogic$$getTimerAsyncCallback$1.apply(GraphStage.scala:1114)
akka.stream.stage.TimerGraphStageLogic$$anonfun$akka$stream$stage$TimerGraphStageLogic$$getTimerAsyncCallback$1.apply(GraphStage.scala:1114)
akka.stream.impl.fusing.GraphInterpreter.runAsyncInput(GraphInterpreter.scala:564)
akka.stream.impl.fusing.GraphInterpreterShell.receive(ActorGraphInterpreter.scala:397)
akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:547)
akka.actor.Actor$class.aroundReceive(Actor.scala:480)
akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:493)
akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
akka.actor.ActorCell.invoke(ActorCell.scala:495)
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
akka.dispatch.Mailbox.run(Mailbox.scala:224)
akka.dispatch.Mailbox.exec(Mailbox.scala:234)
scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
參考資料
Akka Configuration
Akka学习笔记:ActorSystem(配置)
[翻译] AKKA笔记- ACTORSYSTEM (配置CONFIGURATION 与调度SCHEDULING)
Akka笔记之配置及调度
Akka in JAVA(四)
Akka Cluster原理与应用
Scala Either, Left And Right
Scala 初学者指南-类型 Either
【Scala】使用Option、Either和Try处理数据交互
No elements passed in the last 1 minute. #18982
Hazelcast