public class DatagramReader extends TypedAtomicActor
NOTE: This actor has been developed to work in the Discrete Event (DE) and Synchronous Data Flow (SDF) domains. Use elsewhere with caution.
NOTE: This actor has problems, the tests do not reliably pass.
For details, see https://chess.eecs.berkeley.edu/bugzilla/show_bug.cgi?id=54
.
The simplest scenario has the thread constantly stalled awaiting a packet. When a packet arrives, the thread quickly queues it in one of the buffers of the actor, calls the getDirector().fireAtCurrentTime(), and then stalls again awaiting the next packet. By stalling again and again, the thread keeps the actor aware at all times of incoming packets. This is particularly important if packets come in more quickly than the model can process them. Depending on the domain (e.g. DE) in which this actor is used, the director may respond to the fireAtCurrentTime() call of the thread by calling the fire() method of the actor. In this case, fire() then broadcasts the data received, along with the return address and return socket number from which the datagram originated.
The data portion of the packet is broadcast at the output port. The type of the output is always an array of bytes.
The return address and socket number are broadcast as String and int respectively. These tell where the received datagram originated from.
The behavior of the actor under less simple scenarios is governed by parameters of this actor. Additional packet(s) can arrive while the director is getting around to calling fire(). Conversely, the director may make extra calls to fire(), even before any datagrams have come in. I call these the eager packet and eager director scenarios respectively.
Background: There are two packet buffers. The thread and the fire() method share these buffers and maintain consistency via synchronization on the object _syncFireAndThread. This synchronization prevents conflicts when accessing the shared buffers and when accessing the count of queued packets.
The overwrite parameter applies to the eager packet scenario. Setting this parameter to true is useful in cases where it is possible for data to come in too fast for the model to process. This setting alleviates data gluts without undue loss of data when the model is able to keep up. When overwrite is set to true (the default), the actor discards the packet already received in favor of the new packet. If false, the new packet is queued behind the existing one. In the latter case, both buffers are now full. The thread then waits for fire() to consume a queued packet before it stalls again awaiting the next. In all other cases (overwrite true or no queued packets) the thread immediately stalls to await the next packet.
The blockAwaitingDatagram parameter applies to the eager director case. This case comes up most often in SDF, where an actor is expected to block in fire until an output can be produced. If true, a call to fire() will block unless or until a datagram has arrived. If false, then fire() returns without waiting, using the defaultOutput parameter in place of real data. The returnAddress and returnSocketNumber ports have default outputs as well, but they are not parameter-programmable.
NOTE: This actor has a parameter localSocketNumber for the port number assigned to its local datagram socket. Initially, the local socket number is set to 4004. There is no particular reason for choosing this number, except that is noticeable in the code and in Vergil, thus encouraging you to change it to any desired value in the range 0..65535. Note that socket numbers 0..1023 are generally reserved and numbers 1024 and above are generally available.
Some commonly used port numbers (a.k.a. socket numbers) are shown below:
Well-known Ports (Commonly Used Ports) 7 (Echo) 21 (FTP) 23 (TELNET) 25 (SMTP) 53 (DNS) 79 (finger) 80 (HTTP) 110 (POP3) 119 (NNTP) 161 (SNMP) 162 (SNMP Trap)Reference: http://192.168.1.1/Forward.htm (A webpage hosted from within the Linksys BEFSR41 Cable/DSL Router)
NOTE: This actor can also be configured to handle multicase datagram socket. A MulticastSocket is a DatagramSocket with additional capabilities to join groups of other multicast hosts on the internet. A multicast group is specified by a class D IP address and a standard UDP port number. When one member sends a packet to a multicast group, all recipients subscribing to that host and port receive the packet. Currently, The parameter defaultReturnAddress is overloaded to specify a multicast datagram IP address. When the return address is a multicast IP address, The parameter localSocketNumber is used to specify the UDP port number for the multicast group. A multicast IP address ranges from 224.0.0.0 to 239.255.255.255, inclusive. To send a packet to the group, the sender can be either a DatagramSocket or a MulticastSocket. The only difference is that MulticastSocket allows you to control the time-to-live of the datagram. Don't use 224.0.0.1 ~ 224.255.255.255 when the live time of is specified larger than 1.
FIXME: we might not want to overload the defaultReturnAddress and the localSocketNumber parameter...
Another useful tidbit is the command 'netstat'. This works in a DOS prompt and also in the UNIX-like Bash shell. In either shell, enter 'netstat -an'. This command shows current port allocations! Ports allocated to Ptolemy models are shown along with other port allocations. Other useful network commands include 'ping' and 'tracert'. Both TCP and UDP (datagram) ports are shown by netstat. FIXME: Find out whether a TCP port using a specific number blocks a UDP port from using that same number.
Yellow (winthrop) |
Yellow (winthrop) |
Entity.ContainedObjectsIterator
Modifier and Type | Field and Description |
---|---|
Parameter |
actorBufferLength
Length (in bytes) of each of the actor's two packet buffers for
receiving a datagram.
|
Parameter |
blockAwaitingDatagram
Whether to block in fire().
|
Parameter |
defaultOutput
The default for the output output.
|
Parameter |
defaultReturnAddress
The default for the returnAddress output.
|
Parameter |
defaultReturnSocketNumber
The default the returnSocketNumber output.
|
Parameter |
localSocketNumber
This actor's local socket (a.k.a. port) number.
|
TypedIOPort |
output
This port outputs the data portion of the received datagram
packet.
|
Parameter |
overwrite
Whether to overwrite when inundated with datagrams or let
them pile up.
|
Parameter |
platformBufferLength
Length (in bytes) of the buffer within java and/or the
platform layers below java.
|
TypedIOPort |
returnAddress
This port outputs the IP address portion of the received
datagram packet.
|
TypedIOPort |
returnSocketNumber
This port outputs the socket (a.k.a port) number portion of the
received datagram packet.
|
Parameter |
setPlatformBufferLength
Determine whether the platformBufferLength parameter will be
used to set the platform's receive buffer size.
|
TypedIOPort |
trigger
The trigger input port reads and discards a token from each
channel that has a token.
|
_typesValid
_actorFiringListeners, _initializables, _notifyingActorFiring, _stopRequested
_changeListeners, _changeLock, _changeRequests, _debugging, _debugListeners, _deferChangeRequests, _elementName, _isPersistent, _verbose, _workspace, ATTRIBUTES, CLASSNAME, COMPLETE, CONTENTS, DEEP, FULLNAME, LINKS
COMPLETED, NOT_READY, STOP_ITERATING
Constructor and Description |
---|
DatagramReader(CompositeEntity container,
java.lang.String name)
Construct an actor with the given container and name.
|
Modifier and Type | Method and Description |
---|---|
void |
attributeChanged(Attribute attribute)
React to a change of the given attribute.
|
java.lang.Object |
clone(Workspace workspace)
Clone the actor into the specified workspace.
|
void |
fire()
Broadcast a received datagram, or block awaiting one, or
broadcast default values.
|
void |
initialize()
Initialize this actor, including the creation of an evaluation
variable for the Ptolemy parser, a DatagramSocket for
receiving datagrams, and a SocketReadingThread for blocking in
the DatagramSocket.receive() method call.
|
void |
setContainer(CompositeEntity container)
Override the setContainer() method to call wrapup() if the
actor is deleted while the model is running.
|
void |
stop()
Request that execution of the current iteration stop as soon
as possible.
|
void |
stopFire()
Stop the fire() method, but only if it is blocked.
|
void |
wrapup()
Release resources acquired in the initialize() method,
specifically the evaluation variable, the DatagramSocket, and
the SocketReadingThread.
|
_containedTypeConstraints, _customTypeConstraints, _defaultTypeConstraints, _fireAt, _fireAt, attributeTypeChanged, clone, isBackwardTypeInferenceEnabled, newPort, typeConstraintList, typeConstraints
_actorFiring, _actorFiring, _declareDelayDependency, addActorFiringListener, addInitializable, connectionsChanged, createReceivers, declareDelayDependency, getCausalityInterface, getDirector, getExecutiveDirector, getManager, inputPortList, isFireFunctional, isStrict, iterate, newReceiver, outputPortList, postfire, prefire, preinitialize, pruneDependencies, recordFiring, removeActorFiringListener, removeDependency, removeInitializable, terminate
_adjustDeferrals, _checkContainer, _getContainedObject, _propagateExistence, getContainer, instantiate, isAtomic, isOpaque, moveDown, moveToFirst, moveToIndex, moveToLast, moveUp, propagateExistence, setName
_addPort, _description, _exportMoMLContents, _removePort, _validateSettables, connectedPortList, connectedPorts, containedObjectsIterator, getAttribute, getPort, getPorts, linkedRelationList, linkedRelations, portList, removeAllPorts, setClassDefinition, uniqueName
_setParent, exportMoML, getChildren, getElementName, getParent, getPrototypeList, isClassDefinition, isWithinClassDefinition
_addAttribute, _adjustOverride, _attachText, _cloneFixAttributeFields, _containedDecorators, _copyChangeRequestList, _debug, _debug, _debug, _debug, _debug, _executeChangeRequests, _getIndentPrefix, _isMoMLSuppressed, _markContentsDerived, _notifyHierarchyListenersAfterChange, _notifyHierarchyListenersBeforeChange, _propagateValue, _removeAttribute, _splitName, _stripNumericSuffix, addChangeListener, addDebugListener, addHierarchyListener, attributeDeleted, attributeList, attributeList, decorators, deepContains, depthInHierarchy, description, description, event, executeChangeRequests, exportMoML, exportMoML, exportMoML, exportMoML, exportMoMLPlain, getAttribute, getAttributes, getChangeListeners, getClassName, getDecoratorAttribute, getDecoratorAttributes, getDerivedLevel, getDerivedList, getDisplayName, getFullName, getModelErrorHandler, getName, getName, getSource, handleModelError, isDeferringChangeRequests, isOverridden, isPersistent, lazyContainedObjectsIterator, message, notifyOfNameChange, propagateValue, propagateValues, removeAttribute, removeChangeListener, removeDebugListener, removeHierarchyListener, requestChange, setClassName, setDeferringChangeRequests, setDerivedLevel, setDisplayName, setModelErrorHandler, setPersistent, setSource, sortContainedObjects, toplevel, toString, validateSettables, workspace
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
createReceivers, getCausalityInterface, getDirector, getExecutiveDirector, getManager, inputPortList, newReceiver, outputPortList
isFireFunctional, isStrict, iterate, postfire, prefire, terminate
addInitializable, preinitialize, removeInitializable
description, getContainer, getDisplayName, getFullName, getName, getName, setName
getDerivedLevel, getDerivedList, propagateValue
public TypedIOPort returnAddress
public TypedIOPort returnSocketNumber
public TypedIOPort output
public TypedIOPort trigger
public Parameter localSocketNumber
public Parameter actorBufferLength
public Parameter platformBufferLength
public Parameter setPlatformBufferLength
public Parameter overwrite
public Parameter blockAwaitingDatagram
public Parameter defaultReturnAddress
public Parameter defaultReturnSocketNumber
public Parameter defaultOutput
public DatagramReader(CompositeEntity container, java.lang.String name) throws NameDuplicationException, IllegalActionException
container
- The container.name
- The name of this actor.IllegalActionException
- If the actor cannot be contained
by the proposed container.NameDuplicationException
- If the container already has an
actor with this name.public void attributeChanged(Attribute attribute) throws IllegalActionException
attributeChanged
in class NamedObj
attribute
- The attribute that changed.IllegalActionException
- If the change is not
allowed.public java.lang.Object clone(Workspace workspace) throws java.lang.CloneNotSupportedException
clone
in class TypedAtomicActor
workspace
- The workspace for the new object.java.lang.CloneNotSupportedException
- If a derived class contains
an attribute that cannot be cloned.NamedObj.exportMoML(Writer, int, String)
,
NamedObj.setDeferringChangeRequests(boolean)
public void fire() throws IllegalActionException
fire
in interface Executable
fire
in class AtomicActor<TypedIOPort>
IllegalActionException
- If the data cannot be
converted into a token of the same type as the configured type
of the output port.public void initialize() throws IllegalActionException
initialize
in interface Initializable
initialize
in class AtomicActor<TypedIOPort>
IllegalActionException
- If the
localSocketNumber parameter has a value outside 0..65535
or a socket could not be created.public void setContainer(CompositeEntity container) throws IllegalActionException, NameDuplicationException
setContainer
in class AtomicActor<TypedIOPort>
container
- The proposed container.IllegalActionException
- If the action would result in a
recursive containment structure, or if
this entity and container are not in the same workspace.NameDuplicationException
- If the container already has
an entity with the name of this entity.ComponentEntity.getContainer()
public void stopFire()
stopFire
in interface Executable
stopFire
in class AtomicActor<TypedIOPort>
public void stop()
stop
in interface Executable
stop
in class AtomicActor<TypedIOPort>
public void wrapup() throws IllegalActionException
wrapup
in interface Initializable
wrapup
in class AtomicActor<TypedIOPort>
IllegalActionException
- Not thrown in this base class.