Created
July 13, 2016 12:57
-
-
Save nartamonov/bce0171a6eab0de32e610c48a7c74f75 to your computer and use it in GitHub Desktop.
Простая реализация монады Future и операций из стандартной библиотеки Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.{Executor, Executors} | |
import java.util.concurrent.atomic.AtomicReference | |
import scala.annotation.tailrec | |
import scala.util.{Failure, Success, Try} | |
import scala.util.control.NonFatal | |
object Future { | |
def apply[T](expression: => T)(implicit executor: Executor) = new FutureImpl(expression) | |
} | |
trait Future[T] { | |
self => | |
def onComplete(callback: Try[T] => Unit)(implicit executor: Executor): Unit | |
def map[R](f: T => R): Future[R] = new Future[R] { | |
def onComplete(callback: Try[R] => Unit)(implicit executor: Executor): Unit = | |
self.onComplete { | |
res => callback(res.map(f)) | |
} | |
} | |
def flatMap[R](f: T => Future[R]): Future[R] = new Future[R] { | |
def onComplete(callback: (Try[R]) => Unit)(implicit executor: Executor): Unit = | |
self.onComplete { | |
case Success(v) => f(v).onComplete(callback) | |
case Failure(e) => callback(Failure(e)) | |
} | |
} | |
def fallbackTo(f: => Future[T]): Future[T] = new Future[T] { | |
def onComplete(callback: (Try[T]) => Unit)(implicit executor: Executor): Unit = { | |
self.onComplete { | |
case s@Success(_) => callback(s) | |
case e@Failure(_) => f.onComplete { | |
case s@Success(_) => callback(s) | |
case Failure(_) => callback(e) | |
} | |
} | |
} | |
} | |
} | |
final class FutureImpl[T](expression: => T)(implicit executor: Executor) extends Future[T] { | |
private case class State(value: Option[Try[T]], completionListeners: Vector[Try[T] => Unit]) | |
private var state = new AtomicReference[State](State(None, Vector.empty)) | |
runAsync() | |
def isCompleted: Boolean = state.get().value.isDefined | |
def onComplete(callback: Try[T] => Unit)(implicit executor: Executor): Unit = { | |
val guardedCallback = (x: Try[T]) => | |
execute(try { callback(x) } catch { case NonFatal(e) => e.printStackTrace() }) | |
getAndTransform { | |
case s@State(Some(v), _) => | |
guardedCallback(v) | |
s | |
case s@State(None, completionListeners) => | |
State(None, completionListeners :+ guardedCallback) | |
} | |
} | |
private def runAsync(): Unit = { | |
execute { | |
val v = Try(expression) | |
getAndTransform(s => s.copy(value = Some(v))) | |
for (callback <- state.get().completionListeners) | |
callback(v) | |
} | |
} | |
// См. https://alexn.org/blog/2013/05/07/towards-better-atomicreference-scala.html | |
@tailrec | |
private def getAndTransform(f: State => State): State = { | |
val oldValue = state.get() | |
val newValue = f(oldValue) | |
if (!state.compareAndSet(oldValue, newValue)) | |
getAndTransform(f) | |
else | |
oldValue | |
} | |
protected def execute[R](expression: => R)(implicit executor: Executor): Unit = { | |
executor.execute(new Runnable { | |
override def run(): Unit = expression | |
}) | |
} | |
} | |
def retry[T](times: Int)(asyncComp: => Future[T]): Future[T] = { | |
require(times >= 1) | |
Stream.iterate(asyncComp)(_ fallbackTo asyncComp).take(times).last | |
} | |
// TESTING | |
implicit val executor = Executors.newSingleThreadExecutor() | |
val c = for { | |
a <- Future(10) | |
b <- Future(a + 20) | |
} yield b | |
c.onComplete(println) // ==> 30 | |
Thread.sleep(500) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment