8 #define DRIPLINE_API_EXPORTS 16 #include "signal_handler.hh" 18 LOGGER(
dlog,
"monitor" );
25 core( a_config.has(
"dripline" ) ? a_config[
"dripline"].as_node() :
scarab::param_node() ),
27 f_status(
status::nothing ),
29 f_json_print( false ),
30 f_pretty_print( false ),
35 if( a_config.has(
"request-keys" ) && a_config[
"request-keys"].is_array() )
37 const scarab::param_array& t_req_keys = a_config[
"request-keys"].as_array();
38 f_requests_keys.reserve( t_req_keys.size() );
39 for(
auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
41 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << (*t_it)().as_string() <<
"> on the requests exchange" );
42 f_requests_keys.push_back( (*t_it)().as_string() );
46 if( a_config.has(
"request-key" ) && a_config[
"request-key"].is_value() )
48 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << a_config[
"request-key"]().as_string() <<
"> on the requests exchange" );
49 f_requests_keys.push_back( a_config[
"request-key"]().as_string() );
53 if( a_config.has(
"alert-keys" ) && a_config[
"alert-keys"].is_array() )
55 const scarab::param_array& t_req_keys = a_config[
"alert-keys"].as_array();
56 f_requests_keys.reserve( t_req_keys.size() );
57 for(
auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
59 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << (*t_it)().as_string() <<
"> on the alerts exchange" );
60 f_alerts_keys.push_back( (*t_it)().as_string() );
64 if( a_config.has(
"alert-key" ) && a_config[
"alert-key"].is_value() )
66 LPROG(
dlog,
"Monitor <" << f_name <<
"> will monitor key <" << a_config[
"alert-key"]().as_string() <<
"> on the alerts exchange" );
67 f_alerts_keys.push_back( a_config[
"alert-key"]().as_string() );
75 f_status( a_orig.f_status ),
76 f_name(
std::move(a_orig.f_name) ),
77 f_json_print( a_orig.f_json_print ),
78 f_pretty_print( a_orig.f_pretty_print ),
79 f_requests_keys(
std::move(a_orig.f_requests_keys) ),
80 f_alerts_keys(
std::move(a_orig.f_alerts_keys) )
83 a_orig.f_json_print =
false;
84 a_orig.f_pretty_print =
false;
92 std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
102 f_status = a_orig.f_status;
104 f_name = std::move(a_orig.f_name);
105 f_requests_keys = std::move(a_orig.f_requests_keys);
106 f_alerts_keys = std::move(a_orig.f_alerts_keys);
114 LERROR(
dlog,
"Monitor is not in the right status to start" );
118 if( f_requests_keys.empty() && f_alerts_keys.empty() )
120 LERROR(
dlog,
"No keys provided to monitor" );
124 LINFO(
dlog,
"Connecting to <" << f_address <<
":" << f_port <<
">" );
126 LDEBUG(
dlog,
"Opening channel for message monitor <" << f_name <<
">" );
128 if( ! f_channel )
return false;
131 if( !
setup_exchange( f_channel, f_requests_exchange ) )
return false;
132 if( !
setup_exchange( f_channel, f_alerts_exchange ) )
return false;
135 LDEBUG(
dlog,
"Setting up queue for message monitor <" << f_name <<
">" );
136 if( !
setup_queue( f_channel, f_name ) )
return false;
143 if( f_consumer_tag.empty() )
return false;
151 scarab::signal_handler t_sig_hand;
152 t_sig_hand.add_cancelable(
this );
156 LERROR(
dlog,
"Monitor is not in the right status to listen" );
168 f_receiver_thread.join();
170 catch( std::system_error& e )
172 LERROR(
dlog,
"Could not start the a thread due to a system error: " << e.what() );
177 LERROR(
dlog,
"Dripline error while running monitor: " << e.what() );
180 catch( std::exception& e )
182 LERROR(
dlog,
"Error while running monitor: " << e.what() );
192 LINFO(
dlog,
"Stopping message monitor <" << f_name <<
">" );
217 LDEBUG(
dlog,
"Binding request keys for message monitor <" << f_name <<
">" );
218 for(
auto t_req_key_it = f_requests_keys.begin(); t_req_key_it != f_requests_keys.end(); ++t_req_key_it )
220 if( !
bind_key( f_channel, f_requests_exchange, f_name, *t_req_key_it ) )
return false;
223 LDEBUG(
dlog,
"Binding alerts keys for message monitor <" << f_name <<
">" );
224 for(
auto t_al_key_it = f_alerts_keys.begin(); t_al_key_it != f_alerts_keys.end(); ++t_al_key_it )
226 if( !
bind_key( f_channel, f_alerts_exchange, f_name, *t_al_key_it ) )
return false;
234 LINFO(
dlog,
"Listening for incoming messages on <" << f_name <<
">" );
236 while( ! is_canceled() )
241 if( f_canceled.load() )
243 LDEBUG(
dlog,
"Monitor <" << f_name <<
"> canceled" );
247 if( ! t_envelope && t_channel_valid )
255 if( ! t_channel_valid )
257 LERROR(
dlog,
"Channel is no longer valid for monitor <" << f_name <<
">" );
261 if( f_canceled.load() )
263 LDEBUG(
dlog,
"Monitor <" << f_name <<
"> canceled" );
274 if( ! f_json_print && ! f_pretty_print )
276 if( a_message->is_request() )
278 LPROG(
dlog, *std::static_pointer_cast< msg_request >( a_message ) );
281 if( a_message->is_reply() )
283 LPROG(
dlog, *std::static_pointer_cast< msg_reply >( a_message ) );
286 if( a_message->is_alert() )
288 LPROG(
dlog, *std::static_pointer_cast< msg_alert >( a_message ) );
291 LPROG(
dlog, *a_message );
296 scarab::param_node t_encoding_options;
299 t_encoding_options.add(
"style",
"pretty" );
301 std::string t_encoded_message = a_message->encode_full_message( 5000, t_encoding_options );
302 LPROG(
dlog, t_encoded_message );
308 LERROR(
dlog,
"<" << f_name <<
"> Dripline exception caught while handling message: " << e.what() );
310 catch( std::exception& e )
312 LERROR(
dlog,
"<" << f_name <<
"> Standard exception caught while sending reply: " << e.what() );
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
Listens for messages sent to a particular set of keys and prints them.
bool start()
Opens the AMQP connection, binds keys, and starts consuming.
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
monitor & operator=(const monitor &)=delete
concurrent_receiver & operator=(const concurrent_receiver &)=delete
Dripline-specific errors.
void handle_message_chunk(amqp_envelope_ptr a_envelope)
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
bool listen()
Starts actively listening for and handling messages (blocking).
static scarab::logger dlog("monitor")
virtual bool listen_on_queue()
Waits for a single AMQP message and processes it.
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Convenience class to bring together listener and concurrent_receiver.
virtual void submit_message(message_ptr_t a_message)
amqp_channel_ptr open_channel() const
static scarab::logger dlog("agent")
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
monitor(const scarab::param_node &a_config=scarab::param_node())
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
bool stop()
Stops listening for messages and closes the AMQP connection.
void execute()
Handles messages that appear in the concurrent queue by calling submit_message(). ...
listener & operator=(const listener &)=delete
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
std::shared_ptr< message > message_ptr_t
core & operator=(const core &a_orig)