Last active
March 11, 2022 12:52
-
-
Save ayeo/8d7b725a26ce625a1685d8538214111a to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package pl.ayeo | |
import cats.effect.{IO, IOApp, Ref} | |
import cats.effect.std.Queue | |
import cats.effect.unsafe.implicits.global | |
import cats.syntax.all.* | |
import fs2.{Pipe, Stream} | |
import scala.concurrent.duration.* | |
import scala.util.Random | |
object ProcessManagerApp extends IOApp.Simple { | |
object Protocol { | |
abstract sealed class Message | |
abstract sealed class Event extends Message | |
case class BulkReceived(quantity: Int) extends Event | |
case class Status(quantity: Int) extends Event | |
abstract sealed class Command extends Message | |
case class Add(quantity: Int) extends Command | |
case class Reset(quantity: Int) extends Command | |
case class Check() extends Command | |
} | |
import Protocol._ | |
case class Aggregate(r: Ref[IO, Int]) { | |
def handle(command: Command): IO[Option[Event]] = command match { //todo events list | |
case Add(quantity) => | |
r.updateAndGet(_ + quantity).map { c => | |
if (c > 0 && c % 10 == 0) Some(BulkReceived(c)) | |
else None | |
} | |
case Reset(quantity) => r.getAndUpdate(_ - quantity).map(_ => None) | |
case Check() => r.get.map(c => Some(Status(c))) | |
case _ => IO(None) | |
} | |
} | |
case class ProcessManager() { | |
def handle(event: Event): Option[Command] = event match { | |
case BulkReceived(quantity) => Some(Reset(Math.round(quantity/2))) | |
case _ => None | |
} | |
} | |
def pmPipe(processManager: ProcessManager, commands: Queue[IO, Command]): Pipe[IO, Event, Event] = _.evalMap { i => | |
processManager.handle(i).map(commands.offer).sequence >> IO(i) | |
} | |
def aggregatePipe(q: Queue[IO, Event], aggregate: Aggregate): Pipe[IO, Command, Command] = _.evalMap { input => | |
for { | |
oi <- aggregate.handle(input) | |
_ <- oi.map(q.offer).sequence | |
} yield input | |
} | |
val ioAggregate: IO[Aggregate] = Ref[IO].of(0).map(Aggregate(_)) | |
val inboxQueue: IO[Queue[IO, Command]] = Queue.unbounded[IO, Command] | |
val outboxQueue: IO[Queue[IO, Event]] = Queue.unbounded[IO, Event] | |
val finalStream: Stream[IO, (Aggregate, Queue[IO, Command], Queue[IO, Event])] = Stream.eval { | |
for { | |
a <- ioAggregate | |
i <- inboxQueue | |
o <- outboxQueue | |
} yield (a, i , o) | |
} | |
val addStream: Stream[IO, Command] = Stream.repeatEval(IO.sleep(5.millis) >> IO(Add(2))) | |
val checkStream: Stream[IO, Command] = Stream.repeatEval(IO.sleep(5.millis) >> IO(Check())) | |
val program: Stream[IO, Message] = finalStream.flatMap { | |
(aggregate, commands, events) => | |
Stream.fromQueueUnterminated(commands) | |
.merge(addStream) | |
.merge(checkStream) | |
.through(aggregatePipe(events, aggregate)) | |
.merge(Stream.fromQueueUnterminated(events).through(pmPipe(ProcessManager(), commands))) | |
} | |
override def run: IO[Unit] = | |
program.through(_.evalMap(t => IO.println(s"${Thread.currentThread().getName()}:\t $t"))).compile.drain | |
/* Output | |
io-compute-6: Check() | |
io-compute-3: Status(0) | |
io-compute-7: Add(2) | |
io-compute-8: Check() | |
io-compute-5: Status(2) | |
io-compute-1: Add(2) | |
io-compute-3: Check() | |
io-compute-9: Status(4) | |
io-compute-10: Add(2) | |
io-compute-7: Check() | |
io-compute-10: Status(6) | |
io-compute-3: Check() | |
io-compute-8: Status(6) | |
io-compute-8: Add(2) | |
io-compute-5: Check() | |
io-compute-4: Status(8) | |
io-compute-5: Add(2) | |
io-compute-8: BulkReceived(10) | |
io-compute-1: Check() | |
io-compute-8: Status(10) | |
io-compute-2: Reset(5) | |
io-compute-7: Check() | |
io-compute-1: Status(5) | |
io-compute-7: Add(2) | |
io-compute-4: Check() | |
io-compute-2: Status(7) | |
... | |
*/ | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment