Dripline-Cpp  v2.4.2
Dripline Implementation in C++
monitor.cc
Go to the documentation of this file.
1 /*
2  * monitor.cc
3  *
4  * Created on: Jul 1, 2019
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "monitor.hh"
11 
12 #include "dripline_exceptions.hh"
13 #include "uuid.hh"
14 
15 #include "logger.hh"
16 #include "signal_handler.hh"
17 
18 LOGGER( dlog, "monitor" );
19 
20 namespace dripline
21 {
22 
23  monitor::monitor( const scarab::param_node& a_config ) :
24  scarab::cancelable(),
25  core( a_config.has( "dripline" ) ? a_config["dripline"].as_node() : scarab::param_node() ),
27  f_status( status::nothing ),
28  f_name( std::string("monitor_") + string_from_uuid(generate_random_uuid()) ),
29  f_json_print( false ),
30  f_pretty_print( false ),
31  f_requests_keys(),
32  f_alerts_keys()
33  {
34  // get requests keys
35  if( a_config.has( "request-keys" ) && a_config["request-keys"].is_array() )
36  {
37  const scarab::param_array& t_req_keys = a_config["request-keys"].as_array();
38  f_requests_keys.reserve( t_req_keys.size() );
39  for( auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
40  {
41  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << (*t_it)().as_string() << "> on the requests exchange" );
42  f_requests_keys.push_back( (*t_it)().as_string() );
43  }
44  }
45 
46  if( a_config.has( "request-key" ) && a_config["request-key"].is_value() )
47  {
48  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << a_config["request-key"]().as_string() << "> on the requests exchange" );
49  f_requests_keys.push_back( a_config["request-key"]().as_string() );
50  }
51 
52  // get alerts keys
53  if( a_config.has( "alert-keys" ) && a_config["alert-keys"].is_array() )
54  {
55  const scarab::param_array& t_req_keys = a_config["alert-keys"].as_array();
56  f_requests_keys.reserve( t_req_keys.size() );
57  for( auto t_it = t_req_keys.begin(); t_it != t_req_keys.end(); ++t_it )
58  {
59  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << (*t_it)().as_string() << "> on the alerts exchange" );
60  f_alerts_keys.push_back( (*t_it)().as_string() );
61  }
62  }
63 
64  if( a_config.has( "alert-key" ) && a_config["alert-key"].is_value() )
65  {
66  LPROG( dlog, "Monitor <" << f_name << "> will monitor key <" << a_config["alert-key"]().as_string() << "> on the alerts exchange" );
67  f_alerts_keys.push_back( a_config["alert-key"]().as_string() );
68  }
69  }
70 
71  monitor::monitor( monitor&& a_orig ) :
72  scarab::cancelable( std::move(a_orig) ),
73  core( std::move(a_orig) ),
74  listener_receiver( std::move(a_orig) ),
75  f_status( a_orig.f_status ),
76  f_name( std::move(a_orig.f_name) ),
77  f_json_print( a_orig.f_json_print ),
78  f_pretty_print( a_orig.f_pretty_print ),
79  f_requests_keys( std::move(a_orig.f_requests_keys) ),
80  f_alerts_keys( std::move(a_orig.f_alerts_keys) )
81  {
82  a_orig.f_status = status::nothing;
83  a_orig.f_json_print = false;
84  a_orig.f_pretty_print = false;
85  }
86 
88  {
89  if( f_status >= status::listening )
90  {
91  this->cancel( dl_success().rc_value() );
92  std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
93  }
94  if( f_status > status::exchange_declared ) stop();
95  }
96 
98  {
99  core::operator=( std::move(a_orig) );
100  listener::operator=( std::move(a_orig) );
101  concurrent_receiver::operator=( std::move(a_orig) );
102  f_status = a_orig.f_status;
103  a_orig.f_status = status::nothing;
104  f_name = std::move(a_orig.f_name);
105  f_requests_keys = std::move(a_orig.f_requests_keys);
106  f_alerts_keys = std::move(a_orig.f_alerts_keys);
107  return *this;
108  }
109 
111  {
112  if( f_status != status::nothing )
113  {
114  LERROR( dlog, "Monitor is not in the right status to start" );
115  return false;
116  }
117 
118  if( f_requests_keys.empty() && f_alerts_keys.empty() )
119  {
120  LERROR( dlog, "No keys provided to monitor" );
121  return false;
122  }
123 
124  LINFO( dlog, "Connecting to <" << f_address << ":" << f_port << ">" );
125 
126  LDEBUG( dlog, "Opening channel for message monitor <" << f_name << ">" );
127  f_channel = open_channel();
128  if( ! f_channel ) return false;
129  f_status = status::channel_created;
130 
131  if( ! setup_exchange( f_channel, f_requests_exchange ) ) return false;
132  if( ! setup_exchange( f_channel, f_alerts_exchange ) ) return false;
133  f_status = status::exchange_declared;
134 
135  LDEBUG( dlog, "Setting up queue for message monitor <" << f_name << ">" );
136  if( ! setup_queue( f_channel, f_name ) ) return false;
137  f_status = status::queue_declared;
138 
139  if( ! bind_keys() ) return false;
140  f_status = status::queue_bound;
141 
142  f_consumer_tag = start_consuming( f_channel, f_name );
143  if( f_consumer_tag.empty() ) return false;
144  f_status = status::consuming;
145 
146  return true;
147  }
148 
150  {
151  scarab::signal_handler t_sig_hand;
152  t_sig_hand.add_cancelable( this );
153 
154  if( f_status != status::consuming )
155  {
156  LERROR( dlog, "Monitor is not in the right status to listen" );
157  return false;
158  }
159 
160  f_status = status::listening;
161 
162  try
163  {
164  f_receiver_thread = std::thread( &concurrent_receiver::execute, this );
165 
166  listen_on_queue();
167 
168  f_receiver_thread.join();
169  }
170  catch( std::system_error& e )
171  {
172  LERROR( dlog, "Could not start the a thread due to a system error: " << e.what() );
173  return false;
174  }
175  catch( dripline_error& e )
176  {
177  LERROR( dlog, "Dripline error while running monitor: " << e.what() );
178  return false;
179  }
180  catch( std::exception& e )
181  {
182  LERROR( dlog, "Error while running monitor: " << e.what() );
183  return false;
184  }
185 
186  return true;
187 
188  }
189 
191  {
192  LINFO( dlog, "Stopping message monitor <" << f_name << ">" );
193 
194  if( f_status >= status::listening ) // listening
195  {
196  this->cancel( dl_success().rc_value() );
197  f_status = status::consuming;
198  }
199 
200  if( f_status >= status::queue_bound ) // queue_bound or consuming
201  {
202  if( ! stop_consuming( f_channel, f_consumer_tag ) ) return false;
203  f_status = status::queue_bound;
204  }
205 
206  if( f_status >= status::queue_declared ) // queue_declared or queue_bound
207  {
208  if( ! remove_queue( f_channel, f_name ) ) return false;
209  f_status = status::exchange_declared;
210  }
211 
212  return true;
213  }
214 
216  {
217  LDEBUG( dlog, "Binding request keys for message monitor <" << f_name << ">" );
218  for( auto t_req_key_it = f_requests_keys.begin(); t_req_key_it != f_requests_keys.end(); ++t_req_key_it )
219  {
220  if( ! bind_key( f_channel, f_requests_exchange, f_name, *t_req_key_it ) ) return false;
221  }
222 
223  LDEBUG( dlog, "Binding alerts keys for message monitor <" << f_name << ">" );
224  for( auto t_al_key_it = f_alerts_keys.begin(); t_al_key_it != f_alerts_keys.end(); ++t_al_key_it )
225  {
226  if( ! bind_key( f_channel, f_alerts_exchange, f_name, *t_al_key_it ) ) return false;
227  }
228 
229  return true;
230  }
231 
233  {
234  LINFO( dlog, "Listening for incoming messages on <" << f_name << ">" );
235 
236  while( ! is_canceled() )
237  {
238  amqp_envelope_ptr t_envelope;
239  bool t_channel_valid = core::listen_for_message( t_envelope, f_channel, f_consumer_tag, f_listen_timeout_ms );
240 
241  if( f_canceled.load() )
242  {
243  LDEBUG( dlog, "Monitor <" << f_name << "> canceled" );
244  return true;
245  }
246 
247  if( ! t_envelope && t_channel_valid )
248  {
249  // we end up here every time the listen times out with no message received
250  continue;
251  }
252 
253  handle_message_chunk( t_envelope );
254 
255  if( ! t_channel_valid )
256  {
257  LERROR( dlog, "Channel is no longer valid for monitor <" << f_name << ">" );
258  return false;
259  }
260 
261  if( f_canceled.load() )
262  {
263  LDEBUG( dlog, "Monitor <" << f_name << "> canceled" );
264  return true;
265  }
266  }
267  return true;
268  }
269 
271  {
272  try
273  {
274  if( ! f_json_print && ! f_pretty_print )
275  {
276  if( a_message->is_request() )
277  {
278  LPROG( dlog, *std::static_pointer_cast< msg_request >( a_message ) );
279  return;
280  }
281  if( a_message->is_reply() )
282  {
283  LPROG( dlog, *std::static_pointer_cast< msg_reply >( a_message ) );
284  return;
285  }
286  if( a_message->is_alert() )
287  {
288  LPROG( dlog, *std::static_pointer_cast< msg_alert >( a_message ) );
289  return;
290  }
291  LPROG( dlog, *a_message );
292  return;
293  }
294  else
295  {
296  scarab::param_node t_encoding_options;
297  if( f_pretty_print )
298  {
299  t_encoding_options.add( "style", "pretty" );
300  }
301  std::string t_encoded_message = a_message->encode_full_message( 5000, t_encoding_options );
302  LPROG( dlog, t_encoded_message );
303  return;
304  }
305  }
306  catch( dripline_error& e )
307  {
308  LERROR( dlog, "<" << f_name << "> Dripline exception caught while handling message: " << e.what() );
309  }
310  catch( std::exception& e )
311  {
312  LERROR( dlog, "<" << f_name << "> Standard exception caught while sending reply: " << e.what() );
313  }
314 
315  return;
316  }
317 
318 } /* namespace dripline */
virtual ~monitor()
Definition: monitor.cc:87
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
Definition: core.cc:432
Listens for messages sent to a particular set of keys and prints them.
Definition: monitor.hh:39
bool start()
Opens the AMQP connection, binds keys, and starts consuming.
Definition: monitor.cc:110
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
Definition: core.cc:330
STL namespace.
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
Definition: core.cc:498
monitor & operator=(const monitor &)=delete
concurrent_receiver & operator=(const concurrent_receiver &)=delete
Dripline-specific errors.
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Definition: receiver.cc:73
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
Definition: core.cc:381
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:407
Definition: core.hh:17
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:468
bool listen()
Starts actively listening for and handling messages (blocking).
Definition: monitor.cc:149
static scarab::logger dlog("monitor")
virtual bool listen_on_queue()
Waits for a single AMQP message and processes it.
Definition: monitor.cc:232
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:92
Convenience class to bring together listener and concurrent_receiver.
Definition: listener.hh:75
virtual void submit_message(message_ptr_t a_message)
Definition: monitor.cc:270
amqp_channel_ptr open_channel() const
Definition: core.cc:296
static scarab::logger dlog("agent")
bool bind_keys()
Definition: monitor.cc:215
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
monitor(const scarab::param_node &a_config=scarab::param_node())
Definition: monitor.cc:23
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:355
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
bool stop()
Stops listening for messages and closes the AMQP connection.
Definition: monitor.cc:190
void execute()
Handles messages that appear in the concurrent queue by calling submit_message(). ...
Definition: receiver.cc:393
listener & operator=(const listener &)=delete
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:72
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
core & operator=(const core &a_orig)
Definition: core.cc:150