Created
June 1, 2011 11:12
-
-
Save joa/1002125 to your computer and use it in GitHub Desktop.
Go Channels in Scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package go | |
import java.util.concurrent.{ | |
BlockingQueue => JBlockingQueue, | |
ArrayBlockingQueue => JArrayBlockingQueue | |
} | |
object Channel { | |
def empty[A]: Channel[A] = new BlockingChannel() | |
def make[A]: Channel[A] = make(1) | |
def make[A](capacity: Int): Channel[A] = new BlockingChannel(capacity) | |
} | |
trait ChannelOps { | |
@volatile private var _closed = false | |
def close() { | |
_closed = true | |
} | |
def closed: Boolean = _closed | |
def open: Boolean = !closed | |
def nonEmpty: Boolean = !isEmpty | |
def isEmpty: Boolean | |
} | |
trait ReceiveChannel[A] extends ChannelOps { | |
def unary_! : Option[A] | |
def foreach[U](f: Option[A] => U) { | |
while(open || nonEmpty) { | |
f(this.unary_!) | |
} | |
} | |
def forall(f: Option[A] => Boolean): Boolean = { | |
while(open || nonEmpty) { | |
if(!f(this.unary_!)) { | |
return false | |
} | |
} | |
true | |
} | |
} | |
trait SendChannel[A] extends ChannelOps { | |
def !(value: A) | |
} | |
trait Channel[A] extends ReceiveChannel[A] with SendChannel[A] | |
class BlockingChannel[A](capacity: Int = 1) extends Channel[A] { | |
private val blockingQueue: JBlockingQueue[A] = | |
new JArrayBlockingQueue(capacity) | |
override def isEmpty = blockingQueue.isEmpty | |
override def !(value: A) { | |
if(open) { | |
blockingQueue.put(value) | |
} | |
} | |
override def unary_! : Option[A] = { | |
if(closed && isEmpty) { | |
None | |
} else { | |
Some(blockingQueue.take()) | |
} | |
} | |
} | |
object go { | |
import scala.actors.Futures._ | |
def apply[U](f: => U) = future { f } | |
} | |
object ProducerConsumerExample { | |
val done = Channel.make[Boolean] | |
val msgs = Channel.make[Int] | |
def produce() { | |
for(i <- 0 until 10) { | |
msgs ! i | |
} | |
msgs.close() | |
done ! true | |
} | |
def consume() { | |
for { | |
optMsg <- msgs | |
msg <- optMsg | |
} { | |
println(msg) | |
} | |
} | |
def main(args: Array[String]) { | |
go(produce()) | |
go(consume()) | |
!done | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Very nice, what's the license of this? :-)