DDS  ver. 1.6
BaseChannelImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef __DDS__BaseChannelImpl__
6 #define __DDS__BaseChannelImpl__
7 // STD
8 #include <chrono>
9 #include <deque>
10 #include <iostream>
11 #include <map>
12 #include <memory>
13 // BOOST
14 #include <boost/noncopyable.hpp>
15 #pragma clang diagnostic push
16 #pragma clang diagnostic ignored "-Wunused-local-typedef"
17 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
18 #include <boost/asio.hpp>
19 #pragma clang diagnostic pop
20 
21 #pragma clang diagnostic push
22 #pragma clang diagnostic ignored "-Wdeprecated-register"
23 #include <boost/uuid/uuid.hpp>
24 #include <boost/uuid/uuid_generators.hpp>
25 #include <boost/uuid/uuid_io.hpp>
26 #pragma clang diagnostic pop
27 // DDS
28 #include "ChannelEventsImpl.h"
30 #include "CommandAttachmentImpl.h"
31 #include "Logger.h"
32 #include "MonitoringThread.h"
33 #include "StatImpl.h"
34 
35 namespace dds
36 {
37  namespace protocol_api
38  {
39  template <class T>
40  class CClientChannelImpl; // needed for friend class
41  template <class T>
42  class CServerChannelImpl; // needed for friend class
43  }
44 }
45 
// BEGIN_MSG_MAP(theClass) opens the message map of a channel class.
// It makes the base/client/server channel templates friends of theClass and starts the
// definition of processMessage(). processMessage() first calls
// CMonitoringThread::instance().updateIdle(), reads the command id from the message header
// and switches on it inside a try block.
// The cases provided here handle the binary attachment commands (forwarded to
// processBinaryAttachmentCmd / processBinaryAttachmentStartCmd) and the three handshake
// commands (forwarded to dispatchMessageHandlers); each of them returns from processMessage().
// The switch, the try block and the function body are intentionally left open: further cases
// are added with MESSAGE_HANDLER and everything is closed by END_MSG_MAP().
46 #define BEGIN_MSG_MAP(theClass) \
47  public: \
48  friend protocol_api::CBaseChannelImpl<theClass>; \
49  friend protocol_api::CClientChannelImpl<theClass>; \
50  friend protocol_api::CServerChannelImpl<theClass>; \
51  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
52  { \
53  using namespace dds; \
54  using namespace dds::protocol_api; \
55  CMonitoringThread::instance().updateIdle(); \
56  bool processed = true; \
57  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \
58  \
59  try \
60  { \
61  switch (currentCmd) \
62  { \
63  case cmdBINARY_ATTACHMENT: \
64  { \
65  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::ptr_t attahcmentPtr_t; \
66  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::decode(_currentMsg); \
67  processBinaryAttachmentCmd(attachmentPtr); \
68  return; \
69  } \
70  case cmdBINARY_ATTACHMENT_START: \
71  { \
72  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::ptr_t attahcmentPtr_t; \
73  attahcmentPtr_t attachmentPtr = \
74  SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::decode(_currentMsg); \
75  processBinaryAttachmentStartCmd(attachmentPtr); \
76  return; \
77  } \
78  case cmdHANDSHAKE: \
79  { \
80  SCommandAttachmentImpl<cmdHANDSHAKE>::ptr_t attachmentPtr = \
81  SCommandAttachmentImpl<cmdHANDSHAKE>::decode(_currentMsg); \
82  dispatchMessageHandlers(currentCmd, attachmentPtr, this); \
83  return; \
84  } \
85  case cmdREPLY_HANDSHAKE_OK: \
86  { \
87  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::ptr_t attachmentPtr = \
88  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::decode(_currentMsg); \
89  dispatchMessageHandlers(currentCmd, attachmentPtr, this); \
90  return; \
91  } \
92  case cmdREPLY_HANDSHAKE_ERR: \
93  { \
94  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::ptr_t attachmentPtr = \
95  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::decode(_currentMsg); \
96  dispatchMessageHandlers(currentCmd, attachmentPtr, this); \
97  return; \
98  }
99 
// MESSAGE_HANDLER(msg, func) adds one case to the switch opened by BEGIN_MSG_MAP.
// The attachment of the current message is decoded for command `msg`, a debug line is logged
// and the attachment is passed to `func`, whose result is stored in `processed`.
// If `func` returns false, the message is forwarded to dispatchMessageHandlers() when
// getNofMessageHandlers<msg>() is non-zero; otherwise an error is logged.
100 #define MESSAGE_HANDLER(msg, func) \
101  case msg: \
102  { \
103  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
104  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
105  LOG(MiscCommon::debug) << "Processing " << g_cmdToString[msg] << " received from " << remoteEndIDString(); \
106  processed = func(attachmentPtr); \
107  if (!processed) \
108  { \
109  if (getNofMessageHandlers<msg>() == 0) \
110  { \
111  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
112  << _currentMsg->toString(); \
113  } \
114  else \
115  { \
116  dispatchMessageHandlers(msg, attachmentPtr, this); \
117  } \
118  } \
119  break; \
120  }
121 
// END_MSG_MAP() closes the switch, the try block and the processMessage() body that were
// opened by BEGIN_MSG_MAP. A command that matches no case is reported as an error.
// Any std::exception thrown inside the try block is caught and logged here, so it does not
// leave processMessage().
122 #define END_MSG_MAP() \
123  default: \
124  LOG(MiscCommon::error) << "The received message doesn't have a handler: " << _currentMsg->toString(); \
125  } \
126  } \
127  catch (std::exception & _e) \
128  { \
129  LOG(MiscCommon::error) << "Channel processMessage: " << _e.what(); \
130  } \
131  }
132 
// Defines a _remoteEndIDString() member that returns the fixed label "DDS Server".
// CBaseChannelImpl::remoteEndIDString() calls _remoteEndIDString() on the derived class
// to build the identification string of the remote end.
133 #define REGISTER_DEFAULT_REMOTE_ID_STRING \
134  std::string _remoteEndIDString() \
135  { \
136  return "DDS Server"; \
137  }
138 
139 namespace dds
140 {
141  namespace protocol_api
142  {
143  // Channel types
145  {
146  UNKNOWN = 0,
148  UI,
150  };
// A list of channel types.
151  typedef std::vector<EChannelType> channelTypeVector_t;
// Printable names of the channel types. The array is indexed with an EChannelType value
// (the channel uses gChannelTypeName[m_channelType] when logging).
// NOTE(review): the five names presumably follow the order of the EChannelType
// enumerators - keep both in sync; TODO confirm against the full enum definition.
152  const std::array<std::string, 5> gChannelTypeName{
153  { "unknown", "agent", "ui", "key_value_guard", "custom_command_guard" }
154  };
155 
157  {
159  : m_bytesReceived(0)
160  , m_fileCrc32(0)
161  , m_srcCommand(0)
162  , m_fileSize(0)
163  , m_startTime()
164  {
165  }
166 
168  uint32_t m_bytesReceived;
169  std::string m_fileName;
170  uint32_t m_fileCrc32;
171  uint16_t m_srcCommand;
172  uint32_t m_fileSize;
173  std::mutex m_mutex;
174  std::chrono::steady_clock::time_point m_startTime;
175  };
176 
177  typedef std::shared_ptr<SBinaryAttachmentInfo> binaryAttachmentInfoPtr_t;
178 
179  template <class T>
180  class CBaseChannelImpl : public boost::noncopyable,
181  public CChannelEventsImpl<T>,
183  public std::enable_shared_from_this<T>,
184  public CStatImpl
185  {
186  typedef std::function<void(T*)> handlerDisconnectEventFunction_t;
187  typedef std::deque<CProtocolMessage::protocolMessagePtr_t> protocolMessagePtrQueue_t;
188  typedef std::vector<boost::asio::mutable_buffer> protocolMessageBuffer_t;
189  typedef std::shared_ptr<boost::asio::deadline_timer> deadlineTimerPtr_t;
190 
191  public:
192  typedef std::shared_ptr<T> connectionPtr_t;
193  typedef std::weak_ptr<T> weakConnectionPtr_t;
194  typedef std::vector<connectionPtr_t> connectionPtrVector_t;
195  typedef std::vector<weakConnectionPtr_t> weakConnectionPtrVector_t;
196 
197  protected:
198  CBaseChannelImpl<T>(boost::asio::io_service& _service)
201  , CStatImpl(_service)
202  , m_isHandshakeOK(false)
203  , m_channelType(EChannelType::UNKNOWN)
204  , m_io_service(_service)
205  , m_socket(_service)
206  , m_started(false)
207  , m_currentMsg(std::make_shared<CProtocolMessage>())
208  , m_binaryAttachmentMap()
209  , m_binaryAttachmentMutex()
210  , m_deadlineTimer(
211  std::make_shared<boost::asio::deadline_timer>(_service, boost::posix_time::milliseconds(1000)))
212  , m_isShuttingDown(false)
213  {
214  }
215 
216  public:
218  {
219  LOG(MiscCommon::info) << "Channel " << gChannelTypeName[m_channelType] << " destructor is called";
220  stop();
221  }
222 
223  static connectionPtr_t makeNew(boost::asio::io_service& _service)
224  {
225  connectionPtr_t newObject(new T(_service));
226  return newObject;
227  }
228 
229  public:
// Returns the current value of the handshake flag m_isHandshakeOK.
230  bool isHanshakeOK() const
231  {
232  return m_isHandshakeOK;
233  }
234 
236  {
237  return m_channelType;
238  }
239 
// Stores _channelType as the type of this channel (m_channelType).
240  void setChannelType(EChannelType _channelType)
241  {
242  m_channelType = _channelType;
243  }
244 
245  public:
246  void start()
247  {
248  if (m_started)
249  return;
250 
251  m_started = true;
252  // Prevent Asio TCP socket to be inherited by child when using fork/exec
253  ::fcntl(m_socket.native_handle(), F_SETFD, FD_CLOEXEC);
254  readHeader();
255  }
256 
257  void stop()
258  {
259  if (!m_started)
260  return;
261  m_started = false;
262  close();
263  }
264 
// Gives access to the TCP socket owned by this channel (returned by reference).
265  boost::asio::ip::tcp::socket& socket()
266  {
267  return m_socket;
268  }
269 
270  template <ECmdType _cmd>
271  void dequeueMsg()
272  {
273  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
274  m_writeQueue.erase(std::remove_if(std::begin(m_writeQueue),
275  std::end(m_writeQueue),
277  return (_msg->header().m_cmd == _cmd);
278  }),
279  std::end(m_writeQueue));
280  }
281 
282  template <ECmdType _cmd, class A>
283  void accumulativePushMsg(const A& _attachment)
284  {
285  static const size_t maxAccumulativeWriteQueueSize = 10000;
286  try
287  {
289 
290  bool copyMessages = false;
291  {
292  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
293 
294  m_deadlineTimer->cancel();
295 
296  if (cmdUNKNOWN != _cmd)
297  m_accumulativeWriteQueue.push_back(msg);
298 
299  copyMessages = m_accumulativeWriteQueue.size() > maxAccumulativeWriteQueueSize;
300  if (copyMessages)
301  {
303  << "copy accumulated queue to write queue "
304  "m_accumulativeWriteQueue.size="
305  << m_accumulativeWriteQueue.size() << " m_writeQueue.size=" << m_writeQueue.size();
306 
307  // copy queue to main queue
308  std::copy(m_accumulativeWriteQueue.begin(),
309  m_accumulativeWriteQueue.end(),
310  back_inserter((m_isHandshakeOK) ? m_writeQueue : m_writeQueueBeforeHandShake));
311  m_accumulativeWriteQueue.clear();
312  }
313 
314  auto self(this->shared_from_this());
315  m_deadlineTimer->async_wait([this, self](const boost::system::error_code& error) {
316  if (!error)
317  {
318  bool copyMessages = false;
319  {
320  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
321  copyMessages = !m_accumulativeWriteQueue.empty();
322  // copy queue to main queue
323  if (copyMessages)
324  {
326  << "deadline_timer called: copy accumulated queue to write queue "
327  "m_accumulativeWriteQueue.size="
328  << m_accumulativeWriteQueue.size()
329  << " m_writeQueue.size=" << m_writeQueue.size();
330  std::copy(m_accumulativeWriteQueue.begin(),
331  m_accumulativeWriteQueue.end(),
332  back_inserter((m_isHandshakeOK) ? m_writeQueue
333  : m_writeQueueBeforeHandShake));
334  m_accumulativeWriteQueue.clear();
335  }
336  }
337  if (copyMessages)
338  pushMsg<cmdUNKNOWN>();
339  }
340  });
341 
342  LOG(MiscCommon::debug) << "accumulativePushMsg: WriteQueue size = " << m_writeQueue.size()
343  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size()
344  << " accumulativeWriteQueue size = " << m_accumulativeWriteQueue.size()
345  << " attachment = " << _attachment;
346  }
347  if (copyMessages)
348  pushMsg<cmdUNKNOWN>();
349  }
350  catch (std::exception& ex)
351  {
352  LOG(MiscCommon::error) << "BaseChannelImpl can't push accumulative message: " << ex.what();
353  }
354  }
355 
356  template <ECmdType _cmd>
358  {
359  SEmptyCmd cmd;
360  accumulativePushMsg<_cmd>(cmd);
361  }
362 
363  template <ECmdType _cmd, class A>
364  void pushMsg(const A& _attachment)
365  {
366  try
367  {
369 
370  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
371  if (!m_isHandshakeOK)
372  {
373  if (isCmdAllowedWithoutHandshake(_cmd))
374  m_writeQueue.push_back(msg);
375  else
376  m_writeQueueBeforeHandShake.push_back(msg);
377  }
378  else
379  {
380  // copy the buffered queue, which has been collected before hand-shake
381  if (!m_writeQueueBeforeHandShake.empty())
382  {
383  std::copy(m_writeQueueBeforeHandShake.begin(),
384  m_writeQueueBeforeHandShake.end(),
385  back_inserter(m_writeQueue));
386  m_writeQueueBeforeHandShake.clear();
387  }
388 
389  // add the current message to the queue
390  if (cmdUNKNOWN != _cmd)
391  m_writeQueue.push_back(msg);
392  }
393 
394  LOG(MiscCommon::debug) << "pushMsg: WriteQueue size = " << m_writeQueue.size()
395  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size();
396  }
397  catch (std::exception& ex)
398  {
399  LOG(MiscCommon::error) << "BaseChannelImpl can't push message: " << ex.what();
400  }
401 
402  // process standard async writing
403  auto self(this->shared_from_this());
404  m_io_service.post([this, self] {
405  try
406  {
407  writeMessage();
408  }
409  catch (std::exception& ex)
410  {
411  LOG(MiscCommon::error) << "BaseChannelImpl can't write message: " << ex.what();
412  }
413  });
414  }
415 
416  template <ECmdType _cmd>
417  void pushMsg()
418  {
419  SEmptyCmd cmd;
420  pushMsg<_cmd>(cmd);
421  }
422 
423  template <ECmdType _cmd, class A>
424  void sendYourself(const A& _attachment)
425  {
427  // process received message
428  T* pThis = static_cast<T*>(this);
429  pThis->processMessage(msg);
430  }
431 
432  template <ECmdType _cmd>
434  {
435  SEmptyCmd cmd;
436  sendYourself<_cmd>(cmd);
437  }
438 
439  void pushBinaryAttachmentCmd(const std::string& _srcFilePath,
440  const std::string& _fileName,
441  uint16_t _cmdSource)
442  {
444 
445  std::string srcFilePath(_srcFilePath);
446  // Resolve environment variables
447  MiscCommon::smart_path(&srcFilePath);
448 
449  std::ifstream f(srcFilePath);
450  if (!f.is_open() || !f.good())
451  {
452  throw std::runtime_error("Could not open the source file: " + srcFilePath);
453  }
454  f.seekg(0, std::ios::end);
455  data.reserve(f.tellg());
456  f.seekg(0, std::ios::beg);
457  data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
458 
459  pushBinaryAttachmentCmd(data, _fileName, _cmdSource);
460  }
461 
463  const std::string& _fileName,
464  uint16_t _cmdSource)
465  {
466  static const int maxCommandSize = 65536;
467  int nofParts = (_data.size() % maxCommandSize == 0) ? (_data.size() / maxCommandSize)
468  : (_data.size() / maxCommandSize + 1);
469  boost::crc_32_type fileCrc32;
470  fileCrc32.process_bytes(&_data[0], _data.size());
471 
472  boost::uuids::uuid fileId = boost::uuids::random_generator()();
473 
474  // Generate start message
475  SBinaryAttachmentStartCmd start_cmd;
476  start_cmd.m_fileId = fileId;
477  start_cmd.m_srcCommand = _cmdSource;
478  start_cmd.m_fileName = _fileName;
479  start_cmd.m_fileSize = _data.size();
480  start_cmd.m_fileCrc32 = fileCrc32.checksum();
481  pushMsg<cmdBINARY_ATTACHMENT_START>(start_cmd);
482 
483  for (size_t i = 0; i < nofParts; ++i)
484  {
486  cmd.m_fileId = fileId;
487 
488  size_t offset = i * maxCommandSize;
489  size_t size = (i != (nofParts - 1)) ? maxCommandSize : (_data.size() - offset);
490 
491  auto iter_begin = _data.begin() + offset;
492  auto iter_end = iter_begin + size;
493  std::copy(iter_begin, iter_end, std::back_inserter(cmd.m_data));
494 
495  cmd.m_size = size;
496  cmd.m_offset = offset;
497 
498  boost::crc_32_type crc32;
499  crc32.process_bytes(&(*iter_begin), size);
500 
501  cmd.m_crc32 = crc32.checksum();
502 
503  pushMsg<cmdBINARY_ATTACHMENT>(cmd);
504  }
505  }
506 
508  {
509  boost::uuids::uuid fileId = _attachment->m_fileId;
510 
511  {
512  // Lock with global map mutex
513  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
514 
515  binaryAttachmentMap_t::iterator iter_info = m_binaryAttachmentMap.find(fileId);
516  bool exists = iter_info != m_binaryAttachmentMap.end();
517 
518  if (!exists)
519  {
520  m_binaryAttachmentMap[fileId] = std::make_shared<SBinaryAttachmentInfo>();
521  iter_info = m_binaryAttachmentMap.find(fileId);
522  iter_info->second->m_startTime = std::chrono::steady_clock::now();
523  iter_info->second->m_fileName = _attachment->m_fileName;
524  iter_info->second->m_fileSize = _attachment->m_fileSize;
525  iter_info->second->m_fileCrc32 = _attachment->m_fileCrc32;
526  iter_info->second->m_srcCommand = _attachment->m_srcCommand;
527  iter_info->second->m_data.resize(_attachment->m_fileSize);
528  }
529  }
530  }
531 
533  {
534  boost::uuids::uuid fileId = _attachment->m_fileId;
535  binaryAttachmentInfoPtr_t info;
536  binaryAttachmentMap_t::iterator iter_info;
537 
538  {
539  // Lock with global map mutex
540  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
541 
542  iter_info = m_binaryAttachmentMap.find(fileId);
543  bool exists = iter_info != m_binaryAttachmentMap.end();
544 
545  if (!exists)
546  {
548  << "Received binary attachment [" << fileId << "] which does not exist. Skip this message.";
549  return;
550  }
551  info = iter_info->second;
552  }
553 
554  boost::crc_32_type crc32;
555  crc32.process_bytes(&_attachment->m_data[0], _attachment->m_data.size());
556 
557  if (crc32.checksum() != _attachment->m_crc32)
558  {
559  {
560  // Lock with global map mutex
561  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
562  // Remove info from map
563  m_binaryAttachmentMap.erase(iter_info);
564  }
565  std::stringstream ss;
566  ss << "Received binary attachment [" << fileId << "] has wrong CRC32 checksum: " << crc32.checksum()
567  << " instead of " << _attachment->m_crc32 << "offset=" << _attachment->m_offset
568  << " size=" << _attachment->m_size;
569  LOG(MiscCommon::error) << ss.str();
570  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand));
571  return;
572  }
573 
574  bool allBytesReceived = false;
575  {
576  // Lock with local mutex for each file
577  std::lock_guard<std::mutex> lock(info->m_mutex);
578 
579  info->m_bytesReceived += _attachment->m_size;
580 
581  std::copy(_attachment->m_data.begin(),
582  _attachment->m_data.end(),
583  info->m_data.begin() + _attachment->m_offset);
584 
585  allBytesReceived = info->m_bytesReceived == info->m_fileSize;
586  if (allBytesReceived)
587  {
588  // Check file CRC32
589  boost::crc_32_type crc32;
590  crc32.process_bytes(&info->m_data[0], info->m_data.size());
591 
592  if (crc32.checksum() != info->m_fileCrc32)
593  {
594  {
595  // Lock with global map mutex
596  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
597  // Remove info from map
598  m_binaryAttachmentMap.erase(iter_info);
599  }
600  std::stringstream ss;
601  ss << "Received binary file [" << fileId
602  << "] has wrong CRC32 checksum: " << crc32.checksum() << " instead of "
603  << _attachment->m_crc32;
604  LOG(MiscCommon::error) << ss.str();
605  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand));
606  return;
607  }
608 
609  const std::string dir(user_defaults_api::CUserDefaults::getDDSPath());
610  const std::string fileName(dir + to_string(fileId));
611  std::ofstream f(fileName.c_str());
612  if (!f.is_open() || !f.good())
613  {
614  {
615  // Lock with global map mutex
616  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
617  // Remove info from map
618  m_binaryAttachmentMap.erase(iter_info);
619  }
620  std::stringstream ss;
621  ss << "Could not open file: " << fileName;
622  LOG(MiscCommon::error) << ss.str();
623  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand));
624  return;
625  }
626 
627  for (const auto& v : info->m_data)
628  {
629  f << v;
630  }
631  f.close();
632 
633  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
634  std::chrono::microseconds downloadTime =
635  std::chrono::duration_cast<std::chrono::microseconds>(now - info->m_startTime);
636 
637  // Send message to yourself
639  reply_cmd.m_receivedFilePath = fileName;
640  reply_cmd.m_requestedFileName = info->m_fileName;
641  reply_cmd.m_srcCommand = info->m_srcCommand;
642  reply_cmd.m_downloadTime = downloadTime.count();
643  reply_cmd.m_receivedFileSize = info->m_fileSize;
644  sendYourself<cmdBINARY_ATTACHMENT_RECEIVED>(reply_cmd);
645  }
646  }
647 
648  if (allBytesReceived)
649  {
650  // Lock with global map mutex
651  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
652  // Remove info from map
653  m_binaryAttachmentMap.erase(iter_info);
654  }
655  }
656 
// Stores _handler as the disconnect event handler of this channel.
// A previously stored handler is replaced.
657  void registerDisconnectEventHandler(handlerDisconnectEventFunction_t _handler)
658  {
659  m_disconnectEventHandler = _handler;
660  }
661 
// Returns the value of m_started, which start() sets to true and stop() resets to false.
662  bool started()
663  {
664  return m_started;
665  }
666 
667  std::string remoteEndIDString()
668  {
669  // give a chance child to execute something
670  try
671  {
672  T* pThis = static_cast<T*>(this);
673  std::stringstream ss;
674  ss << pThis->_remoteEndIDString() << " [" << socket().remote_endpoint().address().to_string()
675  << "]";
676  return ss.str();
677  }
678  catch (...)
679  {
680  return std::string();
681  }
682  }
683 
684  private:
685  void readHeader()
686  {
687  auto self(this->shared_from_this());
688  boost::asio::async_read(
689  m_socket,
690  boost::asio::buffer(m_currentMsg->data(), CProtocolMessage::header_length),
691  [this, self](boost::system::error_code ec, std::size_t length) {
692  if (!ec)
693  {
694  LOG(MiscCommon::debug) << "Received message HEADER from " << remoteEndIDString() << ": "
695  << length << " bytes, expected " << CProtocolMessage::header_length;
696  }
697  if (!ec && m_currentMsg->decode_header())
698  {
699  // If the header is ok, receive the body of the message
700  readBody();
701  }
702  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
703  {
705  << "Disconnect is detected while on read msg header: " << ec.message();
706  onDissconnect();
707  }
708  else
709  {
710  if (m_started)
711  LOG(MiscCommon::error) << "Error reading message header: " << ec.message();
712  else
713  LOG(MiscCommon::info) << "The stop signal is received, aborting current operation and "
714  "closing the connection: "
715  << ec.message();
716 
717  stop();
718  }
719  });
720  }
721 
// Reads the body of the message whose header is currently held in m_currentMsg
// (readHeader() calls this after the header has been decoded successfully).
// A message with an empty body is processed right away. Otherwise the body is read
// asynchronously and processed in the completion handler. In both cases the message is
// passed to T::processMessage(), reported to logReadMessage(), cleared, and readHeader()
// is called for the next message.
// On eof / connection_reset onDissconnect() is called; on any other read error the
// channel is stopped.
722  void readBody()
723  {
724  if (m_currentMsg->body_length() == 0)
725  {
726  LOG(MiscCommon::debug) << "Received message BODY from " << remoteEndIDString()
727  << ": no attachment: " << m_currentMsg->toString();
728  // process received message
729  T* pThis = static_cast<T*>(this);
730  pThis->processMessage(m_currentMsg);
731 
732  // Log read statistics
733  this->logReadMessage(m_currentMsg);
734 
735  // Read next message
736  m_currentMsg->clear();
737  readHeader();
738  return;
739  }
740 
// `self` is captured by value in the handler below, so the handler holds a shared
// reference to this channel.
741  auto self(this->shared_from_this());
742  boost::asio::async_read(
743  m_socket,
744  boost::asio::buffer(m_currentMsg->body(), m_currentMsg->body_length()),
745  [this, self](boost::system::error_code ec, std::size_t length) {
746  if (!ec)
747  {
748  LOG(MiscCommon::debug) << "Received message BODY from " << remoteEndIDString() << " ("
749  << length << " bytes): " << m_currentMsg->toString();
750 
751  // process received message
752  T* pThis = static_cast<T*>(this);
753  pThis->processMessage(m_currentMsg);
754 
755  // Log read statistics
756  this->logReadMessage(m_currentMsg);
757 
758  // Read next message
759  m_currentMsg->clear();
760  readHeader();
761  }
762  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
763  {
764  LOG(MiscCommon::debug) << "Disconnect is detected while on read msg body: " << ec.message();
765  onDissconnect();
766  }
767  else
768  {
769  // don't show error if service is closed
770  if (m_started)
771  LOG(MiscCommon::error) << "Error reading message body: " << ec.message();
772  else
773  LOG(MiscCommon::info) << "The stop signal is received, aborting current operation and "
774  "closing the connection: "
775  << ec.message();
776  stop();
777  }
778  });
779  }
780 
781  private:
782  void writeMessage()
783  {
784  // To avoid sending of a bunch of small messages, we pack as many messages as possible into one write
785  // request (GH-38).
786  // Copy messages from the queue to send buffer (which should remain until the write handler is called)
787  {
788  std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
789  if (!m_writeBuffer.empty())
790  return; // a write is in progress, don't start anything
791 
792  if (m_writeQueue.empty())
793  return; // There is nothing to send.
794 
795  for (auto i : m_writeQueue)
796  {
798  << "Sending to " << remoteEndIDString() << " a message: " << i->toString();
799  if (cmdSHUTDOWN == i->header().m_cmd)
800  m_isShuttingDown = true;
801  m_writeBuffer.push_back(boost::asio::buffer(i->data(), i->length()));
802  m_writeBufferQueue.push_back(i);
803  }
804  m_writeQueue.clear();
805  }
806 
807  auto self(this->shared_from_this());
808  boost::asio::async_write(
809  m_socket,
810  m_writeBuffer,
811  [this, self](boost::system::error_code _ec, std::size_t _bytesTransferred) {
812  try
813  {
814  if (!_ec)
815  {
816  LOG(MiscCommon::debug) << "Message successfully sent to " << remoteEndIDString() << " ("
817  << _bytesTransferred << " bytes)";
818 
819  if (m_isShuttingDown)
820  {
822  << "Shutdown signal has been successfully sent to " << remoteEndIDString();
823  stop();
824  }
825 
826  // lock the modification of the container
827  {
828  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
829 
830  // Log write statistics
831  this->logWriteMessages(m_writeBufferQueue);
832 
833  m_writeBuffer.clear();
834  m_writeBufferQueue.clear();
835  }
836  // we might need to send more messages
837  writeMessage();
838  }
839  else if ((boost::asio::error::eof == _ec) || (boost::asio::error::connection_reset == _ec))
840  {
842  << "Disconnect is detected while on write message: " << _ec.message();
843  onDissconnect();
844  }
845  else
846  {
847  // don't show error if service is closed
848  if (m_started)
850  << "Error sending to " << remoteEndIDString() << ": " << _ec.message();
851  else
853  << "The stop signal is received, aborting current operation and "
854  "closing the connection: "
855  << _ec.message();
856  stop();
857  }
858  }
859  catch (std::exception& ex)
860  {
861  LOG(MiscCommon::error) << "BaseChannelImpl can't write message (callback): " << ex.what();
862  }
863  });
864  }
865 
866  void onDissconnect()
867  {
868  LOG(MiscCommon::debug) << "The session was disconnected by the remote end: " << remoteEndIDString();
869  // stopping the channel
870  stop();
871 
872  // give a chance to children to execute something
874 
875  // Call external event handler
876  T* pThis = static_cast<T*>(this);
877  if (m_disconnectEventHandler)
878  m_disconnectEventHandler(pThis);
879  }
880 
881  bool isCmdAllowedWithoutHandshake(ECmdType _cmd)
882  {
883  if (m_isHandshakeOK)
884  return true;
885  return (_cmd == cmdHANDSHAKE || _cmd == cmdREPLY_HANDSHAKE_OK || _cmd == cmdREPLY_HANDSHAKE_ERR);
886  }
887 
888  private:
889  void close()
890  {
891  m_socket.close();
892  }
893 
894  private:
895  handlerDisconnectEventFunction_t m_disconnectEventHandler;
896 
897  protected:
900 
901  private:
902  boost::asio::io_service& m_io_service;
903  boost::asio::ip::tcp::socket m_socket;
904  bool m_started;
906 
907  protocolMessagePtrQueue_t m_writeQueue;
908  protocolMessagePtrQueue_t m_writeQueueBeforeHandShake;
909 
910  std::mutex m_mutexWriteBuffer;
911  protocolMessageBuffer_t m_writeBuffer;
912  protocolMessagePtrQueue_t m_writeBufferQueue;
913 
914  // BinaryAttachment
915  typedef std::map<boost::uuids::uuid, binaryAttachmentInfoPtr_t> binaryAttachmentMap_t;
916  binaryAttachmentMap_t m_binaryAttachmentMap;
917  std::mutex m_binaryAttachmentMutex;
918 
919  protocolMessagePtrQueue_t m_accumulativeWriteQueue;
920  deadlineTimerPtr_t m_deadlineTimer;
921 
922  bool m_isShuttingDown;
923  };
924  }
925 }
926 
927 #endif /* defined(__DDS__BaseChannelImpl__) */
bool m_isHandshakeOK
Definition: BaseChannelImpl.h:898
std::shared_ptr< T > connectionPtr_t
Definition: BaseChannelImpl.h:192
std::string m_receivedFilePath
Path to the received file.
Definition: BinaryAttachmentReceivedCmd.h:23
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentCmd.h:29
void accumulativePushMsg(const A &_attachment)
Definition: BaseChannelImpl.h:283
EChannelType
Definition: BaseChannelImpl.h:144
Definition: BaseChannelImpl.h:156
Definition: ProtocolCommands.h:33
EChannelType m_channelType
Definition: BaseChannelImpl.h:899
uint32_t m_fileSize
File size in bytes.
Definition: BinaryAttachmentStartCmd.h:31
Definition: BinaryAttachmentReceivedCmd.h:15
static std::string getDDSPath()
Definition: UserDefaults.cpp:207
Definition: CommandAttachmentImpl.h:55
bool started()
Definition: BaseChannelImpl.h:662
bool isHanshakeOK() const
Definition: BaseChannelImpl.h:230
Definition: BaseChannelImpl.h:148
uint32_t m_fileCrc32
File checksum.
Definition: BinaryAttachmentStartCmd.h:32
void pushBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource)
Definition: BaseChannelImpl.h:439
std::shared_ptr< SEmptyCmd > ptr_t
Definition: CommandAttachmentImpl.h:65
Definition: BinaryAttachmentStartCmd.h:21
std::chrono::steady_clock::time_point m_startTime
Definition: BaseChannelImpl.h:174
Definition: ProtocolCommands.h:29
void processBinaryAttachmentStartCmd(SCommandAttachmentImpl< cmdBINARY_ATTACHMENT_START >::ptr_t _attachment)
Definition: BaseChannelImpl.h:507
void registerDisconnectEventHandler(handlerDisconnectEventFunction_t _handler)
Definition: BaseChannelImpl.h:657
MiscCommon::BYTEVector_t m_data
Definition: BaseChannelImpl.h:167
const std::array< std::string, 5 > gChannelTypeName
Definition: BaseChannelImpl.h:152
void accumulativePushMsg()
Definition: BaseChannelImpl.h:357
#define LOG(severity)
Definition: Logger.h:54
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseChannelImpl.h:193
Definition: SimpleMsgCmd.h:16
Definition: ProtocolCommands.h:34
uint32_t m_size
Size of this piece of binary data.
Definition: BinaryAttachmentCmd.h:31
Definition: BinaryAttachmentCmd.h:21
EChannelType getChannelType() const
Definition: BaseChannelImpl.h:235
void start()
Definition: BaseChannelImpl.h:246
uint32_t m_fileSize
Definition: BaseChannelImpl.h:172
Definition: BaseChannelImpl.h:146
static connectionPtr_t makeNew(boost::asio::io_service &_service)
Definition: BaseChannelImpl.h:223
Definition: dds-agent/src/AgentConnectionManager.h:16
std::string remoteEndIDString()
Definition: BaseChannelImpl.h:667
void pushMsg(const A &_attachment)
Definition: BaseChannelImpl.h:364
uint32_t m_crc32
CRC checksum of this piece of binary data.
Definition: BinaryAttachmentCmd.h:32
uint32_t m_offset
Offset for this piece of binary data.
Definition: BinaryAttachmentCmd.h:30
Definition: BaseChannelImpl.h:180
Definition: BaseChannelImpl.h:147
This class implements slots subscription and slots calls associated with certain channel events...
Definition: ChannelEventsImpl.h:27
std::string m_fileName
Name of the file.
Definition: BinaryAttachmentStartCmd.h:30
Definition: BaseChannelImpl.h:42
void dequeueMsg()
Definition: BaseChannelImpl.h:271
SBinaryAttachmentInfo()
Definition: BaseChannelImpl.h:158
void sendYourself()
Definition: BaseChannelImpl.h:433
Definition: BaseChannelImpl.h:40
void pushMsg()
Definition: BaseChannelImpl.h:417
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &)
Definition: CommandAttachmentImpl.h:72
Definition: def.h:152
uint32_t m_fileCrc32
Definition: BaseChannelImpl.h:170
std::vector< EChannelType > channelTypeVector_t
Definition: BaseChannelImpl.h:151
Definition: def.h:149
std::vector< weakConnectionPtr_t > weakConnectionPtrVector_t
Definition: BaseChannelImpl.h:195
void sendYourself(const A &_attachment)
Definition: BaseChannelImpl.h:424
void smart_path(_T *_Path)
The function expands any environment variable found in the given path to its value.
Definition: SysHelper.h:93
Definition: StatImpl.h:112
std::shared_ptr< SBinaryAttachmentInfo > binaryAttachmentInfoPtr_t
Definition: BaseChannelImpl.h:177
MiscCommon::BYTEVector_t m_data
Piece of binary data.
Definition: BinaryAttachmentCmd.h:33
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
std::mutex m_mutex
Definition: BaseChannelImpl.h:173
boost::asio::ip::tcp::socket & socket()
Definition: BaseChannelImpl.h:265
Definition: ChannelEventsImpl.h:20
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentStartCmd.h:29
uint32_t m_bytesReceived
Definition: BaseChannelImpl.h:168
uint16_t m_srcCommand
Source command which initiated file transport.
Definition: BinaryAttachmentStartCmd.h:33
void pushBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource)
Definition: BaseChannelImpl.h:462
std::string m_fileName
Definition: BaseChannelImpl.h:169
Definition: def.h:150
Definition: BaseChannelImpl.h:149
Definition: ProtocolCommands.h:30
std::vector< connectionPtr_t > connectionPtrVector_t
Definition: BaseChannelImpl.h:194
void setChannelType(EChannelType _channelType)
Definition: BaseChannelImpl.h:240
void processBinaryAttachmentCmd(SCommandAttachmentImpl< cmdBINARY_ATTACHMENT >::ptr_t _attachment)
Definition: BaseChannelImpl.h:532
Definition: ChannelMessageHandlersImpl.h:29
uint16_t m_srcCommand
Definition: BaseChannelImpl.h:171
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
void stop()
Definition: BaseChannelImpl.h:257
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:69