DDS  ver. 3.4
ConnectionManagerImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 
6 #ifndef __DDS__ConnectionManagerImpl__
7 #define __DDS__ConnectionManagerImpl__
8 // DDS
9 #include "ChannelInfo.h"
10 #include "CommandAttachmentImpl.h"
11 #include "MonitoringThread.h"
12 #include "Options.h"
13 #include "ProtocolMessage.h"
14 #include "StatImpl.h"
15 // STD
16 #include <mutex>
17 // BOOST
18 #include <boost/asio/basic_socket_acceptor.hpp>
19 #include <boost/thread/thread.hpp>
20 // MiscCommon
21 #include "INet.h"
22 
23 namespace dds
24 {
25  namespace protocol_api
26  {
27 #if BOOST_VERSION >= 107000
28  typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp, boost::asio::io_context::executor_type>
30 #else
31  typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp> asioAcceptor_t;
32 #endif
33  typedef std::shared_ptr<asioAcceptor_t> asioAcceptorPtr_t;
34 
37  template <class T, class A>
39  {
40  public:
43  typedef std::function<bool(const channelInfo_t& _channelInfo, bool& /*_stop*/)> conditionFunction_t;
44  using channelContainerCache_t = std::map<uint64_t, weakChannelInfo_t>;
45 
46  public:
47  CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
48  : m_minPort(_minPort)
49  , m_maxPort(_maxPort)
50  , m_useUITransport(_useUITransport)
51  {
52  // Create and register signals
53  m_signals = std::make_shared<boost::asio::signal_set>(m_ioContext);
54 
55  // Register to handle the signals that indicate when the server should exit.
56  // It is safe to register for the same signal multiple times in a program,
57  // provided all registration for the specified signal is made through Asio.
58  m_signals->add(SIGINT);
59  m_signals->add(SIGTERM);
60 #if defined(SIGQUIT)
61  m_signals->add(SIGQUIT);
62 #endif // defined(SIGQUIT)
63 
64  m_signals->async_wait([this](boost::system::error_code /*ec*/, int signo) {
65  // The server is stopped by cancelling all outstanding asynchronous
66  // operations. Once all operations have finished the io_context::run()
67  // call will exit.
68  LOG(MiscCommon::info) << "Received a signal: " << signo;
69  LOG(MiscCommon::info) << "Stopping DDS transport server";
70 
71  stop();
72  });
73  }
74 
76  {
77  // Delete server info file
78  deleteInfoFile();
79  stop();
80  }
81 
82  void start(bool _join = true, unsigned int _nThreads = 0 /*0 - auto; min. number is 4*/)
83  {
84  try
85  {
86  // Call _start of the "child"
87  A* pThis = static_cast<A*>(this);
88  pThis->_start();
89 
90  const float maxIdleTime =
92 
93  CMonitoringThread::instance().start(maxIdleTime,
94  []() { LOG(MiscCommon::info) << "Idle callback called."; });
95 
96  bindPortAndListen(m_acceptor);
97  createClientAndStartAccept(m_acceptor);
98 
99  // If we use second channel for communication with UI we have to start acceptiing connection on that
100  // channel.
101  if (m_useUITransport)
102  {
103  bindPortAndListen(m_acceptorUI);
104  createClientAndStartAccept(m_acceptorUI);
105  }
106 
107  // Create a server info file
108  createInfoFile();
109 
110  // a thread pool for the DDS transport engine
111  // may return 0 when not able to detect
112  unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
113  // we need at least 2 threads
114  if (concurrentThreads < 2)
115  concurrentThreads = 2;
117  << "Starting DDS transport engine using " << concurrentThreads << " concurrent threads.";
118  for (int x = 0; x < concurrentThreads; ++x)
119  {
120  m_workerThreads.create_thread(
121  [this]() { runService(10, m_acceptor->get_executor().context()); });
122  }
123 
124  // Starting service for UI transport engine
125  if (m_acceptorUI != nullptr)
126  {
127  const unsigned int concurrentThreads = 2;
129  << "Starting DDS UI transport engine using " << concurrentThreads << " concurrent threads.";
130  for (int x = 0; x < concurrentThreads; ++x)
131  {
132  m_workerThreads.create_thread(
133  [this]() { runService(10, m_acceptorUI->get_executor().context()); });
134  }
135  }
136 
137  if (_join)
138  m_workerThreads.join_all();
139  }
140  catch (std::exception& e)
141  {
142  LOG(MiscCommon::fatal) << e.what();
143  }
144  }
145 
146  void runService(short _counter, boost::asio::io_context& _io_context)
147  {
148  if (_counter <= 0)
149  {
150  LOG(MiscCommon::error) << "CConnectionManagerImpl: can't start another io_context.";
151  }
152  try
153  {
154  _io_context.run();
155  }
156  catch (std::exception& ex)
157  {
158  LOG(MiscCommon::error) << "CConnectionManagerImpl exception: " << ex.what();
159  LOG(MiscCommon::info) << "CConnectionManagerImpl restarting io_context";
160  runService(--_counter, _io_context);
161  }
162  }
163 
164  void stop()
165  {
166  try
167  {
168  // Call _stop of the "child"
169  A* pThis = static_cast<A*>(this);
170  pThis->_stop();
171 
172  // Send shutdown signal to all client connections.
173  typename weakChannelInfo_t::container_t channels(getChannels());
174 
175  for (const auto& v : channels)
176  {
177  if (v.m_channel.expired())
178  continue;
179  auto ptr = v.m_channel.lock();
180  ptr->template pushMsg<cmdSHUTDOWN>();
181  }
182 
183  auto condition = [](const channelInfo_t& _v, bool& /*_stop*/) { return (_v.m_channel->started()); };
184 
185  size_t counter = 0;
186  while (true)
187  {
188  ++counter;
189  std::this_thread::sleep_for(std::chrono::milliseconds(200));
190  if (countNofChannels(condition) == 0)
191  break;
192  if (counter > 300)
193  {
194  LOG(MiscCommon::warning) << "Some channels were not shut down properly. Exiting in anyway.";
195  break;
196  }
197  }
198 
199  m_acceptor->close();
200  m_acceptor->get_executor().context().stop();
201 
202  if (m_acceptorUI != nullptr)
203  {
204  m_acceptorUI->close();
205  m_acceptorUI->get_executor().context().stop();
206  }
207 
208  for (const auto& v : channels)
209  {
210  if (v.m_channel.expired())
211  continue;
212  auto ptr = v.m_channel.lock();
213  ptr->stop();
214  }
215 
216  std::lock_guard<std::mutex> lock(m_mutex);
217  m_channels.clear();
218  }
219  catch (std::bad_weak_ptr& e)
220  {
221  // TODO: Do we need to log something here?
222  }
223  catch (std::exception& e)
224  {
225  LOG(MiscCommon::fatal) << e.what();
226  }
227  }
228 
229  protected:
231  {
232  std::lock_guard<std::mutex> lock(m_mutex);
233 
234  auto p = _channelInfo.m_channel.lock();
235  if (p == nullptr)
236  return;
237 
238  for (auto& inf : m_channels)
239  {
240  if (inf.m_channel.get() == p.get() && inf.m_protocolHeaderID == 0)
241  {
242  inf.m_protocolHeaderID = _channelInfo.m_protocolHeaderID;
243  return;
244  }
245  }
246  LOG(MiscCommon::error) << "Failed to update protocol channel header ID <"
247  << _channelInfo.m_protocolHeaderID << "> . Channel is not registered";
248  }
249 
250  weakChannelInfo_t getChannelByID(uint64_t _protocolHeaderID)
251  {
252  std::lock_guard<std::mutex> lock(m_mutex);
253 
254  // Initial creation of the cache
255  if (m_channelsCache.empty())
256  {
257  for (auto& v : m_channels)
258  {
259  if (v.m_protocolHeaderID == 0)
260  continue;
261  // commander_cmd::SAgentInfo inf = v.m_channel->getAgentInfo(v.m_protocolHeaderID);
262  m_channelsCache.insert(std::make_pair(
263  v.m_protocolHeaderID, weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
264  }
265  }
266 
267  // Value found in the cache
268  auto it = m_channelsCache.find(_protocolHeaderID);
269  if (it != m_channelsCache.end())
270  return it->second;
271 
272  for (auto& v : m_channels)
273  {
274  // commander_cmd::SAgentInfo inf = v.m_channel->getAgentInfo(v.m_protocolHeaderID);
275  if (v.m_protocolHeaderID == _protocolHeaderID)
276  {
277  // Add the item into the cache
278  m_channelsCache.insert(std::make_pair(
279  v.m_protocolHeaderID, weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
280  // TODO: need to clean cache from dead channels
281  return weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot);
282  }
283  }
284  return weakChannelInfo_t();
285  }
286 
288  {
289  std::lock_guard<std::mutex> lock(m_mutex);
290 
291  typename weakChannelInfo_t::container_t result;
292  result.reserve(m_channels.size());
293  for (auto& v : m_channels)
294  {
295  bool stop = false;
296  if (_condition == nullptr || _condition(v, stop))
297  {
298  result.push_back(weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot));
299  if (stop)
300  break;
301  }
302  }
303  return result;
304  }
305 
306  template <ECmdType _cmd, class AttachmentType>
307  void broadcastMsg(const AttachmentType& _attachment, conditionFunction_t _condition = nullptr)
308  {
309  try
310  {
311  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
312 
313  for (const auto& v : channels)
314  {
315  if (v.m_channel.expired())
316  continue;
317  auto ptr = v.m_channel.lock();
318  ptr->template pushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
319  }
320  }
321  catch (std::bad_weak_ptr& e)
322  {
323  // TODO: Do we need to log something here?
324  }
325  }
326 
327  template <ECmdType _cmd, class AttachmentType>
328  void accumulativeBroadcastMsg(const AttachmentType& _attachment, conditionFunction_t _condition = nullptr)
329  {
330  try
331  {
332  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
333 
334  for (const auto& v : channels)
335  {
336  if (v.m_channel.expired())
337  continue;
338  auto ptr = v.m_channel.lock();
339  ptr->template accumulativePushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
340  }
341  }
342  catch (std::bad_weak_ptr& e)
343  {
344  // TODO: Do we need to log something here?
345  }
346  }
347 
348  template <ECmdType _cmd>
349  void broadcastSimpleMsg(conditionFunction_t _condition = nullptr)
350  {
351  SEmptyCmd cmd;
352  broadcastMsg<_cmd>(cmd, _condition);
353  }
354 
355  void broadcastBinaryAttachmentCmd(const std::string& _srcFilePath,
356  const std::string& _fileName,
357  uint16_t _cmdSource,
358  conditionFunction_t _condition = nullptr)
359  {
361 
362  std::string srcFilePath(_srcFilePath);
363  // Resolve environment variables
364  MiscCommon::smart_path(&srcFilePath);
365 
366  std::ifstream f(srcFilePath);
367  if (!f.is_open() || !f.good())
368  {
369  throw std::runtime_error("Could not open the source file: " + srcFilePath);
370  }
371  f.seekg(0, std::ios::end);
372  data.reserve(f.tellg());
373  f.seekg(0, std::ios::beg);
374  data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
375 
376  broadcastBinaryAttachmentCmd(data, _fileName, _cmdSource, _condition);
377  }
378 
380  const std::string& _fileName,
381  uint16_t _cmdSource,
382  conditionFunction_t _condition = nullptr)
383  {
384  try
385  {
386  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
387 
388  for (const auto& v : channels)
389  {
390  if (v.m_channel.expired())
391  continue;
392  auto ptr = v.m_channel.lock();
393  ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID);
394  }
395  }
396  catch (std::bad_weak_ptr& e)
397  {
398  // TODO: Do we need to log something here?
399  }
400  }
401 
402  size_t countNofChannels(conditionFunction_t _condition = nullptr)
403  {
404  std::lock_guard<std::mutex> lock(m_mutex);
405 
406  if (_condition == nullptr)
407  return m_channels.size();
408  size_t counter = 0;
409  for (auto& v : m_channels)
410  {
411  bool stop = false;
412  if (_condition(v, stop))
413  {
414  counter++;
415  if (stop)
416  break;
417  }
418  }
419  return counter;
420  }
421 
423  {
424  // Add disconnected channels statistics to some external statistics.
425  // This is done in order not to copy self stat structures and return them.
426  // Or not to return reference to self stat together with mutex.
427  std::lock_guard<std::mutex> lock(m_statMutex);
428  _readStat.addFromStat(m_readStatDisconnectedChannels);
429  _writeStat.addFromStat(m_writeStatDisconnectedChannels);
430  }
431 
432  private:
433  void acceptHandler(typename T::connectionPtr_t _client,
434  asioAcceptorPtr_t _acceptor,
435  const boost::system::error_code& _ec)
436  {
437  if (!_ec)
438  {
439  {
440  std::lock_guard<std::mutex> lock(m_mutex);
441  m_channels.push_back(channelInfo_t(_client, _client->getProtocolHeaderID(), false));
442  }
443  _client->start();
444  createClientAndStartAccept(_acceptor);
445  }
446  else
447  {
448  LOG(MiscCommon::error) << "Can't accept new connection: " << _ec.message();
449  }
450  }
451 
452  void createClientAndStartAccept(asioAcceptorPtr_t& _acceptor)
453  {
454  typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_executor().context(), 0);
455 
456  A* pThis = static_cast<A*>(this);
457  pThis->newClientCreated(newClient);
458 
459  // Subsribe on lobby member handshake
460  newClient->template registerHandler<EChannelEvents::OnReplyAddSlot>(
461  [this, newClient](const SSenderInfo& _sender) -> void {
462  {
463  std::lock_guard<std::mutex> lock(m_mutex);
464  m_channels.push_back(channelInfo_t(newClient, _sender.m_ID, true));
465  }
466 
468  << "Adding new slot to " << newClient->getId() << " with id " << _sender.m_ID;
469  });
470 
471  // Subscribe on dissconnect event
472  newClient->template registerHandler<EChannelEvents::OnRemoteEndDissconnected>(
473  [this, newClient](const SSenderInfo& _sender) -> void {
474  {
475  // collect statistics for disconnected channels
476  std::lock_guard<std::mutex> lock(m_statMutex);
477  m_readStatDisconnectedChannels.addFromStat(newClient->getReadStat());
478  m_writeStatDisconnectedChannels.addFromStat(newClient->getWriteStat());
479  }
480  this->removeClient(newClient.get());
481  });
482 
483  _acceptor->async_accept(
484  newClient->socket(),
485  std::bind(
486  &CConnectionManagerImpl::acceptHandler, this, newClient, _acceptor, std::placeholders::_1));
487  }
488 
489  void createInfoFile()
490  {
491  // The child needs to have that method
492  A* pThis = static_cast<A*>(this);
493 
494  std::vector<size_t> ports;
495  ports.push_back(m_acceptor->local_endpoint().port());
496  if (m_acceptorUI != nullptr)
497  ports.push_back(m_acceptorUI->local_endpoint().port());
498 
499  pThis->_createInfoFile(ports);
500  }
501 
502  void deleteInfoFile()
503  {
504  // The child needs to have that method
505  A* pThis = static_cast<A*>(this);
506  pThis->_deleteInfoFile();
507  }
508 
509  void removeClient(T* _client)
510  {
511  // TODO: fix getTypeName call
512  LOG(MiscCommon::debug) << "Removing " /*<< _client->getTypeName()*/
513  << " client from the list of active";
514  std::lock_guard<std::mutex> lock(m_mutex);
515  // TODO: FIXME: Delete all connections of the channel if the primary protocol header ID is deleted
516  m_channels.erase(remove_if(m_channels.begin(),
517  m_channels.end(),
518  [&](const channelInfo_t& i) { return (i.m_channel.get() == _client); }),
519  m_channels.end());
520  }
521 
522  void bindPortAndListen(asioAcceptorPtr_t& _acceptor)
523  {
524  const int nMaxCount = 20; // Maximum number of attempts to open the port
525  int nCount = 0;
526  // Start monitoring thread
527  while (true)
528  {
529  int nSrvPort =
530  (m_minPort == 0 && m_maxPort == 0) ? 0 : MiscCommon::INet::get_free_port(m_minPort, m_maxPort);
531  try
532  {
533  _acceptor = std::make_shared<asioAcceptor_t>(
534  m_ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
535 
536  _acceptor->listen();
537  }
538  catch (std::exception& _e)
539  {
540  if (++nCount >= nMaxCount)
541  throw _e;
542 
543  LOG(MiscCommon::info) << "Can't bind port " << nSrvPort << ". Will try another port.";
544  // If multiple commanders are started in the same time, then let's give them a chnce to find
545  // a free port.
546  std::this_thread::sleep_for(std::chrono::milliseconds(200));
547  continue;
548  }
549 
550  break;
551  }
552  }
553 
554  private:
555  size_t m_minPort;
556  size_t m_maxPort;
557  bool m_useUITransport;
559  std::shared_ptr<boost::asio::signal_set> m_signals;
560  std::mutex m_mutex;
561  typename channelInfo_t::container_t m_channels;
562  channelContainerCache_t m_channelsCache;
563 
565  boost::asio::io_context m_ioContext;
566  asioAcceptorPtr_t m_acceptor;
567 
568  // Used for UI (priority) communication
569  boost::asio::io_context m_ioContext_UI;
570  asioAcceptorPtr_t m_acceptorUI;
571 
572  boost::thread_group m_workerThreads;
573 
574  // Statistics of disconnected channels
575  SReadStat m_readStatDisconnectedChannels;
576  SWriteStat m_writeStatDisconnectedChannels;
577  std::mutex m_statMutex;
578  };
579  } // namespace protocol_api
580 } // namespace dds
581 #endif /* defined(__DDS__ConnectionManagerImpl__) */
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
Main function user has to run to start monitoring thread.
Definition: MonitoringThread.h:48
Definition: def.h:151
size_t countNofChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:402
SWeakChannelInfo< T > weakChannelInfo_t
Definition: ConnectionManagerImpl.h:42
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:75
weakChannelInfo_t getChannelByID(uint64_t _protocolHeaderID)
Definition: ConnectionManagerImpl.h:250
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
std::map< uint64_t, weakChannelInfo_t > channelContainerCache_t
Definition: ConnectionManagerImpl.h:44
T::weakConnectionPtr_t m_channel
Definition: ChannelInfo.h:48
void accumulativeBroadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:328
Definition: CommandAttachmentImpl.h:54
int get_free_port(int _Min, int _Max)
The function checks and returns a free port from the given range of the ports.
Definition: INet.h:553
std::function< bool(const channelInfo_t &_channelInfo, bool &)> conditionFunction_t
Definition: ConnectionManagerImpl.h:43
void runService(short _counter, boost::asio::io_context &_io_context)
Definition: ConnectionManagerImpl.h:146
void addDisconnectedChannelsStatToStat(SReadStat &_readStat, SWriteStat &_writeStat)
Definition: ConnectionManagerImpl.h:422
std::shared_ptr< asioAcceptor_t > asioAcceptorPtr_t
Definition: ConnectionManagerImpl.h:33
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:249
#define LOG(severity)
Definition: Logger.h:56
void broadcastBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:355
Definition: ChannelInfo.h:14
void broadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:307
Definition: AgentConnectionManager.h:13
Definition: def.h:153
void stop()
Definition: ConnectionManagerImpl.h:164
Definition: StatImpl.h:68
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:34
Definition: ChannelInfo.h:38
void addFromStat(const SReadStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:133
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:36
Definition: def.h:152
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:47
T::connectionPtr_t m_channel
Definition: ChannelInfo.h:24
Definition: def.h:149
void smart_path(_T *_Path)
The function extends any environment variable found in the give path to its value.
Definition: SysHelper.h:95
void broadcastBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:379
boost::asio::basic_socket_acceptor< boost::asio::ip::tcp > asioAcceptor_t
Definition: ConnectionManagerImpl.h:31
void addFromStat(const SWriteStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:100
void updateChannelProtocolHeaderID(const weakChannelInfo_t &_channelInfo)
Definition: ConnectionManagerImpl.h:230
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
std::vector< SChannelInfo< T > > container_t
Definition: ChannelInfo.h:28
Definition: StatImpl.h:91
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:82
Base class for connection managers.
Definition: ConnectionManagerImpl.h:38
Definition: def.h:150
SChannelInfo< T > channelInfo_t
Definition: ConnectionManagerImpl.h:41
weakChannelInfo_t::container_t getChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:287
void broadcastSimpleMsg(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:349
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:46
std::vector< SWeakChannelInfo > container_t
Definition: ChannelInfo.h:52
uint64_t m_protocolHeaderID
Definition: ChannelInfo.h:49