6 #ifndef __DDS__ConnectionManagerImpl__ 7 #define __DDS__ConnectionManagerImpl__ 17 #pragma clang diagnostic push 18 #pragma clang diagnostic ignored "-Wunused-local-typedef" 19 #include <boost/asio.hpp> 20 #pragma clang diagnostic pop 21 #include <boost/thread/thread.hpp> 27 namespace protocol_api
31 template <
class T,
class A>
39 m_acceptor = std::make_shared<boost::asio::ip::tcp::acceptor>(
40 m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
46 m_acceptorUI = std::make_shared<boost::asio::ip::tcp::acceptor>(
47 m_io_service_UI, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
51 m_signals = std::make_shared<boost::asio::signal_set>(m_io_service);
56 m_signals->add(SIGINT);
57 m_signals->add(SIGTERM);
59 m_signals->add(SIGQUIT);
60 #endif // defined(SIGQUIT) 62 m_signals->async_wait([
this](boost::system::error_code ,
int signo) {
80 void start(
bool _join =
true,
unsigned int _nThreads = 0 )
85 A* pThis =
static_cast<A*
>(
this);
89 const float maxIdleTime =
95 createClientAndStartAccept(m_acceptor);
99 if (m_acceptorUI !=
nullptr)
101 m_acceptorUI->listen();
102 createClientAndStartAccept(m_acceptorUI);
110 unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
112 if (concurrentThreads < 2)
113 concurrentThreads = 2;
115 <<
"Starting DDS transport engine using " << concurrentThreads <<
" concurrent threads.";
// Launch one worker per requested thread. The counter is unsigned because
// concurrentThreads is unsigned int; a signed counter would make the loop
// condition a signed/unsigned comparison.
116 for (unsigned int x = 0; x < concurrentThreads; ++x)
118 m_workerThreads.create_thread([
this]() {
runService(10, m_acceptor->get_io_service()); });
122 if (m_acceptorUI !=
nullptr)
124 const unsigned int concurrentThreads = 2;
126 <<
"Starting DDS UI transport engine using " << concurrentThreads <<
" concurrent threads.";
// Launch the UI transport workers. The counter is unsigned to match the
// const unsigned int concurrentThreads it is compared against.
127 for (unsigned int x = 0; x < concurrentThreads; ++x)
129 m_workerThreads.create_thread([
this]() {
runService(10, m_acceptorUI->get_io_service()); });
134 m_workerThreads.join_all();
136 catch (std::exception& e)
142 void runService(
short _counter, boost::asio::io_service& _io_service)
152 catch (std::exception& ex)
165 A* pThis =
static_cast<A*
>(
this);
169 typename T::weakConnectionPtrVector_t channels(
getChannels());
171 for (
const auto& v : channels)
176 ptr->template pushMsg<cmdSHUTDOWN>();
179 auto condition = [](
typename T::connectionPtr_t _v,
bool& ) {
return (_v->started()); };
185 std::this_thread::sleep_for(std::chrono::milliseconds(200));
196 m_acceptor->get_io_service().stop();
// Shut down the UI acceptor only when it exists. The guard must test
// m_acceptorUI (the pointer dereferenced below), not m_acceptor: m_acceptor
// has already been dereferenced just above, while m_acceptorUI may be null,
// as the other null checks on it in this class show.
198 if (m_acceptorUI != nullptr)
200 m_acceptorUI->close();
201 m_acceptorUI->get_io_service().stop();
204 for (
const auto& v : channels)
212 std::lock_guard<std::mutex> lock(m_mutex);
215 catch (std::bad_weak_ptr& e)
219 catch (std::exception& e)
228 std::lock_guard<std::mutex> lock(m_mutex);
230 for (
auto& v : m_channels)
232 if (v.get() == _client)
235 return typename T::weakConnectionPtr_t();
239 std::function<
bool(
typename T::connectionPtr_t,
bool&)> _condition =
nullptr)
241 std::lock_guard<std::mutex> lock(m_mutex);
243 typename T::weakConnectionPtrVector_t result;
244 result.reserve(m_channels.size());
245 for (
auto& v : m_channels)
248 if (_condition ==
nullptr || _condition(v, stop))
258 template <ECmdType _cmd,
class AttachmentType>
260 std::function<
bool(
typename T::connectionPtr_t,
bool&)> _condition =
nullptr)
264 typename T::weakConnectionPtrVector_t channels(
getChannels(_condition));
266 for (
const auto& v : channels)
271 ptr->template pushMsg<_cmd>(_attachment);
274 catch (std::bad_weak_ptr& e)
280 template <ECmdType _cmd,
class AttachmentType>
282 std::function<
bool(
typename T::connectionPtr_t,
bool&)> _condition =
nullptr)
286 typename T::weakConnectionPtrVector_t channels(
getChannels(_condition));
288 for (
const auto& v : channels)
293 ptr->template accumulativePushMsg<_cmd>(_attachment);
296 catch (std::bad_weak_ptr& e)
302 template <ECmdType _cmd>
303 void broadcastSimpleMsg(std::function<
bool(
typename T::connectionPtr_t,
bool&)> _condition =
nullptr)
306 broadcastMsg<_cmd>(cmd, _condition);
311 const std::string& _fileName,
313 std::function<
bool(
typename T::connectionPtr_t,
bool&)> _condition =
nullptr)
317 typename T::weakConnectionPtrVector_t channels(
getChannels(_condition));
319 for (
const auto& v : channels)
324 ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource);
327 catch (std::bad_weak_ptr& e)
333 size_t countNofChannels(std::function<
bool(
typename T::connectionPtr_t,
bool&)> _condition =
nullptr)
335 std::lock_guard<std::mutex> lock(m_mutex);
337 if (_condition ==
nullptr)
338 return m_channels.size();
340 for (
auto& v : m_channels)
343 if (_condition(v, stop))
354 void acceptHandler(
typename T::connectionPtr_t _client,
355 std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor,
356 const boost::system::error_code& _ec)
362 std::lock_guard<std::mutex> lock(m_mutex);
363 m_channels.push_back(_client);
365 createClientAndStartAccept(_acceptor);
373 void createClientAndStartAccept(std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor)
375 typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_io_service());
377 A* pThis =
static_cast<A*
>(
this);
378 pThis->newClientCreated(newClient);
381 newClient->registerDisconnectEventHandler([
this](T* _channel) ->
void {
384 std::lock_guard<std::mutex> lock(m_statMutex);
385 m_readStatDisconnectedChannels.
addFromStat(_channel->getReadStat());
386 m_writeStatDisconnectedChannels.
addFromStat(_channel->getWriteStat());
388 return this->removeClient(_channel);
391 _acceptor->async_accept(
394 &CConnectionManagerImpl::acceptHandler,
this, newClient, _acceptor, std::placeholders::_1));
// Gathers the listening ports and hands them to A::_createInfoFile().
// The list always holds the main acceptor's local port; the UI acceptor's
// local port is appended only when m_acceptorUI is non-null.
// NOTE(review): presumably A is the concrete manager deriving from this
// class, so the static_cast of `this` is a downcast - confirm.
397 void createInfoFile()
400 A* pThis =
static_cast<A*
>(
this);
402 std::vector<size_t> ports;
// Main transport port comes first.
403 ports.push_back(m_acceptor->local_endpoint().port());
// UI transport port follows when a UI acceptor exists.
404 if (m_acceptorUI !=
nullptr)
405 ports.push_back(m_acceptorUI->local_endpoint().port());
407 pThis->_createInfoFile(ports);
// Forwards to _deleteInfoFile() on this object viewed as A*.
// NOTE(review): presumably A is the concrete manager deriving from this
// class - confirm.
410 void deleteInfoFile()
413 A* pThis =
static_cast<A*
>(
this);
414 pThis->_deleteInfoFile();
417 void removeClient(T* _client)
421 <<
" client from the list of active";
422 std::lock_guard<std::mutex> lock(m_mutex);
423 m_channels.erase(remove_if(m_channels.begin(),
425 [&](
typename T::connectionPtr_t& i) {
return (i.get() == _client); }),
435 std::lock_guard<std::mutex> lock(m_statMutex);
436 _readStat.
addFromStat(m_readStatDisconnectedChannels);
437 _writeStat.
addFromStat(m_writeStatDisconnectedChannels);
442 std::shared_ptr<boost::asio::signal_set> m_signals;
444 typename T::connectionPtrVector_t m_channels;
447 boost::asio::io_service m_io_service;
448 std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;
451 boost::asio::io_service m_io_service_UI;
452 std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptorUI;
454 boost::thread_group m_workerThreads;
457 SReadStat m_readStatDisconnectedChannels;
459 std::mutex m_statMutex;
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
Main function the user has to run to start the monitoring thread.
Definition: MonitoringThread.h:48
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:73
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
Definition: CommandAttachmentImpl.h:55
int get_free_port(int _Min, int _Max)
The function checks the given range of ports and returns a free port from it.
Definition: INet.h:553
size_t countNofChannels(std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:333
void addDisconnectedChannelsStatToStat(SReadStat &_readStat, SWriteStat &_writeStat)
Definition: ConnectionManagerImpl.h:430
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:190
T::weakConnectionPtr_t getWeakPtr(T *_client)
Definition: ConnectionManagerImpl.h:226
#define LOG(severity)
Definition: Logger.h:54
void broadcastBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:309
Definition: dds-agent/src/AgentConnectionManager.h:16
void broadcastMsg(const AttachmentType &_attachment, std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:259
void stop()
Definition: ConnectionManagerImpl.h:160
Definition: StatImpl.h:70
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:34
void addFromStat(const SReadStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:133
T::weakConnectionPtrVector_t getChannels(std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:238
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:35
void addFromStat(const SWriteStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:100
void accumulativeBroadcastMsg(const AttachmentType &_attachment, std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:281
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
void broadcastSimpleMsg(std::function< bool(typename T::connectionPtr_t, bool &)> _condition=nullptr)
Definition: ConnectionManagerImpl.h:303
static CUserDefaults & instance()
Return singleton instance.
Definition: UserDefaults.cpp:40
Definition: StatImpl.h:93
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:80
void runService(short _counter, boost::asio::io_service &_io_service)
Definition: ConnectionManagerImpl.h:142
Base class for connection managers.
Definition: ConnectionManagerImpl.h:32
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:40