Class | Ruote::Dashboard |
In: |
lib/ruote/dashboard.rb
|
Parent: | Object |
This class was once named ‘Engine’, but since ruote 2.x and its introduction of workers, the methods here are those of a "dashboard". The real engine being the set of workers.
The methods here allow to launch processes and to query about their status. There are also methods for fixing issues with stalled processes or processes stuck in errors.
NOTE : the methods launch and reply are implemented in Ruote::ReceiverMixin (this Engine class has all the methods of a Receiver).
WORKER_STATES | = | %w[ running stopped paused ] |
context | [R] | |
variables | [R] |
Creates an engine using either worker or storage.
If a storage instance is given as the first argument, the engine will be able to manage processes (for example, launch and cancel workflows) but will not actually run any workflows.
If a worker instance is given as the first argument and the second argument is true, engine will start the worker and will be able to both manage and run workflows.
If the second options is set to { :join => true }, the worker will be started and run in the current thread (and the initialize method will not return).
Adds a service locally (will not get propagated to other workers).
tracer = Tracer.new @dashboard.add_service('tracer', tracer)
or
@dashboard.add_service( 'tracer', 'ruote/exp/tracer', 'Ruote::Exp::Tracer')
This method returns the service instance it just bound.
/!\ warning: advanced method.
Adds a tracker to the ruote engine.
Returns the tracker_id.
Computes mutation and immediately applies it…
See compute_mutation
Return the mutation instance (forensic?)
Given a flow expression id, locates the corresponding ruote expression and attaches a subprocess to it.
Accepts the fei as a Hash or as a FlowExpressionId instance.
By default, the workitem of the expression you attach to provides the initial workitem for the attached branch. By using the :fields/:workitem or :merge_fields options, one can change that.
Returns the fei of the attached [root] expression (as a FlowExpressionId instance).
Given a workitem or a fei, will do a cancel_expression, else it‘s a wfid and it does a cancel_process.
They will get passed as is in the underlying ‘msg’, it can be useful to flag the message for historical purposes as in
dashboard.cancel(wfid, 'reason' => 'cleanup', 'user' => current_user)
Returns a Mutation instance listing all the operations necessary to transform the current tree of the process (wfid) into the given definition tree (pdef).
See also apply_mutation
Returns a configuration value.
dashboard.configure('ruby_eval_allowed', true) p dashboard.configuration('ruby_eval_allowed') # => true
Sets a configuration option. Examples:
# allow remote workflow definitions (for subprocesses or when launching # processes) @dashboard.configure('remote_definition_allowed', true) # allow ruby_eval @dashboard.configure('ruby_eval_allowed', true)
Given a workitem or a fei (or a String version of a fei), returns the corresponding error (or nil if there is no other).
Returns an array of current errors (hashes)
Can be called in two ways :
dashboard.errors(wfid)
and
dashboard.errors(:skip => 100, :limit => 100)
Given a workitem or a fei, will do a kill_expression, else it‘s a wfid and it does a kill_process.
(also see notes about opts for cancel)
Quick note : the implementation of launch is found in the module Ruote::ReceiverMixin that the engine includes.
Some processes have to have one and only one instance of themselves running, these are called ‘singles’ (‘singleton’ is too object-oriented).
When called, this method will check if an instance of the pdef is already running (it uses the process definition name attribute), if yes, it will return without having launched anything. If there is no such process running, it will launch it (and register it).
Returns the wfid (workflow instance id) of the running single.
Warning : expensive operation.
Leftovers are workitems, errors and schedules belonging to process instances for which there are no more expressions left.
Better delete them or investigate why they are left here.
The result is a list of documents (hashes) as found in the storage. Each of them might represent a workitem, an error or a schedule.
If you want to delete one of them you can do
dashboard.storage.delete(doc)
Sets a participant or subprocess to be triggered when an error occurs in a process instance.
dashboard.on_error = participant_name dashboard.on_error = subprocess_name dashboard.on_error = Ruote.process_definition do alpha end
Note that this ‘on_error’ doesn‘t trigger if an on_error is defined in the process itself.
Returns the process tree that is triggered in case of process termination.
Note that a termination process doesn‘t raise a termination process when it terminates itself.
Returns nil if there is no ‘on_terminate’ set.
Sets a participant or a subprocess that is to be launched/called whenever a regular process terminates.
dashboard.on_terminate = participant_name dashboard.on_terminate = subprocess_name dashboard.on_terminate = Ruote.define do alpha bravo end
Note that a termination process doesn‘t raise a termination process when it terminates itself.
on_terminate processes are not triggered for on_error processes. on_error processes are triggered for on_terminate processes as well.
Returns an instance of the participant registered under the given name. Returns nil if there is no participant registered for that name.
Returns a list of Ruote::ParticipantEntry instances.
dashboard.register_participant :alpha, MyParticipant, 'message' => 'hello' # interrogate participant list # list = dashboard.participant_list participant = list.first p participant.regex # => "^alpha$" p participant.classname # => "MyParticipant" p participant.options # => {"message"=>"hello"} # update participant list # participant.regex = '^alfred$' dashboard.participant_list = list
Accepts a list of Ruote::ParticipantEntry instances or a list of
See Engine#participant_list
Some examples :
dashboard.participant_list = [ [ '^charly$', [ 'Ruote::StorageParticipant', {} ] ], [ '.+', [ 'MyDefaultParticipant', { 'default' => true } ] ]
This method writes the participant list in one go, it might be easier to use than to register participant one by ones.
Given a wfid, will [attempt to] pause the corresponding process instance. Given an expression id (fei) will [attempt to] pause the expression and its children.
The only known option for now is :breakpoint => true, which lets the engine only pause the targetted expression.
By default, pausing an expression will pause that expression and all its children.
engine.pause(fei, :breakpoint => true)
will only flag as paused the given fei. When the children of that expression will reply to it, the execution for this branch of the process will stop, much like a break point.
Returns an array of ProcessStatus instances.
WARNING : this is an expensive operation, but it understands :skip and :limit, so pagination is our friend.
Please note, if you‘re interested only in processes that have errors, Engine#errors is a more efficient means.
To simply list the wfids of the currently running, Engine#process_wfids is way cheaper to call.
Re-applies an expression (given via its FlowExpressionId).
That will cancel the expression and, once the cancel operation is over (all the children have been cancelled), the expression will get re-applied.
The fei parameter may be a hash, a Ruote::FlowExpressionId instance, a Ruote::Workitem instance or a sid string.
:tree is used to completely change the tree of the expression at re_apply
dashboard.re_apply( fei, :tree => [ 'participant', { 'ref' => 'bob' }, [] ])
:fields is used to replace the fields of the workitem at re_apply
dashboard.re_apply( fei, :fields => { 'customer' => 'bob' })
:workitem is ok too
dashboard.re_apply( fei, :workitem => { 'fields' => { 'customer' => 'bob' } })
:merge_in_fields is used to add / override fields
dashboard.re_apply( fei, :merge_in_fields => { 'customer' => 'bob' })
A shorter version of register_participant
dashboard.register 'alice', MailParticipant, :target => 'alice@example.com'
or a block registering mechanism.
dashboard.register do alpha 'Participants::Alpha', 'flavour' => 'vanilla' participant 'bravo', 'Participants::Bravo', :flavour => 'peach' catchall ParticipantCharlie, 'flavour' => 'coconut' end
Originally implemented in ruote-kit by Torsten Schoenebaum.
By default, when registering multiple participants in block, ruote considers you‘re wiping the participant list and re-adding them all.
You can prevent the clearing by stating :clear => false like in :
dashboard.register :clear => false do alpha 'Participants::Alpha', 'flavour' => 'vanilla' participant 'bravo', 'Participants::Bravo', :flavour => 'peach' catchall ParticipantCharlie, 'flavour' => 'coconut' end
Registers a participant in the engine.
Takes the form
dashboard.register_participant name_or_regex, klass, opts={}
With the form
dashboard.register_participant name_or_regex do |workitem| # ... end
A BlockParticipant is automatically created.
When registering participants, strings or regexes are accepted. Behind the scenes, a regex is kept.
Passing a string like "alain" will get ruote to automatically turn it into the following regex : /^alain$/.
For finer control over this, pass a regex directly
dashboard.register_participant /^user-/, MyParticipant # will match all workitems whose participant name starts with "user-"
dashboard.register_participant 'compute_sum' do |wi| wi.fields['sum'] = wi.fields['articles'].inject(0) do |s, (c, v)| s + c * v # sum + count * value end # a block participant implicitely replies to the engine immediately end class MyParticipant def initialize(opts) @name = opts['name'] end def on_workitem workitem.fields['rocket_name'] = @name send_to_the_moon(workitem) end def on_cancel # do nothing end end dashboard.register_participant( /^moon-.+/, MyParticipant, 'name' => 'Saturn-V') # computing the total for a invoice being passed in the workitem. # class TotalParticipant include Ruote::LocalParticipant def on_workitem workitem['total'] = workitem.fields['items'].inject(0.0) { |t, item| t + item['count'] * PricingService.lookup(item['id']) } reply end def on_cancel end end dashboard.register_participant 'total', TotalParticipant
Remember that the options (the hash that follows the class name), must be serializable via JSON.
It‘s OK to register a participant by passing its full classname as a String.
dashboard.register_participant( 'auditor', 'AuditParticipant', 'require_path' => 'part/audit.rb') dashboard.register_participant( 'auto_decision', 'DecParticipant', 'load_path' => 'part/dec.rb')
Note the option load_path / require_path that point to the ruby file containing the participant implementation. ‘require’ will load and eval the ruby code only once, ‘load’ each time.
By default, when registering a participant, if this results in a regex that is already used, the previously registered participant gets unregistered.
dashboard.register_participant 'alpha', AaParticipant dashboard.register_participant 'alpha', BbParticipant, :override => false
This can be useful when the accept? method of participants are in use.
Note that using the register(&block) method, :override => false is automatically enforced.
dashboard.register do alpha AaParticipant alpha BbParticipant end
One can specify the position where the participant should be inserted in the participant list.
dashboard.register_participant 'auditor', AuditParticipant, :pos => 'last'
/!\ warning: advanced method.
Removes a tracker from the ruote system.
The first arg is a FlowExpressionId, in its instance form, hash form or shortened (sid) string form. It can also be any string (any tracker id).
The second arg is optional, it‘s a wfid. It‘s useful for some storage implementations (like ruote-swf) and helps determine how to grab the tracker list. Most of the ruote deployments don‘t need that arg set.
This method re_apply all the leaves of a process instance. It‘s meant to be used against stalled workflows to give them back the spark of life.
Stalled workflows can happen when msgs get lost. It also happens with some storage implementations where msgs are stored differently from expressions and co.
By default, it doesn‘t re_apply leaves that are in error. If the ‘errors_too’ option is set to true, it will re_apply leaves in error as well. For example:
$dashboard.respark(wfid, 'errors_too' => true)
Given a wfid will [attempt to] resume the process instance. Given an expression id (fei) will [attempt to] to resume the expression and its children.
Note : this is supposed to be called on paused expressions / instances, this is NOT meant to be called to unstuck / unhang a process.
Resuming a process instance is equivalent to calling resume on its root expression. If the root is not paused itself, this will have no effect.
dashboard.resume(wfid, :anyway => true)
will make sure to call resume on each of the paused branch within the process instance (tree), effectively resuming the whole process.
Shuts down the engine, mostly passes the shutdown message to the other services and hope they‘ll shut down properly.
A convenience method for
sp = Ruote::StorageParticipant.new(dashboard)
simply do
sp = dashboard.storage_participant
This method expects there to be a logger with a wait_for method in the context, else it will raise an exception.
WARNING: wait_for() is meant for environments where there is a unique worker and that worker is nested in this engine. In a multiple worker environment wait_for doesn‘t see events handled by ‘other’ workers.
This method is only useful for test/quickstart/examples environments.
dashboard.wait_for(:alpha) # will make the current thread block until a workitem is delivered # to the participant named 'alpha' engine.wait_for('123432123-9043') # will make the current thread block until the processed whose # wfid is given (String) terminates or produces an error. engine.wait_for(5) # will make the current thread block until 5 messages have been # processed on the workqueue... engine.wait_for(:empty) # will return as soon as the engine/storage is empty, ie as soon # as there are no more processes running in the engine (no more # expressions placed in the storage) engine.wait_for('terminated') # will return as soon as any process has a 'terminated' event.
It‘s OK to wait for multiple wfids:
engine.wait_for('20100612-bezerijozo', '20100612-yakisoba')
If one needs to wait for something else than a wfid but needs to break in case of error:
engine.wait_for(:alpha, :or_error)
Ruote 2.3.0 introduced the ability to wait for an event given its name. Here is a quick list of event names and a their description:
Names that are past participles are for notification events, while plain verbs are for action events. Most of the time, a notitication is emitted has the result of an action event, workers don‘t take any action on them, but services that are listening to the ruote activity might want to do something about them.
For more precise testing, wait_for accepts hashes, for example:
r = dashboard.wait_for('action' => 'apply', 'exp_name' => 'wait')
will block until a wait expression is applied.
If you know ruote msgs, you can pinpoint at will:
r = dashboard.wait_for( 'action' => 'apply', 'exp_name' => 'wait', 'fei.wfid' => wfid)
wait_for returns the intercepted event. It‘s useful when testing/ spec‘ing, as in:
it 'completes successfully' do definition = Ruote.define :on_error => 'charly' do alpha bravo end wfid = @board.launch(definition) r = @board.wait_for(wfid) # wait until process terminates or hits an error r['workitem'].should_not == nil r['workitem']['fields']['alpha'].should == 'was here' r['workitem']['fields']['bravo'].should == 'was here' r['workitem']['fields']['charly'].should == nil end
One can pass a timeout value in seconds for the wait_for call, as in:
dashboard.wait_for(wfid, :timeout => 5 * 60)
The default timeout is 60 (seconds). A nil or negative timeout disables the timeout.
Returns the state the workers are supposed to be in right now. It‘s usually ‘running’, but it could be ‘stopped’ or ‘paused’.
Sets the [desired] worker state. The workers will check that target state at their next beat and switch to it.
Setting the state to ‘stopped’ will force the workers to stop as soon as they notice the new state.
Setting the state to ‘paused’ will force the workers to pause. They will not process msgs until the state is set back to ‘running’.
By default the [engine] option ‘worker_state_enabled’ is not set, so calling this method will result in a error, unless ‘worker_state_enabled’ was set to true when the storage was initialized.