Last active
August 23, 2018 15:00
-
-
Save arschles/6834422 to your computer and use it in GitHub Desktop.
examples of common concurrency patterns that you can achieve with the scala.concurrent package
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.util.Random | |
import java.util.{Timer, TimerTask} | |
object Util { | |
sealed trait BaseResponse | |
case class Response1(res: Int) extends BaseResponse | |
case class Response2(res: String) extends BaseResponse | |
private val timeoutTimer = { | |
//in real life, use an Akka scheduler or a Netty HashWheelTimer, because they are more thread-efficient | |
new Timer() | |
} | |
def timeoutFuture[T](timeout: Duration): Future[T] = { | |
val prom = Promise[T] | |
val timeoutTask = new TimerTask { | |
override def run = { | |
prom.failure(new TimeoutException(s"timed out after $timeout")) | |
} | |
} | |
timeoutTimer.schedule(timeoutTask, timeout.toMillis) | |
prom.future | |
} | |
private val random = new Random(System.currentTimeMillis()) | |
private val possibleSleepMS = List(100L, 200L, 150L, 250L) | |
//creates a future that sleeps for a pseudo-random amount of time, and then executes t. | |
//both actions happen on a dedicated thread (because we imported scala.concurrent.ExecutionContext.Implicits.global) | |
def sleepAndThen[T](t: => T): Future[T] = { | |
Future { | |
val sleepLengthIdx = math.abs(random.nextInt) % possibleSleepMS.length | |
val sleepLength = possibleSleepMS(sleepLengthIdx) | |
Thread.sleep(sleepLength) | |
t | |
} | |
} | |
} | |
//fan out/fan in pattern: | |
//execute a bunch of independent background jobs, each in their own future. | |
//merge them into one future that will be complete when the slowest one is. | |
def fanOut = { | |
import Util._ | |
val f1 = sleepAndThen(Response1(1)) | |
val f2 = sleepAndThen(Response2("one")) | |
val f3 = sleepAndThen(Response1(123)) | |
val fannedOutFutures: List[Future[BaseResponse]] = f1 :: f2 :: f3 :: Nil | |
val results: Future[List[BaseResponse]] = Future.sequence(fannedOutFutures) | |
results.map { resultList: List[BaseResponse] => | |
//after all the results come back, we can do stuff with them | |
} | |
} | |
//timeout pattern | |
//execute a first request and try again if it fails | |
def timeout = { | |
import Util._ | |
val request = sleepAndThen(Response1(1)) | |
val timeout = timeoutFuture[Response1](1.second) | |
val respFuture = Future.firstCompletedOf(request :: timeout :: Nil) | |
respFuture.map { resp => | |
//only executes if the request didn't time out | |
} | |
} | |
//similar to timeouts - cancelling | |
//if you have an expensive operation, or maybe even one that runs forever, | |
//you can cancel it so it doesn't take up resources after we don't need it | |
def cancel = { | |
import Util._ | |
//the variable we set to start the cancellation process | |
val cancelVar = Promise[Unit]() | |
//the variable that the cancelee sets to ack that it's fully done | |
val cancelledVar = Promise[Unit]() | |
//the handler for each request in our simple server | |
def acceptRequest = () | |
//start the server | |
Future { | |
while(!cancelVar.isCompleted) { | |
acceptRequest | |
} | |
cancelledVar.success(()) | |
} | |
//stop the server after a while | |
sleepAndThen { | |
cancelVar.success(()) | |
while(!cancelledVar.isCompleted) { | |
println("still waiting for the server to finish") | |
} | |
} | |
} | |
//firstCompleted | |
//execute the same request on multiple servers, and return the first that comes back. | |
//combined with timeouts and cancellation you can tightly control your call | |
def firstCompleted = { | |
import Util._ | |
def makeRequest = sleepAndThen(Response1(1)) | |
val req1 = makeRequest | |
val req2 = makeRequest | |
val req3 = makeRequest | |
val respFuture: Future[Response1] = Future.firstCompletedOf(req1 :: req2 :: req3 :: Nil) | |
respFuture.map { resp => | |
//continue with the response from the fastest server | |
} | |
} | |
//channels | |
//allow *asynchronous* bidirectional communication between two concurrently executing processes. | |
//scala.concurrency has a synchronous version of this | |
def channels = { | |
import Util._ | |
//a class that lets you send one message. you can compose these into a queue to send multiple messages | |
class OneTimeChannel[T] { | |
private val sendProm = Promise[T]() | |
private val recvProm = Promise[Unit]() | |
def send(t: T): Future[Unit] = { | |
sendProm.success(t) | |
recvProm.future | |
} | |
def recv: Future[T] = { | |
sendProm.future.map { t => | |
recvProm.success(()) | |
t | |
} | |
} | |
} | |
val chan = new OneTimeChannel[Int] | |
//sender | |
sleepAndThen { | |
val sent: Future[Unit] = chan.send(123) | |
sent.map { _ => | |
//do stuff after we know the value has been sent | |
} | |
} | |
//receiver | |
sleepAndThen { | |
val receivedFuture: Future[Int] = chan.recv | |
receivedFuture.map { received => | |
//do stuff with the received value | |
} | |
} | |
} |
your timeoutFuture fails if the timeout is reached. for a timer that just returns Unit upon the timeout being reached, see Futures.alarm https://github.com/scala/scala/blob/v2.10.2/src/actors/scala/actors/Future.scala#L131
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
sources of inspiration