module ForkAndReturn

ForkAndReturn implements a couple of methods that simplify running a block of code in a subprocess. The result (Ruby object or exception) of the block will be available in the parent process.

The intermediate return value (or exception) will be Marshal’led to disk. This means that it is possible to (concurrently) run thousands of child process, with a relative low memory footprint. Just gather the results once all child process are done. ForkAndReturn will handle the writing, reading and deleting of the temporary file.

The core of these methods is ::fork_and_return_core(). It returns some nested lambdas, which are handled by the other methods and by Enumerable#concurrent_collect. These lambdas handle the WAITing, LOADing and RESULTing (explained in ::fork_and_return_core()).

The child process exits with Process.exit!(), so at_exit() blocks are skipped in the child process. However, both $stdout and $stderr will be flushed.

Only Marshal’lable Ruby objects can be returned.

ForkAndReturn uses Process.fork(), so it only runs on platforms where Process.fork() is implemented.

Constants

VERSION

Public Class Methods

fork_and_return(*args, &block) click to toggle source

Fork a new process and run the block of code within that process.

The WAITing, LOADing and RESULTing (explained in ::fork_and_return_core()) will be performed immediately and the return value of the block will be returned.

Example:

[1, 2, 3, 4].collect do |object|
  Thread.fork do
    ForkAndReturn.fork_and_return do
      object*2
    end
  end
end.collect do |thread|
  thread.value
end   # ===> [2, 4, 6, 8]

This runs each “object*2” statement in a seperate process, concurrently. Hopefully, the processes are spread over all available CPU’s. That’s a simple way of parallel processing! (Although Enumerable#concurrent_collect is even simpler…)

*args is passed to the block.

# File lib/forkandreturn/forkandreturn.rb, line 44
def self.fork_and_return(*args, &block)
  wait        = fork_and_return_core(*args, &block)

  wait.call.call.call
end
fork_and_return_core(*args) { |*args| ... } click to toggle source

Fork a new process and run the block of code within that process.

Returns some nested lambdas: The first lambda is the WAIT-lambda. If you call the WAIT-lambda, you’re going to wait for the child process to finish. The WAIT-lambda returns the LOAD-lambda. If you call the LOAD-lambda, the result of the child process (the return value or the exception) will be loaded from the temporary file into memory and the temporary file will be deleted. The LOAD-lambda returns the RESULT-lambda. If you call RESULT-lambda, the result of the child process will be handled. This means either “return the return value of the block” or “raise the exception”

::fork_and_return_core() is coded like this:

def fork_and_return_core
  # Fork a process.

  lambda do
    # Wait for the result.

    lambda do
      # Load the result and delete the temp file.

      lambda do
        # Handle the result.
      end
    end
  end
end

::fork_and_return_core() is used like this:

wait   = ForkAndReturn.fork_and_return_core{raise "BOOM"}
load   = wait.call
result = load.call
value  = result.call   # This is were the exception "BOOM" is raised.

at_exit blocks defined in the child itself will be executed in the child, whereas at_exit blocks defined in the parent won’t be executed in the child.

*args is passed to the block.

Example:

[1, 2, 3, 4].collect do |object|
  ForkAndReturn.fork_and_return_core do
    object*2
  end
end.collect do |wait|
  wait.call
end.collect do |load|
  load.call
end.collect do |result|
  result.call
end   # ===> [2, 4, 6, 8]

This runs each “object*2” statement in a seperate process, concurrently.

# File lib/forkandreturn/forkandreturn.rb, line 133
def self.fork_and_return_core(*args, &block)
  tempfile    = Tempfile.new("fork_and_return")

  tempfile.unlink

  #begin
    pid =
    Process.fork do
      at_exit do
        $stdout.flush
        $stderr.flush

        Process.exit! # To avoid the execution of already defined at_exit handlers.
      end

      begin
        ok, res       = true, yield(*args)
      rescue
        ok, res       = false, $!
      end

      Marshal.dump([ok, res], tempfile)

      tempfile.close
    end
  #rescue Errno::EAGAIN       # Resource temporarily unavailable - fork(2)
  #  Kernel.sleep 0.1

  #  retry                    # TODO: Reconsider.
  #end

  lambda do                   # Wait for the result.
    Process.wait(pid)         # To avoid zombies.

    lambda do                 # Load the result and delete the temp file.
      begin
        tempfile.rewind

        ok, res       = *Marshal.load(tempfile)
      rescue EOFError         # end of file reached
        ok, res       = false, WorkerError.new("the worker hasn't returned a result")
      rescue TypeError        # can't be read
        ok, res       = false, WorkerError.new("the worker has returned corrupt data")
      ensure
        tempfile.close
      end

      lambda do               # Handle the result.
        raise res     unless ok

        res
      end
    end
  end
end
fork_and_return_later(*args, &block) click to toggle source

Fork a new process and run the block of code within that process.

Returns a lambda. If you call it, the WAITing, LOADing and RESULTing (explained in ::fork_and_return_core()) will be performed in one go.

*args is passed to the block.

Example:

[1, 2, 3, 4].collect do |object|
  ForkAndReturn.fork_and_return_later do
    object*2
  end
end.collect do |wait|
  wait.call
end   # ===> [2, 4, 6, 8]

This runs each “object*2” statement in a seperate process, concurrently.

# File lib/forkandreturn/forkandreturn.rb, line 69
def self.fork_and_return_later(*args, &block)
  wait        = fork_and_return_core(*args, &block)

  lambda{wait.call.call.call}
end