Files
2025-09-29 00:52:08 +02:00

158 lines
3.2 KiB
Ruby
Executable File

#
# File:: thread.rb
# Description:: Utility classes for handling threaded code.
#
# Author:: Greg Smith <greg@rockstarnorth.com>
# Author:: David Muir <david.muir@rockstarnorth.com>
# Date:: 16 April 2007
#
#----------------------------------------------------------------------------
# Uses
#----------------------------------------------------------------------------
require 'thread'
#----------------------------------------------------------------------------
# Implementation
#----------------------------------------------------------------------------
module Pipeline
#
# == Description
# Wraps any object so it can be accessed in a thread safe way
#
class ThreadObj
def initialize( obj )
@mutex = Mutex.new
@inner_obj = obj
end
def method_missing(method, *args, &block)
@mutex.lock
begin
@inner_obj.send(method,*args,&block)
ensure
@mutex.unlock
end
end
end
#
# == Description
# A simple thread pool implementation.
#
# === Example Usage
# pool = ThreadPool.new( 10, 'Worker' )
# dataitems.each do |item|
# pool.process { item }
# end
#
# === References
# http://snippets.dzone.com/posts/show/3276
#
class ThreadPool
#
# == Description
#
class Worker
def initialize( name )
@mutex = Mutex.new
@thread = Thread.new do
Thread.current[:name] = name
while true
sleep 0.001
block = get_block
if block
block.call
reset_block
end
end
end
end
def get_block
@mutex.synchronize {@block}
end
def set_block(block)
@mutex.synchronize do
raise RuntimeError, "Thread already busy." if @block
@block = block
end
end
def reset_block
@mutex.synchronize {@block = nil}
end
def busy?
@mutex.synchronize {!@block.nil?}
end
end
attr_accessor :max_size
attr_reader :workers
def initialize( max_size = 10, name_prefix = 'ThreadPool_Worker' )
@max_size = max_size
@name_prefix = name_prefix
@workers = []
@mutex = Mutex.new
end
def size
@mutex.synchronize {@workers.size}
end
def busy?
@mutex.synchronize {@workers.any? {|w| w.busy?}}
end
def join
sleep 0.01 while busy?
end
def process(&block)
while true
@mutex.synchronize do
worker = find_available_worker
if worker
return worker.set_block(block)
end
end
sleep( 0.01 )
end
end
def wait_for_worker
while true
worker = find_available_worker
return worker if worker
sleep 0.01
end
end
def find_available_worker
free_worker || create_worker
end
def free_worker
@workers.each {|w| return w unless w.busy?}; nil
end
def create_worker
return nil if @workers.size >= @max_size
worker = Worker.new( "#{@name_prefix}_#{@workers.size+1}" )
@workers << worker
worker
end
end
end # Pipeline module
# thread.rb