Created
March 21, 2018 13:06
-
-
Save fmasion/24707eaeb728ddaec8522989ed7d9b4d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.lightbend.akka.http.sample | |
import java.util.UUID | |
import akka.NotUsed | |
import akka.actor.{ ActorSystem, Cancellable } | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.Source | |
import scala.collection.mutable | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext | |
/**
 * Demo: merge several independently ticking sources into one stream that always
 * emits the latest known value of every source.
 *
 * Each input source is tagged with a UUID, all tagged sources are merged, and a
 * `scan` keeps a map of "latest value per source". Every incoming element produces
 * a new snapshot list of the latest values.
 */
object MergeSource extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val materializer: ActorMaterializer = ActorMaterializer()
  // Used to react to the completion of the stream's Future at the bottom of the file.
  implicit val executionContext: ExecutionContext = system.dispatcher

  /** Marker element emitted by the tick sources; only its timing matters. */
  case object TICK

  // Three clocks ticking at different rates.
  val ticker1: Source[TICK.type, Cancellable] = Source.tick(0.second, 1200.millisecond, TICK)
  val ticker3: Source[TICK.type, Cancellable] = Source.tick(0.second, 3.second, TICK)
  val ticker10: Source[TICK.type, Cancellable] = Source.tick(0.second, 10500.millisecond, TICK)

  // Endless value generators; on their own they would emit as fast as demand allows.
  val cycler123: Source[String, NotUsed] = Source.cycle(() => List("1", "2", "3").iterator)
  val cyclerABC: Source[String, NotUsed] = Source.cycle(() => List("A1", "B1", "C1").iterator)
  val cyclerABC2: Source[String, NotUsed] = Source.cycle(() => List("A2", "B2", "C2").iterator)

  // Zip each generator with a ticker so that it emits one value per tick.
  val t123: Source[String, Cancellable] = ticker1.zip(cycler123).map { case (_, value) => value }
  val tABC: Source[String, Cancellable] = ticker3.zip(cyclerABC).map { case (_, value) => value }
  val tABC10: Source[String, Cancellable] = ticker10.zip(cyclerABC2).map { case (_, value) => value }

  val listSource: List[Source[String, Cancellable]] = List(tABC, t123, tABC10)

  // Assign a UUID to each source so its elements can be told apart after merging.
  private val identified: List[(UUID, Source[String, Cancellable])] =
    listSource.map(src => (UUID.randomUUID(), src))

  // Ids in `listSource` order; used to emit snapshot values in a stable column order
  // instead of the hash order of randomly generated UUIDs.
  private val sourceIds: List[UUID] = identified.map { case (id, _) => id }

  val source: Source[List[String], Cancellable] = identified
    .map { case (id, src) => src.map(value => (id, value)) } // each element carries its source's id
    .reduce(_.merge(_)) // one source of (id, value) tuples; NOTE: `reduce` throws on an empty list
    // Immutable accumulator: every step yields a fresh snapshot, so no mutable state is
    // shared with downstream stages or between materializations of this blueprint.
    .scan(Map.empty[UUID, String]) { case (latest, (id, value)) => latest.updated(id, value) }
    // Sources that have not emitted yet are simply absent from the snapshot.
    .map(latest => sourceIds.flatMap(latest.get))

  // The tick sources never complete on their own, so this normally runs until the process
  // is stopped. If the stream does end (e.g. on failure), report it and shut the actor
  // system down instead of silently dropping the Future.
  source.runForeach(println).onComplete { result =>
    result.failed.foreach(error => system.log.error(error, "Merged stream failed"))
    system.terminate()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This emits results like the following:
List()
List(A)
List(A, A)
List(1, A, A)
List(2, A, A)
List(3, A, A)
List(3, B, A)
List(1, B, A)
List(2, B, A)
List(3, B, A)
List(3, C, A)
List(1, C, A)
List(2, C, A)
List(2, A, A)
List(3, A, A)
List(3, A, B)
List(1, A, B)
List(1, B, B)
List(2, B, B)
List(3, B, B)
List(1, B, B)
List(1, C, B)
List(2, C, B)
List(3, C, B)
List(1, C, B)
List(1, A, B)
List(2, A, B)
List(3, A, B)
List(3, A, C)
List(3, B, C)
List(1, B, C)
List(2, B, C)
List(2, C, C)
List(3, C, C)
List(1, C, C)
List(2, C, C)
List(2, A, C)
List(3, A, C)
List(1, A, C)
List(2, A, C)
List(2, B, C)
List(3, B, C)
List(3, B, A)
List(1, B, A)
List(1, C, A)
List(2, C, A)
List(3, C, A)
List(1, C, A)