8 #define DRIPLINE_API_EXPORTS 14 #include "authentication.hh" 17 using scarab::authentication;
18 using scarab::param_node;
19 using scarab::param_value;
20 using scarab::param_ptr_t;
27 LOGGER(
dlog,
"service" );
29 service::service(
const scarab::param_node& a_config,
const string& a_queue_name,
const std::string& a_broker_address,
unsigned a_port,
const std::string& a_auth_file,
const bool a_make_connection ) :
31 core( a_config, a_broker_address, a_port, a_auth_file, a_make_connection ),
36 endpoint( a_queue_name.empty() ? a_config.get_value(
"queue",
"dlcpp_service" ) : a_queue_name ),
41 f_status(
status::nothing ),
42 f_enable_scheduling( a_config.get_value(
"enable-scheduling", false ) ),
46 f_broadcast_key(
"broadcast" )
49 f_listen_timeout_ms = a_config.get_value(
"loop-timeout-ms", f_listen_timeout_ms );
50 heartbeater::f_check_timeout_ms = f_listen_timeout_ms;
51 f_single_message_wait_ms = a_config.get_value(
"message-wait-ms", f_single_message_wait_ms );
52 f_heartbeat_interval_s = a_config.get_value(
"heartbeat-interval-s", f_heartbeat_interval_s );
55 if( ! a_queue_name.empty() ) f_name = a_queue_name;
60 core( a_make_connection, a_config ),
66 f_status(
status::nothing ),
67 f_enable_scheduling( a_config.get_value(
"enable-scheduling", false ) ),
83 f_status(
std::move(a_orig.f_status) ),
84 f_enable_scheduling(
std::move(a_orig.f_enable_scheduling) ),
85 f_id(
std::move(a_orig.f_id) ),
86 f_sync_children(
std::move(a_orig.f_sync_children) ),
87 f_async_children(
std::move(a_orig.f_async_children) ),
88 f_broadcast_key(
std::move(a_orig.f_broadcast_key) )
97 std::this_thread::sleep_for( std::chrono::milliseconds(1100) );
109 f_status = std::move(a_orig.f_status) ;
110 f_enable_scheduling = std::move(a_orig.f_enable_scheduling);
111 f_id = std::move(a_orig.f_id);
112 f_sync_children = std::move(a_orig.f_sync_children);
113 f_async_children = std::move(a_orig.f_async_children);
114 f_broadcast_key = std::move(a_orig.f_broadcast_key);
120 auto t_inserted = f_sync_children.insert( std::make_pair( a_endpoint_ptr->name(), a_endpoint_ptr ) );
121 if( t_inserted.second )
125 a_endpoint_ptr->service() = shared_from_this();
127 catch( std::bad_weak_ptr& e )
129 LWARN(
dlog,
"add_child called from service constructor (or for some other reason the shared-pointer is bad); Service pointer not set.");
132 return t_inserted.second;
138 if( ! t_listener_receiver_ptr )
142 auto t_inserted = f_async_children.insert( std::make_pair( a_endpoint_ptr->name(), t_listener_receiver_ptr ) );
143 if( t_inserted.second )
147 a_endpoint_ptr->service() = shared_from_this();
149 catch( std::bad_weak_ptr& e )
151 LWARN(
dlog,
"add_async_child called from service constructor (or for some other reason the shared-pointer is bad); Service pointer not set.");
154 return t_inserted.second;
159 if( ! f_make_connection )
161 LWARN(
dlog,
"Should not start service when make_connection is disabled" );
166 LERROR(
dlog,
"Service requires a queue name to be started" );
171 endpoint::f_service = this->shared_from_this();
172 heartbeater::f_service = this->shared_from_this();
174 LINFO(
dlog,
"Connecting to <" << f_address <<
":" << f_port <<
">" );
179 if( !
setup_exchange( f_channel, f_requests_exchange ) )
return false;
180 if( !
setup_exchange( f_channel, f_alerts_exchange ) )
return false;
197 if ( ! f_make_connection )
199 LWARN(
dlog,
"Should not listen for messages when make_connection is disabled" );
207 if( f_heartbeat_interval_s != 0 )
213 LINFO(
dlog,
"Heartbeat disabled" );
216 if( f_enable_scheduling )
222 LINFO(
dlog,
"scheduler disabled" );
227 for( async_map_t::iterator t_child_it = f_async_children.begin();
228 t_child_it != f_async_children.end();
231 t_child_it->second->receiver_thread() = std::thread( &
concurrent_receiver::execute, static_cast< listener_receiver* >(t_child_it->second.get()) );
237 for( async_map_t::iterator t_child_it = f_async_children.begin();
238 t_child_it != f_async_children.end();
241 t_child_it->second->listener_thread().join();
242 t_child_it->second->receiver_thread().join();
245 f_receiver_thread.join();
256 catch( std::system_error& e )
258 LERROR(
dlog,
"Could not start the a thread due to a system error: " << e.what() );
263 LERROR(
dlog,
"Dripline error while running service: " << e.what() );
266 catch( std::exception& e )
268 LERROR(
dlog,
"Error while running service: " << e.what() );
277 LINFO(
dlog,
"Stopping service on <" << f_name <<
">" );
302 LDEBUG(
dlog,
"Opening channel for service <" << f_name <<
">" );
304 if( ! f_channel )
return false;
306 for( async_map_t::iterator t_child_it = f_async_children.begin();
307 t_child_it != f_async_children.end();
310 LDEBUG(
dlog,
"Opening channel for child <" << t_child_it->first <<
">" );
312 t_child_it->second->set_listen_timeout_ms( f_listen_timeout_ms );
319 LDEBUG(
dlog,
"Setting up queue for service <" << f_name <<
">" );
320 if( !
setup_queue( f_channel, f_name ) )
return false;
322 for( async_map_t::iterator t_child_it = f_async_children.begin();
323 t_child_it != f_async_children.end();
326 LDEBUG(
dlog,
"Setting up queue for async child <" << t_child_it->first <<
">" );
327 if( !
setup_queue( t_child_it->second->channel(), t_child_it->first ) )
return false;
335 LDEBUG(
dlog,
"Binding primary service keys" );
336 if( !
bind_key( f_channel, f_requests_exchange, f_name, f_name +
".#" ) )
return false;
337 if( !
bind_key( f_channel, f_requests_exchange, f_name, f_broadcast_key +
".#" ) )
return false;
339 LDEBUG(
dlog,
"Binding keys for synchronous children" );
340 for( sync_map_t::const_iterator t_child_it = f_sync_children.begin();
341 t_child_it != f_sync_children.end();
344 if( !
bind_key( f_channel, f_requests_exchange, f_name, t_child_it->first +
".#" ) )
return false;
347 LDEBUG(
dlog,
"Binding keys for asynchronous children" );
348 for( async_map_t::iterator t_child_it = f_async_children.begin();
349 t_child_it != f_async_children.end();
352 if( !
bind_key( t_child_it->second->channel(), f_requests_exchange, t_child_it->first, t_child_it->first +
".#" ) )
return false;
361 if( f_consumer_tag.empty() )
return false;
363 for( async_map_t::iterator t_child_it = f_async_children.begin();
364 t_child_it != f_async_children.end();
367 t_child_it->second->consumer_tag() =
core::start_consuming( t_child_it->second->channel(), t_child_it->first );
368 if( t_child_it->second->consumer_tag().empty() )
return false;
376 bool t_success =
true;
378 for( async_map_t::iterator t_child_it = f_async_children.begin();
379 t_child_it != f_async_children.end();
382 t_success =
core::stop_consuming( t_child_it->second->channel(), t_child_it->second->consumer_tag() );
390 bool t_success =
true;
392 for( async_map_t::iterator t_child_it = f_async_children.begin();
393 t_child_it != f_async_children.end();
403 LINFO(
dlog,
"Listening for incoming messages on <" << f_name <<
">" );
405 while( ! is_canceled() )
410 if( f_canceled.load() )
412 LDEBUG(
dlog,
"Service canceled" );
416 if( ! t_envelope && t_channel_valid )
426 if( ! t_channel_valid )
428 LERROR(
dlog,
"Channel is no longer valid for endpoint <" << f_name <<
">" );
432 if( f_canceled.load() )
434 LDEBUG(
dlog,
"Service <" << f_name <<
"> canceled" );
452 LERROR(
dlog,
"<" << f_name <<
"> Dripline exception caught while handling message: " << e.what() );
457 LERROR(
dlog,
"<" << f_name <<
"> AMQP exception caught while handling message: (" << e.reply_code() <<
") " << e.reply_text() );
462 LERROR(
dlog,
"<" << f_name <<
"> AMQP Library Exception caught while handling message: (" << e.ErrorCode() <<
") " << e.what() );
465 catch( std::exception& e )
467 LERROR(
dlog,
"<" << f_name <<
"> Standard exception caught while handling message: " << e.what() );
476 LDEBUG(
dlog,
"Sending reply message to <" << a_reply->routing_key() <<
">:\n" <<
477 " Return code: " << a_reply->get_return_code() <<
'\n' <<
478 " Return message: " << a_reply->return_message() <<
'\n' <<
479 " Payload:\n" << a_reply->payload() );
481 if( !
send( a_reply ) )
483 LWARN(
dlog,
"Something went wrong while sending the reply" );
490 std::string t_first_token( a_request->routing_key() );
491 t_first_token = t_first_token.substr( 0, t_first_token.find_first_of(
'.') );
492 LDEBUG(
dlog,
"First token in routing key: <" << t_first_token <<
">" );
494 if( t_first_token == f_name || t_first_token == f_broadcast_key )
501 auto t_endpoint_itr = f_sync_children.find( t_first_token );
502 if( t_endpoint_itr == f_sync_children.end() )
504 LERROR(
dlog,
"Did not find child endpoint called <" << t_first_token <<
">" );
505 throw dripline_error() <<
"Did not find child endpoint <" << t_first_token <<
">";
509 return t_endpoint_itr->second->on_request_message( a_request );
515 LDEBUG(
dlog,
"Canceling service <" << f_name <<
">" );
516 for( async_map_t::iterator t_child_it = f_async_children.begin();
517 t_child_it != f_async_children.end();
520 LDEBUG(
dlog,
"Canceling child endpoint <" << t_child_it->first <<
">" );
521 t_child_it->second->cancel( a_code );
static bool stop_consuming(amqp_channel_ptr a_channel, std::string &a_consumer_tag)
service & operator=(const service &)=delete
void sort_message(const message_ptr_t a_request)
void execute(const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
listener_receiver & operator=(const listener_receiver &)=delete
virtual bool listen_on_queue()=0
void execute()
Main execution loop for the scheduler.
static bool setup_exchange(amqp_channel_ptr a_channel, const std::string &a_exchange)
static bool listen_for_message(amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
return: if false, channel is no longer useable; if true, may be reused
std::shared_ptr< msg_request > request_ptr_t
std::thread f_heartbeat_thread
Dripline-specific errors.
virtual void submit_message(message_ptr_t a_message)
Submit a message for direct processing.
void handle_message_chunk(amqp_envelope_ptr a_envelope)
bool add_child(endpoint_ptr_t a_endpoint_ptr)
Add a synchronous child endpoint.
virtual bool setup_queues()
static bool bind_key(amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
virtual sent_msg_pkg_ptr send(request_ptr_t a_request) const
Sends a request message and returns a channel on which to listen for a reply.
Consumer of Dripline messages on a particular queue.
static std::string start_consuming(amqp_channel_ptr a_channel, const std::string &a_queue_name)
virtual void do_cancellation(int a_code)
virtual bool remove_queue()
Executes scheduled events.
static bool remove_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
std::thread f_scheduler_thread
std::shared_ptr< listener_receiver > lr_ptr_t
service(const scarab::param_node &a_config=scarab::param_node(), const std::string &a_queue_name="", const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="", const bool a_make_connection=true)
Convenience class to bring together listener and concurrent_receiver.
Decorator class for a plain endpoint: adds listener_receiver capabilities.
heartbeater & operator=(const heartbeater &)=delete
virtual void send_reply(reply_ptr_t a_reply) const
Sends a reply message.
std::shared_ptr< endpoint > endpoint_ptr_t
amqp_channel_ptr open_channel() const
static scarab::logger dlog("agent")
virtual bool open_channels()
scheduler & operator=(const scheduler &)=delete
AmqpClient::AmqpException amqp_exception
virtual bool start_consuming()
std::shared_ptr< msg_reply > reply_ptr_t
uuid_t generate_random_uuid()
Generates a UUID containing random numbers (RNG is a Mersenne Twister)
static bool setup_queue(amqp_channel_ptr a_channel, const std::string &a_queue_name)
AmqpClient::Envelope::ptr_t amqp_envelope_ptr
void execute()
Handles messages that appear in the concurrent queue by calling submit_message(). ...
endpoint & operator=(const endpoint &a_orig)
Basic AMQP interactions, including sending messages and interacting with AMQP channels.
virtual bool stop_consuming()
AmqpClient::AmqpLibraryException amqp_lib_exception
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
virtual reply_ptr_t on_request_message(const request_ptr_t a_request)
Default request handler; passes request to initial request functions.
std::shared_ptr< message > message_ptr_t
bool add_async_child(endpoint_ptr_t a_endpoint_ptr)
Add an asynchronous child endpoint.
Basic Dripline object capable of receiving and acting on messages.
A heartbeater repeatedly sends an alert on a particular time interval.
core & operator=(const core &a_orig)
virtual bool listen_on_queue()
Waits for AMQP messages arriving on the channel.