DDS  ver. 1.6
ConnectionManagerImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 
6 #ifndef __DDS__ConnectionManagerImpl__
7 #define __DDS__ConnectionManagerImpl__
8 // DDS
10 #include "MonitoringThread.h"
11 #include "Options.h"
12 #include "ProtocolMessage.h"
13 #include "StatImpl.h"
14 // STD
15 #include <mutex>
16 // BOOST
17 #pragma clang diagnostic push
18 #pragma clang diagnostic ignored "-Wunused-local-typedef"
19 #include <boost/asio.hpp>
20 #pragma clang diagnostic pop
21 #include <boost/thread/thread.hpp>
22 // MiscCommon
23 #include "INet.h"
24 
25 namespace dds
26 {
27  namespace protocol_api
28  {
31  template <class T, class A>
33  {
34  public:
35  CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
36  {
37  int nSrvPort =
38  (_minPort == 0 && _maxPort == 0) ? 0 : MiscCommon::INet::get_free_port(_minPort, _maxPort);
39  m_acceptor = std::make_shared<boost::asio::ip::tcp::acceptor>(
40  m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
41 
42  if (_useUITransport)
43  {
44  int nSrvPort =
45  (_minPort == 0 && _maxPort == 0) ? 0 : MiscCommon::INet::get_free_port(_minPort, _maxPort);
46  m_acceptorUI = std::make_shared<boost::asio::ip::tcp::acceptor>(
47  m_io_service_UI, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
48  }
49 
50  // Create and register signals
51  m_signals = std::make_shared<boost::asio::signal_set>(m_io_service);
52 
53  // Register to handle the signals that indicate when the server should exit.
54  // It is safe to register for the same signal multiple times in a program,
55  // provided all registration for the specified signal is made through Asio.
56  m_signals->add(SIGINT);
57  m_signals->add(SIGTERM);
58 #if defined(SIGQUIT)
59  m_signals->add(SIGQUIT);
60 #endif // defined(SIGQUIT)
61 
62  m_signals->async_wait([this](boost::system::error_code /*ec*/, int signo) {
63  // The server is stopped by cancelling all outstanding asynchronous
64  // operations. Once all operations have finished the io_service::run()
65  // call will exit.
66  LOG(MiscCommon::info) << "Received a signal: " << signo;
67  LOG(MiscCommon::info) << "Sopping DDS transport server";
68 
69  stop();
70  });
71  }
72 
74  {
75  // Delete server info file
76  deleteInfoFile();
77  stop();
78  }
79 
80  void start(bool _join = true, unsigned int _nThreads = 0 /*0 - auto; min. number is 4*/)
81  {
82  try
83  {
84  // Call _start of the "child"
85  A* pThis = static_cast<A*>(this);
86  pThis->_start();
87 
88  // Start monitoring thread
89  const float maxIdleTime =
91 
92  CMonitoringThread::instance().start(maxIdleTime,
93  []() { LOG(MiscCommon::info) << "Idle callback called."; });
94  m_acceptor->listen();
95  createClientAndStartAccept(m_acceptor);
96 
97  // If we use second channel for communication with UI we have to start acceptiing connection on that
98  // channel.
99  if (m_acceptorUI != nullptr)
100  {
101  m_acceptorUI->listen();
102  createClientAndStartAccept(m_acceptorUI);
103  }
104 
105  // Create a server info file
106  createInfoFile();
107 
108  // a thread pool for the DDS transport engine
109  // may return 0 when not able to detect
110  unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
111  // we need at least 2 threads
112  if (concurrentThreads < 2)
113  concurrentThreads = 2;
115  << "Starting DDS transport engine using " << concurrentThreads << " concurrent threads.";
116  for (int x = 0; x < concurrentThreads; ++x)
117  {
118  m_workerThreads.create_thread([this]() { runService(10, m_acceptor->get_io_service()); });
119  }
120 
121  // Starting service for UI transport engine
122  if (m_acceptorUI != nullptr)
123  {
124  const unsigned int concurrentThreads = 2;
126  << "Starting DDS UI transport engine using " << concurrentThreads << " concurrent threads.";
127  for (int x = 0; x < concurrentThreads; ++x)
128  {
129  m_workerThreads.create_thread([this]() { runService(10, m_acceptorUI->get_io_service()); });
130  }
131  }
132 
133  if (_join)
134  m_workerThreads.join_all();
135  }
136  catch (std::exception& e)
137  {
138  LOG(MiscCommon::fatal) << e.what();
139  }
140  }
141 
142  void runService(short _counter, boost::asio::io_service& _io_service)
143  {
144  if (_counter <= 0)
145  {
146  LOG(MiscCommon::error) << "CConnectionManagerImpl: can't start another io_service.";
147  }
148  try
149  {
150  _io_service.run();
151  }
152  catch (std::exception& ex)
153  {
154  LOG(MiscCommon::error) << "CConnectionManagerImpl exception: " << ex.what();
155  LOG(MiscCommon::info) << "CConnectionManagerImpl restarting io_service";
156  runService(--_counter, _io_service);
157  }
158  }
159 
160  void stop()
161  {
162  try
163  {
164  // Call _stop of the "child"
165  A* pThis = static_cast<A*>(this);
166  pThis->_stop();
167 
168  // Send shutdown signal to all client connections.
169  typename T::weakConnectionPtrVector_t channels(getChannels());
170 
171  for (const auto& v : channels)
172  {
173  if (v.expired())
174  continue;
175  auto ptr = v.lock();
176  ptr->template pushMsg<cmdSHUTDOWN>();
177  }
178 
179  auto condition = [](typename T::connectionPtr_t _v, bool& /*_stop*/) { return (_v->started()); };
180 
181  size_t counter = 0;
182  while (true)
183  {
184  ++counter;
185  std::this_thread::sleep_for(std::chrono::milliseconds(200));
186  if (countNofChannels(condition) == 0)
187  break;
188  if (counter > 300)
189  {
190  LOG(MiscCommon::warning) << "Some channels were not shut down properly. Exiting in anyway.";
191  break;
192  }
193  }
194 
195  m_acceptor->close();
196  m_acceptor->get_io_service().stop();
197 
198  if (m_acceptor != nullptr)
199  {
200  m_acceptorUI->close();
201  m_acceptorUI->get_io_service().stop();
202  }
203 
204  for (const auto& v : channels)
205  {
206  if (v.expired())
207  continue;
208  auto ptr = v.lock();
209  ptr->stop();
210  }
211 
212  std::lock_guard<std::mutex> lock(m_mutex);
213  m_channels.clear();
214  }
215  catch (std::bad_weak_ptr& e)
216  {
217  // TODO: Do we need to log something here?
218  }
219  catch (std::exception& e)
220  {
221  LOG(MiscCommon::fatal) << e.what();
222  }
223  }
224 
225  protected:
226  typename T::weakConnectionPtr_t getWeakPtr(T* _client)
227  {
228  std::lock_guard<std::mutex> lock(m_mutex);
229 
230  for (auto& v : m_channels)
231  {
232  if (v.get() == _client)
233  return v;
234  }
235  return typename T::weakConnectionPtr_t();
236  }
237 
238  typename T::weakConnectionPtrVector_t getChannels(
239  std::function<bool(typename T::connectionPtr_t, bool&)> _condition = nullptr)
240  {
241  std::lock_guard<std::mutex> lock(m_mutex);
242 
243  typename T::weakConnectionPtrVector_t result;
244  result.reserve(m_channels.size());
245  for (auto& v : m_channels)
246  {
247  bool stop = false;
248  if (_condition == nullptr || _condition(v, stop))
249  {
250  result.push_back(v);
251  if (stop)
252  break;
253  }
254  }
255  return result;
256  }
257 
258  template <ECmdType _cmd, class AttachmentType>
259  void broadcastMsg(const AttachmentType& _attachment,
260  std::function<bool(typename T::connectionPtr_t, bool&)> _condition = nullptr)
261  {
262  try
263  {
264  typename T::weakConnectionPtrVector_t channels(getChannels(_condition));
265 
266  for (const auto& v : channels)
267  {
268  if (v.expired())
269  continue;
270  auto ptr = v.lock();
271  ptr->template pushMsg<_cmd>(_attachment);
272  }
273  }
274  catch (std::bad_weak_ptr& e)
275  {
276  // TODO: Do we need to log something here?
277  }
278  }
279 
280  template <ECmdType _cmd, class AttachmentType>
281  void accumulativeBroadcastMsg(const AttachmentType& _attachment,
282  std::function<bool(typename T::connectionPtr_t, bool&)> _condition = nullptr)
283  {
284  try
285  {
286  typename T::weakConnectionPtrVector_t channels(getChannels(_condition));
287 
288  for (const auto& v : channels)
289  {
290  if (v.expired())
291  continue;
292  auto ptr = v.lock();
293  ptr->template accumulativePushMsg<_cmd>(_attachment);
294  }
295  }
296  catch (std::bad_weak_ptr& e)
297  {
298  // TODO: Do we need to log something here?
299  }
300  }
301 
302  template <ECmdType _cmd>
303  void broadcastSimpleMsg(std::function<bool(typename T::connectionPtr_t, bool&)> _condition = nullptr)
304  {
305  SEmptyCmd cmd;
306  broadcastMsg<_cmd>(cmd, _condition);
307  }
308 
310  const MiscCommon::BYTEVector_t& _data,
311  const std::string& _fileName,
312  uint16_t _cmdSource,
313  std::function<bool(typename T::connectionPtr_t, bool&)> _condition = nullptr)
314  {
315  try
316  {
317  typename T::weakConnectionPtrVector_t channels(getChannels(_condition));
318 
319  for (const auto& v : channels)
320  {
321  if (v.expired())
322  continue;
323  auto ptr = v.lock();
324  ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource);
325  }
326  }
327  catch (std::bad_weak_ptr& e)
328  {
329  // TODO: Do we need to log something here?
330  }
331  }
332 
333  size_t countNofChannels(std::function<bool(typename T::connectionPtr_t, bool&)> _condition = nullptr)
334  {
335  std::lock_guard<std::mutex> lock(m_mutex);
336 
337  if (_condition == nullptr)
338  return m_channels.size();
339  size_t counter = 0;
340  for (auto& v : m_channels)
341  {
342  bool stop = false;
343  if (_condition(v, stop))
344  {
345  counter++;
346  if (stop)
347  break;
348  }
349  }
350  return counter;
351  }
352 
353  private:
354  void acceptHandler(typename T::connectionPtr_t _client,
355  std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor,
356  const boost::system::error_code& _ec)
357  {
358  if (!_ec)
359  {
360  _client->start();
361  {
362  std::lock_guard<std::mutex> lock(m_mutex);
363  m_channels.push_back(_client);
364  }
365  createClientAndStartAccept(_acceptor);
366  }
367  else
368  {
369  LOG(MiscCommon::error) << "Can't accept new connection: " << _ec.message();
370  }
371  }
372 
373  void createClientAndStartAccept(std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor)
374  {
375  typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_io_service());
376 
377  A* pThis = static_cast<A*>(this);
378  pThis->newClientCreated(newClient);
379 
380  // Subscribe on dissconnect event
381  newClient->registerDisconnectEventHandler([this](T* _channel) -> void {
382  {
383  // collect statistics for disconnected channels
384  std::lock_guard<std::mutex> lock(m_statMutex);
385  m_readStatDisconnectedChannels.addFromStat(_channel->getReadStat());
386  m_writeStatDisconnectedChannels.addFromStat(_channel->getWriteStat());
387  }
388  return this->removeClient(_channel);
389  });
390 
391  _acceptor->async_accept(
392  newClient->socket(),
393  std::bind(
394  &CConnectionManagerImpl::acceptHandler, this, newClient, _acceptor, std::placeholders::_1));
395  }
396 
397  void createInfoFile()
398  {
399  // The child needs to have that method
400  A* pThis = static_cast<A*>(this);
401 
402  std::vector<size_t> ports;
403  ports.push_back(m_acceptor->local_endpoint().port());
404  if (m_acceptorUI != nullptr)
405  ports.push_back(m_acceptorUI->local_endpoint().port());
406 
407  pThis->_createInfoFile(ports);
408  }
409 
410  void deleteInfoFile()
411  {
412  // The child needs to have that method
413  A* pThis = static_cast<A*>(this);
414  pThis->_deleteInfoFile();
415  }
416 
417  void removeClient(T* _client)
418  {
419  // TODO: fix getTypeName call
420  LOG(MiscCommon::debug) << "Removing " /*<< _client->getTypeName()*/
421  << " client from the list of active";
422  std::lock_guard<std::mutex> lock(m_mutex);
423  m_channels.erase(remove_if(m_channels.begin(),
424  m_channels.end(),
425  [&](typename T::connectionPtr_t& i) { return (i.get() == _client); }),
426  m_channels.end());
427  }
428 
429  public:
431  {
432  // Add disconnected channels statistics to some external statistics.
433  // This is done in order not to copy self stat structures and return them.
434  // Or not to return reference to self stat together with mutex.
435  std::lock_guard<std::mutex> lock(m_statMutex);
436  _readStat.addFromStat(m_readStatDisconnectedChannels);
437  _writeStat.addFromStat(m_writeStatDisconnectedChannels);
438  }
439 
440  private:
442  std::shared_ptr<boost::asio::signal_set> m_signals;
443  std::mutex m_mutex;
444  typename T::connectionPtrVector_t m_channels;
445 
447  boost::asio::io_service m_io_service;
448  std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;
449 
450  // Used for UI (priority) communication
451  boost::asio::io_service m_io_service_UI;
452  std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptorUI;
453 
454  boost::thread_group m_workerThreads;
455 
456  // Statistics of disconnected channels
457  SReadStat m_readStatDisconnectedChannels;
458  SWriteStat m_writeStatDisconnectedChannels;
459  std::mutex m_statMutex;
460  };
461  }
462 }
463 #endif /* defined(__DDS__ConnectionManagerImpl__) */
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
Main function user has to run to start monitoring thread.
Definition: MonitoringThread.h:48
Definition: def.h:151
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:73
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
Definition: CommandAttachmentImpl.h:55
int get_free_port(int _Min, int _Max)
The function checks and returns a free port from the given range of the ports.
Definition: INet.h:553
size_t countNofChannels(std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:333
void addDisconnectedChannelsStatToStat(SReadStat &_readStat, SWriteStat &_writeStat)
Definition: ConnectionManagerImpl.h:430
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:190
T::weakConnectionPtr_t getWeakPtr(T *_client)
Definition: ConnectionManagerImpl.h:226
#define LOG(severity)
Definition: Logger.h:54
void broadcastBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:309
Definition: dds-agent/src/AgentConnectionManager.h:16
void broadcastMsg(const AttachmentType &_attachment, std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:259
Definition: def.h:153
void stop()
Definition: ConnectionManagerImpl.h:160
Definition: StatImpl.h:70
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:34
void addFromStat(const SReadStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:133
T::weakConnectionPtrVector_t getChannels(std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:238
Definition: def.h:152
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:35
Definition: def.h:149
void addFromStat(const SWriteStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:100
void accumulativeBroadcastMsg(const AttachmentType &_attachment, std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:281
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
void broadcastSimpleMsg(std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:303
static CUserDefaults & instance()
Return singleton instance.
Definition: UserDefaults.cpp:40
Definition: StatImpl.h:93
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:80
void runService(short _counter, boost::asio::io_service &_io_service)
Definition: ConnectionManagerImpl.h:142
Base class for connection managers.
Definition: ConnectionManagerImpl.h:32
Definition: def.h:150
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:40