ptolemy.actor.process
Class ProcessDirector

java.lang.Object
  extended by ptolemy.kernel.util.NamedObj
      extended by ptolemy.kernel.util.Attribute
          extended by ptolemy.actor.Director
              extended by ptolemy.actor.process.ProcessDirector
All Implemented Interfaces:
java.io.Serializable, java.lang.Cloneable, Executable, Initializable, Changeable, Debuggable, DebugListener, Derivable, ModelErrorHandler, MoMLExportable, Moveable, Nameable
Direct Known Subclasses:
CompositeProcessDirector

public class ProcessDirector
extends Director

The base class for directors in the process-oriented domains. It provides default implementations for methods that are common across such domains.

In the process-oriented domains, the director controlling a model needs to keep track of the state of the model. In particular, it needs to maintain an accurate count of the number of active processes under its control and of any processes that are blocked for whatever reason (for example, trying to read from an empty channel, as in PN). These counts, and perhaps others, are needed by the director to detect and respond to deadlock (no processes can make progress), or to respond to requests from higher in the hierarchy.
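
The bookkeeping described above can be sketched in plain Java. This is a minimal illustration only, not the Ptolemy II implementation: the class ThreadCounts and its method names are hypothetical stand-ins, and the allActiveBlocked() check is an assumed example of the kind of test a PN-like derived class might add.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the director's thread bookkeeping; not Ptolemy code.
class ThreadCounts {
    private final Set<Thread> active = new HashSet<>();
    private final Set<Thread> blocked = new HashSet<>();

    // Register a thread as part of the execution.
    synchronized void addThread(Thread t) {
        active.add(t);
    }

    // Forget a thread that has finished executing.
    synchronized void removeThread(Thread t) {
        active.remove(t);
        blocked.remove(t);
    }

    // Record a blocked thread; ignored unless the thread was registered first.
    synchronized void threadBlocked(Thread t) {
        if (active.contains(t)) {
            blocked.add(t);
        }
    }

    synchronized void threadUnblocked(Thread t) {
        blocked.remove(t);
    }

    synchronized int activeCount() {
        return active.size();
    }

    synchronized int blockedCount() {
        return blocked.size();
    }

    // Base-class notion of deadlock on this page: no active threads remain.
    synchronized boolean noActiveThreads() {
        return active.isEmpty();
    }

    // Assumed domain-specific check: every registered thread is blocked.
    synchronized boolean allActiveBlocked() {
        return !active.isEmpty() && blocked.size() == active.size();
    }
}
```

Keeping the sets behind synchronized methods means the counts are always read and updated as a unit, which is what makes a deadlock test based on them meaningful.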

The methods that control how the director detects and responds to deadlocks are _areThreadsDeadlocked() and _resolveDeadlock(). These methods should be overridden in derived classes to get domain-specific behaviour. The implementations given here are trivial and suffice only to illustrate the approach that should be followed.

This base class is not sufficient for executing hierarchical, heterogeneous models. To accommodate hierarchy and heterogeneity, the subclass CompositeProcessDirector must be used.

Since:
Ptolemy II 0.2
Version:
$Id: ProcessDirector.java 57040 2010-01-27 20:52:32Z cxh $
Author:
Mudit Goel, Neil Smyth, John S. Davis II
See Also:
Director, Serialized Form
Accepted Rating:
Yellow (mudit)
Proposed Rating:
Green (mudit)

Nested Class Summary
 
Nested classes/interfaces inherited from class ptolemy.kernel.util.NamedObj
NamedObj.ContainedObjectsIterator
 
Field Summary
private  java.util.HashSet _activeThreads
          The threads created by this director.
private  java.util.HashSet _blockedThreads
          The set of threads that are blocked on an IO operation.
private  java.util.LinkedList _newActorThreadList
          A list of threads created but not started.
protected  boolean _notDone
          A flag for determining whether successive iterations will be permitted.
private  java.util.HashSet _pausedThreads
          The set of threads that have been paused in response to stopFire().
protected  boolean _stopFireRequested
          Indicator that a stopFire has been requested by a call to stopFire().
 
Fields inherited from class ptolemy.actor.Director
_actorsFinishedExecution, _currentTime, _finishRequested, _initializables, _stopRequested, timeResolution
 
Fields inherited from class ptolemy.kernel.util.NamedObj
_changeListeners, _changeLock, _changeRequests, _debugging, _debugListeners, _elementName, _isPersistent, _verbose, _workspace, ATTRIBUTES, CLASSNAME, COMPLETE, CONTENTS, DEEP, FULLNAME, LINKS
 
Fields inherited from interface ptolemy.actor.Executable
COMPLETED, NOT_READY, STOP_ITERATING
 
Constructor Summary
ProcessDirector()
          Construct a director in the default workspace with an empty string as its name.
ProcessDirector(CompositeEntity container, java.lang.String name)
          Construct a director in the given container with the given name.
ProcessDirector(Workspace workspace)
          Construct a director in the workspace with an empty name.
 
Method Summary
protected  boolean _areAllThreadsStopped()
          Return true if the count of active processes equals the number of paused and blocked threads.
protected  boolean _areThreadsDeadlocked()
          Return true if the count of active processes in the container is 0.
protected  int _getActiveThreadsCount()
          Return the number of active threads under the control of this director.
protected  int _getBlockedThreadsCount()
          Return the number of threads that are currently blocked.
protected  int _getStoppedThreadsCount()
          Return the number of threads that are currently stopped.
protected  ProcessThread _newProcessThread(Actor actor, ProcessDirector director)
          Create a new ProcessThread for controlling the actor that is passed as a parameter of this method.
private  void _requestFinishOnReceivers()
          Call requestFinish() on all receivers.
protected  boolean _resolveDeadlock()
          Return false indicating that deadlock has not been resolved and that execution will be discontinued.
 void addThread(java.lang.Thread thread)
          Notify this director that the specified thread is part of the execution of this model.
 java.lang.Object clone(Workspace workspace)
          Clone the director into the specified workspace.
 void finish()
          Request that the current iteration finishes and postfire() returns false, indicating to the environment that no more iterations should be invoked.
 void fire()
          Wait until a deadlock is detected.
 void initialize()
          Invoke the initialize() methods of all the deeply contained actors in the container (a composite actor) of this director.
 void initialize(Actor actor)
          Initialize the given actor.
 boolean isStopFireRequested()
          Return true if a stopFire has been requested on the director.
 boolean isStopRequested()
          Return true if a stop has been requested on the director.
 boolean isThreadActive(java.lang.Thread thread)
          Return true if the specified thread has been registered with addThread() and has not been removed with removeThread().
 Receiver newReceiver()
          Return a new receiver of a type compatible with this director.
 boolean postfire()
          Return false if a stop has been requested or if the model has reached deadlock.
 boolean prefire()
          Start threads for all actors that have not had threads started already (this might include actors initialized since the last invocation of prefire).
 void preinitialize()
          Preinitialize the model controlled by this director.
 void removeThread(java.lang.Thread thread)
          Notify this director that the specified thread has finished executing.
 void stop()
          Request that the director cease execution altogether.
 void stopFire()
          Request that execution stop at the conclusion of the current iteration.
 void terminate()
          Terminate all threads under control of this director immediately.
 void threadBlocked(java.lang.Thread thread, ProcessReceiver receiver)
          Notify the director that the specified thread is blocked on an I/O operation.
 void threadHasPaused(java.lang.Thread thread)
          Notify the director that the specified thread has paused in response to a call to stopFire().
 void threadHasResumed(java.lang.Thread thread)
          Notify the director that the specified thread has resumed.
 void threadUnblocked(java.lang.Thread thread, ProcessReceiver receiver)
          Notify the director that the specified thread is unblocked on an I/O operation.
 boolean transferInputs(IOPort port)
          Do nothing.
 boolean transferOutputs(IOPort port)
          Do nothing.
 void wrapup()
          End the execution of the model under the control of this director.
 
Methods inherited from class ptolemy.actor.Director
_description, _fireContainerAt, _isEmbedded, _isTopLevel, _transferInputs, _transferOutputs, addInitializable, attributeChanged, createSchedule, defaultDependency, fireAt, fireAt, fireAtCurrentTime, getCausalityInterface, getCurrentTime, getErrorTolerance, getGlobalTime, getModelNextIterationTime, getModelStartTime, getModelStopTime, getModelTime, getNextIterationTime, getStartTime, getStopTime, getTimeResolution, implementsStrictActorSemantics, invalidateResolvedTypes, invalidateSchedule, isFireFunctional, isStrict, iterate, preinitialize, removeInitializable, requestInitialization, setContainer, setCurrentTime, setModelTime, suggestedModalModelDirectors, supportMultirateFiring
 
Methods inherited from class ptolemy.kernel.util.Attribute
_checkContainer, _getContainedObject, _propagateExistence, getContainer, moveDown, moveToFirst, moveToIndex, moveToLast, moveUp, setName, updateContent
 
Methods inherited from class ptolemy.kernel.util.NamedObj
_addAttribute, _adjustOverride, _attachText, _cloneFixAttributeFields, _debug, _debug, _debug, _debug, _debug, _exportMoMLContents, _getIndentPrefix, _isMoMLSuppressed, _markContentsDerived, _propagateValue, _recordDecoratedAttributes, _removeAttribute, _splitName, _stripNumericSuffix, _validateSettables, addChangeListener, addDebugListener, attributeList, attributeList, attributeTypeChanged, clone, containedObjectsIterator, deepContains, depthInHierarchy, description, description, event, executeChangeRequests, exportMoML, exportMoML, exportMoML, exportMoML, exportMoML, exportMoMLPlain, getAttribute, getAttribute, getAttributes, getChangeListeners, getClassName, getDecoratorAttribute, getDecoratorAttributes, getDerivedLevel, getDerivedList, getDisplayName, getElementName, getFullName, getModelErrorHandler, getName, getName, getPrototypeList, getSource, handleModelError, isDeferringChangeRequests, isOverridden, isPersistent, lazyContainedObjectsIterator, message, propagateExistence, propagateValue, propagateValues, removeChangeListener, removeDebugListener, requestChange, setClassName, setDeferringChangeRequests, setDerivedLevel, setDisplayName, setModelErrorHandler, setPersistent, setSource, sortContainedObjects, toplevel, toString, uniqueName, validateSettables, workspace
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 

Field Detail

_notDone

protected boolean _notDone
A flag for determining whether successive iterations will be permitted.


_stopFireRequested

protected boolean _stopFireRequested
Indicator that a stopFire has been requested by a call to stopFire().


_blockedThreads

private java.util.HashSet _blockedThreads
The set of threads that are blocked on an IO operation.


_pausedThreads

private java.util.HashSet _pausedThreads
The set of threads that have been paused in response to stopFire().


_activeThreads

private java.util.HashSet _activeThreads
The threads created by this director.


_newActorThreadList

private java.util.LinkedList _newActorThreadList
A list of threads created but not started.

Constructor Detail

ProcessDirector

public ProcessDirector()
Construct a director in the default workspace with an empty string as its name. The director is added to the list of objects in the workspace. Increment the version number of the workspace.


ProcessDirector

public ProcessDirector(Workspace workspace)
Construct a director in the workspace with an empty name. The director is added to the list of objects in the workspace. Increment the version number of the workspace.

Parameters:
workspace - The workspace of this object.

ProcessDirector

public ProcessDirector(CompositeEntity container,
                       java.lang.String name)
                throws IllegalActionException,
                       NameDuplicationException
Construct a director in the given container with the given name. The container argument must not be null, or a NullPointerException will be thrown. If the name argument is null, then the name is set to the empty string. Increment the version number of the workspace.

Parameters:
container - The container
name - Name of this director.
Throws:
IllegalActionException - If the name contains a period, or if the director is not compatible with the specified container.
NameDuplicationException - If the container is not a CompositeActor and the name collides with an entity in the container.
Method Detail

addThread

public void addThread(java.lang.Thread thread)
Notify this director that the specified thread is part of the execution of this model. This is used to keep track of whether the model is deadlocked, and also to terminate threads if necessary. It is important that the thread call removeThread() upon exiting. This method should be called before the thread is started. Otherwise, if some threads have been started and others have not, deadlock may be falsely detected because the not-yet-started threads are not counted.

Parameters:
thread - The thread.
See Also:
removeThread(Thread)
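
The ordering requirement can be illustrated with a small, self-contained sketch. It assumes a hypothetical registry set standing in for the director's set of active threads; the class RegisterBeforeStart and its launch() method are illustrative names, not part of Ptolemy II.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RegisterBeforeStart {
    // Hypothetical registry standing in for the director's set of active threads.
    static final Set<Thread> registry =
            Collections.synchronizedSet(new HashSet<Thread>());

    // Create n workers, register every one of them, and only then start them.
    // Returns the number of threads that were registered before any was started.
    static int launch(int n) {
        List<Thread> workers = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            Thread t = new Thread(() -> {
                try {
                    // Actor work would go here.
                } finally {
                    // Analogue of removeThread(): always deregister on exit.
                    registry.remove(Thread.currentThread());
                }
            });
            workers.add(t);
            registry.add(t); // analogue of addThread(), done before start()
        }
        int registeredBeforeStart = registry.size();
        for (Thread t : workers) {
            t.start();
        }
        try {
            for (Thread t : workers) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining workers", e);
        }
        return registeredBeforeStart;
    }
}
```

Because every worker is in the registry before the first one runs, a check for "no active threads" cannot succeed merely because some workers finished before others were counted.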

clone

public java.lang.Object clone(Workspace workspace)
                       throws java.lang.CloneNotSupportedException
Clone the director into the specified workspace. The new object is not added to the directory of that workspace (it must be added explicitly by the user if desired). The result is a new director with no container, no pending mutations, and no topology listeners. The count of active processes is zero.

Overrides:
clone in class Attribute
Parameters:
workspace - The workspace for the cloned object.
Returns:
The new ProcessDirector.
Throws:
java.lang.CloneNotSupportedException - If one of the attributes cannot be cloned.
See Also:
NamedObj.exportMoML(Writer, int, String), NamedObj.setDeferringChangeRequests(boolean)

finish

public void finish()
Request that the current iteration finishes and postfire() returns false, indicating to the environment that no more iterations should be invoked. To support domains where actor firings do not necessarily terminate, such as PN, you may wish to call stopFire() as well to request that those actors complete their firings.

Overrides:
finish in class Director

fire

public void fire()
          throws IllegalActionException
Wait until a deadlock is detected. Then deal with the deadlock by calling the protected method _resolveDeadlock() and return. This method is synchronized on the director.

Specified by:
fire in interface Executable
Overrides:
fire in class Director
Throws:
IllegalActionException - If a derived class throws it.
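
The wait-until-deadlock pattern can be sketched with a plain Java monitor. This is an illustration under stated assumptions, not the actual method body: the class DeadlockWaitSketch is hypothetical, and storing the result of resolveDeadlock() in a notDone flag is an assumption suggested by the _notDone field documented on this page.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical monitor modelling the wait-until-deadlock pattern; not Ptolemy code.
class DeadlockWaitSketch {
    private final Set<Thread> active = new HashSet<>();

    // Assumed analogue of _notDone: whether further iterations are permitted.
    boolean notDone = true;

    synchronized void addThread(Thread t) {
        active.add(t);
    }

    synchronized void removeThread(Thread t) {
        active.remove(t);
        notifyAll(); // wake fire() so it can re-check the deadlock condition
    }

    // Trivial test, as in the base class: deadlock means no active threads.
    synchronized boolean areThreadsDeadlocked() {
        return active.isEmpty();
    }

    // Trivial resolution, as in the base class: deadlock is never resolved.
    boolean resolveDeadlock() {
        return false;
    }

    // Block until deadlock is detected, then try to resolve it once.
    synchronized void fire() {
        while (!areThreadsDeadlocked()) {
            try {
                wait(); // releases the monitor while waiting
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        notDone = resolveDeadlock();
    }
}
```

The loop around wait() re-checks the condition after every wake-up, which guards against spurious wake-ups and against notifications that do not yet amount to a deadlock.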

initialize

public void initialize()
                throws IllegalActionException
Invoke the initialize() methods of all the deeply contained actors in the container (a composite actor) of this director. These are expected to call initialize(Actor), which will result in the creation of a new thread for each actor. Also, set current time to 0.0, or to the current time of the executive director of the container, if there is one.

Specified by:
initialize in interface Initializable
Overrides:
initialize in class Director
Throws:
IllegalActionException - If the initialize() method of one of the deeply contained actors throws it.

initialize

public void initialize(Actor actor)
                throws IllegalActionException
Initialize the given actor. This class overrides the base class to reset the flags for all of the receivers, and to create a new ProcessThread for each actor being controlled. This class does *NOT* directly call the initialize method of the actor. That method is instead called by the actor's thread itself. This allows actors in process domains to create tokens during initialization, since sending data in a process-based domain requires threads for each actor.

Overrides:
initialize in class Director
Parameters:
actor - The actor that is to be initialized.
Throws:
IllegalActionException - If the actor is not acceptable to the domain. Not thrown in this base class.
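
The division of labour described here (the director creates the thread, and the thread itself initializes the actor) can be sketched as follows. The SimpleActor interface and the class ThreadPerActorSketch are hypothetical simplifications; Ptolemy's Actor and ProcessThread carry far more behaviour.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadPerActorSketch {
    // Hypothetical minimal actor interface; Ptolemy's Actor is much richer.
    interface SimpleActor {
        void initialize();
        void fire();
    }

    // Threads created but not yet started, analogous to _newActorThreadList.
    private final List<Thread> newThreads = new ArrayList<>();

    // Analogue of initialize(Actor): create the thread for the actor,
    // but do not call actor.initialize() from the calling thread.
    void initialize(SimpleActor actor) {
        newThreads.add(new Thread(() -> {
            actor.initialize(); // runs in the actor's own thread
            actor.fire();
        }));
    }

    // Analogue of prefire(): start every thread that has not been started yet.
    List<Thread> startNewThreads() {
        List<Thread> started = new ArrayList<>(newThreads);
        newThreads.clear();
        for (Thread t : started) {
            t.start();
        }
        return started;
    }
}
```

Deferring the actor's initialize() call to its own thread means that any tokens produced during initialization are sent from a thread that is allowed to block, which is the rationale given above.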

isStopFireRequested

public boolean isStopFireRequested()
Return true if a stopFire has been requested on the director. This is used by the ProcessThread to tell the difference between a request to pause and a request to stop.

Returns:
True if stopFire() has been requested.

isStopRequested

public boolean isStopRequested()
Return true if a stop has been requested on the director. This is used by the ProcessThread to tell the difference between a request to pause and a request to stop.

Overrides:
isStopRequested in class Director
Returns:
True if stop() has been called.
See Also:
Director.stop()

isThreadActive

public boolean isThreadActive(java.lang.Thread thread)
Return true if the specified thread has been registered with addThread() and has not been removed with removeThread().

Parameters:
thread - The thread.
Returns:
True if the specified thread is active.
See Also:
addThread(Thread), removeThread(Thread)

newReceiver

public Receiver newReceiver()
Return a new receiver of a type compatible with this director. In this base class, this returns a new Mailbox.

Overrides:
newReceiver in class Director
Returns:
A new Mailbox.

postfire

public boolean postfire()
                 throws IllegalActionException
Return false if a stop has been requested or if the model has reached deadlock. Return true otherwise.

Specified by:
postfire in interface Executable
Overrides:
postfire in class Director
Returns:
False if the director has detected a deadlock or a stop has been requested.
Throws:
IllegalActionException - If a derived class throws it.

prefire

public boolean prefire()
                throws IllegalActionException
Start threads for all actors that have not had threads started already (this might include actors initialized since the last invocation of prefire). This includes the threads for any actors that were created in a mutation.

Specified by:
prefire in interface Executable
Overrides:
prefire in class Director
Returns:
True.
Throws:
IllegalActionException - If a derived class throws it.

preinitialize

public void preinitialize()
                   throws IllegalActionException
Preinitialize the model controlled by this director. This subclass overrides the base class to initialize the number of running threads before proceeding with preinitialization of the model.

Specified by:
preinitialize in interface Initializable
Overrides:
preinitialize in class Director
Throws:
IllegalActionException - If creating an actor thread throws it.

removeThread

public void removeThread(java.lang.Thread thread)
Notify this director that the specified thread has finished executing. This is used to keep track of whether the model is deadlocked, and also to terminate threads if necessary.

Parameters:
thread - The thread.
See Also:
addThread(Thread)

stop

public void stop()
Request that the director cease execution altogether. This causes a call to stop() on all actors contained by the container of this director, and a call to stopThread() on each of the process threads that contain actors controlled by this director. This also sets a flag so that the next call to postfire() returns false.

Specified by:
stop in interface Executable
Overrides:
stop in class Director

stopFire

public void stopFire()
Request that execution stop at the conclusion of the current iteration. Call stopThread() on each of the process threads that contain actors controlled by this director and call stopFire() on the actors that are contained by these threads. This method is non-blocking.

Specified by:
stopFire in interface Executable
Overrides:
stopFire in class Director

terminate

public void terminate()
Terminate all threads under control of this director immediately. This abrupt termination will not allow normal cleanup actions to be performed, and the model should be recreated after calling this method. This method uses Thread.stop(), a deprecated method in Java.

Specified by:
terminate in interface Executable
Overrides:
terminate in class Director

threadBlocked

public void threadBlocked(java.lang.Thread thread,
                          ProcessReceiver receiver)
Notify the director that the specified thread is blocked on an I/O operation. If the thread has not been registered with addThread(), then this call is ignored.

Parameters:
thread - The thread.
receiver - The receiver handling the I/O operation, or null if it is not a specific receiver.
See Also:
addThread(Thread)

threadHasPaused

public void threadHasPaused(java.lang.Thread thread)
Notify the director that the specified thread has paused in response to a call to stopFire(). If the thread has not been registered with addThread(), then this call is ignored. If the thread has been identified as blocked, it is removed from the set of blocked threads (so it doesn't get counted twice).

Parameters:
thread - The thread.
See Also:
addThread(Thread)
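
The rule that a paused thread is removed from the blocked set, so that it is not counted twice, can be sketched like this. The class PauseBookkeeping is a hypothetical model of the three sets documented on this page, not the Ptolemy II code.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of the active, blocked and paused sets; not Ptolemy code.
class PauseBookkeeping {
    private final Set<Thread> active = new HashSet<>();
    private final Set<Thread> blocked = new HashSet<>();
    private final Set<Thread> paused = new HashSet<>();

    synchronized void addThread(Thread t) {
        active.add(t);
    }

    synchronized void threadBlocked(Thread t) {
        if (active.contains(t)) {
            blocked.add(t);
        }
    }

    synchronized void threadHasPaused(Thread t) {
        if (!active.contains(t)) {
            return; // unregistered threads are ignored
        }
        blocked.remove(t); // a paused thread must not also count as blocked
        paused.add(t);
    }

    // Has no effect if the thread was never reported as paused.
    synchronized void threadHasResumed(Thread t) {
        paused.remove(t);
    }

    synchronized int blockedCount() {
        return blocked.size();
    }

    synchronized int pausedCount() {
        return paused.size();
    }

    // Every active thread is either paused or blocked.
    synchronized boolean areAllThreadsStopped() {
        return active.size() == paused.size() + blocked.size();
    }
}
```

Without the removal from the blocked set, a thread that blocked and then paused would be counted in both sets, and the sum of paused and blocked threads could exceed the number of active threads.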

threadHasResumed

public void threadHasResumed(java.lang.Thread thread)
Notify the director that the specified thread has resumed. If the director has not previously been notified that it was paused, then this call is ignored.

Parameters:
thread - The thread.
See Also:
threadHasPaused(Thread)

threadUnblocked

public void threadUnblocked(java.lang.Thread thread,
                            ProcessReceiver receiver)
Notify the director that the specified thread is unblocked on an I/O operation. If the thread has not been registered with threadBlocked(), then this call is ignored.

Parameters:
thread - The thread.
receiver - The receiver handling the I/O operation, or null if it is not a specific receiver.

transferInputs

public boolean transferInputs(IOPort port)
Do nothing. Input transfers in process domains are handled by branches, which transfer inputs in a separate thread.

Overrides:
transferInputs in class Director
Parameters:
port - The port.
Returns:
False, to indicate that no tokens were transferred.

transferOutputs

public boolean transferOutputs(IOPort port)
Do nothing. Output transfers in process domains are handled by branches, which transfer outputs in a separate thread.

Overrides:
transferOutputs in class Director
Parameters:
port - The port.
Returns:
False, to indicate that no tokens were transferred.

wrapup

public void wrapup()
            throws IllegalActionException
End the execution of the model under the control of this director. A flag is set in all the receivers that causes each process to terminate at the earliest communication point. Prior to setting receiver flags, this method wakes up the threads if they all are stopped. If the container is not an instance of CompositeActor, then this method does nothing.

This method is not synchronized on the workspace, so the caller should be.

Specified by:
wrapup in interface Initializable
Overrides:
wrapup in class Director
Throws:
IllegalActionException - If an error occurs while accessing the receivers of all actors under the control of this director.
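
The mechanism of a finish flag that ends a process at its next communication point can be sketched with a one-place mailbox. Everything here is hypothetical: FinishableMailbox and TerminatedException are illustrative names, and the real receivers and their termination exception are not shown on this page.

```java
// Hypothetical one-place mailbox with a finish flag; not a Ptolemy receiver.
class FinishableMailbox {
    // Thrown at a communication point once finish has been requested.
    static class TerminatedException extends RuntimeException {
    }

    private Object token;
    private boolean finishRequested;

    synchronized void put(Object t) {
        token = t;
        notifyAll();
    }

    // Block until a token arrives; terminate instead if finish was requested.
    synchronized Object get() {
        while (token == null && !finishRequested) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new TerminatedException();
            }
        }
        if (finishRequested) {
            throw new TerminatedException();
        }
        Object result = token;
        token = null;
        return result;
    }

    // Analogue of requestFinish(): set the flag and wake any blocked reader.
    synchronized void requestFinish() {
        finishRequested = true;
        notifyAll();
    }
}
```

The notifyAll() in requestFinish() matters: a reader that is already blocked in get() would otherwise never observe the flag, which mirrors the need to wake stopped threads before the receiver flags take effect.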

_areAllThreadsStopped

protected boolean _areAllThreadsStopped()
Return true if the count of active processes equals the number of paused and blocked threads. Otherwise return false.

Returns:
True if every active process is either paused or blocked.

_areThreadsDeadlocked

protected boolean _areThreadsDeadlocked()
Return true if the count of active processes in the container is 0. Otherwise return false. Derived classes must override this method to return true for any other forms of deadlock that they might introduce.

Returns:
True if there are no active processes in the container.

_getActiveThreadsCount

protected final int _getActiveThreadsCount()
Return the number of active threads under the control of this director.

Returns:
The number of active threads.

_getBlockedThreadsCount

protected final int _getBlockedThreadsCount()
Return the number of threads that are currently blocked.

Returns:
The number of threads that are currently blocked.

_getStoppedThreadsCount

protected final int _getStoppedThreadsCount()
Return the number of threads that are currently stopped.

Returns:
The number of threads that are currently stopped.

_newProcessThread

protected ProcessThread _newProcessThread(Actor actor,
                                          ProcessDirector director)
                                   throws IllegalActionException
Create a new ProcessThread for controlling the actor that is passed as a parameter of this method. Subclasses are encouraged to override this method as necessary for domain specific functionality.

Parameters:
actor - The actor that the created ProcessThread will control.
director - The director that manages the model that the created thread is associated with.
Returns:
A new ProcessThread that will control the actor passed as a parameter for this method.
Throws:
IllegalActionException - If creating a new ProcessThread throws it.

_resolveDeadlock

protected boolean _resolveDeadlock()
                            throws IllegalActionException
Return false indicating that deadlock has not been resolved and that execution will be discontinued. In derived classes, override this method to obtain domain specific handling of deadlocks. Return false if a real deadlock has occurred and the simulation can be ended. Return true if the simulation can proceed given additional data and need not be terminated.

Returns:
False.
Throws:
IllegalActionException - Not thrown in this base class.
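
The override contract (return true if execution can proceed, false for a real deadlock) can be sketched with two small classes. Both BaseDeadlockPolicy and ExternalInputPolicy are hypothetical, and the idea of resolving deadlock by delivering pending external inputs is an assumed example of "additional data", not a description of any particular Ptolemy domain.

```java
// Hypothetical base policy mirroring the trivial default described above.
class BaseDeadlockPolicy {
    // Deadlock is never resolved, so execution is discontinued.
    protected boolean resolveDeadlock() {
        return false;
    }
}

// Hypothetical derived policy: deadlock is resolvable while inputs are pending.
class ExternalInputPolicy extends BaseDeadlockPolicy {
    private int pendingExternalInputs;

    ExternalInputPolicy(int pendingExternalInputs) {
        this.pendingExternalInputs = pendingExternalInputs;
    }

    @Override
    protected boolean resolveDeadlock() {
        if (pendingExternalInputs > 0) {
            // Pretend to deliver one input that unblocks a waiting reader.
            pendingExternalInputs--;
            return true;
        }
        return false; // nothing left to deliver: a real deadlock
    }
}
```

A caller that loops on this method keeps iterating while it returns true and stops at the first false, so the derived class alone decides when a deadlock is final.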

_requestFinishOnReceivers

private void _requestFinishOnReceivers()
Call requestFinish() on all receivers.