Nymph  v1.5.2
Flow-Based Data Processing Framework
KTConcurrentQueue.hh
Go to the documentation of this file.
1 /*
2  * KTConcurrentQueue.hh
3  *
4  * Created on: Oct 22, 2012
5  * Author: nsoblath
6  *
7  * FIFO Queue
8  *
9  * Based almost exactly on the class concurrent_queue from:
10  * http://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html
11  * Post author: Anthony Williams
12  * Copyright 2005-2013 Just Software Solutions Ltd. All rights reserved.
13  */
14 
15 #ifndef KTCONCURRENTQUEUE_HH_
16 #define KTCONCURRENTQUEUE_HH_
17 
18 #include "KTLogger.hh"
19 
20 #include <boost/date_time/posix_time/posix_time.hpp>
21 #include <boost/thread.hpp>
22 
23 #include <deque>
24 
25 namespace Nymph
26 {
27  KTLOGGER(queuelog, "KTConcurrentQueue");
28 
29  template< class XDataType >
31  {
32  public:
33  typedef std::deque< XDataType > Queue;
34 
36  {
37  Queue& fQueue;
38  QueueNotEmpty(Queue& aQueue) :
39  fQueue(aQueue)
40  {}
41  bool operator()() const
42  {
43  return ! fQueue.empty();
44  }
45  };
46 
47  typedef boost::unique_lock< boost::mutex > ScopedLock;
48 
49  public:
51  fQueue(),
52  fInterrupt(false),
53  fTimeout(boost::posix_time::milliseconds(1000)),
54  fMutex(),
56  {
57  }
58 
60  {
61  fQueue.clear();
62  }
63 
64  private:
65  Queue fQueue;
66  bool fInterrupt;
67 
68  boost::posix_time::time_duration fTimeout;
69 
70  mutable boost::mutex fMutex;
71  boost::condition_variable fConditionVar;
72 
73  public:
74  void push(XDataType const& data)
75  {
76  KTDEBUG(queuelog, "Attempting to push to queue");
77  ScopedLock lock(fMutex);
78  KTDEBUG(queuelog, "Pushing to concurrent queue; size: " << fQueue.size());
79  fQueue.push_back(data);
80  lock.unlock();
81  fConditionVar.notify_one();
82  return;
83  }
84 
85  bool empty() const
86  {
87  ScopedLock lock(fMutex);
88  return fQueue.empty();
89  }
90 
91  bool size() const
92  {
93  ScopedLock lock(fMutex);
94  return fQueue.size();
95  }
96 
97  bool try_pop(XDataType& popped_value)
98  {
99  ScopedLock lock(fMutex);
100  fInterrupt = false;
101  if(fQueue.empty())
102  {
103  return false;
104  }
105 
106  popped_value=fQueue.front();
107  fQueue.pop_front();
108  return true;
109  }
110 
111  bool wait_and_pop(XDataType& popped_value)
112  {
113  ScopedLock lock(fMutex);
114  fInterrupt = false;
115  fConditionVar.wait(lock, QueueNotEmpty(fQueue));
116  if (fInterrupt)
117  {
118  fInterrupt = false;
119  return false;
120  }
121 
122  popped_value=fQueue.front();
123  fQueue.pop_front();
124  KTDEBUG(queuelog, "Popping from concurrent queue; size: " << fQueue.size());
125  return true;
126  }
127 
128  bool timed_wait_and_pop(XDataType& popped_value)
129  {
130  ScopedLock lock(fMutex);
131  fInterrupt = false;
132  boost::system_time const waitUntil = boost::get_system_time() + fTimeout;
133  if (! fConditionVar.timed_wait(lock, waitUntil, QueueNotEmpty(fQueue)))
134  {
135  KTDEBUG(queuelog, "Queue wait has timed out");
136  return false;
137  }
138  if (fInterrupt)
139  {
140  fInterrupt = false;
141  return false;
142  }
143 
144  popped_value=fQueue.front();
145  fQueue.pop_front();
146  KTDEBUG(queuelog, "Popping from concurrent queue; size: " << fQueue.size());
147  return true;
148  }
149 
150  void interrupt()
151  {
152  fInterrupt = true;
153  fConditionVar.notify_one();
154  return;
155  }
156 
157  inline unsigned get_timeout() const
158  {
159  return fTimeout.total_milliseconds();
160  }
161 
162  inline void set_timeout(unsigned duration)
163  {
164  fTimeout = boost::posix_time::milliseconds(duration);
165  return;
166  }
167  };
168 
169 } /* namespace Nymph */
170 #endif /* KTCONCURRENTQUEUE_HH_ */
bool try_pop(XDataType &popped_value)
void push(XDataType const &data)
boost::condition_variable fConditionVar
boost::posix_time::time_duration fTimeout
boost::mutex fMutex
Timeout duration in milliseconds.
#define KTDEBUG(...)
Definition: KTLogger.hh:343
KTLOGGER(applog, "KTApplication")
::Nymph::KTLogger queuelog("KTConcurrentQueue")
std::deque< XDataType > Queue
bool wait_and_pop(XDataType &popped_value)
boost::unique_lock< boost::mutex > ScopedLock
bool timed_wait_and_pop(XDataType &popped_value)
Contains the logger class and macros, based on Kasper's KLogger class.
void set_timeout(unsigned duration)