6 #ifndef __DDS__ConnectionManagerImpl__ 7 #define __DDS__ConnectionManagerImpl__ 17 #include <boost/asio/basic_socket_acceptor.hpp> 18 #include <boost/thread/thread.hpp> 24 namespace protocol_api
26 #if BOOST_VERSION >= 107000 27 typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp, boost::asio::io_context::executor_type>
30 typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp>
asioAcceptor_t;
36 template <
class T,
class A>
49 , m_useUITransport(_useUITransport)
52 m_signals = std::make_shared<boost::asio::signal_set>(m_ioContext);
57 m_signals->add(SIGINT);
58 m_signals->add(SIGTERM);
60 m_signals->add(SIGQUIT);
61 #endif // defined(SIGQUIT) 63 m_signals->async_wait(
64 [
this](boost::system::error_code ,
int signo)
83 void start(
bool _join =
true,
unsigned int _nThreads = 0 )
88 A* pThis = static_cast<A*>(
this);
91 const float maxIdleTime =
97 bindPortAndListen(m_acceptor);
98 createClientAndStartAccept(m_acceptor);
102 if (m_useUITransport)
104 bindPortAndListen(m_acceptorUI);
105 createClientAndStartAccept(m_acceptorUI);
113 unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
115 if (concurrentThreads < 2)
116 concurrentThreads = 2;
118 <<
" concurrent threads.";
119 for (
unsigned int x = 0; x < concurrentThreads; ++x)
121 m_workerThreads.create_thread([
this]()
122 {
runService(10, m_acceptor->get_executor().context()); });
126 if (m_acceptorUI !=
nullptr)
128 const unsigned int concurrentThreads = 2;
130 <<
"Starting DDS UI transport engine using " << concurrentThreads <<
" concurrent threads.";
131 for (
unsigned int x = 0; x < concurrentThreads; ++x)
133 m_workerThreads.create_thread([
this]()
134 {
runService(10, m_acceptorUI->get_executor().context()); });
139 m_workerThreads.join_all();
141 catch (std::exception& e)
147 void runService(
short _counter, boost::asio::io_context& _io_context)
157 catch (std::exception& ex)
170 A* pThis = static_cast<A*>(
this);
176 for (
const auto& v : channels)
178 if (v.m_channel.expired())
180 auto ptr = v.m_channel.lock();
181 ptr->template pushMsg<cmdSHUTDOWN>();
190 std::this_thread::sleep_for(std::chrono::milliseconds(200));
201 m_acceptor->get_executor().context().stop();
203 if (m_acceptorUI !=
nullptr)
205 m_acceptorUI->close();
206 m_acceptorUI->get_executor().context().stop();
209 for (
const auto& v : channels)
211 if (v.m_channel.expired())
213 auto ptr = v.m_channel.lock();
217 std::lock_guard<std::mutex> lock(m_mutex);
220 catch (std::bad_weak_ptr& e)
224 catch (std::exception& e)
233 std::lock_guard<std::mutex> lock(m_mutex);
239 for (
auto& inf : m_channels)
241 if (inf.m_channel.get() == p.get() && inf.m_protocolHeaderID == 0)
253 std::lock_guard<std::mutex> lock(m_mutex);
256 if (m_channelsCache.empty())
258 for (
auto& v : m_channels)
260 if (v.m_protocolHeaderID == 0)
263 m_channelsCache.insert(std::make_pair(
264 v.m_protocolHeaderID,
weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
269 auto it = m_channelsCache.find(_protocolHeaderID);
270 if (it != m_channelsCache.end())
273 for (
auto& v : m_channels)
276 if (v.m_protocolHeaderID == _protocolHeaderID)
279 m_channelsCache.insert(std::make_pair(
280 v.m_protocolHeaderID,
weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot)));
290 std::lock_guard<std::mutex> lock(m_mutex);
293 result.reserve(m_channels.size());
294 for (
auto& v : m_channels)
297 if (_condition ==
nullptr || _condition(v,
stop))
299 result.push_back(
weakChannelInfo_t(v.m_channel, v.m_protocolHeaderID, v.m_isSlot));
307 template <ECmdType _cmd,
class AttachmentType>
314 for (
const auto& v : channels)
316 if (v.m_channel.expired())
318 auto ptr = v.m_channel.lock();
319 ptr->template pushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
322 catch (std::bad_weak_ptr& e)
328 template <ECmdType _cmd,
class AttachmentType>
335 for (
const auto& v : channels)
337 if (v.m_channel.expired())
339 auto ptr = v.m_channel.lock();
340 ptr->template accumulativePushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
343 catch (std::bad_weak_ptr& e)
349 template <ECmdType _cmd>
353 broadcastMsg<_cmd>(cmd, _condition);
357 const std::string& _fileName,
363 std::string srcFilePath(_srcFilePath);
367 std::ifstream f(srcFilePath);
368 if (!f.is_open() || !f.good())
370 throw std::runtime_error(
"Could not open the source file: " + srcFilePath);
372 f.seekg(0, std::ios::end);
373 data.reserve(f.tellg());
374 f.seekg(0, std::ios::beg);
375 data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
381 const std::string& _fileName,
389 for (
const auto& v : channels)
391 if (v.m_channel.expired())
393 auto ptr = v.m_channel.lock();
394 ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID);
397 catch (std::bad_weak_ptr& e)
405 std::lock_guard<std::mutex> lock(m_mutex);
407 if (_condition ==
nullptr)
408 return m_channels.size();
410 for (
auto& v : m_channels)
413 if (_condition(v,
stop))
424 void acceptHandler(
typename T::connectionPtr_t _client,
426 const boost::system::error_code& _ec)
431 std::lock_guard<std::mutex> lock(m_mutex);
432 m_channels.push_back(
channelInfo_t(_client, _client->getProtocolHeaderID(),
false));
435 createClientAndStartAccept(_acceptor);
445 typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_executor().context(), 0);
447 A* pThis = static_cast<A*>(
this);
448 pThis->newClientCreated(newClient);
451 newClient->template registerHandler<EChannelEvents::OnReplyAddSlot>(
452 [
this, newClient](
const SSenderInfo& _sender) ->
void 455 std::lock_guard<std::mutex> lock(m_mutex);
456 m_channels.push_back(
channelInfo_t(newClient, _sender.m_ID,
true));
460 <<
"Adding new slot to " << newClient->getId() <<
" with id " << _sender.m_ID;
464 newClient->template registerHandler<EChannelEvents::OnRemoteEndDissconnected>(
465 [
this, newClient](
const SSenderInfo& ) ->
void { this->removeClient(newClient.get()); });
467 _acceptor->async_accept(
470 &CConnectionManagerImpl::acceptHandler,
this, newClient, _acceptor, std::placeholders::_1));
473 void createInfoFile()
476 A* pThis = static_cast<A*>(
this);
478 std::vector<size_t> ports;
479 ports.push_back(m_acceptor->local_endpoint().port());
480 if (m_acceptorUI !=
nullptr)
481 ports.push_back(m_acceptorUI->local_endpoint().port());
483 pThis->_createInfoFile(ports);
486 void deleteInfoFile()
489 A* pThis = static_cast<A*>(
this);
490 pThis->_deleteInfoFile();
493 void removeClient(T* _client)
497 <<
" client from the list of active";
498 std::lock_guard<std::mutex> lock(m_mutex);
500 m_channels.erase(remove_if(m_channels.begin(),
502 [&](
const channelInfo_t& i) {
return (i.m_channel.get() == _client); }),
508 const int nMaxCount = 20;
517 _acceptor = std::make_shared<asioAcceptor_t>(
518 m_ioContext, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
522 catch (std::exception& _e)
524 if (++nCount >= nMaxCount)
530 std::this_thread::sleep_for(std::chrono::milliseconds(200));
541 bool m_useUITransport;
543 std::shared_ptr<boost::asio::signal_set> m_signals;
549 boost::asio::io_context m_ioContext;
553 boost::asio::io_context m_ioContext_UI;
556 boost::thread_group m_workerThreads;
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
The main function the user has to call to start the monitoring thread.
Definition: MonitoringThread.h:48
int get_free_port(int _Min, int _Max)
The function checks the given range of ports and returns a free port from it.
Definition: INet.h:550
size_t countNofChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:403
SWeakChannelInfo< T > weakChannelInfo_t
Definition: ConnectionManagerImpl.h:41
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:76
weakChannelInfo_t getChannelByID(uint64_t _protocolHeaderID)
Definition: ConnectionManagerImpl.h:251
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
std::map< uint64_t, weakChannelInfo_t > channelContainerCache_t
Definition: ConnectionManagerImpl.h:43
T::weakConnectionPtr_t m_channel
Definition: ChannelInfo.h:48
void accumulativeBroadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:329
Definition: CommandAttachmentImpl.h:54
std::function< bool(const channelInfo_t &_channelInfo, bool &)> conditionFunction_t
Definition: ConnectionManagerImpl.h:42
void runService(short _counter, boost::asio::io_context &_io_context)
Definition: ConnectionManagerImpl.h:147
std::shared_ptr< asioAcceptor_t > asioAcceptorPtr_t
Definition: ConnectionManagerImpl.h:32
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:317
#define LOG(severity)
Definition: Logger.h:34
void broadcastBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:356
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:124
Definition: ChannelInfo.h:14
void broadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:308
Miscellaneous functions and helpers are located here.
Definition: AgentConnectionManager.h:13
void broadcastBinaryAttachmentCmd(const dds::misc::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:380
void stop()
Definition: ConnectionManagerImpl.h:165
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:35
Definition: ChannelInfo.h:38
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:37
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:46
T::connectionPtr_t m_channel
Definition: ChannelInfo.h:24
void smart_path(_T *_Path)
The function expands any environment variable found in the given path to its value.
Definition: SysHelper.h:95
boost::asio::basic_socket_acceptor< boost::asio::ip::tcp > asioAcceptor_t
Definition: ConnectionManagerImpl.h:30
void updateChannelProtocolHeaderID(const weakChannelInfo_t &_channelInfo)
Definition: ConnectionManagerImpl.h:231
std::vector< SChannelInfo< T > > container_t
Definition: ChannelInfo.h:28
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:83
Base class for connection managers.
Definition: ConnectionManagerImpl.h:37
SChannelInfo< T > channelInfo_t
Definition: ConnectionManagerImpl.h:40
weakChannelInfo_t::container_t getChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:288
void broadcastSimpleMsg(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:350
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:54
std::vector< SWeakChannelInfo > container_t
Definition: ChannelInfo.h:52
uint64_t m_protocolHeaderID
Definition: ChannelInfo.h:49