14 #include "param_codec.hh" 18 #ifndef SINGLETHREADED 19 #include <boost/thread.hpp> 29 KTLOGGER(proclog,
"KTProcessorToolbox");
45 auto tProcFactory = scarab::factory< KTProcessor, const std::string& >::get_instance();
47 KTPROG(proclog,
"Configuring . . .");
49 if (! node.has(
"processors"))
51 KTWARN(proclog,
"No processors were specified");
55 const scarab::param_array& procArray = node[
"processors"].as_array();
56 for( scarab::param_array::const_iterator procIt = procArray.begin(); procIt != procArray.end(); ++procIt )
58 if( ! procIt->is_node() )
60 KTERROR( proclog,
"Invalid processor entry: not a node" );
63 const scarab::param_node& procNode = procIt->as_node();
65 if (! procNode.has(
"type"))
67 KTERROR(proclog,
"Unable to create processor: no processor type given");
70 string procType = procNode[
"type"]().as_string();
73 if (! procNode.has(
"name"))
75 KTINFO(proclog,
"No name given for processor of type <" << procType <<
">; using type as name.");
80 procName = procNode[
"name"]().as_string();
82 KTProcessor* newProc = tProcFactory->create(procType, procType);
85 KTERROR(proclog,
"Unable to create processor of type <" << procType <<
">");
91 KTERROR(proclog,
"Unable to add processor <" << procName <<
">");
100 if (! node.has(
"connections"))
102 KTWARN(proclog,
"No connections were specified");
106 const scarab::param_array& connArray = node[
"connections"].as_array();
107 for( scarab::param_array::const_iterator connIt = connArray.begin(); connIt != connArray.end(); ++connIt )
109 if( ! connIt->is_node() )
111 KTERROR( proclog,
"Invalid connection entry: not a node" );
114 const scarab::param_node& connNode = connIt->as_node();
116 if ( ! connNode.has(
"signal") || ! connNode.has(
"slot") )
118 KTERROR(proclog,
"Signal/Slot connection information is incomplete!");
119 if (connNode.has(
"signal"))
121 KTWARN(proclog,
"signal = " << connNode[
"signal"]());
125 KTERROR(proclog,
"signal = MISSING");
128 if (connNode.has(
"slot"))
130 KTWARN(proclog,
"slot = " << connNode[
"slot"]());
134 KTERROR(proclog,
"slot = MISSING");
139 bool connReturn =
false;
140 if (connNode.has(
"order"))
142 connReturn =
MakeConnection(connNode[
"signal"]().as_string(), connNode[
"slot"]().as_string(), connNode[
"order"]().as_int());
146 connReturn =
MakeConnection(connNode[
"signal"]().as_string(), connNode[
"slot"]().as_string());
150 KTERROR(proclog,
"Unable to make connection <" << connNode[
"signal"]().as_string() <<
"> --> <" << connNode[
"slot"]().as_string() <<
">");
154 KTINFO(proclog,
"Signal <" << connNode[
"signal"]().as_string() <<
"> connected to slot <" << connNode[
"slot"]().as_string() <<
">");
163 if (! node.has(
"run-queue") || ! node[
"run-queue"].is_array())
165 KTWARN(proclog,
"Run queue was not specified");
169 const scarab::param_array& rqArray = node[
"run-queue"].as_array();
170 for (scarab::param_array::const_iterator rqIt = rqArray.begin(); rqIt != rqArray.end(); ++rqIt)
172 if (rqIt->is_value())
176 KTERROR(proclog,
"Unable to process run-queue entry: could not add processor to the queue");
180 else if (rqIt->is_array())
182 const scarab::param_array& rqNode = rqIt->as_array();
183 std::vector< std::string > names;
185 for (scarab::param_array::const_iterator rqArrayIt = rqNode.begin(); rqArrayIt != rqNode.end(); ++rqArrayIt)
187 if (! rqArrayIt->is_value())
189 KTERROR(proclog,
"Invalid run-queue array entry: not a value");
192 names.push_back((*rqArrayIt)().as_string());
197 KTERROR(proclog,
"Unable to process run-queue entry: could not add list of processors to the queue");
203 KTERROR(proclog,
"Invalid run-queue entry: not a value or array");
216 KTDEBUG(proclog,
"Attempting to configure processor <" << iter->first <<
">");
217 string procName = iter->first;
219 if (node.has(procName))
225 string configName = iter->second.fProc->GetConfigName();
226 KTWARN(proclog,
"Did not find a parameter node <" << procName <<
">\n" 227 "\tWill check using the generic name of the processor, <" << configName <<
">.");
228 if (node.has(configName))
230 nameUsed = configName;
231 KTWARN(proclog,
"Configuring processor <" << procName <<
"> with configuration found using the generic name of the processor: <" << configName <<
">");
235 KTWARN(proclog,
"Did not find configuration information for processor <" << procName <<
"> (type <" << configName <<
">)\n" 236 "\tProcessor <" << procName <<
"> was not configured.");
241 const scarab::param_node& subNode = node[nameUsed].as_node();
243 if (! iter->second.fProc->Configure(subNode))
245 KTERROR(proclog,
"An error occurred while configuring processor <" << iter->first <<
"> with parameter node <" << nameUsed <<
">");
254 scarab::param_translator translator;
255 scarab::param_node optNode;
256 optNode.add(
"encoding",
new scarab::param_value(
"json" ) );
262 KTPROG(proclog,
"Beginning processing . . .");
263 #ifndef SINGLETHREADED 266 for (RunQueue::const_iterator rqIter =
fRunQueue.begin(); rqIter !=
fRunQueue.end(); ++rqIter)
268 #ifdef SINGLETHREADED 269 for (ThreadGroup::const_iterator tgIter = rqIter->begin(); tgIter != rqIter->end(); ++tgIter)
271 if (! tgIter->fProc->Run())
277 KTDEBUG(proclog,
"Starting thread group " << iGroup);
278 boost::thread_group parallelThreads;
279 unsigned iThread = 0;
280 for (ThreadGroup::const_iterator tgIter = rqIter->begin(); tgIter != rqIter->end(); ++tgIter)
284 KTDEBUG(proclog,
"Starting thread " << iThread <<
": " << tgIter->fName);
285 parallelThreads.create_thread(boost::ref(*(tgIter->fProc)));
290 parallelThreads.join_all();
294 KTPROG(proclog,
". . . processing complete.");
304 KTWARN(proclog,
"Processor <" << procName <<
"> was not found.");
307 return it->second.fProc;
315 KTWARN(proclog,
"Processor <" << procName <<
"> was not found.");
318 return it->second.fProc;
329 KTDEBUG(proclog,
"Added processor <" << procName <<
"> (a.k.a. " << proc->
GetConfigName() <<
")");
332 KTWARN(proclog,
"Processor <" << procName <<
"> already exists; new processor was not added.");
338 auto tProcFactory = scarab::factory< KTProcessor, const std::string& >::get_instance();
343 KTProcessor* newProc = tProcFactory->create(procType, procType);
346 KTERROR(proclog,
"Unable to create processor of type <" << procType <<
">");
351 KTERROR(proclog,
"Unable to add processor <" << procName <<
">");
357 KTWARN(proclog,
"Processor <" << procName <<
"> already exists; new processor was not added.");
364 if (procToRemove == NULL)
369 KTDEBUG(proclog,
"Processor <" << procName <<
"> deleted.");
378 KTWARN(proclog,
"Processor <" << procName <<
"> was not found.");
383 return procToRelease;
390 delete it->second.fProc;
400 string signalProcName, signalName;
403 KTERROR(proclog,
"Unable to parse signal name: <" << signal <<
">");
407 string slotProcName, slotName;
410 KTERROR(proclog,
"Unable to parse slot name: <" << slot <<
">");
414 return MakeConnection(signalProcName, signalName, slotProcName, slotName, order);
420 if (signalProc == NULL)
422 KTERROR(proclog,
"Processor named <" << signalProcName <<
"> was not found!");
427 if (slotProc == NULL)
429 KTERROR(proclog,
"Processor named <" << slotProcName <<
"> was not found!");
435 if (order != std::numeric_limits< int >::min())
437 signalProc->
ConnectASlot(signalName, slotProc, slotName, order);
441 signalProc->
ConnectASlot(signalName, slotProc, slotName);
444 catch (std::exception& e)
446 KTERROR(proclog,
"An error occurred while connecting signals and slots:\n" 447 <<
"\tSignal " << signalName <<
" from processor " << signalProcName <<
" (a.k.a. " << signalProc->
GetConfigName() <<
")" <<
'\n' 448 <<
"\tSlot " << slotName <<
" from processor " << slotProcName <<
" (a.k.a. " << slotProc->
GetConfigName() <<
")" <<
'\n' 449 <<
'\t' << e.what());
459 if (sepPos == string::npos)
461 KTERROR(proclog,
"Unable to find separator between processor and signal/slot name in <" << toParse <<
">");
464 nameOfProc = toParse.substr(0, sepPos);
465 nameOfSigSlot = toParse.substr(sepPos + 1);
476 KTERROR(proclog,
"Unable to add processor <" << name <<
"> to thread group");
482 KTINFO(proclog,
"Added processor <" << name <<
"> to the run queue");
495 std::stringstream toPrint;
496 for (
const std::string& name : names)
500 KTERROR(proclog,
"Unable to add processor <" << name <<
"> to thread group");
503 toPrint << name <<
", ";
507 std::string toPrintString = toPrint.str();
508 toPrintString.resize(toPrintString.size()-2);
509 KTINFO(proclog,
"Added processors <" << toPrintString <<
"> to the run queue");
516 KTDEBUG(proclog,
"Attempting to add processor <" << name <<
"> to the run queue");
517 if (procForRunQueue == NULL)
519 KTERROR(proclog,
"Unable to find processor <" << name <<
"> requested for the run queue");
524 if (primaryProc == NULL)
526 KTERROR(proclog,
"Processor <" << name <<
"> is not a primary processor.");
530 group.insert(
Thread(primaryProc, name));
KTLOGGER(applog, "KTApplication")
void ConnectASlot(const std::string &signalName, KTProcessor *processor, const std::string &slotName, int groupNum=-1)
const std::string & GetConfigName() const
Contains the logger class and macros, based on Kasper's KLogger class.