Last active
August 26, 2021 13:32
-
-
Save Daenyth/47a19a8f632e00a136f8647f3d9b5994 to your computer and use it in GitHub Desktop.
Convert Java futures to cats-effect IO / cats-effect 1.0
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics | |
import cats.effect.implicits._ | |
import cats.effect.{ContextShift, ExitCase, Sync, Timer} | |
import cats.syntax.all._ | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration.FiniteDuration | |
/** Conversions from `java.util.concurrent.Future` to a cats-effect `F[A]`.
  *
  * A Java future only offers blocking (`get`) and polling (`isDone`) as its interface,
  * so one overload of `toIO` is provided for each strategy.
  */
object JavaFuture {

  /** Alias for more friendly imports. */
  type JFuture[A] = java.util.concurrent.Future[A]

  /** Convert a java future to an effectful value, using async sleep polling to get the result.
    *
    * The future is checked with `isDone`; while it is not done, the effect sleeps for
    * `pollInterval` via `timer.sleep` and checks again, so no thread is parked in `get`
    * while waiting. Once `isDone` is true the result is fetched with `get`, and anything
    * `get` throws (see the `java.util.concurrent.Future#get` contract) fails the returned `F[A]`.
    *
    * `fa` is run as the acquire step of a bracket, so the cancellation finalizer is
    * registered together with obtaining the future: there is no window in which the
    * future exists but would not be cancelled. As with any bracket acquire, `fa` itself
    * is therefore not cancelable.
    *
    * @param fa effect producing the Java future to wait for
    * @param pollInterval how long to sleep between `isDone` checks
    * @param mayInterruptThreadOnCancel This is passed as an argument to [[JFuture#cancel]]
    *                                   in the case the returned `F[A]` is cancelled.
    * @return the future's result, suspended in `F`
    */
  def toIO[F[_], A](
      fa: F[JFuture[A]],
      pollInterval: FiniteDuration,
      mayInterruptThreadOnCancel: Boolean = true
  )(
      implicit F: Sync[F],
      timer: Timer[F]
  ): F[A] = {
    def loop(jf: JFuture[A]): F[A] =
      F.delay(jf.isDone).flatMap { isDone =>
        // Not ContextShift.evalOn(blockingPool) here because isDone==true, so `get` should be very fast to return.
        if (isDone) F.delay(jf.get)
        else timer.sleep(pollInterval) *> loop(jf)
      }

    fa.bracketCase(jf => loop(jf)) {
      // Only cancellation of the returned effect propagates to the Java future;
      // normal completion and errors need no cleanup.
      case (jf, ExitCase.Canceled) =>
        F.delay(jf.cancel(mayInterruptThreadOnCancel)).void
      case _ =>
        F.unit
    }
  }

  /** Convert a java future to an effectful value, blocking a thread on the specified ExecutionContext
    * to get the result.
    *
    * The blocking `get` call is evaluated on `blockingExecutionContext` through
    * `ContextShift.evalOn`. Unlike the polling overload, this variant never calls
    * `cancel` on the Java future: cancelling the returned `F[A]` leaves the future untouched.
    *
    * @param fa effect producing the Java future to wait for
    * @param blockingExecutionContext where the blocking `get` is run
    * @return the future's result, suspended in `F`
    */
  def toIO[F[_], A](
      fa: F[JFuture[A]],
      blockingExecutionContext: ExecutionContext
  )(
      implicit F: Sync[F],
      CS: ContextShift[F]
  ): F[A] =
    fa.flatMap { jf =>
      CS.evalOn(blockingExecutionContext)(F.delay(jf.get))
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package teikametrics | |
import java.util.concurrent.FutureTask | |
import cats.effect.IO | |
import org.scalatest.{FlatSpec, Matchers} | |
import teikametrics.JavaFuture._ | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.ExecutionContext.global | |
import scala.concurrent.duration._ | |
/** Behavioural checks for the polling overload of `JavaFuture.toIO`, run against `IO`. */
class JavaFutureTest extends FlatSpec with Matchers with ContextShiftTest {
  implicit def ec: ExecutionContext = global

  /** Suspends construction of a Java future in `IO` and converts it with a 10ms poll interval. */
  def jfio[A](jf: => JFuture[A]): IO[A] =
    toIO[IO, A](IO(jf), 10.milliseconds)

  /** Builds a `FutureTask` around `body` and runs it right away, so it is already done when returned. */
  private def finishedTask(body: => Int): FutureTask[Int] = {
    val task = new FutureTask[Int](() => body)
    task.run()
    task
  }

  behavior of "JavaFuture.toIO"

  it should "return the value" in {
    jfio(finishedTask(1)).unsafeRunSync() shouldEqual 1
  }

  it should "not stack overflow" in {
    // The task is never run, so `isDone` stays false and the conversion keeps polling
    // until `unsafeRunTimed` gives up and yields None.
    val neverFinishes = jfio(new FutureTask[Int](() => 1))
    neverFinishes.unsafeRunTimed(1.second) shouldBe None
  }

  it should "propagate errors" in {
    val failing = jfio(finishedTask(throw new Exception("foo")))
    failing.attempt.unsafeRunSync() shouldBe 'left
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I hereby license this post under MIT license. Go nuts :)