module Enumerable

Public Instance Methods

clustered_concurrent_collect(number_of_clusters=ForkAndReturn::Util.cores, &block)

In #clustered_concurrent_collect(), all objects in the enumeration are grouped into clusters. Each cluster is then handled in a separate process. Compare this to #concurrent_collect(), where each object is handled in a separate process.

However, the caller will not be aware of the clusters: the interface is exactly the same as #concurrent_collect() and Enumerable.collect().

#clustered_concurrent_collect() is suitable for handling many jobs that are not very CPU-intensive, because one process is started per cluster instead of one per object.
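The clustering idea can be sketched in plain Ruby. This is only an illustration under stated assumptions, not the library's implementation: clustered_collect_sketch is a hypothetical helper, it splits the objects into contiguous clusters (the library may distribute them differently), and it assumes a platform where fork is available and block results that Marshal can serialize.

```ruby
# Hypothetical helper, not part of ForkAndReturn: handle each cluster of
# objects in one forked child and return the results in the original order.
def clustered_collect_sketch(items, number_of_clusters)
  items = items.to_a
  return [] if items.empty?

  # Assumption: contiguous clusters of (almost) equal size.
  cluster_size = (items.size.to_f / number_of_clusters).ceil

  workers = items.each_slice(cluster_size).map do |cluster|
    reader, writer = IO.pipe
    reader.binmode
    writer.binmode

    pid = fork do
      reader.close
      # One child handles the whole cluster, one object after another.
      writer.write(Marshal.dump(cluster.map { |object| yield(object) }))
      writer.close
      exit!(0)   # leave the child without running at_exit hooks
    end

    writer.close
    [pid, reader]
  end

  # Read each cluster's results, then flatten them back into one list.
  workers.flat_map do |pid, reader|
    data = reader.read
    reader.close
    Process.wait(pid)
    Marshal.load(data)
  end
end

p clustered_collect_sketch(1..10, 3) { |n| n * n }
# prints [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```

With ten objects and three clusters, the sketch starts three child processes instead of ten, while the caller still receives one flat list of results.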

# File lib/forkandreturn/enumerable.rb, line 71
def clustered_concurrent_collect(number_of_clusters=ForkAndReturn::Util.cores, &block)
  number_of_clusters  = 0      unless ForkAndReturn::Util.forkable?

  if number_of_clusters < 1
    self.concurrent_collect(number_of_clusters, &block)
  else
    ThreadLimiter.handle_clusters(self, number_of_clusters, :concurrent_collect, &block)
  end
end
Also aliased as: clustered_concurrent_map
clustered_concurrent_each(*args, &block)

Like #clustered_concurrent_collect, but it’s “each” instead of “collect”.

# File lib/forkandreturn/enumerable.rb, line 115
def clustered_concurrent_each(*args, &block)
  clustered_concurrent_collect(*args, &block)

  self
end
clustered_concurrent_map(number_of_clusters=ForkAndReturn::Util.cores, &block)
Alias for: clustered_concurrent_collect
clustered_concurrent_reject(*args, &block)

Like #clustered_concurrent_collect, but it’s “reject” instead of “collect”.

# File lib/forkandreturn/enumerable.rb, line 109
def clustered_concurrent_reject(*args, &block)
  self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
end
clustered_concurrent_select(*args, &block)

Like #clustered_concurrent_collect, but it’s “select” instead of “collect”.

# File lib/forkandreturn/enumerable.rb, line 103
def clustered_concurrent_select(*args, &block)
  self.zip(self.clustered_concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
end
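
The select and reject variants share one idiom: each object is paired with its block result via zip, and inject then keeps or drops the object depending on that result. A minimal sketch of the idiom follows. It uses a plain, sequential collect in place of clustered_concurrent_collect so that it runs without forking, and select_via_collect and reject_via_collect are hypothetical names, not part of the library.

```ruby
# Hypothetical helpers that mirror the zip/inject idiom with a sequential collect.
def select_via_collect(items, &block)
  flags = items.collect(&block)   # one block result per object
  items.zip(flags).inject([]) { |r, (o, b)| r << o if b; r }
end

def reject_via_collect(items, &block)
  flags = items.collect(&block)
  items.zip(flags).inject([]) { |r, (o, b)| r << o unless b; r }
end

numbers = [1, 2, 3, 4, 5, 6]
p select_via_collect(numbers) { |n| n.even? }   # prints [2, 4, 6]
p reject_via_collect(numbers) { |n| n.even? }   # prints [1, 3, 5]
```

Because the block results are computed first and the filtering happens afterwards in the parent, the block can run in other processes without changing the filtering code.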
concurrent_collect(max_concurrent_workers=-1) { |*to_a| ... }

For each object in the enumeration, call the block in a separate process, pass the object to the block, and collect the results of the blocks. It is probably one of the easiest ways to do parallel processing in Ruby.

Example:

[1, 2, 3, 4].concurrent_collect do |object|
  2*object
end   # ===> [2, 4, 6, 8]

This runs each “2*object” in a separate process. Ideally, the operating system spreads the processes over all available CPUs. That’s a simple way of parallel processing!
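
How a result might travel from a child process back to the parent can be sketched with fork, a pipe, and Marshal. This is an assumption-laden illustration, not the library's actual mechanism: concurrent_collect_sketch is a hypothetical helper, it needs a platform with fork, and the block results must be serializable by Marshal.

```ruby
# Hypothetical helper, not part of ForkAndReturn: one child process per object.
def concurrent_collect_sketch(items)
  # Phase 1: start every child before reading anything, so they run concurrently.
  children = items.map do |object|
    reader, writer = IO.pipe
    reader.binmode
    writer.binmode

    pid = fork do
      reader.close
      writer.write(Marshal.dump(yield(object)))   # serialize the block's result
      writer.close
      exit!(0)   # leave the child without running at_exit hooks
    end

    writer.close
    [pid, reader]
  end

  # Phase 2: read the results back in the original order.
  children.map do |pid, reader|
    data = reader.read   # returns once the child has closed its end
    reader.close
    Process.wait(pid)
    Marshal.load(data)
  end
end

p concurrent_collect_sketch([1, 2, 3, 4]) { |object| 2 * object }   # prints [2, 4, 6, 8]
```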

Note that the code in the block is run in a separate process, so updating objects and variables in the block won’t affect the parent process:

count = 0
[...].concurrent_collect do
  count += 1
end
count   # ==> 0
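
The same isolation can be observed with a bare fork, independent of this library. In this small sketch, count_after_child_increment is a hypothetical name and a platform with fork is assumed:

```ruby
# Hypothetical demonstration: the child increments its own copy of count.
def count_after_child_increment
  count = 0

  pid = fork do
    count += 1   # changes only the child's copy of the variable
    exit!(0)
  end

  Process.wait(pid)
  count          # the parent's copy is untouched
end

p count_after_child_increment   # prints 0
```

Anything the parent needs from the block therefore has to come back as the block's return value.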

#concurrent_collect() is suitable for handling a few very CPU-intensive jobs, like parsing large XML files.

# File lib/forkandreturn/enumerable.rb, line 25
def concurrent_collect(max_concurrent_workers=-1, &block)
  max_concurrent_workers      = 0  unless ForkAndReturn::Util.forkable?

  case
  when max_concurrent_workers < 0     # No limit.
    self.collect do |object|
      ForkAndReturn.fork_and_return_core do
        if block.arity > 1 and object.kind_of?(Enumerable)
          yield(*object.to_a)
        else
          yield(object)
        end
      end
    end.collect do |wait|
      wait.call
    end.collect do |load|
      load.call
    end.collect do |result|
      result.call
    end
  when max_concurrent_workers == 0    # No fork.
    self.collect(&block)
  when max_concurrent_workers > 0
    self.threaded_collect(max_concurrent_workers) do |object|
      ForkAndReturn.fork_and_return_core do
        if block.arity > 1 and object.kind_of?(Enumerable)
          yield(*object.to_a)
        else
          yield(object)
        end
      end.call
    end.collect do |load|
      load.call
    end.collect do |result|
      result.call
    end
  end
end
Also aliased as: concurrent_map
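
In the source above, a block that declares more than one parameter receives an Enumerable object splatted into separate arguments, while any other block receives the object as is. A minimal sketch of just that argument-passing rule, without forking; call_with_splat_rule is a hypothetical helper, not part of the library:

```ruby
# Hypothetical helper that copies only the argument-passing rule.
def call_with_splat_rule(object, &block)
  if block.arity > 1 && object.kind_of?(Enumerable)
    block.call(*object.to_a)   # Enumerable spread over the block's parameters
  else
    block.call(object)         # object passed through unchanged
  end
end

p call_with_splat_rule(1..2) { |a, b| a + b }         # prints 3
p call_with_splat_rule(1..2) { |range| range.class }  # prints Range
p call_with_splat_rule(5) { |n| n * 2 }               # prints 10
```

The rule matters for Enumerables that are not arrays, such as a Range, which Ruby would not spread over several block parameters on its own.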
concurrent_each(*args, &block)

Like #concurrent_collect, but it’s “each” instead of “collect”.

# File lib/forkandreturn/enumerable.rb, line 95
def concurrent_each(*args, &block)
  concurrent_collect(*args, &block)

  self
end
concurrent_map(max_concurrent_workers=-1, &block)
Alias for: concurrent_collect
concurrent_reject(*args, &block)

Like #concurrent_collect, but it’s “reject” instead of “collect”.

# File lib/forkandreturn/enumerable.rb, line 89
def concurrent_reject(*args, &block)
  self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o unless b ; r}
end
concurrent_select(*args, &block)

Like #concurrent_collect, but it’s “select” instead of “collect”.

# File lib/forkandreturn/enumerable.rb, line 83
def concurrent_select(*args, &block)
  self.zip(self.concurrent_collect(*args, &block)).inject([]){|r, (o, b)| r << o if b ; r}
end