DDS  ver. 2.0
ConnectionManagerImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 
6 #ifndef __DDS__ConnectionManagerImpl__
7 #define __DDS__ConnectionManagerImpl__
8 // DDS
9 #include "ChannelInfo.h"
10 #include "CommandAttachmentImpl.h"
11 #include "MonitoringThread.h"
12 #include "Options.h"
13 #include "ProtocolMessage.h"
14 #include "StatImpl.h"
15 // STD
16 #include <mutex>
17 // BOOST
18 #pragma clang diagnostic push
19 #pragma clang diagnostic ignored "-Wunused-local-typedef"
20 #include <boost/asio.hpp>
21 #pragma clang diagnostic pop
22 #include <boost/thread/thread.hpp>
23 // MiscCommon
24 #include "INet.h"
25 
26 namespace dds
27 {
28  namespace protocol_api
29  {
32  template <class T, class A>
34  {
35  public:
38  typedef std::function<bool(const channelInfo_t& _channelInfo, bool& /*_stop*/)> conditionFunction_t;
39 
40  public:
41  CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
42  {
43  int nSrvPort =
44  (_minPort == 0 && _maxPort == 0) ? 0 : MiscCommon::INet::get_free_port(_minPort, _maxPort);
45  m_acceptor = std::make_shared<boost::asio::ip::tcp::acceptor>(
46  m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
47 
48  if (_useUITransport)
49  {
50  int nSrvPort =
51  (_minPort == 0 && _maxPort == 0) ? 0 : MiscCommon::INet::get_free_port(_minPort, _maxPort);
52  m_acceptorUI = std::make_shared<boost::asio::ip::tcp::acceptor>(
53  m_io_service_UI, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
54  }
55 
56  // Create and register signals
57  m_signals = std::make_shared<boost::asio::signal_set>(m_io_service);
58 
59  // Register to handle the signals that indicate when the server should exit.
60  // It is safe to register for the same signal multiple times in a program,
61  // provided all registration for the specified signal is made through Asio.
62  m_signals->add(SIGINT);
63  m_signals->add(SIGTERM);
64 #if defined(SIGQUIT)
65  m_signals->add(SIGQUIT);
66 #endif // defined(SIGQUIT)
67 
68  m_signals->async_wait([this](boost::system::error_code /*ec*/, int signo) {
69  // The server is stopped by cancelling all outstanding asynchronous
70  // operations. Once all operations have finished the io_service::run()
71  // call will exit.
72  LOG(MiscCommon::info) << "Received a signal: " << signo;
73  LOG(MiscCommon::info) << "Sopping DDS transport server";
74 
75  stop();
76  });
77  }
78 
80  {
81  // Delete server info file
82  deleteInfoFile();
83  stop();
84  }
85 
/// Starts the transport server.
///
/// Calls the child's _start(), starts the monitoring thread, puts the acceptor(s) into
/// listening state and begins accepting, creates the server info file and finally spawns
/// worker threads which execute runService() for the main and (if present) UI io_service.
/// Any std::exception thrown on the way is logged with fatal severity and not rethrown.
///
/// @param _join when true, blocks until all worker threads have finished.
/// @param _nThreads number of worker threads for the main io_service; 0 selects
///        std::thread::hardware_concurrency(). Values below 2 are raised to 2.
void start(bool _join = true, unsigned int _nThreads = 0 /*0 - auto; min. number is 2*/)
{
    try
    {
        // Call _start of the "child"
        A* pThis = static_cast<A*>(this);
        pThis->_start();

        // Start monitoring thread
        const float maxIdleTime =
            // NOTE(review): the initializer expression of maxIdleTime is missing from this
            // listing; as shown, the statement runs into the next line. Restore it from the
            // upstream header before compiling.

        CMonitoringThread::instance().start(maxIdleTime,
                                            []() { LOG(MiscCommon::info) << "Idle callback called."; });
        m_acceptor->listen();
        createClientAndStartAccept(m_acceptor);

        // If we use second channel for communication with UI we have to start accepting connection on that
        // channel.
        if (m_acceptorUI != nullptr)
        {
            m_acceptorUI->listen();
            createClientAndStartAccept(m_acceptorUI);
        }

        // Create a server info file
        createInfoFile();

        // a thread pool for the DDS transport engine
        // may return 0 when not able to detect
        unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
        // we need at least 2 threads
        if (concurrentThreads < 2)
            concurrentThreads = 2;
        // NOTE(review): the left-hand side of the following stream expression (presumably a
        // LOG(...) call) is missing from this listing.
            << "Starting DDS transport engine using " << concurrentThreads << " concurrent threads.";
        // NOTE(review): x is int while concurrentThreads is unsigned (signed/unsigned comparison).
        for (int x = 0; x < concurrentThreads; ++x)
        {
            // Each worker runs the main io_service with a restart budget of 10.
            m_workerThreads.create_thread([this]() { runService(10, m_acceptor->get_io_service()); });
        }

        // Starting service for UI transport engine
        if (m_acceptorUI != nullptr)
        {
            const unsigned int concurrentThreads = 2;
            // NOTE(review): the left-hand side of the following stream expression (presumably a
            // LOG(...) call) is missing from this listing.
                << "Starting DDS UI transport engine using " << concurrentThreads << " concurrent threads.";
            for (int x = 0; x < concurrentThreads; ++x)
            {
                m_workerThreads.create_thread([this]() { runService(10, m_acceptorUI->get_io_service()); });
            }
        }

        if (_join)
            m_workerThreads.join_all();
    }
    catch (std::exception& e)
    {
        // Start-up failures are only reported; the caller is not notified.
        LOG(MiscCommon::fatal) << e.what();
    }
}
147 
148  void runService(short _counter, boost::asio::io_service& _io_service)
149  {
150  if (_counter <= 0)
151  {
152  LOG(MiscCommon::error) << "CConnectionManagerImpl: can't start another io_service.";
153  }
154  try
155  {
156  _io_service.run();
157  }
158  catch (std::exception& ex)
159  {
160  LOG(MiscCommon::error) << "CConnectionManagerImpl exception: " << ex.what();
161  LOG(MiscCommon::info) << "CConnectionManagerImpl restarting io_service";
162  runService(--_counter, _io_service);
163  }
164  }
165 
166  void stop()
167  {
168  try
169  {
170  // Call _stop of the "child"
171  A* pThis = static_cast<A*>(this);
172  pThis->_stop();
173 
174  // Send shutdown signal to all client connections.
175  typename weakChannelInfo_t::container_t channels(getChannels());
176 
177  for (const auto& v : channels)
178  {
179  if (v.m_channel.expired())
180  continue;
181  auto ptr = v.m_channel.lock();
182  ptr->template pushMsg<cmdSHUTDOWN>();
183  }
184 
185  auto condition = [](const channelInfo_t& _v, bool& /*_stop*/) { return (_v.m_channel->started()); };
186 
187  size_t counter = 0;
188  while (true)
189  {
190  ++counter;
191  std::this_thread::sleep_for(std::chrono::milliseconds(200));
192  if (countNofChannels(condition) == 0)
193  break;
194  if (counter > 300)
195  {
196  LOG(MiscCommon::warning) << "Some channels were not shut down properly. Exiting in anyway.";
197  break;
198  }
199  }
200 
201  m_acceptor->close();
202  m_acceptor->get_io_service().stop();
203 
204  if (m_acceptor != nullptr)
205  {
206  m_acceptorUI->close();
207  m_acceptorUI->get_io_service().stop();
208  }
209 
210  for (const auto& v : channels)
211  {
212  if (v.m_channel.expired())
213  continue;
214  auto ptr = v.m_channel.lock();
215  ptr->stop();
216  }
217 
218  std::lock_guard<std::mutex> lock(m_mutex);
219  m_channels.clear();
220  }
221  catch (std::bad_weak_ptr& e)
222  {
223  // TODO: Do we need to log something here?
224  }
225  catch (std::exception& e)
226  {
227  LOG(MiscCommon::fatal) << e.what();
228  }
229  }
230 
231  protected:
232  typename weakChannelInfo_t::container_t getChannels(conditionFunction_t _condition = nullptr)
233  {
234  std::lock_guard<std::mutex> lock(m_mutex);
235 
236  typename weakChannelInfo_t::container_t result;
237  result.reserve(m_channels.size());
238  for (auto& v : m_channels)
239  {
240  bool stop = false;
241  if (_condition == nullptr || _condition(v, stop))
242  {
243  result.push_back(weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID));
244  if (stop)
245  break;
246  }
247  }
248  return result;
249  }
250 
251  template <ECmdType _cmd, class AttachmentType>
252  void broadcastMsg(const AttachmentType& _attachment, conditionFunction_t _condition = nullptr)
253  {
254  try
255  {
256  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
257 
258  for (const auto& v : channels)
259  {
260  if (v.m_channel.expired())
261  continue;
262  auto ptr = v.m_channel.lock();
263  ptr->template pushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
264  }
265  }
266  catch (std::bad_weak_ptr& e)
267  {
268  // TODO: Do we need to log something here?
269  }
270  }
271 
272  template <ECmdType _cmd, class AttachmentType>
273  void accumulativeBroadcastMsg(const AttachmentType& _attachment, conditionFunction_t _condition = nullptr)
274  {
275  try
276  {
277  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
278 
279  for (const auto& v : channels)
280  {
281  if (v.m_channel.expired())
282  continue;
283  auto ptr = v.m_channel.lock();
284  ptr->template accumulativePushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
285  }
286  }
287  catch (std::bad_weak_ptr& e)
288  {
289  // TODO: Do we need to log something here?
290  }
291  }
292 
293  template <ECmdType _cmd>
294  void broadcastSimpleMsg(conditionFunction_t _condition = nullptr)
295  {
296  SEmptyCmd cmd;
297  broadcastMsg<_cmd>(cmd, _condition);
298  }
299 
301  const std::string& _fileName,
302  uint16_t _cmdSource,
303  conditionFunction_t _condition = nullptr)
304  {
305  try
306  {
307  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
308 
309  for (const auto& v : channels)
310  {
311  if (v.m_channel.expired())
312  continue;
313  auto ptr = v.m_channel.lock();
314  ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID);
315  }
316  }
317  catch (std::bad_weak_ptr& e)
318  {
319  // TODO: Do we need to log something here?
320  }
321  }
322 
323  size_t countNofChannels(conditionFunction_t _condition = nullptr)
324  {
325  std::lock_guard<std::mutex> lock(m_mutex);
326 
327  if (_condition == nullptr)
328  return m_channels.size();
329  size_t counter = 0;
330  for (auto& v : m_channels)
331  {
332  bool stop = false;
333  if (_condition(v, stop))
334  {
335  counter++;
336  if (stop)
337  break;
338  }
339  }
340  return counter;
341  }
342 
343  private:
344  void acceptHandler(typename T::connectionPtr_t _client,
345  std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor,
346  const boost::system::error_code& _ec)
347  {
348  if (!_ec)
349  {
350  {
351  std::lock_guard<std::mutex> lock(m_mutex);
352  m_channels.push_back(channelInfo_t(_client, _client->getProtocolHeaderID()));
353  }
354  _client->start();
355  createClientAndStartAccept(_acceptor);
356  }
357  else
358  {
359  LOG(MiscCommon::error) << "Can't accept new connection: " << _ec.message();
360  }
361  }
362 
/// Prepares a fresh client object and starts an asynchronous accept for it.
///
/// The client is created with T::makeNew() on the acceptor's io_service, announced to the
/// child via newClientCreated(), and gets two event handlers registered (lobby member
/// handshake and remote end disconnect). acceptHandler() is bound as completion handler
/// of the accept.
///
/// @param _acceptor the acceptor on which to wait for the next connection.
void createClientAndStartAccept(std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor)
{
    typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_io_service(), 0);

    A* pThis = static_cast<A*>(this);
    pThis->newClientCreated(newClient);

    // Subscribe on lobby member handshake
    newClient->template registerHandler<EChannelEvents::OnLobbyMemberHandshakeOK>(
        [this, newClient](const SSenderInfo& _sender) -> void {
            {
                std::lock_guard<std::mutex> lock(m_mutex);

                // Avoid adding lobby leader twice
                if (newClient->getProtocolHeaderID() != _sender.m_ID)
                {
                    // Sender ID differs from the channel's own ID: register an additional
                    // entry for this channel under the sender's ID.
                    m_channels.push_back(channelInfo_t(newClient, _sender.m_ID));
                }
                else
                {
                    // Replace empty PHID for lobby leaders
                    channelInfo_t inf(newClient, 0);
                    auto it = std::find(m_channels.begin(), m_channels.end(), inf);
                    if (it != m_channels.end() && it->m_protocolHeaderID == 0)
                    {
                        it->m_protocolHeaderID = _sender.m_ID;
                    }
                    else
                    {
                        // NOTE(review): the left-hand side of the following stream expression
                        // (presumably a LOG(...) call) is missing from this listing.
                            << "Handshake for unregistered lobby leader connection senderID="
                            << _sender.m_ID;
                    }
                }
            }
        });

    // Subscribe on disconnect event
    newClient->template registerHandler<EChannelEvents::OnRemoteEndDissconnected>(
        [this, newClient](const SSenderInfo& _sender) -> void {
            {
                // collect statistics for disconnected channels
                std::lock_guard<std::mutex> lock(m_statMutex);
                m_readStatDisconnectedChannels.addFromStat(newClient->getReadStat());
                m_writeStatDisconnectedChannels.addFromStat(newClient->getWriteStat());
            }
            // The statistics lock is released before the channel list is modified.
            this->removeClient(newClient.get());
        });

    _acceptor->async_accept(
        newClient->socket(),
        std::bind(
            &CConnectionManagerImpl::acceptHandler, this, newClient, _acceptor, std::placeholders::_1));
}
417 
418  void createInfoFile()
419  {
420  // The child needs to have that method
421  A* pThis = static_cast<A*>(this);
422 
423  std::vector<size_t> ports;
424  ports.push_back(m_acceptor->local_endpoint().port());
425  if (m_acceptorUI != nullptr)
426  ports.push_back(m_acceptorUI->local_endpoint().port());
427 
428  pThis->_createInfoFile(ports);
429  }
430 
431  void deleteInfoFile()
432  {
433  // The child needs to have that method
434  A* pThis = static_cast<A*>(this);
435  pThis->_deleteInfoFile();
436  }
437 
438  void removeClient(T* _client)
439  {
440  // TODO: fix getTypeName call
441  LOG(MiscCommon::debug) << "Removing " /*<< _client->getTypeName()*/
442  << " client from the list of active";
443  std::lock_guard<std::mutex> lock(m_mutex);
444  // TODO: FIXME: Delete all connections of the channel if the primary protocol header ID is deleted
445  m_channels.erase(remove_if(m_channels.begin(),
446  m_channels.end(),
447  [&](const channelInfo_t& i) { return (i.m_channel.get() == _client); }),
448  m_channels.end());
449  }
450 
451  public:
453  {
454  // Add disconnected channels statistics to some external statistics.
455  // This is done in order not to copy self stat structures and return them.
456  // Or not to return reference to self stat together with mutex.
457  std::lock_guard<std::mutex> lock(m_statMutex);
458  _readStat.addFromStat(m_readStatDisconnectedChannels);
459  _writeStat.addFromStat(m_writeStatDisconnectedChannels);
460  }
461 
462  private:
464  std::shared_ptr<boost::asio::signal_set> m_signals;
465  std::mutex m_mutex;
466  typename channelInfo_t::container_t m_channels;
467 
469  boost::asio::io_service m_io_service;
470  std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;
471 
472  // Used for UI (priority) communication
473  boost::asio::io_service m_io_service_UI;
474  std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptorUI;
475 
476  boost::thread_group m_workerThreads;
477 
478  // Statistics of disconnected channels
479  SReadStat m_readStatDisconnectedChannels;
480  SWriteStat m_writeStatDisconnectedChannels;
481  std::mutex m_statMutex;
482  };
483  }
484 }
485 #endif /* defined(__DDS__ConnectionManagerImpl__) */
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
Main function user has to run to start monitoring thread.
Definition: MonitoringThread.h:48
Definition: def.h:151
size_t countNofChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:323
SWeakChannelInfo< T > weakChannelInfo_t
Definition: ConnectionManagerImpl.h:37
Definition: BaseEventHandlersImpl.h:48
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:79
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
void accumulativeBroadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:273
Definition: CommandAttachmentImpl.h:56
int get_free_port(int _Min, int _Max)
The function checks and returns a free port from the given range of the ports.
Definition: INet.h:553
std::function< bool(const channelInfo_t &_channelInfo, bool &)> conditionFunction_t
Definition: ConnectionManagerImpl.h:38
void addDisconnectedChannelsStatToStat(SReadStat &_readStat, SWriteStat &_writeStat)
Definition: ConnectionManagerImpl.h:452
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:234
#define LOG(severity)
Definition: Logger.h:54
Definition: ChannelInfo.h:14
void broadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:252
Definition: dds-agent/src/AgentConnectionManager.h:18
Definition: def.h:153
void stop()
Definition: ConnectionManagerImpl.h:166
Definition: StatImpl.h:70
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:34
Definition: ChannelInfo.h:39
void addFromStat(const SReadStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:133
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:45
Definition: def.h:152
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:41
T::connectionPtr_t m_channel
Definition: ChannelInfo.h:26
Definition: def.h:149
void broadcastBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:300
void addFromStat(const SWriteStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:100
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
std::vector< SChannelInfo< T > > container_t
Definition: ChannelInfo.h:29
Definition: StatImpl.h:93
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:86
void runService(short _counter, boost::asio::io_service &_io_service)
Definition: ConnectionManagerImpl.h:148
Base class for connection managers.
Definition: ConnectionManagerImpl.h:33
Definition: def.h:150
SChannelInfo< T > channelInfo_t
Definition: ConnectionManagerImpl.h:36
weakChannelInfo_t::container_t getChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:232
void broadcastSimpleMsg(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:294
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:40
std::vector< SWeakChannelInfo > container_t
Definition: ChannelInfo.h:54