ptolemy.distributed.domains.sdf.kernel
Class DistributedSDFDirector

java.lang.Object
  extended by ptolemy.kernel.util.NamedObj
      extended by ptolemy.kernel.util.Attribute
          extended by ptolemy.actor.Director
              extended by ptolemy.actor.sched.StaticSchedulingDirector
                  extended by ptolemy.domains.sdf.kernel.SDFDirector
                      extended by ptolemy.distributed.domains.sdf.kernel.DistributedSDFDirector
All Implemented Interfaces:
java.io.Serializable, java.lang.Cloneable, Executable, Initializable, PeriodicDirector, Changeable, Debuggable, DebugListener, Derivable, ModelErrorHandler, MoMLExportable, Moveable, Nameable

public class DistributedSDFDirector
extends SDFDirector

Director for the distributed version of the synchronous dataflow (SDF) model of computation.

Distributed-SDF overview

The Distributed-SDF domain is an extended version of the existing SDF Domain that performs the simulation in a distributed manner.

Requirements

Features

A DistributedSDFDirector is the class that controls execution of actors under the distributed version of the SDF domain. It extends SDFDirector.

By default, actor scheduling is handled by the DistributedSDFScheduler class. Furthermore, the newReceiver method creates Receivers of type DistributedSDFReceiver, which extends SDFReceiver.

See ptolemy.domains.sdf.kernel.SDFScheduler for more information about the SDF Domain.

Parameters

Since:
Ptolemy II 5.1
Version:
$Id: DistributedSDFDirector.java 57040 2010-01-27 20:52:32Z cxh $
Author:
Daniel Lazaro Cuadrado (kapokasa@kom.aau.dk)
See Also:
DistributedServerRMIGeneric, DistributedSDFReceiver, DistributedSDFScheduler, SDFDirector, SDFScheduler, Serialized Form
Accepted Rating:
Red (cxh)
Proposed Rating:
Red (kapokasa)

Nested Class Summary
 
Nested classes/interfaces inherited from class ptolemy.kernel.util.NamedObj
NamedObj.ContainedObjectsIterator
 
Field Summary
private  java.util.HashMap actorsThreadsMap
          Map of Actors to Threads (that contain the Service).
private  ClientServerInteractionManager clientServerInteractionManager
          Encapsulates Jini functionality.
(package private)  java.util.HashMap commandsMap
          Map of commands to be executed.
private  java.lang.String configFileName
          The name of the Jini configuration file to be provided to the ClientServerInteractionManager.
 Parameter parallelExecution
          A Parameter representing whether a sequential or parallel execution will be performed.
 Parameter parallelSchedule
          A Parameter representing whether a sequential or parallel schedule will be computed.
 Parameter pipelining
          A Parameter representing whether a pipelined parallel execution will be performed.
private  ThreadSynchronizer synchronizer
          Performs synchronization of the ClientThreads and used to issue commandMaps.
private  boolean VERBOSE
          It states whether debugging messages should be printed.
 
Fields inherited from class ptolemy.domains.sdf.kernel.SDFDirector
_iterationCount, _periodicDirectorHelper, allowDisconnectedGraphs, allowRateChanges, constrainBufferSizes, iterations, period, synchronizeToRealTime, vectorizationFactor
 
Fields inherited from class ptolemy.actor.sched.StaticSchedulingDirector
_postfireReturns
 
Fields inherited from class ptolemy.actor.Director
_actorsFinishedExecution, _currentTime, _finishRequested, _initializables, _stopRequested, timeResolution
 
Fields inherited from class ptolemy.kernel.util.NamedObj
_changeListeners, _changeLock, _changeRequests, _debugging, _debugListeners, _elementName, _isPersistent, _verbose, _workspace, ATTRIBUTES, CLASSNAME, COMPLETE, CONTENTS, DEEP, FULLNAME, LINKS
 
Fields inherited from interface ptolemy.actor.Executable
COMPLETED, NOT_READY, STOP_ITERATING
 
Constructor Summary
DistributedSDFDirector()
          Construct a director in the default workspace with an empty string as its name.
DistributedSDFDirector(CompositeEntity container, java.lang.String name)
          Construct a director in the given container with the given name.
DistributedSDFDirector(Workspace workspace)
          Construct a director in the workspace with an empty name.
 
Method Summary
 void attributeChanged(Attribute attribute)
          React to a change in an attribute.
private  void bufferingPhase()
          Fills the queues with data tokens so that a fully parallel execution can be performed.
private  void connectActors()
          Interconnect all the remote actors in the same manner as the model's topology.
private  java.util.HashMap createServicesReceiversMap(Receiver[][] receivers)
          Create a map containing the services and Receivers ID's corresponding to a given bidimensional array of Receiver.
private  void distributeActorsOntoServers()
          Distribute the actors to the corresponding Service specified in the actorsThreadsMap and initialize them remotely.
private  void exitClientThreads()
          Create a commandMap containing and EXIT command for all the existing clientThreads and issue it to the synchronizer.
 void fire()
          Calculate the current schedule, if necessary, and iterate the contained actors in the order given by the schedule.
private  java.util.LinkedList getActors()
          Create a LinkedList containing all the instances of Actor contained by the CompositeActor in which this director is embedded.
private  java.util.LinkedList getServers()
          Get the Jini Services discovered.
private  void init()
          Initialize the object.
 void initialize()
          Initialize the actors associated with this director (super).
private  void initializeJini()
          Initializes Jini.
private  void mapActorsOntoServers()
          Map the actors on to the Services.
 Receiver newReceiver()
          Return a new receiver consistent with the Distributed-SDF domain.
private  void parallelFire()
          Perform the dispatching of the schedule in parallel to the distributed platform.
private  void pipelinedParallelFire()
          Perform the dispatching of the schedule in a pipelined parallel manner on to the distributed platform.
 void preinitialize()
          Preinitialize the actors associated with this director and compute the schedule (super).
private  void printActorsOntoServersMap()
          Print the actors-services mapping.
 void wrapup()
          Invoke the wrapup() method of all the actors contained in the director's container (super).
 
Methods inherited from class ptolemy.domains.sdf.kernel.SDFDirector
clone, createSchedule, fireAt, getModelNextIterationTime, periodValue, postfire, prefire, suggestedModalModelDirectors, supportMultirateFiring, transferInputs, transferOutputs
 
Methods inherited from class ptolemy.actor.sched.StaticSchedulingDirector
_setScheduler, addDebugListener, getScheduler, invalidateSchedule, isScheduleValid, removeDebugListener, setScheduler
 
Methods inherited from class ptolemy.actor.Director
_description, _fireContainerAt, _isEmbedded, _isTopLevel, _transferInputs, _transferOutputs, addInitializable, defaultDependency, finish, fireAt, fireAtCurrentTime, getCausalityInterface, getCurrentTime, getErrorTolerance, getGlobalTime, getModelStartTime, getModelStopTime, getModelTime, getNextIterationTime, getStartTime, getStopTime, getTimeResolution, implementsStrictActorSemantics, initialize, invalidateResolvedTypes, isFireFunctional, isStopRequested, isStrict, iterate, preinitialize, removeInitializable, requestInitialization, setContainer, setCurrentTime, setModelTime, stop, stopFire, terminate
 
Methods inherited from class ptolemy.kernel.util.Attribute
_checkContainer, _getContainedObject, _propagateExistence, getContainer, moveDown, moveToFirst, moveToIndex, moveToLast, moveUp, setName, updateContent
 
Methods inherited from class ptolemy.kernel.util.NamedObj
_addAttribute, _adjustOverride, _attachText, _cloneFixAttributeFields, _debug, _debug, _debug, _debug, _debug, _exportMoMLContents, _getIndentPrefix, _isMoMLSuppressed, _markContentsDerived, _propagateValue, _recordDecoratedAttributes, _removeAttribute, _splitName, _stripNumericSuffix, _validateSettables, addChangeListener, attributeList, attributeList, attributeTypeChanged, clone, containedObjectsIterator, deepContains, depthInHierarchy, description, description, event, executeChangeRequests, exportMoML, exportMoML, exportMoML, exportMoML, exportMoML, exportMoMLPlain, getAttribute, getAttribute, getAttributes, getChangeListeners, getClassName, getDecoratorAttribute, getDecoratorAttributes, getDerivedLevel, getDerivedList, getDisplayName, getElementName, getFullName, getModelErrorHandler, getName, getName, getPrototypeList, getSource, handleModelError, isDeferringChangeRequests, isOverridden, isPersistent, lazyContainedObjectsIterator, message, propagateExistence, propagateValue, propagateValues, removeChangeListener, requestChange, setClassName, setDeferringChangeRequests, setDerivedLevel, setDisplayName, setModelErrorHandler, setPersistent, setSource, sortContainedObjects, toplevel, toString, uniqueName, validateSettables, workspace
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface ptolemy.actor.Executable
isFireFunctional, isStrict, iterate, stop, stopFire, terminate
 
Methods inherited from interface ptolemy.actor.Initializable
addInitializable, removeInitializable
 
Methods inherited from interface ptolemy.kernel.util.Nameable
description, getContainer, getDisplayName, getFullName, getName, getName, setName
 

Field Detail

parallelSchedule

public Parameter parallelSchedule
A Parameter representing whether a sequential or parallel schedule will be computed. This parameter must be a boolean. The default value is false BooleanToken.


pipelining

public Parameter pipelining
A Parameter representing whether a pipelined parallel execution will be performed. A pipelined execution only makes sense when parallel execution is true. This parameter must be a boolean. The default value is false BooleanToken.


parallelExecution

public Parameter parallelExecution
A Parameter representing whether a sequential or parallel execution will be performed. This parameter must be a boolean. The default value is false BooleanToken.


VERBOSE

private boolean VERBOSE
It states whether debugging messages should be printed.


configFileName

private java.lang.String configFileName
The name of the Jini configuration file to be provided to the ClientServerInteractionManager. The name should be relative to $PTII and start with a "/".


clientServerInteractionManager

private ClientServerInteractionManager clientServerInteractionManager
Encapsulates Jini functionality.


actorsThreadsMap

private java.util.HashMap actorsThreadsMap
Map of Actors to Threads (that contain the Service).


synchronizer

private ThreadSynchronizer synchronizer
Performs synchronization of the ClientThreads and used to issue commandMaps.


commandsMap

java.util.HashMap commandsMap
Map of commands to be executed. This is used by the pipelined parallel execution for efficiency. Since new levels are added everytime to the commands map, after buffering, a sequence of a fully parallel commandsMap is used several times. There is no point in recalculating it again and it is reused from the buffering phase.

Constructor Detail

DistributedSDFDirector

public DistributedSDFDirector()
                       throws IllegalActionException,
                              NameDuplicationException
Construct a director in the default workspace with an empty string as its name. The director is added to the list of objects in the workspace. Increment the version number of the workspace. The DistributedSDFDirector will have a default scheduler of type DistributedSDFScheduler.

Throws:
IllegalActionException - If the name has a period in it, or the director is not compatible with the specified container.
NameDuplicationException - If the container already contains an entity with the specified name.

DistributedSDFDirector

public DistributedSDFDirector(Workspace workspace)
                       throws IllegalActionException,
                              NameDuplicationException
Construct a director in the workspace with an empty name. The director is added to the list of objects in the workspace. Increment the version number of the workspace. The DistributedSDFDirector will have a default scheduler of type DistributedSDFScheduler.

Parameters:
workspace - The workspace for this object.
Throws:
IllegalActionException - If the name has a period in it, or the director is not compatible with the specified container.
NameDuplicationException - If the container already contains an entity with the specified name.

DistributedSDFDirector

public DistributedSDFDirector(CompositeEntity container,
                              java.lang.String name)
                       throws IllegalActionException,
                              NameDuplicationException
Construct a director in the given container with the given name. The container argument must not be null, or a NullPointerException will be thrown. If the name argument is null, then the name is set to the empty string. Increment the version number of the workspace. The DistributedSDFDirector will have a default scheduler of type DistributedSDFScheduler.

Parameters:
container - Container of the director.
name - Name of this director.
Throws:
IllegalActionException - If the director is not compatible with the specified container. May be thrown in a derived class.
NameDuplicationException - If the container is not a CompositeActor and the name collides with an entity in the container.
Method Detail

attributeChanged

public void attributeChanged(Attribute attribute)
                      throws IllegalActionException
React to a change in an attribute. If the changed attribute matches a parameter of the director, then the corresponding local copy of the parameter value will be updated. If the attribute that changes is parallelSchedule the schedule is invalidated.

Overrides:
attributeChanged in class SDFDirector
Parameters:
attribute - The changed parameter.
Throws:
IllegalActionException - If the parameter set is not valid.

fire

public void fire()
          throws IllegalActionException
Calculate the current schedule, if necessary, and iterate the contained actors in the order given by the schedule. Depending on the value of parallelSchedule either a sequential or a parallel schedule will be computed. Depending on the value of parallelExecution either a sequential or a parallel execution of the schedule will be computed. No internal state of the director is updated during fire, so it may be used with domains that require this property, such as CT.

Iterating an actor involves calling the actor's iterate() method, which is equivalent to calling the actor's prefire(), fire() and postfire() methods in succession. If iterate() returns NOT_READY, indicating that the actor is not ready to execute, then an IllegalActionException will be thrown. The values returned from iterate() are recorded and are used to determine the value that postfire() will return at the end of the director's iteration.

This method may be overridden by some domains to perform additional domain-specific operations.

Specified by:
fire in interface Executable
Overrides:
fire in class StaticSchedulingDirector
Throws:
IllegalActionException - If any actor executed by this actor return false in prefire.

initialize

public void initialize()
                throws IllegalActionException
Initialize the actors associated with this director (super). If parallelExecution is true, the infrastructure for a distributed execution is initialized. Once the required number of services are discovered, the actors are mapped on to them, sent to the distributed services and virtually connected over the network.

Specified by:
initialize in interface Initializable
Overrides:
initialize in class SDFDirector
Throws:
IllegalActionException - If the initialize() method of one of the associated actors throws it, or if there is no scheduler.

newReceiver

public Receiver newReceiver()
Return a new receiver consistent with the Distributed-SDF domain.

Overrides:
newReceiver in class SDFDirector
Returns:
A new DistributedSDFReceiver.

preinitialize

public void preinitialize()
                   throws IllegalActionException
Preinitialize the actors associated with this director and compute the schedule (super). The schedule is computed during preinitialization so that hierarchical opaque composite actors can be scheduled properly, since the act of computing the schedule sets the rate parameters of the external ports. In addition, performing scheduling during preinitialization enables it to be present during code generation. The order in which the actors are preinitialized is arbitrary. The schedule computed will be either sequential or parallel depending on the value of parallelSchedule.

Specified by:
preinitialize in interface Initializable
Overrides:
preinitialize in class SDFDirector
Throws:
IllegalActionException - If the preinitialize() method of one of the associated actors throws it.

wrapup

public void wrapup()
            throws IllegalActionException
Invoke the wrapup() method of all the actors contained in the director's container (super). In case of parallelExecution being true, exit from all the client threads is performed. This method should be invoked once per execution. None of the other action methods should be invoked after it in the execution. This method is not synchronized on the workspace, so the caller should be.

Specified by:
wrapup in interface Initializable
Overrides:
wrapup in class Director
Throws:
IllegalActionException - If the wrapup() method of one of the associated actors throws it.
See Also:
ClientThread

bufferingPhase

private void bufferingPhase()
                     throws IllegalActionException
Fills the queues with data tokens so that a fully parallel execution can be performed. It performs firings of the different levels of the schedule, adding one more level in every round. For example for a parallel schedule consisting of three levels, first if fires the actors in level 1, followed by actors in levels 1 and 2.

Throws:
IllegalActionException - If there is no scheduler.

connectActors

private void connectActors()
                    throws IllegalActionException
Interconnect all the remote actors in the same manner as the model's topology. In other words, the connections defined by the model's topology are created virtually over the distributed platform. For each actor, a portReceiverMap is created. A portReceiverMap is a data structure representing for a given port the receivers it contains. In case the port is and input port it consists of a set of receivers ID's i.e. (inputport, (ID1, ..., IDn). In case of an outputport, it contains a map of services to receiver's IDs, i.e. (outputport, ((service1, (ID1, ..., IDi), ..., (servicen, (IDj, ..., IDr))). This structure is sent over the network to the corresponding service. The types of the port are also set on the remote actor.

Throws:
IllegalActionException - If the remote receivers can't be created.

createServicesReceiversMap

private java.util.HashMap createServicesReceiversMap(Receiver[][] receivers)
Create a map containing the services and Receivers ID's corresponding to a given bidimensional array of Receiver. i.e. ((service1, (ID1, ..., IDi), ..., (servicen, (IDj, ..., IDr)).

Parameters:
receivers - The bidimensional array of Receivers.
Returns:
A HashMap containing services and lists of Receiver IDs.

distributeActorsOntoServers

private void distributeActorsOntoServers()
Distribute the actors to the corresponding Service specified in the actorsThreadsMap and initialize them remotely.


exitClientThreads

private void exitClientThreads()
Create a commandMap containing and EXIT command for all the existing clientThreads and issue it to the synchronizer. This will results in all the clientThreads to terminate.

See Also:
ThreadSynchronizer, ClientThread

getActors

private java.util.LinkedList getActors()
Create a LinkedList containing all the instances of Actor contained by the CompositeActor in which this director is embedded.

Returns:
A LinkedList containing all the instances of Actor contained by the CompositeActor in which this director is embedded.

getServers

private java.util.LinkedList getServers()
Get the Jini Services discovered.

Returns:
A LinkedList containing Jini Services discovered.
See Also:
ClientServerInteractionManager

init

private void init()
           throws IllegalActionException,
                  NameDuplicationException
Initialize the object. In this case, we give the DistributedSDFDirector a default scheduler of the class DistributedSDFScheduler, a parallelSchedule parameter, a pipelining parameter and parallelExecution parameter.

Throws:
IllegalActionException
NameDuplicationException

initializeJini

private void initializeJini()
                     throws IllegalActionException
Initializes Jini. It creates an instance of ClientServerInteractionManager to ease the discovery. Set the required number of services to run the simulation, being the number of Actors in the model. This includes:
  • Prepare for discovery
  • Discover a Service Locator
  • Looking up Services
  • Filtering Services
  • Throws:
    IllegalActionException - If Jini cannot be initialized.
    See Also:
    ClientServerInteractionManager

    mapActorsOntoServers

    private void mapActorsOntoServers()
    Map the actors on to the Services. The mapping is trivial in the sense that no effort is made to optimize the mapping. ALL the actors are distributed to a Service not taking into account any QoS. One might want to create smarter ways to do the mapping... this is the place to do so. Create the client threads. For every actor that is distributed (ALL) a local clientThread is created to allow for parallel execution.

    See Also:
    ClientThread

    parallelFire

    private void parallelFire()
                       throws IllegalActionException
    Perform the dispatching of the schedule in parallel to the distributed platform. For each level of the Schedule, a commandMap is created and issued to the synchronizer. //TODO: This can be made real static, precalculate and issue might yield slight better results? Is it worth the effort?

    Throws:
    IllegalActionException - If port methods throw it.
    See Also:
    ThreadSynchronizer

    pipelinedParallelFire

    private void pipelinedParallelFire()
                                throws IllegalActionException
    Perform the dispatching of the schedule in a pipelined parallel manner on to the distributed platform. For each level of the Schedule, a commandMap is created and issued to the synchronizer.

    Throws:
    IllegalActionException - If there is no scheduler.
    IllegalActionException - If port methods throw it.
    See Also:
    ThreadSynchronizer

    printActorsOntoServersMap

    private void printActorsOntoServersMap()
    Print the actors-services mapping.