DDS  ver. 3.6
BaseChannelImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef __DDS__BaseChannelImpl__
6 #define __DDS__BaseChannelImpl__
7 // STD
8 #include <chrono>
9 #include <deque>
10 #include <iostream>
11 #include <map>
12 #include <memory>
13 // BOOST
14 #include <boost/asio.hpp>
15 #include <boost/filesystem.hpp>
16 #include <boost/noncopyable.hpp>
17 #include <boost/uuid/uuid.hpp>
18 #include <boost/uuid/uuid_generators.hpp>
19 #include <boost/uuid/uuid_io.hpp>
20 // DDS
23 #include "CommandAttachmentImpl.h"
24 #include "Logger.h"
25 #include "MonitoringThread.h"
26 #include "ProtocolDef.h"
27 
28 namespace fs = boost::filesystem;
29 
30 namespace dds
31 {
32  namespace protocol_api
33  {
34  template <class T>
35  class CClientChannelImpl; // needed for friend class
36  template <class T>
37  class CServerChannelImpl; // needed for friend class
38  } // namespace protocol_api
39 } // namespace dds
40 
41 // Either raw message or command based processing can be used at a time
42 // Command based message processing
43 #define BEGIN_MSG_MAP(theClass) \
44  public: \
45  friend protocol_api::CBaseChannelImpl<theClass>; \
46  friend protocol_api::CClientChannelImpl<theClass>; \
47  friend protocol_api::CServerChannelImpl<theClass>; \
48  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
49  { \
50  using namespace dds; \
51  using namespace dds::protocol_api; \
52  CMonitoringThread::instance().updateIdle(); \
53  bool processed = true; \
54  (void)processed; \
55  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \
56  SSenderInfo sender; \
57  sender.m_ID = _currentMsg->header().m_ID; \
58  \
59  try \
60  { \
61  switch (currentCmd) \
62  { \
63  case cmdBINARY_ATTACHMENT: \
64  { \
65  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::ptr_t attahcmentPtr_t; \
66  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::decode(_currentMsg); \
67  processBinaryAttachmentCmd(sender, attachmentPtr); \
68  return; \
69  } \
70  case cmdBINARY_ATTACHMENT_START: \
71  { \
72  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::ptr_t attahcmentPtr_t; \
73  attahcmentPtr_t attachmentPtr = \
74  SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::decode(_currentMsg); \
75  processBinaryAttachmentStartCmd(sender, attachmentPtr); \
76  return; \
77  } \
78  case cmdHANDSHAKE: \
79  { \
80  SCommandAttachmentImpl<cmdHANDSHAKE>::ptr_t attachmentPtr = \
81  SCommandAttachmentImpl<cmdHANDSHAKE>::decode(_currentMsg); \
82  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
83  return; \
84  } \
85  case cmdLOBBY_MEMBER_HANDSHAKE: \
86  { \
87  SCommandAttachmentImpl<cmdLOBBY_MEMBER_HANDSHAKE>::ptr_t attachmentPtr = \
88  SCommandAttachmentImpl<cmdLOBBY_MEMBER_HANDSHAKE>::decode(_currentMsg); \
89  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
90  return; \
91  } \
92  case cmdREPLY_HANDSHAKE_OK: \
93  { \
94  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::ptr_t attachmentPtr = \
95  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::decode(_currentMsg); \
96  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
97  return; \
98  } \
99  case cmdREPLY_HANDSHAKE_ERR: \
100  { \
101  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::ptr_t attachmentPtr = \
102  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::decode(_currentMsg); \
103  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
104  return; \
105  }
106 
107 #define MESSAGE_HANDLER(msg, func) \
108  case msg: \
109  { \
110  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
111  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
112  LOG(dds::misc::debug) << "Processing " << g_cmdToString[msg] << " received from " << remoteEndIDString(); \
113  processed = func(attachmentPtr, sender); \
114  if (!processed) \
115  { \
116  if (!handlerExists(msg)) \
117  { \
118  LOG(dds::misc::error) << "The received message was not processed and has no registered handler: " \
119  << _currentMsg->toString(); \
120  } \
121  else \
122  { \
123  dispatchHandlers<>(msg, sender, attachmentPtr); \
124  } \
125  } \
126  break; \
127  }
128 
129 #define MESSAGE_HANDLER_DISPATCH(msg) \
130  case msg: \
131  { \
132  (void)processed; \
133  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
134  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
135  LOG(dds::misc::debug) << "Dispatching " << g_cmdToString[msg] << " received from " << remoteEndIDString(); \
136  if (!handlerExists(msg)) \
137  { \
138  LOG(dds::misc::error) << "The received message can't be dispatched, it has no registered handler: " \
139  << _currentMsg->toString(); \
140  } \
141  else \
142  { \
143  dispatchHandlers<>(msg, sender, attachmentPtr); \
144  } \
145  break; \
146  }
147 
148 #define END_MSG_MAP() \
149  default: \
150  LOG(dds::misc::error) << "The received message doesn't have a handler: " << _currentMsg->toString(); \
151  } \
152  } \
153  catch (std::exception & _e) \
154  { \
155  LOG(dds::misc::error) << "Channel processMessage (" << _currentMsg->toString() << "): " << _e.what(); \
156  } \
157  }
158 
159 // Raw message processing
160 #define RAW_MESSAGE_HANDLER(theClass, func) \
161  BEGIN_MSG_MAP(theClass) \
162  default: \
163  break; \
164  } \
165  processed = func(_currentMsg); \
166  if (!processed) \
167  { \
168  if (!handlerExists(ECmdType::cmdRAW_MSG)) \
169  { \
170  LOG(dds::misc::error) << "The received message was not processed and has no registered handler: " \
171  << _currentMsg->toString(); \
172  } \
173  else \
174  { \
175  dispatchHandlers(ECmdType::cmdRAW_MSG, sender, _currentMsg); \
176  } \
177  } \
178  } \
179  catch (std::exception & _e) \
180  { \
181  LOG(dds::misc::error) << "Channel processMessage: " << _e.what(); \
182  } \
183  }
184 
185 #define REGISTER_DEFAULT_REMOTE_ID_STRING \
186  std::string _remoteEndIDString() \
187  { \
188  return "DDS Server"; \
189  }
190 
191 namespace dds
192 {
193  namespace protocol_api
194  {
196  {
198  : m_bytesReceived(0)
199  , m_fileCrc32(0)
200  , m_srcCommand(0)
201  , m_fileSize(0)
202  , m_startTime()
203  {
204  }
205 
207  uint32_t m_bytesReceived;
208  std::string m_fileName;
209  uint32_t m_fileCrc32;
210  uint16_t m_srcCommand;
211  uint32_t m_fileSize;
212  std::mutex m_mutex;
213  std::chrono::steady_clock::time_point m_startTime;
214  };
215 
216  typedef std::shared_ptr<SBinaryAttachmentInfo> binaryAttachmentInfoPtr_t;
217 
218  template <class T>
219  class CBaseChannelImpl : public boost::noncopyable,
222  public std::enable_shared_from_this<T>
223  {
224  typedef std::deque<CProtocolMessage::protocolMessagePtr_t> protocolMessagePtrQueue_t;
225  typedef std::vector<boost::asio::mutable_buffer> protocolMessageBuffer_t;
226  typedef std::shared_ptr<boost::asio::deadline_timer> deadlineTimerPtr_t;
227 
228  public:
229  typedef std::shared_ptr<T> connectionPtr_t;
230  typedef std::weak_ptr<T> weakConnectionPtr_t;
231  typedef std::vector<connectionPtr_t> connectionPtrVector_t;
232  typedef std::vector<weakConnectionPtr_t> weakConnectionPtrVector_t;
233 
234  // Both are needed because unqualified name lookup terminates at the first scope that has anything with the
235  // right name
238 
239  protected:
240  CBaseChannelImpl<T>(boost::asio::io_context& _service, uint64_t _protocolHeaderID)
243  , m_isHandshakeOK(false)
245  , m_protocolHeaderID(_protocolHeaderID)
246  , m_ioContext(_service)
247  , m_socket(_service)
248  , m_started(false)
249  , m_currentMsg(std::make_shared<CProtocolMessage>())
250  , m_binaryAttachmentMap()
251  , m_binaryAttachmentMutex()
252  , m_deadlineTimer(
253  std::make_shared<boost::asio::deadline_timer>(_service, boost::posix_time::milliseconds(1000)))
254  , m_isShuttingDown(false)
255  {
256  try
257  {
259  LOG(dds::misc::debug) << "SID: " << m_sessionID;
260  }
261  catch (std::runtime_error& _error)
262  {
263  LOG(dds::misc::fatal) << _error.what();
264  }
265  }
266 
267  public:
269  {
270  LOG(dds::misc::info) << "Channel " << gChannelTypeName[m_channelType] << " destructor is called";
271  stop();
272  }
273 
274  template <class... Args>
275  static connectionPtr_t makeNew(boost::asio::io_context& _service,
276  uint64_t _protocolHeaderID,
277  Args&(... args))
278  {
279  connectionPtr_t newObject(new T(_service, _protocolHeaderID, args...));
280  return newObject;
281  }
282 
283  public:
284  bool isHanshakeOK() const
285  {
286  return m_isHandshakeOK;
287  }
288 
290  {
291  return m_channelType;
292  }
293 
294  void setChannelType(EChannelType _channelType)
295  {
296  m_channelType = _channelType;
297  }
298 
299  public:
300  void start()
301  {
302  if (m_started)
303  return;
304 
305  m_started = true;
306  // Prevent Asio TCP socket to be inherited by child when using fork/exec
307  ::fcntl(m_socket.native_handle(), F_SETFD, FD_CLOEXEC);
308  readHeader();
309  }
310 
311  void stop()
312  {
313  if (!m_started)
314  return;
315  m_started = false;
316  close();
317  }
318 
319  boost::asio::ip::tcp::socket& socket()
320  {
321  return m_socket;
322  }
323 
324  template <ECmdType _cmd>
325  void dequeueMsg()
326  {
327  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
328  m_writeQueue.erase(std::remove_if(std::begin(m_writeQueue),
329  std::end(m_writeQueue),
331  { return (_msg->header().m_cmd == _cmd); }),
332  std::end(m_writeQueue));
333  }
334 
336  {
337  static const size_t maxAccumulativeWriteQueueSize = 10000;
338  try
339  {
340  bool copyMessages = false;
341  {
342  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
343 
344  m_deadlineTimer->cancel();
345 
346  if (cmdUNKNOWN != _cmd)
347  m_accumulativeWriteQueue.push_back(_msg);
348 
349  copyMessages = m_accumulativeWriteQueue.size() > maxAccumulativeWriteQueueSize;
350  if (copyMessages)
351  {
353  << "copy accumulated queue to write queue "
354  "m_accumulativeWriteQueue.size="
355  << m_accumulativeWriteQueue.size() << " m_writeQueue.size=" << m_writeQueue.size();
356 
357  // copy queue to main queue
358  std::copy(m_accumulativeWriteQueue.begin(),
359  m_accumulativeWriteQueue.end(),
360  back_inserter((m_isHandshakeOK) ? m_writeQueue : m_writeQueueBeforeHandShake));
361  m_accumulativeWriteQueue.clear();
362  }
363 
364  auto self(this->shared_from_this());
365  m_deadlineTimer->async_wait(
366  [this, self](const boost::system::error_code& error)
367  {
368  if (!error)
369  {
370  bool copyMessages = false;
371  {
372  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
373  copyMessages = !m_accumulativeWriteQueue.empty();
374  // copy queue to main queue
375  if (copyMessages)
376  {
378  << "deadline_timer called: copy accumulated queue to write queue "
379  "m_accumulativeWriteQueue.size="
380  << m_accumulativeWriteQueue.size()
381  << " m_writeQueue.size=" << m_writeQueue.size();
382  std::copy(m_accumulativeWriteQueue.begin(),
383  m_accumulativeWriteQueue.end(),
384  back_inserter((m_isHandshakeOK) ? m_writeQueue
385  : m_writeQueueBeforeHandShake));
386  m_accumulativeWriteQueue.clear();
387  }
388  }
389  if (copyMessages)
390  pushMsg<cmdUNKNOWN>();
391  }
392  });
393 
394  LOG(dds::misc::debug) << "accumulativePushMsg: WriteQueue size = " << m_writeQueue.size()
395  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size()
396  << " accumulativeWriteQueue size = " << m_accumulativeWriteQueue.size()
397  << " msg = " << _msg->toString();
398  }
399  if (copyMessages)
400  pushMsg<cmdUNKNOWN>();
401  }
402  catch (std::exception& ex)
403  {
404  LOG(dds::misc::error) << "BaseChannelImpl can't push accumulative message: " << ex.what();
405  }
406  }
407 
408  template <ECmdType _cmd, class A>
409  void accumulativePushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0)
410  {
411  try
412  {
414  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
415  accumulativePushMsg(msg, _cmd);
416  }
417  catch (std::exception& ex)
418  {
419  LOG(dds::misc::error) << "BaseChannelImpl can't push accumulative message: " << ex.what();
420  }
421  }
422 
423  template <ECmdType _cmd>
424  void accumulativePushMsg(uint64_t _protocolHeaderID = 0)
425  {
426  SEmptyCmd cmd;
427  accumulativePushMsg<_cmd>(cmd, _protocolHeaderID);
428  }
429 
430  void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t /*_protocolHeaderID*/ = 0)
431  {
432  try
433  {
434  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
435  if (!m_isHandshakeOK)
436  {
437  if (isCmdAllowedWithoutHandshake(_cmd))
438  m_writeQueue.push_back(_msg);
439  else
440  m_writeQueueBeforeHandShake.push_back(_msg);
441  }
442  else
443  {
444  // copy the buffered queue, which has been collected before hand-shake
445  if (!m_writeQueueBeforeHandShake.empty())
446  {
447  std::copy(m_writeQueueBeforeHandShake.begin(),
448  m_writeQueueBeforeHandShake.end(),
449  back_inserter(m_writeQueue));
450  m_writeQueueBeforeHandShake.clear();
451  }
452 
453  // add the current message to the queue
454  if (cmdUNKNOWN != _cmd)
455  m_writeQueue.push_back(_msg);
456  }
457 
458  LOG(dds::misc::debug) << "pushMsg: WriteQueue size = " << m_writeQueue.size()
459  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size();
460  }
461  catch (std::exception& ex)
462  {
463  LOG(dds::misc::error) << "BaseChannelImpl can't push message: " << ex.what();
464  }
465 
466  // process standard async writing
467  auto self(this->shared_from_this());
468  m_ioContext.post(
469  [this, self]
470  {
471  try
472  {
473  writeMessage();
474  }
475  catch (std::exception& ex)
476  {
477  LOG(dds::misc::error) << "BaseChannelImpl can't write message: " << ex.what();
478  }
479  });
480  }
481 
482  template <ECmdType _cmd, class A>
483  void pushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0)
484  {
485  try
486  {
488  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
489  pushMsg(msg, _cmd, _protocolHeaderID);
490  }
491  catch (std::exception& ex)
492  {
493  LOG(dds::misc::error) << "BaseChannelImpl can't push message: " << ex.what();
494  }
495  }
496 
497  template <ECmdType _cmd>
498  void pushMsg(uint64_t _protocolHeaderID = 0)
499  {
500  SEmptyCmd cmd;
501  pushMsg<_cmd>(cmd, adjustProtocolHeaderID(_protocolHeaderID));
502  }
503 
504  template <ECmdType _cmd, class A>
505  void sendYourself(const A& _attachment, uint64_t _protocolHeaderID = 0)
506  {
508  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
509  // process received message
510  T* pThis = static_cast<T*>(this);
511  pThis->processMessage(msg);
512  }
513 
514  template <ECmdType _cmd>
515  void sendYourself(uint64_t _protocolHeaderID = 0)
516  {
517  SEmptyCmd cmd;
518  sendYourself<_cmd>(cmd, adjustProtocolHeaderID(_protocolHeaderID));
519  }
520 
521  void pushBinaryAttachmentCmd(const std::string& _srcFilePath,
522  const std::string& _fileName,
523  uint16_t _cmdSource,
524  uint64_t _protocolHeaderID)
525  {
527 
528  std::string srcFilePath(_srcFilePath);
529  // Resolve environment variables
530  dds::misc::smart_path(&srcFilePath);
531 
532  std::ifstream f(srcFilePath);
533  if (!f.is_open() || !f.good())
534  {
535  throw std::runtime_error("Could not open the source file: " + srcFilePath);
536  }
537  f.seekg(0, std::ios::end);
538  data.reserve(f.tellg());
539  f.seekg(0, std::ios::beg);
540  data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
541 
542  pushBinaryAttachmentCmd(data, _fileName, _cmdSource, _protocolHeaderID);
543  }
544 
546  const std::string& _fileName,
547  uint16_t _cmdSource,
548  uint64_t _protocolHeaderID)
549  {
550  static const int maxCommandSize = 65536;
551  int nofParts = (_data.size() % maxCommandSize == 0) ? (_data.size() / maxCommandSize)
552  : (_data.size() / maxCommandSize + 1);
553  boost::crc_32_type fileCrc32;
554  fileCrc32.process_bytes(&_data[0], _data.size());
555 
556  boost::uuids::uuid fileId = boost::uuids::random_generator()();
557 
558  // Generate start message
559  SBinaryAttachmentStartCmd start_cmd;
560  start_cmd.m_fileId = fileId;
561  start_cmd.m_srcCommand = _cmdSource;
562  start_cmd.m_fileName = _fileName;
563  start_cmd.m_fileSize = _data.size();
564  start_cmd.m_fileCrc32 = fileCrc32.checksum();
565  pushMsg<cmdBINARY_ATTACHMENT_START>(start_cmd, _protocolHeaderID);
566 
567  for (int i = 0; i < nofParts; ++i)
568  {
570  cmd.m_fileId = fileId;
571 
572  size_t offset = i * maxCommandSize;
573  size_t size = (i != (nofParts - 1)) ? maxCommandSize : (_data.size() - offset);
574 
575  auto iter_begin = _data.begin() + offset;
576  auto iter_end = iter_begin + size;
577  std::copy(iter_begin, iter_end, std::back_inserter(cmd.m_data));
578 
579  cmd.m_size = size;
580  cmd.m_offset = offset;
581 
582  boost::crc_32_type crc32;
583  crc32.process_bytes(&(*iter_begin), size);
584 
585  cmd.m_crc32 = crc32.checksum();
586 
587  pushMsg<cmdBINARY_ATTACHMENT>(cmd, _protocolHeaderID);
588  }
589  }
590 
593  {
594  boost::uuids::uuid fileId = _attachment->m_fileId;
595 
596  {
597  // Lock with global map mutex
598  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
599 
600  binaryAttachmentMap_t::iterator iter_info = m_binaryAttachmentMap.find(fileId);
601  bool exists = iter_info != m_binaryAttachmentMap.end();
602 
603  if (!exists)
604  {
605  m_binaryAttachmentMap[fileId] = std::make_shared<SBinaryAttachmentInfo>();
606  iter_info = m_binaryAttachmentMap.find(fileId);
607  iter_info->second->m_startTime = std::chrono::steady_clock::now();
608  iter_info->second->m_fileName = _attachment->m_fileName;
609  iter_info->second->m_fileSize = _attachment->m_fileSize;
610  iter_info->second->m_fileCrc32 = _attachment->m_fileCrc32;
611  iter_info->second->m_srcCommand = _attachment->m_srcCommand;
612  iter_info->second->m_data.resize(_attachment->m_fileSize);
613  }
614  }
615  }
616 
619  {
620  boost::uuids::uuid fileId = _attachment->m_fileId;
622  binaryAttachmentMap_t::iterator iter_info;
623 
624  {
625  // Lock with global map mutex
626  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
627 
628  iter_info = m_binaryAttachmentMap.find(fileId);
629  bool exists = iter_info != m_binaryAttachmentMap.end();
630 
631  if (!exists)
632  {
634  << "Received binary attachment [" << fileId << "] which does not exist. Skip this message.";
635  return;
636  }
637  info = iter_info->second;
638  }
639 
640  boost::crc_32_type crc32;
641  crc32.process_bytes(&_attachment->m_data[0], _attachment->m_data.size());
642 
643  if (crc32.checksum() != _attachment->m_crc32)
644  {
645  {
646  // Lock with global map mutex
647  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
648  // Remove info from map
649  m_binaryAttachmentMap.erase(iter_info);
650  }
651  std::stringstream ss;
652  ss << "Received binary attachment [" << fileId << "] has wrong CRC32 checksum: " << crc32.checksum()
653  << " instead of " << _attachment->m_crc32 << "offset=" << _attachment->m_offset
654  << " size=" << _attachment->m_size;
655  LOG(dds::misc::error) << ss.str();
656  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), dds::misc::error, info->m_srcCommand),
657  _sender.m_ID);
658  return;
659  }
660 
661  bool allBytesReceived = false;
662  {
663  // Lock with local mutex for each file
664  std::lock_guard<std::mutex> lock(info->m_mutex);
665 
666  info->m_bytesReceived += _attachment->m_size;
667 
668  std::copy(_attachment->m_data.begin(),
669  _attachment->m_data.end(),
670  info->m_data.begin() + _attachment->m_offset);
671 
672  allBytesReceived = info->m_bytesReceived == info->m_fileSize;
673  if (allBytesReceived)
674  {
675  // Check file CRC32
676  boost::crc_32_type crc32;
677  crc32.process_bytes(&info->m_data[0], info->m_data.size());
678 
679  if (crc32.checksum() != info->m_fileCrc32)
680  {
681  {
682  // Lock with global map mutex
683  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
684  // Remove info from map
685  m_binaryAttachmentMap.erase(iter_info);
686  }
687  std::stringstream ss;
688  ss << "Received binary file [" << fileId
689  << "] has wrong CRC32 checksum: " << crc32.checksum() << " instead of "
690  << _attachment->m_crc32;
691  LOG(dds::misc::error) << ss.str();
692  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), dds::misc::error, info->m_srcCommand),
693  _sender.m_ID);
694  return;
695  }
696 
697  fs::path dir(user_defaults_api::CUserDefaults::instance().getWrkDir());
698  const std::string filePath(dir.append(to_string(fileId)).string());
699  std::ofstream f(filePath.c_str());
700  if (!f.is_open() || !f.good())
701  {
702  {
703  // Lock with global map mutex
704  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
705  // Remove info from map
706  m_binaryAttachmentMap.erase(iter_info);
707  }
708  std::stringstream ss;
709  ss << "Could not open file: " << filePath;
710  LOG(dds::misc::error) << ss.str();
711  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), dds::misc::error, info->m_srcCommand),
712  _sender.m_ID);
713  return;
714  }
715 
716  for (const auto& v : info->m_data)
717  {
718  f << v;
719  }
720  f.close();
721 
722  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
723  std::chrono::microseconds downloadTime =
724  std::chrono::duration_cast<std::chrono::microseconds>(now - info->m_startTime);
725 
726  // Send message to yourself
728  reply_cmd.m_receivedFilePath = filePath;
729  reply_cmd.m_requestedFileName = info->m_fileName;
730  reply_cmd.m_srcCommand = info->m_srcCommand;
731  reply_cmd.m_downloadTime = downloadTime.count();
732  reply_cmd.m_receivedFileSize = info->m_fileSize;
733  sendYourself<cmdBINARY_ATTACHMENT_RECEIVED>(reply_cmd, _sender.m_ID);
734  }
735  }
736 
737  if (allBytesReceived)
738  {
739  // Lock with global map mutex
740  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
741  // Remove info from map
742  m_binaryAttachmentMap.erase(iter_info);
743  }
744  }
745 
746  bool started()
747  {
748  return m_started;
749  }
750 
751  std::string remoteEndIDString()
752  {
753  // give a chance child to execute something
754  try
755  {
756  T* pThis = static_cast<T*>(this);
757  std::stringstream ss;
758  ss << pThis->_remoteEndIDString() << " [" << socket().remote_endpoint().address().to_string()
759  << "]";
760  return ss.str();
761  }
762  catch (...)
763  {
764  return std::string();
765  }
766  }
767 
768  uint64_t getProtocolHeaderID() const
769  {
770  return m_protocolHeaderID;
771  }
772 
773  private:
774  uint64_t adjustProtocolHeaderID(uint64_t _protocolHeaderID) const
775  {
776  return _protocolHeaderID != 0 ? _protocolHeaderID : m_protocolHeaderID;
777  }
778 
779  void readHeader()
780  {
781  auto self(this->shared_from_this());
782  boost::asio::async_read(
783  m_socket,
784  boost::asio::buffer(m_currentMsg->data(), CProtocolMessage::header_length),
785  [this, self](boost::system::error_code ec, std::size_t length)
786  {
787  if (!ec)
788  {
789  LOG(dds::misc::debug) << "Received message HEADER from " << remoteEndIDString() << ": "
790  << length << " bytes, expected " << CProtocolMessage::header_length;
791  }
792  if (!ec && m_currentMsg->decode_header())
793  {
794  // If the header is ok, receive the body of the message
795  readBody();
796  }
797  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
798  {
800  << "Disconnect is detected while on read msg header: " << ec.message();
801  onDissconnect();
802  }
803  else
804  {
805  if (m_started)
806  LOG(dds::misc::error) << "Error reading message header: " << ec.message();
807  else
808  LOG(dds::misc::info) << "The stop signal is received, aborting current operation and "
809  "closing the connection: "
810  << ec.message();
811 
812  stop();
813  }
814  });
815  }
816 
817  void readBody()
818  {
819  if (m_currentMsg->body_length() == 0)
820  {
821  LOG(dds::misc::debug) << "Received message BODY from " << remoteEndIDString()
822  << ": no attachment: " << m_currentMsg->toString();
823  // process received message
824  T* pThis = static_cast<T*>(this);
825  pThis->processMessage(m_currentMsg);
826 
827  // Read next message
828  m_currentMsg = std::make_shared<CProtocolMessage>();
829  readHeader();
830  return;
831  }
832 
833  auto self(this->shared_from_this());
834  boost::asio::async_read(
835  m_socket,
836  boost::asio::buffer(m_currentMsg->body(), m_currentMsg->body_length()),
837  [this, self](boost::system::error_code ec, std::size_t length)
838  {
839  if (!ec)
840  {
841  LOG(dds::misc::debug) << "Received message BODY from " << remoteEndIDString() << " ("
842  << length << " bytes): " << m_currentMsg->toString();
843 
844  // process received message
845  T* pThis = static_cast<T*>(this);
846  pThis->processMessage(m_currentMsg);
847 
848  // Read next message
849  m_currentMsg = std::make_shared<CProtocolMessage>();
850  readHeader();
851  }
852  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
853  {
854  LOG(dds::misc::debug) << "Disconnect is detected while on read msg body: " << ec.message();
855  onDissconnect();
856  }
857  else
858  {
859  // don't show error if service is closed
860  if (m_started)
861  LOG(dds::misc::error) << "Error reading message body: " << ec.message();
862  else
863  LOG(dds::misc::info) << "The stop signal is received, aborting current operation and "
864  "closing the connection: "
865  << ec.message();
866  stop();
867  }
868  });
869  }
870 
871  private:
872  void writeMessage()
873  {
874  // To avoid sending of a bunch of small messages, we pack as many messages as possible into one write
875  // request (GH-38).
876  // Copy messages from the queue to send buffer (which should remain until the write handler is called)
877  {
878  std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
879  if (!m_writeBuffer.empty())
880  return; // a write is in progress, don't start anything
881 
882  if (m_writeQueue.empty())
883  return; // There is nothing to send.
884 
885  for (auto i : m_writeQueue)
886  {
888  << "Sending to " << remoteEndIDString() << " a message: " << i->toString();
889  if (cmdSHUTDOWN == i->header().m_cmd)
890  m_isShuttingDown = true;
891  m_writeBuffer.push_back(boost::asio::buffer(i->data(), i->length()));
892  m_writeBufferQueue.push_back(i);
893  }
894  m_writeQueue.clear();
895  }
896 
897  auto self(this->shared_from_this());
898  boost::asio::async_write(
899  m_socket,
900  m_writeBuffer,
901  [this, self](boost::system::error_code _ec, std::size_t _bytesTransferred)
902  {
903  try
904  {
905  if (!_ec)
906  {
907  LOG(dds::misc::debug) << "Message successfully sent to " << remoteEndIDString() << " ("
908  << _bytesTransferred << " bytes)";
909 
910  if (m_isShuttingDown)
911  {
913  << "Shutdown signal has been successfully sent to " << remoteEndIDString();
914  stop();
915  }
916 
917  // lock the modification of the container
918  {
919  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
920 
921  m_writeBuffer.clear();
922  m_writeBufferQueue.clear();
923  }
924  // we might need to send more messages
925  writeMessage();
926  }
927  else if ((boost::asio::error::eof == _ec) || (boost::asio::error::connection_reset == _ec))
928  {
930  << "Disconnect is detected while on write message: " << _ec.message();
931  onDissconnect();
932  }
933  else
934  {
935  // don't show error if service is closed
936  if (m_started)
938  << "Error sending to " << remoteEndIDString() << ": " << _ec.message();
939  else
941  << "The stop signal is received, aborting current operation and "
942  "closing the connection: "
943  << _ec.message();
944  stop();
945  }
946  }
947  catch (std::exception& ex)
948  {
949  LOG(dds::misc::error) << "BaseChannelImpl can't write message (callback): " << ex.what();
950  }
951  });
952  }
953 
954  void onDissconnect()
955  {
956  LOG(dds::misc::debug) << "The session was disconnected by the remote end: " << remoteEndIDString();
957  // stopping the channel
958  stop();
959 
960  // give a chance to children to execute something
961  this->dispatchHandlers(EChannelEvents::OnRemoteEndDissconnected, SSenderInfo());
962  }
963 
964  bool isCmdAllowedWithoutHandshake(ECmdType _cmd)
965  {
966  if (m_isHandshakeOK)
967  return true;
968  return (_cmd == cmdHANDSHAKE || _cmd == cmdREPLY_HANDSHAKE_OK || _cmd == cmdREPLY_HANDSHAKE_ERR);
969  }
970 
971  private:
972  void close()
973  {
974  m_socket.close();
975  }
976 
977  protected:
980  std::string m_sessionID;
982  boost::asio::io_context& m_ioContext;
983 
984  private:
985  boost::asio::ip::tcp::socket m_socket;
986  bool m_started;
988 
989  protocolMessagePtrQueue_t m_writeQueue;
990  protocolMessagePtrQueue_t m_writeQueueBeforeHandShake;
991 
992  std::mutex m_mutexWriteBuffer;
993  protocolMessageBuffer_t m_writeBuffer;
994  protocolMessagePtrQueue_t m_writeBufferQueue;
995 
996  // BinaryAttachment
997  typedef std::map<boost::uuids::uuid, binaryAttachmentInfoPtr_t> binaryAttachmentMap_t;
998  binaryAttachmentMap_t m_binaryAttachmentMap;
999  std::mutex m_binaryAttachmentMutex;
1000 
1001  protocolMessagePtrQueue_t m_accumulativeWriteQueue;
1002  deadlineTimerPtr_t m_deadlineTimer;
1003 
1004  bool m_isShuttingDown;
1005  };
1006  } // namespace protocol_api
1007 } // namespace dds
1008 
1009 #endif /* defined(__DDS__BaseChannelImpl__) */
bool m_isHandshakeOK
Definition: BaseChannelImpl.h:978
std::shared_ptr< T > connectionPtr_t
Definition: BaseChannelImpl.h:229
std::string m_receivedFilePath
Path to the received file.
Definition: BinaryAttachmentReceivedCmd.h:23
void processBinaryAttachmentCmd(const SSenderInfo &_sender, SCommandAttachmentImpl< cmdBINARY_ATTACHMENT >::ptr_t _attachment)
Definition: BaseChannelImpl.h:617
Definition: BaseEventHandlersImpl.h:48
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentCmd.h:26
EChannelType
Definition: ProtocolDef.h:15
Definition: BaseChannelImpl.h:195
Definition: ProtocolCommands.h:34
void accumulativePushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd)
Definition: BaseChannelImpl.h:335
uint32_t crc32(const std::string &_str)
Definition: CRC.h:26
EChannelType m_channelType
Definition: BaseChannelImpl.h:979
DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelEventHandlersImpl) DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelMessageHandlersImpl) protected
Definition: BaseChannelImpl.h:236
void pushBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource, uint64_t _protocolHeaderID)
Definition: BaseChannelImpl.h:521
Definition: ChannelEventHandlersImpl.h:26
uint16_t m_srcCommand
Source command which initiated file transport.
Definition: BinaryAttachmentReceivedCmd.h:25
const std::array< std::string, 3 > gChannelTypeName
Definition: ProtocolDef.h:22
uint32_t m_fileSize
File size in bytes.
Definition: BinaryAttachmentStartCmd.h:28
Definition: BinaryAttachmentReceivedCmd.h:15
Definition: CommandAttachmentImpl.h:54
bool started()
Definition: BaseChannelImpl.h:746
bool isHanshakeOK() const
Definition: BaseChannelImpl.h:284
uint32_t m_receivedFileSize
Number of recieved bytes.
Definition: BinaryAttachmentReceivedCmd.h:26
uint32_t m_fileCrc32
File checksum.
Definition: BinaryAttachmentStartCmd.h:29
std::string getLockedSID() const
Definition: UserDefaults.cpp:559
std::shared_ptr< SEmptyCmd > ptr_t
Definition: CommandAttachmentImpl.h:64
Definition: BinaryAttachmentStartCmd.h:18
void accumulativePushMsg(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:409
std::chrono::steady_clock::time_point m_startTime
Definition: BaseChannelImpl.h:213
Definition: ProtocolCommands.h:30
std::string m_sessionID
Definition: BaseChannelImpl.h:980
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &, uint64_t _ID)
Definition: CommandAttachmentImpl.h:71
void pushMsg(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:498
#define LOG(severity)
Definition: Logger.h:34
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseChannelImpl.h:230
boost::asio::io_context & m_ioContext
Definition: BaseChannelImpl.h:982
Definition: ProtocolMessage.h:76
Definition: SimpleMsgCmd.h:16
Definition: ProtocolCommands.h:35
uint32_t m_size
Size of this piece of binary data.
Definition: BinaryAttachmentCmd.h:28
Definition: BinaryAttachmentCmd.h:18
EChannelType getChannelType() const
Definition: BaseChannelImpl.h:289
void start()
Definition: BaseChannelImpl.h:300
uint32_t m_fileSize
Definition: BaseChannelImpl.h:211
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:124
Definition: ProtocolDef.h:17
std::string m_requestedFileName
Requested name of the file.
Definition: BinaryAttachmentReceivedCmd.h:24
Miscellaneous functions and helpers are located here.
Definition: AgentConnectionManager.h:13
void processBinaryAttachmentStartCmd(const SSenderInfo &, SCommandAttachmentImpl< cmdBINARY_ATTACHMENT_START >::ptr_t _attachment)
Definition: BaseChannelImpl.h:591
std::string remoteEndIDString()
Definition: BaseChannelImpl.h:751
uint32_t m_crc32
CRC checksum of this piece of binary data.
Definition: BinaryAttachmentCmd.h:29
uint32_t m_offset
Offset for this piece of binary data.
Definition: BinaryAttachmentCmd.h:27
static connectionPtr_t makeNew(boost::asio::io_context &_service, uint64_t _protocolHeaderID, Args &(... args))
Definition: BaseChannelImpl.h:275
Definition: BaseChannelImpl.h:219
void pushMsg(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:483
std::string m_fileName
Name of the file.
Definition: BinaryAttachmentStartCmd.h:27
Definition: BaseChannelImpl.h:37
void dequeueMsg()
Definition: BaseChannelImpl.h:325
void accumulativePushMsg(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:424
SBinaryAttachmentInfo()
Definition: BaseChannelImpl.h:197
Definition: def.h:147
Definition: BaseChannelImpl.h:35
uint32_t m_downloadTime
Time spent to download file [microseconds].
Definition: BinaryAttachmentReceivedCmd.h:27
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:37
uint32_t m_fileCrc32
Definition: BaseChannelImpl.h:209
std::vector< weakConnectionPtr_t > weakConnectionPtrVector_t
Definition: BaseChannelImpl.h:232
void pushBinaryAttachmentCmd(const dds::misc::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, uint64_t _protocolHeaderID)
Definition: BaseChannelImpl.h:545
Definition: def.h:146
void smart_path(_T *_Path)
The function extends any environment variable found in the give path to its value.
Definition: SysHelper.h:95
std::shared_ptr< SBinaryAttachmentInfo > binaryAttachmentInfoPtr_t
Definition: BaseChannelImpl.h:216
void sendYourself(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:505
void sendYourself(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:515
Definition: def.h:150
std::mutex m_mutex
Definition: BaseChannelImpl.h:212
dds::misc::BYTEVector_t m_data
Piece of binary data.
Definition: BinaryAttachmentCmd.h:30
boost::asio::ip::tcp::socket & socket()
Definition: BaseChannelImpl.h:319
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentStartCmd.h:26
uint32_t m_bytesReceived
Definition: BaseChannelImpl.h:207
uint64_t m_ID
Definition: BaseEventHandlersImpl.h:50
uint16_t m_srcCommand
Source command which initiated file transport.
Definition: BinaryAttachmentStartCmd.h:30
Definition: def.h:149
std::string m_fileName
Definition: BaseChannelImpl.h:208
uint64_t getProtocolHeaderID() const
Definition: BaseChannelImpl.h:768
Definition: ProtocolCommands.h:31
std::vector< connectionPtr_t > connectionPtrVector_t
Definition: BaseChannelImpl.h:231
void setChannelType(EChannelType _channelType)
Definition: BaseChannelImpl.h:294
Definition: ChannelMessageHandlersImpl.h:20
dds::misc::BYTEVector_t m_data
Definition: BaseChannelImpl.h:206
uint16_t m_srcCommand
Definition: BaseChannelImpl.h:210
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
void stop()
Definition: BaseChannelImpl.h:311
void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t=0)
Definition: BaseChannelImpl.h:430
uint64_t m_protocolHeaderID
Definition: BaseChannelImpl.h:981
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:81