DDS  ver. 2.0
BaseChannelImpl.h
Go to the documentation of this file.
1 // Copyright 2014 GSI, Inc. All rights reserved.
2 //
3 //
4 //
5 #ifndef __DDS__BaseChannelImpl__
6 #define __DDS__BaseChannelImpl__
7 // STD
8 #include <chrono>
9 #include <deque>
10 #include <iostream>
11 #include <map>
12 #include <memory>
13 // BOOST
14 #include <boost/noncopyable.hpp>
15 #pragma clang diagnostic push
16 #pragma clang diagnostic ignored "-Wunused-local-typedef"
17 #pragma clang diagnostic ignored "-Wdeprecated-declarations"
18 #include <boost/asio.hpp>
19 #pragma clang diagnostic pop
20 
21 #pragma clang diagnostic push
22 #pragma clang diagnostic ignored "-Wdeprecated-register"
23 #include <boost/uuid/uuid.hpp>
24 #include <boost/uuid/uuid_generators.hpp>
25 #include <boost/uuid/uuid_io.hpp>
26 #pragma clang diagnostic pop
27 // DDS
30 #include "CommandAttachmentImpl.h"
31 #include "Logger.h"
32 #include "MonitoringThread.h"
33 #include "ProtocolDef.h"
34 #include "StatImpl.h"
35 
36 namespace dds
37 {
38  namespace protocol_api
39  {
40  template <class T>
41  class CClientChannelImpl; // needed for friend class
42  template <class T>
43  class CServerChannelImpl; // needed for friend class
44  }
45 }
46 
47 // Either raw message or command based processing can be used at a time
48 // Command based message processing
49 #define BEGIN_MSG_MAP(theClass) \
50  public: \
51  friend protocol_api::CBaseChannelImpl<theClass>; \
52  friend protocol_api::CClientChannelImpl<theClass>; \
53  friend protocol_api::CServerChannelImpl<theClass>; \
54  void processMessage(protocol_api::CProtocolMessage::protocolMessagePtr_t _currentMsg) \
55  { \
56  using namespace dds; \
57  using namespace dds::protocol_api; \
58  CMonitoringThread::instance().updateIdle(); \
59  bool processed = true; \
60  ECmdType currentCmd = static_cast<ECmdType>(_currentMsg->header().m_cmd); \
61  SSenderInfo sender; \
62  sender.m_ID = _currentMsg->header().m_ID; \
63  \
64  try \
65  { \
66  switch (currentCmd) \
67  { \
68  case cmdBINARY_ATTACHMENT: \
69  { \
70  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::ptr_t attahcmentPtr_t; \
71  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<cmdBINARY_ATTACHMENT>::decode(_currentMsg); \
72  processBinaryAttachmentCmd(sender, attachmentPtr); \
73  return; \
74  } \
75  case cmdBINARY_ATTACHMENT_START: \
76  { \
77  typedef typename SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::ptr_t attahcmentPtr_t; \
78  attahcmentPtr_t attachmentPtr = \
79  SCommandAttachmentImpl<cmdBINARY_ATTACHMENT_START>::decode(_currentMsg); \
80  processBinaryAttachmentStartCmd(sender, attachmentPtr); \
81  return; \
82  } \
83  case cmdHANDSHAKE: \
84  { \
85  SCommandAttachmentImpl<cmdHANDSHAKE>::ptr_t attachmentPtr = \
86  SCommandAttachmentImpl<cmdHANDSHAKE>::decode(_currentMsg); \
87  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
88  return; \
89  } \
90  case cmdLOBBY_MEMBER_HANDSHAKE: \
91  { \
92  SCommandAttachmentImpl<cmdLOBBY_MEMBER_HANDSHAKE>::ptr_t attachmentPtr = \
93  SCommandAttachmentImpl<cmdLOBBY_MEMBER_HANDSHAKE>::decode(_currentMsg); \
94  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
95  return; \
96  } \
97  case cmdREPLY_HANDSHAKE_OK: \
98  { \
99  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::ptr_t attachmentPtr = \
100  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_OK>::decode(_currentMsg); \
101  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
102  return; \
103  } \
104  case cmdREPLY_HANDSHAKE_ERR: \
105  { \
106  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::ptr_t attachmentPtr = \
107  SCommandAttachmentImpl<cmdREPLY_HANDSHAKE_ERR>::decode(_currentMsg); \
108  dispatchHandlers<>(currentCmd, sender, attachmentPtr); \
109  return; \
110  }
111 
112 #define MESSAGE_HANDLER(msg, func) \
113  case msg: \
114  { \
115  typedef typename SCommandAttachmentImpl<msg>::ptr_t attahcmentPtr_t; \
116  attahcmentPtr_t attachmentPtr = SCommandAttachmentImpl<msg>::decode(_currentMsg); \
117  LOG(MiscCommon::debug) << "Processing " << g_cmdToString[msg] << " received from " << remoteEndIDString(); \
118  processed = func(attachmentPtr, sender); \
119  if (!processed) \
120  { \
121  if (!handlerExists(msg)) \
122  { \
123  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
124  << _currentMsg->toString(); \
125  } \
126  else \
127  { \
128  dispatchHandlers<>(msg, sender, attachmentPtr); \
129  } \
130  } \
131  break; \
132  }
133 
134 #define END_MSG_MAP() \
135  default: \
136  LOG(MiscCommon::error) << "The received message doesn't have a handler: " << _currentMsg->toString(); \
137  } \
138  } \
139  catch (std::exception & _e) \
140  { \
141  LOG(MiscCommon::error) << "Channel processMessage: " << _e.what(); \
142  } \
143  }
144 
145 // Raw message processing
146 #define RAW_MESSAGE_HANDLER(theClass, func) \
147  BEGIN_MSG_MAP(theClass) \
148  default: \
149  break; \
150  } \
151  processed = func(_currentMsg); \
152  if (!processed) \
153  { \
154  if (!handlerExists(ECmdType::cmdRAW_MSG)) \
155  { \
156  LOG(MiscCommon::error) << "The received message was not processed and has no registered handler: " \
157  << _currentMsg->toString(); \
158  } \
159  else \
160  { \
161  dispatchHandlers(ECmdType::cmdRAW_MSG, sender, _currentMsg); \
162  } \
163  } \
164  } \
165  catch (std::exception & _e) \
166  { \
167  LOG(MiscCommon::error) << "Channel processMessage: " << _e.what(); \
168  } \
169  }
170 
171 #define REGISTER_DEFAULT_REMOTE_ID_STRING \
172  std::string _remoteEndIDString() \
173  { \
174  return "DDS Server"; \
175  }
176 
177 namespace dds
178 {
179  namespace protocol_api
180  {
182  {
184  : m_bytesReceived(0)
185  , m_fileCrc32(0)
186  , m_srcCommand(0)
187  , m_fileSize(0)
188  , m_startTime()
189  {
190  }
191 
193  uint32_t m_bytesReceived;
194  std::string m_fileName;
195  uint32_t m_fileCrc32;
196  uint16_t m_srcCommand;
197  uint32_t m_fileSize;
198  std::mutex m_mutex;
199  std::chrono::steady_clock::time_point m_startTime;
200  };
201 
202  typedef std::shared_ptr<SBinaryAttachmentInfo> binaryAttachmentInfoPtr_t;
203 
204  template <class T>
205  class CBaseChannelImpl : public boost::noncopyable,
208  public std::enable_shared_from_this<T>,
209  public CStatImpl
210  {
211  typedef std::deque<CProtocolMessage::protocolMessagePtr_t> protocolMessagePtrQueue_t;
212  typedef std::vector<boost::asio::mutable_buffer> protocolMessageBuffer_t;
213  typedef std::shared_ptr<boost::asio::deadline_timer> deadlineTimerPtr_t;
214 
215  public:
216  typedef std::shared_ptr<T> connectionPtr_t;
217  typedef std::weak_ptr<T> weakConnectionPtr_t;
218  typedef std::vector<connectionPtr_t> connectionPtrVector_t;
219  typedef std::vector<weakConnectionPtr_t> weakConnectionPtrVector_t;
220 
221  // Both are needed because unqualified name lookup terminates at the first scope that has anything with the
222  // right name
225 
226  protected:
227  CBaseChannelImpl<T>(boost::asio::io_service& _service, uint64_t _protocolHeaderID)
230  , CStatImpl(_service)
231  , m_isHandshakeOK(false)
232  , m_channelType(EChannelType::UNKNOWN)
233  , m_protocolHeaderID(_protocolHeaderID)
234  , m_io_service(_service)
235  , m_socket(_service)
236  , m_started(false)
237  , m_currentMsg(std::make_shared<CProtocolMessage>())
238  , m_binaryAttachmentMap()
239  , m_binaryAttachmentMutex()
240  , m_deadlineTimer(
241  std::make_shared<boost::asio::deadline_timer>(_service, boost::posix_time::milliseconds(1000)))
242  , m_isShuttingDown(false)
243  {
244  try
245  {
247  LOG(MiscCommon::debug) << "SID: " << m_sessionID;
248  }
249  catch (std::runtime_error& _error)
250  {
251  LOG(MiscCommon::fatal) << _error.what();
252  }
253  }
254 
255  public:
257  {
258  LOG(MiscCommon::info) << "Channel " << gChannelTypeName[m_channelType] << " destructor is called";
259  stop();
260  }
261 
262  static connectionPtr_t makeNew(boost::asio::io_service& _service, uint64_t _protocolHeaderID)
263  {
264  connectionPtr_t newObject(new T(_service, _protocolHeaderID));
265  return newObject;
266  }
267 
268  public:
269  bool isHanshakeOK() const
270  {
271  return m_isHandshakeOK;
272  }
273 
275  {
276  return m_channelType;
277  }
278 
279  void setChannelType(EChannelType _channelType)
280  {
281  m_channelType = _channelType;
282  }
283 
284  public:
285  void start()
286  {
287  if (m_started)
288  return;
289 
290  m_started = true;
291  // Prevent Asio TCP socket to be inherited by child when using fork/exec
292  ::fcntl(m_socket.native_handle(), F_SETFD, FD_CLOEXEC);
293  readHeader();
294  }
295 
296  void stop()
297  {
298  if (!m_started)
299  return;
300  m_started = false;
301  close();
302  }
303 
304  boost::asio::ip::tcp::socket& socket()
305  {
306  return m_socket;
307  }
308 
309  template <ECmdType _cmd>
310  void dequeueMsg()
311  {
312  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
313  m_writeQueue.erase(std::remove_if(std::begin(m_writeQueue),
314  std::end(m_writeQueue),
316  return (_msg->header().m_cmd == _cmd);
317  }),
318  std::end(m_writeQueue));
319  }
320 
322  {
323  static const size_t maxAccumulativeWriteQueueSize = 10000;
324  try
325  {
326  bool copyMessages = false;
327  {
328  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
329 
330  m_deadlineTimer->cancel();
331 
332  if (cmdUNKNOWN != _cmd)
333  m_accumulativeWriteQueue.push_back(_msg);
334 
335  copyMessages = m_accumulativeWriteQueue.size() > maxAccumulativeWriteQueueSize;
336  if (copyMessages)
337  {
339  << "copy accumulated queue to write queue "
340  "m_accumulativeWriteQueue.size="
341  << m_accumulativeWriteQueue.size() << " m_writeQueue.size=" << m_writeQueue.size();
342 
343  // copy queue to main queue
344  std::copy(m_accumulativeWriteQueue.begin(),
345  m_accumulativeWriteQueue.end(),
346  back_inserter((m_isHandshakeOK) ? m_writeQueue : m_writeQueueBeforeHandShake));
347  m_accumulativeWriteQueue.clear();
348  }
349 
350  auto self(this->shared_from_this());
351  m_deadlineTimer->async_wait([this, self](const boost::system::error_code& error) {
352  if (!error)
353  {
354  bool copyMessages = false;
355  {
356  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
357  copyMessages = !m_accumulativeWriteQueue.empty();
358  // copy queue to main queue
359  if (copyMessages)
360  {
362  << "deadline_timer called: copy accumulated queue to write queue "
363  "m_accumulativeWriteQueue.size="
364  << m_accumulativeWriteQueue.size()
365  << " m_writeQueue.size=" << m_writeQueue.size();
366  std::copy(m_accumulativeWriteQueue.begin(),
367  m_accumulativeWriteQueue.end(),
368  back_inserter((m_isHandshakeOK) ? m_writeQueue
369  : m_writeQueueBeforeHandShake));
370  m_accumulativeWriteQueue.clear();
371  }
372  }
373  if (copyMessages)
374  pushMsg<cmdUNKNOWN>();
375  }
376  });
377 
378  LOG(MiscCommon::debug) << "accumulativePushMsg: WriteQueue size = " << m_writeQueue.size()
379  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size()
380  << " accumulativeWriteQueue size = " << m_accumulativeWriteQueue.size()
381  << " msg = " << _msg->toString();
382  }
383  if (copyMessages)
384  pushMsg<cmdUNKNOWN>();
385  }
386  catch (std::exception& ex)
387  {
388  LOG(MiscCommon::error) << "BaseChannelImpl can't push accumulative message: " << ex.what();
389  }
390  }
391 
392  template <ECmdType _cmd, class A>
393  void accumulativePushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0)
394  {
395  try
396  {
398  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
399  accumulativePushMsg(msg, _cmd);
400  }
401  catch (std::exception& ex)
402  {
403  LOG(MiscCommon::error) << "BaseChannelImpl can't push accumulative message: " << ex.what();
404  }
405  }
406 
407  template <ECmdType _cmd>
408  void accumulativePushMsg(uint64_t _protocolHeaderID = 0)
409  {
410  SEmptyCmd cmd;
411  accumulativePushMsg<_cmd>(cmd, _protocolHeaderID);
412  }
413 
414  void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _protocolHeaderID = 0)
415  {
416  try
417  {
418  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
419  if (!m_isHandshakeOK)
420  {
421  if (isCmdAllowedWithoutHandshake(_cmd))
422  m_writeQueue.push_back(_msg);
423  else
424  m_writeQueueBeforeHandShake.push_back(_msg);
425  }
426  else
427  {
428  // copy the buffered queue, which has been collected before hand-shake
429  if (!m_writeQueueBeforeHandShake.empty())
430  {
431  std::copy(m_writeQueueBeforeHandShake.begin(),
432  m_writeQueueBeforeHandShake.end(),
433  back_inserter(m_writeQueue));
434  m_writeQueueBeforeHandShake.clear();
435  }
436 
437  // add the current message to the queue
438  if (cmdUNKNOWN != _cmd)
439  m_writeQueue.push_back(_msg);
440  }
441 
442  LOG(MiscCommon::debug) << "pushMsg: WriteQueue size = " << m_writeQueue.size()
443  << " WriteQueueBeforeHandShake = " << m_writeQueueBeforeHandShake.size();
444  }
445  catch (std::exception& ex)
446  {
447  LOG(MiscCommon::error) << "BaseChannelImpl can't push message: " << ex.what();
448  }
449 
450  // process standard async writing
451  auto self(this->shared_from_this());
452  m_io_service.post([this, self] {
453  try
454  {
455  writeMessage();
456  }
457  catch (std::exception& ex)
458  {
459  LOG(MiscCommon::error) << "BaseChannelImpl can't write message: " << ex.what();
460  }
461  });
462  }
463 
464  template <ECmdType _cmd, class A>
465  void pushMsg(const A& _attachment, uint64_t _protocolHeaderID = 0)
466  {
467  try
468  {
470  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
471  pushMsg(msg, _cmd, _protocolHeaderID);
472  }
473  catch (std::exception& ex)
474  {
475  LOG(MiscCommon::error) << "BaseChannelImpl can't push message: " << ex.what();
476  }
477  }
478 
479  template <ECmdType _cmd>
480  void pushMsg(uint64_t _protocolHeaderID = 0)
481  {
482  SEmptyCmd cmd;
483  pushMsg<_cmd>(cmd, adjustProtocolHeaderID(_protocolHeaderID));
484  }
485 
486  template <ECmdType _cmd, class A>
487  void sendYourself(const A& _attachment, uint64_t _protocolHeaderID = 0)
488  {
490  SCommandAttachmentImpl<_cmd>::encode(_attachment, adjustProtocolHeaderID(_protocolHeaderID));
491  // process received message
492  T* pThis = static_cast<T*>(this);
493  pThis->processMessage(msg);
494  }
495 
496  template <ECmdType _cmd>
497  void sendYourself(uint64_t _protocolHeaderID = 0)
498  {
499  SEmptyCmd cmd;
500  sendYourself<_cmd>(cmd, adjustProtocolHeaderID(_protocolHeaderID));
501  }
502 
503  void pushBinaryAttachmentCmd(const std::string& _srcFilePath,
504  const std::string& _fileName,
505  uint16_t _cmdSource,
506  uint64_t _protocolHeaderID)
507  {
509 
510  std::string srcFilePath(_srcFilePath);
511  // Resolve environment variables
512  MiscCommon::smart_path(&srcFilePath);
513 
514  std::ifstream f(srcFilePath);
515  if (!f.is_open() || !f.good())
516  {
517  throw std::runtime_error("Could not open the source file: " + srcFilePath);
518  }
519  f.seekg(0, std::ios::end);
520  data.reserve(f.tellg());
521  f.seekg(0, std::ios::beg);
522  data.assign((std::istreambuf_iterator<char>(f)), std::istreambuf_iterator<char>());
523 
524  pushBinaryAttachmentCmd(data, _fileName, _cmdSource, _protocolHeaderID);
525  }
526 
528  const std::string& _fileName,
529  uint16_t _cmdSource,
530  uint64_t _protocolHeaderID)
531  {
532  static const int maxCommandSize = 65536;
533  int nofParts = (_data.size() % maxCommandSize == 0) ? (_data.size() / maxCommandSize)
534  : (_data.size() / maxCommandSize + 1);
535  boost::crc_32_type fileCrc32;
536  fileCrc32.process_bytes(&_data[0], _data.size());
537 
538  boost::uuids::uuid fileId = boost::uuids::random_generator()();
539 
540  // Generate start message
541  SBinaryAttachmentStartCmd start_cmd;
542  start_cmd.m_fileId = fileId;
543  start_cmd.m_srcCommand = _cmdSource;
544  start_cmd.m_fileName = _fileName;
545  start_cmd.m_fileSize = _data.size();
546  start_cmd.m_fileCrc32 = fileCrc32.checksum();
547  pushMsg<cmdBINARY_ATTACHMENT_START>(start_cmd, _protocolHeaderID);
548 
549  for (size_t i = 0; i < nofParts; ++i)
550  {
552  cmd.m_fileId = fileId;
553 
554  size_t offset = i * maxCommandSize;
555  size_t size = (i != (nofParts - 1)) ? maxCommandSize : (_data.size() - offset);
556 
557  auto iter_begin = _data.begin() + offset;
558  auto iter_end = iter_begin + size;
559  std::copy(iter_begin, iter_end, std::back_inserter(cmd.m_data));
560 
561  cmd.m_size = size;
562  cmd.m_offset = offset;
563 
564  boost::crc_32_type crc32;
565  crc32.process_bytes(&(*iter_begin), size);
566 
567  cmd.m_crc32 = crc32.checksum();
568 
569  pushMsg<cmdBINARY_ATTACHMENT>(cmd, _protocolHeaderID);
570  }
571  }
572 
575  {
576  boost::uuids::uuid fileId = _attachment->m_fileId;
577 
578  {
579  // Lock with global map mutex
580  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
581 
582  binaryAttachmentMap_t::iterator iter_info = m_binaryAttachmentMap.find(fileId);
583  bool exists = iter_info != m_binaryAttachmentMap.end();
584 
585  if (!exists)
586  {
587  m_binaryAttachmentMap[fileId] = std::make_shared<SBinaryAttachmentInfo>();
588  iter_info = m_binaryAttachmentMap.find(fileId);
589  iter_info->second->m_startTime = std::chrono::steady_clock::now();
590  iter_info->second->m_fileName = _attachment->m_fileName;
591  iter_info->second->m_fileSize = _attachment->m_fileSize;
592  iter_info->second->m_fileCrc32 = _attachment->m_fileCrc32;
593  iter_info->second->m_srcCommand = _attachment->m_srcCommand;
594  iter_info->second->m_data.resize(_attachment->m_fileSize);
595  }
596  }
597  }
598 
601  {
602  boost::uuids::uuid fileId = _attachment->m_fileId;
603  binaryAttachmentInfoPtr_t info;
604  binaryAttachmentMap_t::iterator iter_info;
605 
606  {
607  // Lock with global map mutex
608  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
609 
610  iter_info = m_binaryAttachmentMap.find(fileId);
611  bool exists = iter_info != m_binaryAttachmentMap.end();
612 
613  if (!exists)
614  {
616  << "Received binary attachment [" << fileId << "] which does not exist. Skip this message.";
617  return;
618  }
619  info = iter_info->second;
620  }
621 
622  boost::crc_32_type crc32;
623  crc32.process_bytes(&_attachment->m_data[0], _attachment->m_data.size());
624 
625  if (crc32.checksum() != _attachment->m_crc32)
626  {
627  {
628  // Lock with global map mutex
629  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
630  // Remove info from map
631  m_binaryAttachmentMap.erase(iter_info);
632  }
633  std::stringstream ss;
634  ss << "Received binary attachment [" << fileId << "] has wrong CRC32 checksum: " << crc32.checksum()
635  << " instead of " << _attachment->m_crc32 << "offset=" << _attachment->m_offset
636  << " size=" << _attachment->m_size;
637  LOG(MiscCommon::error) << ss.str();
638  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand),
639  _sender.m_ID);
640  return;
641  }
642 
643  bool allBytesReceived = false;
644  {
645  // Lock with local mutex for each file
646  std::lock_guard<std::mutex> lock(info->m_mutex);
647 
648  info->m_bytesReceived += _attachment->m_size;
649 
650  std::copy(_attachment->m_data.begin(),
651  _attachment->m_data.end(),
652  info->m_data.begin() + _attachment->m_offset);
653 
654  allBytesReceived = info->m_bytesReceived == info->m_fileSize;
655  if (allBytesReceived)
656  {
657  // Check file CRC32
658  boost::crc_32_type crc32;
659  crc32.process_bytes(&info->m_data[0], info->m_data.size());
660 
661  if (crc32.checksum() != info->m_fileCrc32)
662  {
663  {
664  // Lock with global map mutex
665  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
666  // Remove info from map
667  m_binaryAttachmentMap.erase(iter_info);
668  }
669  std::stringstream ss;
670  ss << "Received binary file [" << fileId
671  << "] has wrong CRC32 checksum: " << crc32.checksum() << " instead of "
672  << _attachment->m_crc32;
673  LOG(MiscCommon::error) << ss.str();
674  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand),
675  _sender.m_ID);
676  return;
677  }
678 
679  const std::string dir(user_defaults_api::CUserDefaults::getDDSPath());
680  const std::string fileName(dir + to_string(fileId));
681  std::ofstream f(fileName.c_str());
682  if (!f.is_open() || !f.good())
683  {
684  {
685  // Lock with global map mutex
686  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
687  // Remove info from map
688  m_binaryAttachmentMap.erase(iter_info);
689  }
690  std::stringstream ss;
691  ss << "Could not open file: " << fileName;
692  LOG(MiscCommon::error) << ss.str();
693  sendYourself<cmdSIMPLE_MSG>(SSimpleMsgCmd(ss.str(), MiscCommon::error, info->m_srcCommand),
694  _sender.m_ID);
695  return;
696  }
697 
698  for (const auto& v : info->m_data)
699  {
700  f << v;
701  }
702  f.close();
703 
704  std::chrono::steady_clock::time_point now = std::chrono::steady_clock::now();
705  std::chrono::microseconds downloadTime =
706  std::chrono::duration_cast<std::chrono::microseconds>(now - info->m_startTime);
707 
708  // Send message to yourself
710  reply_cmd.m_receivedFilePath = fileName;
711  reply_cmd.m_requestedFileName = info->m_fileName;
712  reply_cmd.m_srcCommand = info->m_srcCommand;
713  reply_cmd.m_downloadTime = downloadTime.count();
714  reply_cmd.m_receivedFileSize = info->m_fileSize;
715  sendYourself<cmdBINARY_ATTACHMENT_RECEIVED>(reply_cmd, _sender.m_ID);
716  }
717  }
718 
719  if (allBytesReceived)
720  {
721  // Lock with global map mutex
722  std::lock_guard<std::mutex> lock(m_binaryAttachmentMutex);
723  // Remove info from map
724  m_binaryAttachmentMap.erase(iter_info);
725  }
726  }
727 
728  bool started()
729  {
730  return m_started;
731  }
732 
733  std::string remoteEndIDString()
734  {
735  // give a chance child to execute something
736  try
737  {
738  T* pThis = static_cast<T*>(this);
739  std::stringstream ss;
740  ss << pThis->_remoteEndIDString() << " [" << socket().remote_endpoint().address().to_string()
741  << "]";
742  return ss.str();
743  }
744  catch (...)
745  {
746  return std::string();
747  }
748  }
749 
750  uint64_t getProtocolHeaderID() const
751  {
752  return m_protocolHeaderID;
753  }
754 
755  private:
756  uint64_t adjustProtocolHeaderID(uint64_t _protocolHeaderID) const
757  {
758  return _protocolHeaderID != 0 ? _protocolHeaderID : m_protocolHeaderID;
759  }
760 
761  void readHeader()
762  {
763  auto self(this->shared_from_this());
764  boost::asio::async_read(
765  m_socket,
766  boost::asio::buffer(m_currentMsg->data(), CProtocolMessage::header_length),
767  [this, self](boost::system::error_code ec, std::size_t length) {
768  if (!ec)
769  {
770  LOG(MiscCommon::debug) << "Received message HEADER from " << remoteEndIDString() << ": "
771  << length << " bytes, expected " << CProtocolMessage::header_length;
772  }
773  if (!ec && m_currentMsg->decode_header())
774  {
775  // If the header is ok, receive the body of the message
776  readBody();
777  }
778  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
779  {
781  << "Disconnect is detected while on read msg header: " << ec.message();
782  onDissconnect();
783  }
784  else
785  {
786  if (m_started)
787  LOG(MiscCommon::error) << "Error reading message header: " << ec.message();
788  else
789  LOG(MiscCommon::info) << "The stop signal is received, aborting current operation and "
790  "closing the connection: "
791  << ec.message();
792 
793  stop();
794  }
795  });
796  }
797 
798  void readBody()
799  {
800  if (m_currentMsg->body_length() == 0)
801  {
802  LOG(MiscCommon::debug) << "Received message BODY from " << remoteEndIDString()
803  << ": no attachment: " << m_currentMsg->toString();
804  // process received message
805  T* pThis = static_cast<T*>(this);
806  pThis->processMessage(m_currentMsg);
807 
808  // Log read statistics
809  this->logReadMessage(m_currentMsg);
810 
811  // Read next message
812  m_currentMsg = std::make_shared<CProtocolMessage>();
813  readHeader();
814  return;
815  }
816 
817  auto self(this->shared_from_this());
818  boost::asio::async_read(
819  m_socket,
820  boost::asio::buffer(m_currentMsg->body(), m_currentMsg->body_length()),
821  [this, self](boost::system::error_code ec, std::size_t length) {
822  if (!ec)
823  {
824  LOG(MiscCommon::debug) << "Received message BODY from " << remoteEndIDString() << " ("
825  << length << " bytes): " << m_currentMsg->toString();
826 
827  // process received message
828  T* pThis = static_cast<T*>(this);
829  pThis->processMessage(m_currentMsg);
830 
831  // Log read statistics
832  this->logReadMessage(m_currentMsg);
833 
834  // Read next message
835  m_currentMsg = std::make_shared<CProtocolMessage>();
836  readHeader();
837  }
838  else if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec))
839  {
840  LOG(MiscCommon::debug) << "Disconnect is detected while on read msg body: " << ec.message();
841  onDissconnect();
842  }
843  else
844  {
845  // don't show error if service is closed
846  if (m_started)
847  LOG(MiscCommon::error) << "Error reading message body: " << ec.message();
848  else
849  LOG(MiscCommon::info) << "The stop signal is received, aborting current operation and "
850  "closing the connection: "
851  << ec.message();
852  stop();
853  }
854  });
855  }
856 
857  private:
858  void writeMessage()
859  {
860  // To avoid sending of a bunch of small messages, we pack as many messages as possible into one write
861  // request (GH-38).
862  // Copy messages from the queue to send buffer (which should remain until the write handler is called)
863  {
864  std::lock_guard<std::mutex> lockWriteBuffer(m_mutexWriteBuffer);
865  if (!m_writeBuffer.empty())
866  return; // a write is in progress, don't start anything
867 
868  if (m_writeQueue.empty())
869  return; // There is nothing to send.
870 
871  for (auto i : m_writeQueue)
872  {
874  << "Sending to " << remoteEndIDString() << " a message: " << i->toString();
875  if (cmdSHUTDOWN == i->header().m_cmd)
876  m_isShuttingDown = true;
877  m_writeBuffer.push_back(boost::asio::buffer(i->data(), i->length()));
878  m_writeBufferQueue.push_back(i);
879  }
880  m_writeQueue.clear();
881  }
882 
883  auto self(this->shared_from_this());
884  boost::asio::async_write(
885  m_socket,
886  m_writeBuffer,
887  [this, self](boost::system::error_code _ec, std::size_t _bytesTransferred) {
888  try
889  {
890  if (!_ec)
891  {
892  LOG(MiscCommon::debug) << "Message successfully sent to " << remoteEndIDString() << " ("
893  << _bytesTransferred << " bytes)";
894 
895  if (m_isShuttingDown)
896  {
898  << "Shutdown signal has been successfully sent to " << remoteEndIDString();
899  stop();
900  }
901 
902  // lock the modification of the container
903  {
904  std::lock_guard<std::mutex> lock(m_mutexWriteBuffer);
905 
906  // Log write statistics
907  this->logWriteMessages(m_writeBufferQueue);
908 
909  m_writeBuffer.clear();
910  m_writeBufferQueue.clear();
911  }
912  // we might need to send more messages
913  writeMessage();
914  }
915  else if ((boost::asio::error::eof == _ec) || (boost::asio::error::connection_reset == _ec))
916  {
918  << "Disconnect is detected while on write message: " << _ec.message();
919  onDissconnect();
920  }
921  else
922  {
923  // don't show error if service is closed
924  if (m_started)
926  << "Error sending to " << remoteEndIDString() << ": " << _ec.message();
927  else
929  << "The stop signal is received, aborting current operation and "
930  "closing the connection: "
931  << _ec.message();
932  stop();
933  }
934  }
935  catch (std::exception& ex)
936  {
937  LOG(MiscCommon::error) << "BaseChannelImpl can't write message (callback): " << ex.what();
938  }
939  });
940  }
941 
942  void onDissconnect()
943  {
944  LOG(MiscCommon::debug) << "The session was disconnected by the remote end: " << remoteEndIDString();
945  // stopping the channel
946  stop();
947 
948  // give a chance to children to execute something
949  this->dispatchHandlers(EChannelEvents::OnRemoteEndDissconnected, SSenderInfo());
950  }
951 
952  bool isCmdAllowedWithoutHandshake(ECmdType _cmd)
953  {
954  if (m_isHandshakeOK)
955  return true;
956  return (_cmd == cmdHANDSHAKE || _cmd == cmdREPLY_HANDSHAKE_OK || _cmd == cmdREPLY_HANDSHAKE_ERR);
957  }
958 
959  private:
960  void close()
961  {
962  m_socket.close();
963  }
964 
965  protected:
968  std::string m_sessionID;
970 
971  private:
972  boost::asio::io_service& m_io_service;
973  boost::asio::ip::tcp::socket m_socket;
974  bool m_started;
976 
977  protocolMessagePtrQueue_t m_writeQueue;
978  protocolMessagePtrQueue_t m_writeQueueBeforeHandShake;
979 
980  std::mutex m_mutexWriteBuffer;
981  protocolMessageBuffer_t m_writeBuffer;
982  protocolMessagePtrQueue_t m_writeBufferQueue;
983 
984  // BinaryAttachment
985  typedef std::map<boost::uuids::uuid, binaryAttachmentInfoPtr_t> binaryAttachmentMap_t;
986  binaryAttachmentMap_t m_binaryAttachmentMap;
987  std::mutex m_binaryAttachmentMutex;
988 
989  protocolMessagePtrQueue_t m_accumulativeWriteQueue;
990  deadlineTimerPtr_t m_deadlineTimer;
991 
992  bool m_isShuttingDown;
993  };
994  }
995 }
996 
997 #endif /* defined(__DDS__BaseChannelImpl__) */
bool m_isHandshakeOK
Definition: BaseChannelImpl.h:966
std::shared_ptr< T > connectionPtr_t
Definition: BaseChannelImpl.h:216
std::string m_receivedFilePath
Path to the received file.
Definition: BinaryAttachmentReceivedCmd.h:23
void processBinaryAttachmentCmd(const SSenderInfo &_sender, SCommandAttachmentImpl< cmdBINARY_ATTACHMENT >::ptr_t _attachment)
Definition: BaseChannelImpl.h:599
Definition: BaseEventHandlersImpl.h:48
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentCmd.h:29
EChannelType
Definition: ProtocolDef.h:15
Definition: BaseChannelImpl.h:181
Definition: ProtocolCommands.h:34
void accumulativePushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd)
Definition: BaseChannelImpl.h:321
EChannelType m_channelType
Definition: BaseChannelImpl.h:967
DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelEventHandlersImpl) DDS_DECLARE_EVENT_HANDLER_CLASS(CChannelMessageHandlersImpl) protected
Definition: BaseChannelImpl.h:223
void pushBinaryAttachmentCmd(const std::string &_srcFilePath, const std::string &_fileName, uint16_t _cmdSource, uint64_t _protocolHeaderID)
Definition: BaseChannelImpl.h:503
Definition: ChannelEventHandlersImpl.h:29
uint32_t m_fileSize
File size in bytes.
Definition: BinaryAttachmentStartCmd.h:31
Definition: BinaryAttachmentReceivedCmd.h:15
static connectionPtr_t makeNew(boost::asio::io_service &_service, uint64_t _protocolHeaderID)
Definition: BaseChannelImpl.h:262
static std::string getDDSPath()
Definition: UserDefaults.cpp:251
Definition: CommandAttachmentImpl.h:56
bool started()
Definition: BaseChannelImpl.h:728
bool isHanshakeOK() const
Definition: BaseChannelImpl.h:269
uint32_t m_fileCrc32
File checksum.
Definition: BinaryAttachmentStartCmd.h:32
std::string getLockedSID() const
Definition: UserDefaults.cpp:476
std::shared_ptr< SEmptyCmd > ptr_t
Definition: CommandAttachmentImpl.h:66
void pushMsg(CProtocolMessage::protocolMessagePtr_t _msg, ECmdType _cmd, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:414
Definition: BinaryAttachmentStartCmd.h:21
void accumulativePushMsg(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:393
std::chrono::steady_clock::time_point m_startTime
Definition: BaseChannelImpl.h:199
Definition: ProtocolCommands.h:30
std::string m_sessionID
Definition: BaseChannelImpl.h:968
static CProtocolMessage::protocolMessagePtr_t encode(const SEmptyCmd &, uint64_t _ID)
Definition: CommandAttachmentImpl.h:73
#define DDS_DECLARE_EVENT_HANDLER_CLASS(theClass)
Definition: BaseEventHandlersImpl.h:39
MiscCommon::BYTEVector_t m_data
Definition: BaseChannelImpl.h:192
const std::array< std::string, 5 > gChannelTypeName
Definition: ProtocolDef.h:23
void pushMsg(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:480
#define LOG(severity)
Definition: Logger.h:54
std::weak_ptr< T > weakConnectionPtr_t
Definition: BaseChannelImpl.h:217
Definition: SimpleMsgCmd.h:16
Definition: ProtocolCommands.h:35
uint32_t m_size
Size of this piece of binary data.
Definition: BinaryAttachmentCmd.h:31
Definition: BinaryAttachmentCmd.h:21
EChannelType getChannelType() const
Definition: BaseChannelImpl.h:274
void start()
Definition: BaseChannelImpl.h:285
uint32_t m_fileSize
Definition: BaseChannelImpl.h:197
Definition: dds-agent/src/AgentConnectionManager.h:18
std::string remoteEndIDString()
Definition: BaseChannelImpl.h:733
Definition: def.h:153
uint32_t m_crc32
CRC checksum of this piece of binary data.
Definition: BinaryAttachmentCmd.h:32
uint32_t m_offset
Offset for this piece of binary data.
Definition: BinaryAttachmentCmd.h:30
Definition: BaseChannelImpl.h:205
void pushMsg(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:465
std::string m_fileName
Name of the file.
Definition: BinaryAttachmentStartCmd.h:30
void processBinaryAttachmentStartCmd(const SSenderInfo &_sender, SCommandAttachmentImpl< cmdBINARY_ATTACHMENT_START >::ptr_t _attachment)
Definition: BaseChannelImpl.h:573
Definition: BaseChannelImpl.h:43
void dequeueMsg()
Definition: BaseChannelImpl.h:310
void accumulativePushMsg(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:408
SBinaryAttachmentInfo()
Definition: BaseChannelImpl.h:183
Definition: BaseChannelImpl.h:41
static CUserDefaults & instance(const boost::uuids::uuid &_sid=CUserDefaults::getInitialSID())
Return singleton instance.
Definition: UserDefaults.cpp:45
Definition: def.h:152
uint32_t m_fileCrc32
Definition: BaseChannelImpl.h:195
void pushBinaryAttachmentCmd(const MiscCommon::BYTEVector_t &_data, const std::string &_fileName, uint16_t _cmdSource, uint64_t _protocolHeaderID)
Definition: BaseChannelImpl.h:527
Definition: def.h:149
std::vector< weakConnectionPtr_t > weakConnectionPtrVector_t
Definition: BaseChannelImpl.h:219
void smart_path(_T *_Path)
The function extends any environment variable found in the give path to its value.
Definition: SysHelper.h:93
Definition: StatImpl.h:112
std::shared_ptr< SBinaryAttachmentInfo > binaryAttachmentInfoPtr_t
Definition: BaseChannelImpl.h:202
void sendYourself(const A &_attachment, uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:487
void sendYourself(uint64_t _protocolHeaderID=0)
Definition: BaseChannelImpl.h:497
MiscCommon::BYTEVector_t m_data
Piece of binary data.
Definition: BinaryAttachmentCmd.h:33
std::vector< unsigned char > BYTEVector_t
An STL vector of bytes.
Definition: def.h:127
std::mutex m_mutex
Definition: BaseChannelImpl.h:198
boost::asio::ip::tcp::socket & socket()
Definition: BaseChannelImpl.h:304
boost::uuids::uuid m_fileId
Unique ID of the file.
Definition: BinaryAttachmentStartCmd.h:29
uint32_t m_bytesReceived
Definition: BaseChannelImpl.h:193
uint64_t m_ID
Definition: BaseEventHandlersImpl.h:55
uint16_t m_srcCommand
Source command which initiated file transport.
Definition: BinaryAttachmentStartCmd.h:33
std::string m_fileName
Definition: BaseChannelImpl.h:194
uint64_t getProtocolHeaderID() const
Definition: BaseChannelImpl.h:750
Definition: def.h:150
Definition: ProtocolCommands.h:31
std::vector< connectionPtr_t > connectionPtrVector_t
Definition: BaseChannelImpl.h:218
void setChannelType(EChannelType _channelType)
Definition: BaseChannelImpl.h:279
Definition: ChannelMessageHandlersImpl.h:20
uint16_t m_srcCommand
Definition: BaseChannelImpl.h:196
ECmdType
Definition: ProtocolCommands.h:25
Definition: ProtocolCommands.h:28
void stop()
Definition: BaseChannelImpl.h:296
uint64_t m_protocolHeaderID
Definition: BaseChannelImpl.h:969
std::shared_ptr< CProtocolMessage > protocolMessagePtr_t
Definition: ProtocolMessage.h:80