5 #ifndef __DDS__BaseSMChannelImpl__ 6 #define __DDS__BaseSMChannelImpl__ 15 #include <boost/asio.hpp> 16 #include <boost/date_time.hpp> 17 #include <boost/interprocess/ipc/message_queue.hpp> 18 #include <boost/noncopyable.hpp> 19 #include <boost/thread/thread.hpp> 28 #define BEGIN_SM_MSG_MAP(theClass) \ 30 friend protocol_api::CBaseSMChannelImpl<theClass>; \ 31 void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \ 36 using namespace dds; \ 37 using namespace dds::protocol_api; \ 38 ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \ 40 sender.m_ID = _currentMsg->header().m_ID; \ 47 #define SM_MESSAGE_HANDLER(msg, func) \ 50 typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \ 51 attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \ 52 bool processed = func(attachmentPtr, sender); \ 55 if (!handlerExists(msg)) \ 57 LOG(dds::misc::error) << "The received message was not processed and has no registered handler: " \ 58 << _currentMsg->toString(); \ 62 dispatchHandlers(msg, sender, attachmentPtr); \ 68 #define SM_MESSAGE_HANDLER_DISPATCH(msg) \ 71 typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \ 72 attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \ 73 LOG(dds::misc::debug) << "Dispatching " << g_cmdToString[msg]; \ 74 if (!handlerExists(msg)) \ 76 LOG(dds::misc::error) << "The received message can't be dispatched, it has no registered handler: " \ 77 << _currentMsg->toString(); \ 81 dispatchHandlers<>(msg, sender, attachmentPtr); \ 86 #define END_SM_MSG_MAP() \ 88 LOG(dds::misc::error) << "The received SM message doesn't have a handler: " << _currentMsg->toString(); \ 91 catch (std::exception & _e) \ 93 LOG(dds::misc::error) << "SMChannel processMessage: " << _e.what(); \ 98 #define SM_RAW_MESSAGE_HANDLER(theClass, func) \ 100 friend protocol_api::CBaseSMChannelImpl<theClass>; \ 101 void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \ 106 using namespace dds; \ 107 using namespace dds::protocol_api; \ 108 SSenderInfo sender; \ 109 sender.m_ID = _currentMsg->header().m_ID; \ 112 bool processed = func(_currentMsg, sender); \ 115 if (!handlerExists(ECmdType::cmdRAW_MSG)) \ 117 LOG(dds::misc::error) << "The received message was not processed and has no registered handler: " \ 118 << _currentMsg->toString(); \ 122 dispatchHandlers(ECmdType::cmdRAW_MSG, sender, _currentMsg); \ 126 catch (std::exception & _e) \ 128 LOG(dds::misc::error) << "SMChannel processMessage: " << _e.what(); \ 134 namespace protocol_api
147 public std::enable_shared_from_this<T>
149 struct SProtocolMessageInfo
152 : m_outputID(_outputID)
161 typedef std::deque<SProtocolMessageInfo> protocolMessagePtrQueue_t;
163 typedef std::shared_ptr<boost::interprocess::message_queue> messageQueuePtr_t;
165 struct SMessageQueueInfo
167 using Container_t = std::vector<SMessageQueueInfo>;
171 messageQueuePtr_t m_mq;
174 struct SMessageOutputBuffer
176 using Ptr_t = std::shared_ptr<SMessageOutputBuffer>;
177 using Container_t = std::map<uint64_t, Ptr_t>;
179 SMessageQueueInfo m_info;
181 protocolMessagePtrQueue_t m_writeQueue;
182 std::mutex m_mutexWriteBuffer;
183 protocolMessagePtrQueue_t m_writeBufferQueue;
184 std::atomic<bool> m_drainWriteQueue{
false };
198 const std::string& _inputName,
199 const std::string& _outputName,
200 uint64_t _protocolHeaderID,
207 , m_ioContext(_service)
209 defaultInit({ _inputName }, _outputName, _inputOpenType, _outputOpenType);
213 const std::vector<std::string>& _inputNames,
214 const std::string& _outputName,
215 uint64_t _protocolHeaderID,
222 , m_ioContext(_service)
224 defaultInit(_inputNames, _outputName, _inputOpenType, _outputOpenType);
228 const std::string& _outputName,
232 for (
const auto& v : _inputNames)
234 SMessageQueueInfo inInfo;
236 inInfo.m_openType = _inputOpenType;
237 m_transportIn.push_back(inInfo);
241 auto buffer = std::make_shared<SMessageOutputBuffer>();
242 buffer->m_info.m_name = _outputName;
243 buffer->m_info.m_openType = _outputOpenType;
244 m_outputBuffers.emplace(0, buffer);
246 createMessageQueue();
260 const std::string& _inputName,
261 const std::string& _outputName,
262 uint64_t _ProtocolHeaderID,
267 new T(_service, _inputName, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
272 const std::vector<std::string>& _inputNames,
273 const std::string& _outputName,
274 uint64_t _ProtocolHeaderID,
279 new T(_service, _inputNames, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
284 void createMessageQueue()
287 std::lock_guard<std::mutex> lock(m_mutexTransportIn);
288 for (
auto& info : m_transportIn)
292 info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
297 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
298 for (
auto& v : m_outputBuffers)
300 SMessageQueueInfo& info = v.second->m_info;
303 info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
308 messageQueuePtr_t createMessageQueue(
const std::string& _name,
EMQOpenType _openType)
310 static const unsigned int maxNofMessages = 100;
316 static const unsigned int maxMessageSize = 1024;
323 return std::make_shared<boost::interprocess::message_queue>(
324 boost::interprocess::open_or_create, _name.c_str(), maxNofMessages, maxMessageSize);
326 return std::make_shared<boost::interprocess::message_queue>(
327 boost::interprocess::create_only, _name.c_str(), maxNofMessages, maxMessageSize);
329 return std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_only,
333 <<
"Can't initialize shared memory transport with name " << _name <<
": " 334 <<
"Unknown EMQOpenType given: " << static_cast<int>(_openType);
338 catch (boost::interprocess::interprocess_exception& _e)
341 <<
"Can't initialize shared memory transport with name " << _name <<
": " << _e.what();
353 const std::string& _name,
362 std::stringstream ss;
363 ss <<
"Can't add output " << _name
364 <<
". Output ID must be greater than 0. Current value: " << _outputID;
365 throw std::runtime_error(ss.str());
369 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
370 auto it = m_outputBuffers.find(_outputID);
371 if (it != m_outputBuffers.end())
373 std::stringstream ss;
374 ss <<
"Can't add output " << _name <<
". Output with ID " << _outputID <<
" already exists.";
375 throw std::runtime_error(ss.str());
379 auto buffer = std::make_shared<SMessageOutputBuffer>();
380 buffer->m_info.m_name = _name;
381 buffer->m_info.m_openType = _openType;
382 buffer->m_info.m_mq = createMessageQueue(_name, _openType);
384 if (buffer->m_info.m_mq !=
nullptr)
386 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
387 auto result = m_outputBuffers.emplace(_outputID, buffer);
390 std::stringstream ss;
391 ss <<
"Failed to add shared memory channel output with ID: " << _outputID <<
" name: " << _name;
392 throw std::runtime_error(ss.str());
397 <<
"Added shared memory channel output with ID: " << _outputID <<
" name: " << _name;
402 std::stringstream ss;
403 ss <<
"Can't add shared memory channel output with ID: " << _outputID <<
" name: " << _name;
404 throw std::runtime_error(ss.str());
416 bool queuesCreated(
true);
417 for (
const auto& v : m_transportIn)
419 if (v.m_mq ==
nullptr)
421 queuesCreated =
false;
428 for (
const auto& v : m_outputBuffers)
430 if (v.second->m_info.m_mq ==
nullptr)
432 queuesCreated =
false;
440 <<
"Can't start shared memory channel because there was a problem creating message queues";
449 auto self(this->shared_from_this());
450 for (
const auto& v : m_transportIn)
459 catch (std::exception& ex)
483 std::lock_guard<std::mutex> lock(m_mutexTransportIn);
484 for (
const auto& v : m_transportIn)
486 const bool status = boost::interprocess::message_queue::remove(v.m_name.c_str());
491 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
492 for (
const auto& v : m_outputBuffers)
494 const SMessageQueueInfo& info = v.second->m_info;
495 const bool status = boost::interprocess::message_queue::remove(info.m_name.c_str());
496 LOG(
dds::misc::info) <<
"Message queue " << info.m_name <<
" remove status: " << status;
518 const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
520 std::lock_guard<std::mutex> lock(buffer->m_mutexWriteBuffer);
523 if (buffer->m_drainWriteQueue)
525 buffer->m_writeQueue.clear();
531 buffer->m_writeQueue.push_back(SProtocolMessageInfo(_outputID, _msg));
535 <<
": BaseSMChannelImpl pushMsg: WriteQueue size = " << buffer->m_writeQueue.size();
538 auto self(this->shared_from_this());
540 [
this,
self, &buffer]
544 writeMessage(buffer);
546 catch (std::exception& ex)
552 catch (std::exception& ex)
554 LOG(
dds::misc::error) << getName() <<
": BaseSMChannelImpl can't push message: " << ex.what();
558 template <ECmdType _cmd,
class A>
559 void pushMsg(
const A& _attachment, uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
563 uint64_t headerID = adjustProtocolHeaderID(_protocolHeaderID);
568 catch (std::exception& ex)
574 template <ECmdType _cmd>
575 void pushMsg(uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
578 pushMsg<_cmd>(cmd, _protocolHeaderID, _outputID);
583 const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
584 buffer->m_drainWriteQueue = _newVal;
588 const typename SMessageOutputBuffer::Ptr_t& getOutputBuffer(uint64_t _outputID)
590 std::lock_guard<std::mutex> lock(m_mutexTransportOut);
591 auto it = m_outputBuffers.find(_outputID);
592 if (it != m_outputBuffers.end())
595 throw std::runtime_error(
"Can't find corresponding output buffer: " + std::to_string(_outputID));
598 uint64_t adjustProtocolHeaderID(uint64_t _protocolHeaderID)
const 603 void readMessage(
const SMessageQueueInfo& _info)
609 unsigned int priority;
610 boost::interprocess::message_queue::size_type receivedSize;
613 currentMsg->resize(_info.m_mq->get_max_msg_size());
615 namespace pt = boost::posix_time;
616 while (!_info.m_mq->timed_receive(
618 _info.m_mq->get_max_msg_size(),
621 pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(500)))
633 << _info.m_name <<
": Received message: " << receivedSize <<
" bytes, expected at least" 639 currentMsg->resize(receivedSize);
640 if (currentMsg->decode_header())
651 catch (boost::interprocess::interprocess_exception& ex)
653 LOG(
dds::misc::error) <<
"BaseSMChannelImpl: error receiving message: " << ex.what() <<
"\n";
657 void processBody(boost::interprocess::message_queue::size_type _bodySize,
658 const SMessageQueueInfo& _info,
661 if (_bodySize != _currentMsg->body_length())
664 <<
" bytes, expected " << _currentMsg->body_length();
668 if (_currentMsg->body_length() == 0)
671 << _info.m_name <<
": Received message BODY no attachment: " << _currentMsg->toString();
676 <<
" bytes): " << _currentMsg->toString();
680 T* pThis = static_cast<T*>(
this);
681 pThis->processMessage(_currentMsg);
683 auto self(this->shared_from_this());
691 catch (std::exception& ex)
699 void writeMessage(
const typename SMessageOutputBuffer::Ptr_t& _buffer)
701 if (_buffer ==
nullptr)
702 throw std::runtime_error(
"Can't find corresponding output buffer");
705 std::lock_guard<std::mutex> lockWriteBuffer(_buffer->m_mutexWriteBuffer);
706 if (!_buffer->m_writeBufferQueue.empty())
709 if (_buffer->m_writeQueue.empty())
712 _buffer->m_writeBufferQueue.assign(_buffer->m_writeQueue.begin(), _buffer->m_writeQueue.end());
713 _buffer->m_writeQueue.clear();
718 for (
auto& msg : _buffer->m_writeBufferQueue)
720 if (_buffer->m_info.m_mq !=
nullptr)
722 namespace pt = boost::posix_time;
723 while (!_buffer->m_info.m_mq->timed_send(
727 pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(500)))
732 << _buffer->m_info.m_name <<
": stopping write operation due to shutdown";
741 if (_buffer->m_drainWriteQueue)
744 << _buffer->m_info.m_name
745 <<
": Draining write queue, while there is a message pending: " 746 << g_cmdToString[msg.m_msg->header().m_cmd];
754 << _buffer->m_info.m_name <<
": Can't find output transport with output ID " 756 <<
". Write message failed. Command: " << g_cmdToString[msg.m_msg->header().m_cmd];
760 catch (boost::interprocess::interprocess_exception& ex)
763 << _buffer->m_info.m_name <<
": BaseSMChannelImpl: error sending message: " << ex.what();
768 std::lock_guard<std::mutex> lock(_buffer->m_mutexWriteBuffer);
769 _buffer->m_writeBufferQueue.clear();
772 writeMessage(_buffer);
775 const std::string& getName()
const 777 return m_transportIn.front().m_name;
786 boost::asio::io_context& m_ioContext;
788 typename SMessageQueueInfo::Container_t
790 typename SMessageOutputBuffer::Container_t m_outputBuffers;
791 std::mutex m_mutexTransportIn;
792 std::mutex m_mutexTransportOut;
uint64_t m_protocolHeaderID
Definition: BaseSMChannelImpl.h:783
Definition: BaseEventHandlersImpl.h:48
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseSMChannelImpl.h:189
void pushMsg(const A &_attachment, uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:559
Definition: ChannelEventHandlersImpl.h:26
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::string &_inputName, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:259
std::atomic< bool > m_isShuttingDown
Definition: BaseSMChannelImpl.h:781
Definition: CommandAttachmentImpl.h:54
void defaultInit(const std::vector< std::string > _inputNames, const std::string &_outputName, EMQOpenType _inputOpenType, EMQOpenType _outputOpenType)
Definition: BaseSMChannelImpl.h:227
void addOutput(uint64_t _outputID, const std::string &_name, EMQOpenType _openType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:352
DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelEventHandlersImpl) DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelMessageHandlersImpl) protected
Definition: BaseSMChannelImpl.h:193
uint64_t getProtocolHeaderID() const
Definition: BaseSMChannelImpl.h:347
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &, uint64_t _ID)
Definition: CommandAttachmentImpl.h:71
std::atomic< bool > m_started
True if we were able to start the channel, False otherwise.
Definition: BaseSMChannelImpl.h:782
#define LOG(severity)
Definition: Logger.h:34
Definition: BaseSMChannelImpl.h:144
std::shared_ptr< T > connectionPtr_t
Definition: BaseSMChannelImpl.h:188
void stop()
Definition: BaseSMChannelImpl.h:470
Miscellaneous functions and helpers are located here.
Definition: AgentConnectionManager.h:13
void removeMessageQueue()
Definition: BaseSMChannelImpl.h:480
Definition: ProtocolMessage.h:89
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::vector< std::string > &_inputNames, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:271
void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:501
EMQOpenType
Definition: BaseSMChannelImpl.h:136
void start()
Definition: BaseSMChannelImpl.h:413
bool started() const
Definition: BaseSMChannelImpl.h:408
uint64_t m_ID
Definition: BaseEventHandlersImpl.h:50
void drainWriteQueue(bool _newVal, uint64_t _outputID)
Definition: BaseSMChannelImpl.h:581
void pushMsg(uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:575
Definition: ChannelMessageHandlersImpl.h:20
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:81