Last active
March 19, 2016 11:07
Gist: djellemah/cbc1d350547f043c1b70
Enumerable#pmap and Enumerable#pfind using SizedQueue and a simple future
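The "simple future" is Waitron, which the file below loads from a separate `waitron` file that is not shown here. As a reading aid, here is a minimal sketch of what such a future might look like. It is an assumption inferred from how the code uses it (`_=` stores a value once, `_` blocks until a value is available and re-raises a stored exception), not the author's actual implementation.

```ruby
# Hypothetical stand-in for the 'waitron' file; inferred from usage only.
class Waitron
  def initialize
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
    @set   = false
    @value = nil
  end

  # Store the value. Assumption: a second assignment raises, so that only
  # the first writer wins.
  def _=(value)
    @mutex.synchronize do
      raise 'value already set' if @set
      @value = value
      @set = true
      @cond.broadcast # wake every thread blocked in #_
    end
  end

  # Block until a value has been stored, then return it.
  # Assumption: a stored exception is re-raised in the calling thread.
  def _
    @mutex.synchronize do
      @cond.wait(@mutex) until @set
    end
    raise @value if @value.is_a?(Exception)
    @value
  end
end

# Usage: one thread produces the value, another waits for it.
future = Waitron.new
producer = Thread.new { future._ = 6 * 7 }
answer = future._
producer.join
```

A mutex with a condition variable is used here so that any number of readers can wait on the same future; a one-element Queue would also work for a single reader.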
require_relative 'waitron' # Waitron is a simple future

module Enumerable
  # Return the first item for which bloc returns truthy.
  # If several items match, which one is returned is indeterminate.
  # This works best when bloc spends its time waiting (e.g. on IO), because
  # the GVL keeps Ruby code in different threads from running in parallel.
  # bloc should be careful about mutable shared state.
  def pfind( threads: 4, &bloc )
    q = SizedQueue.new threads
    found_item = Waitron.new # Waitron is a simple future

    # worker threads
    ts = threads.times.map do
      Thread.new do
        # pop returns nil once the queue is closed and empty
        while item = q.pop
          begin
            found_item._ = item if bloc.call item
          rescue Exception
            # yes, we really want to catch all catchable exceptions otherwise
            # they get swallowed by the thread and become invisible to caller
            # of pfind.
            found_item._ = $!
          end
        end
      end
    end

    Thread.new do
      # this will raise a ClosedQueueError if something has already found
      # the item
      each(&q.method(:enq))
      q.close # signal worker threads that we're finished
      ts.each(&:join) # wait for threads because one of them might still find the item
      # All worker threads finished. An exception is raised (and ignored by
      # this thread) if the item was found in the meantime.
      found_item._ = nil
    end

    # wait for the item/nil/exception
    found_item._
  ensure
    # Tell the workers that an item has been found. Could also do this inside
    # worker threads which would stop processing slightly earlier.
    q.close
  end
  # Order-preserving parallel map using futures as a back channel.
  # Returns a collection of futures, each containing the mapped value.
  # bloc should be careful about mutable shared state.
  def pmap( threads: 4, &bloc )
    q = SizedQueue.new threads

    ts = threads.times.map do
      Thread.new do
        loop do
          # pop returns nil once the queue is closed and empty
          (future, item = q.pop) or raise StopIteration
          # store the raised exception in the future if bloc fails
          future._ = (bloc.call(item) rescue $!)
        end
      end
    end

    enum = map
    # loop evaluates to $!.result when enum.next raises StopIteration. That
    # result contains the set of values from enum.feed, ie the futures.
    loop do
      q << [future = Waitron.new, enum.next] # Waitron is a simple future
      enum.feed future
    end
  ensure
    q.close
    # wait for mapping to finish. Could also do this by forcing the futures after loop
    ts.each(&:join)
  end
end
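
The way pmap collects its futures relies on two less common pieces of core Ruby: `Enumerator#feed`, which supplies the value that `map` records for the current item, and `Kernel#loop`, which returns `StopIteration#result` when the enumerator runs out (Ruby 2.3 or later). A small standalone sketch of just that mechanism, with made-up data and no threads:

```ruby
letters = %w[a b c]
enum = letters.map # no block given, so this is an external enumerator

# loop rescues the StopIteration raised by enum.next after the last item
# and evaluates to its result, which is the array map has built up.
collected = loop do
  item = enum.next      # next element of the underlying collection
  enum.feed item.upcase # becomes map's value for this element
end

# collected is now ["A", "B", "C"], in the original order
```

In pmap the fed value is a fresh future rather than the mapped value itself, which is why the output order matches the input order regardless of which worker finishes first.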