#
# File:: thread.rb
# Description:: Utility classes for handling threaded code.
#
# Author:: Greg Smith <greg@rockstarnorth.com>
# Author:: David Muir <david.muir@rockstarnorth.com>
# Date:: 16 April 2007
#
|
|
|
|
#----------------------------------------------------------------------------
|
|
# Uses
|
|
#----------------------------------------------------------------------------
|
|
require 'thread'
|
|
|
|
#----------------------------------------------------------------------------
|
|
# Implementation
|
|
#----------------------------------------------------------------------------
|
|
|
|
module Pipeline
|
|
|
|
#
# == Description
# Wraps any object so that every call forwarded to it is serialised
# through a single mutex.
#
# Only messages that ThreadObj itself does not understand reach the wrapped
# object (they arrive through +method_missing+); methods inherited from
# Object, such as +to_s+ or +==+, are answered by the wrapper itself.
#
# The mutex is not re-entrant: a block handed to a forwarded call must not
# call back into the same wrapper from the same thread.
#
# === Example Usage
#    results = ThreadObj.new( [] )
#    results << 42     # executed while holding the wrapper's mutex
#
class ThreadObj

  #
  # obj:: the object whose methods should be called under the mutex.
  #
  def initialize( obj )
    @mutex = Mutex.new
    @inner_obj = obj
  end

  #
  # Forwards +method+, with its arguments and block, to the wrapped object
  # while holding the mutex. The mutex is released even if the call raises.
  # Returns whatever the wrapped object returns; a message the wrapped object
  # does not understand raises its usual NoMethodError.
  #
  def method_missing( method, *args, &block )
    @mutex.synchronize do
      # __send__ rather than send: the wrapped object may define its own
      # +send+ (sockets do), which would otherwise hijack the dispatch.
      @inner_obj.__send__( method, *args, &block )
    end
  end

  # Keep keyword arguments intact when delegating through *args on Rubies
  # that separate keywords from positional arguments; older Rubies do not
  # define ruby2_keywords and do not need it.
  ruby2_keywords( :method_missing ) if respond_to?( :ruby2_keywords, true )

  #
  # Makes +respond_to?+ and +method+ report the methods of the wrapped
  # object. This is a plain reflection query and does not take the mutex.
  #
  def respond_to_missing?( method, include_private = false )
    @inner_obj.respond_to?( method, include_private ) || super
  end

end
|
|
|
|
|
|
#
# == Description
# A simple thread pool implementation.
#
# Worker threads are created lazily, one per submitted job, until
# +max_size+ workers exist; after that ThreadPool#process blocks until one
# of the existing workers is idle. Worker threads are never stopped by the
# pool.
#
# === Example Usage
#    pool = ThreadPool.new( 10, 'Worker' )
#    dataitems.each do |item|
#        pool.process { handle( item ) }
#    end
#    pool.join
#
# === References
# http://snippets.dzone.com/posts/show/3276
#
class ThreadPool

  #
  # == Description
  # A single pool thread. It holds at most one pending/running job (a
  # block), which it picks up by polling roughly once per millisecond.
  #
  class Worker

    #
    # Starts the worker thread immediately.
    #
    # name:: stored in the thread-local <tt>:name</tt> of the new thread.
    #
    def initialize( name )
      @mutex = Mutex.new
      @block = nil
      @thread = Thread.new do
        Thread.current[:name] = name
        loop do
          sleep 0.001
          job = get_block
          run_block( job ) if job
        end
      end
    end

    # Returns the job currently assigned to this worker, or nil when idle.
    def get_block
      @mutex.synchronize { @block }
    end

    #
    # Assigns a job to this worker and returns it.
    # Raises RuntimeError if the worker already has a job.
    #
    def set_block( block )
      @mutex.synchronize do
        raise RuntimeError, "Thread already busy." if @block
        @block = block
      end
    end

    # Clears the current job, marking the worker idle again.
    def reset_block
      @mutex.synchronize { @block = nil }
    end

    # True while a job is assigned (queued or still running).
    def busy?
      @mutex.synchronize { !@block.nil? }
    end

    private

    #
    # Runs one job. An error raised by the job is reported on stderr instead
    # of killing the worker thread, and the worker is always marked idle
    # afterwards; otherwise a failing job would leave the worker "busy"
    # forever and ThreadPool#join would never return.
    #
    def run_block( job )
      job.call
    rescue StandardError => e
      warn "#{Thread.current[:name]}: job raised #{e.class}: #{e.message}"
    ensure
      reset_block
    end

  end

  attr_accessor :max_size
  attr_reader :workers

  #
  # max_size::    upper bound on the number of worker threads. No worker can
  #               be created while it is zero or less, so #process would
  #               block.
  # name_prefix:: workers are named "<name_prefix>_<n>", n counting from 1.
  #
  def initialize( max_size = 10, name_prefix = 'ThreadPool_Worker' )
    @max_size = max_size
    @name_prefix = name_prefix
    @workers = []
    @mutex = Mutex.new
  end

  # Number of worker threads created so far.
  def size
    @mutex.synchronize { @workers.size }
  end

  # True if any worker currently has a job assigned.
  def busy?
    @mutex.synchronize { @workers.any? { |w| w.busy? } }
  end

  # Blocks until no worker has a job assigned, polling every 0.01s.
  def join
    sleep 0.01 while busy?
  end

  #
  # Hands +block+ to an idle worker, creating a new worker if the pool is
  # not yet at +max_size+. When every worker is busy the caller is blocked,
  # retrying every 0.01s. Returns the block that was scheduled.
  #
  def process( &block )
    loop do
      @mutex.synchronize do
        worker = find_available_worker
        # Assign while still holding the pool mutex so that two callers
        # cannot pick the same idle worker.
        return worker.set_block( block ) if worker
      end
      sleep 0.01
    end
  end

  #
  # Blocks until a worker is idle (or can be created) and returns it,
  # retrying every 0.01s. The worker is not reserved: another thread may
  # assign it a job before the caller does.
  #
  def wait_for_worker
    loop do
      worker = @mutex.synchronize { find_available_worker }
      return worker if worker
      sleep 0.01
    end
  end

  #
  # Returns an idle worker, creating one if allowed, or nil if the pool is
  # full and every worker is busy. Does not lock; #process and
  # #wait_for_worker call it with the pool mutex held.
  #
  def find_available_worker
    free_worker || create_worker
  end

  # Returns the first idle worker, or nil if all are busy. Does not lock.
  def free_worker
    @workers.find { |w| !w.busy? }
  end

  #
  # Creates, registers and returns a new worker, or returns nil when the
  # pool already holds +max_size+ workers. Does not lock.
  #
  def create_worker
    return nil if @workers.size >= @max_size
    worker = Worker.new( "#{@name_prefix}_#{@workers.size + 1}" )
    @workers << worker
    worker
  end

end
|
|
|
|
end # Pipeline module
|
|
|
|
# thread.rb
|