Dripline-Cpp  v2.4.2
Dripline Implementation in C++
heartbeater.cc
Go to the documentation of this file.
1 /*
2  * heartbeat.cc
3  *
4  * Created on: Apr 26, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "heartbeater.hh"
11 
12 #include "message.hh"
13 #include "service.hh"
14 
15 #include "logger.hh"
16 #include "param_node.hh"
17 
18 #include <iomanip>
19 
20 LOGGER( dlog, "heartbeater" );
21 
22 namespace dripline
23 {
24 
26  cancelable(),
27  f_heartbeat_interval_s( 60 ),
28  f_check_timeout_ms( 1000 ),
29  f_service( a_service ),
30  f_heartbeat_thread()
31  {}
32 
34  cancelable( std::move(a_orig) ),
35  f_heartbeat_interval_s( a_orig.f_heartbeat_interval_s ),
36  f_check_timeout_ms( a_orig.f_check_timeout_ms ),
37  f_service( std::move(a_orig.f_service) ),
39  {}
40 
42  {}
43 
45  {
46  f_heartbeat_interval_s = std::move(a_orig.f_heartbeat_interval_s);
47  f_check_timeout_ms = std::move(a_orig.f_check_timeout_ms);
48  f_service = std::move(a_orig.f_service);
49  f_heartbeat_thread = std::move(a_orig.f_heartbeat_thread);
50  return *this;
51  }
52 
53  void heartbeater::execute( const std::string& a_name, uuid_t a_id, const std::string& a_routing_key )
54  {
55  if( ! f_service )
56  {
57  throw dripline_error() << "Unable to start heartbeater because service pointer is not set";
58  }
59 
60  if( f_heartbeat_interval_s == 0 )
61  {
62  LINFO( dlog, "Heartbeat disabled" );
63  return;
64  }
65 
66  scarab::param_ptr_t t_payload_ptr( new scarab::param_node() );
67  scarab::param_node& t_payload = t_payload_ptr->as_node();
68  t_payload.add( "name", a_name );
69  t_payload.add( "id", string_from_uuid(a_id) );
70 
71  routing_key t_key;
72  t_key.push_back( a_routing_key );
73  t_key.push_back( a_name );
74 
75  alert_ptr_t t_alert_ptr = msg_alert::create( std::move(t_payload_ptr), t_key.to_string() );
76 
77  LINFO( dlog, "Starting heartbeat loop" );
78 
79  auto t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
80  while( ! f_canceled.load() )
81  {
82  // wait the interval
83  std::this_thread::sleep_for( std::chrono::milliseconds( f_check_timeout_ms ) );
84 
85  // send the message
86  if( std::chrono::steady_clock().now() >= t_next_heartbeat_at && ! f_canceled.load() )
87  {
88  LDEBUG( dlog, "Sending heartbeat" );
89  // reset message ID so it's different for each heartbeat
90  t_alert_ptr->message_id() = string_from_uuid( generate_random_uuid() );
91 
92  sent_msg_pkg_ptr t_receive_reply;
93  try
94  {
95  t_receive_reply = f_service->send( t_alert_ptr );
96 
97  if( ! t_receive_reply->f_successful_send )
98  {
99  LERROR( dlog, "Failed to send reply:\n" + t_receive_reply->f_send_error_message );
100  }
101  }
102  catch( message_ptr_t )
103  {
104  LWARN( dlog, "Operating in offline mode; message not sent" );
105  }
106  catch( connection_error& e )
107  {
108  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
109  }
110  catch( dripline_error& e )
111  {
112  LERROR( dlog, "Dripline error while sending reply:\n" << e.what() );
113  }
114 
115  t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
116  }
117  }
118 
119  return;
120  }
121 
122 } /* namespace dripline */
123 
124 
Parses routing keys and stores the tokenized information.
Definition: specifier.hh:27
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
Definition: heartbeater.cc:53
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
STL namespace.
std::thread f_heartbeat_thread
Definition: heartbeater.hh:83
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:92
static scarab::logger dlog("heartbeater")
boost::uuids::uuid uuid_t
Universally-unique-identifier type containing 16 hexadecimal characters.
Definition: uuid.hh:26
heartbeater & operator=(const heartbeater &)=delete
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
Definition: message.cc:556
static scarab::logger dlog("agent")
std::string to_string() const
Converts the routing-key tokens into a single string.
Definition: specifier.cc:37
Error indicating a problem with the connection to the broker.
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
A heartbeater repeatedly sends an alert on a particular time interval.
Definition: heartbeater.hh:52
std::shared_ptr< service > service_ptr_t
Definition: dripline_fwd.hh:42
heartbeater(service_ptr_t a_service=service_ptr_t())
Primary constructor. A service pointer is required to be able to send messages.
Definition: heartbeater.cc:25