Dripline-Cpp  v2.4.2
Dripline Implementation in C++
concurrent_receiver Class Reference (abstract)

Receives and processes messages concurrently.

#include <receiver.hh>

[Inheritance diagram for concurrent_receiver]

Public Member Functions

 concurrent_receiver ()
 
 concurrent_receiver (const concurrent_receiver &)=delete
 
 concurrent_receiver (concurrent_receiver &&a_orig)
 
virtual ~concurrent_receiver ()
 
concurrent_receiver & operator= (const concurrent_receiver &)=delete
 
concurrent_receiver & operator= (concurrent_receiver &&a_orig)
 
virtual void process_message (message_ptr_t a_message)
 Deposits the message in the concurrent queue (called by the listener).
 
void execute ()
 Handles messages that appear in the concurrent queue by calling submit_message().
 
Public Member Functions inherited from receiver
 receiver ()
 
 receiver (const receiver &a_orig)=delete
 
 receiver (receiver &&a_orig)
 
virtual ~receiver ()
 
receiver & operator= (const receiver &a_orig)=delete
 
receiver & operator= (receiver &&a_orig)
 
void handle_message_chunk (amqp_envelope_ptr a_envelope)
 
void wait_for_message (incoming_message_pack &a_pack, const std::string &a_message_id)
 
void process_message_pack (incoming_message_pack &a_pack, const std::string &a_message_id)
 Converts a message pack into a Dripline message, and then submits the message for processing.
 
 snake_case_mv_referrable (incoming_message_map, incoming_messages)
 Stores the incomplete messages.
 
 snake_case_mv_accessible (unsigned, single_message_wait_ms)
 Wait time for message chunks from a single message.
 
reply_ptr_t wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, int a_timeout_ms=0)
 
reply_ptr_t wait_for_reply (const sent_msg_pkg_ptr a_receive_reply, bool &a_chan_valid, int a_timeout_ms=0)
 

Protected Member Functions

virtual void submit_message (message_ptr_t a_message)=0
 
 snake_case_mv_referrable (scarab::concurrent_queue< message_ptr_t >, message_queue)
 
 snake_case_mv_referrable (std::thread, receiver_thread)
 
Protected Member Functions inherited from receiver
reply_ptr_t process_received_reply (incoming_message_pack &a_pack, const std::string &a_message_id)
 

Detailed Description

Receives and processes messages concurrently.

Author
N.S. Oblath

This class enables Dripline messages to be received and processed concurrently. It is intended to be used in conjunction with a listener, as in listener_receiver.

The typical use case involves three threads:

  1. A listener gets messages from the AMQP channel using listen_on_queue() (e.g. service or endpoint_listener_receiver) and calls receiver::handle_message_chunk().
  2. A receiver has a timing thread waiting for multiple message chunks (if relevant); when the message is complete, concurrent_receiver::process_message() is called, which deposits the message in a concurrent queue.
  3. A concurrent_receiver picks up the complete message from the concurrent queue, and processes the message using submit_message().
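Step 2 of the list above (waiting for all chunks of one message, bounded by a wait time) could be sketched roughly as follows. This is a simplified illustration and not the Dripline implementation: `chunk_pack`, `add_chunk()`, `wait_for_chunks()` and `assemble_demo()` are hypothetical names, the chunks are plain strings, and the sketch assumes a single overall timeout and no duplicate chunks.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for an incoming message pack: one slot per expected chunk.
struct chunk_pack
{
    explicit chunk_pack( std::size_t a_total ) : f_chunks( a_total ), f_received( 0 ) {}
    std::vector< std::string > f_chunks;
    std::size_t f_received;
    std::mutex f_mutex;
    std::condition_variable f_cv;
};

// Listener side: store one chunk and wake the waiting thread.
// Assumes each index arrives at most once.
void add_chunk( chunk_pack& a_pack, std::size_t a_index, const std::string& a_payload )
{
    {
        std::lock_guard< std::mutex > t_lock( a_pack.f_mutex );
        a_pack.f_chunks.at( a_index ) = a_payload;
        ++a_pack.f_received;
    }
    a_pack.f_cv.notify_all();
}

// Timing side: wait until every chunk has arrived or a_wait has elapsed.
// Returns true and fills a_assembled only if the message is complete.
bool wait_for_chunks( chunk_pack& a_pack, std::chrono::milliseconds a_wait, std::string& a_assembled )
{
    std::unique_lock< std::mutex > t_lock( a_pack.f_mutex );
    bool t_complete = a_pack.f_cv.wait_for( t_lock, a_wait,
            [&a_pack]{ return a_pack.f_received == a_pack.f_chunks.size(); } );
    if( ! t_complete ) return false;
    a_assembled.clear();
    for( const auto& t_chunk : a_pack.f_chunks ) a_assembled += t_chunk;
    return true;
}

// Usage: chunks arrive out of order from another thread and are joined by index.
std::string assemble_demo()
{
    chunk_pack t_pack( 2 );
    std::thread t_listener( [&t_pack]{
        add_chunk( t_pack, 1, "world" );
        add_chunk( t_pack, 0, "hello " );
    } );
    std::string t_message;
    bool t_ok = wait_for_chunks( t_pack, std::chrono::milliseconds(2000), t_message );
    t_listener.join();
    assert( t_ok );
    return t_message;
}
```

In this sketch a completed message would next be handed to process_message(); what should happen to an incomplete message after a timeout is left open here.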

The execute() function implements thread 3.

A class deriving from concurrent_receiver must implement submit_message().
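The relationship between process_message(), execute() and submit_message() can be illustrated with a minimal, self-contained sketch. It does not use the Dripline or scarab headers: `simple_queue`, `sketch_concurrent_receiver`, `collecting_receiver`, `cancel()` and `run_receiver_demo()` are hypothetical stand-ins, `message_ptr_t` is redefined here as a shared pointer to a string, and the stop-and-drain behavior is an assumption made only so that the example terminates.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Stand-in for a Dripline message pointer (assumption for this sketch only).
using message_ptr_t = std::shared_ptr< std::string >;

// Stand-in for a concurrent queue with a timed pop.
class simple_queue
{
    public:
        void push( message_ptr_t a_message )
        {
            {
                std::lock_guard< std::mutex > t_lock( f_mutex );
                f_items.push_back( std::move(a_message) );
            }
            f_cv.notify_one();
        }
        // Waits up to a_wait for an item; returns false if none arrived in time.
        bool timed_pop( message_ptr_t& a_message, std::chrono::milliseconds a_wait )
        {
            std::unique_lock< std::mutex > t_lock( f_mutex );
            if( ! f_cv.wait_for( t_lock, a_wait, [this]{ return ! f_items.empty(); } ) ) return false;
            a_message = std::move( f_items.front() );
            f_items.pop_front();
            return true;
        }
    private:
        std::mutex f_mutex;
        std::condition_variable f_cv;
        std::deque< message_ptr_t > f_items;
};

class sketch_concurrent_receiver
{
    public:
        virtual ~sketch_concurrent_receiver() = default;
        // Producer side: deposit a complete message in the queue.
        void process_message( message_ptr_t a_message ) { f_message_queue.push( std::move(a_message) ); }
        // Consumer side (thread 3): pass queued messages to submit_message().
        void execute()
        {
            message_ptr_t t_message;
            while( ! f_canceled.load() )
            {
                if( f_message_queue.timed_pop( t_message, std::chrono::milliseconds(50) ) ) submit_message( t_message );
            }
            // Drain whatever is still queued once cancellation is requested.
            while( f_message_queue.timed_pop( t_message, std::chrono::milliseconds(0) ) ) submit_message( t_message );
        }
        void cancel() { f_canceled.store( true ); }
    protected:
        virtual void submit_message( message_ptr_t a_message ) = 0;
    private:
        simple_queue f_message_queue;
        std::atomic< bool > f_canceled{ false };
};

// A deriving class supplies the use-case-specific handling.
class collecting_receiver : public sketch_concurrent_receiver
{
    public:
        std::vector< std::string > f_seen;
    protected:
        void submit_message( message_ptr_t a_message ) override { f_seen.push_back( *a_message ); }
};

std::vector< std::string > run_receiver_demo()
{
    collecting_receiver t_receiver;
    std::thread t_thread( [&t_receiver]{ t_receiver.execute(); } );
    for( const char* t_text : { "first", "second", "third" } )
    {
        t_receiver.process_message( std::make_shared< std::string >( t_text ) );
    }
    t_receiver.cancel();
    t_thread.join();
    assert( t_receiver.f_seen.size() == 3 );
    return t_receiver.f_seen;
}
```

Because a single thread consumes the queue in this sketch, messages reach submit_message() in the order they were deposited.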

Definition at line 153 of file receiver.hh.

Constructor & Destructor Documentation

◆ concurrent_receiver() [1/3]

Definition at line 367 of file receiver.cc.

◆ concurrent_receiver() [2/3]

◆ concurrent_receiver() [3/3]

Definition at line 372 of file receiver.cc.

◆ ~concurrent_receiver()

~concurrent_receiver ( )
virtual

Definition at line 377 of file receiver.cc.

Member Function Documentation

◆ execute()

void execute ( )

Handles messages that appear in the concurrent queue by calling submit_message().

Definition at line 393 of file receiver.cc.

◆ operator=() [1/2]

concurrent_receiver & operator= ( const concurrent_receiver & )
delete

◆ operator=() [2/2]

concurrent_receiver & operator= ( concurrent_receiver &&  a_orig)

Definition at line 380 of file receiver.cc.

◆ process_message()

void process_message ( message_ptr_t  a_message)
virtual

Deposits the message in the concurrent queue (called by the listener)

Reimplemented from receiver.

Definition at line 387 of file receiver.cc.

◆ snake_case_mv_referrable() [1/2]

snake_case_mv_referrable ( scarab::concurrent_queue< message_ptr_t >  ,
message_queue   
)
protected

◆ snake_case_mv_referrable() [2/2]

snake_case_mv_referrable ( std::thread  ,
receiver_thread   
)
protected

◆ submit_message()

virtual void submit_message ( message_ptr_t  a_message)
protected pure virtual

Handles messages according to the use case. It is to be implemented by the class inheriting from concurrent_receiver. For a concrete example, see service or endpoint_listener_receiver.

Implemented in service, endpoint_listener_receiver, and monitor.


The documentation for this class was generated from the following files: