Dripline-Cpp  v2.4.2
Dripline Implementation in C++
agent.cc
Go to the documentation of this file.
1 /*
2  * agent.cc
3  *
4  * Created on: Jun 2, 2016
5  * Author: nsoblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "agent.hh"
11 
12 #include "agent_config.hh"
13 #include "core.hh"
14 #include "dripline_constants.hh"
15 #include "dripline_exceptions.hh"
16 #include "dripline_version.hh"
17 #include "receiver.hh"
18 #include "uuid.hh"
19 
20 #include "logger.hh"
21 #include "param_codec.hh"
22 #include "path.hh"
23 #include "signal_handler.hh"
24 
25 #include <algorithm> // for min
26 #include <string>
27 
28 // In Windows there's a preprocessor macro called uuid_t that conflicts with this typedef
29 #ifdef uuid_t
30 #undef uuid_t
31 #endif
32 
33 using scarab::param;
34 using scarab::param_array;
35 using scarab::param_node;
36 using scarab::param_ptr_t;
37 using scarab::param_value;
38 
39 namespace dripline
40 {
41  LOGGER( dlog, "agent" );
42 
44  f_is_dry_run( false ),
45  f_routing_key(),
46  f_specifier(),
47  f_lockout_key( generate_nil_uuid() ),
48  f_return_code( dl_success().rc_value() ),
49  f_return_message(),
50  f_timeout( 0 ),
51  f_suppress_output( false ),
52  f_json_print( false ),
53  f_pretty_print( false ),
54  f_save_filename(),
55  f_reply(),
56  f_return( dl_client_error().rc_value() )
57  {
58  }
59 
61  {
62  }
63 
64  void agent::sub_agent::execute( const scarab::param_node& a_config )
65  {
66  const scarab::param_array a_ord_args;
67  execute( a_config, a_ord_args );
68  }
69 
70  void agent::sub_agent::execute( const scarab::param_node& a_config, const scarab::param_array& a_ord_args )
71  {
72  LINFO( dlog, "Creating message" );
73 
74  // create a copy of the config that will be pared down by removing expected elements
75  param_node t_config( a_config );
76 
77  param_node t_dripline_node;
78  if( t_config.has( "dripline" ) )
79  {
80  t_dripline_node = std::move(t_config.remove( "dripline" )->as_node());
81  }
82 
83  core t_core( t_dripline_node );
84 
85  f_agent->set_timeout( t_config.get_value( "timeout", 10U ) * 1000 ); // convert seconds (dripline agent user interface) to milliseconds (expected by SimpleAmqpClient)
86  t_config.erase( "timeout" );
87  f_agent->set_json_print( t_config.get_value( "json-print", f_agent->get_json_print() ) );
88  t_config.erase( "json-print" );
89  f_agent->set_pretty_print( t_config.get_value( "pretty-print", f_agent->get_pretty_print() ) );
90  t_config.erase( "pretty-print" );
91  f_agent->set_suppress_output( t_config.get_value( "suppress-output", f_agent->get_suppress_output() ) );
92  t_config.erase( "suppress-output" );
93 
94  f_agent->routing_key() = t_config.get_value( "rk", f_agent->routing_key() );
95  t_config.erase( "rk" );
96 
97  f_agent->specifier() = t_config.get_value( "specifier", f_agent->specifier() );
98  t_config.erase( "specifier" );
99 
100  if( t_config.has( "lockout-key" ) )
101  {
102  bool t_lk_valid = true;
103  f_agent->lockout_key() = dripline::uuid_from_string( t_config["lockout-key"]().as_string(), t_lk_valid );
104  t_config.erase( "lockout-key" );
105  if( ! t_lk_valid )
106  {
107  LERROR( dlog, "Invalid lockout key provided: <" << t_config.get_value( "lockout-key", "" ) << ">" );
108  f_agent->set_return( dl_client_error().rc_value() );
109  return;
110  }
111  }
112 
113  if( t_config.has( "return" ) )
114  {
115  f_agent->set_return_code( t_config["return"].as_node().get_value( "code", dl_success().rc_value() ) );
116  f_agent->return_message() = t_config["return"].as_node().get_value( "message", "" );
117  t_config.erase( "return" );
118  }
119 
120  f_agent->save_filename() = t_config.get_value( "save", "" );
121  t_config.erase( "save" );
122 
123  // load the values array, merged in the proper order
124  scarab::param_array t_values;
125  if( t_config.has( "values" ) )
126  {
127  t_values.merge( t_config["values"].as_array() );
128  t_config.erase( "values" );
129  }
130  t_values.merge( a_ord_args );
131  if( t_config.has( "option-values" ) )
132  {
133  t_values.merge( t_config["option-values"].as_array() );
134  t_config.erase( "option-values" );
135  }
136  if( ! t_values.empty() )
137  {
138  t_config.add( "values", t_values );
139  }
140 
141  // check if this is meant to be a dry run message
142  if( t_config.has( "dry-run-msg" ) )
143  {
144  t_config.erase( "dry-run-msg" );
145  f_agent->set_is_dry_run( true );
146  }
147 
148  this->create_and_send_message( t_config, t_core );
149 
150  return;
151  }
152 
153  void agent::sub_agent_request::create_and_send_message( scarab::param_node& a_config, const core& a_core )
154  {
155  // create the request
156  request_ptr_t t_request = this->create_request( a_config );
157  LDEBUG( dlog, "message payload to send is: " << t_request->payload() );
158 
159  if( ! t_request )
160  {
161  LERROR( dlog, "Unable to create request" );
162  f_agent->set_return( dl_client_error_invalid_request().rc_value() );
163  return;
164  }
165 
166  // if this is a dry run, we print the message and stop here
167  if( f_agent->get_is_dry_run() )
168  {
169  LPROG( dlog, "Request (routing key = " << f_agent->routing_key() << "; specifier = " << f_agent->specifier() << "):\n" << *t_request );
170  f_agent->set_return( dl_warning_dry_run().rc_value() );
171  return;
172  }
173 
174  // now all that remains in f_config should be values to pass to the server as arguments to the request
175 
176  t_request->lockout_key() = f_agent->lockout_key();
177 
178  LINFO( dlog, "Sending message w/ message_operation = " << t_request->get_message_operation() << " to " << t_request->routing_key() );
179  LDEBUG( dlog, "Message headers:\n" << t_request->get_message_param( false ) );
180 
181  sent_msg_pkg_ptr t_receive_reply;
182  try
183  {
184  t_receive_reply = a_core.send( t_request );
185  }
186  catch( message_ptr_t )
187  {
188  LWARN( dlog, "Operating in offline mode; message not sent" );
189  f_agent->set_return( dl_warning_offline().rc_value() );
190  return;
191  }
192  catch( connection_error& e )
193  {
194  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
195  f_agent->set_return( dl_amqp_error_broker_connection().rc_value() );
196  return;
197  }
198  catch( dripline_error& e )
199  {
200  LERROR( dlog, "Unable to send request:\n" << e.what() );
201  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
202  return;
203  }
204 
205  if( ! t_receive_reply->f_successful_send )
206  {
207  LERROR( dlog, "Unable to send request:\n" + t_receive_reply->f_send_error_message );
208  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
209  return;
210  }
211 
212  if( ! t_receive_reply->f_consumer_tag.empty() ) // this indicates that the reply queue was created, and we've started consuming on it; we should wait for a reply
213  {
214  LINFO( dlog, "Waiting for a reply from the server; use ctrl-c to cancel" );
215 
216  // timed blocking call to wait for incoming message
217  receiver t_msg_receiver;
218  scarab::signal_handler::add_cancelable_s( &t_msg_receiver );
219  dripline::reply_ptr_t t_reply = t_msg_receiver.wait_for_reply( t_receive_reply, f_agent->get_timeout() );
220  scarab::signal_handler::remove_cancelable_s( &t_msg_receiver );
221 
222  if( t_reply )
223  {
224  LINFO( dlog, "Response received" );
225  f_agent->set_return( t_reply->get_return_code() );
226 
227  const param& t_payload = t_reply->payload();
228 
229  LPROG( dlog, "Response:\n" <<
230  "Return code: " << t_reply->get_return_code() << '\n' <<
231  "Return message: " << t_reply->return_message() << '\n' <<
232  t_payload );
233 
234  if( ! f_agent->get_suppress_output() )
235  {
236  if( ! f_agent->get_json_print() && ! f_agent->get_pretty_print() )
237  {
238  std::cout << *t_reply << std::endl;
239  }
240  else
241  {
242  param_node t_encoding_options;
243  if( f_agent->get_pretty_print() )
244  {
245  t_encoding_options.add( "style", "pretty" );
246  }
247  std::string t_encoded_message = t_reply->encode_full_message( 5000, t_encoding_options );
248  std::cout << t_encoded_message << std::endl;
249  }
250  }
251 
252  if( ! f_agent->save_filename().empty() && ! t_payload.is_null() )
253  {
254  scarab::param_translator t_translator;
255  if( ! t_translator.write_file( t_payload, f_agent->save_filename() ) )
256  {
257  LERROR( dlog, "Unable to write out payload" );
258  f_agent->set_return( dl_client_error_handling_reply().rc_value() );
259  }
260  }
261  }
262  else
263  {
264  LWARN( dlog, "Timed out or error while waiting for reply" );
265  f_agent->set_return( dl_client_error_timeout().rc_value() );
266  }
267  f_agent->set_reply( t_reply );
268  }
269  else
270  {
271  f_agent->set_return( dl_success().rc_value() );
272  }
273 
274  return;
275  }
276 
277  void agent::sub_agent_reply::create_and_send_message( scarab::param_node& a_config, const core& a_core )
278  {
279  // create the alert
280  param_ptr_t t_payload_ptr( new param_node( a_config ) );
281 
282  reply_ptr_t t_reply = msg_reply::create( f_agent->get_return_code(),
283  f_agent->return_message(),
284  std::move(t_payload_ptr),
285  f_agent->routing_key(),
286  f_agent->specifier() );
287  LDEBUG( dlog, "reply payload to send is: " << t_reply->payload() );
288 
289  if( ! t_reply )
290  {
291  LERROR( dlog, "Unable to create reply" );
292  f_agent->set_return( dl_client_error_invalid_request().rc_value() );
293  return;
294  }
295 
296  // if this is a dry run, we print the message and stop here
297  if( f_agent->get_is_dry_run() )
298  {
299  LPROG( dlog, "Reply (routing key = " << f_agent->routing_key() << "; specifier = " << f_agent->specifier() << "):\n" << *t_reply );
300  f_agent->set_return( dl_warning_dry_run().rc_value() );
301  return;
302  }
303 
304  LINFO( dlog, "Sending reply with return code <" << t_reply->get_return_code() << "> and message <" << t_reply->return_message() << "> to key " << t_reply->routing_key() );
305  LDEBUG( dlog, "Message headers:\n" << t_reply->get_message_param( false ) );
306 
307  sent_msg_pkg_ptr t_msg_sent;
308  try
309  {
310  t_msg_sent = a_core.send( t_reply );
311  }
312  catch( message_ptr_t )
313  {
314  LWARN( dlog, "Operating in offline mode; message not sent" );
315  f_agent->set_return( dl_warning_offline().rc_value() );
316  return;
317  }
318  catch( connection_error& e )
319  {
320  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
321  f_agent->set_return( dl_amqp_error_broker_connection().rc_value() );
322  return;
323  }
324  catch( dripline_error& e )
325  {
326  LERROR( dlog, "Unable to send reply:\n" << e.what() );
327  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
328  return;
329  }
330 
331  if( ! t_msg_sent->f_successful_send )
332  {
333  LERROR( dlog, "Unable to send reply:\n" + t_msg_sent->f_send_error_message );
334  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
335  }
336  else
337  {
338  f_agent->set_return( dl_success().rc_value() );
339  }
340 
341  return;
342  }
343 
344  void agent::sub_agent_alert::create_and_send_message( scarab::param_node& a_config, const core& a_core )
345  {
346  // create the alert
347  param_ptr_t t_payload_ptr( new param_node( a_config ) );
348 
349  alert_ptr_t t_alert = msg_alert::create( std::move(t_payload_ptr),
350  f_agent->routing_key(),
351  f_agent->specifier() );
352  LDEBUG( dlog, "alert payload to send is: " << t_alert->payload() );
353 
354  if( ! t_alert )
355  {
356  LERROR( dlog, "Unable to create alert" );
357  f_agent->set_return( dl_client_error_invalid_request().rc_value() );
358  return;
359  }
360 
361  // if this is a dry run, we print the message and stop here
362  if( f_agent->get_is_dry_run() )
363  {
364  LPROG( dlog, "Alert (routing key = " << f_agent->routing_key() << "; specifier = " << f_agent->specifier() << "):\n" << *t_alert );
365  f_agent->set_return( dl_warning_dry_run().rc_value() );
366  return;
367  }
368 
369  LINFO( dlog, "Sending alert with key " << t_alert->routing_key() );
370  LDEBUG( dlog, "Message headers:\n" << t_alert->get_message_param( false ) );
371 
372  sent_msg_pkg_ptr t_msg_sent;
373  try
374  {
375  t_msg_sent = a_core.send( t_alert );
376  }
377  catch( message_ptr_t )
378  {
379  LWARN( dlog, "Operating in offline mode; message not sent" );
380  f_agent->set_return( dl_warning_offline().rc_value() );
381  return;
382  }
383  catch( connection_error& e )
384  {
385  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
386  f_agent->set_return( dl_amqp_error_broker_connection().rc_value() );
387  return;
388  }
389  catch( dripline_error& e )
390  {
391  LERROR( dlog, "Unable to send alert:\n" << e.what() );
392  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
393  return;
394  }
395 
396  if( ! t_msg_sent->f_successful_send )
397  {
398  LERROR( dlog, "Unable to send alert:\n" + t_msg_sent->f_send_error_message );
399  f_agent->set_return( dl_client_error_unable_to_send().rc_value() );
400  }
401  else
402  {
403  f_agent->set_return( dl_success().rc_value() );
404  }
405 
406  return;
407  }
408 
409  request_ptr_t agent::sub_agent_get::create_request( scarab::param_node& a_config )
410  {
411  param_ptr_t t_payload_ptr( new param_node( a_config ) );
412 
413  return msg_request::create( std::move(t_payload_ptr),
414  op_t::get,
415  f_agent->routing_key(),
416  f_agent->specifier() );
417  }
418 
419  request_ptr_t agent::sub_agent_set::create_request( scarab::param_node& a_config )
420  {
421  // require the values array
422  if( ! a_config.has( "values" ) )
423  {
424  LERROR( dlog, "No \"values\" option given" );
425  return nullptr;
426  }
427 
428  param_ptr_t t_payload_ptr( new param_node( a_config ) );
429 
430  return msg_request::create( std::move(t_payload_ptr),
431  op_t::set,
432  f_agent->routing_key(),
433  f_agent->specifier() );
434  }
435 
437  {
438  param_ptr_t t_payload_ptr( new param_node() );
439  param_node& t_payload_node = t_payload_ptr->as_node();
440 
441  // for the load instruction, the instruction node should be replaced by the contents of the file specified
442  if( a_config.has( "load" ) )
443  {
444  if( ! a_config["load"].as_node().has( "json" ) )
445  {
446  LERROR( dlog, "Load instruction did not contain a valid file type");
447  return nullptr;
448  }
449 
450  std::string t_load_filename( a_config["load"]().as_string() );
451  scarab::param_translator t_translator;
452  scarab::param_ptr_t t_node_from_file = t_translator.read_file( t_load_filename );
453  if( t_node_from_file == nullptr || ! t_node_from_file->is_node() )
454  {
455  LERROR( dlog, "Unable to read JSON file <" << t_load_filename << ">" );
456  return nullptr;
457  }
458 
459  t_payload_node.merge( t_node_from_file->as_node() );
460  a_config.erase( "load" );
461  }
462 
463  // at this point, all that remains in a_config should be other options that we want to add to the payload node
464  t_payload_node.merge( a_config );
465 
466  return msg_request::create( std::move(t_payload_ptr),
467  op_t::cmd,
468  f_agent->routing_key(),
469  f_agent->specifier() );
470  }
471 
472 } /* namespace dripline */
virtual sent_msg_pkg_ptr send(request_ptr_t a_request) const
Definition: core.cc:180
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
Definition: agent.cc:344
virtual request_ptr_t create_request(scarab::param_node &a_config)
Definition: agent.cc:409
virtual request_ptr_t create_request(scarab::param_node &a_config)
Definition: agent.cc:436
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
virtual request_ptr_t create_request(scarab::param_node &a_config)
Definition: agent.cc:419
virtual ~agent()
Definition: agent.cc:60
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
void execute(const scarab::param_node &a_config)
Definition: agent.hh:183
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
Definition: agent.cc:153
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
Definition: message.cc:490
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline message.
Definition: receiver.hh:76
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
Definition: agent.cc:277
uuid_t uuid_from_string(const std::string &a_id_str)
Definition: uuid.cc:31
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
Definition: receiver.cc:217
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
Definition: message.cc:556
static scarab::logger dlog("agent")
Error indicating a problem with the connection to the broker.
void execute(const scarab::param_node &a_config)
Definition: agent.cc:64
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:72
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
Definition: uuid.cc:26
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20