
NIST Data Flow System II - User's Guide - Draft Version - Chapter 5

RETURN TO DOCUMENT TABLE OF CONTENTS

Table of Contents

The Smartflow class
 -- Connecting to the NDFS-II network
 -- Setting up an exit strategy for your client node
The Flow class
 -- Creating a flow
 -- Using a flow in your code
 -- Stopping a flow
 -- Checking the filling level of the queue
 -- Avoiding blocking if no data is available
 -- Pausing/Restarting a flow
 -- Taking advantage of the Flows' policies
 -- Handling files
The Buffer class
 -- The memcpy approach
 -- The pushData/popData approach
 -- Data Vs. Metadata
 -- Combining an iterator with data and metadata
 -- Creating your own metadata type

In this section, we present the three main classes needed to use the framework. The Smartflow class is the API entry point; this object is used to initialize the connection with the server and to create flows. The Flow class is used to handle flows, which represent streams of data blocks. A data block is then manipulated using a Buffer object.

The Smartflow class

This class is the "entry point" for all the other classes in the library. Most of the other classes are created directly or indirectly by methods of a Smartflow object, never by direct instantiation of the subordinate objects.

Connecting to the NDFS-II network

The Smartflow object is used to connect to or disconnect from the NDFS-II network, to create or destroy flows, and to declare the user-defined exit method, which is invoked when a client node quits. In order to use the Smartflow object, you need to include this header in your source code.

#include "Smartflow.h"

The very first step is to declare and instantiate a Smartflow object:

Smartflow *sf = new Smartflow();

After the object has been created, you must call the init method:

bool init( int &argc, char** &argv,
           bool keepSFOptions = false,
           const std::string application = DEFAULT_APPLICATION_NAME);

This method initializes the various managers required by NDFS-II. It uses the argc and argv parameters from the main function. The third parameter, keepSFOptions, indicates whether the NDFS-II-specific options should be kept in the argv variable. By default these options are removed, leaving the remaining parameters for further parsing in the user's code. The fourth parameter gives an application name; it is presently reserved but will be useful for future development. Most of the time you just need to invoke the init() method as follows:

sf->init(argc, argv);

Then you need to invoke the method bool readyToRun() to check that the Smartflow object is ready to connect to the server:

if ( !sf->readyToRun() ) {
   //Using the standard output to display the error message.
   cout << sf->strError() << endl;
   ...
   return 1;
}

Most of the objects in NDFS-II use the Error Manager to let you know if something went wrong. When a method fails and returns false, you can invoke the std::string strError(void) method. This method returns a string giving more details about what went wrong.

At this point, you are ready to connect to the data flow server using

bool connectToApplicationServer(const std::string clientname=DEFAULT_EMPTY,
                         const std::string clientgroup=DEFAULT_CLIENT_GROUP);

You can specify the name of your client and its group. The clientgroup argument is not presently used, but is reserved for future development. If you don't specify a client name, the name of the executable, i.e., argv[0], is used. An example method call is as follows:

if ( !sf->connectToApplicationServer() ) {
   //using the log macro to display the error message
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Error message: %s\n"), sf->strError()) );
   ...
   return 1;
}

Setting up an exit strategy for your client node

At this point, your client node is connected to the data flow server and is ready to create flows. Most client nodes work with streams of data and therefore run a loop to produce or consume the data. The user can define an exit function in the code so that the client node can exit cleanly. That method is then automatically invoked by the system when the client node receives the kill or abort signal. After defining the method, the user needs to declare which method to invoke. You can do so by invoking:

bool setUserExitFunction(void (*user_exit_function_) (),
                  bool controlC = true);

or

bool setUserExitFunction(void (*user_exit_function_) (void*userarg),
                  void *arg,
                  bool controlC = true);

You should use the first method if your exit method has no parameters; otherwise use the second one. The first parameter is a pointer to the exit method and, in the second variant, the next parameter is a pointer to the arguments of your exit method. The last parameter defines whether the method should also be called when using "Control+c" or closing the console window; this is the default behavior if not specified otherwise.
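As an illustration, here is a minimal sketch of the second variant; the ExitContext struct, the myExitFunctionWithArg function, and the flowIn pointer are hypothetical names introduced for this example, not part of the API:

//cleanup context handed to the exit function (hypothetical struct)
struct ExitContext {
   Flow *flowIn;
};

void myExitFunctionWithArg(void *userarg) {
   ExitContext *ctx = static_cast<ExitContext*>(userarg);
   if ( ctx->flowIn != NULL )
      ctx->flowIn->stop(); //unblocks any pending getBuffer() call
}

...
ExitContext ctx;
ctx.flowIn = flowIn;
//register the exit function and its argument; controlC defaults to true
sf->setUserExitFunction(myExitFunctionWithArg, &ctx);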

Examples showing how to use these methods are located in the folders src/clients/example_provider_with_complex_exit_func and src/clients/example_provider_with_simple_exit_func.

Setting an exit method is not required but strongly advised. At this point, you are ready to create flows. Flows are created using the Smartflow object.

The Flow class

Flow objects are used by client nodes to transport streams of data. They give you buffers: empty ones ready to be filled for output flows, and buffers containing data for input flows. Each flow object, input or output, has an internal queue of buffers used to smooth the data transfer. In order to use the Flow class, you have to include the header file of the flow type in your code:

#include "Flow_Audio_Multichannel/Flow_Audio_Multichannel.h"

The line above shows the inclusion needed to use the Flow_Audio_Multichannel type.

Creating a flow

You should never create a flow directly; instead, use one of the three methods provided by the Smartflow object.

The following method allows creating an input flow.

Flow * makeInputFlow( const std::string type,
             const std::string name,
             const std::string args=DEFAULT_EMPTY,
             const std::string group=DEFAULT_FLOW_GROUP,
             const flow_policy_t policy=DO_NOT_DROP_BUFFER);

This method is used to create an output flow.

Flow * makeOutputFlow( const std::string type,
              const std::string name,
              const std::string args=DEFAULT_EMPTY,
              const std::string group=DEFAULT_FLOW_GROUP,
              const flow_policy_t policy=DO_NOT_DROP_BUFFER);

And this one creates a synchronized output flow dedicated to handling files (see the Handling files section).

Flow * makeSynchronizedOutputFlow( const std::string type,
                          const std::string name,
                          unsigned int numberOfConsumersExpected,
                          const std::string args=DEFAULT_EMPTY,
                          const std::string group=DEFAULT_FLOW_GROUP);

Most of these parameters are the same across these methods, and all three return a Flow object. You need to use the first one when you create input flows, i.e., when your client node consumes data from a different client node. The two other methods create output flows. Most client nodes capturing data from devices (cameras, microphones, etc.) provide output flow(s). The ones that display data, for example, most likely only use input flows.

Following is a description of the parameters of these methods:

  • type describes the type of the flow (e.g., Flow_Audio_Multichannel). Every flow type in the NDFS-II starts with "Flow_". You need to make sure that the flow type exists and that you type its name properly; otherwise the shared library corresponding to the flow won't be loaded.
  • name describes the name of the flow. This name is used by data flow servers and the Control Center to identify the flow. If you use several flows of the same type within a client node, you need to be able to differentiate them.
  • args is a string used to define the optional parameters specific to the flow. Each flow can have as many parameters as you wish in the string. Some of these are standard parameters, meaning they are present in every flow and have a default value. You can override the value of these parameters. The args string should be formatted as "param1=val1 param2=val2 param3=val3". Here is a list of the parameters that each flow has by default:
    • blocksize: the maximum buffer size the flow can handle.
    • history: the number of buffers that can be stored in the internal queue of the flow object.
  • group is used as a connection ID. If two client nodes use flows with the same type and group, they will exchange data through the flow.
  • policy defines the behavior of the internal flow queue when the queue is running full.
  • numberOfConsumersExpected is specific to the last method and used when creating flows to process files. See the Handling files section for more details.

Following are some code examples showing how to create an output flow:

flowOut = (Flow_Audio_Multichannel*) (sf->makeOutputFlow("Flow_Audio_Multichannel",
   "flow_array_provider","rows=64 columns=400 history=100" ));

The flowOut pointer above has been declared as a pointer to the Flow_Audio_Multichannel type. It could have been declared using the Flow type. The methods used to create flows return the address of an object of the base Flow type; this is why we cast the returned pointer to Flow_Audio_Multichannel.

In this specific example, you will notice that we are passing some arguments to the flow as a third parameter. These arguments are specific to this flow type only, so if you try to use them while creating flows of different kinds, they will be ignored. Here they describe how many audio channels are transported in the flow (the rows argument) and how many samples at a time we store in the Buffer (the columns argument). Using these two values, the maximum size of the Buffer can be computed. We could have directly given a block size using the blocksize parameter, but it is more "user-friendly" to express the size needed as a number of channels and samples.

Anyone can implement their own flow types and therefore create parameters that will be used at flow creation.

The history parameter represents the size of the internal queue running in the client node scope. Here the size of the queue is set to 100. That means the queue can contain up to 100 Buffers, each having a size big enough to contain 400 samples of 64 channels.

You can take advantage of these parameters to do whatever you like in your flow, but flows always need a history size in order to create their internal queue, and a block size to be able to allocate enough shared memory to store data buffers while transporting them.

In the example above, the blocksize is deduced internally from the rows and columns parameter values. The blocksize and history parameters always have a default value specific to the flow type. These default values are used if not overridden when creating your flow. An example is as follows:

flowIn = (Flow_BlockTest*)(sf->makeInputFlow("Flow_BlockTest", "myFlowConsumer",
   "blocksize=16", "ID_long_memcpy"));

In the above example, the input flow is created using the blocksize parameter and is set to 16 bytes. The history value is not overridden so the default history value defined in the flow is used instead.

When the flow creation is done, you can optionally check that it has succeeded:

if ( !flowIn ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Error message: %s\n"), sf->strError()) );
   delete sf;
   return 1;
}

if ( flowIn->getFlowType() != "Flow_BlockTest" ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Flow Cast failed\n")) );
   delete sf;
   return 1;
}

After you have checked that the flow has been properly instantiated and has the proper type, you can start it. Starting the flow activates the queue and the data transfer. No data can be received or sent otherwise. Just invoke the start() method of your flow:

if ( !flowIn->start() ) {
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Error message: %s\n"), sf->strError()) );
   delete sf;
   return 1;
}

Once the flow is created and started you are ready to use it.


Using a flow in your code

A Flow object can be seen as a Buffer manager. It takes care of the memory management of the Buffers and their queue. It can also be used for flow control.

You never create or free a Buffer directly using constructor or destructor methods. Instead you request Buffers from the Flow object. Regardless of the direction of your flow (input or output), you use the same Flow::getBuffer() method as follows:

Buffer *myBuffer = flowIn->getBuffer();

For an output flow, this method almost always returns a Buffer. The only case where it does not is when the system cannot allocate memory for the Buffer. If that happens, your system is already using as much memory as it can and cannot allocate more; checking for memory leaks is then usually the right thing to do.

For an input flow, this method returns a Buffer filled with data if one is available in the queue. The method is blocking, so you can wait forever for a Buffer if there is no producer for the flow. It returns a NULL Buffer in only one case: when you stop the flow.

When you don't need your Buffer anymore, you can release it. The code used to do that is the same for a producer or a consumer:

flowIn->releaseBuffer(myBuffer);

This call frees the memory allocated to the Buffer so you don't have to handle it yourself.
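Putting these calls together, here is a minimal sketch of the request/fill/release cycle on an output flow; flowOut, myData, and myDataSize are assumed to be defined elsewhere in the client code, and the data() and setSize() methods are described in the Buffer class section below:

Buffer *myBuffer = flowOut->getBuffer(); //empty Buffer ready to be filled
if ( myBuffer != NULL ) {
   ACE_OS::memcpy(myBuffer->data(), &myData, myDataSize);
   myBuffer->setSize(myDataSize); //record how much data was written
   flowOut->releaseBuffer(myBuffer); //hands the Buffer back for sending
}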


Stopping a flow

Once you have processed all your data, or you just want to stop your client node or your flow, you should call the Flow::stop() method.

flowIn->stop();

This call deactivates the internal queue of the flow. When the queue of an input or output flow is deactivated, every Buffer still in the queue is lost. If somewhere in your code you are blocked on the Flow::getBuffer() method, calling the Flow::stop() method forces Flow::getBuffer() to return. In that case, the returned Buffer is usually NULL, so you should always check that your Buffer is not NULL when the Flow::getBuffer() method returns.

A typical architecture for a client node is often as follows:

  • Initialization part
  • Processing/Acquisition loop
  • Stop and cleanup

The connection to the data flow server and the creation of flows are most of the time done in the initialization part.

The loop consists of requesting a Buffer from a Flow, filling it up for an output flow or working on the data it contains for an input flow, and releasing the Buffer when you have finished.

The cleanup part is often used to free some previously allocated memory, and destroy your flows.

The transition between the initialization part and the loop is very straightforward: once the initialization is done, you can start your loop. The transition between the loop and the cleanup part is, however, trickier. The loop is often infinite because it runs on an indefinite stream of data, so there is no notion of an end to the loop. As a consequence, the loop needs to be interrupted. A custom exit function defined in the user code can do this by changing the value of a boolean variable used as the loop condition. This exit method is also a very good place to invoke the stop method.

Here is an example to illustrate this architecture:

//The boolean used as a condition in the loop; it is declared first
//because the exit function below modifies it.
bool keepGoing = true;

//Declaration of the exit function.
//You must tell the NDFS-II that you want to call this method
//by using the Smartflow::setUserExitFunction() method
void myExitFunction() {
   if ( flow != NULL )
      flow->stop();
   keepGoing = false;
}

//we notify the system which method to call when exiting
sf->setUserExitFunction(myExitFunction);

while ( keepGoing ) {
   myBuffer = flow->getBuffer();
   if ( myBuffer != NULL ) {
      //time to do some work with the Buffer
      ....
   }
}//end of the loop

//We do the cleanup here before exiting the client node

Setting keepGoing to false in the exit function exits the loop the next time the boolean is tested. You could, however, end up in a case where the boolean is set to false but the client node is still stuck in the loop on a Flow::getBuffer() call because the data provider is gone and no more data is available. Invoking the Flow::stop() method causes Flow::getBuffer() to return. The returned Buffer is NULL, but this case is handled above, so we leave the loop properly.

Checking the filling level of the queue

When developing applications using the data flow system, it can be very useful to know if a consumer does not consume fast enough, e.g., because it is overloaded. A way to know that is to monitor the filling level of the flows' queues. If a client node does not consume data fast enough from a flow, the internal queue starts filling up.

The double Flow::getFillingLevelOfQueue() method is provided to check how full the queue of a specific flow is. A double having a value between 0 and 1 is returned. The value '1' means that the queue is full and '0' that the queue is empty.

ACE_DEBUG( (USER_PREFIX ACE_TEXT("Filling level of the flow's queue: %f\n"),
   myFlow->getFillingLevelOfQueue()) );

In the code above, we are displaying the queue level of the flow myFlow using the log macro.
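For instance, a consumer could poll this value to detect that it is falling behind; the 0.9 threshold below is an arbitrary assumption for this sketch:

if ( myFlow->getFillingLevelOfQueue() > 0.9 ) {
   //the queue is almost full: this node is not consuming fast enough
   ACE_DEBUG( (USER_PREFIX ACE_TEXT("Queue is almost full\n")) );
}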

Avoiding blocking if no data is available

The Flow::getBuffer() call blocks, meaning that the method won't return until a buffer is available. This behavior is fine for many client nodes; it could, however, be too restrictive for some specific nodes. The way to avoid being blocked is to check that a buffer is available before requesting it. The Flow class provides a method for that purpose:

bool Flow::isBufferAvailable();

The method returns true if a buffer is available and false otherwise.

if ( myFlowIn->isBufferAvailable() != true ) {
   //No buffer is available
   //So we don't call the getBuffer() method to avoid being blocked
   myBuffer = NULL;
}
else {
   //We know that a buffer is available immediately in the queue
   //Therefore calling Flow::getBuffer() will not block.
   myBuffer = myFlowIn->getBuffer();
}
if ( myBuffer != NULL ) {
   //If we are here that's because we got a Buffer.
   //We can now work with the Buffer.
   ...
}

The example above shows a simple way to check that some data is available on the queue before requesting a Buffer object.

Pausing/Restarting a flow

In multimodal applications, a client node may not want data from a specific sensor all the time. Client nodes therefore have the possibility to temporarily suspend the data transfer by invoking the Flow::pause() method. After calling this method, no data will be sent to the consumer until it resumes the data transfer by calling the Flow::restart() method. Note that in the meantime, every data block produced by the producer won't be received by the consumer and is therefore lost for that consumer.

This behavior may be useful in cases such as a client subscribing to many HD-Video flows that only wants to display one at a time: every flow but one is paused.

The pause/restart functionality works for both input and output flows. When a consumer pauses a flow, it is the only one affected: it simply stops receiving data blocks, while other consumers and the producer are unaffected. When the producer pauses a flow, every connected consumer stops receiving data blocks.
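As a minimal sketch, assuming a started input flow named flowIn, temporarily suspending the transfer looks like this:

flowIn->pause(); //this consumer stops receiving data blocks
//data blocks produced in the meantime are lost for this consumer
flowIn->restart(); //data delivery resumes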

Taking advantage of the Flows' policies

Every input or output flow created by a client node has an internal flow queue to smooth the data transport. These queues can have several behaviors depending on the user's needs. By default, the queue is blocking, i.e., each buffer sent by a producer will be delivered to the connected consumer(s). If no consumer is present, the block is simply dropped.

In the case of a producer feeding two consumers, if one of the consumers stops consuming, its internal flow queue will eventually run full and, as a consequence, block the producer from sending any more data, because no data loss is tolerated. By a ripple effect, the other consumer is affected as well: it no longer gets Buffers from the blocked producer.

This behavior is highly desirable in applications where no buffer loss is tolerated. It is, however, unsuited for multimodal applications, where there is a strong requirement to be reactive. To address these needs, a flow can be declared as non-blocking during its creation. Here is some sample code that shows how to declare a non-blocking flow:

flow = (Flow_BlockTest*)(sf->makeInputFlow("Flow_BlockTest", "myFlowConsumer",
   "blocksize=16", "ID_long_memcpy", DROP_OLDEST_BUFFER));

The last (optional) parameter of this method specifies which queue policy should be applied. In this example, if the input flow queue of the client node runs full and a new buffer is coming, the oldest buffer(s) of the queue will be removed to make room for the new one. Three different policies can be applied to flows:

  • DO_NOT_DROP_BUFFER: Blocking, default behavior
  • DROP_MOST_RECENT_BUFFER: Non-blocking, the most recent buffer is dropped if the queue is full
  • DROP_OLDEST_BUFFER: Non-blocking, the oldest buffer(s) of the queue are removed to make room for the new one.

These policies apply to the local buffer queues within the client nodes, meaning that a producer can create a blocking output flow and feed a consumer that created its input flow using a non-blocking policy.

TIP: If the application can tolerate losing buffers, it is often a good idea to create a blocking output flow in the producer and non-blocking input flows in the consumers. In this case, a consumer that is not keeping up with the data rate cannot slow down the producer.
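Here is a sketch of that pattern, reusing the Flow_BlockTest type and illustrative names from the examples above:

//producer side: blocking output flow (DO_NOT_DROP_BUFFER is the default)
flowOut = (Flow_BlockTest*)(sf->makeOutputFlow("Flow_BlockTest", "myFlowProvider",
   "blocksize=16", "ID_long_memcpy"));

//consumer side: non-blocking input flow dropping the oldest buffers
flowIn = (Flow_BlockTest*)(sf->makeInputFlow("Flow_BlockTest", "myFlowConsumer",
   "blocksize=16", "ID_long_memcpy", DROP_OLDEST_BUFFER));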

Handling files

The NDFS-II has been designed to support the development of multimodal applications and pervasive environments using continuous streams of data where client nodes can join or leave the application at any time. There is therefore no concept of beginning or end of streams.

So the typical behavior of the NDFS-II is unsuited for file processing, because there is no guarantee that a consumer will receive the first buffers of a flow containing a file if the consumer is launched slightly later than the producer.

The NDFS-II also provides capabilities to parallelize or distribute file processing, for which specific methods should be used. Typically, file processing starts with reading a file and making it available in a flow to consumer client node(s). This processing often consists of successive operations on the data. It is then possible to represent the process as a pipeline where the first node reads the file, the last one saves the result, and the node(s) in between process the data. To make sure that no data block from the file is lost, a specific method needs to be used to create output flows.

In this case the flow is created as follows:

myflow = (Flow_Video_Mpeg2TS*) sf->makeSynchronizedOutputFlow("Flow_Video_Mpeg2TS",
   "video_reader", 2);

This ensures that the data transfer won't start until 2 consumers are connected, as specified by the third parameter of the method.

Reading a file is most of the time faster than sending it over the network, so it is common for the reading to be done before the data transfer is. As a result, the internal queue of the flow may not be empty even though, from the user's point of view, the job is done. To make sure that the queue is empty before stopping the flow, the stop method should be called with its optional boolean parameter set to true.

flow->stop(true);

Otherwise the operation to stop the flow preempts sending the data and discards the queue before every data block has been sent.
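Putting it together, here is a hedged sketch of a file-reading producer; readNextChunk() is a hypothetical helper standing in for the user's file-reading code, not part of the API:

char *chunk = NULL;
size_t chunkSize = 0;
//readNextChunk() yields the next piece of the file until none is left
while ( readNextChunk(&chunk, &chunkSize) ) {
   Buffer *buf = myflow->getBuffer();
   ACE_OS::memcpy(buf->data(), chunk, chunkSize);
   buf->setSize(chunkSize);
   myflow->releaseBuffer(buf); //queues the block for the consumers
}
myflow->stop(true); //drain the queue before actually stopping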

The Buffer class

Once you have successfully created a flow, you can get a Buffer from your flow object using the method Flow::getBuffer(). You should never create or destroy a Buffer; instead, the Flow object takes care of that for you. The same method is used to get a Buffer for input or output flow:

  • for an output flow, this method returns an empty Buffer ready to be filled with data.
  • for an input flow, this method returns a Buffer filled with data. If there is no provider, the method blocks and only returns when a Buffer is available. This behavior can be a problem for applications that want to get a Buffer but cannot afford to block when no data is available.

Be aware that the Buffer does not handle the endianness of your data. Some specific Flows may handle it for specific kinds of data.

You can test whether a Buffer is available by calling the Flow::isBufferAvailable() method. Depending on the result, you can decide whether to ask for a Buffer or not. See the Avoiding blocking if no data is available section for more details.

In order to use the Buffer object, you must include the following header in your code:

#include "Buffer.h"

Once you get your Buffer, there are several ways to fill it with data or read its content. The way you read a Buffer obtained from an input flow depends on how the Buffer was originally filled.

The memcpy approach

This approach is the simplest one. It is really convenient to use if you want to transfer one chunk of data. Consumer and producer client nodes using this approach are provided to illustrate this concept.

They are located in the src/clients/example_provider_memcpy and src/clients/example_consumer_memcpy folders.

How to fill the Buffer

All you need to do is copy your data into the writing area of the Buffer. You can access this memory area using the method Buffer::data(), which returns a pointer to the beginning of the writing area of the Buffer. The memcpy() method is essentially all you need to fill the Buffer:

ACE_OS::memcpy(myBuffer->data(), &myData, sizeOfMyData);

Here &myData is the address of your data and sizeOfMyData its size. We recommend that you use the ACE_OS::memcpy() method for portability reasons. After copying your data into the Buffer, you need to specify how much data has been copied, because the Buffer object has no way to determine how much has been written. You can do this as follows:

myBuffer->setSize(sizeOfMyData);

The Buffer is now ready to be sent to any client node that subscribed to the flow. You send the Buffer by releasing it. Releasing a Buffer tells the system that you have finished working on it; the system can therefore send it to the consumers that subscribed to the flow. If there is no consumer for the flow, the Buffer is simply dropped.

flow->releaseBuffer(myBuffer);

This call frees the memory allocated to the Buffer so you don't have to handle it yourself.

How to read the Buffer

On the consumer side, here is how you read a Buffer filled using the memcpy approach.

Before copying the data from the NDFS Buffer object to your local buffer, you need to allocate some memory. A specific method is provided to let you know how much space you need.

myBuffer = flow->getBuffer();
bufferSize = myBuffer->getSize();

At this point, you know the size you need to copy your data from the buffer to your object.

ACE_OS::memcpy(&myData, myBuffer->data() , bufferSize);

In the example above, &myData is the address of the object where you want to copy your data, myBuffer->data() is the address of the data area of the Buffer, and bufferSize is the size of the data contained in the buffer.

Once you have copied your data, the buffer is no longer needed so you can release it.

flow->releaseBuffer(myBuffer);

The memcpy approach is the lowest-level approach to access the data blocks. In that case, the data you receive is byte-for-byte identical to the data that was sent. If your sender and receiver nodes are running on different kinds of operating systems or architectures, you will need to handle 32-bit vs. 64-bit or endianness issues yourself.

The pushData/popData approach

This approach uses methods from the NDFS-II API to fill the Buffer with data. It is inspired by the push/pop methods of the C++ STL containers. The NDFS-II Buffer can be seen as a generic array that can be filled with data blocks using our API methods. A Buffer is then an array of data chunks that can be pushed or popped. The Buffer is not tied to a specific data type, meaning that you can successively add a combination of different kinds of data and metadata. Client nodes illustrating this approach are located in the src/clients/example_provider_pushdata and src/clients/example_consumer_popdata folders.

How to fill the Buffer

As in the memcpy approach, you first need to get a buffer:

myBuffer = flow->getBuffer();

Before filling it, you have to tell the Buffer object you will use the pushData approach to fill the Buffer. In order to do that, you must call:

myBuffer->initBuf();

Then you can fill it as follows:

myBuffer->pushData(&dataToSend, myDataSize);

You can push as much data as you want until the Buffer is full. The pushData() method returns a boolean value letting you know if the data has been properly copied.

When you have filled the buffer, you just need to release it in order to send it.

flow->releaseBuffer(myBuffer);
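Putting these steps together, here is a minimal sketch of a provider pushing two chunks of different types; the variable names are illustrative:

myBuffer = flow->getBuffer();
myBuffer->initBuf(); //switch the Buffer to the pushData approach
int frameNumber = 42;
double amplitude = 0.5;
if ( !myBuffer->pushData(&frameNumber, sizeof(frameNumber)) ||
     !myBuffer->pushData(&amplitude, sizeof(amplitude)) ) {
   //the Buffer is full: handle the error
}
flow->releaseBuffer(myBuffer); //sends the Buffer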

How to read the Buffer

Symmetrically to filling up a buffer, you first need to get a buffer from the flow.

myBuffer = flow->getBuffer();

Then, after allocating some space to store your data, you need to pop the data from the buffer.

myBuffer->popData(&myData, myDataSize);

After popping the data, the NDFS buffer is no longer required, so you can just release it.

flow->releaseBuffer(myBuffer);

Data Vs. Metadata

Buffers can be filled with a combination of data and metadata in any order. The only constraint is the size of the Buffer.

Each time you send a Buffer or push a data chunk into a buffer, a timestamp can automatically be added. The timestamp used is the one provided by the operating system; you can, however, supply a timestamp coming from a device or one you generate yourself. Here are the different methods available to add data or metadata to a Buffer.

bool pushData(void *src, buffer_size_t size);

bool popData(void *src, buffer_size_t size);

The first of the two methods above is used to push data chunks in a Buffer. src is the address of your data and size its size. These methods work together.

bool pushDataTS(void *src, buffer_size_t size, ACE_Time_Value *ts = 0);

bool popDataTS(void *src, buffer_size_t size, ACE_Time_Value *ts);

These two methods are the same as the pushData() and popData() methods but they give the ability to provide and retrieve a timestamp associated with the data block. The third parameter ts of the pushDataTS() is optional. If you decide to provide your own timestamp, which has to be an ACE_Time_Value timestamp, it will be transmitted along with the data chunk. If you don't, the timestamp associated with the data chunk is generated by the operating system. When using pushDataTS() on the provider side, you should use popDataTS() on the consumer side.
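For example, a provider could attach its own timestamp as follows; this is a minimal sketch where ACE_OS::gettimeofday() stands in for a device-provided time, and myBuffer, myData, and myDataSize are assumed to exist:

//provider side: push a chunk with an explicit timestamp
ACE_Time_Value ts = ACE_OS::gettimeofday();
myBuffer->pushDataTS(&myData, myDataSize, &ts);

//consumer side: pop the chunk and retrieve its timestamp
ACE_Time_Value receivedTs;
myBuffer->popDataTS(&myData, myDataSize, &receivedTs);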

bool pushMetadata(void *src, buffer_size_t size, data_type_t type = 0);

bool popMetadata(void *src, buffer_size_t size);

These methods are used to push and pop metadata in the Buffer. src is the address of your metadata, which can be of any kind and any size as long as it fits in the Buffer. Conceptually, data and metadata are both binary data, so metadata can be seen as data associated with a flag specifying that it is actually metadata. The optional type parameter specifies the metadata type; if you don't provide it, the default metadata type is used. You may need to handle several metadata types, so the data flow system allows you to create and provide your own metadata types in order to distinguish between your kinds of metadata. More information and examples about custom metadata can be found in the Creating your own metadata type section.

All these methods return a boolean indicating whether the data or metadata has been properly copied into the Buffer. These methods allow you to add to a buffer, for example, a video frame followed by two metadata chunks providing the name of the codec used or other relevant information associated with it. Two examples including a producer and a consumer are provided in the src/clients/example_provider_push_data and src/clients/example_consumer_popdata folders.
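A brief sketch of that video-frame example; frameData, frameSize, and the codec string are illustrative names:

myBuffer->initBuf();
char codecName[] = "mpeg2";
//push the metadata first, using the default metadata type
myBuffer->pushMetadata(codecName, sizeof(codecName));
//then push the frame itself as data
myBuffer->pushData(frameData, frameSize);
flow->releaseBuffer(myBuffer);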

Combining an iterator with data and metadata

The NDFS-II API provides a Buffer iterator to ease the data retrieval from the Buffer object, which has been filled with multiple pushData() calls. Here is sample code to show how to instantiate an iterator and associate it with a Buffer:

BufferIterator *iterator = new BufferIterator();
myBuffer = flow->getBuffer();

//We got the Buffer from the flow and we now associate it with the iterator
iterator->setBuffer(myBuffer);
iterator->init();

while ( !iterator->end() ) {
   switch(iterator->getDataType()) {
   case DATA:
      //We received Data
      dataSize = iterator->getDataSize();
      //memory allocation of myData using the proper type and the size
      //'dataSize' needs to be done before getting the data from the Buffer
      .....
      iterator->getData(myData);
      break;
   case METADATA:
      //We received Metadata
      metaDataSize = iterator->getDataSize();
      //memory allocation of myMetaData using the proper type and the size
      //'metaDataSize' needs to be done before getting the metadata from the
      //Buffer
      .....
      iterator->getData(myMetaData);
      break;
   case DATAWITHTIMESTAMP: {
      //We received Data with a timestamp
      dataSize = iterator->getDataSize();
      //memory allocation of myData using the proper type and the size
      //'dataSize' needs to be done before getting the data from the Buffer
      .....
      iterator->getData(myData);
      ACE_Time_Value ts = iterator->getTimestamp();
      break;
   }
   default:
      break;
   }
}//end of the iteration loop

In this code, we get a buffer from the flow, create an iterator, and associate it with the buffer. The iterator can be reused later with a new buffer by invoking the setBuffer() and init() methods again. We then iterate over the buffer as long as it contains data. The getDataType() method returns the type of the data chunk we are currently iterating on. Using that piece of information, we can get the data using the iterator's getData() method. If a timestamp is associated with the data chunk, we can get it using the getTimestamp() method. If no timestamp has been associated with the data chunk and the method is invoked anyway, the returned timestamp has its fields initialized to zero. Examples highlighting this approach can be found in the src/clients/example_provider_multipush and src/clients/example_consumer_iterator folders.

Creating your own metadata type

The METADATA type is a generic type provided by the system. Many multimodal applications may require several metadata types, so users can declare their own. The following is an example of how to declare such custom types:

#define NEWMETADATA 10
#define OTHERMETADATA 11

A metadata type is simply an integer representing a metadata type ID. The declaration must be present in both the consumer and provider client nodes, or can be placed in a custom flow. The integer associated with the type must be at least 10; values under 10 are reserved for internal use.

On the provider part, putting a custom metadata in the buffer will look like:

mybuffer->pushMetadata(&theMetaData, metadataSize, NEWMETADATA);

On the consumer part, the following code needs to be included in the while loop shown in the Combining an iterator with data and metadata section:

case NEWMETADATA:
   //We received NEWMETADATA
   metaDataSize = iterator->getDataSize();
   //the memory allocation of myMetaData using the proper type and the
   //size 'metaDataSize' needs to be done before getting the metadata from
   //the Buffer
   ....
   iterator->getData(myMetaData);
   break;

Using custom metadata can be very useful, especially when a consumer does not know exactly what it will get from a provider. One can, for example, push different kinds of data into a buffer. If metadata is pushed into the Buffer before any chunk of data, the consumer can know what kind of data is coming and act accordingly. See the example_consumer_custom_metadata and example_provider_custom_metadata examples located in the src/clients folder for a full example.
