Nymph  v1.5.2
Flow-Based Data Processing Framework
KTProcessorToolbox.cc
Go to the documentation of this file.
1 /*
2  * KTProcessorToolbox.cc
3  *
4  * Created on: Sep 27, 2012
5  * Author: nsoblath
6  */
7 
#include "KTProcessorToolbox.hh"

#include "KTLogger.hh"
#include "KTPrimaryProcessor.hh"

#include "factory.hh"
#include "param_codec.hh"

#include <memory>
#include <vector>

#ifndef SINGLETHREADED
#include <boost/thread.hpp>
#endif
21 
22 using std::deque;
23 using std::set;
24 using std::string;
25 using std::vector;
26 
27 namespace Nymph
28 {
29  KTLOGGER(proclog, "KTProcessorToolbox");
30 
31  KTProcessorToolbox::KTProcessorToolbox(const std::string& name) :
32  KTConfigurable(name),
33  fRunQueue(),
34  fProcMap()
35  {
36  }
37 
39  {
41  }
42 
43  bool KTProcessorToolbox::Configure(const scarab::param_node& node)
44  {
45  auto tProcFactory = scarab::factory< KTProcessor, const std::string& >::get_instance();
46 
47  KTPROG(proclog, "Configuring . . .");
48  // Deal with "processor" blocks first
49  if (! node.has("processors"))
50  {
51  KTWARN(proclog, "No processors were specified");
52  }
53  else
54  {
55  const scarab::param_array& procArray = node["processors"].as_array();
56  for( scarab::param_array::const_iterator procIt = procArray.begin(); procIt != procArray.end(); ++procIt )
57  {
58  if( ! procIt->is_node() )
59  {
60  KTERROR( proclog, "Invalid processor entry: not a node" );
61  return false;
62  }
63  const scarab::param_node& procNode = procIt->as_node();
64 
65  if (! procNode.has("type"))
66  {
67  KTERROR(proclog, "Unable to create processor: no processor type given");
68  return false;
69  }
70  string procType = procNode["type"]().as_string();
71 
72  string procName;
73  if (! procNode.has("name"))
74  {
75  KTINFO(proclog, "No name given for processor of type <" << procType << ">; using type as name.");
76  procName = procType;
77  }
78  else
79  {
80  procName = procNode["name"]().as_string();
81  }
82  KTProcessor* newProc = tProcFactory->create(procType, procType);
83  if (newProc == NULL)
84  {
85  KTERROR(proclog, "Unable to create processor of type <" << procType << ">");
86  return false;
87  }
88 
89  if (! AddProcessor(procName, newProc))
90  {
91  KTERROR(proclog, "Unable to add processor <" << procName << ">");
92  delete newProc;
93  return false;
94  }
95  }
96  }
97 
98 
99  // Then deal with connections"
100  if (! node.has("connections"))
101  {
102  KTWARN(proclog, "No connections were specified");
103  }
104  else
105  {
106  const scarab::param_array& connArray = node["connections"].as_array();
107  for( scarab::param_array::const_iterator connIt = connArray.begin(); connIt != connArray.end(); ++connIt )
108  {
109  if( ! connIt->is_node() )
110  {
111  KTERROR( proclog, "Invalid connection entry: not a node" );
112  return false;
113  }
114  const scarab::param_node& connNode = connIt->as_node();
115 
116  if ( ! connNode.has("signal") || ! connNode.has("slot") )
117  {
118  KTERROR(proclog, "Signal/Slot connection information is incomplete!");
119  if (connNode.has("signal"))
120  {
121  KTWARN(proclog, "signal = " << connNode["signal"]());
122  }
123  else
124  {
125  KTERROR(proclog, "signal = MISSING");
126  }
127 
128  if (connNode.has("slot"))
129  {
130  KTWARN(proclog, "slot = " << connNode["slot"]());
131  }
132  else
133  {
134  KTERROR(proclog, "slot = MISSING");
135  }
136  return false;
137  }
138 
139  bool connReturn = false;
140  if (connNode.has("order"))
141  {
142  connReturn = MakeConnection(connNode["signal"]().as_string(), connNode["slot"]().as_string(), connNode["order"]().as_int());
143  }
144  else
145  {
146  connReturn = MakeConnection(connNode["signal"]().as_string(), connNode["slot"]().as_string());
147  }
148  if (! connReturn)
149  {
150  KTERROR(proclog, "Unable to make connection <" << connNode["signal"]().as_string() << "> --> <" << connNode["slot"]().as_string() << ">");
151  return false;
152  }
153 
154  KTINFO(proclog, "Signal <" << connNode["signal"]().as_string() << "> connected to slot <" << connNode["slot"]().as_string() << ">");
155  }
156  }
157 
158 
159  // Finally, deal with processor-run specifications
160  // The run queue is an array of processor names, or groups of names, which will be run sequentially.
161  // If names are grouped (in another array), those in that group will be run in parallel.
162  // In single threaded mode all threads will be run sequentially in the order they were specified.
163  if (! node.has("run-queue") || ! node["run-queue"].is_array())
164  {
165  KTWARN(proclog, "Run queue was not specified");
166  }
167  else
168  {
169  const scarab::param_array& rqArray = node["run-queue"].as_array();
170  for (scarab::param_array::const_iterator rqIt = rqArray.begin(); rqIt != rqArray.end(); ++rqIt)
171  {
172  if (rqIt->is_value())
173  {
174  if (! PushBackToRunQueue((*rqIt)().as_string()))
175  {
176  KTERROR(proclog, "Unable to process run-queue entry: could not add processor to the queue");
177  return false;
178  }
179  }
180  else if (rqIt->is_array())
181  {
182  const scarab::param_array& rqNode = rqIt->as_array();
183  std::vector< std::string > names;
184 
185  for (scarab::param_array::const_iterator rqArrayIt = rqNode.begin(); rqArrayIt != rqNode.end(); ++rqArrayIt)
186  {
187  if (! rqArrayIt->is_value())
188  {
189  KTERROR(proclog, "Invalid run-queue array entry: not a value");
190  return false;
191  }
192  names.push_back((*rqArrayIt)().as_string());
193  }
194 
195  if (! PushBackToRunQueue(names))
196  {
197  KTERROR(proclog, "Unable to process run-queue entry: could not add list of processors to the queue");
198  return false;
199  }
200  }
201  else
202  {
203  KTERROR(proclog, "Invalid run-queue entry: not a value or array");
204  return false;
205  }
206  }
207  }
208 
209  return true;
210  }
211 
212  bool KTProcessorToolbox::ConfigureProcessors(const scarab::param_node& node)
213  {
214  for (ProcMapIt iter = fProcMap.begin(); iter != fProcMap.end(); iter++)
215  {
216  KTDEBUG(proclog, "Attempting to configure processor <" << iter->first << ">");
217  string procName = iter->first;
218  string nameUsed;
219  if (node.has(procName))
220  {
221  nameUsed = procName;
222  }
223  else
224  {
225  string configName = iter->second.fProc->GetConfigName();
226  KTWARN(proclog, "Did not find a parameter node <" << procName << ">\n"
227  "\tWill check using the generic name of the processor, <" << configName << ">.");
228  if (node.has(configName))
229  {
230  nameUsed = configName;
231  KTWARN(proclog, "Configuring processor <" << procName << "> with configuration found using the generic name of the processor: <" << configName << ">");
232  }
233  else
234  {
235  KTWARN(proclog, "Did not find configuration information for processor <" << procName << "> (type <" << configName << ">)\n"
236  "\tProcessor <" << procName << "> was not configured.");
237  continue;
238  }
239  }
240 
241  const scarab::param_node& subNode = node[nameUsed].as_node();
242 
243  if (! iter->second.fProc->Configure(subNode))
244  {
245  KTERROR(proclog, "An error occurred while configuring processor <" << iter->first << "> with parameter node <" << nameUsed << ">");
246  return false;
247  }
248  }
249  return true;
250  }
251 
252  bool KTProcessorToolbox::ConfigureProcessors(const std::string& config)
253  {
254  scarab::param_translator translator;
255  scarab::param_node optNode;
256  optNode.add( "encoding", new scarab::param_value( "json" ) );
257  return ConfigureProcessors( translator.read_string( config, optNode )->as_node() );
258  }
259 
261  {
262  KTPROG(proclog, "Beginning processing . . .");
263 #ifndef SINGLETHREADED
264  unsigned iGroup = 0;
265 #endif
266  for (RunQueue::const_iterator rqIter = fRunQueue.begin(); rqIter != fRunQueue.end(); ++rqIter)
267  {
268 #ifdef SINGLETHREADED
269  for (ThreadGroup::const_iterator tgIter = rqIter->begin(); tgIter != rqIter->end(); ++tgIter)
270  {
271  if (! tgIter->fProc->Run())
272  {
273  return false;
274  }
275  }
276 #else
277  KTDEBUG(proclog, "Starting thread group " << iGroup);
278  boost::thread_group parallelThreads;
279  unsigned iThread = 0;
280  for (ThreadGroup::const_iterator tgIter = rqIter->begin(); tgIter != rqIter->end(); ++tgIter)
281  {
282  // create a boost::thread object to launch the thread
283  // use boost::ref to avoid copying the processor
284  KTDEBUG(proclog, "Starting thread " << iThread << ": " << tgIter->fName);
285  parallelThreads.create_thread(boost::ref(*(tgIter->fProc)));
286  //parallelThreads.create_thread(boost::ref(**tgIter));
287  iThread++;
288  }
289  // wait for execution to complete
290  parallelThreads.join_all();
291  iGroup++;
292 #endif
293  }
294  KTPROG(proclog, ". . . processing complete.");
295  return true;
296  }
297 
298 
299  KTProcessor* KTProcessorToolbox::GetProcessor(const std::string& procName)
300  {
301  ProcMapIt it = fProcMap.find(procName);
302  if (it == fProcMap.end())
303  {
304  KTWARN(proclog, "Processor <" << procName << "> was not found.");
305  return NULL;
306  }
307  return it->second.fProc;
308  }
309 
310  const KTProcessor* KTProcessorToolbox::GetProcessor(const std::string& procName) const
311  {
312  ProcMapCIt it = fProcMap.find(procName);
313  if (it == fProcMap.end())
314  {
315  KTWARN(proclog, "Processor <" << procName << "> was not found.");
316  return NULL;
317  }
318  return it->second.fProc;
319  }
320 
321  bool KTProcessorToolbox::AddProcessor(const std::string& procName, KTProcessor* proc)
322  {
323  ProcMapIt it = fProcMap.find(procName);
324  if (it == fProcMap.end())
325  {
326  ProcessorInfo pInfo;
327  pInfo.fProc = proc;
328  fProcMap.insert(ProcMapValue(procName, pInfo));
329  KTDEBUG(proclog, "Added processor <" << procName << "> (a.k.a. " << proc->GetConfigName() << ")");
330  return true;
331  }
332  KTWARN(proclog, "Processor <" << procName << "> already exists; new processor was not added.");
333  return false;
334  }
335 
336  bool KTProcessorToolbox::AddProcessor(const std::string& procType, const std::string& procName)
337  {
338  auto tProcFactory = scarab::factory< KTProcessor, const std::string& >::get_instance();
339 
340  ProcMapIt it = fProcMap.find(procName);
341  if (it == fProcMap.end())
342  {
343  KTProcessor* newProc = tProcFactory->create(procType, procType);
344  if (newProc == NULL)
345  {
346  KTERROR(proclog, "Unable to create processor of type <" << procType << ">");
347  return false;
348  }
349  if (! AddProcessor(procName, newProc))
350  {
351  KTERROR(proclog, "Unable to add processor <" << procName << ">");
352  delete newProc;
353  return false;
354  }
355  return true;
356  }
357  KTWARN(proclog, "Processor <" << procName << "> already exists; new processor was not added.");
358  return false;
359  }
360 
361  bool KTProcessorToolbox::RemoveProcessor(const std::string& procName)
362  {
363  KTProcessor* procToRemove = ReleaseProcessor(procName);
364  if (procToRemove == NULL)
365  {
366  return false;
367  }
368  delete procToRemove;
369  KTDEBUG(proclog, "Processor <" << procName << "> deleted.");
370  return true;
371  }
372 
373  KTProcessor* KTProcessorToolbox::ReleaseProcessor(const std::string& procName)
374  {
375  ProcMapIt it = fProcMap.find(procName);
376  if (it == fProcMap.end())
377  {
378  KTWARN(proclog, "Processor <" << procName << "> was not found.");
379  return NULL;
380  }
381  KTProcessor* procToRelease = it->second.fProc;
382  fProcMap.erase(it);
383  return procToRelease;
384  }
385 
387  {
388  for (ProcMapIt it = fProcMap.begin(); it != fProcMap.end(); it++)
389  {
390  delete it->second.fProc;
391  }
392  fProcMap.clear();
393  fRunQueue.clear();
394  return;
395  }
396 
397 
398  bool KTProcessorToolbox::MakeConnection(const std::string& signal, const std::string& slot, int order)
399  {
400  string signalProcName, signalName;
401  if (! ParseSignalSlotName(signal, signalProcName, signalName))
402  {
403  KTERROR(proclog, "Unable to parse signal name: <" << signal << ">");
404  return false;
405  }
406 
407  string slotProcName, slotName;
408  if (! ParseSignalSlotName(slot, slotProcName, slotName))
409  {
410  KTERROR(proclog, "Unable to parse slot name: <" << slot << ">");
411  return false;
412  }
413 
414  return MakeConnection(signalProcName, signalName, slotProcName, slotName, order);
415  }
416 
417  bool KTProcessorToolbox::MakeConnection(const std::string& signalProcName, const std::string& signalName, const std::string& slotProcName, const std::string& slotName, int order)
418  {
419  KTProcessor* signalProc = GetProcessor(signalProcName);
420  if (signalProc == NULL)
421  {
422  KTERROR(proclog, "Processor named <" << signalProcName << "> was not found!");
423  return false;
424  }
425 
426  KTProcessor* slotProc = GetProcessor(slotProcName);
427  if (slotProc == NULL)
428  {
429  KTERROR(proclog, "Processor named <" << slotProcName << "> was not found!");
430  return false;
431  }
432 
433  try
434  {
435  if (order != std::numeric_limits< int >::min())
436  {
437  signalProc->ConnectASlot(signalName, slotProc, slotName, order);
438  }
439  else
440  {
441  signalProc->ConnectASlot(signalName, slotProc, slotName);
442  }
443  }
444  catch (std::exception& e)
445  {
446  KTERROR(proclog, "An error occurred while connecting signals and slots:\n"
447  << "\tSignal " << signalName << " from processor " << signalProcName << " (a.k.a. " << signalProc->GetConfigName() << ")" << '\n'
448  << "\tSlot " << slotName << " from processor " << slotProcName << " (a.k.a. " << slotProc->GetConfigName() << ")" << '\n'
449  << '\t' << e.what());
450  return false;
451  }
452 
453  return true;
454  }
455 
456  bool KTProcessorToolbox::ParseSignalSlotName(const std::string& toParse, std::string& nameOfProc, std::string& nameOfSigSlot) const
457  {
458  size_t sepPos = toParse.find_first_of(fSigSlotNameSep);
459  if (sepPos == string::npos)
460  {
461  KTERROR(proclog, "Unable to find separator between processor and signal/slot name in <" << toParse << ">");
462  return false;
463  }
464  nameOfProc = toParse.substr(0, sepPos);
465  nameOfSigSlot = toParse.substr(sepPos + 1);
466  return true;
467  }
468 
469 
470  bool KTProcessorToolbox::PushBackToRunQueue(const std::string& name)
471  {
472  ThreadGroup threadGroup;
473 
474  if (! AddProcessorToThreadGroup( name, threadGroup))
475  {
476  KTERROR(proclog, "Unable to add processor <" << name << "> to thread group");
477  return false;
478  }
479 
480  fRunQueue.push_back(threadGroup);
481 
482  KTINFO(proclog, "Added processor <" << name << "> to the run queue");
483  return true;
484  }
485 
486  bool KTProcessorToolbox::PushBackToRunQueue(std::initializer_list< std::string > names)
487  {
488  return PushBackToRunQueue(std::vector< std::string >(names));
489  }
490 
491  bool KTProcessorToolbox::PushBackToRunQueue(std::vector< std::string > names)
492  {
493  ThreadGroup threadGroup;
494 
495  std::stringstream toPrint;
496  for (const std::string& name : names)
497  {
498  if (! AddProcessorToThreadGroup(name, threadGroup))
499  {
500  KTERROR(proclog, "Unable to add processor <" << name << "> to thread group");
501  return false;
502  }
503  toPrint << name << ", "; // the extra comma at the end is removed below
504  }
505 
506  fRunQueue.push_back(threadGroup);
507  std::string toPrintString = toPrint.str();
508  toPrintString.resize(toPrintString.size()-2);
509  KTINFO(proclog, "Added processors <" << toPrintString << "> to the run queue");
510  return true;
511  }
512 
513  bool KTProcessorToolbox::AddProcessorToThreadGroup(const std::string& name, ThreadGroup& group)
514  {
515  KTProcessor* procForRunQueue = GetProcessor(name);
516  KTDEBUG(proclog, "Attempting to add processor <" << name << "> to the run queue");
517  if (procForRunQueue == NULL)
518  {
519  KTERROR(proclog, "Unable to find processor <" << name << "> requested for the run queue");
520  return false;
521  }
522 
523  KTPrimaryProcessor* primaryProc = dynamic_cast< KTPrimaryProcessor* >(procForRunQueue);
524  if (primaryProc == NULL)
525  {
526  KTERROR(proclog, "Processor <" << name << "> is not a primary processor.");
527  return false;
528  }
529  //group.insert(primaryProc);
530  group.insert(Thread(primaryProc, name));
531  return true;
532  }
533 
534 } /* namespace Nymph */
KTProcessor * ReleaseProcessor(const std::string &procName)
#define KTINFO(...)
Definition: KTLogger.hh:344
ProcessorMap::iterator ProcMapIt
KTProcessor * GetProcessor(const std::string &procName)
Get a pointer to a processor in the toolbox.
ProcessorMap::value_type ProcMapValue
bool AddProcessorToThreadGroup(const std::string &name, ThreadGroup &group)
bool AddProcessor(const std::string &procName, KTProcessor *proc)
ProcessorMap::const_iterator ProcMapCIt
#define KTDEBUG(...)
Definition: KTLogger.hh:343
bool ParseSignalSlotName(const std::string &toParse, std::string &nameOfProc, std::string &nameOfSigSlot) const
KTLOGGER(applog, "KTApplication")
bool PushBackToRunQueue(const std::string &name)
Push a single processor to the back of the run queue.
void ConnectASlot(const std::string &signalName, KTProcessor *processor, const std::string &slotName, int groupNum=-1)
Definition: KTProcessor.cc:47
bool ConfigureProcessors(const scarab::param_node &node)
Configure processors (only those specified in the toolbox)
#define KTPROG(...)
Definition: KTLogger.hh:345
bool Configure(const scarab::param_node &node)
Configure the toolbox: create the processors; connect signals and slots; and set up the run queue...
Contains KTProcessorToolbox.
bool MakeConnection(const std::string &signal, const std::string &slot, int order=std::numeric_limits< int >::min())
const std::string & GetConfigName() const
#define KTERROR(...)
Definition: KTLogger.hh:347
KTProcessorToolbox(const std::string &name="processor-toolbox")
#define KTWARN(...)
Definition: KTLogger.hh:346
bool RemoveProcessor(const std::string &procName)
Remove a processor from the toolbox.
std::set< Thread, CompareThread > ThreadGroup
Contains the logger class and macros, based on Kasper's KLogger class.
static const char fSigSlotNameSep