/* An input port that subscribes to data published on a named channel. Copyright (c) 1997-2014 The Regents of the University of California. All rights reserved. Permission is hereby granted, without written agreement and without license or royalty fees, to use, copy, modify, and distribute this software and its documentation for any purpose, provided that the above copyright notice and the following two paragraphs appear in all copies of this software. IN NO EVENT SHALL THE UNIVERSITY OF CALIFORNIA BE LIABLE TO ANY PARTY FOR DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS DOCUMENTATION, EVEN IF THE UNIVERSITY OF CALIFORNIA HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THE UNIVERSITY OF CALIFORNIA SPECIFICALLY DISCLAIMS ANY WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS ON AN "AS IS" BASIS, AND THE UNIVERSITY OF CALIFORNIA HAS NO OBLIGATION TO PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS. 
PT_COPYRIGHT_VERSION_2 COPYRIGHTENDKEY */ package ptolemy.actor; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import ptolemy.actor.util.DFUtilities; import ptolemy.data.ArrayToken; import ptolemy.data.BooleanToken; import ptolemy.data.IntToken; import ptolemy.data.Token; import ptolemy.data.expr.StringParameter; import ptolemy.data.expr.Variable; import ptolemy.kernel.ComponentEntity; import ptolemy.kernel.CompositeEntity; import ptolemy.kernel.Entity; import ptolemy.kernel.InstantiableNamedObj; import ptolemy.kernel.Port; import ptolemy.kernel.util.Attribute; import ptolemy.kernel.util.IllegalActionException; import ptolemy.kernel.util.InternalErrorException; import ptolemy.kernel.util.NameDuplicationException; import ptolemy.kernel.util.NamedObj; /////////////////////////////////////////////////////////////////// //// SubscriberPort /** This is a specialized input port that subscribes to data sent to it on the specified named channel. The tokens are "tunneled" from an instance of {@link PublisherPort} that names the same channel. If {@link #global} is false (the default), then this subscriber will only see instances of PublisherPort that are under the control of the same director. That is, it can be at a different level of the hierarchy, or in an entirely different composite actor, as long as the relevant composite actors are transparent (have no director). If {@link #global} is true, then the publisher may be anywhere in the model, as long as its global parameter is also true.
Any number of instances of SubscriberPort can subscribe to the same channel.
This port is connected to the publisher via hidden "liberal links"
(links that are allowed to cross levels of the hierarchy).
Consequently, any data dependencies that the director might assume
on a regular "wired" connection will also be assumed across
Publisher-Subscriber pairs. Similarly, type constraints will
propagate across Publisher-Subscriber pairs. That is, the type of
this subscriber port will match the type of the publisher port
that names the same channel.
@author Edward A. Lee, Contributor: Christopher Brooks
@version $Id: SubscriberPort.java 70398 2014-10-22 23:44:32Z cxh $
@since Ptolemy II 10.0
@Pt.ProposedRating Yellow (eal)
@Pt.AcceptedRating Red (eal)
*/
public class SubscriberPort extends PubSubPort {
/** Construct a subscriber port with the given containing actor and name.
* The port is configured as an input and never as an output, and a
* "_smallIconDescription" text attribute is attached to it.
* @param container The container actor.
* @param name The name of the port.
* @exception IllegalActionException If the port is not of an acceptable
* class for the container, or if the container does not implement the
* Actor interface.
* @exception NameDuplicationException If the name coincides with
* a port already in the container.
*/
public SubscriberPort(ComponentEntity container, String name)
throws IllegalActionException, NameDuplicationException {
super(container, name);
// A subscriber only receives data: clear the output flag, set the input flag.
setOutput(false);
setInput(true);
// In order for this to show up in the vergil library, it has to have
// an icon description.
// NOTE(review): the description attached here is just a newline --
// confirm that an (effectively) empty icon description is intended.
_attachText("_smallIconDescription", "\n");
}
///////////////////////////////////////////////////////////////////
//// public methods ////
/** React to a change in one of the attributes of this port.
 *  <ul>
 *  <li>If the <i>channel</i> parameter has a new value, unlink this port
 *  from the previously recorded channel (provided that channel name is
 *  not blank and the container of the container is a CompositeActor),
 *  then record the new channel name.</li>
 *  <li>If the <i>global</i> parameter changed from true to false, unlink
 *  this port from the current channel using the previous (global) scope;
 *  in all cases record the new value.</li>
 *  <li>If the <i>initialTokens</i> parameter has a non-null value, require
 *  it to be an array and set the <i>tokenInitConsumption</i> parameter of
 *  this port to the length of that array.</li>
 *  <li>Otherwise, delegate to the superclass.</li>
 *  </ul>
 *  @param attribute The attribute that changed.
 *  @exception IllegalActionException If the value of the changed
 *   parameter cannot be obtained, if the initialTokens value is not
 *   an array, or if unlinking from the published port or the
 *   superclass throws it.
 */
@Override
public void attributeChanged(Attribute attribute)
        throws IllegalActionException {
    if (attribute == channel) {
        String newValue = channel.stringValue();
        if (!newValue.equals(_channel)) {
            // Drop the link that was made under the old channel name
            // before remembering the new one.
            NamedObj immediateContainer = getContainer();
            if (immediateContainer != null) {
                NamedObj container = immediateContainer.getContainer();
                if (container instanceof CompositeActor
                        && !(_channel == null || _channel.trim().equals(""))) {
                    ((CompositeActor) container).unlinkToPublishedPort(
                            _channel, this, _global);
                }
            }
            _channel = newValue;
        }
    } else if (attribute == global) {
        boolean newValue = ((BooleanToken) global.getToken())
                .booleanValue();
        if (!newValue && _global) {
            // Going from global to non-global: unlink using the old
            // scope (_global is still true at this point).
            NamedObj immediateContainer = getContainer();
            if (immediateContainer != null) {
                NamedObj container = immediateContainer.getContainer();
                if (container instanceof CompositeActor
                        && !(_channel == null || _channel.trim().equals(""))) {
                    ((CompositeActor) container).unlinkToPublishedPort(
                            _channel, this, _global);
                }
            }
        }
        _global = newValue;
        // Do not call SubscriptionAggregator.attributeChanged()
        // because it will remove the published port name by _channel.
        // If _channel is set to a real name (not a regex pattern),
        // Then chaos ensues.  See test 3.0 in SubscriptionAggregator.tcl
    } else if (attribute == initialTokens) {
        // Set the initial token parameter for the benefit of SDF.
        // This is an input port, so the relevant rate parameter is
        // tokenInitConsumption (the same parameter that
        // hierarchyChanged() restores on other ports), not
        // tokenInitProduction, which describes output ports.
        Token initialTokensValue = initialTokens.getToken();
        if (initialTokensValue != null) {
            if (!(initialTokensValue instanceof ArrayToken)) {
                throw new IllegalActionException(this,
                        "initialTokens value is required to be an array.");
            }
            int length = ((ArrayToken) initialTokensValue).length();
            DFUtilities.setOrCreate(this, "tokenInitConsumption", length);
        }
    } else {
        super.attributeChanged(attribute);
    }
}
/** Notify this object that the containment hierarchy above it has
 *  changed. For every port recorded in the map of previously modified
 *  tokenInitConsumption parameters, this restores the recorded
 *  expression, or removes the parameter when the recorded value is
 *  null. Ports whose tokenInitConsumption variable can no longer be
 *  found are skipped. The superclass is notified afterwards.
 *  @exception IllegalActionException If restoring a parameter fails
 *   or if the superclass rejects the change.
 */
@Override
public void hierarchyChanged() throws IllegalActionException {
    // If we have previously set the tokenInitConsumption variable
    // of some port, restore it now to its original value.
    if (_tokenInitConsumptionSet != null) {
        for (Map.Entry<? extends IOPort, String> entry
                : _tokenInitConsumptionSet.entrySet()) {
            Variable variable = DFUtilities.getRateVariable(
                    entry.getKey(), "tokenInitConsumption");
            if (variable == null) {
                // The variable is already gone (for example, it was
                // removed by an earlier notification), so there is
                // nothing left to restore for this port.
                continue;
            }
            String previousValue = entry.getValue();
            if (previousValue == null) {
                // No previous expression was recorded: remove the
                // variable from its container.
                try {
                    variable.setContainer(null);
                } catch (NameDuplicationException e) {
                    // Should not occur.
                    throw new InternalErrorException(e);
                }
            } else {
                variable.setExpression(previousValue);
            }
        }
    }
    super.hierarchyChanged();
}
/** Notify this object that the containment hierarchy above it is about
 *  to change. If a channel is set and the container of this port's
 *  container is a CompositeActor, this port is unlinked from the
 *  published port of that channel before the superclass is notified.
 *  @exception IllegalActionException If unlinking from the published
 *   port fails or if the superclass throws it.
 */
@Override
public void hierarchyWillChange() throws IllegalActionException {
    if (channel != null) {
        String channelName;
        try {
            // The channel may refer to parameters via $
            // but the parameters are not yet in scope.
            channelName = channel.stringValue();
        } catch (Throwable ignored) {
            // Evaluation failed: fall back to the unevaluated expression.
            channelName = channel.getExpression();
        }

        // Only look up the containers when there is a channel name to unlink.
        NamedObj owner = (channelName == null) ? null : getContainer();
        if (owner != null) {
            NamedObj enclosing = owner.getContainer();
            if (enclosing instanceof CompositeActor) {
                // NOTE(review): this uses the two-argument overload, whereas
                // attributeChanged() passes the _global flag as well --
                // confirm the difference is intended.
                CompositeActor composite = (CompositeActor) enclosing;
                composite.unlinkToPublishedPort(channelName, this);
            }
        }
    }
    super.hierarchyWillChange();
}
/** If {@link #initialTokens} has been set, then make available the
* inputs specified by its array value.
*/
@Override
public void initialize() throws IllegalActionException {
if (((InstantiableNamedObj) getContainer()).isWithinClassDefinition()) {
// Don't initialize Class Definitions.
// FIXME: Probably shouldn't even be a registered Initializable.
// See $PTII/ptolemy/actor/lib/test/auto/PublisherToplevelSubscriberPortAOC.xml
return;
}
// If the publisher port is not opaque and is an instance of
// ConstantPublisherPort, then we have some work to do. If
// this port is opaque, we set it to return a constant value
// provided by the ConstantPublisherPort. If not, then we have
// set the inside destination ports to return constant values.
if (_publisherPort instanceof ConstantPublisherPort) {
Token constantToken = ((ConstantPublisherPort) _publisherPort).constantValue
.getToken();
Token limitToken = ((ConstantPublisherPort) _publisherPort).numberOfTokens
.getToken();
int limit = ((IntToken) limitToken).intValue();
if (isOpaque()) {
_setConstant(constantToken, limit);
} else {
// NOTE: insideSinkPortList() doesn't work here if the
// port is transparent. The returned list is empty,
// unfortunately, so we have duplicate that functionality
// here.
Director dir = ((Actor) getContainer()).getDirector();
int depthOfDirector = dir.depthInHierarchy();
LinkedList