Figure 10-3 shows the class derivation hierarchy for the classes that implement the dynamic scheduling of Kahn process networks. The class
ThreadList provides mechanisms for terminating groups of threads. This class is used by
PNScheduler to create threads for each node in the program graph. The class
SyncDataFlowProcess implements the threads for the nodes.
The class ThreadList implements a container for manipulating groups of threads. It has two public methods.
virtual void add(PtThread*); This method adds a PtThread object to the list.
virtual ~ThreadList(); This destructor terminates and deletes all threads in the list.
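The ownership behavior described above can be illustrated with a minimal, self-contained sketch. It is not the Ptolemy implementation: the Thread base class, its terminate() method, the CountingThread helper, and the std::vector storage are all assumptions made for illustration, standing in for PtThread and whatever storage the real container uses.

```cpp
#include <cassert>
#include <vector>

// Hypothetical stand-in for PtThread (assumption, not the Ptolemy class).
class Thread {
public:
    virtual ~Thread() {}
    virtual void terminate() {}
};

// Hypothetical helper that records terminate() calls and deletions.
class CountingThread : public Thread {
public:
    CountingThread(int& terminated, int& deleted)
        : terminated_(terminated), deleted_(deleted) {}
    void terminate() override { ++terminated_; }
    ~CountingThread() override { ++deleted_; }
private:
    int& terminated_;
    int& deleted_;
};

// Sketch of a container that owns the threads added to it.
class ThreadList {
public:
    ThreadList() = default;
    ThreadList(const ThreadList&) = delete;             // owning: no copies
    ThreadList& operator=(const ThreadList&) = delete;

    // Add a thread; the list takes ownership of the pointer.
    virtual void add(Thread* t) { threads_.push_back(t); }

    // Terminate and delete every thread in the list.
    virtual ~ThreadList() {
        for (Thread* t : threads_) {
            t->terminate();
            delete t;
        }
    }
private:
    std::vector<Thread*> threads_;
};
```

With this design, destroying the list is the single point at which a whole group of threads is shut down, so a scheduler only has to delete its list to clean up.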
The class PNScheduler controls the execution of a process network. It has four data members, listed below; the last three support synchronization between the scheduler and the processes.
ThreadList* threads; A container for the threads managed by the scheduler.
PNMonitor* monitor; A monitor to guard the scheduler's condition variable.
PNCondition* start; A condition variable for synchronizing with threads.
int iteration; A counter for regulating the execution of the processes.
The createThreads method, shown below, creates one process for each node in the program graph. A SyncDataFlowProcess is created for each DataFlowStar and added to the threads list.
// Create threads (dataflow processes).
if (! galaxy()) return;
LOG_NEW; threads = new ThreadList;
// Create Threads for all the Stars.
while((star = (DataFlowStar*)nextStar++) != NULL)
    LOG_NEW; SyncDataFlowProcess* p
        = new SyncDataFlowProcess(*star,*start,iteration);
It is often desirable to have a partial execution of a process network. The class SyncDataFlowProcess, which is derived from DataFlowProcess, supports this by synchronizing the execution of a thread with the iteration counter that belongs to the PNScheduler. The run methods of PNScheduler and SyncDataFlowProcess implement this synchronization. The scheduler's run method, shown first below, increments the iteration count to give every process an opportunity to run. The process's run method, shown second, ensures that the number of invocations of the star's run method does not exceed the iteration count.
// Run (or continue) the simulation.
if (SimControl::haltRequested() || ! galaxy())
while((currentTime < stopTime) && !SimControl::haltRequested())
    // Notify all threads to continue.
    while (PNGeodesic::blockedOnFull() > 0
    currentTime += schedulePeriod;

int i = 0;
// Configure the star for dynamic execution.
// Fire the star ad infinitum.
    // Wait for notification to start.
    while (iteration <= i) start.wait();
    i = iteration;
    if (star.waitPort()) star.waitPort()->receiveData();
} while (star.run());
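The iteration-counter handshake between the scheduler and the processes can be sketched with standard C++ threads. This is only an illustration under stated assumptions: it uses std::mutex and std::condition_variable in place of PNMonitor and PNCondition, the function name runGatedIterations and the halt and finished variables are hypothetical, and the explicit "idle" condition is a stand-in for however the real scheduler waits for the processes to block.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Run numWorkers threads for numIterations scheduler iterations and
// return how many times each worker "fired". (Hypothetical helper.)
std::vector<int> runGatedIterations(int numWorkers, int numIterations) {
    std::mutex m;                    // guards iteration, finished, halt
    std::condition_variable start;   // workers wait here for a new iteration
    std::condition_variable idle;    // scheduler waits here for the workers
    int iteration = 0;
    int finished = 0;
    bool halt = false;
    std::vector<int> firings(numWorkers, 0);
    std::vector<std::thread> threads;

    for (int w = 0; w < numWorkers; ++w) {
        threads.emplace_back([&, w] {
            int i = 0;               // last iteration this worker has seen
            for (;;) {
                {
                    std::unique_lock<std::mutex> lock(m);
                    // Wait for notification to start (or to halt).
                    start.wait(lock, [&] { return halt || iteration > i; });
                    if (iteration <= i) return;   // halted, nothing pending
                    i = iteration;
                }
                ++firings[w];        // "fire": each worker writes its own slot
                {
                    std::lock_guard<std::mutex> lock(m);
                    ++finished;
                }
                idle.notify_one();
            }
        });
    }

    for (int k = 0; k < numIterations; ++k) {
        std::unique_lock<std::mutex> lock(m);
        finished = 0;
        ++iteration;                 // give every worker a chance to run
        start.notify_all();
        idle.wait(lock, [&] { return finished == numWorkers; });
    }
    {
        std::lock_guard<std::mutex> lock(m);
        halt = true;
    }
    start.notify_all();
    for (std::thread& t : threads) t.join();
    return firings;
}
```

Because each worker copies the shared counter into its local i before firing, it can never fire more often than the scheduler has incremented the counter, which is the property the text attributes to the process's run method.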
The increaseBuffers method is used during execution to adjust the channel capacities according to the theory presented in [Par95, ch. 4]. Each time execution stops, the program graph is examined for full channels. If any channels are full, the capacity of the smallest one is increased.
// Increase buffer capacities.
// Return number of full buffers encountered.
int fullBuffers = 0;
PNGeodesic* smallest = NULL;
// Increase the capacity of the smallest full geodesic.
while ((star = nextStar++) != NULL)
    while ((port = nextPort++) != NULL)
        PNGeodesic* geo = NULL;
        if (port->isItOutput() &&
            (geo = (PNGeodesic*)port->geo()) != NULL)
            if (geo->size() >= geo->capacity())
                if (smallest == NULL ||
                    smallest = geo;
if (smallest != NULL)
    smallest->setCapacity(smallest->capacity() + 1);
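The buffer-growth rule described above can be sketched independently of the Ptolemy classes. In this minimal example the Channel struct and the function name increaseSmallestFull are hypothetical, and "smallest" is assumed to mean the full channel with the lowest capacity, since the comparison in the excerpt is cut off.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a bounded channel (not PNGeodesic).
struct Channel {
    std::size_t size;       // tokens currently stored
    std::size_t capacity;   // maximum tokens allowed
};

// Count the full channels and grow the smallest full one by one slot.
// Returns the number of full channels encountered.
int increaseSmallestFull(std::vector<Channel>& channels) {
    int fullBuffers = 0;
    Channel* smallest = nullptr;
    for (Channel& ch : channels) {
        if (ch.size >= ch.capacity) {
            ++fullBuffers;
            // Assumption: compare by capacity to pick the smallest.
            if (smallest == nullptr || ch.capacity < smallest->capacity)
                smallest = &ch;
        }
    }
    if (smallest != nullptr)
        ++smallest->capacity;
    return fullBuffers;
}
```

Growing only one channel by one slot per call keeps the total buffer space as small as possible; a caller can invoke the function again whenever execution stops with full channels remaining.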
Copyright © 1990-1997, University of California. All rights