Class: Opee::Actor

Inherits:
Object
Defined in:
lib/opee/actor.rb

Overview

The Actor class is the base class for all the asynchronous Objects in OPEE. It accepts requests through the ask() method and executes those requests on a thread dedicated to the Actor.
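
As a rough illustration of the pattern described above, the sketch below queues requests and runs them on one dedicated thread. MiniActor, its double method, and the results list are hypothetical stand-ins built on Ruby's core Queue and Thread classes; this is not the Opee implementation.

```ruby
# Hypothetical stand-in for illustration; this is not the Opee implementation.
class MiniActor
  attr_reader :results

  def initialize
    @queue = Queue.new     # thread-safe FIFO from Ruby's core library
    @results = []
    @loop = Thread.new do
      # The dedicated thread pops requests and dispatches them with send().
      while (req = @queue.pop) != :close
        op, args = req
        send(op, *args)
      end
    end
  end

  # Queue a request; returns immediately without waiting for it to run.
  def ask(op, *args)
    @queue.push([op, args])
    nil
  end

  # Ask the loop to exit once everything queued before this call has run.
  def close
    @queue.push(:close)
    @loop.join
  end

  private

  # Runs only on the actor's thread, reached through send().
  def double(x)
    @results << x * 2
  end
end

actor = MiniActor.new
actor.ask(:double, 2)
actor.ask(:double, 5)
actor.close
p actor.results  # => [4, 10]
```

Because only the dedicated thread touches @results, the sketch needs no locking around it; the caller reads the list only after close() has joined the thread.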

Direct Known Subclasses

Collector, Log, Queue

Defined Under Namespace

Classes: Act

Constant Summary

STOPPED =

value of @state that indicates the Actor is not currently processing requests

0
RUNNING =

value of @state that indicates the Actor is currently ready to process requests

1
CLOSING =

value of @state that indicates the Actor is shutting down

2
STEP =

value of @state that indicates the Actor is processing one request and will stop after that processing is complete

3

Constructor Details

- (Actor) initialize(options = {})

Initializes the Actor with the options specified. A new thread is created during initialization after calling the set_options() method.

Parameters:

  • options (Hash) (defaults to: {})

    options to be used for initialization

Options Hash (options):

  • :max_queue_count (Fixnum)

    maximum number of requests that can be queued before backpressure is applied to the caller.

  • :ask_timeout (Float)

    timeout in seconds to wait before raising a BusyError if the request queue is too long.



# File 'lib/opee/actor.rb', line 29

def initialize(options={})
  @queue = []
  @idle = []
  @priority = []
  @ask_mutex = Mutex.new()
  @priority_mutex = Mutex.new()
  @idle_mutex = Mutex.new()
  @step_thread = nil
  @ask_timeout = 0.0
  @max_queue_count = nil
  @ask_thread = nil
  @state = RUNNING
  @busy = false
  @name = nil
  @proc_cnt = 0
  Env.add_actor(self)
  set_options(options)
  @loop = Thread.start(self) do |me|
    Thread.current[:name] = me.name
    while CLOSING != @state
      begin
        if RUNNING == @state || STEP == @state
          a = nil
          if !@priority.empty?
            @priority_mutex.synchronize {
              a = @priority.pop()
            }
          elsif !@queue.empty?
            @ask_mutex.synchronize {
              a = @queue.pop()
            }
            @ask_thread.wakeup() unless @ask_thread.nil?
          elsif !@idle.empty?
            @idle_mutex.synchronize {
              a = @idle.pop()
            }
          else
            Env.wake_finish()
            sleep(1.0)
          end
          @busy = true
          send(a.op, *a.args) unless a.nil?
          @proc_cnt += 1
          @busy = false
          if STEP == @state
            @step_thread.wakeup() unless @step_thread.nil?
            @state = STOPPED
          end
        elsif STOPPED == @state
          sleep(1.0)
        end
      rescue Exception => e
        @busy = false
        Env.rescue(e)
      end
    end
  end
end

Dynamic Method Handling

This class handles dynamic methods through the method_missing method

- (Object) method_missing(m, *args, &blk)

When an attempt is made to call a private method of the Actor, the call is placed on the processing queue. Calls to any other undefined method raise a NoMethodError as usual.

Parameters:

  • m (Symbol)

    method to queue for the Actor

  • args (Array)

    arguments to the op method

  • blk (Proc)

    ignored

Raises:

  • (NoMethodError)


# File 'lib/opee/actor.rb', line 177

def method_missing(m, *args, &blk)
  raise NoMethodError.new("undefined method '#{m}' for #{self.class}", m, args) unless respond_to?(m, true)
  ask(m, *args)
end
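
The dispatch rule above can be tried in isolation. In the sketch below, QueueingProxy, its pending list, and the greet method are hypothetical; ask() only records the request instead of handing it to a processing thread. It relies on the fact that Ruby invokes method_missing when a private method is called with an explicit receiver.

```ruby
# Hypothetical sketch: calls to private methods are queued, not run inline.
class QueueingProxy
  attr_reader :pending

  def initialize
    @pending = []
  end

  # Stand-in for Actor#ask; it only records the request.
  def ask(op, *args)
    @pending << [op, args]
  end

  def method_missing(m, *args, &blk)
    # respond_to?(m, true) also considers private methods.
    raise NoMethodError.new("undefined method '#{m}' for #{self.class}", m, args) unless respond_to?(m, true)
    ask(m, *args)
  end

  private

  def greet(name)
    "hello #{name}"
  end
end

proxy = QueueingProxy.new
proxy.greet('world')   # private, so Ruby routes the call to method_missing
p proxy.pending        # => [[:greet, ["world"]]]

begin
  proxy.unknown        # not defined at all, so NoMethodError is raised
rescue NoMethodError => e
  puts e.class
end
```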

Instance Attribute Details

- (Object) name

name of the actor



# File 'lib/opee/actor.rb', line 20

def name
  @name
end

- (Object) state (readonly)

The current processing state of the Actor



# File 'lib/opee/actor.rb', line 18

def state
  @state
end

Instance Method Details

- (Object) ask(op, *args)

Queues an operation and arguments to be called when the Actor is ready.

Parameters:

  • op (Symbol)

    method to queue for the Actor

  • args (Array)

    arguments to the op method

Raises:

  • (BusyError)

    if the request queue does not become available within the default #ask_timeout



# File 'lib/opee/actor.rb', line 145

def ask(op, *args)
  timeout_ask(@ask_timeout, op, *args)
end

- (Float) ask_timeout

Returns the default time, in seconds, to wait for the Actor to be ready to accept a request made with the #ask() method.

Returns:

  • (Float)

    current timeout for the #ask() method



# File 'lib/opee/actor.rb', line 211

def ask_timeout()
  @ask_timeout
end

- (Object) backed_up

Returns a score indicating how backed up the queue is. This is used for selecting an Actor when stepping from the Inspector.



# File 'lib/opee/actor.rb', line 191

def backed_up()
  cnt = @queue.size()
  return 0 if 0 == cnt
  if @max_queue_count.nil? || 0 == @max_queue_count
    cnt = 80 if 80 < cnt
    cnt
  else
    cnt * 100 / @max_queue_count
  end
end
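
To make the scoring rule concrete, here is a free-standing copy of the same arithmetic with a few sample inputs. The function name backed_up_score and its two parameters are hypothetical; in the class the values come from @queue.size() and @max_queue_count.

```ruby
# Hypothetical free-standing copy of the scoring rule, for illustration only.
def backed_up_score(queued, max_queue_count)
  return 0 if queued.zero?
  if max_queue_count.nil? || max_queue_count.zero?
    [queued, 80].min                 # unbounded queue: raw count, capped at 80
  else
    queued * 100 / max_queue_count   # bounded queue: integer percent full
  end
end

p backed_up_score(0, 10)     # => 0
p backed_up_score(3, 10)     # => 30
p backed_up_score(10, 10)    # => 100
p backed_up_score(5, nil)    # => 5
p backed_up_score(500, nil)  # => 80
```

An unbounded queue can therefore never score above 80, so a bounded Actor that is nearly full ranks as more backed up than any unbounded one.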

- (true|false) busy?

Returns true if any requests are queued or a request is being processed.

Returns:

  • (true|false)

    true if busy, false otherwise



# File 'lib/opee/actor.rb', line 204

def busy?()
  @busy || !@queue.empty? || !@priority.empty? || !@idle.empty?
end

- (Object) close

Closes the Actor by exiting the processing thread and removing the Actor from the Env.



# File 'lib/opee/actor.rb', line 258

def close()
  @state = CLOSING
  begin
    # if the loop has already exited this will raise an Exception that can be ignored
    @loop.wakeup()
  rescue
    # ignore
  end
  Env.remove_actor(self)
  @loop.join()
end

- (NilClass|Fixnum) max_queue_count

Returns the maximum number of requests allowed on the normal processing queue. A value of nil indicates there is no limit.

Returns:

  • (NilClass|Fixnum)

    maximum number of requests that can be queued



# File 'lib/opee/actor.rb', line 218

def max_queue_count()
  @max_queue_count
end

- (Object) on_idle(op, *args)

Queues an operation and arguments to be called when the Actor has no other requests to process.

Parameters:

  • op (Symbol)

    method to queue for the Actor

  • args (Array)

    arguments to the op method



# File 'lib/opee/actor.rb', line 153

def on_idle(op, *args)
  @idle_mutex.synchronize {
    @idle.insert(0, Act.new(op, args))
  }
  @loop.wakeup() if RUNNING == @state
end

- (Object) priority_ask(op, *args)

Queues an operation and arguments to be called as soon as possible by the Actor. These requests take precedence over other ordinary requests.

Parameters:

  • op (Symbol)

    method to queue for the Actor

  • args (Array)

    arguments to the op method



# File 'lib/opee/actor.rb', line 164

def priority_ask(op, *args)
  @priority_mutex.synchronize {
    @priority.insert(0, Act.new(op, args))
  }
  @loop.wakeup() if RUNNING == @state
end
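
The ordering among the priority, normal, and idle queues can be modeled without threads. QueueModel and next_op are hypothetical names; the sketch follows the selection order of the processing loop in initialize and the insert(0)/pop pairing, which makes each queue first-in, first-out.

```ruby
# Hypothetical single-threaded model of the three request queues.
class QueueModel
  def initialize
    @queue = []
    @priority = []
    @idle = []
  end

  def ask(op)
    @queue.insert(0, op)
  end

  def priority_ask(op)
    @priority.insert(0, op)
  end

  def on_idle(op)
    @idle.insert(0, op)
  end

  # Same selection order as the processing loop: priority, normal, idle.
  def next_op
    if !@priority.empty?
      @priority.pop
    elsif !@queue.empty?
      @queue.pop
    elsif !@idle.empty?
      @idle.pop
    end
  end
end

m = QueueModel.new
m.on_idle(:cleanup)
m.ask(:a)
m.ask(:b)
m.priority_ask(:urgent)

order = []
while (op = m.next_op)
  order << op
end
p order  # => [:urgent, :a, :b, :cleanup]
```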

- (Fixnum) proc_count

Returns the total number of requests processed.

Returns:

  • (Fixnum)

    number of requests processed



# File 'lib/opee/actor.rb', line 224

def proc_count()
  @proc_cnt
end

- (Fixnum) queue_count

Returns the number of requests on all three request queues, the normal, priority, and idle queues.

Returns:

  • (Fixnum)

    number of queued requests



# File 'lib/opee/actor.rb', line 185

def queue_count()
  @queue.length + @priority.length + @idle.length
end

- (Object) set_options(options)

Processes the initialize() options. Subclasses should call super.

Parameters:

  • options (Hash)

    options to be used for initialization

Options Hash (options):

  • :max_queue_count (Fixnum)

    maximum number of requests that can be queued before backpressure is applied to the caller.

  • :ask_timeout (Float)

    timeout in seconds to wait before raising a BusyError if the request queue is too long.



# File 'lib/opee/actor.rb', line 94

def set_options(options)
  @max_queue_count = options.fetch(:max_queue_count, @max_queue_count)
  @ask_timeout = options.fetch(:ask_timeout, @ask_timeout).to_f
  @name = options[:name]
end
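
The advice that subclasses should call super might look like the following in practice. BaseActor is a hypothetical stand-in that copies only the option handling shown above, and FileWriter with its :path option is invented for the example.

```ruby
# Hypothetical base class that mirrors only the documented option handling.
class BaseActor
  attr_reader :max_queue_count, :ask_timeout, :name

  def initialize(options = {})
    @max_queue_count = nil
    @ask_timeout = 0.0
    set_options(options)
  end

  def set_options(options)
    @max_queue_count = options.fetch(:max_queue_count, @max_queue_count)
    @ask_timeout = options.fetch(:ask_timeout, @ask_timeout).to_f
    @name = options[:name]
  end
end

# Hypothetical subclass that adds its own :path option.
class FileWriter < BaseActor
  attr_reader :path

  def set_options(options)
    super(options)           # let the base class read its own keys first
    @path = options[:path]
  end
end

w = FileWriter.new(:name => 'writer', :max_queue_count => 10, :ask_timeout => 2, :path => 'out.txt')
p [w.name, w.max_queue_count, w.ask_timeout, w.path]  # => ["writer", 10, 2.0, "out.txt"]
```

Skipping the super call in this sketch would leave name, max_queue_count, and ask_timeout at their defaults, which is the failure the advice guards against.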

- (Object) start

Restarts the Actor's processing thread.



# File 'lib/opee/actor.rb', line 251

def start()
  @state = RUNNING
  @loop.wakeup()
end

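- (Object) state_string

Returns the name of the current processing state as a String, or 'UNKNOWN' if the state is not one of the defined constants.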



# File 'lib/opee/actor.rb', line 107

def state_string()
  ss = 'UNKNOWN'
  case @state
  when STOPPED
    ss = 'STOPPED'
  when RUNNING
    ss = 'RUNNING'
  when CLOSING
    ss = 'CLOSING'
  when STEP
    ss = 'STEP'
  end
  ss
end
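
For comparison, the same mapping can be expressed as a lookup table. This is only an alternative sketch, not the library's code; the top-level constants repeat the values from the Constant Summary, and STATE_NAMES and state_name are hypothetical names.

```ruby
# Hypothetical top-level copies of the state values from the Constant Summary.
STOPPED = 0
RUNNING = 1
CLOSING = 2
STEP    = 3

# Table-driven equivalent of the case statement.
STATE_NAMES = {
  STOPPED => 'STOPPED',
  RUNNING => 'RUNNING',
  CLOSING => 'CLOSING',
  STEP    => 'STEP'
}.freeze

def state_name(state)
  STATE_NAMES.fetch(state, 'UNKNOWN')   # unknown values fall back to 'UNKNOWN'
end

p state_name(RUNNING)  # => "RUNNING"
p state_name(42)       # => "UNKNOWN"
```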

- (Object) step(max_wait = 5)

Causes the Actor to process one request and then stop. The max_wait is used to avoid getting stuck if the processing takes too long.

Parameters:

  • max_wait (Float|Fixnum) (defaults to: 5)

    maximum time to wait for the step to complete



# File 'lib/opee/actor.rb', line 237

def step(max_wait=5)
  @state = STEP
  @step_thread = Thread.current
  @loop.wakeup()
  sleep(max_wait)
  @step_thread = nil
end
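
The bounded wait in step() pairs Kernel#sleep with Thread#wakeup: the caller sleeps for at most max_wait and the processing thread wakes it as soon as the request is done. The sketch below shows that handshake on its own. The bounded_wait helper and the 0.05 second pause standing in for request processing are hypothetical.

```ruby
# Hypothetical helper; only Kernel#sleep and Thread#wakeup are real APIs here.
def bounded_wait(max_wait)
  waiter = Thread.current
  done = false
  worker = Thread.new do
    sleep(0.05)      # stand-in for processing one request
    done = true
    waiter.wakeup    # ends the waiter's sleep before max_wait expires
  end
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  sleep(max_wait)    # returns early if woken, otherwise after max_wait
  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  worker.join
  [done, elapsed]
end

done, elapsed = bounded_wait(5)
puts "done=#{done} woke_early=#{elapsed < 5}"
```

If the worker never called wakeup, the caller would simply resume after max_wait seconds, which is how step() avoids getting stuck on a slow request.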

- (Object) stop

Causes the Actor to stop processing any more requests after the current request has finished.



# File 'lib/opee/actor.rb', line 230

def stop()
  @state = STOPPED
end

- (Object) timeout_ask(timeout, op, *args)

Calls #ask() but uses the specified timeout instead of the default #ask_timeout to determine how long to wait if the Actor's queue is full.

Parameters:

  • timeout (Fixnum|Float)

    maximum time to wait trying to add a request to the Actor's queue

  • op (Symbol)

    method to queue for the Actor

  • args (Array)

    arguments to the op method

Raises:

  • (BusyError)

    if the request queue does not become available in the timeout specified



# File 'lib/opee/actor.rb', line 128

def timeout_ask(timeout, op, *args)
  unless @max_queue_count.nil? || 0 == @max_queue_count || @queue.size() < @max_queue_count
    @ask_thread = Thread.current
    sleep(timeout) unless timeout.nil?
    @ask_thread = nil
    raise BusyError.new() unless @queue.size() < @max_queue_count
  end
  @ask_mutex.synchronize {
    @queue.insert(0, Act.new(op, args))
  }
  @loop.wakeup() if RUNNING == @state
end
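
The backpressure behavior can be sketched without a processing thread. BoundedInbox and take are hypothetical, and the BusyError class defined here is a local stand-in for the library's error. With no consumer to wake the caller, a full queue means the call sleeps for the whole timeout and then raises.

```ruby
# Local stand-in for the library's BusyError, so the sketch is self-contained.
class BusyError < StandardError; end

# Hypothetical bounded queue that follows the same check, wait, re-check pattern.
class BoundedInbox
  def initialize(max_queue_count)
    @queue = []
    @max_queue_count = max_queue_count
    @mutex = Mutex.new
  end

  def size
    @queue.size
  end

  def timeout_ask(timeout, op, *args)
    unless @max_queue_count.nil? || @max_queue_count.zero? || @queue.size < @max_queue_count
      sleep(timeout) unless timeout.nil?   # a consumer thread could wake the caller early
      raise BusyError, 'queue is full' unless @queue.size < @max_queue_count
    end
    @mutex.synchronize { @queue.insert(0, [op, args]) }
  end

  # Stand-in for the processing loop taking the oldest request.
  def take
    @mutex.synchronize { @queue.pop }
  end
end

inbox = BoundedInbox.new(2)
inbox.timeout_ask(0.01, :a)
inbox.timeout_ask(0.01, :b)
begin
  inbox.timeout_ask(0.01, :c)   # queue is full and nothing drains it
rescue BusyError
  puts 'busy'
end
inbox.take                      # free one slot
inbox.timeout_ask(0.01, :c)     # now succeeds
p inbox.size                    # => 2
```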

- (Object) wakeup

Wakes up the Actor if it has been stopped or if Env.shutdown() has been called.



# File 'lib/opee/actor.rb', line 246

def wakeup()
  @loop.wakeup()
end