DDS  ver. 3.4
BaseChannelImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef __DDS__BaseChannelImpl__
6 #define __DDS__BaseChannelImpl__
7 // STD
8 #include <chrono>
9 #include <deque>
10 #include <iostream>
11 #include <map>
12 #include <memory>
13 // BOOST
14 #include <boost/asio.hpp>
15 #include <boost/filesystem.hpp>
16 #include <boost/noncopyable.hpp>
17 
18 #pragma clang diagnostic push
19 #pragma clang diagnostic ignored "-Wdeprecated-register"
20 #include <boost/uuid/uuid.hpp>
21 #include <boost/uuid/uuid_generators.hpp>
22 #include <boost/uuid/uuid_io.hpp>
23 #pragma clang diagnostic pop
24 // DDS
27 #include "CommandAttachmentImpl.h"
28 #include "Logger.h"
29 #include "MonitoringThread.h"
30 #include "ProtocolDef.h"
31 #include "StatImpl.h"
32 
33 namespace fs = boost::filesystem;
34 
35 namespace dds
36 {
37  namespace protocol_api
38  {
39  template <class T>
40  class CClientChannelImpl; // needed for friend class
41  template <class T>
42  class CServerChannelImpl; // needed for friend class
43  } // namespace protocol_api
44 } // namespace dds
45 
46 // Either raw message or command based processing can be used at a time
47 // Command based message processing
48 #define BEGIN_MSG_MAP(theClass) \
49  public: \
50  friend protocol_api::CBaseChannelImpl<theClass>; \
51  friend protocol_api::CClientChannelImpl<theClass>; \
52  friend protocol_api::CServerChannelImpl<theClass>; \
53  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
54  { \
55  using namespace dds; \
56  using namespace dds::protocol_api; \
57  CMonitoringThread::instance().updateIdle(); \
58  bool processed = true; \
59  (void)processed; \
60  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \
61  SSenderInfo sender; \
62  sender.m_ID = _currentMsg->header().m_ID; \
63  \
64  try \
65  { \
66  switch (currentCmd) \
67  { \
68  case cmdBINARY_ATTACHMENT: \
69  { \
70  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::ptr_t attahcmentPtr_t; \
71  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::decode(_currentMsg); \
72  processBinaryAttachmentCmd(sender, attachmentPtr); \
73  return; \
74  } \
75  case cmdBINARY_ATTACHMENT_START: \
76  { \
77  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::ptr_t attahcmentPtr_t; \
78  attahcmentPtr_t attachmentPtr = \
79  SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::decode(_currentMsg); \
80  processBinaryAttachmentStartCmd(sender, attachmentPtr); \
81  return; \
82  } \
83  case cmdHANDSHAKE: \
84  { \
85  SCommandAttachmentImpl<cmdHANDSHAKE>::ptr_t attachmentPtr = \
86  SCommandAttachmentImpl<cmdHANDSHAKE>::decode(_currentMsg); \
87  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
88  return; \
89  } \
90  case cmdLOBBY_MEMBER_HANDSHAKE: \
91  { \
92  SCommandAttachmentImpl<cmdLOBBY_MEMBER_HANDSHAKE>::ptr_t attachmentPtr = \
93  SCommandAttachmentImpl<cmdLOBBY_MEMBER_HANDSHAKE>::decode(_currentMsg); \
94  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
95  return; \
96  } \
97  case cmdREPLY_HANDSHAKE_OK: \
98  { \
99  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::ptr_t attachmentPtr = \
100  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::decode(_currentMsg); \
101  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
102  return; \
103  } \
104  case cmdREPLY_HANDSHAKE_ERR: \
105  { \
106  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::ptr_t attachmentPtr = \
107  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::decode(_currentMsg); \
108  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
109  return; \
110  }
111 
112 #define MESSAGE_HANDLER(msg, func) \
113  case msg: \
114  { \
115  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
116  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
117  LOG(MiscCommon::debug) << "Processing " << g_cmdToString[msg] << " received from " << remoteEndIDString(); \
118  processed = func(attachmentPtr, sender); \
119  if (!processed) \
120  { \
121  if (!handlerExists(msg)) \
122  { \
123  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
124  << _currentMsg->toString(); \
125  } \
126  else \
127  { \
128  dispatchHandlers<>(msg, sender, attachmentPtr); \
129  } \
130  } \
131  break; \
132  }
133 
134 #define MESSAGE_HANDLER_DISPATCH(msg) \
135  case msg: \
136  { \
137  (void)processed; \
138  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
139  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
140  LOG(MiscCommon::debug) << "Dispatching " << g_cmdToString[msg] << " received from " << remoteEndIDString(); \
141  if (!handlerExists(msg)) \
142  { \
143  LOG(MiscCommon::error) << "The received message can't be dispatched, it has no registered handler: " \
144  << _currentMsg->toString(); \
145  } \
146  else \
147  { \
148  dispatchHandlers<>(msg, sender, attachmentPtr); \
149  } \
150  break; \
151  }
152 
153 #define END_MSG_MAP() \
154  default: \
155  LOG(MiscCommon::error) << "The received message doesn't have a handler: " << _currentMsg->toString(); \
156  } \
157  } \
158  catch (std::exception & _e) \
159  { \
160  LOG(MiscCommon::error) << "Channel processMessage (" << _currentMsg->toString() << "): " << _e.what(); \
161  } \
162  }
163 
164 // Raw message processing
165 #define RAW_MESSAGE_HANDLER(theClass, func) \
166  BEGIN_MSG_MAP(theClass) \
167  default: \
168  break; \
169  } \
170  processed = func(_currentMsg); \
171  if (!processed) \
172  { \
173  if (!handlerExists(ECmdType::cmdRAW_MSG)) \
174  { \
175  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
176  << _currentMsg->toString(); \
177  } \
178  else \
179  { \
180  dispatchHandlers(ECmdType::cmdRAW_MSG, sender, _currentMsg); \
181  } \
182  } \
183  } \
184  catch (std::exception & _e) \
185  { \
186  LOG(MiscCommon::error) << "Channel processMessage: " << _e.what(); \
187  } \
188  }
189 
190 #define REGISTER_DEFAULT_REMOTE_ID_STRING \
191  std::string _remoteEndIDString() \
192  { \
193  return "DDS Server"; \
194  }
195 
196 namespace dds
197 {
198  namespace protocol_api
199  {
201  {
203  : m_bytesReceived(0)
204  , m_fileCrc32(0)
205  , m_srcCommand(0)
206  , m_fileSize(0)
207  , m_startTime()
208  {
209  }
210 
212  uint32_t m_bytesReceived;
213  std::string m_fileName;
214  uint32_t m_fileCrc32;
215  uint16_t m_srcCommand;
216  uint32_t m_fileSize;
217  std::mutex m_mutex;
218  std::chrono::steady_clock::time_point m_startTime;
219  };
220 
221  typedef std::shared_ptr<SBinaryAttachmentInfo> binaryAttachmentInfoPtr_t;
222 
223  template <class T>
224  class CBaseChannelImpl : public boost::noncopyable,
227  public std::enable_shared_from_this<T>,
228  public CStatImpl
229  {
230  typedef std::deque<CProtocolMessage::protocolMessagePtr_t> protocolMessagePtrQueue_t;
231  typedef std::vector<boost::asio::mutable_buffer> protocolMessageBuffer_t;
232  typedef std::shared_ptr<boost::asio::deadline_timer> deadlineTimerPtr_t;
233 
234  public:
235  typedef std::shared_ptr<T> connectionPtr_t;
236  typedef std::weak_ptr<T> weakConnectionPtr_t;
237  typedef std::vector<connectionPtr_t> connectionPtrVector_t;
238  typedef std::vector<weakConnectionPtr_t> weakConnectionPtrVector_t;
239 
240  // Both are needed because unqualified name lookup terminates at the first scope that has anything with the
241  // right name
244 
245  protected:
246  CBaseChannelImpl<T>(boost::asio::io_context& _service, uint64_t _protocolHeaderID)
249  , CStatImpl(_service)
250  , m_isHandshakeOK(false)
252  , m_protocolHeaderID(_protocolHeaderID)
253  , m_ioContext(_service)
254  , m_socket(_service)
255  , m_started(false)
256  , m_currentMsg(std::make_shared<CProtocolMessage>())
257  , m_binaryAttachmentMap()
258  , m_binaryAttachmentMutex()
259  , m_deadlineTimer(
260  std::make_shared<boost::asio::deadline_timer>(_service, boost::posix_time::milliseconds(1000)))
261  , m_isShuttingDown(false)
262  {
263  try
264  {
266  LOG(MiscCommon::debug) << "SID: " << m_sessionID;
267  }
268  catch (std::runtime_error& _error)
269  {
270  LOG(MiscCommon::fatal) << _error.what();
271  }
272  }
273 
274  public:
276  {
277  LOG(MiscCommon::info) << "Channel " << gChannelTypeName[m_channelType] << " destructor is called";
278  stop();
279  }
280 
281  template <class... Args>
282  static connectionPtr_t makeNew(boost::asio::io_context& _service,
283  uint64_t _protocolHeaderID,
284  Args&(... args))
285  {
286  connectionPtr_t newObject(new T(_service, _protocolHeaderID, args...));
287  return newObject;
288  }
289 
290  public:
291  bool isHanshakeOK() const
292  {
293  return m_isHandshakeOK;
294  }
295 
297  {
298  return m_channelType;
299  }
300 
301  void setChannelType(EChannelType _channelType)
302  {
303  m_channelType = _channelType;
304  }
305 
306  public:
307  void start()
308  {
309  if (m_started)
310  return;
311 
312  m_started = true;
313  // Prevent Asio TCP socket to be inherited by child when using fork/exec
314  ::fcntl(m_socket.native_handle(), F_SETFD, FD_CLOEXEC);
315  readHeader();
316  }
317 
318  void stop()
319  {
320  if (!m_started)
321  return;
322  m_started = false;
323  close();
324  }
325 
326  boost::asio::ip::tcp::socket& socket()
327  {
328  return m_socket;
329  }
330 
331  template <ECmdType _cmd>
332  void dequeueMsg()
333  {
334  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
335  m_writeQueue.erase(std::remove_if(std::begin(m_writeQueue),
336  std::end(m_writeQueue),
338  return (_msg->header().m_cmd == _cmd);
339  }),
340  std::end(m_writeQueue));
341  }
342 
344  {
345  static const size_t maxAccumulativeWriteQueueSize = 10000;
346  try
347  {
348  bool copyMessages = false;
349  {
350  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
351 
352  m_deadlineTimer->cancel();
353 
354  if (cmdUNKNOWN != _cmd)
355  m_accumulativeWriteQueue.push_back(_msg);
356 
357  copyMessages = m_accumulativeWriteQueue.size() > maxAccumulativeWriteQueueSize;
358  if (copyMessages)
359  {
361  << "copy accumulated queue to write queue "
362  "m_accumulativeWriteQueue.size="
363  << m_accumulativeWriteQueue.size() << " m_writeQueue.size=" << m_writeQueue.size();
364 
365  // copy queue to main queue
366  std::copy(m_accumulativeWriteQueue.begin(),
367  m_accumulativeWriteQueue.end(),
368  back_inserter((m_isHandshakeOK) ? m_writeQueue : m_writeQueueBeforeHandShake));
369  m_accumulativeWriteQueue.clear();
370  }
371 
372  auto self(this->shared_from_this());
373  m_deadlineTimer->async_wait([this, self](const boost::system::error_code& error) {
374  if (!error)
375  {
376  bool copyMessages = false;
377  {
378  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
379  copyMessages = !m_accumulativeWriteQueue.empty();
380  // copy queue to main queue
381  if (copyMessages)
382  {
384  << "deadline_timer called: copy accumulated queue to write queue "
385  "m_accumulativeWriteQueue.size="
386  << m_accumulativeWriteQueue.size()
387  << " m_writeQueue.size=" << m_writeQueue.size();
388  std::copy(m_accumulativeWriteQueue.begin(),
389  m_accumulativeWriteQueue.end(),
390  back_inserter((m_isHandshakeOK) ? m_writeQueue
391  : m_writeQueueBeforeHandShake));
392  m_accumulativeWriteQueue.clear();
393  }
394  }
395  if (copyMessages)
396  pushMsg<cmdUNKNOWN>();
397  }
398  });
399 
400  LOG(MiscCommon::debug) << "accumulativePushMsg: WriteQueue size = " << m_writeQueue.size()
401  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size()
402  << " accumulativeWriteQueue size = " << m_accumulativeWriteQueue.size()
403  << " msg = " << _msg->toString();
404  }
405  if (copyMessages)
406  pushMsg<cmdUNKNOWN>();
407  }
408  catch (std::exception& ex)
409  {
410  LOG(MiscCommon::error) << "BaseChannelImpl can't push accumulative message: " << ex.what();
411  }
412  }
413 
414  template <ECmdType _cmd, class A>
415  void accumulativePushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0)
416  {
417  try
418  {
420  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
421  accumulativePushMsg(msg, _cmd);
422  }
423  catch (std::exception& ex)
424  {
425  LOG(MiscCommon::error) << "BaseChannelImpl can't push accumulative message: " << ex.what();
426  }
427  }
428 
429  template <ECmdType _cmd>
430  void accumulativePushMsg(uint64_t _protocolHeaderID = 0)
431  {
432  SEmptyCmd cmd;
433  accumulativePushMsg<_cmd>(cmd, _protocolHeaderID);
434  }
435 
436  void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _protocolHeaderID = 0)
437  {
438  try
439  {
440  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
441  if (!m_isHandshakeOK)
442  {
443  if (isCmdAllowedWithoutHandshake(_cmd))
444  m_writeQueue.push_back(_msg);
445  else
446  m_writeQueueBeforeHandShake.push_back(_msg);
447  }
448  else
449  {
450  // copy the buffered queue, which has been collected before hand-shake
451  if (!m_writeQueueBeforeHandShake.empty())
452  {
453  std::copy(m_writeQueueBeforeHandShake.begin(),
454  m_writeQueueBeforeHandShake.end(),
455  back_inserter(m_writeQueue));
456  m_writeQueueBeforeHandShake.clear();
457  }
458 
459  // add the current message to the queue
460  if (cmdUNKNOWN != _cmd)
461  m_writeQueue.push_back(_msg);
462  }
463 
464  LOG(MiscCommon::debug) << "pushMsg: WriteQueue size = " << m_writeQueue.size()
465  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size();
466  }
467  catch (std::exception& ex)
468  {
469  LOG(MiscCommon::error) << "BaseChannelImpl can't push message: " << ex.what();
470  }
471 
472  // process standard async writing
473  auto self(this->shared_from_this());
474  m_ioContext.post([this, self] {
475  try
476  {
477  writeMessage();
478  }
479  catch (std::exception& ex)
480  {
481  LOG(MiscCommon::error) << "BaseChannelImpl can't write message: " << ex.what();
482  }
483  });
484  }
485 
486  template <ECmdType _cmd, class A>
487  void pushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0)
488  {
489  try
490  {
492  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
493  pushMsg(msg, _cmd, _protocolHeaderID);
494  }
495  catch (std::exception& ex)
496  {
497  LOG(MiscCommon::error) << "BaseChannelImpl can't push message: " << ex.what();
498  }
499  }
500 
501  template <ECmdType _cmd>
502  void pushMsg(uint64_t _protocolHeaderID = 0)
503  {
504  SEmptyCmd cmd;
505  pushMsg<_cmd>(cmd, adjustProtocolHeaderID(_protocolHeaderID));
506  }
507 
508  template <ECmdType _cmd, class A>
509  void sendYourself(const A& _attachment, uint64_t _protocolHeaderID = 0)
510  {
512  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
513  // process received message
514  T* pThis = static_cast<T*>(this);
515  pThis->processMessage(msg);
516  }
517 
518  template <ECmdType _cmd>
519  void sendYourself(uint64_t _protocolHeaderID = 0)
520  {
521  SEmptyCmd cmd;
522  sendYourself<_cmd>(cmd, adjustProtocolHeaderID(_protocolHeaderID));
523  }
524 
525  void pushBinaryAttachmentCmd(const std::string& _srcFilePath,
526  const std::string& _fileName,
527  uint16_t _cmdSource,
528  uint64_t _protocolHeaderID)
529  {
531 
532  std::string srcFilePath(_srcFilePath);
533  // Resolve environment variables
534  MiscCommon::smart_path(&srcFilePath);
535 
536  std::ifstream f(srcFilePath);
537  if (!f.is_open() || !f.good())
538  {
539  throw std::runtime_error("Could not open the source file: " + srcFilePath);
540  }
541  f.seekg(0, std::ios::end);
542  data.reserve(f.tellg());
543  f.seekg(0, std::ios::beg);
544  data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
545 
546  pushBinaryAttachmentCmd(data, _fileName, _cmdSource, _protocolHeaderID);
547  }
548 
550  const std::string& _fileName,
551  uint16_t _cmdSource,
552  uint64_t _protocolHeaderID)
553  {
554  static const int maxCommandSize = 65536;
555  int nofParts = (_data.size() % maxCommandSize == 0) ? (_data.size() / maxCommandSize)
556  : (_data.size() / maxCommandSize + 1);
557  boost::crc_32_type fileCrc32;
558  fileCrc32.process_bytes(&_data[0], _data.size());
559 
560  boost::uuids::uuid fileId = boost::uuids::random_generator()();
561 
562  // Generate start message
563  SBinaryAttachmentStartCmd start_cmd;
564  start_cmd.m_fileId = fileId;
565  start_cmd.m_srcCommand = _cmdSource;
566  start_cmd.m_fileName = _fileName;
567  start_cmd.m_fileSize = _data.size();
568  start_cmd.m_fileCrc32 = fileCrc32.checksum();
569  pushMsg<cmdBINARY_ATTACHMENT_START>(start_cmd, _protocolHeaderID);
570 
571  for (size_t i = 0; i < nofParts; ++i)
572  {
574  cmd.m_fileId = fileId;
575 
576  size_t offset = i * maxCommandSize;
577  size_t size = (i != (nofParts - 1)) ? maxCommandSize : (_data.size() - offset);
578 
579  auto iter_begin = _data.begin() + offset;
580  auto iter_end = iter_begin + size;
581  std::copy(iter_begin, iter_end, std::back_inserter(cmd.m_data));
582 
583  cmd.m_size = size;
584  cmd.m_offset = offset;
585 
586  boost::crc_32_type crc32;
587  crc32.process_bytes(&(*iter_begin), size);
588 
589  cmd.m_crc32 = crc32.checksum();
590 
591  pushMsg<cmdBINARY_ATTACHMENT>(cmd, _protocolHeaderID);
592  }
593  }
594 
597  {
598  boost::uuids::uuid fileId = _attachment->m_fileId;
599 
600  {
601  // Lock with global map mutex
602  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
603 
604  binaryAttachmentMap_t::iterator iter_info = m_binaryAttachmentMap.find(fileId);
605  bool exists = iter_info != m_binaryAttachmentMap.end();
606 
607  if (!exists)
608  {
609  m_binaryAttachmentMap[fileId] = std::make_shared<SBinaryAttachmentInfo>();
610  iter_info = m_binaryAttachmentMap.find(fileId);
611  iter_info->second->m_startTime = std::chrono::steady_clock::now();
612  iter_info->second->m_fileName = _attachment->m_fileName;
613  iter_info->second->m_fileSize = _attachment->m_fileSize;
614  iter_info->second->m_fileCrc32 = _attachment->m_fileCrc32;
615  iter_info->second->m_srcCommand = _attachment->m_srcCommand;
616  iter_info->second->m_data.resize(_attachment->m_fileSize);
617  }
618  }
619  }
620 
623  {
624  boost::uuids::uuid fileId = _attachment->m_fileId;
626  binaryAttachmentMap_t::iterator iter_info;
627 
628  {
629  // Lock with global map mutex
630  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
631 
632  iter_info = m_binaryAttachmentMap.find(fileId);
633  bool exists = iter_info != m_binaryAttachmentMap.end();
634 
635  if (!exists)
636  {
638  << "Received binary attachment [" << fileId << "] which does not exist. Skip this message.";
639  return;
640  }
641  info = iter_info->second;
642  }
643 
644  boost::crc_32_type crc32;
645  crc32.process_bytes(&_attachment->m_data[0], _attachment->m_data.size());
646 
647  if (crc32.checksum() != _attachment->m_crc32)
648  {
649  {
650  // Lock with global map mutex
651  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
652  // Remove info from map
653  m_binaryAttachmentMap.erase(iter_info);
654  }
655  std::stringstream ss;
656  ss << "Received binary attachment [" << fileId << "] has wrong CRC32 checksum: " << crc32.checksum()
657  << " instead of " << _attachment->m_crc32 << "offset=" << _attachment->m_offset
658  << " size=" << _attachment->m_size;
659  LOG(MiscCommon::error) << ss.str();
660  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand),
661  _sender.m_ID);
662  return;
663  }
664 
665  bool allBytesReceived = false;
666  {
667  // Lock with local mutex for each file
668  std::lock_guard<std::mutex> lock(info->m_mutex);
669 
670  info->m_bytesReceived += _attachment->m_size;
671 
672  std::copy(_attachment->m_data.begin(),
673  _attachment->m_data.end(),
674  info->m_data.begin() + _attachment->m_offset);
675 
676  allBytesReceived = info->m_bytesReceived == info->m_fileSize;
677  if (allBytesReceived)
678  {
679  // Check file CRC32
680  boost::crc_32_type crc32;
681  crc32.process_bytes(&info->m_data[0], info->m_data.size());
682 
683  if (crc32.checksum() != info->m_fileCrc32)
684  {
685  {
686  // Lock with global map mutex
687  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
688  // Remove info from map
689  m_binaryAttachmentMap.erase(iter_info);
690  }
691  std::stringstream ss;
692  ss << "Received binary file [" << fileId
693  << "] has wrong CRC32 checksum: " << crc32.checksum() << " instead of "
694  << _attachment->m_crc32;
695  LOG(MiscCommon::error) << ss.str();
696  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand),
697  _sender.m_ID);
698  return;
699  }
700 
701  fs::path dir(user_defaults_api::CUserDefaults::instance().getWrkDir());
702  const std::string filePath(dir.append(to_string(fileId)).string());
703  std::ofstream f(filePath.c_str());
704  if (!f.is_open() || !f.good())
705  {
706  {
707  // Lock with global map mutex
708  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
709  // Remove info from map
710  m_binaryAttachmentMap.erase(iter_info);
711  }
712  std::stringstream ss;
713  ss << "Could not open file: " << filePath;
714  LOG(MiscCommon::error) << ss.str();
715  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand),
716  _sender.m_ID);
717  return;
718  }
719 
720  for (const auto& v : info->m_data)
721  {
722  f << v;
723  }
724  f.close();
725 
726  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
727  std::chrono::microseconds downloadTime =
728  std::chrono::duration_cast<std::chrono::microseconds>(now - info->m_startTime);
729 
730  // Send message to yourself
732  reply_cmd.m_receivedFilePath = filePath;
733  reply_cmd.m_requestedFileName = info->m_fileName;
734  reply_cmd.m_srcCommand = info->m_srcCommand;
735  reply_cmd.m_downloadTime = downloadTime.count();
736  reply_cmd.m_receivedFileSize = info->m_fileSize;
737  sendYourself<cmdBINARY_ATTACHMENT_RECEIVED>(reply_cmd, _sender.m_ID);
738  }
739  }
740 
741  if (allBytesReceived)
742  {
743  // Lock with global map mutex
744  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
745  // Remove info from map
746  m_binaryAttachmentMap.erase(iter_info);
747  }
748  }
749 
750  bool started()
751  {
752  return m_started;
753  }
754 
755  std::string remoteEndIDString()
756  {
757  // give a chance child to execute something
758  try
759  {
760  T* pThis = static_cast<T*>(this);
761  std::stringstream ss;
762  ss << pThis->_remoteEndIDString() << " [" << socket().remote_endpoint().address().to_string()
763  << "]";
764  return ss.str();
765  }
766  catch (...)
767  {
768  return std::string();
769  }
770  }
771 
772  uint64_t getProtocolHeaderID() const
773  {
774  return m_protocolHeaderID;
775  }
776 
777  private:
778  uint64_t adjustProtocolHeaderID(uint64_t _protocolHeaderID) const
779  {
780  return _protocolHeaderID != 0 ? _protocolHeaderID : m_protocolHeaderID;
781  }
782 
783  void readHeader()
784  {
785  auto self(this->shared_from_this());
786  boost::asio::async_read(
787  m_socket,
788  boost::asio::buffer(m_currentMsg->data(), CProtocolMessage::header_length),
789  [this, self](boost::system::error_code ec, std::size_t length) {
790  if (!ec)
791  {
792  LOG(MiscCommon::debug) << "Received message HEADER from " << remoteEndIDString() << ": "
793  << length << " bytes, expected " << CProtocolMessage::header_length;
794  }
795  if (!ec && m_currentMsg->decode_header())
796  {
797  // If the header is ok, receive the body of the message
798  readBody();
799  }
800  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
801  {
803  << "Disconnect is detected while on read msg header: " << ec.message();
804  onDissconnect();
805  }
806  else
807  {
808  if (m_started)
809  LOG(MiscCommon::error) << "Error reading message header: " << ec.message();
810  else
811  LOG(MiscCommon::info) << "The stop signal is received, aborting current operation and "
812  "closing the connection: "
813  << ec.message();
814 
815  stop();
816  }
817  });
818  }
819 
820  void readBody()
821  {
822  if (m_currentMsg->body_length() == 0)
823  {
824  LOG(MiscCommon::debug) << "Received message BODY from " << remoteEndIDString()
825  << ": no attachment: " << m_currentMsg->toString();
826  // process received message
827  T* pThis = static_cast<T*>(this);
828  pThis->processMessage(m_currentMsg);
829 
830  // Log read statistics
831  this->logReadMessage(m_currentMsg);
832 
833  // Read next message
834  m_currentMsg = std::make_shared<CProtocolMessage>();
835  readHeader();
836  return;
837  }
838 
839  auto self(this->shared_from_this());
840  boost::asio::async_read(
841  m_socket,
842  boost::asio::buffer(m_currentMsg->body(), m_currentMsg->body_length()),
843  [this, self](boost::system::error_code ec, std::size_t length) {
844  if (!ec)
845  {
846  LOG(MiscCommon::debug) << "Received message BODY from " << remoteEndIDString() << " ("
847  << length << " bytes): " << m_currentMsg->toString();
848 
849  // process received message
850  T* pThis = static_cast<T*>(this);
851  pThis->processMessage(m_currentMsg);
852 
853  // Log read statistics
854  this->logReadMessage(m_currentMsg);
855 
856  // Read next message
857  m_currentMsg = std::make_shared<CProtocolMessage>();
858  readHeader();
859  }
860  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
861  {
862  LOG(MiscCommon::debug) << "Disconnect is detected while on read msg body: " << ec.message();
863  onDissconnect();
864  }
865  else
866  {
867  // don't show error if service is closed
868  if (m_started)
869  LOG(MiscCommon::error) << "Error reading message body: " << ec.message();
870  else
871  LOG(MiscCommon::info) << "The stop signal is received, aborting current operation and "
872  "closing the connection: "
873  << ec.message();
874  stop();
875  }
876  });
877  }
878 
879  private:
880  void writeMessage()
881  {
882  // To avoid sending of a bunch of small messages, we pack as many messages as possible into one write
883  // request (GH-38).
884  // Copy messages from the queue to send buffer (which should remain until the write handler is called)
885  {
886  std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
887  if (!m_writeBuffer.empty())
888  return; // a write is in progress, don't start anything
889 
890  if (m_writeQueue.empty())
891  return; // There is nothing to send.
892 
893  for (auto i : m_writeQueue)
894  {
896  << "Sending to " << remoteEndIDString() << " a message: " << i->toString();
897  if (cmdSHUTDOWN == i->header().m_cmd)
898  m_isShuttingDown = true;
899  m_writeBuffer.push_back(boost::asio::buffer(i->data(), i->length()));
900  m_writeBufferQueue.push_back(i);
901  }
902  m_writeQueue.clear();
903  }
904 
905  auto self(this->shared_from_this());
906  boost::asio::async_write(
907  m_socket,
908  m_writeBuffer,
909  [this, self](boost::system::error_code _ec, std::size_t _bytesTransferred) {
910  try
911  {
912  if (!_ec)
913  {
914  LOG(MiscCommon::debug) << "Message successfully sent to " << remoteEndIDString() << " ("
915  << _bytesTransferred << " bytes)";
916 
917  if (m_isShuttingDown)
918  {
920  << "Shutdown signal has been successfully sent to " << remoteEndIDString();
921  stop();
922  }
923 
924  // lock the modification of the container
925  {
926  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
927 
928  // Log write statistics
929  this->logWriteMessages(m_writeBufferQueue);
930 
931  m_writeBuffer.clear();
932  m_writeBufferQueue.clear();
933  }
934  // we might need to send more messages
935  writeMessage();
936  }
937  else if ((boost::asio::error::eof == _ec) || (boost::asio::error::connection_reset == _ec))
938  {
940  << "Disconnect is detected while on write message: " << _ec.message();
941  onDissconnect();
942  }
943  else
944  {
945  // don't show error if service is closed
946  if (m_started)
948  << "Error sending to " << remoteEndIDString() << ": " << _ec.message();
949  else
951  << "The stop signal is received, aborting current operation and "
952  "closing the connection: "
953  << _ec.message();
954  stop();
955  }
956  }
957  catch (std::exception& ex)
958  {
959  LOG(MiscCommon::error) << "BaseChannelImpl can't write message (callback): " << ex.what();
960  }
961  });
962  }
963 
964  void onDissconnect()
965  {
966  LOG(MiscCommon::debug) << "The session was disconnected by the remote end: " << remoteEndIDString();
967  // stopping the channel
968  stop();
969 
970  // give a chance to children to execute something
971  this->dispatchHandlers(EChannelEvents::OnRemoteEndDissconnected, SSenderInfo());
972  }
973 
974  bool isCmdAllowedWithoutHandshake(ECmdType _cmd)
975  {
976  if (m_isHandshakeOK)
977  return true;
978  return (_cmd == cmdHANDSHAKE || _cmd == cmdREPLY_HANDSHAKE_OK || _cmd == cmdREPLY_HANDSHAKE_ERR);
979  }
980 
981  private:
982  void close()
983  {
984  m_socket.close();
985  }
986 
987  protected:
990  std::string m_sessionID;
992  boost::asio::io_context& m_ioContext;
993 
994  private:
995  boost::asio::ip::tcp::socket m_socket;
996  bool m_started;
998 
999  protocolMessagePtrQueue_t m_writeQueue;
1000  protocolMessagePtrQueue_t m_writeQueueBeforeHandShake;
1001 
1002  std::mutex m_mutexWriteBuffer;
1003  protocolMessageBuffer_t m_writeBuffer;
1004  protocolMessagePtrQueue_t m_writeBufferQueue;
1005 
1006  // BinaryAttachment
1007  typedef std::map<boost::uuids::uuid, binaryAttachmentInfoPtr_t> binaryAttachmentMap_t;
1008  binaryAttachmentMap_t m_binaryAttachmentMap;
1009  std::mutex m_binaryAttachmentMutex;
1010 
1011  protocolMessagePtrQueue_t m_accumulativeWriteQueue;
1012  deadlineTimerPtr_t m_deadlineTimer;
1013 
1014  bool m_isShuttingDown;
1015  };
1016  } // namespace protocol_api
1017 } // namespace dds
1018 
1019 #endif /* defined(__DDS__BaseChannelImpl__) */
bool m_isHandshakeOK
Definition: BaseChannelImpl.h:988
std::shared_ptr< T > connectionPtr_t
Definition: BaseChannelImpl.h:235
std::string m_receivedFilePath
Path to the received file.
Definition: BinaryAttachmentReceivedCmd.h:23
void processBinaryAttachmentCmd(const SSenderInfo &_sender, SCommandAttachmentImpl< cmdBINARY_ATTACHMENT >::ptr_t _attachment)
Definition: BaseChannelImpl.h:621
Definition: BaseEventHandlersImpl.h:48
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentCmd.h:29
EChannelType
Definition: ProtocolDef.h:15
Definition: BaseChannelImpl.h:200
Definition: ProtocolCommands.h:34
void accumulativePushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd)
Definition: BaseChannelImpl.h:343
void logReadMessage(CProtocolMessage::protocolMessagePtr_t _message)
Definition: StatImpl.cpp:189
EChannelType m_channelType
Definition: BaseChannelImpl.h:989
DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelEventHandlersImpl) DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelMessageHandlersImpl) protected
Definition: BaseChannelImpl.h:242
void pushBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource, uint64_t _protocolHeaderID)
Definition: BaseChannelImpl.h:525
Definition: ChannelEventHandlersImpl.h:29
uint16_t m_srcCommand
Source command which initiated file transport.
Definition: BinaryAttachmentReceivedCmd.h:25
const std::array< std::string, 3 > gChannelTypeName
Definition: ProtocolDef.h:22
uint32_t m_fileSize
File size in bytes.
Definition: BinaryAttachmentStartCmd.h:31
Definition: BinaryAttachmentReceivedCmd.h:15
Definition: CommandAttachmentImpl.h:54
bool started()
Definition: BaseChannelImpl.h:750
bool isHanshakeOK() const
Definition: BaseChannelImpl.h:291
uint32_t m_receivedFileSize
Number of recieved bytes.
Definition: BinaryAttachmentReceivedCmd.h:26
uint32_t m_fileCrc32
File checksum.
Definition: BinaryAttachmentStartCmd.h:32
std::string getLockedSID() const
Definition: UserDefaults.cpp:491
std::shared_ptr< SEmptyCmd > ptr_t
Definition: CommandAttachmentImpl.h:64
void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:436
Definition: BinaryAttachmentStartCmd.h:21
void accumulativePushMsg(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:415
std::chrono::steady_clock::time_point m_startTime
Definition: BaseChannelImpl.h:218
Definition: ProtocolCommands.h:30
std::string m_sessionID
Definition: BaseChannelImpl.h:990
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &, uint64_t _ID)
Definition: CommandAttachmentImpl.h:71
MiscCommon::BYTEVector_t m_data
Definition: BaseChannelImpl.h:211
void pushMsg(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:502
#define LOG(severity)
Definition: Logger.h:56
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseChannelImpl.h:236
boost::asio::io_context & m_ioContext
Definition: BaseChannelImpl.h:992
Definition: ProtocolMessage.h:76
Definition: SimpleMsgCmd.h:16
Definition: ProtocolCommands.h:35
uint32_t m_size
Size of this piece of binary data.
Definition: BinaryAttachmentCmd.h:31
Definition: BinaryAttachmentCmd.h:21
EChannelType getChannelType() const
Definition: BaseChannelImpl.h:296
void start()
Definition: BaseChannelImpl.h:307
uint32_t m_fileSize
Definition: BaseChannelImpl.h:216
Definition: ProtocolDef.h:17
std::string m_requestedFileName
Requested name of the file.
Definition: BinaryAttachmentReceivedCmd.h:24
Definition: AgentConnectionManager.h:13
std::string remoteEndIDString()
Definition: BaseChannelImpl.h:755
Definition: def.h:153
uint32_t m_crc32
CRC checksum of this piece of binary data.
Definition: BinaryAttachmentCmd.h:32
uint32_t m_offset
Offset for this piece of binary data.
Definition: BinaryAttachmentCmd.h:30
static connectionPtr_t makeNew(boost::asio::io_context &_service, uint64_t _protocolHeaderID, Args &(... args))
Definition: BaseChannelImpl.h:282
Definition: BaseChannelImpl.h:224
void pushMsg(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:487
std::string m_fileName
Name of the file.
Definition: BinaryAttachmentStartCmd.h:30
void processBinaryAttachmentStartCmd(const SSenderInfo &_sender, SCommandAttachmentImpl< cmdBINARY_ATTACHMENT_START >::ptr_t _attachment)
Definition: BaseChannelImpl.h:595
Definition: BaseChannelImpl.h:42
void dequeueMsg()
Definition: BaseChannelImpl.h:332
void accumulativePushMsg(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:430
SBinaryAttachmentInfo()
Definition: BaseChannelImpl.h:202
Definition: BaseChannelImpl.h:40
uint32_t m_downloadTime
Time spent to download file [microseconds].
Definition: BinaryAttachmentReceivedCmd.h:27
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:36
Definition: def.h:152
uint32_t m_fileCrc32
Definition: BaseChannelImpl.h:214
void pushBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, uint64_t _protocolHeaderID)
Definition: BaseChannelImpl.h:549
Definition: def.h:149
std::vector< weakConnectionPtr_t > weakConnectionPtrVector_t
Definition: BaseChannelImpl.h:238
void smart_path(_T *_Path)
The function extends any environment variable found in the give path to its value.
Definition: SysHelper.h:95
Definition: StatImpl.h:110
std::shared_ptr< SBinaryAttachmentInfo > binaryAttachmentInfoPtr_t
Definition: BaseChannelImpl.h:221
void sendYourself(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:509
void sendYourself(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:519
MiscCommon::BYTEVector_t m_data
Piece of binary data.
Definition: BinaryAttachmentCmd.h:33
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
std::mutex m_mutex
Definition: BaseChannelImpl.h:217
uint32_t crc32(const std::string &_str)
Definition: CRC.h:26
boost::asio::ip::tcp::socket & socket()
Definition: BaseChannelImpl.h:326
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentStartCmd.h:29
uint32_t m_bytesReceived
Definition: BaseChannelImpl.h:212
uint64_t m_ID
Definition: BaseEventHandlersImpl.h:50
uint16_t m_srcCommand
Source command which initiated file transport.
Definition: BinaryAttachmentStartCmd.h:33
std::string m_fileName
Definition: BaseChannelImpl.h:213
uint64_t getProtocolHeaderID() const
Definition: BaseChannelImpl.h:772
Definition: def.h:150
void logWriteMessages(const protocolMessagePtrQueue_t &_messageQueue)
Definition: StatImpl.cpp:205
Definition: ProtocolCommands.h:31
std::vector< connectionPtr_t > connectionPtrVector_t
Definition: BaseChannelImpl.h:237
void setChannelType(EChannelType _channelType)
Definition: BaseChannelImpl.h:301
Definition: ChannelMessageHandlersImpl.h:20
uint16_t m_srcCommand
Definition: BaseChannelImpl.h:215
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
void stop()
Definition: BaseChannelImpl.h:318
uint64_t m_protocolHeaderID
Definition: BaseChannelImpl.h:991
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:81