ptolemy.actor.ptalon.lib
Class ReduceWorker

java.lang.Object
  extended by ptolemy.kernel.util.NamedObj
      extended by ptolemy.kernel.InstantiableNamedObj
          extended by ptolemy.kernel.Entity
              extended by ptolemy.kernel.ComponentEntity
                  extended by ptolemy.actor.AtomicActor
                      extended by ptolemy.actor.TypedAtomicActor
                          extended by ptolemy.actor.ptalon.lib.ReduceWorker
All Implemented Interfaces:
java.io.Serializable, java.lang.Cloneable, Actor, Executable, FiringsRecordable, Initializable, TypedActor, Changeable, Debuggable, DebugListener, Derivable, Instantiable, ModelErrorHandler, MoMLExportable, Moveable, Nameable

public class ReduceWorker
extends TypedAtomicActor

A ReduceWorker actor, as a subsystem of the MapReduce system.

This actor has a parameter classNameForReduce which is the qualified name for a Java class that extends ptolemy.actor.ptalon.lib.MapReduceAlgorithm. It must also have a no argument constructor. By extending this abstract class, it will implement a method named reduce with type signature:

public List<String> reduce(String key, BlockingQueue<String> value)

This method defines the Reduce algorithm for the MapReduce system. At each call, it should return a list of Strings, which is a reduction of the list of input values. At each firing, this actor inputs all avaiable input keys and values. It outputs the value tokens when its doneReading port recieves a true value. This should only happen after all inputs have been sent to the system.

When implementing a custom reduce method in a subclass of MapReduceAlgorithm, note to use the take method to get values from the queue. Call the isQueueEmpty of MapReduceAlgorithm to test if this actor has stopped putting values on the queue and that all values have been taken from the queue. The last element of the queue will allways be the empty string. Ignore this value.

Since:
Ptolemy II 6.1
Version:
$Id: ReduceWorker.java 57040 2010-01-27 20:52:32Z cxh $
Author:
Adam Cataldo
See Also:
KeyValuePair, Serialized Form
Accepted Rating:
Red (cxh)
Proposed Rating:
Red (cxh)

Nested Class Summary
 
Nested classes/interfaces inherited from class ptolemy.kernel.Entity
Entity.ContainedObjectsIterator
 
Field Summary
private  boolean _doneReading
           
private  boolean _readMode
           
private  java.lang.Class<?> _reduceClass
           
private  java.util.Map<java.lang.String,MapReduceAlgorithm> _runningAlgorithms
          Each key is a distinct key for a reduce call, and each value is a MapReduceAlgorithm to reduce a set of values.
 StringParameter classNameForReduce
          The qualifed class name for a Java class containing a method with signature: public static List<String[]> map(String key, String value) Each element of each returned list should be a length two array of Strings.
 TypedIOPort doneReading
          A boolean input.
 TypedIOPort inputKey
          A String input key.
 TypedIOPort inputValue
          A String input value.
 TypedIOPort outputKey
          A String output key.
 TypedIOPort outputValue
          A String output value.
 
Fields inherited from class ptolemy.actor.AtomicActor
_actorFiringListeners, _initializables, _notifyingActorFiring, _stopRequested
 
Fields inherited from class ptolemy.kernel.util.NamedObj
_changeListeners, _changeLock, _changeRequests, _debugging, _debugListeners, _elementName, _isPersistent, _verbose, _workspace, ATTRIBUTES, CLASSNAME, COMPLETE, CONTENTS, DEEP, FULLNAME, LINKS
 
Fields inherited from interface ptolemy.actor.Executable
COMPLETED, NOT_READY, STOP_ITERATING
 
Constructor Summary
ReduceWorker(CompositeEntity container, java.lang.String name)
          Create a new actor in the specified container with the specified name.
 
Method Summary
private  void _setReduceMethod()
          Set the map method using the class specified in classNameForMap.
 void attributeChanged(Attribute attribute)
          React to a change in an attribute.
 void fire()
          Read in a token on the inputKey and inputValue ports and output pairs of tokens on the ouputKey, outputValue ports.
 void initialize()
          Extract the map method from the classNameForMap parameter.
 boolean postfire()
          Return true, unless stop() has been called, in which case, return false.
 boolean prefire()
          Return true if there is an available key token and value token on the inputKey and inputValue ports.
 void wrapup()
          Clean up memory.
 
Methods inherited from class ptolemy.actor.TypedAtomicActor
_addPort, _fireAt, _fireAt, attributeTypeChanged, clone, newPort, typeConstraintList, typeConstraints
 
Methods inherited from class ptolemy.actor.AtomicActor
_actorFiring, _actorFiring, addActorFiringListener, addInitializable, clone, connectionsChanged, createReceivers, declareDelayDependency, getCausalityInterface, getDirector, getExecutiveDirector, getManager, inputPortList, isFireFunctional, isStrict, iterate, newReceiver, outputPortList, preinitialize, pruneDependencies, recordFiring, removeActorFiringListener, removeDependency, removeInitializable, setContainer, stop, stopFire, terminate
 
Methods inherited from class ptolemy.kernel.ComponentEntity
_adjustDeferrals, _checkContainer, _getContainedObject, _propagateExistence, getContainer, instantiate, isAtomic, isOpaque, moveDown, moveToFirst, moveToIndex, moveToLast, moveUp, propagateExistence, setName
 
Methods inherited from class ptolemy.kernel.Entity
_description, _exportMoMLContents, _removePort, _validateSettables, connectedPortList, connectedPorts, containedObjectsIterator, getAttribute, getPort, getPorts, linkedRelationList, linkedRelations, portList, removeAllPorts, setClassDefinition, uniqueName
 
Methods inherited from class ptolemy.kernel.InstantiableNamedObj
_setParent, exportMoML, getChildren, getElementName, getParent, getPrototypeList, isClassDefinition, isWithinClassDefinition
 
Methods inherited from class ptolemy.kernel.util.NamedObj
_addAttribute, _adjustOverride, _attachText, _cloneFixAttributeFields, _debug, _debug, _debug, _debug, _debug, _getIndentPrefix, _isMoMLSuppressed, _markContentsDerived, _propagateValue, _recordDecoratedAttributes, _removeAttribute, _splitName, _stripNumericSuffix, addChangeListener, addDebugListener, attributeList, attributeList, deepContains, depthInHierarchy, description, description, event, executeChangeRequests, exportMoML, exportMoML, exportMoML, exportMoML, exportMoMLPlain, getAttribute, getAttributes, getChangeListeners, getClassName, getDecoratorAttribute, getDecoratorAttributes, getDerivedLevel, getDerivedList, getDisplayName, getFullName, getModelErrorHandler, getName, getName, getSource, handleModelError, isDeferringChangeRequests, isOverridden, isPersistent, lazyContainedObjectsIterator, message, propagateValue, propagateValues, removeChangeListener, removeDebugListener, requestChange, setClassName, setDeferringChangeRequests, setDerivedLevel, setDisplayName, setModelErrorHandler, setPersistent, setSource, sortContainedObjects, toplevel, toString, validateSettables, workspace
 
Methods inherited from class java.lang.Object
equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
 
Methods inherited from interface ptolemy.actor.Actor
createReceivers, getCausalityInterface, getDirector, getExecutiveDirector, getManager, inputPortList, newReceiver, outputPortList
 
Methods inherited from interface ptolemy.actor.Executable
isFireFunctional, isStrict, iterate, stop, stopFire, terminate
 
Methods inherited from interface ptolemy.actor.Initializable
addInitializable, preinitialize, removeInitializable
 
Methods inherited from interface ptolemy.kernel.util.Nameable
description, getContainer, getDisplayName, getFullName, getName, getName, setName
 
Methods inherited from interface ptolemy.kernel.util.Derivable
getDerivedLevel, getDerivedList, propagateValue
 

Field Detail

classNameForReduce

public StringParameter classNameForReduce
The qualifed class name for a Java class containing a method with signature:

public static List<String[]> map(String key, String value)

Each element of each returned list should be a length two array of Strings.


doneReading

public TypedIOPort doneReading
A boolean input. When this input is true, the actor is done reading values, and it may output tokens for each key it recieved.


inputKey

public TypedIOPort inputKey
A String input key.


inputValue

public TypedIOPort inputValue
A String input value.


outputKey

public TypedIOPort outputKey
A String output key.


outputValue

public TypedIOPort outputValue
A String output value.


_doneReading

private boolean _doneReading

_readMode

private boolean _readMode

_reduceClass

private java.lang.Class<?> _reduceClass

_runningAlgorithms

private java.util.Map<java.lang.String,MapReduceAlgorithm> _runningAlgorithms
Each key is a distinct key for a reduce call, and each value is a MapReduceAlgorithm to reduce a set of values.

Constructor Detail

ReduceWorker

public ReduceWorker(CompositeEntity container,
                    java.lang.String name)
             throws IllegalActionException,
                    NameDuplicationException
Create a new actor in the specified container with the specified name. The name must be unique within the container or an exception is thrown. The container argument must not be null, or a NullPointerException will be thrown.

Parameters:
container - The container.
name - The name of this actor within the container.
Throws:
IllegalActionException - If this actor cannot be contained by the proposed container (see the setContainer() method).
NameDuplicationException - If the name coincides with an entity already in the container.
Method Detail

attributeChanged

public void attributeChanged(Attribute attribute)
                      throws IllegalActionException
React to a change in an attribute. This method is called by a contained attribute when its value changes. In this base class, the method does nothing. In derived classes, this method may throw an exception, indicating that the new attribute value is invalid. It is up to the caller to restore the attribute to a valid value if an exception is thrown. If the attribute changed is classNameForReduce, update this actor accordingly.

Overrides:
attributeChanged in class NamedObj
Parameters:
attribute - The attribute that changed.
Throws:
IllegalActionException - If the change is not acceptable to this container. If the class set in classNameForReduce does not exist, or if the class exists but does not contain a map method with an appropriate signature, this exception will be thrown.

fire

public void fire()
          throws IllegalActionException
Read in a token on the inputKey and inputValue ports and output pairs of tokens on the ouputKey, outputValue ports.

Specified by:
fire in interface Executable
Overrides:
fire in class AtomicActor
Throws:
IllegalActionException - If there is any trouble calling the map method.

prefire

public boolean prefire()
                throws IllegalActionException
Return true if there is an available key token and value token on the inputKey and inputValue ports.

Specified by:
prefire in interface Executable
Overrides:
prefire in class AtomicActor
Returns:
True if this actor is ready for firing, false otherwise.
Throws:
IllegalActionException - Not thrown in this class.

postfire

public boolean postfire()
                 throws IllegalActionException
Description copied from class: AtomicActor
Return true, unless stop() has been called, in which case, return false. Derived classes override this method to define operations to be performed at the end of every iteration of its execution, after one invocation of the prefire() method and any number of invocations of the fire() method. This method typically wraps up an iteration, which may involve updating local state. In derived classes, this method returns false to indicate that this actor should not be fired again.

Specified by:
postfire in interface Executable
Overrides:
postfire in class AtomicActor
Returns:
The base class return value.
Throws:
IllegalActionException - If thrown in the base class.

wrapup

public void wrapup()
            throws IllegalActionException
Clean up memory.

Specified by:
wrapup in interface Initializable
Overrides:
wrapup in class AtomicActor
Throws:
IllegalActionException - If thrown in the base class.

initialize

public void initialize()
                throws IllegalActionException
Extract the map method from the classNameForMap parameter.

Specified by:
initialize in interface Initializable
Overrides:
initialize in class AtomicActor
Throws:
IllegalActionException - If unable to extract an appropriate map method.

_setReduceMethod

private void _setReduceMethod()
                       throws IllegalActionException
Set the map method using the class specified in classNameForMap.

Throws:
IllegalActionException - If the map method does not exist, has the wrong type signature, or has the wrong access modifiers.