
The Flow class

Flow objects are used by client nodes to transport streams of data. A Flow object provides you with Buffers: empty Buffers ready to be filled for output flows, and Buffers containing data for input flows. Each flow object, input or output, has an internal queue of buffers used to smooth the data transfer. In order to use a flow class, you have to include its header file in your code:

#include "Flow_Audio_Multichannel/Flow_Audio_Multichannel.h" 

Shown above is the include directive needed to use the Flow_Audio_Multichannel flow.

Creating a flow

You should never create a flow directly; instead, use one of the three methods provided by the Smartflow object.

The following method allows creating an input flow.

Flow * makeInputFlow( const std::string type, 
                      const std::string name, 
                      const std::string args=DEFAULT_EMPTY, 
                      const std::string group=DEFAULT_FLOW_GROUP, 
                      const flow_policy_t policy=DO_NOT_DROP_BUFFER);

This method is used to create an output flow.

Flow * makeOutputFlow( const std::string type, 
                       const std::string name, 
                       const std::string args=DEFAULT_EMPTY, 
                       const std::string group=DEFAULT_FLOW_GROUP, 
                       const flow_policy_t policy=DO_NOT_DROP_BUFFER);

And this one creates a synchronized output flow dedicated to handling files (see the Handling files section).

Flow * makeSynchronizedOutputFlow( const std::string type, 
                                   const std::string name, 
                                   unsigned int numberOfConsumersExpected, 
                                   const std::string args=DEFAULT_EMPTY, 
                                   const std::string group=DEFAULT_FLOW_GROUP);

Most of these parameters are the same across these methods, and all three return a Flow object. Use the first method when you create input flows, i.e., when your client node consumes data produced by another client node. The other two methods create output flows. Most client nodes capturing data from devices (cameras, microphones, etc.) provide output flow(s), while the ones that display data most likely only use input flows.

Following is a description of the parameters of these methods:

  • type describes the type of the flow (e.g., Flow_Audio_Multichannel). Every flow type in the NDFS-II starts with "Flow_". You need to make sure that the flow type exists and that you spell its name properly, otherwise the shared library corresponding to the flow won't be loaded properly.

  • name describes the name of the flow. This name is used by data flow servers and the Control Center to identify the flow. If you use several flows of the same type within a client node, you need to be able to differentiate them.

  • args is a string used to define optional parameters specific to the flow. You can put as many parameters as you wish in the string. Some of them are standard parameters, meaning they are present in every flow and have a default value that you can override. The args string should be formatted as "param1=val1 param2=val2 param3=val3". Here is a list of the parameters that every flow has by default:

    • blocksize: the maximum size of the Buffers the flow can handle.

    • history: the number of Buffers that can be stored in the internal queue of the flow object.

  • group is used as a connection ID. If two client nodes create flows of the same type and group, they will exchange data through that flow (see the sketch after this list).

  • policy defines the behavior of the internal flow queue when the queue is running full.

  • numberOfConsumersExpected is specific to the last method and is used when creating flows to process files. See the Handling files section for more details.
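
As an illustration of the group parameter, here is a minimal sketch of a producer and a consumer whose flows share the same type and group so that they get connected; the flow names and the group string are made up for this example, and sf is assumed to be an already connected Smartflow object.

//In the producer client node: an output flow of type Flow_Audio_Multichannel
//published with the group "meeting_room_1"
Flow *producerFlow = sf->makeOutputFlow("Flow_Audio_Multichannel", "audio_capture",
            DEFAULT_EMPTY, "meeting_room_1");

//In the consumer client node: an input flow of the same type and group,
//so it will receive the data sent by the producer above
Flow *consumerFlow = sf->makeInputFlow("Flow_Audio_Multichannel", "audio_display",
            DEFAULT_EMPTY, "meeting_room_1");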

Following is a code example showing how to create an output flow:

flowOut = (Flow_Audio_Multichannel*) (sf->makeOutputFlow("Flow_Audio_Multichannel",
            "flow_array_provider","rows=64 columns=400 history=100" )); 

The flowOut pointer above has been declared as a pointer to the Flow_Audio_Multichannel type. It could also have been declared using the base Flow type. The methods used to create flows return the address of an object of the Flow type, which is why we cast the returned pointer to a Flow_Audio_Multichannel pointer.

In this specific example, you will notice that we are passing some arguments to the flow as a third parameter. These arguments are specific to this flow type only, so if you try to use them while creating flows of other kinds, they will be ignored. Here they describe how many audio channels are transported in the flow (the rows argument) and how many samples at a time we store in a Buffer (the columns argument). Using these two values, the flow can compute the maximum Buffer size it needs. We could have given a block size directly using the blocksize parameter, but it is more "user-friendly" to express the size needed as a number of channels and samples.

Anyone can implement their own flow types and therefore define parameters to be used at flow creation.

The history parameter represents the size of the internal queue running in the client node scope. Here the size of the queue is set to 100, which means the queue can contain up to 100 Buffers, each big enough to hold 400 samples of 64 channels.

You can take advantage of these parameters to do whatever you like in your flow, but a flow always needs a history size in order to create its internal queue, and a block size to be able to allocate enough shared memory to store data buffers while transporting them.

In the example above, the blocksize is deduced internally from the rows and columns parameter values. The blocksize and history parameters always have a default value specific to the flow type. These default values are used unless specified otherwise when creating your flow. An example is as follows:

flowIn = (Flow_BlockTest*)(sf->makeInputFlow("Flow_BlockTest", "myFlowConsumer",
            "blocksize=16", "ID_long_memcpy")); 

In the above example, the input flow is created with the blocksize parameter set to 16 bytes. The history value is not overridden, so the default history value defined in the flow is used instead.

When the flow creation is done, you can optionally check that it has succeeded:

if ( !flowIn ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Error message: %s\n"), sf->strerror()) );
   delete sf; return 1;
}

if ( flowIn->get_FlowType() != "Flow_BlockTest" ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Flow Cast failed\n")) );
   delete sf; return 1;
}

After you have checked that the flow has been properly instantiated and has the proper type, you can start it. Starting the flow activates the queue and the data transfer. No data can be received or sent otherwise. Just invoke the start() method of your flow:

if ( flowIn->start() != 0 ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Error message: %s\n"), sf->strerror()) );
   delete sf; return 1; 
}

Once the flow is created and started you are ready to use it.

Using a flow in your code

A Flow object can be seen as a Buffer manager. It takes care of the memory management of the Buffers and their queue. It can also be used for flow control.

You never create or free a Buffer directly using constructor or destructor methods. Instead you request Buffers from the Flow object. Regardless of the direction of your flow (input or output), you use the same Flow::getBuffer() method as follows:

Buffer *myBuffer = flowIn->getBuffer();

For an output flow, this method normally returns a Buffer right away. The only case in which it does not is when the system cannot allocate memory for the Buffer. If that happens, your system is already using as much memory as it can and cannot allocate more; checking for memory leaks is usually the right thing to do.

For an input flow, this method returns a Buffer filled with data as soon as one is available in the queue. The method is blocking, so you can wait forever for a Buffer if there is no producer for the flow to subscribe to. It returns a NULL Buffer in only one case: when you stop the flow.

When you don’t need your Buffer anymore, you can release it. The code used to do that is the same for a producer or a consumer:

flowIn->releaseBuffer(myBuffer);

This call frees the memory allocated to the Buffer so you don’t have to handle it yourself.
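
Putting the two calls together, here is a minimal consumer-side sketch; it assumes flowIn has already been created and started as shown earlier, and the processing step is left as a placeholder.

//Request a Buffer; for an input flow this call blocks until
//data is available or the flow is stopped
Buffer *myBuffer = flowIn->getBuffer();

if ( myBuffer != NULL ) {
   //Work on the data contained in the Buffer here
   ...

   //Give the Buffer back to the flow once we are done with it
   flowIn->releaseBuffer(myBuffer);
}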

Stopping a flow

Once you have processed all your data, or you just want to stop your client node or your flow, you should call the Flow::stop() method.

flowIn->stop();

This call deactivates the internal queue of the flow. By deactivating the queue of an input or output flow, every Buffer still in the queue is lost. If somewhere in your code you are blocked on the Flow::getBuffer() method, calling the Flow::stop() method forces Flow::getBuffer() to return. In that case, the Buffer is usually NULL. So you should always check that your Buffer is not NULL when the Flow::getBuffer() method returns.

A typical architecture for a client node is often as follows:

  • Initialization part

  • Processing/Acquisition loop

  • Stop and cleanup

The connection to the data flow server and the creation of flows are most of the time done in the initialization part.

The loop consists of requesting a Buffer from a Flow, filling it with data for an output flow or working on the data contained in the Buffer for an input flow, and releasing the Buffer when you have finished.

The cleanup part is often used to free some previously allocated memory, and destroy your flows.

The transition between the initialization part and the loop is very straightforward: once the initialization is done, you can start your loop. The transition between the loop and the cleanup part is, however, trickier. The loop is often infinite because it runs on an indefinite stream of data, so there is no notion of an end for the loop. As a consequence, the loop needs to be interrupted. A custom exit function defined in the user code can do this by changing the value of a boolean variable used as the loop condition. This exit function is also a very good place to invoke the stop method.

Here is an example to illustrate this architecture:

//the boolean used as a condition in the loop

bool keepGoing = true;

//Declaration of the exit function.
//You must tell the NDFS-II that you want to call this method
//by using the Smartflow::set_user_exit_function() method

void myExitFunction() {
   if ( flow != NULL )
      flow->stop();
   keepGoing = false;
}

//we notify the system which method to call when exiting

sf->set_user_exit_function(myExitFunction);

while ( keepGoing ) {
   myBuffer = flow->getBuffer();
   if ( myBuffer != NULL ) {
      //time to do some work with the Buffer
      ...
   }
}//end of the loop

//We are doing the clean up here before exiting the client node

Setting keepGoing to false in the exit function will exit the loop the next time this boolean variable is tested. You could, however, end up in a case where the boolean is set to false but the client node is still stuck in the loop on a Flow::getBuffer() call because the data provider is gone and no more data is available. Invoking the Flow::stop() method causes Flow::getBuffer() to return. The returned Buffer is NULL, but this case is handled above and we leave the loop properly.

Checking the filling level of the queue

When developing applications using the data flow system, it can be very useful to know if a consumer does not consume fast enough, e.g., because it is overloaded. A way to know that is to monitor the filling level of the flows’ queues. If a client node does not consume data fast enough from a flow, the internal queue starts filling up.

The Flow::getFillingLevelOfQueue() method is provided to check how full the queue of a specific flow is. It returns a double between 0 and 1, where 1 means that the queue is full and 0 that the queue is empty.

ACE_DEBUG( (USER_PREFIX ACE_TEXT("Filling level of the flow's queue: %f\n"),
            myFlow->getFillingLevelOfQueue()) ); 

In the code above, we are displaying the queue level of the flow myFlow using the log macro.
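
For instance, a client node could use this value for simple flow control and log a warning when a queue gets close to full; the 0.8 threshold below is an arbitrary choice made for this sketch.

double level = myFlow->getFillingLevelOfQueue();

//Warn when the queue is more than 80% full, which suggests
//the consumer side is not keeping up with the data rate
if ( level > 0.8 ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Warning, the queue of myFlow is filling up: %f\n"),
               level) );
}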

Avoiding blocking if no data is available

The Flow::getBuffer() call is blocking, which means that it won't return until a buffer is available. This behavior is fine for many client nodes; it could however be too restrictive for some specific nodes. The way to avoid being blocked is to check that a buffer is available before requesting it. The Flow class provides a method for that purpose:

bool Flow::isBufferAvailable();

The method returns true if a buffer is available and false otherwise.

if ( myFlowIn->isBufferAvailable() != true ) {
   //No buffer is available
   //So we don't call the getBuffer() method to avoid being blocked
   myBuffer = NULL;
} 
else { 
   //We know that a buffer is available immediately in the queue
   //Therefore calling Flow::getBuffer() will not block.
   myBuffer = myFlowIn->getBuffer();
}
if ( myBuffer != NULL ) {
   //If we are here, that's because we got a Buffer.
   //We can now work with the Buffer.
   ...
}

The example above shows a simple way to check that some data is available on the queue before requesting a Buffer object.

Pausing/Restarting a flow

In a multimodal application, a client node may not want data from a specific sensor all the time. Client nodes therefore have the possibility to temporarily suspend the data transfer by invoking the Flow::pause() method. After calling this method, no data will be sent to the consumer until it resumes the data transfer by calling the Flow::restart() method. Note that in the meantime, every data block produced by a producer won't be received by the consumer and is therefore lost for that consumer.

This behavior may be useful in cases such as a client node subscribing to many HD-Video flows but wanting to display only one at a time: every flow is paused except one.

The pause/restart functionality works for both input and output flows. When a consumer pauses a flow, it is the only one affected by not receiving data blocks anymore; other consumers and the producer won't be affected. When the producer pauses a flow, every connected consumer is affected by not receiving data blocks anymore.
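
Below is a hedged sketch of the HD-Video scenario mentioned above; videoFlows is assumed to be a std::vector holding input flows that have already been created and started, and selected and newSelection are indices chosen by the user code.

//Pause every HD-Video input flow except the one we want to display
for ( size_t i = 0; i < videoFlows.size(); i++ ) {
   if ( i != selected )
      videoFlows[i]->pause();
}

//Later, to switch to another stream: resume the new one and pause the old one
videoFlows[newSelection]->restart();
videoFlows[selected]->pause();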

Taking advantage of the Flows’ policies

Every input or output flow created by a client node has an internal flow queue to smooth the data transport. These queues can have several behaviors depending on the user's needs. By default, the queue is blocking, i.e., each buffer sent by a producer will be delivered to the connected consumer(s). If no consumer is present, the block is simply dropped.

In the case of a producer feeding two consumers, if one of the consumers stops consuming, its internal flow queue will eventually run full and, as a consequence, block the producer from sending any more data because no data loss is tolerated. So, by a ripple effect, the other consumer is affected as well: it no longer gets any Buffers from the blocked producer.

This behavior is highly desirable in applications where no buffer loss is tolerated. It is however unsuited for multimodal applications where there is a strong requirement to be reactive. In order to address these needs, the flow can be declared as non-blocking during its creation. Here is some sample code that shows how to declare a non-blocking flow:

flow = (Flow_BlockTest*)(sf->makeInputFlow("Flow_BlockTest", "myFlowConsumer",
            "blocksize=16", "ID_long_memcpy", DROP_OLDEST_BUFFER));

The last (optional) parameter of this method specifies which queue policy should be applied. In this example, if the input flow queue of the client node runs full and a new buffer is coming, the oldest buffer(s) of the queue will be removed to make room for the new one. Three different policies can be applied to flows:

  • DO_NOT_DROP_BUFFER: Blocking, default behavior

  • DROP_MOST_RECENT_BUFFER: Non-blocking, the most recent buffer is dropped if the queue is full

  • DROP_OLDEST_BUFFER: Non-blocking, the oldest buffer(s) of the queue are removed to make room for the new one.

These policies apply to the local buffer queues within the client nodes, meaning that a producer can create a blocking output flow and feed a consumer that created its input flow using a non-blocking policy.

TIP: If the application can tolerate losing buffers, it is often a good idea to create a blocking output flow in the producer and non-blocking input flows in the consumers. In this case, a consumer that does not consume fast enough for the data rate cannot slow down the producer.
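
Following this tip, here is a hedged sketch of both sides; the flow type, group, and blocksize are reused from the earlier examples, the producer flow name is made up, and the two calls are assumed to live in two separate client nodes.

//Producer side: blocking output flow (DO_NOT_DROP_BUFFER is the default policy)
flowOut = (Flow_BlockTest*)(sf->makeOutputFlow("Flow_BlockTest", "myFlowProducer",
            "blocksize=16", "ID_long_memcpy"));

//Consumer side: non-blocking input flow, the oldest buffers are dropped
//when this consumer cannot keep up, so the producer is never blocked
flowIn = (Flow_BlockTest*)(sf->makeInputFlow("Flow_BlockTest", "myFlowConsumer",
            "blocksize=16", "ID_long_memcpy", DROP_OLDEST_BUFFER));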

Handling files

The NDFS-II has been designed to support the development of multimodal applications and pervasive environments using continuous streams of data where client nodes can join or leave the application at any time. There is therefore no concept of beginning or end of streams.

The typical behavior of the NDFS-II is therefore unsuited for file processing, because there is no guarantee that a consumer will receive the first buffers of a flow transporting a file if the consumer is launched slightly later than the producer.

The NDFS-II also provides capabilities to parallelize or distribute file processing. For file processing, specific methods should be used. Typically file processing starts with reading a file and making it available for processing in a flow to consumer client node(s). This processing often consists of successive operations on the data. It is then possible to represent this process as a pipeline where the first node reads the file, the last one saves the result, and the node(s) in between process the data. In order to make sure that no data block from the file is lost, a specific method needs to be used to create output flows.

In this case the flow is created as follows:

myflow = (Flow_Video_Mpeg2TS*) sf->makeSynchronizedOutputFlow("Flow_Video_Mpeg2TS",
            "video_reader", 2);

This ensures that the data transfer won't start until 2 consumers are connected, as specified by the third parameter of the method.

Reading a file is most of the time faster than sending it over the network, so it is usual for the reading process to be finished while the data transfer is not. As a result, the internal queue of the flow may not be empty even though, from the user's point of view, the job is done. In order to make sure that the queue is empty before stopping the flow, the stop method should be called with its optional boolean parameter set to true.

flow->stop(true);

Otherwise, the operation to stop the flow preempts sending the data, and the queue is therefore emptied before every data block has been sent.
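
To summarize, here is a hedged sketch of the producer end of such a pipeline; the flow type and names are taken from the example above, the file-reading and Buffer-handling steps are left as placeholders, and the error handling mirrors the earlier examples.

//Create a synchronized output flow that waits for 2 consumers
//before starting the data transfer
myflow = (Flow_Video_Mpeg2TS*) sf->makeSynchronizedOutputFlow("Flow_Video_Mpeg2TS",
            "video_reader", 2);

if ( !myflow || myflow->start() != 0 ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Error message: %s\n"), sf->strerror()) );
   delete sf; return 1;
}

//Read the file and send it through the flow by requesting Buffers,
//filling them, and releasing them as described earlier
...

//Ask the flow to empty its internal queue before stopping, so every
//data block of the file is actually sent
myflow->stop(true);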