8 #define DRIPLINE_API_EXPORTS 16 #include "param_node.hh" 20 LOGGER(
dlog,
"heartbeater" );
27 f_heartbeat_interval_s( 60 ),
28 f_check_timeout_ms( 1000 ),
29 f_service( a_service ),
34 cancelable(
std::move(a_orig) ),
35 f_heartbeat_interval_s( a_orig.f_heartbeat_interval_s ),
36 f_check_timeout_ms( a_orig.f_check_timeout_ms ),
37 f_service(
std::move(a_orig.f_service) ),
46 f_heartbeat_interval_s = std::move(a_orig.f_heartbeat_interval_s);
47 f_check_timeout_ms = std::move(a_orig.f_check_timeout_ms);
48 f_service = std::move(a_orig.f_service);
57 throw dripline_error() <<
"Unable to start heartbeater because service pointer is not set";
60 if( f_heartbeat_interval_s == 0 )
62 LINFO(
dlog,
"Heartbeat disabled" );
66 scarab::param_ptr_t t_payload_ptr(
new scarab::param_node() );
67 scarab::param_node& t_payload = t_payload_ptr->as_node();
68 t_payload.add(
"name", a_name );
72 t_key.push_back( a_routing_key );
73 t_key.push_back( a_name );
77 LINFO(
dlog,
"Starting heartbeat loop" );
79 auto t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
80 while( ! f_canceled.load() )
83 std::this_thread::sleep_for( std::chrono::milliseconds( f_check_timeout_ms ) );
86 if( std::chrono::steady_clock().now() >= t_next_heartbeat_at && ! f_canceled.load() )
88 LDEBUG(
dlog,
"Sending heartbeat" );
95 t_receive_reply = f_service->send( t_alert_ptr );
97 if( ! t_receive_reply->f_successful_send )
99 LERROR(
dlog,
"Failed to send reply:\n" + t_receive_reply->f_send_error_message );
104 LWARN(
dlog,
"Operating in offline mode; message not sent" );
108 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
112 LERROR(
dlog,
"Dripline error while sending reply:\n" << e.what() );
115 t_next_heartbeat_at = std::chrono::steady_clock().now() + std::chrono::seconds( f_heartbeat_interval_s );
Parses routing keys and stores the tokenized information.
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
std::thread f_heartbeat_thread
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
static scarab::logger dlog("heartbeater")
boost::uuids::uuid uuid_t
Universally-unique-identifier type: a 128-bit (16-byte) value, conventionally written as 32 hexadecimal characters.
heartbeater & operator=(const heartbeater &)=delete
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
static scarab::logger dlog("agent")
std::string to_string() const
Converts the routing-key tokens into a single string.
Error indicating a problem with the connection to the broker.
uuid_t generate_random_uuid()
Generates a UUID from random numbers (the RNG is a Mersenne Twister).
std::shared_ptr< message > message_ptr_t
A heartbeater repeatedly sends an alert at a regular time interval.
std::shared_ptr< service > service_ptr_t
heartbeater(service_ptr_t a_service=service_ptr_t())
Primary constructor. A service pointer is required to be able to send messages.