6 #ifndef __DDS__ConnectionManagerImpl__ 7 #define __DDS__ConnectionManagerImpl__ 18 #include <boost/asio/basic_socket_acceptor.hpp> 19 #include <boost/thread/thread.hpp> 25 namespace protocol_api
27 #if BOOST_VERSION >= 107000 28 typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp, boost::asio::io_context::executor_type>
31 typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp>
asioAcceptor_t;
37 template <
class T,
class A>
50 , m_useUITransport(_useUITransport)
53 m_signals = std::make_shared<boost::asio::signal_set>(m_ioContext);
58 m_signals->add(SIGINT);
59 m_signals->add(SIGTERM);
61 m_signals->add(SIGQUIT);
62 #endif // defined(SIGQUIT) 64 m_signals->async_wait([
this](boost::system::error_code ,
int signo) {
82 void start(
bool _join =
true,
unsigned int _nThreads = 0 )
87 A* pThis = static_cast<A*>(
this);
90 const float maxIdleTime =
96 bindPortAndListen(m_acceptor);
97 createClientAndStartAccept(m_acceptor);
101 if (m_useUITransport)
103 bindPortAndListen(m_acceptorUI);
104 createClientAndStartAccept(m_acceptorUI);
112 unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
114 if (concurrentThreads < 2)
115 concurrentThreads = 2;
117 <<
"Starting DDS transport engine using " << concurrentThreads <<
" concurrent threads.";
118 for (
int x = 0; x < concurrentThreads; ++x)
120 m_workerThreads.create_thread(
121 [
this]() {
runService(10, m_acceptor->get_executor().context()); });
125 if (m_acceptorUI !=
nullptr)
127 const unsigned int concurrentThreads = 2;
129 <<
"Starting DDS UI transport engine using " << concurrentThreads <<
" concurrent threads.";
130 for (
int x = 0; x < concurrentThreads; ++x)
132 m_workerThreads.create_thread(
133 [
this]() {
runService(10, m_acceptorUI->get_executor().context()); });
138 m_workerThreads.join_all();
140 catch (std::exception& e)
146 void runService(
short _counter, boost::asio::io_context& _io_context)
156 catch (std::exception& ex)
169 A* pThis = static_cast<A*>(
this);
175 for (
const auto& v : channels)
177 if (v.m_channel.expired())
179 auto ptr = v.m_channel.lock();
180 ptr->template pushMsg<cmdSHUTDOWN>();
189 std::this_thread::sleep_for(std::chrono::milliseconds(200));
200 m_acceptor->get_executor().context().stop();
202 if (m_acceptorUI !=
nullptr)
204 m_acceptorUI->close();
205 m_acceptorUI->get_executor().context().stop();
208 for (
const auto& v : channels)
210 if (v.m_channel.expired())
212 auto ptr = v.m_channel.lock();
216 std::lock_guard<std::mutex> lock(m_mutex);
219 catch (std::bad_weak_ptr& e)
223 catch (std::exception& e)
232 std::lock_guard<std::mutex> lock(m_mutex);
238 for (
auto& inf : m_channels)
240 if (inf.m_channel.get() == p.get() && inf.m_protocolHeaderID == 0)
252 std::lock_guard<std::mutex> lock(m_mutex);
255 if (m_channelsCache.empty())
257 for (
auto& v : m_channels)
259 if (v.m_protocolHeaderID == 0)
262 m_channelsCache.insert(std::make_pair(
263 v.m_protocolHeaderID,
weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
268 auto it = m_channelsCache.find(_protocolHeaderID);
269 if (it != m_channelsCache.end())
272 for (
auto& v : m_channels)
275 if (v.m_protocolHeaderID == _protocolHeaderID)
278 m_channelsCache.insert(std::make_pair(
279 v.m_protocolHeaderID,
weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
289 std::lock_guard<std::mutex> lock(m_mutex);
292 result.reserve(m_channels.size());
293 for (
auto& v : m_channels)
296 if (_condition ==
nullptr || _condition(v,
stop))
298 result.push_back(
weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot));
306 template <ECmdType _cmd,
class AttachmentType>
313 for (
const auto& v : channels)
315 if (v.m_channel.expired())
317 auto ptr = v.m_channel.lock();
318 ptr->template pushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
321 catch (std::bad_weak_ptr& e)
327 template <ECmdType _cmd,
class AttachmentType>
334 for (
const auto& v : channels)
336 if (v.m_channel.expired())
338 auto ptr = v.m_channel.lock();
339 ptr->template accumulativePushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
342 catch (std::bad_weak_ptr& e)
348 template <ECmdType _cmd>
352 broadcastMsg<_cmd>(cmd, _condition);
356 const std::string& _fileName,
362 std::string srcFilePath(_srcFilePath);
366 std::ifstream f(srcFilePath);
367 if (!f.is_open() || !f.good())
369 throw std::runtime_error(
"Could not open the source file: " + srcFilePath);
371 f.seekg(0, std::ios::end);
372 data.reserve(f.tellg());
373 f.seekg(0, std::ios::beg);
374 data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
380 const std::string& _fileName,
388 for (
const auto& v : channels)
390 if (v.m_channel.expired())
392 auto ptr = v.m_channel.lock();
393 ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID);
396 catch (std::bad_weak_ptr& e)
404 std::lock_guard<std::mutex> lock(m_mutex);
406 if (_condition ==
nullptr)
407 return m_channels.size();
409 for (
auto& v : m_channels)
412 if (_condition(v,
stop))
427 std::lock_guard<std::mutex> lock(m_statMutex);
428 _readStat.
addFromStat(m_readStatDisconnectedChannels);
429 _writeStat.
addFromStat(m_writeStatDisconnectedChannels);
433 void acceptHandler(
typename T::connectionPtr_t _client,
435 const boost::system::error_code& _ec)
440 std::lock_guard<std::mutex> lock(m_mutex);
441 m_channels.push_back(
channelInfo_t(_client, _client->getProtocolHeaderID(),
false));
444 createClientAndStartAccept(_acceptor);
454 typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_executor().context(), 0);
456 A* pThis = static_cast<A*>(
this);
457 pThis->newClientCreated(newClient);
460 newClient->template registerHandler<EChannelEvents::OnReplyAddSlot>(
461 [
this, newClient](
const SSenderInfo& _sender) ->
void {
463 std::lock_guard<std::mutex> lock(m_mutex);
464 m_channels.push_back(
channelInfo_t(newClient, _sender.m_ID,
true));
468 <<
"Adding new slot to " << newClient->getId() <<
" with id " << _sender.m_ID;
472 newClient->template registerHandler<EChannelEvents::OnRemoteEndDissconnected>(
473 [
this, newClient](
const SSenderInfo& _sender) ->
void {
476 std::lock_guard<std::mutex> lock(m_statMutex);
477 m_readStatDisconnectedChannels.
addFromStat(newClient->getReadStat());
478 m_writeStatDisconnectedChannels.
addFromStat(newClient->getWriteStat());
480 this->removeClient(newClient.get());
483 _acceptor->async_accept(
486 &CConnectionManagerImpl::acceptHandler,
this, newClient, _acceptor, std::placeholders::_1));
489 void createInfoFile()
492 A* pThis = static_cast<A*>(
this);
494 std::vector<size_t> ports;
495 ports.push_back(m_acceptor->local_endpoint().port());
496 if (m_acceptorUI !=
nullptr)
497 ports.push_back(m_acceptorUI->local_endpoint().port());
499 pThis->_createInfoFile(ports);
502 void deleteInfoFile()
505 A* pThis = static_cast<A*>(
this);
506 pThis->_deleteInfoFile();
509 void removeClient(T* _client)
513 <<
" client from the list of active";
514 std::lock_guard<std::mutex> lock(m_mutex);
516 m_channels.erase(remove_if(m_channels.begin(),
518 [&](
const channelInfo_t& i) {
return (i.m_channel.get() == _client); }),
524 const int nMaxCount = 20;
533 _acceptor = std::make_shared<asioAcceptor_t>(
534 m_ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
538 catch (std::exception& _e)
540 if (++nCount >= nMaxCount)
546 std::this_thread::sleep_for(std::chrono::milliseconds(200));
557 bool m_useUITransport;
559 std::shared_ptr<boost::asio::signal_set> m_signals;
565 boost::asio::io_context m_ioContext;
569 boost::asio::io_context m_ioContext_UI;
572 boost::thread_group m_workerThreads;
575 SReadStat m_readStatDisconnectedChannels;
576 SWriteStat m_writeStatDisconnectedChannels;
577 std::mutex m_statMutex;
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
Main function user has to run to start monitoring thread.
Definition: MonitoringThread.h:48
size_t countNofChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:402
SWeakChannelInfo< T > weakChannelInfo_t
Definition: ConnectionManagerImpl.h:42
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:75
weakChannelInfo_t getChannelByID(uint64_t _protocolHeaderID)
Definition: ConnectionManagerImpl.h:250
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
std::map< uint64_t, weakChannelInfo_t > channelContainerCache_t
Definition: ConnectionManagerImpl.h:44
T::weakConnectionPtr_t m_channel
Definition: ChannelInfo.h:48
void accumulativeBroadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:328
Definition: CommandAttachmentImpl.h:54
int get_free_port(int _Min, int _Max)
The function checks and returns a free port from the given range of the ports.
Definition: INet.h:553
std::function< bool(const channelInfo_t &_channelInfo, bool &)> conditionFunction_t
Definition: ConnectionManagerImpl.h:43
void runService(short _counter, boost::asio::io_context &_io_context)
Definition: ConnectionManagerImpl.h:146
void addDisconnectedChannelsStatToStat(SReadStat &_readStat, SWriteStat &_writeStat)
Definition: ConnectionManagerImpl.h:422
std::shared_ptr< asioAcceptor_t > asioAcceptorPtr_t
Definition: ConnectionManagerImpl.h:33
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:249
#define LOG(severity)
Definition: Logger.h:56
void broadcastBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:355
Definition: ChannelInfo.h:14
void broadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:307
Definition: AgentConnectionManager.h:13
void stop()
Definition: ConnectionManagerImpl.h:164
Definition: StatImpl.h:68
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:34
Definition: ChannelInfo.h:38
void addFromStat(const SReadStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:133
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:36
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:47
T::connectionPtr_t m_channel
Definition: ChannelInfo.h:24
void smart_path(_T *_Path)
The function extends any environment variable found in the give path to its value.
Definition: SysHelper.h:95
void broadcastBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:379
boost::asio::basic_socket_acceptor< boost::asio::ip::tcp > asioAcceptor_t
Definition: ConnectionManagerImpl.h:31
void addFromStat(const SWriteStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:100
void updateChannelProtocolHeaderID(const weakChannelInfo_t &_channelInfo)
Definition: ConnectionManagerImpl.h:230
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
std::vector< SChannelInfo< T > > container_t
Definition: ChannelInfo.h:28
Definition: StatImpl.h:91
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:82
Base class for connection managers.
Definition: ConnectionManagerImpl.h:38
SChannelInfo< T > channelInfo_t
Definition: ConnectionManagerImpl.h:41
weakChannelInfo_t::container_t getChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:287
void broadcastSimpleMsg(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:349
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:46
std::vector< SWeakChannelInfo > container_t
Definition: ChannelInfo.h:52
uint64_t m_protocolHeaderID
Definition: ChannelInfo.h:49