Last active
March 21, 2016 18:00
-
-
Save djellemah/a7faf1b001160db83a01 to your computer and use it in GitHub Desktop.
blocking Queue.multiplex in Ruby
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'thwait' | |
# One-slot hand-off point between threads.
#
# A reader (#value) blocks while the slot is empty. A writer (#value=)
# blocks while the slot is still occupied by a previous, unread item; it
# does NOT wait for the reader to collect the item it has just stored.
#
# #raise stores an exception in the hand-off point. From then on readers
# that find the slot empty, and every writer, get that exception raised in
# their own thread. An item already sitting in the slot can still be read.
#
# Because this class defines its own #raise, the methods below call
# ::Kernel.raise explicitly when they need to raise for real.
class Rendevous
  def initialize
    @mutex = Mutex.new
    # signalled whenever the slot has been emptied
    @send_flag = ConditionVariable.new
    # signalled whenever the slot has been filled
    @recv_flag = ConditionVariable.new
    # holds zero or one item
    @container = []
    # set by #raise; nil while the hand-off point is still open
    @exception = nil
  end

  # Take the item out of the slot, blocking until one is available.
  #
  # Returns the item most recently stored with #value=.
  # Raises the exception stored by #raise if the slot is empty and an
  # exception has been stored.
  def value
    @mutex.synchronize do
      while @container.empty?
        ::Kernel.raise @exception if @exception
        @recv_flag.wait @mutex
      end
      # Signalling before the pop is fine: the woken writer cannot proceed
      # until this block releases the mutex, which is after the pop.
      @send_flag.signal
      @container.pop
    end
  end

  # Put +rhs+ into the slot, blocking while a previous item is unread.
  #
  # Raises the exception stored by #raise, if any, instead of storing.
  def value=(rhs)
    @mutex.synchronize do
      ::Kernel.raise @exception if @exception
      # Deliberately not Kernel#loop: it would swallow a StopIteration.
      until @container.empty?
        @send_flag.wait @mutex
        # Re-check after every wake-up so a writer parked here is released
        # by #raise instead of waiting for a reader that may never come.
        ::Kernel.raise @exception if @exception
      end
      @container << rhs
      @recv_flag.signal
    end
  end

  # Store an exception and wake every waiting reader and writer.
  #
  # +args+ are whatever Kernel#raise accepts (class, message, instance...).
  # The exception object is built by actually raising and rescuing it, so
  # only StandardError descendants are captured; anything else propagates
  # to the caller of this method.
  def raise(*args)
    @mutex.synchronize do
      @exception =
        begin
          ::Kernel.raise(*args)
        rescue ::StandardError => e
          e
        end
      @recv_flag.broadcast
      @send_flag.broadcast
    end
  end
end
# Merge several queues into one blocking stream.
#
# One thread per queue pops items and hands each one, paired with the queue
# it came from, to the calling thread, which passes it to +receiver+. A
# queue's stream ends at the first nil (or false) it yields from #pop. Once
# every queue has ended, iteration stops and this method returns.
#
# qs       - the queues to read from (anything responding to #pop).
# receiver - block called in the calling thread. A one-parameter block gets
#            the item only; a two-parameter block (or one reporting arity
#            -1, as the blocks supplied by Enumerator do) gets item and queue.
#
# Without a block, returns an Enumerator over the same stream.
# Raises RuntimeError for a block with any other arity, before any thread
# is started or any queue is touched.
#
# NOTE: the original author was unsure about the naming of this method
# (with reference to POSIX select).
def Queue.multiplex(*qs, &receiver)
  return enum_for(__method__, *qs) unless block_given?

  # Decide once how to call the receiver, and fail before doing any work.
  to_receiver =
    case receiver.arity
    when 1 then ->(item, _q) { receiver.call item }
    when -1, 2 then ->(item, q) { receiver.call item, q }
    else
      raise "unknown arity for receiver #{receiver.arity}"
    end

  xchg = Rendevous.new

  consumer_threads = qs.map do |q|
    Thread.new do
      while (item = q.pop)
        xchg.value = item, q
      end
    end
  end

  # When all queues have finished, wake the thread waiting on xchg.
  closer = Thread.new do
    consumer_threads.each do |consumer|
      begin
        consumer.join
      rescue StandardError
        # A consumer that died still counts as finished; keep waiting for
        # the remaining ones so the stream is not closed early.
      end
    end
    xchg.raise StopIteration
  end

  begin
    # Kernel#loop ends quietly on the StopIteration raised via xchg.
    loop do
      item, q = xchg.value
      to_receiver.call item, q
    end
  ensure
    # If the receiver raised or broke out early, the helper threads would
    # otherwise stay blocked forever. After normal completion they are
    # already finished and this is a no-op.
    (consumer_threads + [closer]).each(&:kill)
  end
end
# Demo: three producer threads each fill a bounded queue with labelled
# strings followed by a terminating nil, while the calling thread prints
# every [index, item, queue] triple delivered by Queue.multiplex.
def exercise
  q1 = SizedQueue.new 9
  q2 = SizedQueue.new 3
  q3 = SizedQueue.new 11

  # Starts a thread pushing +count+ labelled items, then the nil terminator.
  start_producer = lambda do |queue, count, &label|
    Thread.new do
      count.times { |i| queue << label.call(i) }
      queue << nil
    end
  end

  start_producer.call(q1, 12) { |i| "q1 << #{i}" }
  start_producer.call(q2, 15) { |i| "q2 << #{i}" }
  start_producer.call(q3, 7) { |i| "q3 << #{i}" }

  Queue.multiplex(q1, q2, q3).each_with_index do |(item, q), index|
    puts [index, item, q].inspect
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment