8 #ifndef KTDATAQUEUEPROCESSOR_HH_ 9 #define KTDATAQUEUEPROCESSOR_HH_ 20 KTLOGGER(eqplog,
"KTDataQueueProcessor");
45 template<
class XProcessorType >
70 bool Configure(
const scarab::param_node& node);
198 template<
class XProcessorType >
212 template<
class XProcessorType >
218 template<
class XProcessorType >
223 if (! ConfigureSubClass(node))
return false;
227 template<
class XProcessorType >
231 KTINFO(eqplog,
"Queue started");
232 return ProcessQueue();
235 template<
class XProcessorType >
240 KTINFO(eqplog,
"Queue stopped");
244 template<
class XProcessorType >
252 template<
class XProcessorType >
255 KTINFO(eqplog,
"Beginning to process queue");
256 while (fStatus != kStopped)
258 KTDEBUG(eqplog,
"processing . . .");
260 if ((fQueue.*fPopFromQueue)(daf))
262 KTDEBUG(eqplog,
"Data acquired for processing");
263 (
static_cast<XProcessorType*
>(
this)->*(daf.fFuncPtr))(daf.fData);
264 if (daf.fData->GetLastData()) fStatus = kStopped;
268 if (fStatus == kContinue)
279 KTINFO(eqplog,
"Queue processing has ended");
283 template<
class XProcessorType >
286 while (! fQueue.
empty())
291 KTINFO(eqplog,
"Queue cleared");
296 template<
class XProcessorType >
299 KTDEBUG(eqplog,
"Queueing data");
327 template<
class XProcessorType >
333 KTDEBUG(eqplog,
"Switching to timed pop function");
335 if (fStatus == kRunning)
338 KTINFO(eqplog,
"Pop function changed; interrupting queue");
344 template<
class XProcessorType >
350 KTDEBUG(eqplog,
"Switching to untimed pop function");
352 if (fStatus == kRunning)
355 KTINFO(eqplog,
"Pop function changed; interrupting queue");
361 template<
class XProcessorType >
367 KTDEBUG(eqplog,
"Switching to single-pop function");
369 if (fStatus == kRunning)
372 KTINFO(eqplog,
"Pop function changed; interrupting queue");
bool Run()
Begins processing of queue (switches status from kStopped to kRunning)
KTSignalOneArg< void > fQueueDoneSignal
bool try_pop(XDataType &popped_value)
KTConcurrentQueue< DataAndFunc > Queue
void push(XDataType const &data)
virtual bool ConfigureSubClass(const scarab::param_node &node)=0
void SwitchToUntimedPop()
bool(KTConcurrentQueue< DataAndFunc >::* QueuePoppingFunc)(DataAndFunc &)
bool Configure(const scarab::param_node &node)
Should perform parameter store and command-line configurations.
Creates a signal that takes a KTDataPtr object as its argument.
void SetFuncPtr(void(XProcessorType::*ptr)(KTDataPtr))
Generic data queue for asynchronous processing.
KTDataQueueProcessorTemplate(const std::string &name="default-data-queue-proc-template-name")
KTLOGGER(applog, "KTApplication")
QueuePoppingFunc fPopFromQueue
virtual ~KTDataQueueProcessorTemplate()
bool ProcessQueue()
Begins processing of queue if status is already kRunning; otherwise does nothing. ...
void Stop()
Stops processing of queue (switches status to kStopped)
boost::shared_ptr< KTData > KTDataPtr
Template class for creating data queueing processors.
unsigned get_timeout() const
Contains the logger class and macros, based on Kasper's KLogger class.
void set_timeout(unsigned duration)
void(XProcessorType::* fFuncPtr)(KTDataPtr)
void DoQueueData(KTDataPtr &data, void(XProcessorType::*func)(KTDataPtr))
void SetStatus(KTDataQueueProcessorTemplate< XProcessorType >::Status)