In #clustered_concurrent_collect(), all objects in the enumeration are clustered. Each cluster is then handled in a separate process. Compare this to #concurrent_collect(), where each object is handled in a separate process.

However, the caller will not be aware of the clusters: the interface is exactly the same as #concurrent_collect() and Enumerable#collect().

#clustered_concurrent_collect() is suitable for handling a large number of jobs that are not too CPU-intensive.
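For example (a minimal usage sketch, assuming the gem is loaded with require 'forkandreturn'; the cluster count of 4 is chosen arbitrarily):

  require 'forkandreturn'   # assumed entry point of the gem

  # 10_000 lightweight jobs, but only 4 child processes: the jobs are split
  # into 4 clusters and each cluster is handled in one forked process.
  (1..10_000).to_a.clustered_concurrent_collect(4) do |n|
    n*n
  end    # ===> [1, 4, 9, ..., 100_000_000]

The caller simply gets back the array of results, just as with Enumerable#collect().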
  # File lib/forkandreturn/enumerable.rb, line 71
  def clustered_concurrent_collect(number_of_clusters=ForkAndReturn::Util.cores, &block)
    number_of_clusters = 0 unless ForkAndReturn::Util.forkable?

    if number_of_clusters < 1
      self.concurrent_collect(number_of_clusters, &block)
    else
      ThreadLimiter.handle_clusters(self, number_of_clusters, :concurrent_collect, &block)
    end
  end
Like #clustered_concurrent_collect, but it’s “each” instead of “collect”.
  # File lib/forkandreturn/enumerable.rb, line 115
  def clustered_concurrent_each(*args, &block)
    clustered_concurrent_collect(*args, &block)

    self
  end
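A short sketch of a typical use (create_thumbnail is a hypothetical, CPU-bound helper, not part of the gem): the blocks’ return values are discarded, so the block is only useful for its side effects, and side effects that go through the filesystem do survive the forked child processes:

  Dir['*.jpg'].clustered_concurrent_each do |path|
    # The thumbnail file written by the child process is visible to the parent;
    # the return value of create_thumbnail is thrown away.
    create_thumbnail(path, "thumbs/#{path}")
  end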
Like #clustered_concurrent_collect, but it’s “reject” instead of “collect”.
  # File lib/forkandreturn/enumerable.rb, line 109
  def clustered_concurrent_reject(*args, &block)
    self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
  end
Like #clustered_concurrent_collect, but it’s “select” instead of “collect”.
  # File lib/forkandreturn/enumerable.rb, line 103
  def clustered_concurrent_select(*args, &block)
    self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
  end
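Only the block runs in the child processes; the actual filtering happens in the parent, driven by the collected booleans. A minimal sketch (the *.log glob is just an assumed input):

  # Keep only the log files that mention "ERROR". Scanning the files is done
  # in a few forked processes, one per cluster.
  Dir['*.log'].clustered_concurrent_select do |path|
    File.read(path).include?('ERROR')
  end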
For each object in the enumeration, the block is called in a separate process, with the object passed as its argument, and the results of all the blocks are collected. It must be one of the easiest ways to do parallel processing in Ruby.
Example:
  [1, 2, 3, 4].concurrent_collect do |object|
    2*object
  end    # ===> [2, 4, 6, 8]
This runs each “2*object” in a separate process. Hopefully, the processes are spread over all available CPUs. That’s a simple way of parallel processing!

Note that the code in the block is run in a separate process, so updating objects and variables in the block won’t affect the parent process:
  count = 0

  [...].concurrent_collect do
    count += 1
  end

  count    # ==> 0
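Since only the block’s return value makes it back to the parent process, return the data you need instead of updating shared state. A sketch in the same spirit as the example above:

  counts = [10, 20, 30].concurrent_collect do
    1    # returned to the parent process, unlike the += 1 on a shared counter above
  end

  counts.inject(0){|sum, n| sum + n}    # ==> 3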
#concurrent_collect() is suitable for handling a small number of very CPU-intensive jobs, such as parsing large XML files.
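A hedged sketch of that kind of workload (the file names are assumed, and REXML from the standard library stands in for whatever parser you actually use); note that only a small Integer travels back from each child to the parent:

  require 'rexml/document'   # stdlib parser, used here purely as an illustration

  ['huge1.xml', 'huge2.xml', 'huge3.xml'].concurrent_collect do |path|
    # The CPU-heavy parse happens in a forked child; only the element count returns.
    REXML::Document.new(File.read(path)).root.elements.size
  end    # ===> one element count per file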
  # File lib/forkandreturn/enumerable.rb, line 25
  def concurrent_collect(max_concurrent_workers=-1, &block)
    max_concurrent_workers = 0 unless ForkAndReturn::Util.forkable?

    case
    when max_concurrent_workers < 0    # No limit.
      self.collect do |object|
        ForkAndReturn.fork_and_return_core do
          if block.arity > 1 and object.kind_of?(Enumerable)
            yield(*object.to_a)
          else
            yield(object)
          end
        end
      end.collect do |wait|
        wait.call
      end.collect do |load|
        load.call
      end.collect do |result|
        result.call
      end
    when max_concurrent_workers == 0    # No fork.
      self.collect(&block)
    when max_concurrent_workers > 0
      self.threaded_collect(max_concurrent_workers) do |object|
        ForkAndReturn.fork_and_return_core do
          if block.arity > 1 and object.kind_of?(Enumerable)
            yield(*object.to_a)
          else
            yield(object)
          end
        end.call
      end.collect do |load|
        load.call
      end.collect do |result|
        result.call
      end
    end
  end
Like #concurrent_collect, but it’s “each” instead of “collect”.
  # File lib/forkandreturn/enumerable.rb, line 95
  def concurrent_each(*args, &block)
    concurrent_collect(*args, &block)

    self
  end
Like #concurrent_collect, but it’s “reject” instead of “collect”.
  # File lib/forkandreturn/enumerable.rb, line 89
  def concurrent_reject(*args, &block)
    self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
  end
Like #concurrent_collect, but it’s “select” instead of “collect”.
  # File lib/forkandreturn/enumerable.rb, line 83
  def concurrent_select(*args, &block)
    self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
  end
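A small usage sketch (assuming the gem is required; Prime from the standard library is used only to have something to test in each child process):

  require 'prime'   # stdlib, only for this illustration

  (1..20).to_a.concurrent_select do |n|
    Prime.prime?(n)   # each primality test runs in its own forked process
  end    # ===> [2, 3, 5, 7, 11, 13, 17, 19]

#concurrent_reject() with the same block would return the non-primes instead, with exactly the same interface.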