6 #ifndef __DDS__ConnectionManagerImpl__ 7 #define __DDS__ConnectionManagerImpl__ 18 #pragma clang diagnostic push 19 #pragma clang diagnostic ignored "-Wunused-local-typedef" 20 #include <boost/asio.hpp> 21 #pragma clang diagnostic pop 22 #include <boost/thread/thread.hpp> 28 namespace protocol_api
32 template <
class T,
class A>
45 m_acceptor = std::make_shared<boost::asio::ip::tcp::acceptor>(
46 m_io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
52 m_acceptorUI = std::make_shared<boost::asio::ip::tcp::acceptor>(
53 m_io_service_UI, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), nSrvPort));
57 m_signals = std::make_shared<boost::asio::signal_set>(m_io_service);
62 m_signals->add(SIGINT);
63 m_signals->add(SIGTERM);
65 m_signals->add(SIGQUIT);
66 #endif // defined(SIGQUIT) 68 m_signals->async_wait([
this](boost::system::error_code ,
int signo) {
86 void start(
bool _join =
true,
unsigned int _nThreads = 0 )
91 A* pThis =
static_cast<A*
>(
this);
95 const float maxIdleTime =
100 m_acceptor->listen();
101 createClientAndStartAccept(m_acceptor);
105 if (m_acceptorUI !=
nullptr)
107 m_acceptorUI->listen();
108 createClientAndStartAccept(m_acceptorUI);
116 unsigned int concurrentThreads = (0 == _nThreads) ? std::thread::hardware_concurrency() : _nThreads;
118 if (concurrentThreads < 2)
119 concurrentThreads = 2;
121 <<
"Starting DDS transport engine using " << concurrentThreads <<
" concurrent threads.";
122 for (
int x = 0; x < concurrentThreads; ++x)
124 m_workerThreads.create_thread([
this]() {
runService(10, m_acceptor->get_io_service()); });
128 if (m_acceptorUI !=
nullptr)
130 const unsigned int concurrentThreads = 2;
132 <<
"Starting DDS UI transport engine using " << concurrentThreads <<
" concurrent threads.";
133 for (
int x = 0; x < concurrentThreads; ++x)
135 m_workerThreads.create_thread([
this]() {
runService(10, m_acceptorUI->get_io_service()); });
140 m_workerThreads.join_all();
142 catch (std::exception& e)
148 void runService(
short _counter, boost::asio::io_service& _io_service)
158 catch (std::exception& ex)
171 A* pThis =
static_cast<A*
>(
this);
177 for (
const auto& v : channels)
179 if (v.m_channel.expired())
181 auto ptr = v.m_channel.lock();
182 ptr->template pushMsg<cmdSHUTDOWN>();
185 auto condition = [](
const channelInfo_t& _v,
bool& ) {
return (_v.
m_channel->started()); };
191 std::this_thread::sleep_for(std::chrono::milliseconds(200));
202 m_acceptor->get_io_service().stop();
204 if (m_acceptor !=
nullptr)
206 m_acceptorUI->close();
207 m_acceptorUI->get_io_service().stop();
210 for (
const auto& v : channels)
212 if (v.m_channel.expired())
214 auto ptr = v.m_channel.lock();
218 std::lock_guard<std::mutex> lock(m_mutex);
221 catch (std::bad_weak_ptr& e)
225 catch (std::exception& e)
234 std::lock_guard<std::mutex> lock(m_mutex);
237 result.reserve(m_channels.size());
238 for (
auto& v : m_channels)
241 if (_condition ==
nullptr || _condition(v, stop))
251 template <ECmdType _cmd,
class AttachmentType>
252 void broadcastMsg(
const AttachmentType& _attachment, conditionFunction_t _condition =
nullptr)
258 for (
const auto& v : channels)
260 if (v.m_channel.expired())
262 auto ptr = v.m_channel.lock();
263 ptr->template pushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
266 catch (std::bad_weak_ptr& e)
272 template <ECmdType _cmd,
class AttachmentType>
279 for (
const auto& v : channels)
281 if (v.m_channel.expired())
283 auto ptr = v.m_channel.lock();
284 ptr->template accumulativePushMsg<_cmd>(_attachment, v.m_protocolHeaderID);
287 catch (std::bad_weak_ptr& e)
293 template <ECmdType _cmd>
297 broadcastMsg<_cmd>(cmd, _condition);
301 const std::string& _fileName,
303 conditionFunction_t _condition =
nullptr)
309 for (
const auto& v : channels)
311 if (v.m_channel.expired())
313 auto ptr = v.m_channel.lock();
314 ptr->pushBinaryAttachmentCmd(_data, _fileName, _cmdSource, v.m_protocolHeaderID);
317 catch (std::bad_weak_ptr& e)
325 std::lock_guard<std::mutex> lock(m_mutex);
327 if (_condition ==
nullptr)
328 return m_channels.size();
330 for (
auto& v : m_channels)
333 if (_condition(v, stop))
344 void acceptHandler(
typename T::connectionPtr_t _client,
345 std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor,
346 const boost::system::error_code& _ec)
351 std::lock_guard<std::mutex> lock(m_mutex);
352 m_channels.push_back(
channelInfo_t(_client, _client->getProtocolHeaderID()));
355 createClientAndStartAccept(_acceptor);
363 void createClientAndStartAccept(std::shared_ptr<boost::asio::ip::tcp::acceptor> _acceptor)
365 typename T::connectionPtr_t newClient = T::makeNew(_acceptor->get_io_service(), 0);
367 A* pThis =
static_cast<A*
>(
this);
368 pThis->newClientCreated(newClient);
371 newClient->template registerHandler<EChannelEvents::OnLobbyMemberHandshakeOK>(
372 [
this, newClient](
const SSenderInfo& _sender) ->
void {
374 std::lock_guard<std::mutex> lock(m_mutex);
377 if (newClient->getProtocolHeaderID() != _sender.m_ID)
379 m_channels.push_back(
channelInfo_t(newClient, _sender.m_ID));
384 channelInfo_t inf(newClient, 0);
385 auto it = std::find(m_channels.begin(), m_channels.end(), inf);
386 if (it != m_channels.end() && it->m_protocolHeaderID == 0)
388 it->m_protocolHeaderID = _sender.m_ID;
393 <<
"Handshake for unregistered lobby leader connection senderID=" 401 newClient->template registerHandler<EChannelEvents::OnRemoteEndDissconnected>(
402 [
this, newClient](
const SSenderInfo& _sender) ->
void {
405 std::lock_guard<std::mutex> lock(m_statMutex);
406 m_readStatDisconnectedChannels.
addFromStat(newClient->getReadStat());
407 m_writeStatDisconnectedChannels.
addFromStat(newClient->getWriteStat());
409 this->removeClient(newClient.get());
412 _acceptor->async_accept(
415 &CConnectionManagerImpl::acceptHandler,
this, newClient, _acceptor, std::placeholders::_1));
418 void createInfoFile()
421 A* pThis =
static_cast<A*
>(
this);
423 std::vector<size_t> ports;
424 ports.push_back(m_acceptor->local_endpoint().port());
425 if (m_acceptorUI !=
nullptr)
426 ports.push_back(m_acceptorUI->local_endpoint().port());
428 pThis->_createInfoFile(ports);
431 void deleteInfoFile()
434 A* pThis =
static_cast<A*
>(
this);
435 pThis->_deleteInfoFile();
438 void removeClient(T* _client)
442 <<
" client from the list of active";
443 std::lock_guard<std::mutex> lock(m_mutex);
445 m_channels.erase(remove_if(m_channels.begin(),
447 [&](
const channelInfo_t& i) {
return (i.m_channel.get() == _client); }),
457 std::lock_guard<std::mutex> lock(m_statMutex);
458 _readStat.
addFromStat(m_readStatDisconnectedChannels);
459 _writeStat.
addFromStat(m_writeStatDisconnectedChannels);
464 std::shared_ptr<boost::asio::signal_set> m_signals;
469 boost::asio::io_service m_io_service;
470 std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptor;
473 boost::asio::io_service m_io_service_UI;
474 std::shared_ptr<boost::asio::ip::tcp::acceptor> m_acceptorUI;
476 boost::thread_group m_workerThreads;
479 SReadStat m_readStatDisconnectedChannels;
481 std::mutex m_statMutex;
void start(double _idleTime, const std::function< void(void)> &_idleCallback)
Main function user has to run to start monitoring thread.
Definition: MonitoringThread.h:48
size_t countNofChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:323
SWeakChannelInfo< T > weakChannelInfo_t
Definition: ConnectionManagerImpl.h:37
Definition: BaseEventHandlersImpl.h:48
~CConnectionManagerImpl()
Definition: ConnectionManagerImpl.h:79
static CMonitoringThread & instance()
Return singleton instance.
Definition: MonitoringThread.h:38
void accumulativeBroadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:273
Definition: CommandAttachmentImpl.h:56
int get_free_port(int _Min, int _Max)
The function checks and returns a free port from the given range of the ports.
Definition: INet.h:553
std::function< bool(const channelInfo_t &_channelInfo, bool &)> conditionFunction_t
Definition: ConnectionManagerImpl.h:38
void addDisconnectedChannelsStatToStat(SReadStat &_readStat, SWriteStat &_writeStat)
Definition: ConnectionManagerImpl.h:452
const SDDSUserDefaultsOptions_t getOptions() const
Definition: UserDefaults.cpp:234
#define LOG(severity)
Definition: Logger.h:54
Definition: ChannelInfo.h:14
void broadcastMsg(const AttachmentType &_attachment, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:252
Definition: dds-agent/src/AgentConnectionManager.h:18
void stop()
Definition: ConnectionManagerImpl.h:166
Definition: StatImpl.h:70
unsigned int m_idleTime
Definition: dds-user-defaults/src/Options.h:34
Definition: ChannelInfo.h:39
void addFromStat(const SReadStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:133
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:45
CConnectionManagerImpl(size_t _minPort, size_t _maxPort, bool _useUITransport)
Definition: ConnectionManagerImpl.h:41
T::connectionPtr_t m_channel
Definition: ChannelInfo.h:26
void broadcastBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:300
void addFromStat(const SWriteStat &_stat)
Add statistics from another structure.
Definition: StatImpl.cpp:100
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
std::vector< SChannelInfo< T > > container_t
Definition: ChannelInfo.h:29
Definition: StatImpl.h:93
void start(bool _join=true, unsigned int _nThreads=0)
Definition: ConnectionManagerImpl.h:86
void runService(short _counter, boost::asio::io_service &_io_service)
Definition: ConnectionManagerImpl.h:148
Base class for connection managers.
Definition: ConnectionManagerImpl.h:33
SChannelInfo< T > channelInfo_t
Definition: ConnectionManagerImpl.h:36
weakChannelInfo_t::container_t getChannels(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:232
void broadcastSimpleMsg(conditionFunction_t _condition=nullptr)
Definition: ConnectionManagerImpl.h:294
SDDSServerOptions m_server
Definition: dds-user-defaults/src/Options.h:40
std::vector< SWeakChannelInfo > container_t
Definition: ChannelInfo.h:54