8 #define DRIPLINE_API_EXPORTS 21 #include "param_codec.hh" 23 #include "signal_handler.hh" 34 using scarab::param_array;
35 using scarab::param_node;
36 using scarab::param_ptr_t;
37 using scarab::param_value;
41 LOGGER(
dlog,
"agent" );
44 f_is_dry_run( false ),
51 f_suppress_output( false ),
52 f_json_print( false ),
53 f_pretty_print( false ),
66 const scarab::param_array a_ord_args;
67 execute( a_config, a_ord_args );
72 LINFO(
dlog,
"Creating message" );
75 param_node t_config( a_config );
77 param_node t_dripline_node;
78 if( t_config.has(
"dripline" ) )
80 t_dripline_node = std::move(t_config.remove(
"dripline" )->as_node());
83 core t_core( t_dripline_node );
85 f_agent->set_timeout( t_config.get_value(
"timeout", 10U ) * 1000 );
86 t_config.erase(
"timeout" );
87 f_agent->set_json_print( t_config.get_value(
"json-print", f_agent->get_json_print() ) );
88 t_config.erase(
"json-print" );
89 f_agent->set_pretty_print( t_config.get_value(
"pretty-print", f_agent->get_pretty_print() ) );
90 t_config.erase(
"pretty-print" );
91 f_agent->set_suppress_output( t_config.get_value(
"suppress-output", f_agent->get_suppress_output() ) );
92 t_config.erase(
"suppress-output" );
94 f_agent->routing_key() = t_config.get_value(
"rk", f_agent->routing_key() );
95 t_config.erase(
"rk" );
97 f_agent->specifier() = t_config.get_value(
"specifier", f_agent->specifier() );
98 t_config.erase(
"specifier" );
100 if( t_config.has(
"lockout-key" ) )
102 bool t_lk_valid =
true;
104 t_config.erase(
"lockout-key" );
107 LERROR(
dlog,
"Invalid lockout key provided: <" << t_config.get_value(
"lockout-key",
"" ) <<
">" );
113 if( t_config.has(
"return" ) )
115 f_agent->set_return_code( t_config[
"return"].as_node().get_value(
"code",
dl_success().rc_value() ) );
116 f_agent->return_message() = t_config[
"return"].as_node().get_value(
"message",
"" );
117 t_config.erase(
"return" );
120 f_agent->save_filename() = t_config.get_value(
"save",
"" );
121 t_config.erase(
"save" );
124 scarab::param_array t_values;
125 if( t_config.has(
"values" ) )
127 t_values.merge( t_config[
"values"].as_array() );
128 t_config.erase(
"values" );
130 t_values.merge( a_ord_args );
131 if( t_config.has(
"option-values" ) )
133 t_values.merge( t_config[
"option-values"].as_array() );
134 t_config.erase(
"option-values" );
136 if( ! t_values.empty() )
138 t_config.add(
"values", t_values );
142 if( t_config.has(
"dry-run-msg" ) )
144 t_config.erase(
"dry-run-msg" );
145 f_agent->set_is_dry_run(
true );
148 this->create_and_send_message( t_config, t_core );
157 LDEBUG(
dlog,
"message payload to send is: " << t_request->payload() );
161 LERROR(
dlog,
"Unable to create request" );
167 if( f_agent->get_is_dry_run() )
169 LPROG(
dlog,
"Request (routing key = " << f_agent->routing_key() <<
"; specifier = " << f_agent->specifier() <<
"):\n" << *t_request );
176 t_request->lockout_key() = f_agent->lockout_key();
178 LINFO(
dlog,
"Sending message w/ message_operation = " << t_request->get_message_operation() <<
" to " << t_request->routing_key() );
179 LDEBUG(
dlog,
"Message headers:\n" << t_request->get_message_param(
false ) );
184 t_receive_reply = a_core.
send( t_request );
188 LWARN(
dlog,
"Operating in offline mode; message not sent" );
194 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
200 LERROR(
dlog,
"Unable to send request:\n" << e.what() );
205 if( ! t_receive_reply->f_successful_send )
207 LERROR(
dlog,
"Unable to send request:\n" + t_receive_reply->f_send_error_message );
212 if( ! t_receive_reply->f_consumer_tag.empty() )
214 LINFO(
dlog,
"Waiting for a reply from the server; use ctrl-c to cancel" );
218 scarab::signal_handler::add_cancelable_s( &t_msg_receiver );
220 scarab::signal_handler::remove_cancelable_s( &t_msg_receiver );
224 LINFO(
dlog,
"Response received" );
225 f_agent->set_return( t_reply->get_return_code() );
227 const param& t_payload = t_reply->payload();
229 LPROG(
dlog,
"Response:\n" <<
230 "Return code: " << t_reply->get_return_code() <<
'\n' <<
231 "Return message: " << t_reply->return_message() <<
'\n' <<
234 if( ! f_agent->get_suppress_output() )
236 if( ! f_agent->get_json_print() && ! f_agent->get_pretty_print() )
238 std::cout << *t_reply << std::endl;
242 param_node t_encoding_options;
243 if( f_agent->get_pretty_print() )
245 t_encoding_options.add(
"style",
"pretty" );
247 std::string t_encoded_message = t_reply->encode_full_message( 5000, t_encoding_options );
248 std::cout << t_encoded_message << std::endl;
252 if( ! f_agent->save_filename().empty() && ! t_payload.is_null() )
254 scarab::param_translator t_translator;
255 if( ! t_translator.write_file( t_payload, f_agent->save_filename() ) )
257 LERROR(
dlog,
"Unable to write out payload" );
264 LWARN(
dlog,
"Timed out or error while waiting for reply" );
267 f_agent->set_reply( t_reply );
271 f_agent->set_return(
dl_success().rc_value() );
280 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
283 f_agent->return_message(),
284 std::move(t_payload_ptr),
285 f_agent->routing_key(),
286 f_agent->specifier() );
287 LDEBUG(
dlog,
"reply payload to send is: " << t_reply->payload() );
291 LERROR(
dlog,
"Unable to create reply" );
297 if( f_agent->get_is_dry_run() )
299 LPROG(
dlog,
"Reply (routing key = " << f_agent->routing_key() <<
"; specifier = " << f_agent->specifier() <<
"):\n" << *t_reply );
304 LINFO(
dlog,
"Sending reply with return code <" << t_reply->get_return_code() <<
"> and message <" << t_reply->return_message() <<
"> to key " << t_reply->routing_key() );
305 LDEBUG(
dlog,
"Message headers:\n" << t_reply->get_message_param(
false ) );
310 t_msg_sent = a_core.
send( t_reply );
314 LWARN(
dlog,
"Operating in offline mode; message not sent" );
320 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
326 LERROR(
dlog,
"Unable to send reply:\n" << e.what() );
331 if( ! t_msg_sent->f_successful_send )
333 LERROR(
dlog,
"Unable to send reply:\n" + t_msg_sent->f_send_error_message );
338 f_agent->set_return(
dl_success().rc_value() );
347 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
350 f_agent->routing_key(),
351 f_agent->specifier() );
352 LDEBUG(
dlog,
"alert payload to send is: " << t_alert->payload() );
356 LERROR(
dlog,
"Unable to create alert" );
362 if( f_agent->get_is_dry_run() )
364 LPROG(
dlog,
"Alert (routing key = " << f_agent->routing_key() <<
"; specifier = " << f_agent->specifier() <<
"):\n" << *t_alert );
369 LINFO(
dlog,
"Sending alert with key " << t_alert->routing_key() );
370 LDEBUG(
dlog,
"Message headers:\n" << t_alert->get_message_param(
false ) );
375 t_msg_sent = a_core.
send( t_alert );
379 LWARN(
dlog,
"Operating in offline mode; message not sent" );
385 LERROR(
dlog,
"Unable to connect to the broker:\n" << e.what() );
391 LERROR(
dlog,
"Unable to send alert:\n" << e.what() );
396 if( ! t_msg_sent->f_successful_send )
398 LERROR(
dlog,
"Unable to send alert:\n" + t_msg_sent->f_send_error_message );
403 f_agent->set_return(
dl_success().rc_value() );
411 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
415 f_agent->routing_key(),
416 f_agent->specifier() );
422 if( ! a_config.has(
"values" ) )
424 LERROR(
dlog,
"No \"values\" option given" );
428 param_ptr_t t_payload_ptr(
new param_node( a_config ) );
432 f_agent->routing_key(),
433 f_agent->specifier() );
438 param_ptr_t t_payload_ptr(
new param_node() );
439 param_node& t_payload_node = t_payload_ptr->as_node();
442 if( a_config.has(
"load" ) )
444 if( ! a_config[
"load"].as_node().has(
"json" ) )
446 LERROR(
dlog,
"Load instruction did not contain a valid file type");
450 std::string t_load_filename( a_config[
"load"]().as_string() );
451 scarab::param_translator t_translator;
452 scarab::param_ptr_t t_node_from_file = t_translator.read_file( t_load_filename );
453 if( t_node_from_file ==
nullptr || ! t_node_from_file->is_node() )
455 LERROR(
dlog,
"Unable to read JSON file <" << t_load_filename <<
">" );
459 t_payload_node.merge( t_node_from_file->as_node() );
460 a_config.erase(
"load" );
464 t_payload_node.merge( a_config );
468 f_agent->routing_key(),
469 f_agent->specifier() );
virtual sent_msg_pkg_ptr send(request_ptr_t a_request) const
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
virtual request_ptr_t create_request(scarab::param_node &a_config)
virtual request_ptr_t create_request(scarab::param_node &a_config)
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
virtual request_ptr_t create_request(scarab::param_node &a_config)
std::shared_ptr< msg_request > request_ptr_t
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
void execute(const scarab::param_node &a_config)
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
A receiver is able to collect Dripline message chunks and reassemble them into a complete Dripline me...
virtual void create_and_send_message(scarab::param_node &a_config, const core &a_core)
uuid_t uuid_from_string(const std::string &a_id_str)
reply_ptr_t wait_for_reply(const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
static scarab::logger dlog("agent")
Error indicating a problem with the connection to the broker.
void execute(const scarab::param_node &a_config)
std::shared_ptr< msg_reply > reply_ptr_t
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
std::shared_ptr< message > message_ptr_t