Created
September 19, 2020 15:57
-
-
Save k-tsj/afa6595ecdcaa5daed870ed23b4ca525 to your computer and use it in GitHub Desktop.
Enumerator::Parallel
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Demo: registered as an exit hook so that it runs after the class
# definitions further down in this file have been evaluated. Prints the
# integers 0..7 via `p`, distributing them over two worker Ractors.
at_exit do
  8.times.parallel(n: 2).each { p _1 }
end
class Enumerator
  # Distributes the elements of an enumerable over a fixed number of worker
  # Ractors, each of which calls the caller's block on the elements it takes.
  #
  # NOTE(review): this uses Ractor.recv, Ractor.yield, Ractor#take and
  # Ractor#close, which look like the pre-release Ractor API — confirm that
  # they exist (with the same semantics) on the Ruby version you target.
  class Parallel
    # @param enum [#each] source of the elements to distribute
    # @param n [#times] number of worker Ractors to start (presumably an Integer)
    def initialize(enum, n:)
      @enum = enum
      @n = n
    end

    # Sends every element of the wrapped enumerable through a relay Ractor to
    # +n+ worker Ractors and blocks until all workers have finished.
    #
    # Limitation: a +nil+ element is treated by the worker that takes it as
    # the end of its input, so enumerables containing +nil+ are not supported.
    #
    # @yieldparam item one element of the wrapped enumerable
    # @return [nil]
    def each(&blk)
      # Relay Ractor: re-yields every message it receives, so any idle worker
      # can take the next element.
      pipe = Ractor.new do
        loop do
          Ractor.yield(Ractor.recv)
        end
      end

      workers = @n.times.map do
        Ractor.new(pipe, blk.object_id) do |source, blk_id|
          # The block itself is not passed to the Ractor; only its object_id
          # is, and the object is looked up again by scanning ObjectSpace.
          # NOTE(review): confirm ObjectSpace.each_object still exposes the
          # block inside a non-main Ractor on your Ruby version.
          handler = ObjectSpace.each_object { |obj| break obj if obj.object_id == blk_id }

          # Stop only on nil; a `false` element is a legitimate value and is
          # still handed to the block.
          until (item = source.take).nil?
            handler.call(item)
          end
        rescue Ractor::ClosedError
          # Taking from the closed relay: no more input for this worker.
        end
      end

      @enum.each { |item| pipe.send(item) }
      pipe.close

      # Wait for every worker to terminate before returning.
      until workers.empty?
        finished, = Ractor.select(*workers)
        workers.delete(finished)
      end
    end
  end

  # Wraps this enumerator so that +each+ runs the given block on +n+ Ractors.
  #
  # @param n [#times] number of worker Ractors (presumably an Integer)
  # @return [Enumerator::Parallel]
  def parallel(n:)
    Enumerator::Parallel.new(self, n: n)
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment