Dripline-Cpp  v2.4.2
Dripline Implementation in C++
endpoint.cc
Go to the documentation of this file.
1 /*
2  * endpoint.cc
3  *
4  * Created on: Aug 14, 2018
5  * Author: N.S. Oblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "endpoint.hh"
11 
12 #include "dripline_exceptions.hh"
13 #include "service.hh"
14 #include "throw_reply.hh"
15 
16 #include "logger.hh"
17 
18 #ifdef DL_PYTHON
19 #include "reply_cache.hh"
20 
21 #include "pybind11/pybind11.h"
22 #include "pybind11/pytypes.h"
23 #endif
24 
25 LOGGER( dlog, "endpoint" );
26 
27 namespace dripline
28 {
29 
30  endpoint::endpoint( const std::string& a_name ) :
31  f_name( a_name ),
32  f_service(),
33  f_lockout_tag(),
34  f_lockout_key( generate_nil_uuid() )
35  {
36  }
37 
38  endpoint::endpoint( const endpoint& a_orig ) :
39  f_name( a_orig.f_name ),
40  f_service( a_orig.f_service ),
41  f_lockout_tag( a_orig.f_lockout_tag ),
42  f_lockout_key( a_orig.f_lockout_key )
43  {}
44 
46  f_name( std::move(a_orig.f_name) ),
47  f_service( std::move(a_orig.f_service) ),
48  f_lockout_tag( std::move(a_orig.f_lockout_tag) ),
49  f_lockout_key( std::move(a_orig.f_lockout_key) )
50  {}
51 
53  {}
54 
56  {
57  f_name = a_orig.f_name;
58  f_service = a_orig.f_service;
59  f_lockout_tag = a_orig.f_lockout_tag;
60  f_lockout_key = a_orig.f_lockout_key;
61  return *this;
62  }
63 
65  {
66  f_name = std::move(a_orig.f_name);
67  f_service = std::move(a_orig.f_service);
68  f_lockout_tag = std::move(a_orig.f_lockout_tag);
69  f_lockout_key = std::move(a_orig.f_lockout_key);
70  return *this;
71  }
72 
74  {
75  return this->on_request_message( a_request_ptr );;
76  }
77 
79  {
80  return this->on_alert_message( a_alert_ptr );
81  }
82 
84  {
85  return this->on_reply_message( a_reply_ptr );
86  }
87 
89  {
90  // reply object to store whatever reply we end up with
91  reply_ptr_t t_reply;
92 
93  // lambda to send the reply. this local function is defined so we can send from within the catch block if needed before rethrowing.
94  auto t_replier = [&t_reply, &a_request, this](){
95  // send the reply if the request had a reply-to
96  if( a_request->reply_to().empty() )
97  {
98  LWARN( dlog, "Not sending reply (reply-to empty)\n" <<
99  " Return code: " << t_reply->get_return_code() << '\n' <<
100  " Return message: " << t_reply->return_message() << '\n' <<
101  " Payload:\n" << t_reply->payload() );
102  }
103  else
104  {
105  send_reply( t_reply );
106  }
107  };
108 
109  try
110  {
111  if( ! a_request->get_is_valid() )
112  {
113  std::string t_message( "Request message was not valid" );
114  // check in the payload for error information
115  if( a_request->payload().is_node() )
116  {
117  const scarab::param_node& t_payload = a_request->payload().as_node();
118  if( t_payload.has("error") ) t_message += "; " + t_payload["error"]().as_string();
119  }
120  throw throw_reply( dl_service_error_decoding_fail{}, a_request->get_payload_ptr()->clone() ) << "Request message was not valid";
121  }
122 
123  // the lockout key must be valid
124  if( ! a_request->get_lockout_key_valid() )
125  {
126  throw throw_reply( dl_service_error_invalid_key{} ) << "Lockout key could not be parsed";
127  }
128 
129  switch( a_request->get_message_operation() )
130  {
131  case op_t::get:
132  {
133  t_reply = __do_get_request( a_request );
134  break;
135  } // end "get" operation
136  case op_t::set:
137  {
138  t_reply = __do_set_request( a_request );
139  break;
140  } // end "set" operation
141  case op_t::cmd:
142  {
143  t_reply = __do_cmd_request( a_request );
144  break;
145  }
146  default:
147  throw throw_reply( dl_service_error_invalid_method() ) << "Unrecognized message operation: <" << a_request->get_message_operation() << ">";
148  break;
149  } // end switch on message type
150  // reply to be sent outside the try block
151  }
152  catch( const throw_reply& e )
153  {
154  if( e.ret_code().rc_value() == dl_success::s_value )
155  {
156  LINFO( dlog, "Replying with: " << e.return_message() );
157  }
158  else
159  {
160  LWARN( dlog, "Replying with: " << e.return_message() );
161  }
162  t_reply = a_request->reply( e.ret_code(), e.return_message() );
163  t_reply->set_payload( e.get_payload_ptr()->clone() );
164  // don't rethrow a throw_reply
165  // reply to be sent outside the catch block
166  }
167 #ifdef DL_PYTHON
168  catch( const pybind11::error_already_set& e )
169  {
170  // check whether the error message from python starts with the keyword
171  // the keyword should be the name of the python class
172  if( std::string(e.what()).substr(0, throw_reply::py_throw_reply_keyword().size()) == throw_reply::py_throw_reply_keyword() )
173  {
174  reply_cache* t_reply_cache = reply_cache::get_instance();
175  if( t_reply_cache->ret_code().rc_value() == dl_success::s_value )
176  {
177  LINFO( dlog, "Replying with: " << t_reply_cache->return_message() );
178  }
179  else
180  {
181  LWARN( dlog, "Replying with: " << t_reply_cache->return_message() );
182  }
183  t_reply = a_request->reply( t_reply_cache->ret_code(),t_reply_cache->return_message() );
184  t_reply->set_payload( t_reply_cache->get_payload_ptr()->clone() );
185  // don't rethrow a throw_reply
186  // reply to be sent outside the catch block
187  }
188  else
189  {
190  // treat the python exception as a standard exception
191  LERROR( dlog, "Caught exception from Python: " << e.what() );
192  t_reply = a_request->reply( dl_unhandled_exception(), e.what() );
193  t_replier(); // send the reply before rethrowing
194  throw; // unhandled exceptions should rethrow because they're by definition unhandled
195  }
196  }
197 #endif
198  catch( const std::exception& e )
199  {
200  LERROR( dlog, "Caught exception: " << e.what() );
201  t_reply = a_request->reply( dl_unhandled_exception(), e.what() );
202  t_replier(); // send the reply before rethrowing
203  throw; // unhandled exceptions should rethrow because they're by definition unhandled
204  }
205 
206  // send the reply
207  t_replier();
208 
209  return t_reply;
210  }
211 
213  {
214  if( a_message->is_request() )
215  {
216  on_request_message( std::static_pointer_cast< msg_request >( a_message ) );
217  }
218  else if( a_message->is_alert() )
219  {
220  on_alert_message( std::static_pointer_cast< msg_alert >( a_message ) );
221  }
222  else if( a_message->is_reply() )
223  {
224  on_reply_message( std::static_pointer_cast< msg_reply >( a_message ) );
225  }
226  else
227  {
228  throw dripline_error() << "Unknown message type";
229  }
230  }
231 
232  void endpoint::send_reply( reply_ptr_t a_reply ) const
233  {
234  if( ! f_service )
235  {
236  LWARN( dlog, "Cannot send reply because the service pointer is not set" );
237  return;
238  }
239 
240  LDEBUG( dlog, "Sending reply message to <" << a_reply->routing_key() << ">:\n" <<
241  " Return code: " << a_reply->get_return_code() << '\n' <<
242  " Return message: " << a_reply->return_message() << '\n' <<
243  " Payload:\n" << a_reply->payload() );
244 
245  sent_msg_pkg_ptr t_receive_reply;
246  try
247  {
248  t_receive_reply = f_service->send( a_reply );
249  }
250  catch( message_ptr_t )
251  {
252  LWARN( dlog, "Operating in offline mode; message not sent" );
253  return;
254  }
255  catch( connection_error& e )
256  {
257  LERROR( dlog, "Unable to connect to the broker:\n" << e.what() );
258  return;
259  }
260  catch( dripline_error& e )
261  {
262  LERROR( dlog, "Dripline error while sending reply:\n" << e.what() );
263  return;
264  }
265 
266  if( ! t_receive_reply->f_successful_send )
267  {
268  LERROR( dlog, "Failed to send reply:\n" + t_receive_reply->f_send_error_message );
269  return;
270  }
271 
272  return;
273  }
274 
276  {
277  throw dripline_error() << "Base endpoint does not handle reply messages";
278  }
279 
281  {
282  throw dripline_error() << "Base endpoint does not handle alert messages";
283  }
284 
286  {
287  LDEBUG( dlog, "Run operation request received" );
288 
289  if( ! authenticate( a_request->lockout_key() ) )
290  {
291  std::stringstream t_conv;
292  t_conv << a_request->lockout_key();
293  std::string t_message( "Request denied due to lockout (key used: " + t_conv.str() + ")" );
294  LINFO( dlog, t_message );
295  return a_request->reply( dl_service_error_access_denied(), t_message );
296  }
297 
298  return do_run_request( a_request );
299  }
300 
302  {
303  LDEBUG( dlog, "Get operation request received" );
304 
305  std::string t_query_type;
306  if( ! a_request->parsed_specifier().empty() )
307  {
308  t_query_type = a_request->parsed_specifier().front();
309  }
310 
311  if( t_query_type == "is-locked" )
312  {
313  a_request->parsed_specifier().pop_front();
314  return handle_is_locked_request( a_request );
315  }
316 
317  return do_get_request( a_request );
318  }
319 
321  {
322  LDEBUG( dlog, "Set request received" );
323 
324  if( ! authenticate( a_request->lockout_key() ) )
325  {
326  std::stringstream t_conv;
327  t_conv << a_request->lockout_key();
328  std::string t_message( "Request denied due to lockout (key used: " + t_conv.str() + ")" );
329  LINFO( dlog, t_message );
330  return a_request->reply( dl_service_error_access_denied(), t_message );
331  }
332 
333  return do_set_request( a_request );
334  }
335 
337  {
338  LDEBUG( dlog, "Cmd request received" );
339 
340  std::string t_instruction;
341  if( ! a_request->parsed_specifier().empty() )
342  {
343  t_instruction = a_request->parsed_specifier().front();
344  }
345 
346  //LWARN( mtlog, "uuid string: " << a_request->get_payload().get_value( "key", "") << ", uuid: " << uuid_from_string( a_request->get_payload().get_value( "key", "") ) );
347  // this condition includes the exception for the unlock instruction that allows us to force the unlock regardless of the key.
348  // disable_key() checks the lockout key if it's not forced, so it's okay that we bypass this call to authenticate() for the unlock instruction.
349  if( ! authenticate( a_request->lockout_key() ) && t_instruction != "unlock" && t_instruction != "ping" && t_instruction != "set_condition" )
350  {
351  std::stringstream t_conv;
352  t_conv << a_request->lockout_key();
353  std::string t_message( "Request denied due to lockout (key used: " + t_conv.str() + ")" );
354  LINFO( dlog, t_message );
355  return a_request->reply( dl_service_error_access_denied(), t_message );
356  }
357 
358  if( t_instruction == "lock" )
359  {
360  a_request->parsed_specifier().pop_front();
361  return handle_lock_request( a_request );
362  }
363  else if( t_instruction == "unlock" )
364  {
365  a_request->parsed_specifier().pop_front();
366  return handle_unlock_request( a_request );
367  }
368  else if( t_instruction == "ping" )
369  {
370  a_request->parsed_specifier().pop_front();
371  return handle_ping_request( a_request );
372  }
373  else if( t_instruction == "set_condition" )
374  {
375  a_request->parsed_specifier().pop_front();
376  return handle_set_condition_request( a_request );
377  }
378 
379  return do_cmd_request( a_request );
380  }
381 
382  uuid_t endpoint::enable_lockout( const scarab::param_node& a_tag, uuid_t a_key )
383  {
384  if( is_locked() ) return generate_nil_uuid();
385  if( a_key.is_nil() ) f_lockout_key = generate_random_uuid();
386  else f_lockout_key = a_key;
387  f_lockout_tag = a_tag;
388  return f_lockout_key;
389  }
390 
391  bool endpoint::disable_lockout( const uuid_t& a_key, bool a_force )
392  {
393  if( ! is_locked() ) return true;
394  if( ! a_force && a_key != f_lockout_key ) return false;
395  f_lockout_key = generate_nil_uuid();
396  f_lockout_tag.clear();
397  return true;
398  }
399 
400  bool endpoint::authenticate( const uuid_t& a_key ) const
401  {
402  LDEBUG( dlog, "Authenticating with key <" << a_key << ">" );
403  if( is_locked() ) return check_key( a_key );
404  return true;
405  }
406 
408  {
409  uuid_t t_new_key = enable_lockout( a_request->get_sender_info(), a_request->lockout_key() );
410  if( t_new_key.is_nil() )
411  {
412  return a_request->reply( dl_resource_error(), "Unable to lock server" );;
413  }
414 
415  scarab::param_ptr_t t_payload_ptr( new scarab::param_node() );
416  scarab::param_node& t_payload_node = t_payload_ptr->as_node();
417  t_payload_node.add( "key", string_from_uuid( t_new_key ) );
418  return a_request->reply( dl_success(), "Server is now locked", std::move(t_payload_ptr) );
419  }
420 
422  {
423  if( ! is_locked() )
424  {
425  return a_request->reply( dl_warning_no_action_taken(), "Already unlocked" );
426  }
427 
428  bool t_force = a_request->payload().get_value( "force", false );
429 
430  if( disable_lockout( a_request->lockout_key(), t_force ) )
431  {
432  return a_request->reply( dl_success(), "Server unlocked" );
433  }
434  return a_request->reply( dl_resource_error(), "Failed to unlock server" );;
435  }
436 
438  {
439  return this->__do_handle_set_condition_request( a_request );
440  }
441 
443  {
444  bool t_is_locked = is_locked();
445  scarab::param_ptr_t t_reply_payload( new scarab::param_node() );
446  scarab::param_node& t_reply_node = t_reply_payload->as_node();
447  t_reply_node.add( "is_locked", t_is_locked );
448  if( t_is_locked ) t_reply_node.add( "tag", f_lockout_tag );
449  return a_request->reply( dl_success(), "Checked lock status", std::move(t_reply_payload) );
450  }
451 
453  {
454  return a_request->reply( dl_success(), "Hello, " + a_request->sender_exe() );
455  }
456 
457 } /* namespace dripline */
bool authenticate(const uuid_t &a_key) const
Returns true if the server is unlocked or if it&#39;s locked and the key matches the lockout key; returns...
Definition: endpoint.cc:400
reply_ptr_t __do_cmd_request(const request_ptr_t a_request)
Definition: endpoint.cc:336
const return_code & ret_code() const noexcept
Definition: throw_reply.hh:109
virtual reply_ptr_t do_run_request(const request_ptr_t a_request)
Definition: endpoint.hh:211
reply_ptr_t submit_request_message(const request_ptr_t a_request)
Directly submit a request message to this endpoint.
Definition: endpoint.cc:73
const scarab::param_ptr_t & get_payload_ptr() const noexcept
Definition: throw_reply.hh:136
void sort_message(const message_ptr_t a_request)
Definition: endpoint.cc:212
reply_ptr_t handle_is_locked_request(const request_ptr_t a_request)
Definition: endpoint.cc:442
std::shared_ptr< sent_msg_pkg > sent_msg_pkg_ptr
Definition: dripline_fwd.hh:27
STL namespace.
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
virtual void on_alert_message(const alert_ptr_t a_alert)
Definition: endpoint.cc:280
Dripline-specific errors.
std::shared_ptr< msg_alert > alert_ptr_t
Definition: dripline_fwd.hh:25
virtual unsigned rc_value() const =0
uuid_t enable_lockout(const scarab::param_node &a_tag)
enable lockout with randomly-generated key
Definition: endpoint.hh:231
A singleton throw_reply object used to transfer throw_reply information to C++ from other implementat...
Definition: reply_cache.hh:37
bool is_locked() const
Definition: endpoint.hh:236
reply_ptr_t __do_get_request(const request_ptr_t a_request)
Definition: endpoint.cc:301
virtual ~endpoint()
Definition: endpoint.cc:52
static scarab::logger dlog("endpoint")
virtual reply_ptr_t do_get_request(const request_ptr_t a_request)
Definition: endpoint.hh:216
static unsigned s_value
bool disable_lockout(const uuid_t &a_key, bool a_force=false)
Definition: endpoint.cc:391
std::string string_from_uuid(const uuid_t &a_id)
Generates a string representation of the provided UUID.
Definition: uuid.cc:92
reply_ptr_t __do_run_request(const request_ptr_t a_request)
Definition: endpoint.cc:285
boost::uuids::uuid uuid_t
Universally-unique-identifier type containing 16 hexadecimal characters.
Definition: uuid.hh:26
reply_ptr_t handle_unlock_request(const request_ptr_t a_request)
Definition: endpoint.cc:421
static scarab::logger dlog("agent")
Error indicating a problem with the connection to the broker.
reply_ptr_t handle_ping_request(const request_ptr_t a_request)
Definition: endpoint.cc:452
bool check_key(const uuid_t &a_key) const
Definition: endpoint.hh:241
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
reply_ptr_t handle_set_condition_request(const request_ptr_t a_request)
Definition: endpoint.cc:437
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
endpoint(const std::string &a_name)
Definition: endpoint.cc:30
Object that can be thrown while processing a request to send a reply.
Definition: throw_reply.hh:41
virtual reply_ptr_t do_set_request(const request_ptr_t a_request)
Definition: endpoint.hh:221
reply_ptr_t __do_set_request(const request_ptr_t a_request)
Definition: endpoint.cc:320
void submit_reply_message(const reply_ptr_t a_reply)
Directly submit a reply message to this endpoint.
Definition: endpoint.cc:83
endpoint & operator=(const endpoint &a_orig)
Definition: endpoint.cc:55
virtual reply_ptr_t do_cmd_request(const request_ptr_t a_request)
Definition: endpoint.hh:226
uuid_t generate_nil_uuid()
Generates a UUID containing all 0s.
Definition: uuid.cc:26
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
Definition: endpoint.cc:88
const std::string & return_message() const noexcept
Definition: throw_reply.hh:99
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
Basic Dripline object capable of receiving and acting on messages.
Definition: endpoint.hh:95
virtual reply_ptr_t __do_handle_set_condition_request(const request_ptr_t a_request)
Default set-condition: no action taken; override for different behavior.
Definition: endpoint.hh:246
void set_payload(scarab::param_ptr_t a_payload)
Definition: throw_reply.hh:130
void submit_alert_message(const alert_ptr_t a_alert)
Directly submit an alert message to this endpoint.
Definition: endpoint.cc:78
virtual void send_reply(reply_ptr_t a_reply) const
Definition: endpoint.cc:232
virtual void on_reply_message(const reply_ptr_t a_reply)
Definition: endpoint.cc:275
reply_ptr_t handle_lock_request(const request_ptr_t a_request)
Definition: endpoint.cc:407