Dripline-Cpp  v2.4.2
Dripline Implementation in C++
Public Types | Public Member Functions | Protected Types | Protected Member Functions | Private Member Functions | List of all members
service Class Reference

Consumer of Dripline messages on a particular queue. More...

#include <service.hh>

Inheritance diagram for service:
Inheritance graph

Public Types

typedef std::map< std::string, endpoint_ptr_t > sync_map_t
 
typedef std::map< std::string, lr_ptr_t > async_map_t
 
- Public Types inherited from scheduler<>
using clock_t = std::chrono::system_clock
 
using time_point_t = typename std::chrono::system_clock ::time_point
 
using duration_t = typename std::chrono::system_clock ::duration
 
using executable_t = std::function< void() >
 
typedef std::multimap< time_point_t, event > events_map_t
 

Public Member Functions

 service (const scarab::param_node &a_config=scarab::param_node(), const std::string &a_queue_name="", const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="", const bool a_make_connection=true)
 
 service (const bool a_make_connection, const scarab::param_node &a_config=scarab::param_node())
 
 service (const service &)=delete
 
 service (service &&a_orig)
 
virtual ~service ()
 
service & operator= (const service &)=delete
 
service & operator= (service &&a_orig)
 
 snake_case_mv_accessible (status, status)
 
 snake_case_mv_accessible (bool, enable_scheduling)
 
bool add_child (endpoint_ptr_t a_endpoint_ptr)
 Add a synchronous child endpoint. More...
 
bool add_async_child (endpoint_ptr_t a_endpoint_ptr)
 Add an asynchronous child endpoint. More...
 
virtual sent_msg_pkg_ptr send (request_ptr_t a_request) const
 Sends a request message and returns a channel on which to listen for a reply. More...
 
virtual sent_msg_pkg_ptr send (reply_ptr_t a_reply) const
 Sends a reply message. More...
 
virtual sent_msg_pkg_ptr send (alert_ptr_t a_alert) const
 Sends an alert message. More...
 
bool start ()
 
bool listen ()
 
bool stop ()
 
virtual bool listen_on_queue ()
 Waits for AMQP messages arriving on the channel. More...
 
virtual void submit_message (message_ptr_t a_message)
 Submit a message for direct processing. More...
 
virtual void send_reply (reply_ptr_t a_reply) const
 Sends a reply message. More...
 
 snake_case_mv_accessible (uuid_t, id)
 
 snake_case_mv_referrable (sync_map_t, sync_children)
 
 snake_case_mv_referrable (async_map_t, async_children)
 
 snake_case_mv_referrable (std::string, broadcast_key)
 
- Public Member Functions inherited from core
 core (const scarab::param_node &a_config=scarab::param_node(), const std::string &a_broker_address="", unsigned a_port=0, const std::string &a_auth_file="", const bool a_make_connection=true)
 
 core (const bool a_make_connection, const scarab::param_node &a_config=scarab::param_node())
 
 core (const core &a_orig)
 
 core (core &&a_orig)
 
virtual ~core ()
 
core & operator= (const core &a_orig)
 
core & operator= (core &&a_orig)
 
 snake_case_mv_referrable (std::string, address)
 
 snake_case_mv_accessible (unsigned, port)
 
 snake_case_mv_referrable (std::string, username)
 
 snake_case_mv_referrable (std::string, password)
 
 snake_case_mv_referrable (std::string, requests_exchange)
 
 snake_case_mv_referrable (std::string, alerts_exchange)
 
 snake_case_mv_referrable (std::string, heartbeat_routing_key)
 
 snake_case_mv_accessible (unsigned, max_payload_size)
 
 snake_case_mv_accessible (bool, make_connection)
 
- Public Member Functions inherited from endpoint
 endpoint (const std::string &a_name)
 
 endpoint (const endpoint &a_orig)
 
 endpoint (endpoint &&a_orig)
 
virtual ~endpoint ()
 
endpoint & operator= (const endpoint &a_orig)
 
endpoint & operator= (endpoint &&a_orig)
 
 snake_case_mv_referrable (std::string, name)
 
 snake_case_mv_referrable (service_ptr_t, service)
 
reply_ptr_t submit_request_message (const request_ptr_t a_request)
 Directly submit a request message to this endpoint. More...
 
void submit_reply_message (const reply_ptr_t a_reply)
 Directly submit a reply message to this endpoint. More...
 
void submit_alert_message (const alert_ptr_t a_alert)
 Directly submit an alert message to this endpoint. More...
 
virtual void on_reply_message (const reply_ptr_t a_reply)
 
virtual void on_alert_message (const alert_ptr_t a_alert)
 
virtual reply_ptr_t do_run_request (const request_ptr_t a_request)
 
virtual reply_ptr_t do_get_request (const request_ptr_t a_request)
 
virtual reply_ptr_t do_set_request (const request_ptr_t a_request)
 
virtual reply_ptr_t do_cmd_request (const request_ptr_t a_request)
 
void sort_message (const message_ptr_t a_request)
 
uuid_t enable_lockout (const scarab::param_node &a_tag)
 enable lockout with randomly-generated key More...
 
uuid_t enable_lockout (const scarab::param_node &a_tag, uuid_t a_key)
 enable lockout with user-supplied key More...
 
bool disable_lockout (const uuid_t &a_key, bool a_force=false)
 
bool is_locked () const
 
bool check_key (const uuid_t &a_key) const
 
- Public Member Functions inherited from listener_receiver
 listener_receiver ()
 
 listener_receiver (const listener_receiver &)=delete
 
 listener_receiver (listener_receiver &&a_orig)
 
listener_receiver & operator= (const listener_receiver &)=delete
 
listener_receiver & operator= (listener_receiver &&a_orig)
 
- Public Member Functions inherited from listener
 listener ()
 
 listener (const listener &)=delete
 
 listener (listener &&a_orig)
 
virtual ~listener ()
 
listener & operator= (const listener &)=delete
 
listener & operator= (listener &&a_orig)
 
 snake_case_mv_referrable (amqp_channel_ptr, channel)
 
 snake_case_mv_referrable (std::string, consumer_tag)
 
 snake_case_mv_accessible (unsigned, listen_timeout_ms)
 
 snake_case_mv_referrable (std::thread, listener_thread)
 
- Public Member Functions inherited from concurrent_receiver
 concurrent_receiver ()
 
 concurrent_receiver (const concurrent_receiver &)=delete
 
 concurrent_receiver (concurrent_receiver &&a_orig)
 
virtual ~concurrent_receiver ()
 
concurrent_receiver & operator= (const concurrent_receiver &)=delete
 
concurrent_receiver & operator= (concurrent_receiver &&a_orig)
 
virtual void process_message (message_ptr_t a_message)
 Deposits the message in the concurrent queue (called by the listener) More...
 
void execute ()
 Handles messages that appear in the concurrent queue by calling submit_message(). More...
 
- Public Member Functions inherited from receiver
 receiver ()
 
 receiver (const receiver &a_orig)=delete
 
 receiver (receiver &&a_orig)
 
virtual ~receiver ()
 
receiver & operator= (const receiver &a_orig)=delete
 
receiver & operator= (receiver &&a_orig)
 
void handle_message_chunk (amqp_envelope_ptr a_envelope)
 
void wait_for_message (incoming_message_pack &a_pack, const std::string &a_message_id)
 
void process_message_pack (incoming_message_pack &a_pack, const std::string &a_message_id)
 Converts a message pack into a Dripline message, and then submits the message for processing. More...
 
 snake_case_mv_referrable (incoming_message_map, incoming_messages)
 Stores the incomplete messages. More...
 
 snake_case_mv_accessible (unsigned, single_message_wait_ms)
 Wait time for message chunks from a single message. More...
 
reply_ptr_t wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
 
reply_ptr_t wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, bool &a_chan_valid, int a_timeout_ms=0)
 
- Public Member Functions inherited from heartbeater
 heartbeater (service_ptr_t a_service=service_ptr_t())
 Primary constructor. A service pointer is required to be able to send messages. More...
 
 heartbeater (const heartbeater &)=delete
 
 heartbeater (heartbeater &&a_orig)
 
virtual ~heartbeater ()
 
heartbeater & operator= (const heartbeater &)=delete
 
heartbeater & operator= (heartbeater &&a_orig)
 
void execute (const std::string &a_name, uuid_t a_id, const std::string &a_routing_key)
 
 snake_case_mv_accessible (unsigned, heartbeat_interval_s)
 Interval between heartbeat alerts (default: 60 s) More...
 
 snake_case_mv_accessible (unsigned, check_timeout_ms)
 Timing interval for the internal loop (default: 1000 ms) More...
 
 snake_case_mv_referrable (service_ptr_t, service)
 
 snake_case_mv_atomic (bool, stop)
 
- Public Member Functions inherited from scheduler<>
 scheduler ()
 
 scheduler (const scheduler &)=delete
 
 scheduler (scheduler &&)
 
virtual ~scheduler ()
 
scheduler & operator= (const scheduler &)=delete
 
scheduler & operator= (scheduler &&)
 
int schedule (executable_t an_executable, time_point_t an_exe_time)
 
int schedule (executable_t an_executable, duration_t an_interval, time_point_t an_exe_time=std::chrono::system_clock ::now())
 
void unschedule (int an_id)
 Unschedule an event using the event's ID. More...
 
void execute ()
 Main execution loop for the scheduler. More...
 
 snake_case_mv_accessible (duration_t, exe_buffer)
 The time difference from "now" that determines whether an event is executed. More...
 
 snake_case_mv_accessible (duration_t, cycle_time)
 Main thread cycle time. More...
 
 snake_case_mv_referrable_const (simple_executor, the_executor)
 The executor used to execute events. More...
 
 snake_case_mv_referrable_const (events_map_t, events)
 The scheduled events, stored in a map sorted by execution time. More...
 

Protected Types

enum  status {
  nothing = 0, channel_created = 10, exchange_declared = 20, queue_declared = 30,
  queue_bound = 40, consuming = 50, listening = 60, processing = 70
}
 

Protected Member Functions

virtual bool open_channels ()
 
virtual bool setup_queues ()
 
virtual bool bind_keys ()
 
virtual bool start_consuming ()
 
virtual bool stop_consuming ()
 
virtual bool remove_queue ()
 
virtual reply_ptr_t on_request_message (const request_ptr_t a_request)
 Default request handler; passes request to initial request functions. More...
 
- Protected Member Functions inherited from core
sent_msg_pkg_ptr do_send (message_ptr_t a_message, const std::string &a_exchange, bool a_expect_reply) const
 
amqp_channel_ptr send_withreply (message_ptr_t a_message, std::string &a_reply_consumer_tag, const std::string &a_exchange) const
 
bool send_noreply (message_ptr_t a_message, const std::string &a_exchange) const
 
amqp_channel_ptr open_channel () const
 
- Protected Member Functions inherited from endpoint
bool authenticate (const uuid_t &a_key) const
 Returns true if the server is unlocked or if it's locked and the key matches the lockout key; returns false otherwise. More...
 
 snake_case_mv_referrable (scarab::param_node, lockout_tag)
 
 snake_case_mv_accessible (uuid_t, lockout_key)
 
- Protected Member Functions inherited from concurrent_receiver
 snake_case_mv_referrable (scarab::concurrent_queue< message_ptr_t >, message_queue)
 
 snake_case_mv_referrable (std::thread, receiver_thread)
 
- Protected Member Functions inherited from receiver
reply_ptr_t process_received_reply (incoming_message_pack &a_pack, const std::string &a_message_id)
 

Private Member Functions

virtual void do_cancellation (int a_code)
 

Additional Inherited Members

- Static Public Member Functions inherited from core
static bool listen_for_message (amqp_envelope_ptr &a_envelope, amqp_channel_ptr a_channel, const std::string &a_consumer_tag, int a_timeout_ms=0, bool a_do_ack=true)
 return: if false, channel is no longer useable; if true, may be reused More...
 
- Public Attributes inherited from scheduler<>
snake_case_mv_accessible_static(int, curr_id) protected std::recursive_mutex f_scheduler_mutex
 The ID to be used for the next scheduled event. More...
 
std::mutex f_executor_mutex
 
std::condition_variable_any f_cv
 
std::thread f_scheduler_thread
 
- Static Public Attributes inherited from core
static bool s_offline = false
 
- Static Protected Member Functions inherited from core
static bool setup_exchange (amqp_channel_ptr a_channel, const std::string &a_exchange)
 
static bool setup_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name)
 
static bool bind_key (amqp_channel_ptr a_channel, const std::string &a_exchange, const std::string &a_queue_name, const std::string &a_routing_key)
 
static std::string start_consuming (amqp_channel_ptr a_channel, const std::string &a_queue_name)
 
static bool stop_consuming (amqp_channel_ptr a_channel, std::string &a_consumer_tag)
 
static bool remove_queue (amqp_channel_ptr a_channel, const std::string &a_queue_name)
 
- Protected Attributes inherited from heartbeater
std::thread f_heartbeat_thread
 

Detailed Description

Consumer of Dripline messages on a particular queue.

Author
N.S. Oblath

The service class is the implementation of the "service" concept in Dripline. It's the primary component that makes up a Dripline mesh.

The lifetime of a service is defined by the three main functions:

  1. start() – create the AMQP channel, create the AMQP queue, bind the routing keys, and start consuming AMQP messages
  2. listen() – starts the heartbeat and scheduler threads (optional), starts the receiver thread, and waits for and handles messages on the queue
  3. stop() – (called asynchronously) cancels the listening service

The ability to handle and respond to Dripline messages is embodied in the endpoint class. Service uses endpoint in three ways:

  1. Service is an endpoint. A service can be set up to handle messages directed to it.
  2. Service has basic child endpoints. These are also called "synchronous" endpoints. These endpoints use the same AMQP queue as the service itself. Messages sent to the service and to the synchronous endpoints are all handled serially.
  3. Service has asynchronous child endpoints. These endpoints each have their own AMQP queue and thread responsible for receiving and handling their messages.

A service has a number of key characteristics (most of which come from its parent classes):

  * core – Has all of the basic AMQP capabilities, sending messages, and making and manipulating connections
  * endpoint – Handles Dripline messages
  * listener_receiver – Asynchronously receives AMQP messages and turns them into Dripline messages
  * heartbeater – Sends periodic heartbeat messages
  * scheduler – Can schedule events

As is apparent from the above descriptions, a service is responsible for a number of threads when it executes:

  * Listening – grabs AMQP messages off the channel when they arrive
  * Message-wait – any incomplete multi-part Dripline message will set up a thread to wait until the message is complete, and then submits it for handling
  * Receiver – grabs completed Dripline messages and handles them
  * Async endpoint listening – same as above for each asynchronous endpoint
  * Async endpoint message-wait – same as above for each asynchronous endpoint
  * Async endpoint receiver – same as above for each asynchronous endpoint
  * Heartbeater – sends regular heartbeat messages
  * Scheduler – executes scheduled events

Definition at line 72 of file service.hh.

Member Typedef Documentation

◆ async_map_t

typedef std::map< std::string, lr_ptr_t > async_map_t

Definition at line 165 of file service.hh.

◆ sync_map_t

typedef std::map< std::string, endpoint_ptr_t > sync_map_t

Definition at line 162 of file service.hh.

Member Enumeration Documentation

◆ status

enum status
strong protected
Enumerator
nothing 
channel_created 
exchange_declared 
queue_declared 
queue_bound 
consuming 
listening 
processing 

Definition at line 81 of file service.hh.

Constructor & Destructor Documentation

◆ service() [1/4]

service ( const scarab::param_node &  a_config = scarab::param_node(),
const std::string &  a_queue_name = "",
const std::string &  a_broker_address = "",
unsigned  a_port = 0,
const std::string &  a_auth_file = "",
const bool  a_make_connection = true 
)

Definition at line 29 of file service.cc.

◆ service() [2/4]

service ( const bool  a_make_connection,
const scarab::param_node &  a_config = scarab::param_node() 
)

Definition at line 58 of file service.cc.

◆ service() [3/4]

service ( const service & )
delete

◆ service() [4/4]

service ( service &&  a_orig)

Definition at line 75 of file service.cc.

◆ ~service()

~service ( )
virtual

Definition at line 92 of file service.cc.

Member Function Documentation

◆ add_async_child()

bool add_async_child ( endpoint_ptr_t  a_endpoint_ptr)

Add an asynchronous child endpoint.

Definition at line 135 of file service.cc.

◆ add_child()

bool add_child ( endpoint_ptr_t  a_endpoint_ptr)

Add a synchronous child endpoint.

Definition at line 118 of file service.cc.

◆ bind_keys()

bool bind_keys ( )
protected virtual

Definition at line 333 of file service.cc.

◆ do_cancellation()

void do_cancellation ( int  a_code)
private virtual

Definition at line 513 of file service.cc.

◆ listen()

bool listen ( )

Starts listening on the queue for receiving messages. If no queue was created, this does nothing.

Definition at line 195 of file service.cc.

◆ listen_on_queue()

bool listen_on_queue ( )
virtual

Waits for AMQP messages arriving on the channel.

Implements listener.

Definition at line 401 of file service.cc.

◆ on_request_message()

reply_ptr_t on_request_message ( const request_ptr_t  a_request)
protected virtual

Default request handler; passes request to initial request functions.

Reimplemented from endpoint.

Definition at line 488 of file service.cc.

◆ open_channels()

bool open_channels ( )
protected virtual

Definition at line 300 of file service.cc.

◆ operator=() [1/2]

service& operator= ( const service & )
delete

◆ operator=() [2/2]

service & operator= ( service &&  a_orig)

Definition at line 102 of file service.cc.

◆ remove_queue()

bool remove_queue ( )
protected virtual

Definition at line 387 of file service.cc.

◆ send() [1/3]

sent_msg_pkg_ptr send ( request_ptr_t  a_request) const
inline virtual

Sends a request message and returns a channel on which to listen for a reply.

Reimplemented from core.

Definition at line 177 of file service.hh.

◆ send() [2/3]

sent_msg_pkg_ptr send ( reply_ptr_t  a_reply) const
inline virtual

Sends a reply message.

Reimplemented from core.

Definition at line 183 of file service.hh.

◆ send() [3/3]

sent_msg_pkg_ptr send ( alert_ptr_t  a_alert) const
inline virtual

Sends an alert message.

Reimplemented from core.

Definition at line 189 of file service.hh.

◆ send_reply()

void send_reply ( reply_ptr_t  a_reply) const
virtual

Sends a reply message.

Reimplemented from endpoint.

Definition at line 474 of file service.cc.

◆ setup_queues()

bool setup_queues ( )
protected virtual

Definition at line 317 of file service.cc.

◆ snake_case_mv_accessible() [1/3]

snake_case_mv_accessible ( status  ,
status   
)

◆ snake_case_mv_accessible() [2/3]

snake_case_mv_accessible ( bool  ,
enable_scheduling   
)

◆ snake_case_mv_accessible() [3/3]

snake_case_mv_accessible ( uuid_t  ,
id   
)

◆ snake_case_mv_referrable() [1/3]

snake_case_mv_referrable ( sync_map_t  ,
sync_children   
)

◆ snake_case_mv_referrable() [2/3]

snake_case_mv_referrable ( async_map_t  ,
async_children   
)

◆ snake_case_mv_referrable() [3/3]

snake_case_mv_referrable ( std::string  ,
broadcast_key   
)

◆ start()

bool start ( )

Creates a channel to the broker and establishes the queue for receiving messages. If no queue name was given, this does nothing.

Definition at line 157 of file service.cc.

◆ start_consuming()

bool start_consuming ( )
protected virtual

Definition at line 358 of file service.cc.

◆ stop()

bool stop ( )

Stops receiving messages and closes the connection to the broker. If no queue was created, this does nothing.

Definition at line 275 of file service.cc.

◆ stop_consuming()

bool stop_consuming ( )
protected virtual

Definition at line 373 of file service.cc.

◆ submit_message()

void submit_message ( message_ptr_t  a_message)
virtual

Submit a message for direct processing.

Implements concurrent_receiver.

Definition at line 443 of file service.cc.


The documentation for this class was generated from the following files: