Dripline-Cpp  v2.4.2
Dripline Implementation in C++
message.cc
Go to the documentation of this file.
1 /*
2  * message.cc
3  *
4  * Created on: Jul 9, 2015
5  * Author: nsoblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "message.hh"
11 
12 #include "dripline_constants.hh"
13 #include "dripline_exceptions.hh"
14 #include "dripline_version.hh"
15 #include "version_store.hh"
16 
17 #include "logger.hh"
18 #include "map_at_default.hh"
19 #include "param_json.hh"
20 #include "return_codes.hh"
21 #include "time.hh"
22 #include "version_wrapper.hh"
23 
24 #include <cmath>
25 #include <map>
26 
27 using std::shared_ptr;
28 using std::make_shared;
29 using std::string;
30 
31 using scarab::param;
32 using scarab::param_node;
33 using scarab::param_value;
34 using scarab::param_input_json;
35 using scarab::param_output_json;
36 using scarab::param_ptr_t;
37 
38 using std::string;
39 
40 namespace dripline
41 {
42 
43  LOGGER( dlog, "message" );
44 
45 
46  //***********
47  // Message
48  //***********
49 
51  f_version(),
52  f_commit(),
53  f_package()
54  {}
55 
56  message::sender_package_version::sender_package_version( const scarab::version_semantic& a_version ) :
57  f_version( a_version.version_str() ),
58  f_commit( a_version.commit() ),
59  f_package( a_version.package() )
60  {}
61 
62  message::sender_package_version::sender_package_version( const std::string& a_version, const std::string& a_commit, const std::string& a_package ) :
63  f_version( a_version ),
64  f_commit( a_commit ),
65  f_package( a_package )
66  {}
67 
69  {
70  return f_version == a_rhs.f_version &&
71  f_commit == a_rhs.f_commit &&
72  f_package == a_rhs.f_package;
73  }
74 
76  f_is_valid( true ),
77  f_routing_key(),
78  f_correlation_id(),
79  f_message_id(),
80  f_reply_to(),
81  f_encoding( encoding::json ),
82  f_timestamp(),
83  f_sender_exe( "N/A" ),
84  f_sender_hostname( "N/A" ),
85  f_sender_username( "N/A" ),
86  f_sender_versions(),
87  f_specifier(),
88  f_payload( new param() )
89  {
90  // set the sender_info correctly for the server software
91  scarab::version_wrapper* t_version = scarab::version_wrapper::get_instance();
92  f_sender_exe = t_version->exe_name();
93  f_sender_hostname = t_version->hostname();
94  f_sender_username = t_version->username();
95  f_sender_service_name = "unknown";
96 
97  auto t_versions = version_store::get_instance()->versions();
98  for( auto& i_version : t_versions )
99  {
100  f_sender_versions.emplace( std::make_pair( i_version.first, *i_version.second ) );
101  }
102  }
103 
105  {
106  }
107 /*
108  message_ptr_t message::process_envelope( amqp_envelope_ptr a_envelope )
109  {
110  if( ! a_envelope )
111  {
112  throw dripline_error() << "Empty envelope received";
113  }
114  return message::process_message( a_envelope->Message(), a_envelope->RoutingKey() );
115  }
116 */
117  std::tuple< std::string, unsigned, unsigned > message::parse_message_id( const string& a_message_id )
118  {
119  std::string::size_type t_first_separator = a_message_id.find_first_of( s_message_id_separator );
120  std::string::size_type t_last_separator = a_message_id.find_last_of( s_message_id_separator );
121  if( t_first_separator == a_message_id.npos || t_last_separator == a_message_id.npos )
122  {
123  throw dripline_error() << "Message ID is not formatted correctly\nShould be [UUID]/[chunk]/[total chunks]\nReceived: " << a_message_id;
124  }
125 
126  return std::make_tuple( a_message_id.substr(0, t_first_separator),
127  std::stoul(a_message_id.substr(t_first_separator + 1, t_last_separator - t_first_separator - 1)),
128  std::stoul(a_message_id.substr(t_last_separator + 1)) );
129  }
130 
// Converts a set of AMQP message chunks into a single Dripline message object.
//
// a_message_ptrs: the chunks, in order; a null entry is treated as a missing chunk.
// a_routing_key:  routing key passed on to the created message.
//
// Returns a request, reply, or alert, selected by the "message_type" header entry.
// Throws dripline_error if no chunks are given, if every chunk is null, if the content
// encoding is not "application/json", or if the message type is not handled.
// When a chunk is missing or the body cannot be parsed, the message is still returned:
// it is marked invalid and its payload is a node with "invalid" (the raw string) and
// "error" (the reason) entries.
//
// NOTE(review): listing line 232 (the opening of the request-creation call in the
// msg_t::request case) is absent from this extract of the file.
131  message_ptr_t message::process_message( amqp_split_message_ptrs a_message_ptrs, const std::string& a_routing_key )
132  {
133  // find first non-empty message pointer
134  // get length of payload (to use for empty ones)
135  // get header content
136  // set bool for complete to true
137  // loop through messages to build full payload string, if one is incomplete, fill with hashes and set bool for complete to false
138  // if payload complete, parse payload
139 
140  if( a_message_ptrs.empty() )
141  {
142  throw dripline_error() << "No messages were provided for processing";
143  }
144 
145  // Find the first valid message, from which we'll get the payload chunk length
146  // Below we'll also use this for the header content
147  amqp_message_ptr t_first_valid_message;
// The loop condition stops as soon as a non-null pointer has been stored
148  for( unsigned i_message = 0; ! t_first_valid_message && i_message < a_message_ptrs.size(); ++i_message )
149  {
150  t_first_valid_message = a_message_ptrs[i_message];
151  }
152 
153  if( ! t_first_valid_message )
154  {
155  throw dripline_error() << "All messages provided for processing were invalid";
156  }
157 
// Length used to size the '#' filler that stands in for each missing chunk
158  unsigned t_payload_chunk_length = t_first_valid_message->Body().size();
159 
// Only JSON content is accepted; any other content encoding is an error
160  encoding t_encoding;
161  if( t_first_valid_message->ContentEncoding() == "application/json" )
162  {
163  t_encoding = encoding::json;
164  }
165  else
166  {
167  throw dripline_error() << "Unable to parse message with content type <" << t_first_valid_message->ContentEncoding() << ">";
168  }
169 
170  // Build up the body
171  string t_payload_str;
172  bool t_payload_is_complete = true;
173  for( amqp_message_ptr& t_message : a_message_ptrs )
174  {
175  if( ! t_message )
176  {
177  // If a chunk of the message is missing, it's filled with hashes
178  t_payload_is_complete = false;
179  t_payload_str += string( t_payload_chunk_length, '#' );
180  continue;
181  }
182 
183  t_payload_str += t_message->Body();
184  }
185 
186  // Attempt to parse
187  scarab::param_ptr_t t_payload;
188  string t_payload_error_msg;
189  if( t_payload_is_complete )
190  {
191  // Parse the body
192  param_input_json t_input;
193  t_payload = t_input.read_string( t_payload_str );
194  if( ! t_payload )
195  {
196  t_payload_error_msg = "Message body could not be parsed; skipping request";
197  }
198  }
199  else
200  {
201  t_payload_error_msg = "Entire message was not available";
202  }
203 
204  // Payload is unavailable if the error message is non-empty
205  // In that case, store whatever we have for the payload string in the payload, plus the error message
206  if( ! t_payload_error_msg.empty() )
207  {
208  // Store the invalid payload string in the payload
209  t_payload = std::unique_ptr< scarab::param_node >( new param_node() );
210  t_payload->as_node().add( "invalid", t_payload_str );
211  t_payload->as_node().add( "error", t_payload_error_msg );
212  }
213 
214  LDEBUG( dlog, "Processing message:\n" <<
215  "Routing key: " << a_routing_key << '\n' <<
216  "Payload: " << t_payload_str );
217 
218  using scarab::at;
219 
220  using AmqpClient::Table;
221  using AmqpClient::TableEntry;
222  using AmqpClient::TableValue;
// Header fields are read from the first non-null chunk only
223  Table t_properties = t_first_valid_message->HeaderTable();
224 
225  // Create the message, of whichever type
// Each at() lookup below supplies a default value as its third argument
226  message_ptr_t t_message;
227  msg_t t_msg_type = to_msg_t( at( t_properties, std::string("message_type"), TableValue(to_uint(msg_t::unknown)) ).GetUint32() );
228  switch( t_msg_type )
229  {
230  case msg_t::request:
231  {
233  std::move(t_payload),
234  to_op_t( at( t_properties, std::string("message_operation"), TableValue(to_uint(op_t::unknown)) ).GetUint32() ),
235  a_routing_key,
236  at( t_properties, std::string("specifier"), TableValue("") ).GetString(),
237  t_first_valid_message->ReplyTo(),
238  t_encoding);
239 
// uuid_from_string receives the validity flag as its second argument; the flag is then stored on the request
240  bool t_lockout_key_valid = true;
241  t_request->lockout_key() = uuid_from_string( at( t_properties, std::string("lockout_key"), TableValue("") ).GetString(), t_lockout_key_valid );
242  t_request->set_lockout_key_valid( t_lockout_key_valid );
243 
244  t_message = t_request;
245  break;
246  }
247  case msg_t::reply:
248  {
249  reply_ptr_t t_reply = msg_reply::create(
250  at( t_properties, std::string("return_code"), TableValue(999U) ).GetUint32(),
251  at( t_properties, std::string("return_message"), TableValue("") ).GetString(),
252  std::move(t_payload),
253  a_routing_key,
254  at( t_properties, std::string("specifier"), TableValue("") ).GetString(),
255  t_encoding);
256 
257  t_message = t_reply;
258  break;
259  }
260  case msg_t::alert:
261  {
262  alert_ptr_t t_alert = msg_alert::create(
263  std::move(t_payload),
264  a_routing_key,
265  at( t_properties, std::string("specifier"), TableValue("") ).GetString(),
266  t_encoding);
267 
268  t_message = t_alert;
269  break;
270  }
271  default:
272  {
273  throw dripline_error() << "Message received with unhandled type: " << t_msg_type;
274  break;
275  }
276  }
277 
278  // Set message header fields
// A non-empty error message means the payload was incomplete or unparseable
279  if( ! t_payload_error_msg.empty() )
280  {
281  t_message->set_is_valid( false );
282  }
283 
284  t_message->correlation_id() = t_first_valid_message->CorrelationId();
285  t_message->message_id() = t_first_valid_message->MessageId();
286  // remove the message chunk information from the message id
287  t_message->message_id() = t_message->message_id().substr( 0, t_message->message_id().find_first_of(s_message_id_separator) );
288  t_message->timestamp() = at( t_properties, std::string("timestamp"), TableValue("") ).GetString();
289 
290  Table t_sender_info = at( t_properties, std::string("sender_info"), TableValue(Table()) ).GetTable();
291  scarab::param_ptr_t t_sender_info_param = table_to_param( t_sender_info );
292  t_message->set_sender_info( t_sender_info_param->as_node() );
293 
// NOTE(review): t_payload was passed through std::move into create() in the visible reply
// and alert cases (and apparently the request case); since it is assigned from a
// std::unique_ptr above, it is presumably empty here -- confirm that this dereference is safe.
294  t_message->payload() = *t_payload;
295 
296  return t_message;
297  }
298 
300  {
301  f_timestamp = scarab::get_formatted_now();
302 
303  try
304  {
305  std::vector< string > t_body_parts;
306  encode_message_body( t_body_parts, a_max_size );
307 
308  unsigned t_n_chunks = t_body_parts.size();
309  std::vector< amqp_message_ptr > t_message_parts( t_n_chunks );
310 
311  if( f_message_id.empty() )
312  {
313  f_message_id = string_from_uuid( generate_random_uuid() );
314  }
315  string t_base_message_id = f_message_id + s_message_id_separator;
316  string t_total_chunks_str = s_message_id_separator + std::to_string(t_n_chunks);
317 
318  unsigned i_chunk = 0;
319  for( string& t_body_part : t_body_parts )
320  {
321  amqp_message_ptr t_message = AmqpClient::BasicMessage::Create( t_body_part );
322 
323  t_message->ContentEncoding( interpret_encoding() );
324  t_message->CorrelationId( f_correlation_id );
325  t_message->MessageId( t_base_message_id + std::to_string(i_chunk) + t_total_chunks_str );
326  t_message->ReplyTo( f_reply_to );
327 
328  AmqpClient::Table t_properties;
329  t_properties.insert( AmqpClient::TableEntry( "message_type", to_uint(message_type()) ) );
330  t_properties.insert( AmqpClient::TableEntry( "specifier", f_specifier.to_string() ) );
331  t_properties.insert( AmqpClient::TableEntry( "timestamp", f_timestamp ) );
332  t_properties.insert( AmqpClient::TableEntry( "sender_info", param_to_table( get_sender_info() ) ) );
333 
334  this->derived_modify_amqp_message( t_message, t_properties );
335 
336  t_message->HeaderTable( t_properties );
337 
338  t_message_parts[i_chunk] = t_message;
339 
340  ++i_chunk;
341  }
342 
343  return t_message_parts;
344  }
345  catch( dripline_error& e )
346  {
347  LERROR( dlog, e.what() );
348  return std::vector< amqp_message_ptr >();
349  }
350  }
351 
352  void message::encode_message_body( std::vector< string >& a_body_vec, unsigned a_max_size, const scarab::param_node& a_options ) const
353  {
354  switch( f_encoding )
355  {
356  case encoding::json:
357  {
358  string t_body;
359  param_output_json t_output;
360  if( ! t_output.write_string( *f_payload, t_body, a_options ) )
361  {
362  throw dripline_error() << "Could not convert message body to string";
363  }
364 
365  unsigned t_chars_per_chunk = a_max_size / sizeof(string::value_type);
366  unsigned t_n_chunks = std::ceil( double(t_body.size()) / double(t_chars_per_chunk) );
367  a_body_vec.resize( t_n_chunks );
368  for( unsigned i_chunk = 0, pos = 0; pos < t_body.size(); pos += t_chars_per_chunk, ++i_chunk )
369  {
370  a_body_vec[i_chunk] = t_body.substr(pos, t_chars_per_chunk );
371  }
372  break;
373  }
374  default:
375  throw dripline_error() << "Cannot encode using <" << interpret_encoding() << "> (" << f_encoding << ")";
376  break;
377  }
378  return;
379  }
380 
381  std::string message::encode_full_message( unsigned a_max_size, const scarab::param_node& a_options ) const
382  {
383  switch( f_encoding )
384  {
385  case encoding::json:
386  {
387  param_node t_message_node = get_message_param();
388  param_output_json t_output;
389  string t_message_string;
390  if( ! t_output.write_string( t_message_node, t_message_string, a_options ) )
391  {
392  throw dripline_error() << "Could not convert message to string";
393  }
394  if( t_message_string.size() > a_max_size ) t_message_string.resize( a_max_size );
395  return t_message_string;
396  break;
397  }
398  default:
399  throw dripline_error() << "Cannot encode using <" << interpret_encoding() << ">(" << f_encoding << ")";
400  break;
401  }
402  }
403 
405  {
406  switch( f_encoding )
407  {
408  case encoding::json:
409  return string( "application/json" );
410  break;
411  default:
412  return string( "Unknown" );
413  }
414  }
415 
416  param_node message::get_sender_info() const
417  {
418  param_node t_sender_info;
419  t_sender_info.add( "exe", f_sender_exe );
420  param_node t_versions;
421  for( auto& i_version : f_sender_versions )
422  {
423  param_node t_version_info;
424  t_version_info.add( "version", i_version.second.f_version );
425  if( ! i_version.second.f_commit.empty() ) t_version_info.add( "commit", i_version.second.f_commit );
426  if( ! i_version.second.f_package.empty() ) t_version_info.add( "package", i_version.second.f_package );
427  t_versions.add( i_version.first, std::move(t_version_info) );
428  }
429  t_sender_info.add( "versions", std::move(t_versions) );
430  t_sender_info.add( "hostname", f_sender_hostname );
431  t_sender_info.add( "username", f_sender_username );
432  t_sender_info.add( "service_name", f_sender_service_name );
433  return t_sender_info;
434  }
435 
436  void message::set_sender_info( const param_node& a_sender_info )
437  {
438  f_sender_exe = a_sender_info["exe"]().as_string();
439  const param_node& t_versions = a_sender_info["versions"].as_node();
440  f_sender_versions.clear();
441  for( auto i_version = t_versions.begin(); i_version != t_versions.end(); ++i_version )
442  {
443  f_sender_versions.insert( std::make_pair(i_version.name(), sender_package_version(
444  (*i_version)["version"]().as_string(),
445  i_version->get_value("commit", ""),
446  i_version->get_value("package", "") ) ) );
447  }
448  f_sender_hostname = a_sender_info["hostname"]().as_string();
449  f_sender_username = a_sender_info["username"]().as_string();
450  f_sender_service_name = a_sender_info["service_name"]().as_string();
451  return;
452  }
453 
454  param_node message::get_message_param( bool a_include_payload ) const
455  {
456  param_node t_message_node;
457  t_message_node.add( "routing_key", f_routing_key );
458  t_message_node.add( "specifier", f_specifier.unparsed() );
459  t_message_node.add( "correlation_id", f_correlation_id );
460  t_message_node.add( "message_id", f_message_id );
461  t_message_node.add( "reply_to", f_reply_to );
462  t_message_node.add( "message_type", to_uint(message_type()) );
463  t_message_node.add( "encoding", interpret_encoding() );
464  t_message_node.add( "timestamp", f_timestamp );
465  t_message_node.add( "sender_info", get_sender_info() );
466  if( a_include_payload ) t_message_node.add( "payload", payload() );
467  this->derived_modify_message_param( t_message_node );
468  return t_message_node;
469  }
470 
471 
472  //***********
473  // Request
474  //***********
475 
477  message(),
478  f_lockout_key( generate_nil_uuid() ),
479  f_lockout_key_valid( true ),
480  f_message_operation( op_t::unknown )
481  {
482  f_correlation_id = string_from_uuid( generate_random_uuid() );
483  }
484 
486  {
487 
488  }
489 
490  request_ptr_t msg_request::create( param_ptr_t a_payload, op_t a_msg_op, const std::string& a_routing_key, const std::string& a_specifier, const std::string& a_reply_to, message::encoding a_encoding )
491  {
492  request_ptr_t t_request = make_shared< msg_request >();
493  t_request->set_payload( std::move(a_payload) );
494  t_request->set_message_operation( a_msg_op );
495  t_request->routing_key() = a_routing_key;
496  t_request->parsed_specifier() = a_specifier;
497  t_request->reply_to() = a_reply_to;
498  t_request->set_encoding( a_encoding );
499  return t_request;
500  }
501 
502  msg_t msg_request::s_message_type = msg_t::request;
503 
505  {
506  return msg_request::s_message_type;
507  }
508 
509 
510  //*********
511  // Reply
512  //*********
513 
515  message(),
516  f_return_code( dl_success::s_value ),
517  f_return_message(),
518  f_return_buffer()
519  {
520  }
521 
523  {
524 
525  }
526 
527  reply_ptr_t msg_reply::create( const return_code& a_return_code, const std::string& a_ret_msg, param_ptr_t a_payload, const std::string& a_routing_key, const std::string& a_specifier, message::encoding a_encoding )
528  {
529  return msg_reply::create( a_return_code.rc_value(), a_ret_msg, std::move(a_payload), a_routing_key, a_specifier, a_encoding );
530  }
531 
532  reply_ptr_t msg_reply::create( unsigned a_return_code_value, const std::string& a_ret_msg, param_ptr_t a_payload, const std::string& a_routing_key, const std::string& a_specifier, message::encoding a_encoding )
533  {
534  reply_ptr_t t_reply = make_shared< msg_reply >();
535  t_reply->set_return_code( a_return_code_value );
536  t_reply->return_message() = a_ret_msg;
537  t_reply->set_payload( std::move(a_payload) );
538  t_reply->routing_key() = a_routing_key;
539  t_reply->parsed_specifier() = a_specifier;
540  t_reply->set_encoding( a_encoding );
541  return t_reply;
542  }
543 
544  msg_t msg_reply::s_message_type = msg_t::reply;
545 
547  {
548  return msg_reply::s_message_type;
549  }
550 
551 
552  //*********
553  // Alert
554  //*********
555 
556  alert_ptr_t msg_alert::create( param_ptr_t a_payload, const std::string& a_routing_key, const std::string& a_specifier, message::encoding a_encoding )
557  {
558  alert_ptr_t t_alert = make_shared< msg_alert >();
559  t_alert->set_payload( std::move(a_payload) );
560  t_alert->routing_key() = a_routing_key;
561  t_alert->parsed_specifier() = a_specifier;
562  t_alert->set_encoding( a_encoding );
563  return t_alert;
564  }
565 
567  message()
568  {
569  f_correlation_id = string_from_uuid( generate_random_uuid() );
570  }
571 
573  {
574 
575  }
576 
577  msg_t msg_alert::s_message_type = msg_t::alert;
578 
580  {
581  return msg_alert::s_message_type;
582  }
583 
584 
585 
586  DRIPLINE_API bool operator==( const message& a_lhs, const message& a_rhs )
587  {
588  if( a_lhs.sender_versions().size() != a_rhs.sender_versions().size() ) return false;
589  bool t_versions_are_equal = true;
590  for( auto i_version = std::make_pair(a_lhs.sender_versions().begin(), a_rhs.sender_versions().begin());
591  i_version.first != a_lhs.sender_versions().end();
592  ++i_version.first,
593  ++i_version.second
594  )
595  {
596  t_versions_are_equal = t_versions_are_equal && i_version.first->second == i_version.second->second;
597  }
598 
599  return a_lhs.routing_key() == a_rhs.routing_key() &&
600  a_lhs.correlation_id() == a_rhs.correlation_id() &&
601  a_lhs.reply_to() == a_rhs.reply_to() &&
602  a_lhs.get_encoding() == a_rhs.get_encoding() &&
603  a_lhs.timestamp() == a_rhs.timestamp() &&
604  t_versions_are_equal &&
605  a_lhs.sender_exe() == a_rhs.sender_exe() &&
606  a_lhs.sender_hostname() == a_rhs.sender_hostname() &&
607  a_lhs.sender_username() == a_rhs.sender_username() &&
608  a_lhs.sender_service_name() == a_rhs.sender_service_name() &&
609  a_lhs.parsed_specifier() == a_rhs.parsed_specifier() &&
610  a_lhs.payload().to_string() == a_rhs.payload().to_string();
611  }
612 
613  DRIPLINE_API bool operator==( const msg_request& a_lhs, const msg_request& a_rhs )
614  {
615  return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) ) &&
616  a_lhs.lockout_key() == a_rhs.lockout_key() &&
617  a_lhs.get_lockout_key_valid() == a_rhs.get_lockout_key_valid() &&
618  a_lhs.get_message_operation() == a_rhs.get_message_operation();
619  }
620 
621  DRIPLINE_API bool operator==( const msg_reply& a_lhs, const msg_reply& a_rhs )
622  {
623  return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) ) &&
624  a_lhs.get_return_code() == a_rhs.get_return_code() &&
625  a_lhs.return_message() == a_rhs.return_message();
626  }
627 
628  DRIPLINE_API bool operator==( const msg_alert& a_lhs, const msg_alert& a_rhs )
629  {
630  return operator==( static_cast< const message& >(a_lhs), static_cast< const message& >(a_rhs) );
631  }
632 
633  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, message::encoding a_enc )
634  {
635  static std::map< message::encoding, string > s_enc_strings = { { message::encoding::json, "json" } };
636  return a_os << s_enc_strings[ a_enc ];
637  }
638 
639  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const message& a_message )
640  {
641  a_os << "Routing key: " << a_message.routing_key() << '\n';
642  a_os << "Correlation ID: " << a_message.correlation_id() << '\n';
643  a_os << "Reply To: " << a_message.reply_to() << '\n';
644  a_os << "Message Type: " << to_uint(a_message.message_type()) << '\n';
645  a_os << "Encoding: " << a_message.get_encoding() << '\n';
646  a_os << "Timestamp: " << a_message.timestamp() << '\n';
647  a_os << "Sender Info:\n";
648  a_os << "\tExecutable: " << a_message.sender_exe() << '\n';
649  a_os << "\tHostname: " << a_message.sender_hostname() << '\n';
650  a_os << "\tUsername: " << a_message.sender_username() << '\n';
651  a_os << "\tService: " << a_message.sender_service_name() << '\n';
652  a_os << "\tVersions:\n";
653  for( const auto& i_version : a_message.sender_versions() )
654  {
655  a_os << "\t\t" << i_version.first << ":\n";
656  a_os << "\t\t\tVersion: " << i_version.second.f_version << '\n';
657  a_os << "\t\t\tCommit: " << i_version.second.f_commit << '\n';
658  a_os << "\t\t\tPackage: " << i_version.second.f_package << '\n';
659  }
660  a_os << "Specifier: " << a_message.parsed_specifier().unparsed() << '\n';
661  if( a_message.payload().is_node() ) a_os << "Payload: " << a_message.payload().as_node() << '\n';
662  else if( a_message.payload().is_array() ) a_os << "Payload: " << a_message.payload().as_array() << '\n';
663  else if( a_message.payload().is_value() ) a_os << "Payload: " << a_message.payload().as_value() << '\n';
664  else a_os << "Payload: null\n";
665  return a_os;
666  }
667 
668  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const msg_request& a_message )
669  {
670  a_os << static_cast< const message& >( a_message );
671  a_os << "Lockout Key: " << a_message.lockout_key() << '\n';
672  a_os << "Lockout Key Valid: " << a_message.get_lockout_key_valid() << '\n';
673  a_os << "Message Operation: " << a_message.get_message_operation() << '\n';
674  return a_os;
675  }
676 
677  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const msg_reply& a_message )
678  {
679  a_os << static_cast< const message& >( a_message );
680  a_os << "Return Code: " << a_message.get_return_code() << '\n';
681  a_os << "Return Message: " << a_message.return_message() << '\n';
682  return a_os;
683  }
684 
685  DRIPLINE_API std::ostream& operator<<( std::ostream& a_os, const msg_alert& a_message )
686  {
687  a_os << static_cast< const message& >( a_message );
688  return a_os;
689  }
690 
691 } /* namespace dripline */
void encode_message_body(std::vector< std::string > &a_body_vec, unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the message body to strings (default encoding is JSON) for creating AMQP messages...
Definition: message.cc:352
std::string to_string() const
Converts specifier tokens into a single string.
Definition: specifier.cc:112
Base class for return codes.
Definition: return_codes.hh:39
scarab::param_node get_message_param(bool a_include_payload=true) const
Creates and returns a new param_node object to contain the full message.
Definition: message.cc:454
virtual msg_t message_type() const
Definition: message.cc:579
AmqpClient::BasicMessage::ptr_t amqp_message_ptr
Definition: amqp.hh:26
op_t to_op_t(uint32_t an_op_uint)
amqp_split_message_ptrs create_amqp_messages(unsigned a_max_size=10000)
Converts a Dripline message object to a set of AMQP messages.
Definition: message.cc:299
virtual ~msg_request()
Definition: message.cc:485
Request message class.
Definition: message.hh:174
scarab::param_node get_sender_info() const
Creates and returns a new param_node object to contain the sender info.
Definition: message.cc:416
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
Dripline-specific errors.
scarab::param_ptr_t table_to_param(const AmqpClient::Table &a_table)
Definition: amqp.cc:14
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
virtual unsigned rc_value() const =0
std::ostream & operator<<(std::ostream &a_os, op_t an_op)
Pass the integer-equivalent of a message-operation enum to an ostream.
Contains all of the information common to all types of Dripline messages.
Definition: message.hh:53
static request_ptr_t create(scarab::param_ptr_t a_payload, op_t a_msg_op, const std::string &a_routing_key, const std::string &a_specifier="", const std::string &a_reply_to="", message::encoding a_encoding=encoding::json)
Create a request message.
Definition: message.cc:490
std::string interpret_encoding() const
Definition: message.cc:404
virtual void derived_modify_amqp_message(amqp_message_ptr a_amqp_msg, AmqpClient::Table &a_properties) const =0
static const char s_message_id_separator
Definition: message.hh:149
virtual ~msg_alert()
Definition: message.cc:572
static reply_ptr_t create(const return_code &a_return_code, const std::string &a_ret_msg, scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Create a reply message using a return_code object and manually specifying the destination.
#define DRIPLINE_API
Definition: dripline_api.hh:34
std::vector< amqp_message_ptr > amqp_split_message_ptrs
Definition: amqp.hh:31
uuid_t uuid_from_string(const std::string &a_id_str)
Definition: uuid.cc:31
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:92
virtual msg_t message_type() const
Definition: message.cc:504
scarab::param & payload()
Definition: message.hh:326
Alert message class.
Definition: message.hh:284
static message_ptr_t process_message(amqp_split_message_ptrs a_message_ptrs, const std::string &a_routing_key)
Converts a set of AMQP messages to a Dripline message object.
Definition: message.cc:131
static alert_ptr_t create(scarab::param_ptr_t a_payload, const std::string &a_routing_key, const std::string &a_specifier="", message::encoding a_encoding=encoding::json)
Creates an alert message.
Definition: message.cc:556
msg_t to_msg_t(uint32_t a_msg_uint)
static scarab::logger dlog("agent")
virtual ~msg_reply()
Definition: message.cc:522
scarab::param_ptr_t f_payload
Definition: message.hh:146
std::string to_string(op_t an_op)
Gives the human-readable version of a message operation.
specifier & parsed_specifier()
Definition: message.hh:316
void set_sender_info(const scarab::param_node &a_sender_info)
Copies the sender info out of a param_node.
Definition: message.cc:436
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
virtual void derived_modify_message_param(scarab::param_node &a_message_node) const =0
uint32_t to_uint(op_t an_op)
Convert a message-operation enum to an integer.
bool operator==(const sender_package_version &a_rhs) const
Definition: message.cc:68
std::string encode_full_message(unsigned a_max_size, const scarab::param_node &a_options=scarab::param_node()) const
Converts the entire message into a single string (default encoding is JSON)
Definition: message.cc:381
static std::tuple< std::string, unsigned, unsigned > parse_message_id(const std::string &a_message_id)
Parses the message ID, which should be of the form [UUID]/[chunk]/[total chunks]. ...
Definition: message.cc:117
virtual ~message()
Definition: message.cc:104
virtual msg_t message_type() const =0
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
Definition: uuid.cc:26
AmqpClient::TableValue param_to_table(const scarab::param_node &a_node)
Definition: amqp.cc:94
bool operator==(const message &a_lhs, const message &a_rhs)
Definition: message.cc:586
virtual msg_t message_type() const
Definition: message.cc:546
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
Reply message class.
Definition: message.hh:229
specifier f_specifier
Definition: message.hh:122