DDS  ver. 2.0
threadPool.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef THREADPOOL_H_
6 #define THREADPOOL_H_
7 // STD
8 #include <queue>
9 // BOOST
10 #include <boost/bind.hpp>
11 #include <boost/shared_ptr.hpp>
12 #include <boost/thread/condition.hpp>
13 #include <boost/thread/thread.hpp>
14 namespace dds
15 {
16  namespace ssh_cmd
17  {
18  //=============================================================================
19  // TODO: Move it to MiscCommon
20  //=============================================================================
21  template <class _T, class _P>
22  class CTaskImp
23  {
24  public:
25  bool run(_P& _param)
26  {
27  _T* pThis = reinterpret_cast<_T*>(this);
28  return pThis->runTask(_param);
29  }
30  };
31  //=============================================================================
32  template <class _T, class _P>
33  class CTask
34  {
35  public:
37 
38  public:
39  CTask(task_t& _task, _P& _param)
40  : m_task(_task)
41  , m_taskParam(_param)
42  {
43  }
44  bool run()
45  {
46  return m_task.run(m_taskParam);
47  }
48 
49  private:
50  task_t& m_task;
51  _P m_taskParam;
52  };
53  //=============================================================================
54  template <class _T, class _P>
56  {
57  typedef CTask<_T, _P> task_t;
58  typedef std::queue<task_t*> taskqueue_t;
59 
60  public:
61  CThreadPool(size_t _threadsCount)
62  : m_stopped(false)
63  , m_stopping(false)
64  , m_successfulTasks(0)
65  , m_tasksCount(0)
66 
67  {
68  for (size_t i = 0; i < _threadsCount; ++i)
69  m_threads.create_thread(boost::bind(&CThreadPool::execute, this));
70  }
71 
73  {
74  stop();
75  }
76 
77  void pushTask(typename CTask<_T, _P>::task_t& _task, _P _param)
78  {
79  boost::mutex::scoped_lock lock(m_mutex);
80  task_t* task = new task_t(_task, _param);
81  m_tasks.push(task);
82  m_threadNeeded.notify_all();
83  ++m_tasksCount;
84  }
85  void execute()
86  {
87  do
88  {
89  task_t* task = NULL;
90 
91  {
92  // Find a job to perform
93  boost::mutex::scoped_lock lock(m_mutex);
94  if (m_tasks.empty() && !m_stopped)
95  {
96  m_threadNeeded.wait(lock);
97  }
98  if (!m_stopped && !m_tasks.empty())
99  {
100  task = m_tasks.front();
101  m_tasks.pop();
102  }
103  }
104  // Execute job
105  if (task)
106  {
107  if (task->run())
108  {
109  boost::mutex::scoped_lock lock(m_mutex);
110  ++m_successfulTasks;
111  }
112  delete task;
113  task = NULL;
114  }
115  m_threadAvailable.notify_all();
116  } while (!m_stopped);
117  }
118  void stop(bool processRemainingJobs = false)
119  {
120  {
121  // prevent more jobs from being added to the queue
122  boost::mutex::scoped_lock lock(m_mutex);
123  if (m_stopped)
124  return;
125  m_stopping = true;
126  }
127  if (processRemainingJobs)
128  {
129  boost::mutex::scoped_lock lock(m_mutex);
130  // wait for queue to drain.
131  while (!m_tasks.empty() && !m_stopped)
132  {
133  m_threadAvailable.wait(lock);
134  }
135  }
136  // tell all threads to stop
137  {
138  boost::mutex::scoped_lock lock(m_mutex);
139  m_stopped = true;
140  }
141  m_threadNeeded.notify_all();
142 
143  m_threads.join_all();
144  }
145  size_t tasksCount() const
146  {
147  return m_tasksCount;
148  }
149  size_t successfulTasks() const
150  {
151  return m_successfulTasks;
152  }
153 
154  private:
155  boost::thread_group m_threads;
156  taskqueue_t m_tasks;
157  boost::mutex m_mutex;
158  boost::condition m_threadNeeded;
159  boost::condition m_threadAvailable;
160  bool m_stopped;
161  bool m_stopping;
162  size_t m_successfulTasks;
163  size_t m_tasksCount;
164  };
165  }
166 }
167 #endif
bool run(_P &_param)
Definition: threadPool.h:25
Definition: threadPool.h:33
size_t successfulTasks() const
Definition: threadPool.h:149
CTaskImp< _T, _P > task_t
Definition: threadPool.h:36
CThreadPool(size_t _threadsCount)
Definition: threadPool.h:61
Definition: dds-agent/src/AgentConnectionManager.h:18
~CThreadPool()
Definition: threadPool.h:72
#define _T(s)
Use TCHAR instead of char or wchar_t. It will be appropriately translated.
Definition: def.h:85
void execute()
Definition: threadPool.h:85
Definition: threadPool.h:55
void pushTask(typename CTask< _T, _P >::task_t &_task, _P _param)
Definition: threadPool.h:77
CTask(task_t &_task, _P &_param)
Definition: threadPool.h:39
void stop(bool processRemainingJobs=false)
Definition: threadPool.h:118
bool run()
Definition: threadPool.h:44
Definition: threadPool.h:22
size_t tasksCount() const
Definition: threadPool.h:145