DDS  ver. 1.6
BaseSMChannelImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef __DDS__BaseSMChannelImpl__
6 #define __DDS__BaseSMChannelImpl__
7 // STD
8 #include <chrono>
9 #include <deque>
10 #include <iostream>
11 #include <map>
12 #include <memory>
13 #include <mutex>
14 // BOOST
15 #include <boost/interprocess/ipc/message_queue.hpp>
16 #include <boost/noncopyable.hpp>
17 #include <boost/thread/thread.hpp>
18 #pragma clang diagnostic push
19 #pragma clang diagnostic ignored "-Wunused-local-typedef"
20 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
21 #include <boost/asio.hpp>
22 #pragma clang diagnostic pop
23 // DDS
25 #include "CommandAttachmentImpl.h"
26 #include "Logger.h"
27 
28 #define BEGIN_SM_MSG_MAP(theClass) \
29  public: \
30  friend protocol_api::CBaseSMChannelImpl<theClass>; \
31  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
32  { \
33  if (!m_started) \
34  return; \
35  \
36  using namespace dds; \
37  using namespace dds::protocol_api; \
38  bool processed = true; \
39  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \
40  \
41  try \
42  { \
43  switch (currentCmd) \
44  {
45 
46 #define SM_MESSAGE_HANDLER(msg, func) \
47  case msg: \
48  { \
49  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
50  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
51  processed = func(attachmentPtr); \
52  if (!processed) \
53  { \
54  if (getNofMessageHandlers<msg>() == 0) \
55  { \
56  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
57  << _currentMsg->toString(); \
58  } \
59  else \
60  { \
61  dispatchMessageHandlers(msg, attachmentPtr, this); \
62  } \
63  } \
64  break; \
65  }
66 
67 #define END_SM_MSG_MAP() \
68  default: \
69  LOG(MiscCommon::error) << "The received SM message doesn't have a handler: " << _currentMsg->toString(); \
70  } \
71  } \
72  catch (std::exception & _e) \
73  { \
74  LOG(MiscCommon::error) << "SMChannel processMessage: " << _e.what(); \
75  } \
76  }
77 
78 namespace dds
79 {
80  namespace protocol_api
81  {
82  template <class T>
83  class CBaseSMChannelImpl : public boost::noncopyable,
85  public std::enable_shared_from_this<T>
86  {
87  typedef std::deque<CProtocolMessage::protocolMessagePtr_t> protocolMessagePtrQueue_t;
88  typedef std::shared_ptr<boost::interprocess::message_queue> messageQueuePtr_t;
89 
90  public:
91  typedef std::shared_ptr<T> connectionPtr_t;
92 
93  protected:
94  CBaseSMChannelImpl<T>(const std::string& _inputName, const std::string& _outputName)
96  , m_started(false)
97  , m_workerThreads()
98  , m_currentMsg(std::make_shared<CProtocolMessage>())
99  , m_inputMessageQueueName(_inputName)
100  , m_outputMessageQueueName(_outputName)
101  , m_writeQueue()
102  , m_mutexWriteBuffer()
103  , m_writeBufferQueue()
104  {
105  createMessageQueue();
106  }
107 
108  public:
110  {
111  LOG(MiscCommon::info) << "Shared memory channel destructor is called";
112  stop();
113  }
114 
115  static connectionPtr_t makeNew(const std::string& _inputName, const std::string& _outputName)
116  {
117  connectionPtr_t newObject(new T(_inputName, _outputName));
118  return newObject;
119  }
120 
121  private:
122  void createMessageQueue()
123  {
124  try
125  {
126  LOG(MiscCommon::info) << "Initializing message queue for shared memory channel";
127 
128  static const unsigned int maxNofMessages = 100;
129  // Taking into account that maximum size of the string for the command is 2^16 plus some extra bytes
130  // for key size (128 bytes) and other.
131  static const unsigned int maxMessageSize = 65000;
132 
133  m_transportIn.reset();
134  m_transportOut.reset();
135  m_transportIn =
136  std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_or_create,
137  m_inputMessageQueueName.c_str(),
138  maxNofMessages,
139  maxMessageSize);
140  m_transportOut =
141  std::make_shared<boost::interprocess::message_queue>(boost::interprocess::open_or_create,
142  m_outputMessageQueueName.c_str(),
143  maxNofMessages,
144  maxMessageSize);
145  }
146  catch (boost::interprocess::interprocess_exception& _e)
147  {
149  << "Can't initialize shared memory transport with input name " << m_inputMessageQueueName
150  << " and output name " << m_outputMessageQueueName << ": " << _e.what();
151  m_transportIn.reset();
152  m_transportOut.reset();
153  }
154  }
155 
156  public:
157  void reinit()
158  {
159  if (!m_started)
160  return;
161 
162  LOG(MiscCommon::info) << "Reinitializing shared memory channel...";
163 
164  stop();
166  createMessageQueue();
167  {
168  // Clear message buffers
169  std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
170  m_writeBufferQueue.clear();
171  m_writeQueue.clear();
172  }
173  start();
174  }
175 
176  void start()
177  {
178  if (m_transportIn == nullptr || m_transportOut == nullptr)
179  {
181  << "Can't start shared memory channel because there was a problem creating message queues";
182  m_started = false;
183  return;
184  }
185 
186  m_started = true;
187 
188  m_io_service.reset();
189 
190  auto self(this->shared_from_this());
191  m_io_service.post([this, self] {
192  try
193  {
194  readMessage();
195  }
196  catch (std::exception& ex)
197  {
198  LOG(MiscCommon::error) << "BaseSMChannelImpl can't read message: " << ex.what();
199  }
200  });
201 
202  const int nConcurrentThreads(3);
203  m_workerThreads = std::make_shared<boost::thread_group>();
204  for (int x = 0; x < nConcurrentThreads; ++x)
205  {
206  m_workerThreads->create_thread(boost::bind(&boost::asio::io_service::run, &(m_io_service)));
207  }
208  }
209 
210  void stop()
211  {
212  if (!m_started)
213  return;
214 
215  m_started = false;
216  sendYourselfShutdown();
217 
218  m_io_service.stop();
219  m_workerThreads->join_all();
220  m_workerThreads.reset();
221  }
222 
224  {
225  const bool inputRemoved = boost::interprocess::message_queue::remove(m_inputMessageQueueName.c_str());
226  const bool outputRemoved = boost::interprocess::message_queue::remove(m_outputMessageQueueName.c_str());
227  LOG(MiscCommon::info) << "Message queue " << m_inputMessageQueueName
228  << " remove status: " << inputRemoved;
229  LOG(MiscCommon::info) << "Message queue " << m_outputMessageQueueName
230  << " remove status: " << outputRemoved;
231  }
232 
233  template <ECmdType _cmd, class A>
234  void pushMsg(const A& _attachment)
235  {
236  if (!m_started)
237  {
238  LOG(MiscCommon::error) << "Skip pushing message. The channel was not started.";
239  return;
240  }
241 
242  try
243  {
245 
246  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
247 
248  // add the current message to the queue
249  if (cmdUNKNOWN != _cmd)
250  m_writeQueue.push_back(msg);
251 
252  LOG(MiscCommon::debug) << "BaseSMChannelImpl pushMsg: WriteQueue size = " << m_writeQueue.size();
253  }
254  catch (std::exception& ex)
255  {
256  LOG(MiscCommon::error) << "BaseSMChannelImpl can't push message: " << ex.what();
257  }
258 
259  // process standard async writing
260  auto self(this->shared_from_this());
261  m_io_service.post([this, self] {
262  try
263  {
264  writeMessage();
265  }
266  catch (std::exception& ex)
267  {
268  LOG(MiscCommon::error) << "BaseSMChannelImpl can't write message: " << ex.what();
269  }
270  });
271  }
272 
273  template <ECmdType _cmd>
274  void pushMsg()
275  {
276  SEmptyCmd cmd;
277  pushMsg<_cmd>(cmd);
278  }
279 
280  private:
281  void sendYourselfShutdown()
282  {
283  // Send cmdSHUTDOWN with higher priority in order to stop read operation.
284  SEmptyCmd cmd;
286  m_transportIn->send(msg->data(), msg->length(), 1);
287  }
288 
289  void readMessage()
290  {
291  try
292  {
293  unsigned int priority;
294  boost::interprocess::message_queue::size_type receivedSize;
295 
296  // We need to allocate the memory of the size equal to the maximum size of the message
297  m_currentMsg->resize(m_transportIn->get_max_msg_size());
298  m_transportIn->receive(
299  m_currentMsg->data(), m_transportIn->get_max_msg_size(), receivedSize, priority);
300 
301  if (receivedSize < CProtocolMessage::header_length)
302  {
303  LOG(MiscCommon::debug) << "Received message: " << receivedSize << " bytes, expected at least"
304  << CProtocolMessage::header_length << " bytes";
305  }
306  else
307  {
308  // Resize message data to the actually received bytes
309  m_currentMsg->resize(receivedSize);
310  if (m_currentMsg->decode_header())
311  {
312  ECmdType currentCmd = static_cast<ECmdType>(m_currentMsg->header().m_cmd);
313  if (currentCmd == cmdSHUTDOWN)
314  {
315  // Do not execute processBody which starts readMessage
316  }
317  else
318  {
319  // If the header is ok, process the body of the message
320  processBody(receivedSize - CProtocolMessage::header_length);
321  }
322  }
323  else
324  {
325  LOG(MiscCommon::error) << "BaseSMChannelImpl: error reading message header";
326  }
327  }
328  }
329  catch (boost::interprocess::interprocess_exception& ex)
330  {
331  LOG(MiscCommon::error) << "BaseSMChannelImpl: error receiving message: " << ex.what() << "\n";
332  }
333  }
334 
335  void processBody(boost::interprocess::message_queue::size_type _bodySize)
336  {
337  if (_bodySize != m_currentMsg->body_length())
338  {
340  << "Received message BODY: " << _bodySize << " bytes, expected " << m_currentMsg->body_length();
341  }
342  else
343  {
344  if (m_currentMsg->body_length() == 0)
345  {
346  LOG(MiscCommon::debug) << "Received message BODY no attachment: " << m_currentMsg->toString();
347  }
348  else
349  {
351  << "Received message BODY (" << _bodySize << " bytes): " << m_currentMsg->toString();
352  }
353 
354  // process received message
355  T* pThis = static_cast<T*>(this);
356  pThis->processMessage(m_currentMsg);
357 
358  // Read next message
359  m_currentMsg->clear();
360 
361  auto self(this->shared_from_this());
362  m_io_service.post([this, self] {
363  try
364  {
365  readMessage();
366  }
367  catch (std::exception& ex)
368  {
369  LOG(MiscCommon::error) << "BaseSMChannelImpl can't read message: " << ex.what();
370  }
371  });
372  }
373  }
374 
375  void writeMessage()
376  {
377  {
378  std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
379  if (!m_writeBufferQueue.empty())
380  return; // A write is in progress, don't start anything
381 
382  if (m_writeQueue.empty())
383  return; // There is nothing to send.
384 
385  m_writeBufferQueue.assign(m_writeQueue.begin(), m_writeQueue.end());
386  m_writeQueue.clear();
387  }
388 
389  try
390  {
391  for (auto msg : m_writeBufferQueue)
392  {
393  m_transportOut->send(msg->data(), msg->length(), 0);
394  }
395  }
396  catch (boost::interprocess::interprocess_exception& ex)
397  {
398  LOG(MiscCommon::error) << "BaseSMChannelImpl: error sending message: " << ex.what();
399  }
400 
401  // Lock the modification of the container
402  {
403  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
404  m_writeBufferQueue.clear();
405  }
406  // We might need to send more messages
407  writeMessage();
408  }
409 
410  protected:
411  std::atomic<bool> m_started;
412 
413  private:
414  messageQueuePtr_t m_transportIn;
415  messageQueuePtr_t m_transportOut;
416  boost::asio::io_service m_io_service;
417  std::shared_ptr<boost::thread_group> m_workerThreads;
419 
420  std::string m_inputMessageQueueName;
421  std::string m_outputMessageQueueName;
422 
423  protocolMessagePtrQueue_t m_writeQueue;
424 
425  std::mutex m_mutexWriteBuffer;
426  protocolMessagePtrQueue_t m_writeBufferQueue;
427  };
428  }
429 }
430 
431 #endif /* defined(__DDS__BaseSMChannelImpl__) */
void reinit()
Definition: BaseSMChannelImpl.h:157
static connectionPtr_t makeNew(const std::string &_inputName, const std::string &_outputName)
Definition: BaseSMChannelImpl.h:115
void pushMsg(const A &_attachment)
Definition: BaseSMChannelImpl.h:234
Definition: CommandAttachmentImpl.h:55
Definition: ProtocolCommands.h:29
std::atomic< bool > m_started
True if we were able to start the channel, False otherwise.
Definition: BaseSMChannelImpl.h:411
#define LOG(severity)
Definition: Logger.h:54
Definition: BaseSMChannelImpl.h:83
std::shared_ptr< T > connectionPtr_t
Definition: BaseSMChannelImpl.h:91
void stop()
Definition: BaseSMChannelImpl.h:210
Definition: dds-agent/src/AgentConnectionManager.h:16
void removeMessageQueue()
Definition: BaseSMChannelImpl.h:223
Definition: def.h:153
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &)
Definition: CommandAttachmentImpl.h:72
Definition: def.h:152
Definition: def.h:149
void start()
Definition: BaseSMChannelImpl.h:176
Definition: def.h:150
void pushMsg()
Definition: BaseSMChannelImpl.h:274
Definition: ChannelMessageHandlersImpl.h:29
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:69