5 #ifndef __DDS__BaseSMChannelImpl__ 6 #define __DDS__BaseSMChannelImpl__ 15 #include <boost/asio.hpp> 16 #include <boost/date_time.hpp> 17 #include <boost/interprocess/ipc/message_queue.hpp> 18 #include <boost/noncopyable.hpp> 19 #include <boost/thread/thread.hpp> 28 #define BEGIN_SM_MSG_MAP(theClass) \ 30 friend protocol_api::CBaseSMChannelImpl<theClass>; \ 31 void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \ 36 using namespace dds; \ 37 using namespace dds::protocol_api; \ 38 bool processed = true; \ 39 ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \ 41 sender.m_ID = _currentMsg->header().m_ID; \ 48 #define SM_MESSAGE_HANDLER(msg, func) \ 51 typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \ 52 attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \ 53 processed = func(attachmentPtr, sender); \ 56 if (!handlerExists(msg)) \ 58 LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \ 59 << _currentMsg->toString(); \ 63 dispatchHandlers(msg, sender, attachmentPtr); \ 69 #define SM_MESSAGE_HANDLER_DISPATCH(msg) \ 73 typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \ 74 attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \ 75 LOG(MiscCommon::debug) << "Dispatching " << g_cmdToString[msg]; \ 76 if (!handlerExists(msg)) \ 78 LOG(MiscCommon::error) << "The received message can't be dispatched, it has no registered handler: " \ 79 << _currentMsg->toString(); \ 83 dispatchHandlers<>(msg, sender, attachmentPtr); \ 88 #define END_SM_MSG_MAP() \ 90 LOG(MiscCommon::error) << "The received SM message doesn't have a handler: " << _currentMsg->toString(); \ 93 catch (std::exception & _e) \ 95 LOG(MiscCommon::error) << "SMChannel processMessage: " << _e.what(); \ 100 #define SM_RAW_MESSAGE_HANDLER(theClass, func) \ 102 friend protocol_api::CBaseSMChannelImpl<theClass>; \ 103 void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \ 108 using namespace dds; \ 109 using namespace dds::protocol_api; \ 110 bool processed = true; \ 111 SSenderInfo sender; \ 112 sender.m_ID = _currentMsg->header().m_ID; \ 115 processed = func(_currentMsg, sender); \ 118 if (!handlerExists(ECmdType::cmdRAW_MSG)) \ 120 LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \ 121 << _currentMsg->toString(); \ 125 dispatchHandlers(ECmdType::cmdRAW_MSG, sender, _currentMsg); \ 129 catch (std::exception & _e) \ 131 LOG(MiscCommon::error) << "SMChannel processMessage: " << _e.what(); \ 137 namespace protocol_api
150 public std::enable_shared_from_this<T>
152 struct SProtocolMessageInfo
155 : m_outputID(_outputID)
164 typedef std::deque<SProtocolMessageInfo> protocolMessagePtrQueue_t;
166 typedef std::shared_ptr<boost::interprocess::message_queue> messageQueuePtr_t;
168 struct SMessageQueueInfo
170 using Container_t = std::vector<SMessageQueueInfo>;
174 messageQueuePtr_t m_mq;
177 struct SMessageOutputBuffer
179 using Ptr_t = std::shared_ptr<SMessageOutputBuffer>;
180 using Container_t = std::map<uint64_t, Ptr_t>;
182 SMessageQueueInfo m_info;
184 protocolMessagePtrQueue_t m_writeQueue;
185 std::mutex m_mutexWriteBuffer;
186 protocolMessagePtrQueue_t m_writeBufferQueue;
187 std::atomic<bool> m_drainWriteQueue{
false };
201 const std::string& _inputName,
202 const std::string& _outputName,
203 uint64_t _protocolHeaderID,
210 , m_ioContext(_service)
212 defaultInit({ _inputName }, _outputName, _inputOpenType, _outputOpenType);
216 const std::vector<std::string>& _inputNames,
217 const std::string& _outputName,
218 uint64_t _protocolHeaderID,
225 , m_ioContext(_service)
227 defaultInit(_inputNames, _outputName, _inputOpenType, _outputOpenType);
231 const std::string& _outputName,
235 for (
const auto& v : _inputNames)
237 SMessageQueueInfo inInfo;
239 inInfo.m_openType = _inputOpenType;
240 m_transportIn.push_back(inInfo);
244 auto buffer = std::make_shared<SMessageOutputBuffer>();
245 buffer->m_info.m_name = _outputName;
246 buffer->m_info.m_openType = _outputOpenType;
247 m_outputBuffers.emplace(0, buffer);
249 createMessageQueue();
263 const std::string& _inputName,
264 const std::string& _outputName,
265 uint64_t _ProtocolHeaderID,
270 new T(_service, _inputName, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
275 const std::vector<std::string>& _inputNames,
276 const std::string& _outputName,
277 uint64_t _ProtocolHeaderID,
282 new T(_service, _inputNames, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
287 void createMessageQueue()
290 std::lock_guard<std::mutex> lock(m_mutexTransportIn);
291 for (
auto& info : m_transportIn)
295 info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
300 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
301 for (
auto& v : m_outputBuffers)
303 SMessageQueueInfo& info = v.second->m_info;
306 info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
311 messageQueuePtr_t createMessageQueue(
const std::string& _name,
EMQOpenType _openType)
313 static const unsigned int maxNofMessages = 100;
319 static const unsigned int maxMessageSize = 1024;
326 return std::make_shared<boost::interprocess::message_queue>(
327 boost::interprocess::open_or_create, _name.c_str(), maxNofMessages, maxMessageSize);
329 return std::make_shared<boost::interprocess::message_queue>(
330 boost::interprocess::create_only, _name.c_str(), maxNofMessages, maxMessageSize);
332 return std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_only,
336 <<
"Can't initialize shared memory transport with name " << _name <<
": " 337 <<
"Unknown EMQOpenType given: " << static_cast<int>(_openType);
341 catch (boost::interprocess::interprocess_exception& _e)
344 <<
"Can't initialize shared memory transport with name " << _name <<
": " << _e.what();
356 const std::string& _name,
365 std::stringstream ss;
366 ss <<
"Can't add output " << _name
367 <<
". Output ID must be greater than 0. Current value: " << _outputID;
368 throw std::runtime_error(ss.str());
372 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
373 auto it = m_outputBuffers.find(_outputID);
374 if (it != m_outputBuffers.end())
376 std::stringstream ss;
377 ss <<
"Can't add output " << _name <<
". Output with ID " << _outputID <<
" already exists.";
378 throw std::runtime_error(ss.str());
382 auto buffer = std::make_shared<SMessageOutputBuffer>();
383 buffer->m_info.m_name = _name;
384 buffer->m_info.m_openType = _openType;
385 buffer->m_info.m_mq = createMessageQueue(_name, _openType);
387 if (buffer->m_info.m_mq !=
nullptr)
389 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
390 auto result = m_outputBuffers.emplace(_outputID, buffer);
393 std::stringstream ss;
394 ss <<
"Failed to add shared memory channel output with ID: " << _outputID <<
" name: " << _name;
395 throw std::runtime_error(ss.str());
400 <<
"Added shared memory channel output with ID: " << _outputID <<
" name: " << _name;
405 std::stringstream ss;
406 ss <<
"Can't add shared memory channel output with ID: " << _outputID <<
" name: " << _name;
407 throw std::runtime_error(ss.str());
419 bool queuesCreated(
true);
420 for (
const auto& v : m_transportIn)
422 if (v.m_mq ==
nullptr)
424 queuesCreated =
false;
431 for (
const auto& v : m_outputBuffers)
433 if (v.second->m_info.m_mq ==
nullptr)
435 queuesCreated =
false;
443 <<
"Can't start shared memory channel because there was a problem creating message queues";
452 auto self(this->shared_from_this());
453 for (
const auto& v : m_transportIn)
455 m_ioContext.post([
this,
self, &v] {
460 catch (std::exception& ex)
479 sendYourselfShutdown();
485 std::lock_guard<std::mutex> lock(m_mutexTransportIn);
486 for (
const auto& v : m_transportIn)
488 const bool status = boost::interprocess::message_queue::remove(v.m_name.c_str());
493 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
494 for (
const auto& v : m_outputBuffers)
496 const SMessageQueueInfo& info = v.second->m_info;
497 const bool status = boost::interprocess::message_queue::remove(info.m_name.c_str());
514 const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
516 std::lock_guard<std::mutex> lock(buffer->m_mutexWriteBuffer);
519 if (buffer->m_drainWriteQueue)
521 buffer->m_writeQueue.clear();
527 buffer->m_writeQueue.push_back(SProtocolMessageInfo(_outputID, _msg));
531 <<
": BaseSMChannelImpl pushMsg: WriteQueue size = " << buffer->m_writeQueue.size();
534 auto self(this->shared_from_this());
535 m_ioContext.post([
this,
self, &buffer] {
538 writeMessage(buffer);
540 catch (std::exception& ex)
546 catch (std::exception& ex)
552 template <ECmdType _cmd,
class A>
553 void pushMsg(
const A& _attachment, uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
557 uint64_t headerID = adjustProtocolHeaderID(_protocolHeaderID);
562 catch (std::exception& ex)
568 template <ECmdType _cmd>
569 void pushMsg(uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
572 pushMsg<_cmd>(cmd, _protocolHeaderID, _outputID);
579 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
580 for (
auto it : m_outputBuffers)
587 messageQueuePtr_t mq = it.second->m_info.m_mq;
589 mq->send(msg->data(), msg->length(), 1);
596 const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
597 buffer->m_drainWriteQueue = _newVal;
601 const typename SMessageOutputBuffer::Ptr_t& getOutputBuffer(uint64_t _outputID)
603 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
604 auto it = m_outputBuffers.find(_outputID);
605 if (it != m_outputBuffers.end())
608 throw std::runtime_error(
"Can't find corresponding output buffer: " + std::to_string(_outputID));
611 uint64_t adjustProtocolHeaderID(uint64_t _protocolHeaderID)
const 616 void sendYourselfShutdown()
618 std::lock_guard<std::mutex> lock(m_mutexTransportIn);
620 for (
auto& info : m_transportIn)
627 while (!info.m_mq->timed_send(
631 boost::posix_time::ptime(boost::posix_time::microsec_clock::universal_time()) +
632 boost::posix_time::milliseconds(10)))
637 << info.m_name <<
": stopping send yourself shutdown while already shutting down";
644 void readMessage(
const SMessageQueueInfo& _info)
650 unsigned int priority;
651 boost::interprocess::message_queue::size_type receivedSize;
654 currentMsg->resize(_info.m_mq->get_max_msg_size());
655 _info.m_mq->receive(currentMsg->data(), _info.m_mq->get_max_msg_size(), receivedSize, priority);
660 << _info.m_name <<
": Received message: " << receivedSize <<
" bytes, expected at least" 666 currentMsg->resize(receivedSize);
667 if (currentMsg->decode_header())
678 catch (boost::interprocess::interprocess_exception& ex)
684 void processBody(boost::interprocess::message_queue::size_type _bodySize,
685 const SMessageQueueInfo& _info,
688 if (_bodySize != _currentMsg->body_length())
691 <<
" bytes, expected " << _currentMsg->body_length();
695 if (_currentMsg->body_length() == 0)
698 << _info.m_name <<
": Received message BODY no attachment: " << _currentMsg->toString();
703 <<
" bytes): " << _currentMsg->toString();
707 T* pThis = static_cast<T*>(
this);
708 pThis->processMessage(_currentMsg);
710 ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd);
715 auto self(this->shared_from_this());
716 m_ioContext.post([
this,
self, &_info] {
721 catch (std::exception& ex)
735 void writeMessage(
const typename SMessageOutputBuffer::Ptr_t& _buffer)
737 if (_buffer ==
nullptr)
738 throw std::runtime_error(
"Can't find corresponding output buffer");
741 std::lock_guard<std::mutex> lockWriteBuffer(_buffer->m_mutexWriteBuffer);
742 if (!_buffer->m_writeBufferQueue.empty())
745 if (_buffer->m_writeQueue.empty())
748 _buffer->m_writeBufferQueue.assign(_buffer->m_writeQueue.begin(), _buffer->m_writeQueue.end());
749 _buffer->m_writeQueue.clear();
754 for (
auto& msg : _buffer->m_writeBufferQueue)
756 if (_buffer->m_info.m_mq !=
nullptr)
758 while (!_buffer->m_info.m_mq->timed_send(
762 boost::posix_time::ptime(boost::posix_time::microsec_clock::universal_time()) +
763 boost::posix_time::milliseconds(10)))
768 << _buffer->m_info.m_name <<
": stopping write operation due to shutdown";
777 if (_buffer->m_drainWriteQueue)
780 << _buffer->m_info.m_name
781 <<
": Draining write queue, while there is a message pending: " 782 << g_cmdToString[msg.m_msg->header().m_cmd];
790 << _buffer->m_info.m_name <<
": Can't find output transport with output ID " 792 <<
". Write message failed. Command: " << g_cmdToString[msg.m_msg->header().m_cmd];
796 catch (boost::interprocess::interprocess_exception& ex)
799 << _buffer->m_info.m_name <<
": BaseSMChannelImpl: error sending message: " << ex.what();
804 std::lock_guard<std::mutex> lock(_buffer->m_mutexWriteBuffer);
805 _buffer->m_writeBufferQueue.clear();
808 writeMessage(_buffer);
811 const std::string& getName()
const 813 return m_transportIn.front().m_name;
822 boost::asio::io_context& m_ioContext;
824 typename SMessageQueueInfo::Container_t
826 typename SMessageOutputBuffer::Container_t m_outputBuffers;
827 std::mutex m_mutexTransportIn;
828 std::mutex m_mutexTransportOut;
uint64_t m_protocolHeaderID
Definition: BaseSMChannelImpl.h:819
Definition: BaseEventHandlersImpl.h:48
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseSMChannelImpl.h:192
void pushMsg(const A &_attachment, uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:553
Definition: ChannelEventHandlersImpl.h:29
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::string &_inputName, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:262
void syncSendShutdownAll()
Definition: BaseSMChannelImpl.h:575
std::atomic< bool > m_isShuttingDown
Definition: BaseSMChannelImpl.h:817
Definition: CommandAttachmentImpl.h:54
void defaultInit(const std::vector< std::string > _inputNames, const std::string &_outputName, EMQOpenType _inputOpenType, EMQOpenType _outputOpenType)
Definition: BaseSMChannelImpl.h:230
void addOutput(uint64_t _outputID, const std::string &_name, EMQOpenType _openType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:355
DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelEventHandlersImpl) DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelMessageHandlersImpl) protected
Definition: BaseSMChannelImpl.h:196
Definition: ProtocolCommands.h:30
uint64_t getProtocolHeaderID() const
Definition: BaseSMChannelImpl.h:350
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &, uint64_t _ID)
Definition: CommandAttachmentImpl.h:71
std::atomic< bool > m_started
True if we were able to start the channel, False otherwise.
Definition: BaseSMChannelImpl.h:818
#define LOG(severity)
Definition: Logger.h:56
Definition: BaseSMChannelImpl.h:147
std::shared_ptr< T > connectionPtr_t
Definition: BaseSMChannelImpl.h:191
void stop()
Definition: BaseSMChannelImpl.h:472
Definition: AgentConnectionManager.h:13
void removeMessageQueue()
Definition: BaseSMChannelImpl.h:482
Definition: ProtocolMessage.h:89
Shared memory channel start.
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::vector< std::string > &_inputNames, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:274
void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:503
EMQOpenType
Definition: BaseSMChannelImpl.h:139
void start()
Definition: BaseSMChannelImpl.h:416
bool started() const
Definition: BaseSMChannelImpl.h:411
uint64_t m_ID
Definition: BaseEventHandlersImpl.h:50
void drainWriteQueue(bool _newVal, uint64_t _outputID)
Definition: BaseSMChannelImpl.h:594
void pushMsg(uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:569
Definition: ChannelMessageHandlersImpl.h:20
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:81