/* Director for SysML in the style of IBM Rational Rhapsody. Copyright (c) 2012-2014 The Regents of the University of California. All rights reserved. Permission is hereby granted, without written agreement and without license or royalty fees, to use, copy, modify, and distribute this software and its documentation for any purpose, provided that the above copyright notice and the following two paragraphs appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. PT_COPYRIGHT_VERSION_2 COPYRIGHTENDKEY */ package ptolemy.domains.sysml.kernel; import java.util.Collections; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.PriorityQueue; import java.util.concurrent.ConcurrentHashMap; import ptolemy.actor.Actor; import ptolemy.actor.FiringEvent; import ptolemy.actor.FiringsRecordable; import ptolemy.actor.IOPort; import ptolemy.actor.Mailbox; import ptolemy.actor.NoRoomException; import ptolemy.actor.NoTokenException; import ptolemy.actor.Receiver; import ptolemy.actor.process.ProcessDirector; import ptolemy.actor.process.ProcessThread; import ptolemy.actor.util.Time; import ptolemy.data.BooleanToken; import ptolemy.data.Token; import ptolemy.data.expr.Parameter; import ptolemy.kernel.CompositeEntity; import ptolemy.kernel.util.Attribute; import ptolemy.kernel.util.IllegalActionException; import ptolemy.kernel.util.InternalErrorException; import ptolemy.kernel.util.NameDuplicationException; import ptolemy.kernel.util.Nameable; import ptolemy.kernel.util.Workspace; /////////////////////////////////////////////////////////////////// //// SysMLConcurrentDirector /**

Concurrent version of a SysML director. This version is inspired by a subset of the semantics of IBM Rational's Rhapsody SysML tool. In this MoC, each actor executes in its own thread (corresponding to an "active object" in SysML). Inputs provided to an input port (by the thread of another actor) are put into a single queue belonging to the destination actor. The thread for the destination actor retrieves the first input in the queue and uses it to set the value of exactly one input port. All other input ports are marked absent. The actor then fires, possibly producing one or more outputs which are directed to their destination actors.

When multiple actors send tokens to an actor, whether to the same port or to distinct ports, this MoC is nondeterministic. The order in which the tokens are processed will depend on the happenstances of scheduling, since the tokens are put into a single queue in the order in which they arrive.

In this MoC, we assume that an actor iterates within its thread only if either it has called fireAt() to request a future firing (or a re-firing at the current time), or it has at least one event in its input queue. Thus, the actor's thread will block until one of those conditions is satisfied.

When all threads are blocked, then if at least one has called fireAt() to request a future firing, then this director will advance model time to the smallest time of such a request, and then again begin executing actors until they all block.

When all actors are blocked, and none has called fireAt(), the model terminates. @author Edward A. Lee @version $Id: SysMLConcurrentDirector.java 70402 2014-10-23 00:52:20Z cxh $ @since Ptolemy II 10.0 @Pt.ProposedRating Red (eal) @Pt.AcceptedRating Red (eal) */ public class SysMLConcurrentDirector extends ProcessDirector { /** Construct a director in the given container with the given name. * @param container Container of the director. * @param name Name of this director. * @exception IllegalActionException If the director is not compatible * with the specified container. Thrown in derived classes. * @exception NameDuplicationException If the container not a * CompositeActor and the name collides with an entity in the container. */ public SysMLConcurrentDirector(CompositeEntity container, String name) throws IllegalActionException, NameDuplicationException { super(container, name); } /////////////////////////////////////////////////////////////////// //// public methods //// /** Clone the director into the specified workspace. * @param workspace The workspace for the cloned object. * @exception CloneNotSupportedException If one of the attributes * cannot be cloned. * @return The new PNDirector. */ @Override public Object clone(Workspace workspace) throws CloneNotSupportedException { SysMLConcurrentDirector newObject = (SysMLConcurrentDirector) super .clone(workspace); newObject._actorData = new ConcurrentHashMap(); newObject._nextTime = Time.POSITIVE_INFINITY; newObject._winningThreads = new LinkedList(); return newObject; } /** Start a new iteration (at a new time, presumably) and * wait until a deadlock is detected. * Then deal with the deadlock * by calling the protected method _resolveDeadlock() and return. * This method is synchronized on the director. * @exception IllegalActionException If a derived class throws it. */ @Override public synchronized void fire() throws IllegalActionException { // Mark threads unblocked that were waiting for time to advance, if any. for (SingleQueueProcessThread thread : _winningThreads) { threadUnblocked(thread, null); } _winningThreads.clear(); // Notify all of a new iteration. notifyAll(); // The superclass does all the work. super.fire(); } /** Override the base class to make a local record of the requested * firing. * @param actor The actor scheduled to be fired. * @param time The requested time. * @param microstep The requested microstep. * @return An instance of Time with the current time value, or * if there is an executive director, the time at which the * container of this director will next be fired * in response to this request. * @exception IllegalActionException If there is an executive director * and it throws it. Derived classes may choose to throw this * exception for other reasons. */ @Override public synchronized Time fireAt(Actor actor, Time time, int microstep) throws IllegalActionException { ActorData actorData = _actorData.get(actor); if (actorData == null) { throw new IllegalActionException(this, actor, "Nothing known about actor."); } actorData.fireAtTimes.add(time); if (_debugging) { _debug(actor.getFullName() + " requests firing at time " + time); } return time; } /** Invoke the initialize() method of ProcessDirector. Also set all the * state variables to the their initial values. The list of process * listeners is not reset as the developer might want to reuse the * list of listeners. * @exception IllegalActionException If the initialize() method of one * of the deeply contained actors throws it. */ @Override public synchronized void initialize() throws IllegalActionException { // Recreate actor data. _actorData.clear(); _nextTime = Time.POSITIVE_INFINITY; _winningThreads.clear(); // Put the container of this director into the directory // so that we can send data to the inside of its output // ports, if it has any. Actor container = (Actor) getContainer(); ActorData actorData = new ActorData(); _actorData.put(container, actorData); // Use synchronized version. actorData.inputQueue = Collections .synchronizedList(new LinkedList()); // Initialize the count of actors that are initialized. // This counts the container of this director, hence we // initialize to 1. _actorsInitialized = 1; // The following calls initialize(Actor), which may // create the threads and start them. // It also initializes the _queueDirectory structure. super.initialize(); // Start threads for actors created since the last invocation // of the prefire() or initialize() method. I'm not sure why // the base class postpones starting threads until prefire(), // but if we change the base class to start threads, then some // tests in PN fail. Iterator threads = _newActorThreadList.iterator(); while (threads.hasNext()) { ProcessThread procThread = (ProcessThread) threads.next(); procThread.start(); } _newActorThreadList.clear(); // Actors may have called fireAt() when initialized. // If so, we need to delegate the fireAt() up the hierarchy. if (isEmbedded()) { // Now we need to wait until all the threads have at least past // their initialize() method because the initialize() method may // call fireAt(), and we need to translate those calls into a // fireContainerAt() call, in case this director is embedded. synchronized (this) { while (_actorsInitialized < _actorData.size() && !_stopRequested) { try { wait(); } catch (InterruptedException e) { throw new IllegalActionException(this, "Interrupted while waiting for actors to initialize."); } } } // If any actor called fireAt() during initialization, then // find the earliest requested firing time and convey a fireAt() // call to the container. Time nextFiringTime = _earliestNextFiringTime(); if (nextFiringTime.compareTo(Time.POSITIVE_INFINITY) < 0) { fireContainerAt(nextFiringTime); } } } /** Initialize the given actor. This method is generally called * by the initialize() method of the director, and by the manager * whenever an actor is added to an executing model as a * mutation. This method will generally perform domain-specific * initialization on the specified actor and call its * initialize() method. In this base class, only the actor's * initialize() method of the actor is called and no * domain-specific initialization is performed. Typical actions * a director might perform include starting threads to execute * the actor or checking to see whether the actor can be managed * by this director. For example, a time-based domain (such as * CT) might reject sequence based actors. * @param actor The actor that is to be initialized. * @exception IllegalActionException If the actor is not * acceptable to the domain. Not thrown in this base class. */ @Override public void initialize(Actor actor) throws IllegalActionException { if (_debugging) { _debug("Initializing actor: " + ((Nameable) actor).getFullName() + "."); } ActorData actorData = new ActorData(); _actorData.put(actor, actorData); // Use synchronized version. actorData.inputQueue = Collections .synchronizedList(new LinkedList()); // NOTE: The following does NOT initialize the actors. They initialize // themselves in the threads that are created by the superclass. // The following line will start those threads. super.initialize(actor); } /** Return a new receiver SysMLAReceiver. * @return A new SysMLAReceiver. */ @Override public Receiver newReceiver() { try { return new SysMLAReceiver(); } catch (IllegalActionException e) { throw new InternalErrorException(e); } } /** Return false if a stop has been requested or if * the model has reached deadlock. Otherwise, if there is a * pending fireAt request, either advance time to that * requested time (if at the top level) or request a * firing at that time. If there is no pending fireAt * request, then return false. Otherwise, return true. * @return False if the director has detected a deadlock or * a stop has been requested. * @exception IllegalActionException If a derived class throws it. */ @Override public boolean postfire() throws IllegalActionException { super.postfire(); // Determine the earliest time at which an actor wants to be fired next. Time earliestFireAtTime = _earliestNextFiringTime(); if (earliestFireAtTime == Time.POSITIVE_INFINITY) { // Time does not advance. // Advance directly to the stop time, if it is finite, unless // this is embedded, in which case, just wait for the environment. if (!isEmbedded()) { earliestFireAtTime = getModelStopTime(); if (earliestFireAtTime == null || earliestFireAtTime == Time.POSITIVE_INFINITY) { // If the stop time is also infinity, then stop execution. // FIXME: If there are actors with unpredictable events, // such as FMUs, then this might not be what we want to do. // For now, we require a finite stop time so that the model // will attempt a finite step size. if (_debugging) { _debug("No pending events and stop time is not given. Stopping execution."); } stop(); return false; } else { _nextTime = getModelStopTime(); return true; } } } if (earliestFireAtTime.compareTo(getModelStopTime()) > 0) { // The next available time is past the stop time. if (_debugging) { _debug("Next firing request is beyond the model stop time of " + getModelStopTime()); } stop(); return false; } if (_debugging) { _debug("Next earliest fire at request is at time " + earliestFireAtTime); } _nextTime = earliestFireAtTime; if (_nextTime.compareTo(Time.POSITIVE_INFINITY) < 0) { if (_nextTime.compareTo(getModelStopTime()) > 0) { return false; } if (isEmbedded()) { fireContainerAt(_nextTime); if (_debugging) { _debug("+++ Requeting refiring at " + _nextTime); } } else { setModelTime(_nextTime); if (_debugging) { _debug("+++ Advancing time to " + _nextTime); } } } _nextTime = Time.POSITIVE_INFINITY; return _notDone; } /** Override the base class to set time to match environment * time if this director is embedded. * @return Whatever the superclass returns. * @exception IllegalActionException Not thrown in this base class. */ @Override public boolean prefire() throws IllegalActionException { if (_debugging) { _debug("Director: Called prefire()."); } if (isEmbedded()) { setModelTime(localClock.getLocalTimeForCurrentEnvironmentTime()); if (_debugging) { _debug("-- Setting current time to " + getModelTime()); } } return super.prefire(); } /** Transfer at most one token from an input * port of the container to the ports * it is connected to on the inside. * @param port The port. * @return True if tokens were transferred. * @exception IllegalActionException If transfer fails. */ @Override public boolean transferInputs(IOPort port) throws IllegalActionException { return _transferInputs(port); } /** For all inputs in the input queue of the container of this * actor, put the input token into the inside of the corresponding * output port and then transfer outputs from that port. * @exception IllegalActionException If transfer fails. */ @Override public void transferOutputs() throws IllegalActionException { ActorData actorData = _actorData.get(getContainer()); while (!actorData.inputQueue.isEmpty()) { Input input = actorData.inputQueue.remove(0); input.receiver.reallyPut(input.token); // FIXME: This should probably be done when // the event is dequeued from the single global queue, // in sequential mode. _transferOutputs(input.receiver.getContainer()); } } /////////////////////////////////////////////////////////////////// //// protected methods //// /** Override the base class to return true if all active threads are blocked. * @return True if all active threads are blocked. */ @Override protected synchronized boolean _areThreadsDeadlocked() { return _getBlockedThreadsCount() >= _getActiveThreadsCount(); } /** Clear all the input receivers for the specified actor. * @param actor The actor. * @exception IllegalActionException If the receivers can't be cleared. */ protected void _clearReceivers(Actor actor) throws IllegalActionException { List inputPorts = actor.inputPortList(); for (IOPort inputPort : inputPorts) { if (_isFlowPort(inputPort)) { continue; } Receiver[][] receivers = inputPort.getReceivers(); for (Receiver[] receiver : receivers) { for (int j = 0; j < receiver.length; j++) { if (receiver[j] != null) { receiver[j].clear(); } } } } } /** Return the earliest time that any actor has requested * a refiring. As a side effect, update the _winningThreads * set to identify the threads that will be unblocked * when time is advanced. * @return The next requested firing time. */ protected Time _earliestNextFiringTime() { Time earliestFireAtTime = Time.POSITIVE_INFINITY; _winningThreads.clear(); List actors = ((CompositeEntity) getContainer()) .deepEntityList(); for (Actor actor : actors) { ActorData actorData = _actorData.get(actor); if (actorData == null) { // Actor is not active. continue; } if (actorData.fireAtTimes != null && actorData.fireAtTimes.size() > 0) { Time otherTime = actorData.fireAtTimes.peek(); if (earliestFireAtTime.compareTo(otherTime) >= 0) { earliestFireAtTime = otherTime; _winningThreads.add(actorData.thread); } } } return earliestFireAtTime; } /** Iterate the specified actor once. * @param actor The actor to be iterated once. * @return True if either prefire() returns false * or postfire() returns true. * @exception IllegalActionException If the actor throws it. */ protected boolean _iterateActorOnce(Actor actor) throws IllegalActionException { if (_debugging) { _debug("---" + actor.getFullName() + ": Iterating."); } FiringsRecordable firingsRecordable = null; if (actor instanceof FiringsRecordable) { firingsRecordable = (FiringsRecordable) actor; } if (firingsRecordable != null) { firingsRecordable.recordFiring(FiringEvent.BEFORE_PREFIRE); } boolean result = true; if (actor.prefire()) { if (firingsRecordable != null) { firingsRecordable.recordFiring(FiringEvent.AFTER_PREFIRE); firingsRecordable.recordFiring(FiringEvent.BEFORE_FIRE); } actor.fire(); if (firingsRecordable != null) { firingsRecordable.recordFiring(FiringEvent.AFTER_FIRE); firingsRecordable.recordFiring(FiringEvent.BEFORE_POSTFIRE); } result = actor.postfire(); if (firingsRecordable != null) { firingsRecordable.recordFiring(FiringEvent.AFTER_POSTFIRE); } } else if (firingsRecordable != null) { firingsRecordable.recordFiring(FiringEvent.AFTER_PREFIRE); } if (!result) { // Postfire returned false. Remove the actor from // the active actors. if (_debugging) { _debug(actor.getFullName() + " postfire() returns false. Terminating actor."); } _actorData.remove(actor); } return result; } /** Iterate the specified actor until its input queue * is empty and any pending fireAt() time requests * are in the future. NOTE: This method is used * only if activeObjects = false. * @param actor The actor to be run. * @return The earliest pending fireAt time in the * future, or TIME.POSITIVE_INFINITY if there is none. * @exception IllegalActionException If the actor throws it. */ protected Time _runToCompletion(Actor actor) throws IllegalActionException { // First, clear all input receivers that are not marked as flow ports. // Record whether the actor actually has any input receivers. _clearReceivers(actor); ActorData actorData = _actorData.get(actor); if (_debugging) { _debug("******* Iterating actor " + actor.getName() + " at time " + getModelTime()); _debug("input queue: " + actorData.inputQueue); } if (actorData.inputQueue.size() == 0) { // Input queue is empty. if (actorData.fireAtTimes.size() == 0) { // NOTE: Tried out a semantics where every actors fires at least // once at every time step. Does Rhapsody do this? // But this doesn't really work. This actor may produce // an output at the current time, creating an event downstream // to be processed. Run-to-completion semantics will never // terminate, and we get stuck. /* if (!_iterateActorOnce(actor)) { // Postfire returned false. return Time.POSITIVE_INFINITY; } // If the input queue is still empty and there are still no // pending fireAt() requests, then we are done. if (actorData.inputQueue.size() == 0 && actorData.fireAtTimes.size() == 0) { */ // Input queue is empty and no future firing // has been requested. Nothing more to do. if (_debugging) { _debug(actor.getFullName() + " at time " + getModelTime() + " waiting for input."); } return Time.POSITIVE_INFINITY; /* } */ } // If this actor has requested a future firing, // then continue as long as that time has been reached. while (actorData.fireAtTimes.size() > 0 && !_stopRequested) { // Actor has requested a firing. Get the time for the request. Time targetTime = actorData.fireAtTimes.peek(); // If time has not advanced sufficiently, then we are done. if (getModelTime().compareTo(targetTime) < 0) { if (_debugging) { _debug(actor.getFullName() + " at time " + getModelTime() + " waiting for time to advance to " + targetTime); } return targetTime; } // If we get here, queue is empty, but current time // matches the target time, so we iterate anyway. // Remove the time from the pending fireAt times. actorData.fireAtTimes.poll(); if (!_iterateActorOnce(actor)) { // Postfire returned false. return Time.POSITIVE_INFINITY; } } // If we get here, then there are no pending fireAt requests // and the input queue is empty. Nothing more to do. return Time.POSITIVE_INFINITY; } while (actorData.inputQueue.size() != 0 && !_stopRequested) { // Input queue is not empty. Extract // an input from the queue and deposit in the receiver, unless // it is an event from a flow port, in which case the value // has already been updated. Input input = actorData.inputQueue.remove(0); if (!input.isChangeEvent) { input.receiver.reallyPut(input.token); if (_debugging) { IOPort port = input.receiver.getContainer(); int channel = port.getChannelForReceiver(input.receiver); _debug(actor.getFullName() + ": Providing input to port " + port.getName() + " on channel " + channel + " with value: " + input.token); } } else if (_debugging) { IOPort port = input.receiver.getContainer(); int channel = port.getChannelForReceiver(input.receiver); SysMLConcurrentDirector.this._debug(actor.getFullName() + ": Providing change event to port " + port.getName() + " on channel " + channel); } if (!_iterateActorOnce(actor)) { return Time.POSITIVE_INFINITY; } } // If there are now pending fireAt requests, then we // should return the earliest one. if (actorData.fireAtTimes.size() > 0) { return actorData.fireAtTimes.peek(); } else { return Time.POSITIVE_INFINITY; } } /** Return true if the specified port is a flow port. * @param port The port. * @return True if the port contains a boolean-valued parameter named "flow" * with value true. */ protected boolean _isFlowPort(IOPort port) { boolean isFlowPort = false; Attribute flowPortMarker = port.getAttribute("flow"); if (flowPortMarker instanceof Parameter) { try { Token flowPortMarkerValue = ((Parameter) flowPortMarker) .getToken(); if (flowPortMarkerValue instanceof BooleanToken && ((BooleanToken) flowPortMarkerValue).booleanValue()) { isFlowPort = true; } } catch (IllegalActionException e) { // If we get an exception, ignore and assume it's not // a flow port. } } return isFlowPort; } /** Create a new ProcessThread for controlling the actor that * is passed as a parameter of this method. * @param actor The actor that the created ProcessThread will * control. * @param director The director that manages the model that the * created thread is associated with. * @return Return a new ProcessThread that will control the * actor passed as a parameter for this method. * @exception IllegalActionException If creating a new ProcessThread * throws it. */ @Override protected ProcessThread _newProcessThread(Actor actor, ProcessDirector director) throws IllegalActionException { return new SingleQueueProcessThread(actor, director); } /** Return true indicating that deadlock has been resolved * and that execution should continue. The postfire() method * will deal with determining whether execution really should continue. * @return True. * @exception IllegalActionException Not thrown in this class. */ @Override protected synchronized boolean _resolveDeadlock() throws IllegalActionException { return true; } /////////////////////////////////////////////////////////////////// //// private variables //// /** Count of actors whose threads have completed their initialize method. */ private int _actorsInitialized; /** Directory of data associated with each actor. */ private Map _actorData = new ConcurrentHashMap(); /** Earliest time of a fireAt request among all actors. */ private Time _nextTime = Time.POSITIVE_INFINITY; /** Threads waiting for the next advance of time. */ private List _winningThreads = new LinkedList(); /////////////////////////////////////////////////////////////////// //// inner classes //// /** Data structure for data associated with an actor. */ private static class ActorData { /** Fire at times by actor. */ public PriorityQueue