Dripline-Cpp  v2.4.2
Dripline Implementation in C++
service.cc
Go to the documentation of this file.
1 /*
2  * service.cc
3  *
4  * Created on: Jan 5, 2016
5  * Author: nsoblath
6  */
7 
8 #define DRIPLINE_API_EXPORTS
9 
10 #include "service.hh"
11 
12 #include "dripline_exceptions.hh"
13 
14 #include "authentication.hh"
15 #include "logger.hh"
16 
17 using scarab::authentication;
18 using scarab::param_node;
19 using scarab::param_value;
20 using scarab::param_ptr_t;
21 
22 using std::string;
23 using std::set;
24 
25 namespace dripline
26 {
27  LOGGER( dlog, "service" );
28 
29  service::service( const scarab::param_node& a_config, const string& a_queue_name, const std::string& a_broker_address, unsigned a_port, const std::string& a_auth_file, const bool a_make_connection ) :
30  scarab::cancelable(),
31  core( a_config, a_broker_address, a_port, a_auth_file, a_make_connection ),
32  // logic for setting the name:
33  // a_queue_name if provided
34  // otherwise a_config["queue"] if it exists
35  // otherwise "dlcpp_service"
36  endpoint( a_queue_name.empty() ? a_config.get_value( "queue", "dlcpp_service" ) : a_queue_name ),
38  heartbeater(),
39  scheduler<>(),
40  std::enable_shared_from_this< service >(),
41  f_status( status::nothing ),
42  f_enable_scheduling( a_config.get_value("enable-scheduling", false ) ),
43  f_id( generate_random_uuid() ),
44  f_sync_children(),
45  f_async_children(),
46  f_broadcast_key( "broadcast" )
47  {
48  // get values from the config
49  f_listen_timeout_ms = a_config.get_value( "loop-timeout-ms", f_listen_timeout_ms );
50  heartbeater::f_check_timeout_ms = f_listen_timeout_ms;
51  f_single_message_wait_ms = a_config.get_value( "message-wait-ms", f_single_message_wait_ms );
52  f_heartbeat_interval_s = a_config.get_value( "heartbeat-interval-s", f_heartbeat_interval_s );
53 
54  // override if specified as a separate argument
55  if( ! a_queue_name.empty() ) f_name = a_queue_name;
56  }
57 
58  service::service( const bool a_make_connection, const scarab::param_node& a_config ) :
59  scarab::cancelable(),
60  core( a_make_connection, a_config ),
61  endpoint( "" ),
63  heartbeater(),
64  scheduler<>(),
65  std::enable_shared_from_this< service >(),
66  f_status( status::nothing ),
67  f_enable_scheduling( a_config.get_value("enable-scheduling", false ) ),
68  f_id( generate_random_uuid() ),
69  f_sync_children(),
70  f_async_children(),
71  f_broadcast_key()
72  {
73  }
74 
75  service::service( service&& a_orig ) :
76  scarab::cancelable(),
77  core( std::move(a_orig) ),
78  endpoint( std::move(a_orig) ),
79  listener_receiver( std::move(a_orig) ),
80  heartbeater( std::move(a_orig) ),
81  scheduler<>( std::move(a_orig) ),
82  std::enable_shared_from_this< service >(),
83  f_status( std::move(a_orig.f_status) ),
84  f_enable_scheduling( std::move(a_orig.f_enable_scheduling) ),
85  f_id( std::move(a_orig.f_id) ),
86  f_sync_children( std::move(a_orig.f_sync_children) ),
87  f_async_children( std::move(a_orig.f_async_children) ),
88  f_broadcast_key( std::move(a_orig.f_broadcast_key) )
89  {
90  }
91 
93  {
94  if( f_status >= status::listening )
95  {
96  this->cancel( dl_success().rc_value() );
97  std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
98  }
99  if( f_status > status::exchange_declared ) stop();
100  }
101 
103  {
104  core::operator=( std::move(a_orig) );
105  endpoint::operator=( std::move(a_orig) );
106  listener_receiver::operator=( std::move(a_orig) );
107  heartbeater::operator=( std::move(a_orig) );
108  scheduler<>::operator=( std::move(a_orig) );
109  f_status = std::move(a_orig.f_status) ;
110  f_enable_scheduling = std::move(a_orig.f_enable_scheduling);
111  f_id = std::move(a_orig.f_id);
112  f_sync_children = std::move(a_orig.f_sync_children);
113  f_async_children = std::move(a_orig.f_async_children);
114  f_broadcast_key = std::move(a_orig.f_broadcast_key);
115  return *this;
116  }
117 
118  bool service::add_child( endpoint_ptr_t a_endpoint_ptr )
119  {
120  auto t_inserted = f_sync_children.insert( std::make_pair( a_endpoint_ptr->name(), a_endpoint_ptr ) );
121  if( t_inserted.second )
122  {
123  try
124  {
125  a_endpoint_ptr->service() = shared_from_this();
126  }
127  catch( std::bad_weak_ptr& e )
128  {
129  LWARN( dlog, "add_child called from service constructor (or for some other reason the shared-pointer is bad); Service pointer not set.");
130  }
131  }
132  return t_inserted.second;
133  }
134 
136  {
137  lr_ptr_t t_listener_receiver_ptr = std::dynamic_pointer_cast< listener_receiver >( a_endpoint_ptr );
138  if( ! t_listener_receiver_ptr )
139  {
140  t_listener_receiver_ptr.reset( new endpoint_listener_receiver( a_endpoint_ptr ) );
141  }
142  auto t_inserted = f_async_children.insert( std::make_pair( a_endpoint_ptr->name(), t_listener_receiver_ptr ) );
143  if( t_inserted.second )
144  {
145  try
146  {
147  a_endpoint_ptr->service() = shared_from_this();
148  }
149  catch( std::bad_weak_ptr& e )
150  {
151  LWARN( dlog, "add_async_child called from service constructor (or for some other reason the shared-pointer is bad); Service pointer not set.");
152  }
153  }
154  return t_inserted.second;
155  }
156 
158  {
159  if( ! f_make_connection )
160  {
161  LWARN( dlog, "Should not start service when make_connection is disabled" );
162  return true;
163  }
164  if( f_name.empty() )
165  {
166  LERROR( dlog, "Service requires a queue name to be started" );
167  return false;
168  }
169 
170  // fill in the link to this in endpoint because we couldn't do it in the constructor
171  endpoint::f_service = this->shared_from_this();
172  heartbeater::f_service = this->shared_from_this();
173 
174  LINFO( dlog, "Connecting to <" << f_address << ":" << f_port << ">" );
175 
176  if( ! open_channels() ) return false;
177  f_status = status::channel_created;
178 
179  if( ! setup_exchange( f_channel, f_requests_exchange ) ) return false;
180  if( ! setup_exchange( f_channel, f_alerts_exchange ) ) return false;
181  f_status = status::exchange_declared;
182 
183  if( ! setup_queues() ) return false;
184  f_status = status::queue_declared;
185 
186  if( ! bind_keys() ) return false;
187  f_status = status::queue_bound;
188 
189  if( ! start_consuming() ) return false;
190  f_status = status::consuming;
191 
192  return true;
193  }
194 
196  {
197  if ( ! f_make_connection )
198  {
199  LWARN( dlog, "Should not listen for messages when make_connection is disabled" );
200  return true;
201  }
202 
203  f_status = status::listening;
204 
205  try
206  {
207  if( f_heartbeat_interval_s != 0 )
208  {
209  f_heartbeat_thread = std::thread( &heartbeater::execute, this, f_name, f_id, f_heartbeat_routing_key );
210  }
211  else
212  {
213  LINFO( dlog, "Heartbeat disabled" );
214  }
215 
216  if( f_enable_scheduling )
217  {
218  f_scheduler_thread = std::thread( &scheduler::execute, this );
219  }
220  else
221  {
222  LINFO( dlog, "scheduler disabled" );
223  }
224 
225  f_receiver_thread = std::thread( &concurrent_receiver::execute, this );
226 
227  for( async_map_t::iterator t_child_it = f_async_children.begin();
228  t_child_it != f_async_children.end();
229  ++t_child_it )
230  {
231  t_child_it->second->receiver_thread() = std::thread( &concurrent_receiver::execute, static_cast< listener_receiver* >(t_child_it->second.get()) );
232  t_child_it->second->listener_thread() = std::thread( &listener::listen_on_queue, t_child_it->second.get() );
233  }
234 
235  listen_on_queue();
236 
237  for( async_map_t::iterator t_child_it = f_async_children.begin();
238  t_child_it != f_async_children.end();
239  ++t_child_it )
240  {
241  t_child_it->second->listener_thread().join();
242  t_child_it->second->receiver_thread().join();
243  }
244 
245  f_receiver_thread.join();
246 
247  if( f_heartbeat_thread.joinable() )
248  {
249  f_heartbeat_thread.join();
250  }
251  if( f_scheduler_thread.joinable() )
252  {
253  f_scheduler_thread.join();
254  }
255  }
256  catch( std::system_error& e )
257  {
258  LERROR( dlog, "Could not start the a thread due to a system error: " << e.what() );
259  return false;
260  }
261  catch( dripline_error& e )
262  {
263  LERROR( dlog, "Dripline error while running service: " << e.what() );
264  return false;
265  }
266  catch( std::exception& e )
267  {
268  LERROR( dlog, "Error while running service: " << e.what() );
269  return false;
270  }
271 
272  return true;
273  }
274 
276  {
277  LINFO( dlog, "Stopping service on <" << f_name << ">" );
278 
279  if( f_status >= status::listening ) // listening or processing
280  {
281  this->cancel( dl_success().rc_value() );
282  f_status = status::consuming;
283  }
284  if( f_status >= status::queue_bound ) // queue_bound or consuming
285  {
286  if( ! stop_consuming() ) return false;
287  f_status = status::queue_bound;
288  }
289 
290 
291  if( f_status >= status::queue_declared ) // queue_declared or queue_bound
292  {
293  if( ! remove_queue() ) return false;
294  f_status = status::exchange_declared;
295  }
296 
297  return true;
298  }
299 
301  {
302  LDEBUG( dlog, "Opening channel for service <" << f_name << ">" );
303  f_channel = open_channel();
304  if( ! f_channel ) return false;
305 
306  for( async_map_t::iterator t_child_it = f_async_children.begin();
307  t_child_it != f_async_children.end();
308  ++t_child_it )
309  {
310  LDEBUG( dlog, "Opening channel for child <" << t_child_it->first << ">" );
311  t_child_it->second->channel() = open_channel();
312  t_child_it->second->set_listen_timeout_ms( f_listen_timeout_ms );
313  }
314  return true;
315  }
316 
318  {
319  LDEBUG( dlog, "Setting up queue for service <" << f_name << ">" );
320  if( ! setup_queue( f_channel, f_name ) ) return false;
321 
322  for( async_map_t::iterator t_child_it = f_async_children.begin();
323  t_child_it != f_async_children.end();
324  ++t_child_it )
325  {
326  LDEBUG( dlog, "Setting up queue for async child <" << t_child_it->first << ">" );
327  if( ! setup_queue( t_child_it->second->channel(), t_child_it->first ) ) return false;
328  }
329 
330  return true;
331  }
332 
334  {
335  LDEBUG( dlog, "Binding primary service keys" );
336  if( ! bind_key( f_channel, f_requests_exchange, f_name, f_name + ".#" ) ) return false;
337  if( ! bind_key( f_channel, f_requests_exchange, f_name, f_broadcast_key + ".#" ) ) return false;
338 
339  LDEBUG( dlog, "Binding keys for synchronous children" );
340  for( sync_map_t::const_iterator t_child_it = f_sync_children.begin();
341  t_child_it != f_sync_children.end();
342  ++t_child_it )
343  {
344  if( ! bind_key( f_channel, f_requests_exchange, f_name, t_child_it->first + ".#" ) ) return false;
345  }
346 
347  LDEBUG( dlog, "Binding keys for asynchronous children" );
348  for( async_map_t::iterator t_child_it = f_async_children.begin();
349  t_child_it != f_async_children.end();
350  ++t_child_it )
351  {
352  if( ! bind_key( t_child_it->second->channel(), f_requests_exchange, t_child_it->first, t_child_it->first + ".#" ) ) return false;
353  }
354 
355  return true;
356  }
357 
359  {
360  f_consumer_tag = core::start_consuming( f_channel, f_name );
361  if( f_consumer_tag.empty() ) return false;
362 
363  for( async_map_t::iterator t_child_it = f_async_children.begin();
364  t_child_it != f_async_children.end();
365  ++t_child_it )
366  {
367  t_child_it->second->consumer_tag() = core::start_consuming( t_child_it->second->channel(), t_child_it->first );
368  if( t_child_it->second->consumer_tag().empty() ) return false;
369  }
370  return true;
371  }
372 
374  {
375  // doesn't stop on failure; continues trying to stop consuming
376  bool t_success = true;
377  t_success = core::stop_consuming( f_channel, f_consumer_tag );
378  for( async_map_t::iterator t_child_it = f_async_children.begin();
379  t_child_it != f_async_children.end();
380  ++t_child_it )
381  {
382  t_success = core::stop_consuming( t_child_it->second->channel(), t_child_it->second->consumer_tag() );
383  }
384  return t_success;
385  }
386 
388  {
389  // doesn't stop on failure; continues trying to remove queues
390  bool t_success = true;
391  t_success = core::remove_queue( f_channel, f_name );
392  for( async_map_t::iterator t_child_it = f_async_children.begin();
393  t_child_it != f_async_children.end();
394  ++t_child_it )
395  {
396  t_success = core::remove_queue( t_child_it->second->channel(), t_child_it->first );
397  }
398  return t_success;
399  }
400 
402  {
403  LINFO( dlog, "Listening for incoming messages on <" << f_name << ">" );
404 
405  while( ! is_canceled() )
406  {
407  amqp_envelope_ptr t_envelope;
408  bool t_channel_valid = core::listen_for_message( t_envelope, f_channel, f_consumer_tag, f_listen_timeout_ms );
409 
410  if( f_canceled.load() )
411  {
412  LDEBUG( dlog, "Service canceled" );
413  return true;
414  }
415 
416  if( ! t_envelope && t_channel_valid )
417  {
418  // we end up here every time the listen times out with no message received
419  continue;
420  }
421 
422  f_status = status::processing;
423 
424  handle_message_chunk( t_envelope );
425 
426  if( ! t_channel_valid )
427  {
428  LERROR( dlog, "Channel is no longer valid for endpoint <" << f_name << ">" );
429  return false;
430  }
431 
432  if( f_canceled.load() )
433  {
434  LDEBUG( dlog, "Service <" << f_name << "> canceled" );
435  return true;
436  }
437 
438  f_status = status::listening;
439  }
440  return true;
441  }
442 
444  {
445  try
446  {
447  sort_message( a_message );
448  return;
449  }
450  catch( dripline_error& e )
451  {
452  LERROR( dlog, "<" << f_name << "> Dripline exception caught while handling message: " << e.what() );
453  throw;
454  }
455  catch( amqp_exception& e )
456  {
457  LERROR( dlog, "<" << f_name << "> AMQP exception caught while handling message: (" << e.reply_code() << ") " << e.reply_text() );
458  throw;
459  }
460  catch( amqp_lib_exception& e )
461  {
462  LERROR( dlog, "<" << f_name << "> AMQP Library Exception caught while handling message: (" << e.ErrorCode() << ") " << e.what() );
463  throw;
464  }
465  catch( std::exception& e )
466  {
467  LERROR( dlog, "<" << f_name << "> Standard exception caught while handling message: " << e.what() );
468  throw;
469  }
470 
471  return;
472  }
473 
474  void service::send_reply( reply_ptr_t a_reply ) const
475  {
476  LDEBUG( dlog, "Sending reply message to <" << a_reply->routing_key() << ">:\n" <<
477  " Return code: " << a_reply->get_return_code() << '\n' <<
478  " Return message: " << a_reply->return_message() << '\n' <<
479  " Payload:\n" << a_reply->payload() );
480 
481  if( ! send( a_reply ) )
482  {
483  LWARN( dlog, "Something went wrong while sending the reply" );
484  }
485  return;
486  }
487 
489  {
490  std::string t_first_token( a_request->routing_key() );
491  t_first_token = t_first_token.substr( 0, t_first_token.find_first_of('.') );
492  LDEBUG( dlog, "First token in routing key: <" << t_first_token << ">" );
493 
494  if( t_first_token == f_name || t_first_token == f_broadcast_key )
495  {
496  // reply will be sent by endpoint::on_request_message
497  return this->endpoint::on_request_message( a_request );
498  }
499  else
500  {
501  auto t_endpoint_itr = f_sync_children.find( t_first_token );
502  if( t_endpoint_itr == f_sync_children.end() )
503  {
504  LERROR( dlog, "Did not find child endpoint called <" << t_first_token << ">" );
505  throw dripline_error() << "Did not find child endpoint <" << t_first_token << ">";
506  }
507 
508  // reply will be sent by endpoint::on_request_message or derived
509  return t_endpoint_itr->second->on_request_message( a_request );
510  }
511  }
512 
513  void service::do_cancellation( int a_code )
514  {
515  LDEBUG( dlog, "Canceling service <" << f_name << ">" );
516  for( async_map_t::iterator t_child_it = f_async_children.begin();
517  t_child_it != f_async_children.end();
518  ++t_child_it )
519  {
520  LDEBUG( dlog, "Canceling child endpoint <" << t_child_it->first << ">" );
521  t_child_it->second->cancel( a_code );
522  }
523  return;
524  }
525 
526 } /* namespace dripline */
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
Definition: core.cc:432
service & operator=(const service &)=delete
void sort_message(const message_ptr_t a_request)
Definition: endpoint.cc:212
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
Definition: heartbeater.cc:53
listener_receiver & operator=(const listener_receiver &)=delete
virtual bool listen_on_queue()=0
void execute()
Main execution loop for the scheduler.
Definition: scheduler.hh:321
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
Definition: core.cc:330
STL namespace.
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
Definition: core.cc:498
std::shared_ptr< msg_request > request_ptr_t
Definition: dripline_fwd.hh:23
std::thread f_heartbeat_thread
Definition: heartbeater.hh:83
Dripline-specific errors.
virtual void submit_message(message_ptr_t a_message)
Submit a message for direct processing.
Definition: service.cc:443
void handle_message_chunk(amqp_envelope_ptr a_envelope)
Definition: receiver.cc:73
bool add_child(endpoint_ptr_t a_endpoint_ptr)
Add a synchronous child endpoint.
Definition: service.cc:118
virtual bool setup_queues()
Definition: service.cc:317
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
Definition: core.cc:381
virtual sent_msg_pkg_ptr send(request_ptr_t a_request) const
Sends a request message and returns a channel on which to listen for a reply.
Definition: service.hh:177
Consumer of Dripline messages on a particular queue.
Definition: service.hh:72
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:407
Definition: core.hh:17
virtual void do_cancellation(int a_code)
Definition: service.cc:513
virtual bool remove_queue()
Definition: service.cc:387
virtual bool bind_keys()
Definition: service.cc:333
Executes scheduled events.
Definition: scheduler.hh:93
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:468
std::thread f_scheduler_thread
Definition: scheduler.hh:166
std::shared_ptr< listener_receiver > lr_ptr_t
Definition: dripline_fwd.hh:33
service(const scarab::param_node &a_config=scarab::param_node(), const std::string &a_queue_name="", const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="", const bool a_make_connection=true)
Definition: service.cc:29
Convenience class to bring together listener and concurrent_receiver.
Definition: listener.hh:75
Decorator class for a plain endpoint: adds listener_receiver capabilities.
Definition: listener.hh:104
heartbeater & operator=(const heartbeater &)=delete
virtual void send_reply(reply_ptr_t a_reply) const
Sends a reply message.
Definition: service.cc:474
std::shared_ptr< endpoint > endpoint_ptr_t
Definition: dripline_fwd.hh:39
amqp_channel_ptr open_channel() const
Definition: core.cc:296
static scarab::logger dlog("agent")
virtual bool open_channels()
Definition: service.cc:300
scheduler & operator=(const scheduler &)=delete
AmqpClient::AmqpException amqp_exception
Definition: amqp.hh:28
virtual bool start_consuming()
Definition: service.cc:358
std::shared_ptr< msg_reply > reply_ptr_t
Definition: dripline_fwd.hh:24
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
Definition: uuid.cc:19
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
Definition: core.cc:355
virtual ~service()
Definition: service.cc:92
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
Definition: amqp.hh:25
void execute()
Handles messages that appear in the concurrent queue by calling submit_message(). ...
Definition: receiver.cc:393
endpoint & operator=(const endpoint &a_orig)
Definition: endpoint.cc:55
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
Definition: core.hh:72
virtual bool stop_consuming()
Definition: service.cc:373
AmqpClient::AmqpLibraryException amqp_lib_exception
Definition: amqp.hh:29
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
Definition: endpoint.cc:88
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
Definition: service.cc:488
std::shared_ptr< message > message_ptr_t
Definition: dripline_fwd.hh:20
bool add_async_child(endpoint_ptr_t a_endpoint_ptr)
Add an asynchronous child endpoint.
Definition: service.cc:135
Basic Dripline object capable of receiving and acting on messages.
Definition: endpoint.hh:95
A heartbeater repeatedly sends an alert on a particular time interval.
Definition: heartbeater.hh:52
core & operator=(const core &a_orig)
Definition: core.cc:150
virtual bool listen_on_queue()
Waits for AMQP messages arriving on the channel.
Definition: service.cc:401