ForkAndReturn implements a couple of methods that simplify running a block of code in a subprocess. The result (Ruby object or exception) of the block will be available in the parent process.
The intermediate return value (or exception) is serialized to disk with Marshal. This makes it possible to run thousands of child processes concurrently with a relatively low memory footprint: just gather the results once all child processes are done. ForkAndReturn handles the writing, reading and deleting of the temporary file.
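The file-based hand-off described above can be sketched with the standard library alone. This is a minimal illustration, not ForkAndReturn's own code: the helper name run_in_child is hypothetical, and unlike the library it does not capture exceptions raised in the child.

```ruby
require "tempfile"

# Hypothetical helper (not part of ForkAndReturn): run the block in a child
# process and hand its return value back through a temporary file.
def run_in_child
  tempfile = Tempfile.new("sketch")

  pid = Process.fork do
    # Child: serialize the block's return value into the shared file.
    Marshal.dump(yield, tempfile)
    tempfile.flush
    Process.exit!(0)   # leave immediately, skipping inherited at_exit handlers
  end

  Process.wait(pid)    # reap the child before reading its result
  tempfile.rewind
  Marshal.load(tempfile)
ensure
  tempfile.close! if tempfile   # close and delete the temporary file
end

doubled = run_in_child { [1, 2, 3].map { |n| n * 2 } }
```

Only the serialized result crosses the process boundary, so the parent never holds more than the file handle until it decides to load the value.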
The core of these methods is ::fork_and_return_core(). It returns some nested lambdas, which are handled by the other methods and by Enumerable#concurrent_collect. These lambdas handle the WAITing, LOADing and RESULTing (explained in ::fork_and_return_core()).
The child process exits with Process.exit!(), so at_exit() blocks inherited from the parent are skipped in the child process. However, both $stdout and $stderr will be flushed.
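The exit behaviour can be illustrated with a small stand-alone sketch. It assumes only Ruby's documented rule that at_exit handlers run in reverse order of registration. The log file and the parent_pid guard are illustrative additions, not part of ForkAndReturn.

```ruby
require "tempfile"

log = Tempfile.new("at_exit_sketch")
parent_pid = Process.pid

# Handler registered in the parent; a forked child inherits it.
at_exit do
  next if Process.pid == parent_pid   # only record runs inside a child
  log.write("inherited handler ran in child\n")
  log.flush
end

pid = Process.fork do
  # Registered first in the child, so it runs last among the child's own
  # handlers. exit! then stops the process before any inherited handler runs.
  at_exit do
    $stdout.flush
    $stderr.flush
    Process.exit!
  end

  # Registered later in the child, so it runs before the exit! handler.
  at_exit do
    log.write("child handler ran\n")
    log.flush
  end
end
Process.wait(pid)

log.rewind
recorded = log.read   # contains only the line written by the child's own handler
log.close!
```

Handlers that the child registers itself still run, while the handler inherited from the parent never gets a chance to.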
Only Ruby objects that Marshal can serialize can be returned.
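As a quick illustration of that restriction, the sketch below uses Marshal directly, without ForkAndReturn. Plain data round-trips, whereas a Proc cannot be dumped.

```ruby
# Plain data structures round-trip through Marshal without trouble.
original = { "answer" => [1, 2.5, :sym] }
copy = Marshal.load(Marshal.dump(original))

# Procs (and IO objects, among others) cannot be dumped; Marshal raises TypeError.
dump_error = nil
begin
  Marshal.dump(lambda { 42 })
rescue TypeError => e
  dump_error = e
end
```

A block that is meant to run in a child process should therefore return data (numbers, strings, arrays, hashes, ordinary objects) rather than procs or open files.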
ForkAndReturn uses Process.fork(), so it only runs on platforms where Process.fork() is implemented.
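One way to guard against platforms without fork is to ask Process whether it responds to it, since Ruby reports unimplemented methods as not responded to. The following is a sketch of such a check. ForkAndReturn is not claimed to perform it itself.

```ruby
# Process.respond_to?(:fork) is false where fork(2) is not implemented.
fork_available = Process.respond_to?(:fork)

if fork_available
  # Fork a trivial child and collect its exit status.
  pid = Process.fork { Process.exit!(0) }
  Process.wait(pid)
  child_status = $?.exitstatus
else
  warn "Process.fork is not available on this platform"
end
```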
ForkAndReturn.fork_and_return(*args, &block)
Fork a new process and run the block of code within that process.
The WAITing, LOADing and RESULTing (explained in ::fork_and_return_core()) will be performed immediately and the return value of the block will be returned.
Example:
  [1, 2, 3, 4].collect do |object|
    Thread.fork do
      ForkAndReturn.fork_and_return do
        object*2
      end
    end
  end.collect do |thread|
    thread.value
  end   # ===> [2, 4, 6, 8]

This runs each “object*2” statement in a separate process, concurrently. Hopefully, the processes are spread over all available CPUs. That’s a simple way of parallel processing! (Although Enumerable#concurrent_collect is even simpler…)
*args is passed to the block.
  # File lib/forkandreturn/forkandreturn.rb, line 44
  def self.fork_and_return(*args, &block)
    wait = fork_and_return_core(*args, &block)

    wait.call.call.call
  end
ForkAndReturn.fork_and_return_core(*args, &block)
Fork a new process and run the block of code within that process.
Returns some nested lambdas. The first lambda is the WAIT-lambda: if you call it, you wait for the child process to finish. The WAIT-lambda returns the LOAD-lambda: if you call it, the result of the child process (the return value or the exception) is loaded from the temporary file into memory and the temporary file is deleted. The LOAD-lambda returns the RESULT-lambda: if you call it, the result of the child process is handled, which means either “return the return value of the block” or “raise the exception”.
::fork_and_return_core() is coded like this:
  def fork_and_return_core
    # Fork a process.

    lambda do
      # Wait for the result.

      lambda do
        # Load the result and delete the temp file.

        lambda do
          # Handle the result.
        end
      end
    end
  end
::fork_and_return_core() is used like this:
  wait   = ForkAndReturn.fork_and_return_core{raise "BOOM"}
  load   = wait.call
  result = load.call
  value  = result.call     # This is where the exception "BOOM" is raised.
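To make the staging concrete without forking anything, here is a hypothetical stand-in named staged that follows the same WAIT, LOAD and RESULT shape and records each stage in an events array. It is only an illustration of the nested-lambda protocol, not the library's implementation, and it runs the block in the current process.

```ruby
# Hypothetical stand-in for the three-stage protocol; no process is forked.
def staged(events)
  events << :started

  lambda do                       # WAIT stage
    events << :waited

    lambda do                     # LOAD stage: capture value or exception
      events << :loaded
      outcome = begin
        [true, yield]
      rescue => e
        [false, e]
      end

      lambda do                   # RESULT stage: return or re-raise
        ok, res = outcome
        raise res unless ok
        res
      end
    end
  end
end

events = []
wait   = staged(events) { raise "BOOM" }
load   = wait.call
result = load.call                # the exception is captured here, not raised

message = nil
begin
  result.call                     # only this call raises "BOOM"
rescue RuntimeError => e
  message = e.message
end
```

Because the exception is stored rather than raised until the last stage, a caller can wait for and load many results first and decide afterwards how to handle each failure.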
at_exit blocks defined in the child itself will be executed in the child, whereas at_exit blocks defined in the parent won’t be executed in the child.
*args is passed to the block.
Example:
  [1, 2, 3, 4].collect do |object|
    ForkAndReturn.fork_and_return_core do
      object*2
    end
  end.collect do |wait|
    wait.call
  end.collect do |load|
    load.call
  end.collect do |result|
    result.call
  end   # ===> [2, 4, 6, 8]

This runs each “object*2” statement in a separate process, concurrently.
  # File lib/forkandreturn/forkandreturn.rb, line 133
  def self.fork_and_return_core(*args, &block)
    tempfile = Tempfile.new("fork_and_return")
    tempfile.unlink

    #begin
      pid =
      Process.fork do
        at_exit do
          $stdout.flush
          $stderr.flush

          Process.exit!     # To avoid the execution of already defined at_exit handlers.
        end

        begin
          ok, res = true, yield(*args)
        rescue
          ok, res = false, $!
        end

        Marshal.dump([ok, res], tempfile)

        tempfile.close
      end
    #rescue Errno::EAGAIN      # Resource temporarily unavailable - fork(2)
    #  Kernel.sleep 0.1
    #
    #  retry   # TODO: Reconsider.
    #end

    lambda do       # Wait for the result.
      Process.wait(pid)     # To avoid zombies.

      lambda do     # Load the result and delete the temp file.
        begin
          tempfile.rewind
          ok, res = *Marshal.load(tempfile)
        rescue EOFError     # end of file reached
          ok, res = false, WorkerError.new("the worker hasn't returned a result")
        rescue TypeError    # can't be read
          ok, res = false, WorkerError.new("the worker has returned corrupt data")
        ensure
          tempfile.close
        end

        lambda do   # Handle the result.
          raise res unless ok

          res
        end
      end
    end
  end
ForkAndReturn.fork_and_return_later(*args, &block)
Fork a new process and run the block of code within that process.
Returns a lambda. If you call it, the WAITing, LOADing and RESULTing (explained in ::fork_and_return_core()) will be performed in one go.
*args is passed to the block.
Example:
  [1, 2, 3, 4].collect do |object|
    ForkAndReturn.fork_and_return_later do
      object*2
    end
  end.collect do |wait|
    wait.call
  end   # ===> [2, 4, 6, 8]

This runs each “object*2” statement in a separate process, concurrently.
  # File lib/forkandreturn/forkandreturn.rb, line 69
  def self.fork_and_return_later(*args, &block)
    wait = fork_and_return_core(*args, &block)

    lambda{wait.call.call.call}
  end