DDS  ver. 3.4
BaseSMChannelImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef __DDS__BaseSMChannelImpl__
6 #define __DDS__BaseSMChannelImpl__
7 // STD
8 #include <chrono>
9 #include <deque>
10 #include <iostream>
11 #include <map>
12 #include <memory>
13 #include <mutex>
14 // BOOST
15 #include <boost/asio.hpp>
16 #include <boost/date_time.hpp>
17 #include <boost/interprocess/ipc/message_queue.hpp>
18 #include <boost/noncopyable.hpp>
19 #include <boost/thread/thread.hpp>
20 // DDS
23 #include "CommandAttachmentImpl.h"
24 #include "Logger.h"
25 
26 // Either raw message or command based processing can be used at a time
27 // Command based message processing
28 #define BEGIN_SM_MSG_MAP(theClass) \
29  public: \
30  friend protocol_api::CBaseSMChannelImpl<theClass>; \
31  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
32  { \
33  if (!m_started) \
34  return; \
35  \
36  using namespace dds; \
37  using namespace dds::protocol_api; \
38  bool processed = true; \
39  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \
40  SSenderInfo sender; \
41  sender.m_ID = _currentMsg->header().m_ID; \
42  \
43  try \
44  { \
45  switch (currentCmd) \
46  {
47 
48 #define SM_MESSAGE_HANDLER(msg, func) \
49  case msg: \
50  { \
51  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
52  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
53  processed = func(attachmentPtr, sender); \
54  if (!processed) \
55  { \
56  if (!handlerExists(msg)) \
57  { \
58  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
59  << _currentMsg->toString(); \
60  } \
61  else \
62  { \
63  dispatchHandlers(msg, sender, attachmentPtr); \
64  } \
65  } \
66  break; \
67  }
68 
69 #define SM_MESSAGE_HANDLER_DISPATCH(msg) \
70  case msg: \
71  { \
72  processed = false; \
73  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
74  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
75  LOG(MiscCommon::debug) << "Dispatching " << g_cmdToString[msg]; \
76  if (!handlerExists(msg)) \
77  { \
78  LOG(MiscCommon::error) << "The received message can't be dispatched, it has no registered handler: " \
79  << _currentMsg->toString(); \
80  } \
81  else \
82  { \
83  dispatchHandlers<>(msg, sender, attachmentPtr); \
84  } \
85  break; \
86  }
87 
88 #define END_SM_MSG_MAP() \
89  default: \
90  LOG(MiscCommon::error) << "The received SM message doesn't have a handler: " << _currentMsg->toString(); \
91  } \
92  } \
93  catch (std::exception & _e) \
94  { \
95  LOG(MiscCommon::error) << "SMChannel processMessage: " << _e.what(); \
96  } \
97  }
98 
99 // Raw message processing
100 #define SM_RAW_MESSAGE_HANDLER(theClass, func) \
101  public: \
102  friend protocol_api::CBaseSMChannelImpl<theClass>; \
103  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
104  { \
105  if (!m_started) \
106  return; \
107  \
108  using namespace dds; \
109  using namespace dds::protocol_api; \
110  bool processed = true; \
111  SSenderInfo sender; \
112  sender.m_ID = _currentMsg->header().m_ID; \
113  try \
114  { \
115  processed = func(_currentMsg, sender); \
116  if (!processed) \
117  { \
118  if (!handlerExists(ECmdType::cmdRAW_MSG)) \
119  { \
120  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
121  << _currentMsg->toString(); \
122  } \
123  else \
124  { \
125  dispatchHandlers(ECmdType::cmdRAW_MSG, sender, _currentMsg); \
126  } \
127  } \
128  } \
129  catch (std::exception & _e) \
130  { \
131  LOG(MiscCommon::error) << "SMChannel processMessage: " << _e.what(); \
132  } \
133  }
134 
135 namespace dds
136 {
137  namespace protocol_api
138  {
139  enum class EMQOpenType
140  {
141  CreateOnly,
142  OpenOrCreate,
143  OpenOnly
144  };
145 
146  template <class T>
147  class CBaseSMChannelImpl : public boost::noncopyable,
150  public std::enable_shared_from_this<T>
151  {
152  struct SProtocolMessageInfo
153  {
154  SProtocolMessageInfo(uint64_t _outputID, CProtocolMessage::protocolMessagePtr_t _msg)
155  : m_outputID(_outputID)
156  , m_msg(_msg)
157  {
158  }
159 
160  uint64_t m_outputID;
162  };
163 
164  typedef std::deque<SProtocolMessageInfo> protocolMessagePtrQueue_t;
165 
166  typedef std::shared_ptr<boost::interprocess::message_queue> messageQueuePtr_t;
167 
168  struct SMessageQueueInfo
169  {
170  using Container_t = std::vector<SMessageQueueInfo>;
171 
172  std::string m_name;
173  EMQOpenType m_openType;
174  messageQueuePtr_t m_mq;
175  };
176 
177  struct SMessageOutputBuffer
178  {
179  using Ptr_t = std::shared_ptr<SMessageOutputBuffer>;
180  using Container_t = std::map<uint64_t, Ptr_t>;
181 
182  SMessageQueueInfo m_info;
183 
184  protocolMessagePtrQueue_t m_writeQueue;
185  std::mutex m_mutexWriteBuffer;
186  protocolMessagePtrQueue_t m_writeBufferQueue;
187  std::atomic<bool> m_drainWriteQueue{ false };
188  };
189 
190  public:
191  typedef std::shared_ptr<T> connectionPtr_t;
192  typedef std::weak_ptr<T> weakConnectionPtr_t;
193 
194  // Both are needed because unqualified name lookup terminates at the first scope that has anything with the
195  // right name
198 
199  protected:
200  CBaseSMChannelImpl<T>(boost::asio::io_context& _service,
201  const std::string& _inputName,
202  const std::string& _outputName,
203  uint64_t _protocolHeaderID,
204  EMQOpenType _inputOpenType,
205  EMQOpenType _outputOpenType)
207  , m_isShuttingDown(false)
208  , m_started(false)
209  , m_protocolHeaderID(_protocolHeaderID)
210  , m_ioContext(_service)
211  {
212  defaultInit({ _inputName }, _outputName, _inputOpenType, _outputOpenType);
213  }
214 
215  CBaseSMChannelImpl<T>(boost::asio::io_context& _service,
216  const std::vector<std::string>& _inputNames,
217  const std::string& _outputName,
218  uint64_t _protocolHeaderID,
219  EMQOpenType _inputOpenType,
220  EMQOpenType _outputOpenType)
222  , m_isShuttingDown(false)
223  , m_started(false)
224  , m_protocolHeaderID(_protocolHeaderID)
225  , m_ioContext(_service)
226  {
227  defaultInit(_inputNames, _outputName, _inputOpenType, _outputOpenType);
228  }
229 
230  void defaultInit(const std::vector<std::string> _inputNames,
231  const std::string& _outputName,
232  EMQOpenType _inputOpenType,
233  EMQOpenType _outputOpenType)
234  {
235  for (const auto& v : _inputNames)
236  {
237  SMessageQueueInfo inInfo;
238  inInfo.m_name = v;
239  inInfo.m_openType = _inputOpenType;
240  m_transportIn.push_back(inInfo);
241  }
242 
243  // Output transport - default output transport initialized with protocol header ID
244  auto buffer = std::make_shared<SMessageOutputBuffer>();
245  buffer->m_info.m_name = _outputName;
246  buffer->m_info.m_openType = _outputOpenType;
247  m_outputBuffers.emplace(0, buffer);
248 
249  createMessageQueue();
250 
251  LOG(MiscCommon::info) << "SM: New channel: inputName=" << m_transportIn.front().m_name
252  << " outputName=" << _outputName << " protocolHeaderID=" << m_protocolHeaderID;
253  }
254 
255  public:
257  {
258  LOG(MiscCommon::info) << "SM: channel destructor is called. MQ: " << getName();
259  stop();
260  }
261 
262  static connectionPtr_t makeNew(boost::asio::io_context& _service,
263  const std::string& _inputName,
264  const std::string& _outputName,
265  uint64_t _ProtocolHeaderID,
266  EMQOpenType _inputOpenType = EMQOpenType::OpenOrCreate,
267  EMQOpenType _outputOpenType = EMQOpenType::OpenOrCreate)
268  {
269  connectionPtr_t newObject(
270  new T(_service, _inputName, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
271  return newObject;
272  }
273 
274  static connectionPtr_t makeNew(boost::asio::io_context& _service,
275  const std::vector<std::string>& _inputNames,
276  const std::string& _outputName,
277  uint64_t _ProtocolHeaderID,
278  EMQOpenType _inputOpenType = EMQOpenType::OpenOrCreate,
279  EMQOpenType _outputOpenType = EMQOpenType::OpenOrCreate)
280  {
281  connectionPtr_t newObject(
282  new T(_service, _inputNames, _outputName, _ProtocolHeaderID, _inputOpenType, _outputOpenType));
283  return newObject;
284  }
285 
286  private:
287  void createMessageQueue()
288  {
289  {
290  std::lock_guard<std::mutex> lock(m_mutexTransportIn);
291  for (auto& info : m_transportIn)
292  {
293  LOG(MiscCommon::info) << "SM: Initializing input message queue: " << info.m_name;
294  info.m_mq.reset();
295  info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
296  }
297  }
298 
299  {
300  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
301  for (auto& v : m_outputBuffers)
302  {
303  SMessageQueueInfo& info = v.second->m_info;
304  LOG(MiscCommon::info) << "SM: Initializing output message queue: " << info.m_name;
305  info.m_mq.reset();
306  info.m_mq = createMessageQueue(info.m_name.c_str(), info.m_openType);
307  }
308  }
309  }
310 
311  messageQueuePtr_t createMessageQueue(const std::string& _name, EMQOpenType _openType)
312  {
313  static const unsigned int maxNofMessages = 100;
314  // Taking into account that maximum size of the string for the command is 2^16 plus some extra bytes
315  // for key size (128 bytes) and other.
316  // TODO: Because of performance problems we had to reduce the size of the message from 65K to 1K.
317  // TODO: Need to implement an algorithm to break protocol messages on smaller chunks if they are bigger
318  // than nmaxMessageSize
319  static const unsigned int maxMessageSize = 1024;
320 
321  try
322  {
323  switch (_openType)
324  {
326  return std::make_shared<boost::interprocess::message_queue>(
327  boost::interprocess::open_or_create, _name.c_str(), maxNofMessages, maxMessageSize);
329  return std::make_shared<boost::interprocess::message_queue>(
330  boost::interprocess::create_only, _name.c_str(), maxNofMessages, maxMessageSize);
332  return std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_only,
333  _name.c_str());
334  default:
336  << "Can't initialize shared memory transport with name " << _name << ": "
337  << "Unknown EMQOpenType given: " << static_cast<int>(_openType);
338  return nullptr;
339  }
340  }
341  catch (boost::interprocess::interprocess_exception& _e)
342  {
344  << "Can't initialize shared memory transport with name " << _name << ": " << _e.what();
345  return nullptr;
346  }
347  }
348 
349  public:
350  uint64_t getProtocolHeaderID() const
351  {
352  return m_protocolHeaderID;
353  }
354 
355  void addOutput(uint64_t _outputID,
356  const std::string& _name,
358  {
359  // We use EMQOpenType::OpenOrCreate in order to prevent race condition when the leader is already
360  // selected, but the shared memory has not been created yet. If the shared memory was not created by the
361  // lobby leader it will be automatically created by the lobby member.
362 
363  if (_outputID < 1)
364  {
365  std::stringstream ss;
366  ss << "Can't add output " << _name
367  << ". Output ID must be greater than 0. Current value: " << _outputID;
368  throw std::runtime_error(ss.str());
369  }
370 
371  {
372  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
373  auto it = m_outputBuffers.find(_outputID);
374  if (it != m_outputBuffers.end())
375  {
376  std::stringstream ss;
377  ss << "Can't add output " << _name << ". Output with ID " << _outputID << " already exists.";
378  throw std::runtime_error(ss.str());
379  }
380  }
381 
382  auto buffer = std::make_shared<SMessageOutputBuffer>();
383  buffer->m_info.m_name = _name;
384  buffer->m_info.m_openType = _openType;
385  buffer->m_info.m_mq = createMessageQueue(_name, _openType);
386 
387  if (buffer->m_info.m_mq != nullptr)
388  {
389  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
390  auto result = m_outputBuffers.emplace(_outputID, buffer);
391  if (!result.second)
392  {
393  std::stringstream ss;
394  ss << "Failed to add shared memory channel output with ID: " << _outputID << " name: " << _name;
395  throw std::runtime_error(ss.str());
396  }
397  else
398  {
400  << "Added shared memory channel output with ID: " << _outputID << " name: " << _name;
401  }
402  }
403  else
404  {
405  std::stringstream ss;
406  ss << "Can't add shared memory channel output with ID: " << _outputID << " name: " << _name;
407  throw std::runtime_error(ss.str());
408  }
409  }
410 
411  bool started() const
412  {
413  return m_started;
414  }
415 
416  void start()
417  {
418  // Check that all message queues were succesfully created
419  bool queuesCreated(true);
420  for (const auto& v : m_transportIn)
421  {
422  if (v.m_mq == nullptr)
423  {
424  queuesCreated = false;
425  break;
426  }
427  }
428 
429  if (queuesCreated)
430  {
431  for (const auto& v : m_outputBuffers)
432  {
433  if (v.second->m_info.m_mq == nullptr)
434  {
435  queuesCreated = false;
436  break;
437  }
438  }
439  }
440  if (!queuesCreated)
441  {
443  << "Can't start shared memory channel because there was a problem creating message queues";
444  m_started = false;
445  return;
446  }
447  //
448 
449  m_started = true;
450  m_isShuttingDown = false;
451 
452  auto self(this->shared_from_this());
453  for (const auto& v : m_transportIn)
454  {
455  m_ioContext.post([this, self, &v] {
456  try
457  {
458  readMessage(v);
459  }
460  catch (std::exception& ex)
461  {
462  LOG(MiscCommon::error) << "BaseSMChannelImpl can't read message: " << ex.what();
463  }
464  });
465  }
466 
467  SSenderInfo sender;
468  sender.m_ID = m_protocolHeaderID;
469  dispatchHandlers(EChannelEvents::OnSMStart, sender);
470  }
471 
472  void stop()
473  {
474  LOG(MiscCommon::info) << "SM: channel STOP is called. MQ: " << getName();
475  if (!m_started)
476  return;
477 
478  m_started = false;
479  sendYourselfShutdown();
480  }
481 
483  {
484  {
485  std::lock_guard<std::mutex> lock(m_mutexTransportIn);
486  for (const auto& v : m_transportIn)
487  {
488  const bool status = boost::interprocess::message_queue::remove(v.m_name.c_str());
489  LOG(MiscCommon::info) << "Message queue " << v.m_name << " remove status: " << status;
490  }
491  }
492  {
493  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
494  for (const auto& v : m_outputBuffers)
495  {
496  const SMessageQueueInfo& info = v.second->m_info;
497  const bool status = boost::interprocess::message_queue::remove(info.m_name.c_str());
498  LOG(MiscCommon::info) << "Message queue " << info.m_name << " remove status: " << status;
499  }
500  }
501  }
502 
503  void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _outputID = 0)
504  {
505  if (!m_started)
506  {
507  LOG(MiscCommon::error) << "Skip pushing message. The channel was not started.";
508  return;
509  }
510 
511  try
512  {
513  // Get corresponding buffer
514  const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
515 
516  std::lock_guard<std::mutex> lock(buffer->m_mutexWriteBuffer);
517 
518  // Need to drain the queue, i.e. skip messages
519  if (buffer->m_drainWriteQueue)
520  {
521  buffer->m_writeQueue.clear();
522  return;
523  }
524 
525  // add the current message to the queue
526  if (cmdUNKNOWN != _cmd)
527  buffer->m_writeQueue.push_back(SProtocolMessageInfo(_outputID, _msg));
528 
530  << getName()
531  << ": BaseSMChannelImpl pushMsg: WriteQueue size = " << buffer->m_writeQueue.size();
532 
533  // process standard async writing
534  auto self(this->shared_from_this());
535  m_ioContext.post([this, self, &buffer] {
536  try
537  {
538  writeMessage(buffer);
539  }
540  catch (std::exception& ex)
541  {
542  LOG(MiscCommon::error) << "BaseSMChannelImpl can't write message: " << ex.what();
543  }
544  });
545  }
546  catch (std::exception& ex)
547  {
548  LOG(MiscCommon::error) << getName() << ": BaseSMChannelImpl can't push message: " << ex.what();
549  }
550  }
551 
552  template <ECmdType _cmd, class A>
553  void pushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
554  {
555  try
556  {
557  uint64_t headerID = adjustProtocolHeaderID(_protocolHeaderID);
559  SCommandAttachmentImpl<_cmd>::encode(_attachment, headerID);
560  pushMsg(msg, _cmd, _outputID);
561  }
562  catch (std::exception& ex)
563  {
564  LOG(MiscCommon::error) << "BaseSMChannelImpl can't push message: " << ex.what();
565  }
566  }
567 
568  template <ECmdType _cmd>
569  void pushMsg(uint64_t _protocolHeaderID = 0, uint64_t _outputID = 0)
570  {
571  SEmptyCmd cmd;
572  pushMsg<_cmd>(cmd, _protocolHeaderID, _outputID);
573  }
574 
576  {
577  // Send cmdSHUTDOWN to all connected outputs except yourself
578  {
579  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
580  for (auto it : m_outputBuffers)
581  {
582  if (it.first == 0 || it.first == m_protocolHeaderID)
583  continue;
584  SEmptyCmd cmd;
587  messageQueuePtr_t mq = it.second->m_info.m_mq;
588  // TODO: FIXME: Revise the code, use timed_send
589  mq->send(msg->data(), msg->length(), 1);
590  }
591  }
592  }
593 
594  void drainWriteQueue(bool _newVal, uint64_t _outputID)
595  {
596  const typename SMessageOutputBuffer::Ptr_t& buffer = getOutputBuffer(_outputID);
597  buffer->m_drainWriteQueue = _newVal;
598  }
599 
600  private:
601  const typename SMessageOutputBuffer::Ptr_t& getOutputBuffer(uint64_t _outputID)
602  {
603  std::lock_guard<std::mutex> lock(m_mutexTransportOut);
604  auto it = m_outputBuffers.find(_outputID);
605  if (it != m_outputBuffers.end())
606  return it->second;
607 
608  throw std::runtime_error("Can't find corresponding output buffer: " + std::to_string(_outputID));
609  }
610 
611  uint64_t adjustProtocolHeaderID(uint64_t _protocolHeaderID) const
612  {
613  return (_protocolHeaderID == 0) ? m_protocolHeaderID : _protocolHeaderID;
614  }
615 
616  void sendYourselfShutdown()
617  {
618  std::lock_guard<std::mutex> lock(m_mutexTransportIn);
619  // Send cmdSHUTDOWN with higher priority in order to stop read operation.
620  for (auto& info : m_transportIn)
621  {
622  LOG(MiscCommon::debug) << info.m_name << ": Sends itself a SHUTDOWN msg";
623  SEmptyCmd cmd;
626  // m_transportIn.m_mq->send(msg->data(), msg->length(), 1);
627  while (!info.m_mq->timed_send(
628  msg->data(),
629  msg->length(),
630  0,
631  boost::posix_time::ptime(boost::posix_time::microsec_clock::universal_time()) +
632  boost::posix_time::milliseconds(10)))
633  {
634  if (m_isShuttingDown)
635  {
637  << info.m_name << ": stopping send yourself shutdown while already shutting down";
638  break;
639  }
640  }
641  }
642  }
643 
644  void readMessage(const SMessageQueueInfo& _info)
645  {
646  try
647  {
648  CProtocolMessage::protocolMessagePtr_t currentMsg = std::make_shared<CProtocolMessage>();
649 
650  unsigned int priority;
651  boost::interprocess::message_queue::size_type receivedSize;
652 
653  // We need to allocate the memory of the size equal to the maximum size of the message
654  currentMsg->resize(_info.m_mq->get_max_msg_size());
655  _info.m_mq->receive(currentMsg->data(), _info.m_mq->get_max_msg_size(), receivedSize, priority);
656 
657  if (receivedSize < CProtocolMessage::header_length)
658  {
660  << _info.m_name << ": Received message: " << receivedSize << " bytes, expected at least"
661  << CProtocolMessage::header_length << " bytes";
662  }
663  else
664  {
665  // Resize message data to the actually received bytes
666  currentMsg->resize(receivedSize);
667  if (currentMsg->decode_header())
668  {
669  // If the header is ok, process the body of the message
670  processBody(receivedSize - CProtocolMessage::header_length, _info, currentMsg);
671  }
672  else
673  {
674  LOG(MiscCommon::error) << "BaseSMChannelImpl: error reading message header";
675  }
676  }
677  }
678  catch (boost::interprocess::interprocess_exception& ex)
679  {
680  LOG(MiscCommon::error) << "BaseSMChannelImpl: error receiving message: " << ex.what() << "\n";
681  }
682  }
683 
684  void processBody(boost::interprocess::message_queue::size_type _bodySize,
685  const SMessageQueueInfo& _info,
686  const CProtocolMessage::protocolMessagePtr_t& _currentMsg)
687  {
688  if (_bodySize != _currentMsg->body_length())
689  {
690  LOG(MiscCommon::error) << _info.m_name << ": Received message BODY: " << _bodySize
691  << " bytes, expected " << _currentMsg->body_length();
692  }
693  else
694  {
695  if (_currentMsg->body_length() == 0)
696  {
698  << _info.m_name << ": Received message BODY no attachment: " << _currentMsg->toString();
699  }
700  else
701  {
702  LOG(MiscCommon::debug) << _info.m_name << ": Received message BODY (" << _bodySize
703  << " bytes): " << _currentMsg->toString();
704  }
705 
706  // process received message
707  T* pThis = static_cast<T*>(this);
708  pThis->processMessage(_currentMsg);
709 
710  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd);
711 
712  // Do not start readMessage if cmdSHUTDOWN was sent
713  if (currentCmd != cmdSHUTDOWN)
714  {
715  auto self(this->shared_from_this());
716  m_ioContext.post([this, self, &_info] {
717  try
718  {
719  readMessage(_info);
720  }
721  catch (std::exception& ex)
722  {
723  LOG(MiscCommon::error) << "BaseSMChannelImpl can't read message: " << ex.what();
724  }
725  });
726  }
727  else
728  {
729  m_isShuttingDown = true;
730  LOG(MiscCommon::debug) << _info.m_name << ": Stopping readMessage thread...";
731  }
732  }
733  }
734 
735  void writeMessage(const typename SMessageOutputBuffer::Ptr_t& _buffer)
736  {
737  if (_buffer == nullptr)
738  throw std::runtime_error("Can't find corresponding output buffer");
739 
740  {
741  std::lock_guard<std::mutex> lockWriteBuffer(_buffer->m_mutexWriteBuffer);
742  if (!_buffer->m_writeBufferQueue.empty())
743  return; // A write is in progress, don't start anything
744 
745  if (_buffer->m_writeQueue.empty())
746  return; // There is nothing to send.
747 
748  _buffer->m_writeBufferQueue.assign(_buffer->m_writeQueue.begin(), _buffer->m_writeQueue.end());
749  _buffer->m_writeQueue.clear();
750  }
751 
752  try
753  {
754  for (auto& msg : _buffer->m_writeBufferQueue)
755  {
756  if (_buffer->m_info.m_mq != nullptr)
757  {
758  while (!_buffer->m_info.m_mq->timed_send(
759  msg.m_msg->data(),
760  msg.m_msg->length(),
761  0,
762  boost::posix_time::ptime(boost::posix_time::microsec_clock::universal_time()) +
763  boost::posix_time::milliseconds(10)))
764  {
765  if (m_isShuttingDown)
766  {
768  << _buffer->m_info.m_name << ": stopping write operation due to shutdown";
769  return;
770  }
771 
772  // If the other end is disconnected or the queue is full, we
773  // will block the thread by infinitely retrying to send.
774  // For such cases there is a drain command. The connection manager can initiate the
775  // drain, when needed. For example in case when a user task disconnects from the
776  // Intercom channel a drain will be initiated util we receive a new task assignment
777  if (_buffer->m_drainWriteQueue)
778  {
780  << _buffer->m_info.m_name
781  << ": Draining write queue, while there is a message pending: "
782  << g_cmdToString[msg.m_msg->header().m_cmd];
783  break;
784  }
785  }
786  }
787  else
788  {
790  << _buffer->m_info.m_name << ": Can't find output transport with output ID "
791  << msg.m_outputID
792  << ". Write message failed. Command: " << g_cmdToString[msg.m_msg->header().m_cmd];
793  }
794  }
795  }
796  catch (boost::interprocess::interprocess_exception& ex)
797  {
799  << _buffer->m_info.m_name << ": BaseSMChannelImpl: error sending message: " << ex.what();
800  }
801 
802  // Lock the modification of the container
803  {
804  std::lock_guard<std::mutex> lock(_buffer->m_mutexWriteBuffer);
805  _buffer->m_writeBufferQueue.clear();
806  }
807  // We might need to send more messages
808  writeMessage(_buffer);
809  }
810 
811  const std::string& getName() const
812  {
813  return m_transportIn.front().m_name;
814  }
815 
816  protected:
817  std::atomic<bool> m_isShuttingDown;
818  std::atomic<bool> m_started;
820 
821  private:
822  boost::asio::io_context& m_ioContext;
823 
824  typename SMessageQueueInfo::Container_t
825  m_transportIn;
826  typename SMessageOutputBuffer::Container_t m_outputBuffers;
827  std::mutex m_mutexTransportIn;
828  std::mutex m_mutexTransportOut;
829  };
830  } // namespace protocol_api
831 } // namespace dds
832 
833 #endif /* defined(__DDS__BaseSMChannelImpl__) */
uint64_t m_protocolHeaderID
Definition: BaseSMChannelImpl.h:819
Definition: def.h:151
Definition: BaseEventHandlersImpl.h:48
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseSMChannelImpl.h:192
void pushMsg(const A &_attachment, uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:553
Definition: ChannelEventHandlersImpl.h:29
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::string &_inputName, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:262
void syncSendShutdownAll()
Definition: BaseSMChannelImpl.h:575
std::atomic< bool > m_isShuttingDown
Definition: BaseSMChannelImpl.h:817
Definition: CommandAttachmentImpl.h:54
void defaultInit(const std::vector< std::string > _inputNames, const std::string &_outputName, EMQOpenType _inputOpenType, EMQOpenType _outputOpenType)
Definition: BaseSMChannelImpl.h:230
void addOutput(uint64_t _outputID, const std::string &_name, EMQOpenType _openType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:355
DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelEventHandlersImpl) DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelMessageHandlersImpl) protected
Definition: BaseSMChannelImpl.h:196
Definition: ProtocolCommands.h:30
uint64_t getProtocolHeaderID() const
Definition: BaseSMChannelImpl.h:350
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &, uint64_t _ID)
Definition: CommandAttachmentImpl.h:71
std::atomic< bool > m_started
True if we were able to start the channel, False otherwise.
Definition: BaseSMChannelImpl.h:818
#define LOG(severity)
Definition: Logger.h:56
Definition: BaseSMChannelImpl.h:147
std::shared_ptr< T > connectionPtr_t
Definition: BaseSMChannelImpl.h:191
void stop()
Definition: BaseSMChannelImpl.h:472
Definition: AgentConnectionManager.h:13
void removeMessageQueue()
Definition: BaseSMChannelImpl.h:482
static connectionPtr_t makeNew(boost::asio::io_context &_service, const std::vector< std::string > &_inputNames, const std::string &_outputName, uint64_t _ProtocolHeaderID, EMQOpenType _inputOpenType=EMQOpenType::OpenOrCreate, EMQOpenType _outputOpenType=EMQOpenType::OpenOrCreate)
Definition: BaseSMChannelImpl.h:274
Definition: def.h:152
Definition: def.h:149
void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:503
EMQOpenType
Definition: BaseSMChannelImpl.h:139
void start()
Definition: BaseSMChannelImpl.h:416
bool started() const
Definition: BaseSMChannelImpl.h:411
uint64_t m_ID
Definition: BaseEventHandlersImpl.h:50
Definition: def.h:150
void drainWriteQueue(bool _newVal, uint64_t _outputID)
Definition: BaseSMChannelImpl.h:594
void pushMsg(uint64_t _protocolHeaderID=0, uint64_t _outputID=0)
Definition: BaseSMChannelImpl.h:569
Definition: ChannelMessageHandlersImpl.h:20
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:81