Nymph  v1.5.2
Flow-Based Data Processing Framework
KTDataQueueProcessor.hh
Go to the documentation of this file.
1 /*
2  * KTDataQueueProcessor.hh
3  *
4  * Created on: Jan 10, 2013
5  * Author: nsoblath
6  */
7 
8 #ifndef KTDATAQUEUEPROCESSOR_HH_
9 #define KTDATAQUEUEPROCESSOR_HH_
10 
11 #include "KTPrimaryProcessor.hh"
12 
13 #include "KTConcurrentQueue.hh"
14 #include "KTData.hh"
15 #include "KTLogger.hh"
16 #include "KTSlot.hh"
17 
18 namespace Nymph
19 {
// Logger instance named "KTDataQueueProcessor"; it is the `eqplog` handle passed to
// every KTINFO/KTDEBUG call in this header.
20  KTLOGGER(eqplog, "KTDataQueueProcessor");
21 
22  //***********************************
23  // Data Queue Processor Template
24  //***********************************
25 
// Class template for processors that queue (data, member-function) pairs and later
// invoke the queued member function on the derived processor (XProcessorType).
// NOTE(review): this is an extracted listing; the embedded line numbers skip, so several
// lines are absent -- including the class-head (listing line 46) and the declarations of
// members the implementation uses (fStatus, fPopFromQueue, fQueueDoneSignal, the Queue
// typedef, the destructor). Confirm against the original header before editing.
45  template< class XProcessorType >
47  {
48  public:
// One queue entry: the data pointer plus the member function to call with it.
// NOTE(review): the data member (used elsewhere as `daf.fData`) is on a missing line.
49  struct DataAndFunc
50  {
52  void (XProcessorType::*fFuncPtr)(KTDataPtr);
53  };
54 
56 
// Pointer to a KTConcurrentQueue member function that pops one entry into its argument
// and returns bool; the implementation switches between wait_and_pop and try_pop
// (and presumably timed_wait_and_pop -- that assignment is on a missing line).
57  typedef bool (KTConcurrentQueue< DataAndFunc >::*QueuePoppingFunc)(DataAndFunc&);
58 
// Processing state. The enumerators are on missing lines; the implementation below
// uses kStopped, kRunning and kContinue.
59  enum Status
60  {
64  };
65 
66  public:
67  KTDataQueueProcessorTemplate(const std::string& name = "default-data-queue-proc-template-name");
69 
// Configure() reads the "timeout" value into the queue and then calls
// ConfigureSubClass(), which every derived processor must implement.
70  bool Configure(const scarab::param_node& node);
71  virtual bool ConfigureSubClass(const scarab::param_node& node) = 0;
72 
73  Status GetStatus() const;
75 
76  protected:
78 
79  //**************************************
80  // Derived Processor function pointer
81  //**************************************
82  public:
// Stores the given member-function pointer in fFuncPtr.
83  void SetFuncPtr(void (XProcessorType::*ptr)(KTDataPtr));
84 
85  protected:
86  void (XProcessorType::*fFuncPtr)(KTDataPtr);
87 
88 
89  //*********
90  // Queue
91  //*********
92  public:
// Sets the status to kRunning and runs ProcessQueue(), returning its result.
94  bool Run();
95 
// Sets the status to kStopped and interrupts the queue.
97  void Stop();
98 
// Pops and processes entries until the status becomes kStopped, then emits the
// queue-done signal.
100  bool ProcessQueue();
101 
// Pops and discards every entry currently in the queue.
102  void ClearQueue();
103 
104  protected:
// NOTE(review): the `Queue` typedef is on a missing line; presumably
// KTConcurrentQueue< DataAndFunc > -- confirm.
105  Queue fQueue;
107 
108  //*********
109  // Queueing functions for slots
110  //*********
111  protected:
// Takes over the caller's data pointer (the caller's `data` is reset) and pushes it onto
// the queue together with `func`.
114  void DoQueueData(KTDataPtr& data, void (XProcessorType::*func)(KTDataPtr));
115 
118  //void DoQueueDataList(std::list< KTDataPtr& >* dataList, void (XProcessorType::*fFuncPtr)(KTDataPtr));
119 
120  //*********
121  // Slots
122  //*********
123  public:
// Registered by the constructor as the slots "use-timed-pop", "use-untimed-pop" and
// "use-single-pop"; each selects which queue pop function ProcessQueue() uses.
124  void SwitchToTimedPop();
125  void SwitchToUntimedPop();
126  void SwitchToSinglePop();
127 
128  //*********
129  // Signals
130  //*********
131  protected:
133 
134  };
135 
136 
137  //**************************
138  // Data Queue Processor
139  //**************************
140 
// Concrete data-queue processor: instantiates KTDataQueueProcessorTemplate with itself as
// XProcessorType. Only declarations are visible here; the member definitions are not in
// this header extract.
// NOTE(review): the signal member under "Signals" is on a missing listing line.
160  class KTDataQueueProcessor : public KTDataQueueProcessorTemplate< KTDataQueueProcessor >
161  {
162  public:
163  KTDataQueueProcessor(const std::string& name = "data-queue");
164  virtual ~KTDataQueueProcessor();
165 
// Implements the pure-virtual hook that the template's Configure() calls.
166  bool ConfigureSubClass(const scarab::param_node& node);
167 
168  public:
// Has the `void (XProcessorType::*)(KTDataPtr)` shape required for a queued function;
// presumably it is the function queued by QueueData() -- confirm in the .cc file.
169  void EmitDataSignal(KTDataPtr data);
170 
171  //***************
172  // Signals
173  //***************
174 
175  private:
177 
178  //*********
179  // Slots
180  //*********
181  public:
// Takes the data pointer by non-const reference; presumably forwards to DoQueueData(),
// which resets the caller's pointer -- confirm in the .cc file.
184  void QueueData(KTDataPtr& data);
185 
188  //void QueueDataList(std::list< KTDataPtr >* dataList);
189 
190  };
191 
192 
193  //**************************************************
194  // Data Queue Processor Template Implementation
195  //**************************************************
196 
197 
// Constructor of KTDataQueueProcessorTemplate.
// NOTE(review): the declarator line (listing line 199) is missing from this extract.
// Initial state: status kStopped, no function pointer, empty queue, pop function set to
// the queue's wait_and_pop, and a "queue-done" signal owned by this processor.
198  template< class XProcessorType >
200  KTPrimaryProcessor(name),
201  fStatus(kStopped),
202  fFuncPtr(NULL),
203  fQueue(),
204  fPopFromQueue(&KTConcurrentQueue< DataAndFunc >::wait_and_pop),
205  fQueueDoneSignal("queue-done", this)
206  {
// Expose the three pop-mode switches as named slots.
207  RegisterSlot("use-timed-pop", this, &KTDataQueueProcessorTemplate< XProcessorType >::SwitchToTimedPop);
208  RegisterSlot("use-untimed-pop", this, &KTDataQueueProcessorTemplate< XProcessorType >::SwitchToUntimedPop);
209  RegisterSlot("use-single-pop", this, &KTDataQueueProcessorTemplate< XProcessorType >::SwitchToSinglePop);
210  }
211 
// NOTE(review): the declarator line (listing line 213) is missing from this extract;
// given its position directly after the constructor this is presumably the destructor.
// It discards any entries still in the queue.
212  template< class XProcessorType >
214  {
215  ClearQueue();
216  }
217 
// Configure(node) -- NOTE(review): the signature line (listing line 219) is missing.
// Sets the queue timeout from the "timeout" entry of `node`, keeping the queue's current
// timeout when the entry is absent, then delegates to ConfigureSubClass().
// Returns false if ConfigureSubClass() fails, true otherwise.
218  template< class XProcessorType >
220  {
221  fQueue.set_timeout(node.get_value< unsigned >("timeout", fQueue.get_timeout()));
222 
223  if (! ConfigureSubClass(node)) return false;
224  return true;
225  }
226 
// Run() -- NOTE(review): the signature line (listing line 228) is missing.
// Marks the processor as running (unconditionally, whatever the previous status) and then
// processes the queue on the calling thread; returns ProcessQueue()'s result.
227  template< class XProcessorType >
229  {
230  fStatus = kRunning;
231  KTINFO(eqplog, "Queue started");
232  return ProcessQueue();
233  }
234 
// Stop() -- NOTE(review): the signature line (listing line 236) is missing.
// Sets the status to kStopped first, then interrupts the queue; with the status already
// kStopped, the ProcessQueue() loop ends after its current iteration.
// (Presumably interrupt() wakes a blocked pop -- confirm in KTConcurrentQueue.)
235  template< class XProcessorType >
237  {
238  fStatus = kStopped;
239  fQueue.interrupt();
240  KTINFO(eqplog, "Queue stopped");
241  return;
242  }
243 
// SetFuncPtr(ptr) -- NOTE(review): the signature line (listing line 245) is missing.
// Stores `ptr` in fFuncPtr. ProcessQueue() does not read fFuncPtr; it calls the function
// pointer carried by each queue entry.
244  template< class XProcessorType >
246  {
247  fFuncPtr = ptr;
248  return;
249  }
250 
251 
// ProcessQueue() -- NOTE(review): the signature line (listing line 253) is missing.
// Main processing loop: pops entries with the currently selected pop function and calls
// each entry's member function on the derived processor until the status is kStopped.
// Emits fQueueDoneSignal when the loop ends. Always returns true.
252  template< class XProcessorType >
254  {
255  KTINFO(eqplog, "Beginning to process queue");
256  while (fStatus != kStopped)
257  {
258  KTDEBUG(eqplog, "processing . . .");
259  DataAndFunc daf;
// Pop through the member-function pointer so the pop mode can be changed at runtime.
260  if ((fQueue.*fPopFromQueue)(daf))
261  {
262  KTDEBUG(eqplog, "Data acquired for processing");
// Call the queued member function on this object viewed as the derived type.
263  (static_cast<XProcessorType*>(this)->*(daf.fFuncPtr))(daf.fData);
// Data flagged as the last data ends the loop.
// NOTE(review): daf.fData is dereferenced without a null check here.
264  if (daf.fData->GetLastData()) fStatus = kStopped;
265  }
266  else
267  {
// The pop returned false. kContinue is set by the SwitchTo*Pop functions just before they
// interrupt the queue, so in that case resume running with the new pop function;
// any other status ends the loop.
268  if (fStatus == kContinue)
269  {
270  fStatus = kRunning;
271  }
272  else
273  {
274  fStatus = kStopped;
275  }
276  }
277  }
278  fQueueDoneSignal();
279  KTINFO(eqplog, "Queue processing has ended");
280  return true;
281  }
282 
// ClearQueue() -- NOTE(review): the signature line (listing line 284) is missing.
// Pops entries with try_pop until the queue reports empty; each popped entry is dropped
// when `daf` goes out of scope. The queued functions are not called.
283  template< class XProcessorType >
285  {
286  while (! fQueue.empty())
287  {
288  DataAndFunc daf;
289  fQueue.try_pop(daf);
290  }
291  KTINFO(eqplog, "Queue cleared");
292  return;
293  }
294 
295 
// DoQueueData(data, func) -- NOTE(review): the signature line (listing line 297) is missing.
// Builds a queue entry from `data` and `func` and pushes it onto the queue.
// The caller's pointer is reset, so after the call `data` no longer refers to the object.
296  template< class XProcessorType >
298  {
299  KTDEBUG(eqplog, "Queueing data");
300  DataAndFunc daf;
301  daf.fData = data; // I'd like to use move semantics here (operator=(shared_ptr&&)), but they didn't work, so I bootstrapped with copy and reset.
302  data.reset();
303  daf.fFuncPtr = func;
304  fQueue.push(daf);
305  return;
306  }
307 /*
308  template< class XProcessorType >
309  void KTDataQueueProcessorTemplate< XProcessorType >::DoQueueDataList(std::list< KTDataPtr& >* dataList, void (XProcessorType::*func)(KTDataPtr))
310  {
311  typedef std::list< KTDataPtr > DataList;
312 
313  KTDEBUG(eqplog, "Queueing data objects");
314  DataAndFunc daf;
315  while (! dataList->empty())
316  {
317  daf.fData = &(dataList->front()); // using move semantics
318  daf.fFuncPtr = func;
319  dataList->pop_front();
320  fQueue.push(daf);
321  }
322  delete dataList;
323  return;
324  }
325 */
326 
// SwitchToTimedPop() -- NOTE(review): listing lines 328 (signature), 330 and 334 are
// missing from this extract, so the `return;` below only looks unconditional.
// By analogy with SwitchToSinglePop, line 330 is presumably the guard
// `if (fPopFromQueue == <timed pop function>)` and line 334 the assignment of that pop
// function to fPopFromQueue -- confirm against the original header.
327  template< class XProcessorType >
329  {
331  return;
332 
333  KTDEBUG(eqplog, "Switching to timed pop function");
// If the queue is being processed, mark the status kContinue and interrupt the queue;
// ProcessQueue() then sets the status back to kRunning when the pop returns false.
335  if (fStatus == kRunning)
336  {
337  fStatus = kContinue;
338  KTINFO(eqplog, "Pop function changed; interrupting queue");
339  fQueue.interrupt();
340  }
341  return;
342  }
343 
// SwitchToUntimedPop() -- NOTE(review): listing lines 345 (signature) and 347 are missing
// from this extract, so the `return;` below only looks unconditional. By analogy with
// SwitchToSinglePop, line 347 is presumably the guard
// `if (fPopFromQueue == &KTConcurrentQueue< DataAndFunc >::wait_and_pop)` -- confirm.
344  template< class XProcessorType >
346  {
348  return;
349 
350  KTDEBUG(eqplog, "Switching to untimed pop function");
351  this->fPopFromQueue = &KTConcurrentQueue< DataAndFunc >::wait_and_pop;
// If the queue is being processed, mark the status kContinue and interrupt the queue;
// ProcessQueue() then sets the status back to kRunning when the pop returns false.
352  if (fStatus == kRunning)
353  {
354  fStatus = kContinue;
355  KTINFO(eqplog, "Pop function changed; interrupting queue");
356  fQueue.interrupt();
357  }
358  return;
359  }
360 
// SwitchToSinglePop() -- NOTE(review): the signature line (listing line 362) is missing.
// Makes ProcessQueue() pop with the queue's try_pop. Does nothing if try_pop is already
// the selected pop function.
361  template< class XProcessorType >
363  {
364  if (fPopFromQueue == &KTConcurrentQueue< DataAndFunc >::try_pop)
365  return;
366 
367  KTDEBUG(eqplog, "Switching to single-pop function");
368  this->fPopFromQueue = &KTConcurrentQueue< DataAndFunc >::try_pop;
// If the queue is being processed, mark the status kContinue and interrupt the queue;
// ProcessQueue() then sets the status back to kRunning when the pop returns false.
369  if (fStatus == kRunning)
370  {
371  fStatus = kContinue;
372  KTINFO(eqplog, "Pop function changed; interrupting queue");
373  fQueue.interrupt();
374  }
375  return;
376  }
377 
378 } /* namespace Nymph */
379 #endif /* KTDATAQUEUEPROCESSOR_HH_ */
bool Run()
Begins processing of queue (switches status from kStopped to kRunning)
bool try_pop(XDataType &popped_value)
#define KTINFO(...)
Definition: KTLogger.hh:344
KTConcurrentQueue< DataAndFunc > Queue
void push(XDataType const &data)
virtual bool ConfigureSubClass(const scarab::param_node &node)=0
bool(KTConcurrentQueue< DataAndFunc >::* QueuePoppingFunc)(DataAndFunc &)
bool Configure(const scarab::param_node &node)
Should perform parameter store and command-line configurations.
Creates a signal that takes a KTDataPtr object as its argument.
Definition: KTSignal.hh:119
void SetFuncPtr(void(XProcessorType::*ptr)(KTDataPtr))
Generic data queue for asynchronous processing.
#define KTDEBUG(...)
Definition: KTLogger.hh:343
KTDataQueueProcessorTemplate(const std::string &name="default-data-queue-proc-template-name")
KTLOGGER(applog, "KTApplication")
bool ProcessQueue()
Begins processing of queue if status is already kRunning; otherwise does nothing. ...
void Stop()
Stops processing of queue (switches status to kStopped)
boost::shared_ptr< KTData > KTDataPtr
Definition: KTData.hh:67
Template class for creating data queueing processors.
Contains the logger class and macros, based on Kasper's KLogger class.
void set_timeout(unsigned duration)
void DoQueueData(KTDataPtr &data, void(XProcessorType::*func)(KTDataPtr))
void SetStatus(KTDataQueueProcessorTemplate< XProcessorType >::Status)