15 #ifndef KTCONCURRENTQUEUE_HH_ 16 #define KTCONCURRENTQUEUE_HH_ 20 #include <boost/date_time/posix_time/posix_time.hpp> 21 #include <boost/thread.hpp> 29 template<
class XDataType >
33 typedef std::deque< XDataType >
Queue;
43 return ! fQueue.empty();
53 fTimeout(boost::posix_time::milliseconds(1000)),
74 void push(XDataType
const& data)
77 ScopedLock lock(fMutex);
78 KTDEBUG(
queuelog,
"Pushing to concurrent queue; size: " << fQueue.size());
79 fQueue.push_back(data);
81 fConditionVar.notify_one();
87 ScopedLock lock(fMutex);
88 return fQueue.empty();
93 ScopedLock lock(fMutex);
99 ScopedLock lock(fMutex);
106 popped_value=fQueue.front();
113 ScopedLock lock(fMutex);
122 popped_value=fQueue.front();
124 KTDEBUG(
queuelog,
"Popping from concurrent queue; size: " << fQueue.size());
130 ScopedLock lock(fMutex);
132 boost::system_time
const waitUntil = boost::get_system_time() +
fTimeout;
133 if (! fConditionVar.timed_wait(lock, waitUntil,
QueueNotEmpty(fQueue)))
144 popped_value=fQueue.front();
146 KTDEBUG(
queuelog,
"Popping from concurrent queue; size: " << fQueue.size());
153 fConditionVar.notify_one();
159 return fTimeout.total_milliseconds();
164 fTimeout = boost::posix_time::milliseconds(duration);
bool try_pop(XDataType &popped_value)
void push(XDataType const &data)
boost::condition_variable fConditionVar
QueueNotEmpty(Queue &aQueue)
boost::posix_time::time_duration fTimeout
virtual ~KTConcurrentQueue()
boost::mutex fMutex
Timeout duration in milliseconds.
KTLOGGER(applog, "KTApplication")
::Nymph::KTLogger queuelog("KTConcurrentQueue")
std::deque< XDataType > Queue
bool wait_and_pop(XDataType &popped_value)
boost::unique_lock< boost::mutex > ScopedLock
bool timed_wait_and_pop(XDataType &popped_value)
unsigned get_timeout() const
Contains the logger class and macros, based on Kasper's KLogger class.
void set_timeout(unsigned duration)