8 #define DRIPLINE_API_EXPORTS 18 #include "map_at_default.hh" 19 #include "param_json.hh" 22 #include "version_wrapper.hh" 27 using std::shared_ptr;
28 using std::make_shared;
32 using scarab::param_node;
33 using scarab::param_value;
34 using scarab::param_input_json;
35 using scarab::param_output_json;
36 using scarab::param_ptr_t;
43 LOGGER(
dlog,
"message" );
83 f_sender_exe(
"N/A" ),
84 f_sender_hostname(
"N/A" ),
85 f_sender_username(
"N/A" ),
91 scarab::version_wrapper* t_version = scarab::version_wrapper::get_instance();
92 f_sender_exe = t_version->exe_name();
93 f_sender_hostname = t_version->hostname();
94 f_sender_username = t_version->username();
95 f_sender_service_name =
"unknown";
97 auto t_versions = version_store::get_instance()->versions();
98 for(
auto& i_version : t_versions )
100 f_sender_versions.emplace( std::make_pair( i_version.first, *i_version.second ) );
121 if( t_first_separator == a_message_id.npos || t_last_separator == a_message_id.npos )
123 throw dripline_error() <<
"Message ID is not formatted correctly\nShould be [UUID]/[chunk]/[total chunks]\nReceived: " << a_message_id;
126 return std::make_tuple( a_message_id.substr(0, t_first_separator),
127 std::stoul(a_message_id.substr(t_first_separator + 1, t_last_separator - t_first_separator - 1)),
128 std::stoul(a_message_id.substr(t_last_separator + 1)) );
140 if( a_message_ptrs.empty() )
142 throw dripline_error() <<
"No messages were provided for processing";
148 for(
unsigned i_message = 0; ! t_first_valid_message && i_message < a_message_ptrs.size(); ++i_message )
150 t_first_valid_message = a_message_ptrs[i_message];
153 if( ! t_first_valid_message )
155 throw dripline_error() <<
"All messages provided for processing were invalid";
158 unsigned t_payload_chunk_length = t_first_valid_message->Body().size();
161 if( t_first_valid_message->ContentEncoding() ==
"application/json" )
167 throw dripline_error() <<
"Unable to parse message with content type <" << t_first_valid_message->ContentEncoding() <<
">";
171 string t_payload_str;
172 bool t_payload_is_complete =
true;
178 t_payload_is_complete =
false;
179 t_payload_str += string( t_payload_chunk_length,
'#' );
183 t_payload_str += t_message->Body();
187 scarab::param_ptr_t t_payload;
188 string t_payload_error_msg;
189 if( t_payload_is_complete )
192 param_input_json t_input;
193 t_payload = t_input.read_string( t_payload_str );
196 t_payload_error_msg =
"Message body could not be parsed; skipping request";
201 t_payload_error_msg =
"Entire message was not available";
206 if( ! t_payload_error_msg.empty() )
209 t_payload = std::unique_ptr< scarab::param_node >(
new param_node() );
210 t_payload->as_node().add(
"invalid", t_payload_str );
211 t_payload->as_node().add(
"error", t_payload_error_msg );
214 LDEBUG(
dlog,
"Processing message:\n" <<
215 "Routing key: " << a_routing_key <<
'\n' <<
216 "Payload: " << t_payload_str );
220 using AmqpClient::Table;
221 using AmqpClient::TableEntry;
222 using AmqpClient::TableValue;
223 Table t_properties = t_first_valid_message->HeaderTable();
233 std::move(t_payload),
236 at( t_properties, std::string(
"specifier"), TableValue(
"") ).GetString(),
237 t_first_valid_message->ReplyTo(),
240 bool t_lockout_key_valid =
true;
241 t_request->lockout_key() =
uuid_from_string( at( t_properties, std::string(
"lockout_key"), TableValue(
"") ).GetString(), t_lockout_key_valid );
242 t_request->set_lockout_key_valid( t_lockout_key_valid );
244 t_message = t_request;
250 at( t_properties, std::string(
"return_code"), TableValue(999U) ).GetUint32(),
251 at( t_properties, std::string(
"return_message"), TableValue(
"") ).GetString(),
252 std::move(t_payload),
254 at( t_properties, std::string(
"specifier"), TableValue(
"") ).GetString(),
263 std::move(t_payload),
265 at( t_properties, std::string(
"specifier"), TableValue(
"") ).GetString(),
273 throw dripline_error() <<
"Message received with unhandled type: " << t_msg_type;
279 if( ! t_payload_error_msg.empty() )
281 t_message->set_is_valid(
false );
284 t_message->correlation_id() = t_first_valid_message->CorrelationId();
285 t_message->message_id() = t_first_valid_message->MessageId();
287 t_message->message_id() = t_message->message_id().substr( 0, t_message->message_id().find_first_of(
s_message_id_separator) );
288 t_message->timestamp() = at( t_properties, std::string(
"timestamp"), TableValue(
"") ).GetString();
290 Table t_sender_info = at( t_properties, std::string(
"sender_info"), TableValue(Table()) ).GetTable();
291 scarab::param_ptr_t t_sender_info_param =
table_to_param( t_sender_info );
292 t_message->set_sender_info( t_sender_info_param->as_node() );
294 t_message->payload() = *t_payload;
301 f_timestamp = scarab::get_formatted_now();
305 std::vector< string > t_body_parts;
308 unsigned t_n_chunks = t_body_parts.size();
309 std::vector< amqp_message_ptr > t_message_parts( t_n_chunks );
311 if( f_message_id.empty() )
316 string t_total_chunks_str = s_message_id_separator +
std::to_string(t_n_chunks);
318 unsigned i_chunk = 0;
319 for(
string& t_body_part : t_body_parts )
321 amqp_message_ptr t_message = AmqpClient::BasicMessage::Create( t_body_part );
324 t_message->CorrelationId( f_correlation_id );
325 t_message->MessageId( t_base_message_id + std::to_string(i_chunk) + t_total_chunks_str );
326 t_message->ReplyTo( f_reply_to );
328 AmqpClient::Table t_properties;
331 t_properties.insert( AmqpClient::TableEntry(
"timestamp", f_timestamp ) );
336 t_message->HeaderTable( t_properties );
338 t_message_parts[i_chunk] = t_message;
343 return t_message_parts;
347 LERROR(
dlog, e.what() );
348 return std::vector< amqp_message_ptr >();
359 param_output_json t_output;
360 if( ! t_output.write_string( *
f_payload, t_body, a_options ) )
362 throw dripline_error() <<
"Could not convert message body to string";
365 unsigned t_chars_per_chunk = a_max_size /
sizeof(string::value_type);
366 unsigned t_n_chunks = std::ceil(
double(t_body.size()) /
double(t_chars_per_chunk) );
367 a_body_vec.resize( t_n_chunks );
368 for(
unsigned i_chunk = 0, pos = 0; pos < t_body.size(); pos += t_chars_per_chunk, ++i_chunk )
370 a_body_vec[i_chunk] = t_body.substr(pos, t_chars_per_chunk );
388 param_output_json t_output;
389 string t_message_string;
390 if( ! t_output.write_string( t_message_node, t_message_string, a_options ) )
394 if( t_message_string.size() > a_max_size ) t_message_string.resize( a_max_size );
395 return t_message_string;
409 return string(
"application/json" );
412 return string(
"Unknown" );
418 param_node t_sender_info;
419 t_sender_info.add(
"exe", f_sender_exe );
420 param_node t_versions;
421 for(
auto& i_version : f_sender_versions )
423 param_node t_version_info;
424 t_version_info.add(
"version", i_version.second.f_version );
425 if( ! i_version.second.f_commit.empty() ) t_version_info.add(
"commit", i_version.second.f_commit );
426 if( ! i_version.second.f_package.empty() ) t_version_info.add(
"package", i_version.second.f_package );
427 t_versions.add( i_version.first, std::move(t_version_info) );
429 t_sender_info.add(
"versions", std::move(t_versions) );
430 t_sender_info.add(
"hostname", f_sender_hostname );
431 t_sender_info.add(
"username", f_sender_username );
432 t_sender_info.add(
"service_name", f_sender_service_name );
433 return t_sender_info;
438 f_sender_exe = a_sender_info[
"exe"]().as_string();
439 const param_node& t_versions = a_sender_info[
"versions"].as_node();
440 f_sender_versions.clear();
441 for(
auto i_version = t_versions.begin(); i_version != t_versions.end(); ++i_version )
444 (*i_version)[
"version"]().as_string(),
445 i_version->get_value(
"commit",
""),
446 i_version->get_value(
"package",
"") ) ) );
448 f_sender_hostname = a_sender_info[
"hostname"]().as_string();
449 f_sender_username = a_sender_info[
"username"]().as_string();
450 f_sender_service_name = a_sender_info[
"service_name"]().as_string();
456 param_node t_message_node;
457 t_message_node.add(
"routing_key", f_routing_key );
458 t_message_node.add(
"specifier",
f_specifier.unparsed() );
459 t_message_node.add(
"correlation_id", f_correlation_id );
460 t_message_node.add(
"message_id", f_message_id );
461 t_message_node.add(
"reply_to", f_reply_to );
464 t_message_node.add(
"timestamp", f_timestamp );
466 if( a_include_payload ) t_message_node.add(
"payload",
payload() );
468 return t_message_node;
479 f_lockout_key_valid( true ),
493 t_request->set_payload( std::move(a_payload) );
494 t_request->set_message_operation( a_msg_op );
495 t_request->routing_key() = a_routing_key;
496 t_request->parsed_specifier() = a_specifier;
497 t_request->reply_to() = a_reply_to;
498 t_request->set_encoding( a_encoding );
506 return msg_request::s_message_type;
529 return msg_reply::create( a_return_code.
rc_value(), a_ret_msg, std::move(a_payload), a_routing_key, a_specifier, a_encoding );
535 t_reply->set_return_code( a_return_code_value );
536 t_reply->return_message() = a_ret_msg;
537 t_reply->set_payload( std::move(a_payload) );
538 t_reply->routing_key() = a_routing_key;
539 t_reply->parsed_specifier() = a_specifier;
540 t_reply->set_encoding( a_encoding );
548 return msg_reply::s_message_type;
559 t_alert->set_payload( std::move(a_payload) );
560 t_alert->routing_key() = a_routing_key;
561 t_alert->parsed_specifier() = a_specifier;
562 t_alert->set_encoding( a_encoding );
581 return msg_alert::s_message_type;
588 if( a_lhs.sender_versions().size() != a_rhs.sender_versions().size() )
return false;
589 bool t_versions_are_equal =
true;
590 for(
auto i_version = std::make_pair(a_lhs.sender_versions().begin(), a_rhs.sender_versions().begin());
591 i_version.first != a_lhs.sender_versions().end();
596 t_versions_are_equal = t_versions_are_equal && i_version.first->second == i_version.second->second;
599 return a_lhs.routing_key() == a_rhs.routing_key() &&
600 a_lhs.correlation_id() == a_rhs.correlation_id() &&
601 a_lhs.reply_to() == a_rhs.reply_to() &&
602 a_lhs.get_encoding() == a_rhs.get_encoding() &&
603 a_lhs.timestamp() == a_rhs.timestamp() &&
604 t_versions_are_equal &&
605 a_lhs.sender_exe() == a_rhs.sender_exe() &&
606 a_lhs.sender_hostname() == a_rhs.sender_hostname() &&
607 a_lhs.sender_username() == a_rhs.sender_username() &&
608 a_lhs.sender_service_name() == a_rhs.sender_service_name() &&
615 return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) ) &&
616 a_lhs.lockout_key() == a_rhs.lockout_key() &&
617 a_lhs.get_lockout_key_valid() == a_rhs.get_lockout_key_valid() &&
618 a_lhs.get_message_operation() == a_rhs.get_message_operation();
623 return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) ) &&
624 a_lhs.get_return_code() == a_rhs.get_return_code() &&
625 a_lhs.return_message() == a_rhs.return_message();
630 return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) );
636 return a_os << s_enc_strings[ a_enc ];
641 a_os <<
"Routing key: " << a_message.routing_key() <<
'\n';
642 a_os <<
"Correlation ID: " << a_message.correlation_id() <<
'\n';
643 a_os <<
"Reply To: " << a_message.reply_to() <<
'\n';
645 a_os <<
"Encoding: " << a_message.get_encoding() <<
'\n';
646 a_os <<
"Timestamp: " << a_message.timestamp() <<
'\n';
647 a_os <<
"Sender Info:\n";
648 a_os <<
"\tExecutable: " << a_message.sender_exe() <<
'\n';
649 a_os <<
"\tHostname: " << a_message.sender_hostname() <<
'\n';
650 a_os <<
"\tUsername: " << a_message.sender_username() <<
'\n';
651 a_os <<
"\tService: " << a_message.sender_service_name() <<
'\n';
652 a_os <<
"\tVersions:\n";
653 for(
const auto& i_version : a_message.sender_versions() )
655 a_os <<
"\t\t" << i_version.first <<
":\n";
656 a_os <<
"\t\t\tVersion: " << i_version.second.f_version <<
'\n';
657 a_os <<
"\t\t\tCommit: " << i_version.second.f_commit <<
'\n';
658 a_os <<
"\t\t\tPackage: " << i_version.second.f_package <<
'\n';
661 if( a_message.
payload().is_node() ) a_os <<
"Payload: " << a_message.
payload().as_node() <<
'\n';
662 else if( a_message.
payload().is_array() ) a_os <<
"Payload: " << a_message.
payload().as_array() <<
'\n';
663 else if( a_message.
payload().is_value() ) a_os <<
"Payload: " << a_message.
payload().as_value() <<
'\n';
664 else a_os <<
"Payload: null\n";
670 a_os << static_cast< const message& >( a_message );
671 a_os <<
"Lockout Key: " << a_message.lockout_key() <<
'\n';
672 a_os <<
"Lockout Key Valid: " << a_message.get_lockout_key_valid() <<
'\n';
673 a_os <<
"Message Operation: " << a_message.get_message_operation() <<
'\n';
679 a_os << static_cast< const message& >( a_message );
680 a_os <<
"Return Code: " << a_message.get_return_code() <<
'\n';
681 a_os <<
"Return Message: " << a_message.return_message() <<
'\n';
687 a_os << static_cast< const message& >( a_message );
void encode_message_body(std::vector< std::string > &a_body_vec, unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the message-body to a strings (default encoding is JSON) for creating AMQP messages...
std::string to_string() const
Converts specifier tokens into a single string.
Base class for return codes.
scarab::param_node get_message_param(bool a_include_payload=true) const
Creates and returns a new param_node object to contain the full message.
virtual msg_t message_type() const
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
op_t to_op_t(uint32_t an_op_uint)
amqp_split_message_ptrs create_amqp_messages(unsigned a_max_size=10000)
Converts a Dripline message object to a set of AMQP messages.
scarab::param_node get_sender_info() const
Creates and returns a new param_node object to contain the sender info.
std::shared_ptr< msg_request > request_ptr_t
Dripline-specific errors.
scarab::param_ptr_t table_to_param(const AmqpClient::Table &a_table)
std::shared_ptr< msg_alert > alert_ptr_t
virtual unsigned rc_value() const =0
std::ostream & operator<<(std::ostream &a_os, op_t an_op)
Pass the integer-equivalent of a message-operation enum to an ostream.
Contains all of the information common to all types of Dripline messages.
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
std::string interpret_encoding() const
virtual void derived_modify_amqp_message(amqp_message_ptr a_amqp_msg, AmqpClient::Table &a_properties) const =0
static const char s_message_id_separator
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
std::vector< amqp_message_ptr > amqp_split_message_ptrs
uuid_t uuid_from_string(const std::string &a_id_str)
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
virtual msg_t message_type() const
scarab::param & payload()
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
msg_t to_msg_t(uint32_t a_msg_uint)
static scarab::logger dlog("agent")
scarab::param_ptr_t f_payload
std::string to_string(op_t an_op)
Gives the human-readable version of a message operation.
specifier & parsed_specifier()
void set_sender_info(const scarab::param_node &a_sender_info)
Copies the sender info out of a param_node.
std::shared_ptr< msg_reply > reply_ptr_t
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
virtual void derived_modify_message_param(scarab::param_node &a_message_node) const =0
uint32_t to_uint(op_t an_op)
Convert a message-operation enum to an integer.
bool operator==(const sender_package_version &a_rhs) const
std::string encode_full_message(unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the entire message into a single string (default encoding is JSON)
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks]. ...
virtual msg_t message_type() const =0
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
AmqpClient::TableValue param_to_table(const scarab::param_node &a_node)
bool operator==(const message &a_lhs, const message &a_rhs)
virtual msg_t message_type() const
std::shared_ptr< message > message_ptr_t