Created
February 11, 2021 12:22
-
-
Save Odomontois/361d8e5fc5b8eda6ab11229f24abb7b3 to your computer and use it in GitHub Desktop.
FromPublisher
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package healer.reactive | |
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference} | |
import cats.effect.Concurrent | |
import cats.syntax.foldable._ | |
import cats.syntax.monoid._ | |
import cats.{Monad, Monoid} | |
import healer.reactive.impl._ | |
import io.iteratee.Enumerator | |
import io.iteratee.internal.Step | |
import org.reactivestreams.{Publisher, Subscriber, Subscription} | |
import tofu.syntax.monadic._ | |
import scala.collection.immutable.ArraySeq | |
trait FromPublisher[F[_]] { | |
def toEnumerator[A](pub: Publisher[A], batchSize: Int): Enumerator[F, A] | |
def first[A](pub: Publisher[A]): F[A] | |
def firstOption[A](pub: Publisher[A]): F[Option[A]] | |
def foldLeftM[A, B](pub: Publisher[A], batchSize: Int = 100)(b: B)(f: (B, A) => F[B]): F[B] | |
def foldMapM[A, B: Monoid](pub: Publisher[A], batchSize: Int = 100)(f: A => F[B]): F[B] | |
def foldLeft[A, B](pub: Publisher[A])(b: B)(f: (B, A) => B): F[B] | |
def foldMap[A, B: Monoid](pub: Publisher[A])(f: A => B): F[B] = foldLeft(pub)(Monoid.empty[B])((b, a) => b |+| f(a)) | |
def foreach[A](pub: Publisher[A])(f: A => F[Unit]): F[Unit] | |
} | |
object FromPublisher { | |
implicit def instance[F[_]: Concurrent]: FromPublisher[F] = | |
new FromPublisher[F] { | |
def first[A](pub: Publisher[A]): F[A] = impl.firstImpl[F, A, A](pub)( | |
a => Right(a), | |
Left(new NoSuchElementException("first: empty stream")), | |
) | |
def firstOption[A](pub: Publisher[A]): F[Option[A]] = impl.firstImpl[F, A, Option[A]](pub)( | |
a => Right(Some(a)), | |
Right(None) | |
) | |
def foldMapM[A, B: Monoid](pub: Publisher[A], batchSize: Int = 100)(f: A => F[B]): F[B] = | |
foldLeftM(pub, batchSize)(Monoid.empty[B])((b, a) => f(a).map(b |+| _)) | |
def foldLeftM[A, B](pub: Publisher[A], batchSize: Int)(b: B)(f: (B, A) => F[B]): F[B] = | |
impl.startFold(pub, batchSize)(impl.foldLeftMLoop(b)(f)) | |
def foreach[A](pub: Publisher[A])(f: A => F[Unit]): F[Unit] = foldMapM(pub)(f) | |
def toEnumerator[A](pub: Publisher[A], batchSize: Int): Enumerator[F, A] = new ToEnumerator(pub, batchSize) | |
def foldLeft[A, B](pub: Publisher[A])(b: B)(f: (B, A) => B): F[B] = foldLeftImpl(pub)(b, f) | |
} | |
} | |
private class First[A, B](success: A => Result[B], empty: => Result[B], k: Cont[B]) extends Subscriber[A] { | |
val canceled = new AtomicBoolean(false) | |
var sub: Subscription = null | |
def onSubscribe(s: Subscription): Unit = | |
if (canceled.get()) s.cancel() | |
else { | |
sub = s | |
s.request(1) | |
} | |
def onNext(a: A): Unit = { | |
k(success(a)) | |
sub.cancel() | |
} | |
def onError(error: Throwable): Unit = | |
k(Left(error)) | |
def onComplete(): Unit = | |
k(empty) | |
} | |
private class Fold[A, B](cont: Cont[B], init: B, f: (B, A) => B) extends Subscriber[A] { | |
val canceled = new AtomicBoolean(false) | |
var sub: Subscription = null | |
var b: B = init | |
def onSubscribe(s: Subscription): Unit = | |
if (canceled.get()) s.cancel() | |
else { | |
sub = s | |
s.request(1) | |
} | |
def onNext(a: A): Unit = | |
if (canceled.get()) sub.cancel() | |
else { | |
b = f(b, a) | |
sub.request(1) | |
} | |
def onError(err: Throwable): Unit = cont(Left(err)) | |
def onComplete(): Unit = cont(Right(b)) | |
} | |
private class FoldM[F[_], A](pub: Publisher[A], batchSize: Int)(implicit F: Concurrent[F]) { self => | |
private val canceled = new AtomicBoolean(false) | |
val cancel = F.delay(canceled.set(true)) | |
private class Sub(init: Cont[State[F, A]]) extends Subscriber[A] { | |
var count: Int = 0 | |
val builder = Array.ofDim[Any](batchSize).asInstanceOf[Array[A]] | |
var sub: Subscription = null | |
val cont = new AtomicReference(init) | |
def onSubscribe(s: Subscription): Unit = | |
if (canceled.get()) s.cancel() | |
else { | |
sub = s | |
s.request(batchSize.toLong) | |
} | |
def onNext(a: A): Unit = | |
if (canceled.get()) sub.cancel() | |
else { | |
builder(count) = a | |
count += 1 | |
if (count >= batchSize) { | |
val chunk = builder.toSeq | |
val next: F[State[F, A]] = F.async { k => | |
this.cont.set(k) | |
sub.request(batchSize.toLong) | |
} | |
count = 0 | |
cont.get()(Right(State(chunk, Some(next)))) | |
} | |
} | |
def onError(err: Throwable): Unit = cont.get()(Left(err)) | |
def onComplete(): Unit = { | |
val finalChunk = builder.take(count).toSeq | |
cont.get()(Right(State(finalChunk, None))) | |
} | |
} | |
def initDemand: F[State[F, A]] = | |
F.cancelable { k => | |
pub.subscribe(new Sub(k)) | |
cancel | |
} | |
} | |
private class ToEnumerator[F[_]: Concurrent, A](publisher: Publisher[A], batchSize: Int) extends Enumerator[F, A] { | |
private def loop[R](step: Step[F, A, R])(state: State[F, A]): F[Step[F, A, R]] = | |
if (step.isDone) step.pure[F] | |
else step.feed(state.chunk).flatMap(nextStep => state.next.fold(nextStep.pure[F])(_.flatMap(loop(nextStep)))) | |
def apply[R](step: Step[F, A, R]): F[Step[F, A, R]] = impl.startFold(publisher, batchSize)(loop(step)) | |
} | |
private object impl { | |
final case class State[F[_], A](chunk: Seq[A], next: Option[F[State[F, A]]]) | |
type Result[+A] = Either[Throwable, A] | |
type Cont[-A] = Either[Throwable, A] => Unit | |
def firstImpl[F[_], A, B]( | |
pub: Publisher[A] | |
)(f: A => Result[B], empty: => Result[B])(implicit F: Concurrent[F]): F[B] = | |
F.cancelable[B] { k => | |
val sub = new First[A, B](f, empty, k) | |
pub.subscribe(sub) | |
F.delay { sub.canceled.set(true) } | |
} | |
def foldLeftImpl[F[_], A, B](pub: Publisher[A])(init: B, f: (B, A) => B)(implicit F: Concurrent[F]): F[B] = | |
F.cancelable[B] { k => | |
val sub = new Fold[A, B](k, init, f) | |
pub.subscribe(sub) | |
F.delay(sub.canceled.set(true)) | |
} | |
def startFold[F[_], A, B](pub: Publisher[A], batchSize: Int)( | |
loop: State[F, A] => F[B] | |
)(implicit F: Concurrent[F]): F[B] = | |
F.delay(new FoldM[F, A](pub, batchSize)).flatMap { folder => | |
F.bracket(folder.initDemand)(loop)(_ => folder.cancel) | |
} | |
def foldLeftMLoop[F[_]: Monad, A, B](b: B)(f: (B, A) => F[B]): State[F, A] => F[B] = { | |
def loop(state: State[F, A], b: B): F[B] = { | |
val fb = ArraySeq.from[Any](state.chunk).asInstanceOf[ArraySeq[A]].foldLeftM(b)(f) | |
state.next.fold(fb)(getState => | |
for { | |
b <- fb | |
state <- getState | |
res <- loop(state, b) | |
} yield res | |
) | |
} | |
loop(_, b) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment