Last active
January 8, 2018 13:54
-
-
Save sortega/4cbe6f648d2c69449a54e89e79613e1a to your computer and use it in GitHub Desktop.
Future.traverse is not fail fast
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package sandbox.futures | |
import scala.collection.generic.CanBuildFrom | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import scala.language.higherKinds | |
import scala.util.{Random, Success} | |
import scala.util.control.NoStackTrace | |
/** Sequential, fail-fast alternative to `Future.traverse`. */
object FailFastTraverse {

  /**
   * Applies `fn` to the elements of `in` one at a time, in iteration order, and collects
   * the results into a collection of the same shape as `in`.
   *
   * `fn` is invoked for an element only after the future produced for the previous element
   * has completed successfully. The first failure therefore ends the traversal: `fn` is not
   * called for any later element and the returned future fails with that same error.
   *
   * @param in  the elements to traverse
   * @param fn  asynchronous function applied to each element
   * @param cbf factory for the builder that accumulates the results
   * @param ec  execution context on which the chaining callbacks run
   * @return a future of all results, or a failed future carrying the first failure
   */
  def traverse[A, B, M[X] <: TraversableOnce[X]](in: M[A])(fn: A => Future[B])(
      implicit cbf: CanBuildFrom[M[A], B, M[B]],
      ec: ExecutionContext): Future[M[B]] = {
    val emptyBuilder = Future.successful(cbf(in))
    val filledBuilder = in.foldLeft(emptyBuilder) { (pending, element) =>
      // The comprehension body is skipped once `pending` has failed, so no further
      // work is started after the first error.
      for {
        builder <- pending
        value   <- fn(element)
      } yield builder += value
    }
    filledBuilder.map(_.result())
  }
}
/** Demo: runs the fail-fast traversal over 1 to 100 with slow steps that fail at random. */
object FailFastTest extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Blocks the calling thread for a random pause of 100 to 5099 milliseconds. */
  def sleep(): Unit = {
    val pauseMillis = 100 + Random.nextInt(5000)
    Thread.sleep(pauseMillis)
  }

  val result = FailFastTraverse.traverse(1 to 100) { n =>
    sleep()
    println(n)
    val succeeds = Random.nextBoolean()
    if (!succeeds) {
      println("failing!")
      Future.failed(new Exception("ka-poom") with NoStackTrace)
    } else Future.successful(n)
  }

  // Blocks the main thread for up to 3 minutes; a failed traversal rethrows its error here.
  Await.result(result, 3.minutes)
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package futures | |
import scala.collection.generic.CanBuildFrom | |
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import scala.language.higherKinds | |
import scala.util.{Random, Success, Try, Failure} | |
import scala.util.control.NoStackTrace | |
import scalaz._ | |
import Scalaz._ | |
/** Sequential, fail-fast traversal for any effect `F` with a `MonadError[F, Throwable]`. */
object FailFastTraverse {

  /**
   * Applies `fn` to the elements of `in` one at a time, in iteration order, and collects
   * the results into a collection of the same shape as `in`.
   *
   * Each step binds on the accumulated effect, so `fn` is only invoked for an element once
   * every previous step has succeeded. The first raised error short-circuits the rest of
   * the traversal and becomes the error of the returned effect.
   *
   * @param in  the elements to traverse
   * @param fn  effectful function applied to each element
   * @param cbf factory for the builder that accumulates the results
   * @param F   monad-with-errors instance used to sequence the steps
   * @return the collected results inside `F`, or the first error raised by `fn`
   */
  def traverse[A, B, M[X] <: TraversableOnce[X], F[_]](in: M[A])(fn: A => F[B])(
      implicit cbf: CanBuildFrom[M[A], B, M[B]],
      F: MonadError[F, Throwable]): F[M[B]] = {
    val emptyBuilder = F.point(cbf(in))
    val filledBuilder = in.foldLeft(emptyBuilder) { (acc, a) =>
      // Sequence `acc` exactly once per step and reuse the builder it yields. Inspecting
      // `acc` first and then combining with `acc` again would run every earlier step twice
      // for effect types that do not memoize their result, doubling the work at each element.
      acc.flatMap(builder => fn(a).map(builder += _))
    }
    filledBuilder.map(_.result())
  }
}
/** Demo: runs the generic fail-fast traversal with `Future` over 1 to 100, failing at random. */
object FailFastTest extends App {
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Blocks the calling thread for a random pause of 100 to 5099 milliseconds. */
  def sleep(): Unit = {
    val pauseMillis = 100 + Random.nextInt(5000)
    Thread.sleep(pauseMillis)
  }

  val result = FailFastTraverse.traverse(1 to 100) { n =>
    sleep()
    println(n)
    val succeeds = Random.nextBoolean()
    if (!succeeds) {
      println("failing!")
      Future.failed(new Exception("ka-poom") with NoStackTrace)
    } else Future.successful(n)
  }

  // Blocks the main thread for up to 3 minutes; a failed traversal rethrows its error here.
  Await.result(result, 3.minutes)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment