Created
June 22, 2019 06:00
-
-
Save xmeng1/c99da1fd6cdeac226c80f658d2144aa3 to your computer and use it in GitHub Desktop.
Akka stream and Akka Http WebSocket Example 1
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object ChatApp extends App { | |
implicit val system = ActorSystem("chat") | |
implicit val executor: ExecutionContextExecutor = system.dispatcher | |
implicit val materializer = ActorMaterializer() | |
val route = get { | |
pathEndOrSingleSlash { | |
handleWebSocketMessages(websocketFlow) | |
} | |
} | |
val maximumClients = 1 | |
class ChatRef extends Actor { | |
override def receive: Receive = withClients(Map.empty[UUID, ActorRef]) | |
def withClients(clients: Map[UUID, ActorRef]): Receive = { | |
case SignedMessage(uuid, msg) => clients.collect{ | |
case (id, ar) if id != uuid => ar ! msg | |
} | |
case OpenConnection(ar, uuid) if clients.size == maximumClients => ar ! PoisonPill | |
case OpenConnection(ar, uuid) => context.become(withClients(clients.updated(uuid, ar))) | |
case CloseConnection(uuid) => context.become(withClients(clients - uuid)) | |
} | |
} | |
object Protocol { | |
case class SignedMessage(uuid: UUID, msg: String) | |
case class OpenConnection(actor: ActorRef, uuid: UUID) | |
case class CloseConnection(uuid: UUID) | |
} | |
val chatRef = system.actorOf(Props[ChatRef]) | |
def websocketFlow: Flow[Message, Message, Any] = | |
Flow[Message] | |
.mapAsync(1) { | |
case TextMessage.Strict(s) => Future.successful(s) | |
case TextMessage.Streamed(s) => s.runFold("")(_ + _) | |
case b: BinaryMessage => throw new Exception("Binary message cannot be handled") | |
}.via(chatActorFlow(UUID.randomUUID())) | |
.map(TextMessage(_)) | |
def chatActorFlow(connectionId: UUID) : Flow[String, String, Any] = { | |
val sink = Flow[String] | |
.map(msg => Protocol.SignedMessage(connectionId, msg)) | |
.to(Sink.actorRef(chatRef, Protocol.CloseConnection(connectionId))) | |
val source = Source.actorRef(16, OverflowStrategy.fail) | |
.mapMaterializedValue { | |
actor : ActorRef => { | |
chatRef ! Protocol.OpenConnection(actor, connectionId) | |
} | |
} | |
Flow.fromSinkAndSource(sink, source) | |
} | |
Http().bindAndHandle(route, "0.0.0.0", 8080) | |
.map(_ => println(s"Started server...")) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment