5 #ifndef __DDS__BaseSMChannelImpl__ 6 #define __DDS__BaseSMChannelImpl__ 15 #include <boost/interprocess/ipc/message_queue.hpp> 16 #include <boost/noncopyable.hpp> 17 #include <boost/thread/thread.hpp> 18 #pragma clang diagnostic push 19 #pragma clang diagnostic ignored "-Wunused-local-typedef" 20 #pragma clang diagnostic ignored "-Wdeprecated-declarations" 21 #include <boost/asio.hpp> 22 #pragma clang diagnostic pop 28 #define BEGIN_SM_MSG_MAP(theClass) \ 30 friend protocol_api::CBaseSMChannelImpl<theClass>; \ 31 void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \ 36 using namespace dds; \ 37 using namespace dds::protocol_api; \ 38 bool processed = true; \ 39 ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \ 46 #define SM_MESSAGE_HANDLER(msg, func) \ 49 typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \ 50 attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \ 51 processed = func(attachmentPtr); \ 54 if (getNofMessageHandlers<msg>() == 0) \ 56 LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \ 57 << _currentMsg->toString(); \ 61 dispatchMessageHandlers(msg, attachmentPtr, this); \ 67 #define END_SM_MSG_MAP() \ 69 LOG(MiscCommon::error) << "The received SM message doesn't have a handler: " << _currentMsg->toString(); \ 72 catch (std::exception & _e) \ 74 LOG(MiscCommon::error) << "SMChannel processMessage: " << _e.what(); \ 80 namespace protocol_api
85 public std::enable_shared_from_this<T>
// Container type for queued protocol messages: a deque of shared message pointers.
// Both m_writeQueue and m_writeBufferQueue are declared with this type.
87 typedef std::deque<CProtocolMessage::protocolMessagePtr_t> protocolMessagePtrQueue_t;
// Shared-ownership handle to a Boost.Interprocess message queue (type of m_transportIn/m_transportOut).
88 typedef std::shared_ptr<boost::interprocess::message_queue> messageQueuePtr_t;
// Fragment of the constructor: tail of the member-initializer list plus the body call.
// NOTE(review): the constructor signature and the first initializer(s) are not part of this
// listing; _inputName/_outputName are presumably its parameters - confirm against the real header.
// m_currentMsg starts out as a freshly allocated, empty CProtocolMessage used as the receive buffer.
98 , m_currentMsg(std::make_shared<CProtocolMessage>())
// Names of the two message queues, kept so the queues can be (re)created and removed later.
99 , m_inputMessageQueueName(_inputName)
100 , m_outputMessageQueueName(_outputName)
// Mutex and buffer queue used on the write path; both are default-constructed.
102 , m_mutexWriteBuffer()
103 , m_writeBufferQueue()
// Set up both message queues as part of construction.
105 createMessageQueue();
// Factory function: constructs a new T from the two queue names and hands ownership to a
// connectionPtr_t (a std::shared_ptr<T>).
//   _inputName  - name forwarded to T's constructor as the input queue name
//   _outputName - name forwarded to T's constructor as the output queue name
// NOTE(review): the statement returning newObject is not part of this listing.
115 static connectionPtr_t
makeNew(
const std::string& _inputName,
const std::string& _outputName)
117 connectionPtr_t newObject(
new T(_inputName, _outputName));
// (Re)creates the input and output Boost.Interprocess message queues.
// Existing handles are dropped first; each queue is then opened, or created if it does not
// exist yet (open_or_create), under the stored queue names. If Boost throws an
// interprocess_exception, the failure is logged and both handles are reset again.
122 void createMessageQueue()
// NOTE(review): presumably the capacity arguments passed to the message_queue constructors below
// (max number of queued messages / max size of one message in bytes); the argument lines that
// would use them are not part of this listing - confirm.
128 static const unsigned int maxNofMessages = 100;
131 static const unsigned int maxMessageSize = 65000;
// Release any previously held queue handles before opening new ones.
133 m_transportIn.reset();
134 m_transportOut.reset();
// NOTE(review): the assignment targets of the two make_shared calls are not visible here;
// presumably m_transportIn (input name) and m_transportOut (output name).
136 std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_or_create,
137 m_inputMessageQueueName.c_str(),
141 std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_or_create,
142 m_outputMessageQueueName.c_str(),
// Queue creation failed: report both queue names together with the Boost error text.
146 catch (boost::interprocess::interprocess_exception& _e)
149 <<
"Can't initialize shared memory transport with input name " << m_inputMessageQueueName
150 <<
" and output name " << m_outputMessageQueueName <<
": " << _e.what();
// Leave both handles null so that a later null check can detect the failed setup.
151 m_transportIn.reset();
152 m_transportOut.reset();
// Fragment of reinit() (its signature, at listing line 157, is not part of this listing).
// Recreates both message queues.
166 createMessageQueue();
// Drop every message that is still waiting to be sent; both queues are cleared while
// m_mutexWriteBuffer is held.
169 std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
170 m_writeBufferQueue.clear();
171 m_writeQueue.clear();
// Fragment of start() (its signature, at listing line 176, is not part of this listing).
// Refuse to start when either queue handle is null, i.e. queue creation failed earlier.
// NOTE(review): only the log message is visible; what follows it (presumably an early return)
// is not part of this listing.
178 if (m_transportIn ==
nullptr || m_transportOut ==
nullptr)
181 <<
"Can't start shared memory channel because there was a problem creating message queues";
// Reset the io_service before posting new work to it.
188 m_io_service.reset();
// The handler captures a shared_ptr to this object in addition to the raw this pointer.
// NOTE(review): the handler body is not part of this listing - presumably it starts the first read.
190 auto self(this->shared_from_this());
191 m_io_service.post([
this,
self] {
196 catch (std::exception& ex)
// Start a group of three worker threads, each running the io_service event loop.
202 const int nConcurrentThreads(3);
203 m_workerThreads = std::make_shared<boost::thread_group>();
204 for (
int x = 0; x < nConcurrentThreads; ++x)
206 m_workerThreads->create_thread(boost::bind(&boost::asio::io_service::run, &(m_io_service)));
// Fragment of stop() (its signature, at listing line 210, is not part of this listing).
// Ask the channel to shut down by sending a message to itself.
216 sendYourselfShutdown();
// Wait for every worker thread to finish, then release the thread group.
// NOTE(review): m_workerThreads is dereferenced without a visible null check; any guard that
// may exist is not part of this listing.
219 m_workerThreads->join_all();
220 m_workerThreads.reset();
// Fragment of removeMessageQueue() (its signature, at listing line 223, is not part of this listing).
// Removes both named message queues via message_queue::remove and keeps each boolean result.
225 const bool inputRemoved = boost::interprocess::message_queue::remove(m_inputMessageQueueName.c_str());
226 const bool outputRemoved = boost::interprocess::message_queue::remove(m_outputMessageQueueName.c_str());
// Log the removal result of each queue.
// NOTE(review): the leading part of both log statements is not part of this listing.
228 <<
" remove status: " << inputRemoved;
230 <<
" remove status: " << outputRemoved;
// Fragment of pushMsg(const A& _attachment) (signature at listing line 234, not shown here).
//   _cmd - command type of the message
//   A    - type of the attachment
// NOTE(review): the code that builds msg from the attachment is not part of this listing.
233 template <ECmdType _cmd,
class A>
// Append the message to the pending write queue while holding m_mutexWriteBuffer.
246 std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
250 m_writeQueue.push_back(msg);
252 LOG(
MiscCommon::debug) <<
"BaseSMChannelImpl pushMsg: WriteQueue size = " << m_writeQueue.size();
254 catch (std::exception& ex)
// Post a handler that captures a shared_ptr to this object in addition to the raw this pointer.
// NOTE(review): the handler body is not part of this listing - presumably it triggers the write routine.
260 auto self(this->shared_from_this());
261 m_io_service.post([
this,
self] {
266 catch (std::exception& ex)
273 template <ECmdType _cmd>
// Sends a message into this channel's own input queue (m_transportIn), passing 1 as the
// priority argument; regular outgoing messages are sent to m_transportOut with priority 0.
// NOTE(review): the construction of msg is not part of this listing - presumably an encoded
// shutdown command; confirm against the real header.
281 void sendYourselfShutdown()
286 m_transportIn->send(msg->data(), msg->length(), 1);
// Fragment of the receive routine.
// NOTE(review): the function name and signature are not part of this listing.
// Output parameters filled in by message_queue::receive.
293 unsigned int priority;
294 boost::interprocess::message_queue::size_type receivedSize;
// Size the buffer to the queue's maximum message size so that any message fits.
297 m_currentMsg->resize(m_transportIn->get_max_msg_size());
298 m_transportIn->receive(
299 m_currentMsg->data(), m_transportIn->get_max_msg_size(), receivedSize, priority);
// Shrink the buffer to the number of bytes actually received, then parse the header.
// NOTE(review): both branches of the decode_header() check are not part of this listing.
309 m_currentMsg->resize(receivedSize);
310 if (m_currentMsg->decode_header())
329 catch (boost::interprocess::interprocess_exception& ex)
// Checks the body of the message held in m_currentMsg, forwards the message to the derived
// class for handling, then clears the buffer and posts the next handler.
//   _bodySize - number of body bytes that were actually received
335 void processBody(boost::interprocess::message_queue::size_type _bodySize)
// The received byte count must match the body length held by the message.
// NOTE(review): only the log text of the mismatch branch is visible; what happens next is not shown.
337 if (_bodySize != m_currentMsg->body_length())
340 <<
"Received message BODY: " << _bodySize <<
" bytes, expected " << m_currentMsg->body_length();
// NOTE(review): the branch taken for an empty body is not part of this listing.
344 if (m_currentMsg->body_length() == 0)
351 <<
"Received message BODY (" << _bodySize <<
" bytes): " << m_currentMsg->toString();
// Downcast to the derived type T and call its processMessage.
355 T* pThis =
static_cast<T*
>(
this);
356 pThis->processMessage(m_currentMsg);
// Clear the message buffer before the next handler is posted.
359 m_currentMsg->clear();
// Post a handler that captures a shared_ptr to this object in addition to the raw this pointer.
// NOTE(review): the handler body is not part of this listing - presumably it reads the next message.
361 auto self(this->shared_from_this());
362 m_io_service.post([
this,
self] {
367 catch (std::exception& ex)
// Fragment of the write routine.
// NOTE(review): the function name and signature are not part of this listing.
// While holding m_mutexWriteBuffer, inspect both queues.
// NOTE(review): the statements guarded by the two checks are not visible - presumably early
// returns when a send is already in progress or nothing is pending.
378 std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
379 if (!m_writeBufferQueue.empty())
382 if (m_writeQueue.empty())
// Copy all pending messages into the write buffer and empty the pending queue.
385 m_writeBufferQueue.assign(m_writeQueue.begin(), m_writeQueue.end());
386 m_writeQueue.clear();
// Send every buffered message to the output queue with priority 0.
391 for (
auto msg : m_writeBufferQueue)
393 m_transportOut->send(msg->data(), msg->length(), 0);
396 catch (boost::interprocess::interprocess_exception& ex)
// Empty the write buffer again under the lock.
403 std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
404 m_writeBufferQueue.clear();
// Queue this channel receives messages from; sendYourselfShutdown() also sends into it.
414 messageQueuePtr_t m_transportIn;
// Queue that outgoing messages are sent to.
415 messageQueuePtr_t m_transportOut;
// Asio service that runs the posted handlers; its run() loop is executed by the worker threads.
416 boost::asio::io_service m_io_service;
// Group of worker threads created in start() and joined in stop().
417 std::shared_ptr<boost::thread_group> m_workerThreads;
// Names under which the two message queues are opened/created and removed.
420 std::string m_inputMessageQueueName;
421 std::string m_outputMessageQueueName;
// Messages pushed by pushMsg() that have not been handed to the write buffer yet.
423 protocolMessagePtrQueue_t m_writeQueue;
// Mutex taken around accesses to m_writeQueue and m_writeBufferQueue.
425 std::mutex m_mutexWriteBuffer;
// Messages taken over from m_writeQueue for sending to the output queue.
426 protocolMessagePtrQueue_t m_writeBufferQueue;
void reinit()
Definition: BaseSMChannelImpl.h:157
static connectionPtr_t makeNew(const std::string &_inputName, const std::string &_outputName)
Definition: BaseSMChannelImpl.h:115
void pushMsg(const A &_attachment)
Definition: BaseSMChannelImpl.h:234
Definition: CommandAttachmentImpl.h:55
Definition: ProtocolCommands.h:29
std::atomic< bool > m_started
True if we were able to start the channel, False otherwise.
Definition: BaseSMChannelImpl.h:411
#define LOG(severity)
Definition: Logger.h:54
Definition: BaseSMChannelImpl.h:83
std::shared_ptr< T > connectionPtr_t
Definition: BaseSMChannelImpl.h:91
void stop()
Definition: BaseSMChannelImpl.h:210
Definition: dds-agent/src/AgentConnectionManager.h:16
void removeMessageQueue()
Definition: BaseSMChannelImpl.h:223
Definition: ProtocolMessage.h:77
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &)
Definition: CommandAttachmentImpl.h:72
void start()
Definition: BaseSMChannelImpl.h:176
void pushMsg()
Definition: BaseSMChannelImpl.h:274
Definition: ChannelMessageHandlersImpl.h:29
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:69