DDS  ver. 3.6
BaseSMChannelImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef __DDS__BaseSMChannelImpl__
6 #define __DDS__BaseSMChannelImpl__
7 // STD
8 #include <chrono>
9 #include <deque>
10 #include <iostream>
11 #include <map>
12 #include <memory>
13 #include <mutex>
14 // BOOST
15 #include <boost/asio.hpp>
16 #include <boost/date_time.hpp>
17 #include <boost/interprocess/ipc/message_queue.hpp>
18 #include <boost/noncopyable.hpp>
19 #include <boost/thread/thread.hpp>
20 // DDS
23 #include "CommandAttachmentImpl.h"
24 #include "Logger.h"
25 
26 // Either raw message or command based processing can be used at a time
27 // Command based message processing
// Opens the command based message map of `theClass`.
//
// The macro switches to a public section, befriends CBaseSMChannelImpl<theClass> and starts the definition of
// processMessage(_currentMsg). processMessage returns immediately while m_started is false. Otherwise it reads the
// command and the sender ID from the message header and opens a try block with a switch over the command.
//
// Both the switch and the try block are left open on purpose: add SM_MESSAGE_HANDLER / SM_MESSAGE_HANDLER_DISPATCH
// entries after this macro and close the map with END_SM_MSG_MAP().
// The names `_currentMsg` and `sender` declared here are used by the handler macros.
28 #define BEGIN_SM_MSG_MAP(theClass) \
29  public: \
30  friend protocol_api::CBaseSMChannelImpl<theClass>; \
31  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
32  { \
33  if (!m_started) \
34  return; \
35  \
36  using namespace dds; \
37  using namespace dds::protocol_api; \
38  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \
39  SSenderInfo sender; \
40  sender.m_ID = _currentMsg->header().m_ID; \
41  \
42  try \
43  { \
44  switch (currentCmd) \
45  {
46 
// Adds a `case msg:` entry to the message map opened by BEGIN_SM_MSG_MAP.
//
// The attachment of command `msg` is decoded from `_currentMsg` and passed, together with `sender`, to `func`.
// If `func` returns false, the message is forwarded to dispatchHandlers() when handlerExists(msg) is true;
// otherwise an error is logged.
// Note that `msg` is expanded several times inside the macro body.
47 #define SM_MESSAGE_HANDLER(msg, func) \
48  case msg: \
49  { \
50  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
51  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
52  bool processed = func(attachmentPtr, sender); \
53  if (!processed) \
54  { \
55  if (!handlerExists(msg)) \
56  { \
57  LOG(dds::misc::error) << "The received message was not processed and has no registered handler: " \
58  << _currentMsg->toString(); \
59  } \
60  else \
61  { \
62  dispatchHandlers(msg, sender, attachmentPtr); \
63  } \
64  } \
65  break; \
66  }
67 
// Adds a `case msg:` entry that has no local handler function.
//
// The attachment of command `msg` is decoded from `_currentMsg` and a debug line is logged. The message is then
// handed to dispatchHandlers() when handlerExists(msg) is true; otherwise an error is logged.
// Must be used between BEGIN_SM_MSG_MAP and END_SM_MSG_MAP, which provide `_currentMsg` and `sender`.
68 #define SM_MESSAGE_HANDLER_DISPATCH(msg) \
69  case msg: \
70  { \
71  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
72  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
73  LOG(dds::misc::debug) << "Dispatching " << g_cmdToString[msg]; \
74  if (!handlerExists(msg)) \
75  { \
76  LOG(dds::misc::error) << "The received message can't be dispatched, it has no registered handler: " \
77  << _currentMsg->toString(); \
78  } \
79  else \
80  { \
81  dispatchHandlers<>(msg, sender, attachmentPtr); \
82  } \
83  break; \
84  }
85 
// Closes the message map opened by BEGIN_SM_MSG_MAP.
//
// Adds the `default:` entry, which logs an error for a command that has no entry in the map, then closes the
// switch and the try block. Any std::exception thrown while the message is processed is caught and logged.
// The final brace closes processMessage().
86 #define END_SM_MSG_MAP() \
87  default: \
88  LOG(dds::misc::error) << "The received SM message doesn't have a handler: " << _currentMsg->toString(); \
89  } \
90  } \
91  catch (std::exception & _e) \
92  { \
93  LOG(dds::misc::error) << "SMChannel processMessage: " << _e.what(); \
94  } \
95  }
96 
97 // Raw message processing
// Defines processMessage() of `theClass` for raw (undecoded) message processing.
//
// The generated function ignores messages while m_started is false. Otherwise the raw message and the sender
// info taken from the message header are passed to `func`. When `func` returns false, the message is forwarded to
// dispatchHandlers() under ECmdType::cmdRAW_MSG if such a handler is registered; if not, an error is logged.
// A std::exception thrown along the way is caught and logged.
#define SM_RAW_MESSAGE_HANDLER(theClass, func) \
  public: \
    friend protocol_api::CBaseSMChannelImpl<theClass>; \
    void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
    { \
        if (!m_started) \
            return; \
        \
        using namespace dds; \
        using namespace dds::protocol_api; \
        SSenderInfo sender; \
        sender.m_ID = _currentMsg->header().m_ID; \
        try \
        { \
            bool processed = func(_currentMsg, sender); \
            if (!processed) \
            { \
                if (handlerExists(ECmdType::cmdRAW_MSG)) \
                { \
                    dispatchHandlers(ECmdType::cmdRAW_MSG, sender, _currentMsg); \
                } \
                else \
                { \
                    LOG(dds::misc::error) \
                        << "The received message was not processed and has no registered handler: " \
                        << _currentMsg->toString(); \
                } \
            } \
        } \
        catch (std::exception & _e) \
        { \
            LOG(dds::misc::error) << "SMChannel processMessage: " << _e.what(); \
        } \
    }
131 
132 namespace dds
133 {
134  namespace protocol_api
135  {
/// Selects how a shared memory message queue is obtained.
/// NOTE(review): the enumerators presumably correspond to the boost::interprocess tags of the same name
/// (create_only, open_or_create, open_only) - confirm against createMessageQueue().
enum class EMQOpenType
{
    CreateOnly = 0,   ///< Create a new queue only.
    OpenOrCreate = 1, ///< Open the queue, create it if needed.
    OpenOnly = 2      ///< Open an already existing queue only.
};
142 
143  template <class T>
144  class CBaseSMChannelImpl : public boost::noncopyable,
147  public std::enable_shared_from_this<T>
148  {
149  struct SProtocolMessageInfo
150  {
151  SProtocolMessageInfo(uint64_t _outputID, CProtocolMessage::protocolMessagePtr_t _msg)
152  : m_outputID(_outputID)
153  , m_msg(_msg)
154  {
155  }
156 
157  uint64_t m_outputID;
159  };
160 
161  typedef std::deque<SProtocolMessageInfo> protocolMessagePtrQueue_t;
162 
163  typedef std::shared_ptr<boost::interprocess::message_queue> messageQueuePtr_t;
164 
165  struct SMessageQueueInfo
166  {
167  using Container_t = std::vector<SMessageQueueInfo>;
168 
169  std::string m_name;
170  EMQOpenType m_openType;
171  messageQueuePtr_t m_mq;
172  };
173 
174  struct SMessageOutputBuffer
175  {
176  using Ptr_t = std::shared_ptr<SMessageOutputBuffer>;
177  using Container_t = std::map<uint64_t, Ptr_t>;
178 
179  SMessageQueueInfo m_info;
180 
181  protocolMessagePtrQueue_t m_writeQueue;
182  std::mutex m_mutexWriteBuffer;
183  protocolMessagePtrQueue_t m_writeBufferQueue;
184  std::atomic<bool> m_drainWriteQueue{ false };
185  };
186 
187  public:
188  typedef std::shared_ptr<T> connectionPtr_t;
189  typedef std::weak_ptr<T> weakConnectionPtr_t;
190 
191  // Both are needed because unqualified name lookup terminates at the first scope that has anything with the
192  // right name
195 
196  protected:
197  CBaseSMChannelImpl<T>(boost::asio::io_context& _service,
198  const std::string& _inputName,
199  const std::string& _outputName,
200  uint64_t _protocolHeaderID,
201  EMQOpenType _inputOpenType,
202  EMQOpenType _outputOpenType)
204  , m_isShuttingDown(false)
205  , m_started(false)
206  , m_protocolHeaderID(_protocolHeaderID)
207  , m_ioContext(_service)
208  {
209  defaultInit({ _inputName }, _outputName, _inputOpenType, _outputOpenType);
210  }
211 
212  CBaseSMChannelImpl<T>(boost::asio::io_context& _service,
213  const std::vector<std::string>& _inputNames,
214  const std::string& _outputName,
215  uint64_t _protocolHeaderID,
216  EMQOpenType _inputOpenType,
217  EMQOpenType _outputOpenType)
219  , m_isShuttingDown(false)
220  , m_started(false)
221  , m_protocolHeaderID(_protocolHeaderID)
222  , m_ioContext(_service)
223  {
224  defaultInit(_inputNames, _outputName, _inputOpenType, _outputOpenType);
225  }
226 
227  void defaultInit(const std::vector<std::string> _inputNames,
228  const std::string& _outputName,
229  EMQOpenType _inputOpenType,
230  EMQOpenType _outputOpenType)
231  {
232  for (const auto& v : _inputNames)
233  {
234  SMessageQueueInfo inInfo;
235  inInfo.m_name = v;
236  inInfo.m_openType = _inputOpenType;
237  m_transportIn.push_back(inInfo);
238  }
239 
240  // Output transport - default output transport initialized with protocol header ID
241  auto buffer = std::make_shared<SMessageOutputBuffer>();
242  buffer->m_info.m_name = _outputName;
243  buffer->m_info.m_openType = _outputOpenType;
244  m_outputBuffers.emplace(0, buffer);
245 
246  createMessageQueue();
247 
248  LOG(dds::misc::info) << "SM: New channel: inputName=" << m_transportIn.front().m_name
249  << " outputName=" << _outputName << " protocolHeaderID=" << m_protocolHeaderID;
250  }
251 
252  public:
254  {
255  LOG(dds::misc::info) << "SM: channel destructor is called. MQ: " << getName();
256  stop();
257  }
258 
259  static connectionPtr_t makeNew(boost::asio::io_context& _service,
260  const std::string& _inputName,
261  const std::string& _outputName,
262  uint64_t _ProtocolHeaderID,
263  EMQOpenType _inputOpenType = EMQOpenType::OpenOrCreate,
264  EMQOpenType _outputOpenType = EMQOpenType::OpenOrCreate)
265  {
266  connectionPtr_t newObject(
267  new T(_service, _inputName, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
268  return newObject;
269  }
270 
271  static connectionPtr_t makeNew(boost::asio::io_context& _service,
272  const std::vector<std::string>& _inputNames,
273  const std::string& _outputName,
274  uint64_t _ProtocolHeaderID,
275  EMQOpenType _inputOpenType = EMQOpenType::OpenOrCreate,
276  EMQOpenType _outputOpenType = EMQOpenType::OpenOrCreate)
277  {
278  connectionPtr_t newObject(
279  new T(_service, _inputNames, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
280  return newObject;
281  }
282 
283  private:
284  void createMessageQueue()
285  {
286  {
287  std::lock_guard<std::mutex> lock(m_mutexTransportIn);
288  for (auto& info : m_transportIn)
289  {
290  LOG(dds::misc::info) << "SM: Initializing input message queue: " << info.m_name;
291  info.m_mq.reset();
292  info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
293  }
294  }
295 
296  {
297  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
298  for (auto& v : m_outputBuffers)
299  {
300  SMessageQueueInfo& info = v.second->m_info;
301  LOG(dds::misc::info) << "SM: Initializing output message queue: " << info.m_name;
302  info.m_mq.reset();
303  info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
304  }
305  }
306  }
307 
308  messageQueuePtr_t createMessageQueue(const std::string& _name, EMQOpenType _openType)
309  {
310  static const unsigned int maxNofMessages = 100;
311  // Taking into account that maximum size of the string for the command is 2^16 plus some extra bytes
312  // for key size (128 bytes) and other.
313  // TODO: Because of performance problems we had to reduce the size of the message from 65K to 1K.
314 // TODO: Need to implement an algorithm to break protocol messages into smaller chunks if they are bigger
315 // than maxMessageSize
316  static const unsigned int maxMessageSize = 1024;
317 
318  try
319  {
320  switch (_openType)
321  {
323  return std::make_shared<boost::interprocess::message_queue>(
324  boost::interprocess::open_or_create, _name.c_str(), maxNofMessages, maxMessageSize);
326  return std::make_shared<boost::interprocess::message_queue>(
327  boost::interprocess::create_only, _name.c_str(), maxNofMessages, maxMessageSize);
329  return std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_only,
330  _name.c_str());
331  default:
333  << "Can't initialize shared memory transport with name " << _name << ": "
334  << "Unknown EMQOpenType given: " << static_cast<int>(_openType);
335  return nullptr;
336  }
337  }
338  catch (boost::interprocess::interprocess_exception& _e)
339  {
341  << "Can't initialize shared memory transport with name " << _name << ": " << _e.what();
342  return nullptr;
343  }
344  }
345 
346  public:
347  uint64_t getProtocolHeaderID() const
348  {
349  return m_protocolHeaderID;
350  }
351 
352  void addOutput(uint64_t _outputID,
353  const std::string& _name,
355  {
356  // We use EMQOpenType::OpenOrCreate in order to prevent race condition when the leader is already
357  // selected, but the shared memory has not been created yet. If the shared memory was not created by the
358  // lobby leader it will be automatically created by the lobby member.
359 
360  if (_outputID < 1)
361  {
362  std::stringstream ss;
363  ss << "Can't add output " << _name
364  << ". Output ID must be greater than 0. Current value: " << _outputID;
365  throw std::runtime_error(ss.str());
366  }
367 
368  {
369  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
370  auto it = m_outputBuffers.find(_outputID);
371  if (it != m_outputBuffers.end())
372  {
373  std::stringstream ss;
374  ss << "Can't add output " << _name << ". Output with ID " << _outputID << " already exists.";
375  throw std::runtime_error(ss.str());
376  }
377  }
378 
379  auto buffer = std::make_shared<SMessageOutputBuffer>();
380  buffer->m_info.m_name = _name;
381  buffer->m_info.m_openType = _openType;
382  buffer->m_info.m_mq = createMessageQueue(_name, _openType);
383 
384  if (buffer->m_info.m_mq != nullptr)
385  {
386  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
387  auto result = m_outputBuffers.emplace(_outputID, buffer);
388  if (!result.second)
389  {
390  std::stringstream ss;
391  ss << "Failed to add shared memory channel output with ID: " << _outputID << " name: " << _name;
392  throw std::runtime_error(ss.str());
393  }
394  else
395  {
397  << "Added shared memory channel output with ID: " << _outputID << " name: " << _name;
398  }
399  }
400  else
401  {
402  std::stringstream ss;
403  ss << "Can't add shared memory channel output with ID: " << _outputID << " name: " << _name;
404  throw std::runtime_error(ss.str());
405  }
406  }
407 
408  bool started() const
409  {
410  return m_started;
411  }
412 
413  void start()
414  {
415 // Check that all message queues were successfully created
416  bool queuesCreated(true);
417  for (const auto& v : m_transportIn)
418  {
419  if (v.m_mq == nullptr)
420  {
421  queuesCreated = false;
422  break;
423  }
424  }
425 
426  if (queuesCreated)
427  {
428  for (const auto& v : m_outputBuffers)
429  {
430  if (v.second->m_info.m_mq == nullptr)
431  {
432  queuesCreated = false;
433  break;
434  }
435  }
436  }
437  if (!queuesCreated)
438  {
440  << "Can't start shared memory channel because there was a problem creating message queues";
441  m_started = false;
442  return;
443  }
444  //
445 
446  m_started = true;
447  m_isShuttingDown = false;
448 
449  auto self(this->shared_from_this());
450  for (const auto& v : m_transportIn)
451  {
452  m_ioContext.post(
453  [this, self, &v]
454  {
455  try
456  {
457  readMessage(v);
458  }
459  catch (std::exception& ex)
460  {
461  LOG(dds::misc::error) << "BaseSMChannelImpl can't read message: " << ex.what();
462  }
463  });
464  }
465 
466  SSenderInfo sender;
467  sender.m_ID = m_protocolHeaderID;
468  }
469 
470  void stop()
471  {
472  LOG(dds::misc::info) << "SM: channel STOP is called. MQ: " << getName();
473  if (!m_started)
474  return;
475 
476  m_started = false;
477  m_isShuttingDown = true;
478  }
479 
481  {
482  {
483  std::lock_guard<std::mutex> lock(m_mutexTransportIn);
484  for (const auto& v : m_transportIn)
485  {
486  const bool status = boost::interprocess::message_queue::remove(v.m_name.c_str());
487  LOG(dds::misc::info) << "Message queue " << v.m_name << " remove status: " << status;
488  }
489  }
490  {
491  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
492  for (const auto& v : m_outputBuffers)
493  {
494  const SMessageQueueInfo& info = v.second->m_info;
495  const bool status = boost::interprocess::message_queue::remove(info.m_name.c_str());
496  LOG(dds::misc::info) << "Message queue " << info.m_name << " remove status: " << status;
497  }
498  }
499  }
500 
501  void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _outputID = 0)
502  {
503  if (!m_started)
504  {
505  LOG(dds::misc::error) << "Skip pushing message. The channel was not started.";
506  return;
507  }
508 
509  if (m_isShuttingDown)
510  {
511  LOG(dds::misc::warning) << "Skip pushing message. The channel is shutting down.";
512  return;
513  }
514 
515  try
516  {
517  // Get corresponding buffer
518  const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
519 
520  std::lock_guard<std::mutex> lock(buffer->m_mutexWriteBuffer);
521 
522  // Need to drain the queue, i.e. skip messages
523  if (buffer->m_drainWriteQueue)
524  {
525  buffer->m_writeQueue.clear();
526  return;
527  }
528 
529  // add the current message to the queue
530  if (cmdUNKNOWN != _cmd)
531  buffer->m_writeQueue.push_back(SProtocolMessageInfo(_outputID, _msg));
532 
534  << getName()
535  << ": BaseSMChannelImpl pushMsg: WriteQueue size = " << buffer->m_writeQueue.size();
536 
537  // process standard async writing
538  auto self(this->shared_from_this());
539  m_ioContext.post(
540  [this, self, &buffer]
541  {
542  try
543  {
544  writeMessage(buffer);
545  }
546  catch (std::exception& ex)
547  {
548  LOG(dds::misc::error) << "BaseSMChannelImpl can't write message: " << ex.what();
549  }
550  });
551  }
552  catch (std::exception& ex)
553  {
554  LOG(dds::misc::error) << getName() << ": BaseSMChannelImpl can't push message: " << ex.what();
555  }
556  }
557 
558  template <ECmdType _cmd, class A>
559  void pushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
560  {
561  try
562  {
563  uint64_t headerID = adjustProtocolHeaderID(_protocolHeaderID);
565  SCommandAttachmentImpl<_cmd>::encode(_attachment, headerID);
566  pushMsg(msg, _cmd, _outputID);
567  }
568  catch (std::exception& ex)
569  {
570  LOG(dds::misc::error) << "BaseSMChannelImpl can't push message: " << ex.what();
571  }
572  }
573 
574  template <ECmdType _cmd>
575  void pushMsg(uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
576  {
577  SEmptyCmd cmd;
578  pushMsg<_cmd>(cmd, _protocolHeaderID, _outputID);
579  }
580 
581  void drainWriteQueue(bool _newVal, uint64_t _outputID)
582  {
583  const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
584  buffer->m_drainWriteQueue = _newVal;
585  }
586 
587  private:
588  const typename SMessageOutputBuffer::Ptr_t& getOutputBuffer(uint64_t _outputID)
589  {
590  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
591  auto it = m_outputBuffers.find(_outputID);
592  if (it != m_outputBuffers.end())
593  return it->second;
594 
595  throw std::runtime_error("Can't find corresponding output buffer: " + std::to_string(_outputID));
596  }
597 
598  uint64_t adjustProtocolHeaderID(uint64_t _protocolHeaderID) const
599  {
600  return (_protocolHeaderID == 0) ? m_protocolHeaderID : _protocolHeaderID;
601  }
602 
603  void readMessage(const SMessageQueueInfo& _info)
604  {
605  try
606  {
607  CProtocolMessage::protocolMessagePtr_t currentMsg = std::make_shared<CProtocolMessage>();
608 
609  unsigned int priority;
610  boost::interprocess::message_queue::size_type receivedSize;
611 
612  // We need to allocate the memory of the size equal to the maximum size of the message
613  currentMsg->resize(_info.m_mq->get_max_msg_size());
614 
615  namespace pt = boost::posix_time;
616  while (!_info.m_mq->timed_receive(
617  currentMsg->data(),
618  _info.m_mq->get_max_msg_size(),
619  receivedSize,
620  priority,
621  pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(500)))
622  {
623  if (m_isShuttingDown)
624  {
625  LOG(dds::misc::info) << _info.m_name << ": stopping read operation due to shutdown";
626  return;
627  }
628  }
629 
630  if (receivedSize < CProtocolMessage::header_length)
631  {
633  << _info.m_name << ": Received message: " << receivedSize << " bytes, expected at least"
634  << CProtocolMessage::header_length << " bytes";
635  }
636  else
637  {
638  // Resize message data to the actually received bytes
639  currentMsg->resize(receivedSize);
640  if (currentMsg->decode_header())
641  {
642  // If the header is ok, process the body of the message
643  processBody(receivedSize - CProtocolMessage::header_length, _info, currentMsg);
644  }
645  else
646  {
647  LOG(dds::misc::error) << "BaseSMChannelImpl: error reading message header";
648  }
649  }
650  }
651  catch (boost::interprocess::interprocess_exception& ex)
652  {
653  LOG(dds::misc::error) << "BaseSMChannelImpl: error receiving message: " << ex.what() << "\n";
654  }
655  }
656 
657  void processBody(boost::interprocess::message_queue::size_type _bodySize,
658  const SMessageQueueInfo& _info,
659  const CProtocolMessage::protocolMessagePtr_t& _currentMsg)
660  {
661  if (_bodySize != _currentMsg->body_length())
662  {
663  LOG(dds::misc::error) << _info.m_name << ": Received message BODY: " << _bodySize
664  << " bytes, expected " << _currentMsg->body_length();
665  }
666  else
667  {
668  if (_currentMsg->body_length() == 0)
669  {
671  << _info.m_name << ": Received message BODY no attachment: " << _currentMsg->toString();
672  }
673  else
674  {
675  LOG(dds::misc::debug) << _info.m_name << ": Received message BODY (" << _bodySize
676  << " bytes): " << _currentMsg->toString();
677  }
678 
679  // process received message
680  T* pThis = static_cast<T*>(this);
681  pThis->processMessage(_currentMsg);
682 
683  auto self(this->shared_from_this());
684  m_ioContext.post(
685  [this, self, &_info]
686  {
687  try
688  {
689  readMessage(_info);
690  }
691  catch (std::exception& ex)
692  {
693  LOG(dds::misc::error) << "BaseSMChannelImpl can't read message: " << ex.what();
694  }
695  });
696  }
697  }
698 
699  void writeMessage(const typename SMessageOutputBuffer::Ptr_t& _buffer)
700  {
701  if (_buffer == nullptr)
702  throw std::runtime_error("Can't find corresponding output buffer");
703 
704  {
705  std::lock_guard<std::mutex> lockWriteBuffer(_buffer->m_mutexWriteBuffer);
706  if (!_buffer->m_writeBufferQueue.empty())
707  return; // A write is in progress, don't start anything
708 
709  if (_buffer->m_writeQueue.empty())
710  return; // There is nothing to send.
711 
712  _buffer->m_writeBufferQueue.assign(_buffer->m_writeQueue.begin(), _buffer->m_writeQueue.end());
713  _buffer->m_writeQueue.clear();
714  }
715 
716  try
717  {
718  for (auto& msg : _buffer->m_writeBufferQueue)
719  {
720  if (_buffer->m_info.m_mq != nullptr)
721  {
722  namespace pt = boost::posix_time;
723  while (!_buffer->m_info.m_mq->timed_send(
724  msg.m_msg->data(),
725  msg.m_msg->length(),
726  0,
727  pt::ptime(pt::microsec_clock::universal_time()) + pt::milliseconds(500)))
728  {
729  if (m_isShuttingDown)
730  {
732  << _buffer->m_info.m_name << ": stopping write operation due to shutdown";
733  return;
734  }
735 
736  // If the other end is disconnected or the queue is full, we
737  // will block the thread by infinitely retrying to send.
738  // For such cases there is a drain command. The connection manager can initiate the
739  // drain, when needed. For example in case when a user task disconnects from the
740  // Intercom channel a drain will be initiated until we receive a new task assignment.
741  if (_buffer->m_drainWriteQueue)
742  {
744  << _buffer->m_info.m_name
745  << ": Draining write queue, while there is a message pending: "
746  << g_cmdToString[msg.m_msg->header().m_cmd];
747  break;
748  }
749  }
750  }
751  else
752  {
754  << _buffer->m_info.m_name << ": Can't find output transport with output ID "
755  << msg.m_outputID
756  << ". Write message failed. Command: " << g_cmdToString[msg.m_msg->header().m_cmd];
757  }
758  }
759  }
760  catch (boost::interprocess::interprocess_exception& ex)
761  {
763  << _buffer->m_info.m_name << ": BaseSMChannelImpl: error sending message: " << ex.what();
764  }
765 
766  // Lock the modification of the container
767  {
768  std::lock_guard<std::mutex> lock(_buffer->m_mutexWriteBuffer);
769  _buffer->m_writeBufferQueue.clear();
770  }
771  // We might need to send more messages
772  writeMessage(_buffer);
773  }
774 
775  const std::string& getName() const
776  {
777  return m_transportIn.front().m_name;
778  }
779 
780  protected:
781  std::atomic<bool> m_isShuttingDown;
782  std::atomic<bool> m_started;
784 
785  private:
786  boost::asio::io_context& m_ioContext;
787 
788  typename SMessageQueueInfo::Container_t
789  m_transportIn;
790  typename SMessageOutputBuffer::Container_t m_outputBuffers;
791  std::mutex m_mutexTransportIn;
792  std::mutex m_mutexTransportOut;
793  };
794  } // namespace protocol_api
795 } // namespace dds
796 
797 #endif /* defined(__DDS__BaseSMChannelImpl__) */
uint64_t m_protocolHeaderID
Definition: BaseSMChannelImpl.h:783
Definition: BaseEventHandlersImpl.h:48
Definition: def.h:148
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseSMChannelImpl.h:189
void pushMsg(const A &_attachment, uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:559
Definition: ChannelEventHandlersImpl.h:26
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::string &_inputName, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:259
std::atomic< bool > m_isShuttingDown
Definition: BaseSMChannelImpl.h:781
Definition: CommandAttachmentImpl.h:54
void defaultInit(const std::vector< std::string > _inputNames, const std::string &_outputName, EMQOpenType _inputOpenType, EMQOpenType _outputOpenType)
Definition: BaseSMChannelImpl.h:227
void addOutput(uint64_t _outputID, const std::string &_name, EMQOpenType _openType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:352
DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelEventHandlersImpl) DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelMessageHandlersImpl) protected
Definition: BaseSMChannelImpl.h:193
uint64_t getProtocolHeaderID() const
Definition: BaseSMChannelImpl.h:347
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &, uint64_t _ID)
Definition: CommandAttachmentImpl.h:71
std::atomic< bool > m_started
True if we were able to start the channel, False otherwise.
Definition: BaseSMChannelImpl.h:782
#define LOG(severity)
Definition: Logger.h:34
Definition: BaseSMChannelImpl.h:144
std::shared_ptr< T > connectionPtr_t
Definition: BaseSMChannelImpl.h:188
void stop()
Definition: BaseSMChannelImpl.h:470
Miscellaneous functions and helpers are located here.
Definition: AgentConnectionManager.h:13
void removeMessageQueue()
Definition: BaseSMChannelImpl.h:480
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::vector< std::string > &_inputNames, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:271
Definition: def.h:147
void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:501
Definition: def.h:146
EMQOpenType
Definition: BaseSMChannelImpl.h:136
void start()
Definition: BaseSMChannelImpl.h:413
bool started() const
Definition: BaseSMChannelImpl.h:408
uint64_t m_ID
Definition: BaseEventHandlersImpl.h:50
Definition: def.h:149
void drainWriteQueue(bool _newVal, uint64_t _outputID)
Definition: BaseSMChannelImpl.h:581
void pushMsg(uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:575
Definition: ChannelMessageHandlersImpl.h:20
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:81