DDS  ver. 3.6
ConnectionManagerImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 
6 #ifndef __DDS__ConnectionManagerImpl__
7 #define __DDS__ConnectionManagerImpl__
8 // DDS
9 #include "ChannelInfo.h"
10 #include "CommandAttachmentImpl.h"
11 #include "MonitoringThread.h"
12 #include "Options.h"
13 #include "ProtocolMessage.h"
14 // STD
15 #include <mutex>
16 // BOOST
17 #include <boost/asio/basic_socket_acceptor.hpp>
18 #include <boost/thread/thread.hpp>
19 // MiscCommon
20 #include "INet.h"
21 
22 namespace dds
23 {
24  namespace protocol_api
25  {
26 #if BOOST_VERSION >= 107000
27  typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp, boost::asio::io_context::executor_type>
29 #else
30  typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp> asioAcceptor_t;
31 #endif
32  typedef std::shared_ptr<asioAcceptor_t> asioAcceptorPtr_t;
33 
36  template <class T, class A>
38  {
39  public:
42  typedef std::function<bool(const channelInfo_t& _channelInfo, bool& /*_stop*/)> conditionFunction_t;
43  using channelContainerCache_t = std::map<uint64_t, weakChannelInfo_t>;
44 
45  public:
46  CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
47  : m_minPort(_minPort)
48  , m_maxPort(_maxPort)
49  , m_useUITransport(_useUITransport)
50  {
51  // Create and register signals
52  m_signals = std::make_shared<boost::asio::signal_set>(m_ioContext);
53 
54  // Register to handle the signals that indicate when the server should exit.
55  // It is safe to register for the same signal multiple times in a program,
56  // provided all registration for the specified signal is made through Asio.
57  m_signals->add(SIGINT);
58  m_signals->add(SIGTERM);
59 #if defined(SIGQUIT)
60  m_signals->add(SIGQUIT);
61 #endif // defined(SIGQUIT)
62 
63  m_signals->async_wait(
64  [this](boost::system::error_code /*ec*/, int signo)
65  {
66  // The server is stopped by cancelling all outstanding asynchronous
67  // operations. Once all operations have finished the io_context::run()
68  // call will exit.
69  LOG(dds::misc::info) << "Received a signal: " << signo;
70  LOG(dds::misc::info) << "Stopping DDS transport server";
71 
72  stop();
73  });
74  }
75 
77  {
78  // Delete server info file
79  deleteInfoFile();
80  stop();
81  }
82 
83  void start(bool _join = true, unsigned int _nThreads = 0 /*0 - auto; min. number is 4*/)
84  {
85  try
86  {
87  // Call _start of the "child"
88  A* pThis = static_cast<A*>(this);
89  pThis->_start();
90 
91  const float maxIdleTime =
93 
94  CMonitoringThread::instance().start(maxIdleTime,
95  []() { LOG(dds::misc::info) << "Idle callback called."; });
96 
97  bindPortAndListen(m_acceptor);
98  createClientAndStartAccept(m_acceptor);
99 
100  // If we use second channel for communication with UI we have to start acceptiing connection on that
101  // channel.
102  if (m_useUITransport)
103  {
104  bindPortAndListen(m_acceptorUI);
105  createClientAndStartAccept(m_acceptorUI);
106  }
107 
108  // Create a server info file
109  createInfoFile();
110 
111  // a thread pool for the DDS transport engine
112  // may return 0 when not able to detect
113  unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
114  // we need at least 2 threads
115  if (concurrentThreads < 2)
116  concurrentThreads = 2;
117  LOG(dds::misc::info) << "Starting DDS transport engine using " << concurrentThreads
118  << " concurrent threads.";
119  for (unsigned int x = 0; x < concurrentThreads; ++x)
120  {
121  m_workerThreads.create_thread([this]()
122  { runService(10, m_acceptor->get_executor().context()); });
123  }
124 
125  // Starting service for UI transport engine
126  if (m_acceptorUI != nullptr)
127  {
128  const unsigned int concurrentThreads = 2;
130  << "Starting DDS UI transport engine using " << concurrentThreads << " concurrent threads.";
131  for (unsigned int x = 0; x < concurrentThreads; ++x)
132  {
133  m_workerThreads.create_thread([this]()
134  { runService(10, m_acceptorUI->get_executor().context()); });
135  }
136  }
137 
138  if (_join)
139  m_workerThreads.join_all();
140  }
141  catch (std::exception& e)
142  {
143  LOG(dds::misc::fatal) << e.what();
144  }
145  }
146 
147  void runService(short _counter, boost::asio::io_context& _io_context)
148  {
149  if (_counter <= 0)
150  {
151  LOG(dds::misc::error) << "CConnectionManagerImpl: can't start another io_context.";
152  }
153  try
154  {
155  _io_context.run();
156  }
157  catch (std::exception& ex)
158  {
159  LOG(dds::misc::error) << "CConnectionManagerImpl exception: " << ex.what();
160  LOG(dds::misc::info) << "CConnectionManagerImpl restarting io_context";
161  runService(--_counter, _io_context);
162  }
163  }
164 
165  void stop()
166  {
167  try
168  {
169  // Call _stop of the "child"
170  A* pThis = static_cast<A*>(this);
171  pThis->_stop();
172 
173  // Send shutdown signal to all client connections.
174  typename weakChannelInfo_t::container_t channels(getChannels());
175 
176  for (const auto& v : channels)
177  {
178  if (v.m_channel.expired())
179  continue;
180  auto ptr = v.m_channel.lock();
181  ptr->template pushMsg<cmdSHUTDOWN>();
182  }
183 
184  auto condition = [](const channelInfo_t& _v, bool& /*_stop*/) { return (_v.m_channel->started()); };
185 
186  size_t counter = 0;
187  while (true)
188  {
189  ++counter;
190  std::this_thread::sleep_for(std::chrono::milliseconds(200));
191  if (countNofChannels(condition) == 0)
192  break;
193  if (counter > 300)
194  {
195  LOG(dds::misc::warning) << "Some channels were not shut down properly. Exiting in anyway.";
196  break;
197  }
198  }
199 
200  m_acceptor->close();
201  m_acceptor->get_executor().context().stop();
202 
203  if (m_acceptorUI != nullptr)
204  {
205  m_acceptorUI->close();
206  m_acceptorUI->get_executor().context().stop();
207  }
208 
209  for (const auto& v : channels)
210  {
211  if (v.m_channel.expired())
212  continue;
213  auto ptr = v.m_channel.lock();
214  ptr->stop();
215  }
216 
217  std::lock_guard<std::mutex> lock(m_mutex);
218  m_channels.clear();
219  }
220  catch (std::bad_weak_ptr& e)
221  {
222  // TODO: Do we need to log something here?
223  }
224  catch (std::exception& e)
225  {
226  LOG(dds::misc::fatal) << e.what();
227  }
228  }
229 
230  protected:
232  {
233  std::lock_guard<std::mutex> lock(m_mutex);
234 
235  auto p = _channelInfo.m_channel.lock();
236  if (p == nullptr)
237  return;
238 
239  for (auto& inf : m_channels)
240  {
241  if (inf.m_channel.get() == p.get() && inf.m_protocolHeaderID == 0)
242  {
243  inf.m_protocolHeaderID = _channelInfo.m_protocolHeaderID;
244  return;
245  }
246  }
247  LOG(dds::misc::error) << "Failed to update protocol channel header ID <"
248  << _channelInfo.m_protocolHeaderID << "> . Channel is not registered";
249  }
250 
251  weakChannelInfo_t getChannelByID(uint64_t _protocolHeaderID)
252  {
253  std::lock_guard<std::mutex> lock(m_mutex);
254 
255  // Initial creation of the cache
256  if (m_channelsCache.empty())
257  {
258  for (auto& v : m_channels)
259  {
260  if (v.m_protocolHeaderID == 0)
261  continue;
262  // commander_cmd::SAgentInfo inf = v.m_channel->getAgentInfo(v.m_protocolHeaderID);
263  m_channelsCache.insert(std::make_pair(
264  v.m_protocolHeaderID, weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
265  }
266  }
267 
268  // Value found in the cache
269  auto it = m_channelsCache.find(_protocolHeaderID);
270  if (it != m_channelsCache.end())
271  return it->second;
272 
273  for (auto& v : m_channels)
274  {
275  // commander_cmd::SAgentInfo inf = v.m_channel->getAgentInfo(v.m_protocolHeaderID);
276  if (v.m_protocolHeaderID == _protocolHeaderID)
277  {
278  // Add the item into the cache
279  m_channelsCache.insert(std::make_pair(
280  v.m_protocolHeaderID, weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
281  // TODO: need to clean cache from dead channels
282  return weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot);
283  }
284  }
285  return weakChannelInfo_t();
286  }
287 
289  {
290  std::lock_guard<std::mutex> lock(m_mutex);
291 
292  typename weakChannelInfo_t::container_t result;
293  result.reserve(m_channels.size());
294  for (auto& v : m_channels)
295  {
296  bool stop = false;
297  if (_condition == nullptr || _condition(v, stop))
298  {
299  result.push_back(weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot));
300  if (stop)
301  break;
302  }
303  }
304  return result;
305  }
306 
307  template <ECmdType _cmd, class AttachmentType>
308  void broadcastMsg(const AttachmentType& _attachment, conditionFunction_t _condition = nullptr)
309  {
310  try
311  {
312  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
313 
314  for (const auto& v : channels)
315  {
316  if (v.m_channel.expired())
317  continue;
318  auto ptr = v.m_channel.lock();
319  ptr->template pushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
320  }
321  }
322  catch (std::bad_weak_ptr& e)
323  {
324  // TODO: Do we need to log something here?
325  }
326  }
327 
328  template <ECmdType _cmd, class AttachmentType>
329  void accumulativeBroadcastMsg(const AttachmentType& _attachment, conditionFunction_t _condition = nullptr)
330  {
331  try
332  {
333  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
334 
335  for (const auto& v : channels)
336  {
337  if (v.m_channel.expired())
338  continue;
339  auto ptr = v.m_channel.lock();
340  ptr->template accumulativePushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
341  }
342  }
343  catch (std::bad_weak_ptr& e)
344  {
345  // TODO: Do we need to log something here?
346  }
347  }
348 
349  template <ECmdType _cmd>
350  void broadcastSimpleMsg(conditionFunction_t _condition = nullptr)
351  {
352  SEmptyCmd cmd;
353  broadcastMsg<_cmd>(cmd, _condition);
354  }
355 
356  void broadcastBinaryAttachmentCmd(const std::string& _srcFilePath,
357  const std::string& _fileName,
358  uint16_t _cmdSource,
359  conditionFunction_t _condition = nullptr)
360  {
362 
363  std::string srcFilePath(_srcFilePath);
364  // Resolve environment variables
365  dds::misc::smart_path(&srcFilePath);
366 
367  std::ifstream f(srcFilePath);
368  if (!f.is_open() || !f.good())
369  {
370  throw std::runtime_error("Could not open the source file: " + srcFilePath);
371  }
372  f.seekg(0, std::ios::end);
373  data.reserve(f.tellg());
374  f.seekg(0, std::ios::beg);
375  data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
376 
377  broadcastBinaryAttachmentCmd(data, _fileName, _cmdSource, _condition);
378  }
379 
381  const std::string& _fileName,
382  uint16_t _cmdSource,
383  conditionFunction_t _condition = nullptr)
384  {
385  try
386  {
387  typename weakChannelInfo_t::container_t channels(getChannels(_condition));
388 
389  for (const auto& v : channels)
390  {
391  if (v.m_channel.expired())
392  continue;
393  auto ptr = v.m_channel.lock();
394  ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID);
395  }
396  }
397  catch (std::bad_weak_ptr& e)
398  {
399  // TODO: Do we need to log something here?
400  }
401  }
402 
403  size_t countNofChannels(conditionFunction_t _condition = nullptr)
404  {
405  std::lock_guard<std::mutex> lock(m_mutex);
406 
407  if (_condition == nullptr)
408  return m_channels.size();
409  size_t counter = 0;
410  for (auto& v : m_channels)
411  {
412  bool stop = false;
413  if (_condition(v, stop))
414  {
415  counter++;
416  if (stop)
417  break;
418  }
419  }
420  return counter;
421  }
422 
423  private:
424  void acceptHandler(typename T::connectionPtr_t _client,
425  asioAcceptorPtr_t _acceptor,
426  const boost::system::error_code& _ec)
427  {
428  if (!_ec)
429  {
430  {
431  std::lock_guard<std::mutex> lock(m_mutex);
432  m_channels.push_back(channelInfo_t(_client, _client->getProtocolHeaderID(), false));
433  }
434  _client->start();
435  createClientAndStartAccept(_acceptor);
436  }
437  else
438  {
439  LOG(dds::misc::error) << "Can't accept new connection: " << _ec.message();
440  }
441  }
442 
443  void createClientAndStartAccept(asioAcceptorPtr_t& _acceptor)
444  {
445  typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_executor().context(), 0);
446 
447  A* pThis = static_cast<A*>(this);
448  pThis->newClientCreated(newClient);
449 
450  // Subsribe on lobby member handshake
451  newClient->template registerHandler<EChannelEvents::OnReplyAddSlot>(
452  [this, newClient](const SSenderInfo& _sender) -> void
453  {
454  {
455  std::lock_guard<std::mutex> lock(m_mutex);
456  m_channels.push_back(channelInfo_t(newClient, _sender.m_ID, true));
457  }
458 
460  << "Adding new slot to " << newClient->getId() << " with id " << _sender.m_ID;
461  });
462 
463  // Subscribe on dissconnect event
464  newClient->template registerHandler<EChannelEvents::OnRemoteEndDissconnected>(
465  [this, newClient](const SSenderInfo& /*_sender*/) -> void { this->removeClient(newClient.get()); });
466 
467  _acceptor->async_accept(
468  newClient->socket(),
469  std::bind(
470  &CConnectionManagerImpl::acceptHandler, this, newClient, _acceptor, std::placeholders::_1));
471  }
472 
473  void createInfoFile()
474  {
475  // The child needs to have that method
476  A* pThis = static_cast<A*>(this);
477 
478  std::vector<size_t> ports;
479  ports.push_back(m_acceptor->local_endpoint().port());
480  if (m_acceptorUI != nullptr)
481  ports.push_back(m_acceptorUI->local_endpoint().port());
482 
483  pThis->_createInfoFile(ports);
484  }
485 
486  void deleteInfoFile()
487  {
488  // The child needs to have that method
489  A* pThis = static_cast<A*>(this);
490  pThis->_deleteInfoFile();
491  }
492 
493  void removeClient(T* _client)
494  {
495  // TODO: fix getTypeName call
496  LOG(dds::misc::debug) << "Removing " /*<< _client->getTypeName()*/
497  << " client from the list of active";
498  std::lock_guard<std::mutex> lock(m_mutex);
499  // FIXME: Delete all connections of the channel if the primary protocol header ID is deleted
500  m_channels.erase(remove_if(m_channels.begin(),
501  m_channels.end(),
502  [&](const channelInfo_t& i) { return (i.m_channel.get() == _client); }),
503  m_channels.end());
504  }
505 
506  void bindPortAndListen(asioAcceptorPtr_t& _acceptor)
507  {
508  const int nMaxCount = 20; // Maximum number of attempts to open the port
509  int nCount = 0;
510  // Start monitoring thread
511  while (true)
512  {
513  int nSrvPort =
514  (m_minPort == 0 && m_maxPort == 0) ? 0 : dds::misc::INet::get_free_port(m_minPort, m_maxPort);
515  try
516  {
517  _acceptor = std::make_shared<asioAcceptor_t>(
518  m_ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
519 
520  _acceptor->listen();
521  }
522  catch (std::exception& _e)
523  {
524  if (++nCount >= nMaxCount)
525  throw _e;
526 
527  LOG(dds::misc::info) << "Can't bind port " << nSrvPort << ". Will try another port.";
528  // If multiple commanders are started in the same time, then let's give them a chnce to find
529  // a free port.
530  std::this_thread::sleep_for(std::chrono::milliseconds(200));
531  continue;
532  }
533 
534  break;
535  }
536  }
537 
538  private:
539  size_t m_minPort;
540  size_t m_maxPort;
541  bool m_useUITransport;
543  std::shared_ptr<boost::asio::signal_set> m_signals;
544  std::mutex m_mutex;
545  typename channelInfo_t::container_t m_channels;
546  channelContainerCache_t m_channelsCache;
547 
549  boost::asio::io_context m_ioContext;
550  asioAcceptorPtr_t m_acceptor;
551 
552  // Used for UI (priority) communication
553  boost::asio::io_context m_ioContext_UI;
554  asioAcceptorPtr_t m_acceptorUI;
555 
556  boost::thread_group m_workerThreads;
557  };
558  } // namespace protocol_api
559 } // namespace dds
560 #endif /* defined(__DDS__ConnectionManagerImpl__) */
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
Main function user has to run to start monitoring thread.
Definition: MonitoringThread.h:48
int get_free_port(int _Min, int _Max)
The function checks and returns a free port from the given range of the ports.
Definition: INet.h:550
size_t countNofChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:403
SWeakChannelInfo< T > weakChannelInfo_t
Definition: ConnectionManagerImpl.h:41
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:76
weakChannelInfo_t getChannelByID(uint64_t _protocolHeaderID)
Definition: ConnectionManagerImpl.h:251
Definition: def.h:148
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
std::map< uint64_t, weakChannelInfo_t > channelContainerCache_t
Definition: ConnectionManagerImpl.h:43
T::weakConnectionPtr_t m_channel
Definition: ChannelInfo.h:48
void accumulativeBroadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:329
Definition: CommandAttachmentImpl.h:54
std::function< bool(const channelInfo_t &_channelInfo, bool &)> conditionFunction_t
Definition: ConnectionManagerImpl.h:42
void runService(short _counter, boost::asio::io_context &_io_context)
Definition: ConnectionManagerImpl.h:147
std::shared_ptr< asioAcceptor_t > asioAcceptorPtr_t
Definition: ConnectionManagerImpl.h:32
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:317
#define LOG(severity)
Definition: Logger.h:34
void broadcastBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:356
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:124
Definition: ChannelInfo.h:14
void broadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:308
Miscellaneous functions and helpers are located here.
Definition: AgentConnectionManager.h:13
void broadcastBinaryAttachmentCmd(const dds::misc::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:380
void stop()
Definition: ConnectionManagerImpl.h:165
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:35
Definition: ChannelInfo.h:38
Definition: def.h:147
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:37
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:46
T::connectionPtr_t m_channel
Definition: ChannelInfo.h:24
Definition: def.h:146
void smart_path(_T *_Path)
The function extends any environment variable found in the give path to its value.
Definition: SysHelper.h:95
boost::asio::basic_socket_acceptor< boost::asio::ip::tcp > asioAcceptor_t
Definition: ConnectionManagerImpl.h:30
Definition: def.h:150
void updateChannelProtocolHeaderID(const weakChannelInfo_t &_channelInfo)
Definition: ConnectionManagerImpl.h:231
std::vector< SChannelInfo< T > > container_t
Definition: ChannelInfo.h:28
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:83
Base class for connection managers.
Definition: ConnectionManagerImpl.h:37
Definition: def.h:149
SChannelInfo< T > channelInfo_t
Definition: ConnectionManagerImpl.h:40
weakChannelInfo_t::container_t getChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:288
void broadcastSimpleMsg(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:350
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:54
std::vector< SWeakChannelInfo > container_t
Definition: ChannelInfo.h:52
uint64_t m_protocolHeaderID
Definition: ChannelInfo.h:49