8 #ifndef DRIPLINE_SCHEDULER_HH_ 9 #define DRIPLINE_SCHEDULER_HH_ 11 #include "cancelable.hh" 16 #include "member_variables.hh" 19 #include <condition_variable> 42 virtual void operator()( std::function<
void() > ) = 0;
54 virtual void operator()( std::function<
void() > an_executable )
// Fragment of the scheduler class template declaration (the class header and
// most of the body are not visible here).
// Template parameters: `executor` defaults to simple_executor and `clock`
// defaults to std::chrono::system_clock.
92 template<
typename executor = simple_executor,
typename clock = std::chrono::system_clock >
// Removes a scheduled event, identified by its integer ID.
138 void unschedule(
int an_id );
// Macro-declared members: the executor instance and the map of pending events.
// NOTE(review): the macros presumably come from member_variables.hh and
// generate f_the_executor / f_events plus accessors -- confirm.
150 mv_referrable_const( executor, the_executor );
153 mv_referrable_const( events_map_t, events );
// Static integer `curr_id`; the definitions in this file read and
// post-increment it (apparently under the name s_curr_id) to hand out event IDs.
156 mv_accessible_static(
int, curr_id )
// Condition variable that the execution loop waits on (wait_for / wait_until).
165 std::condition_variable_any
f_cv;
169 template<
typename executor,
typename clock >
// Fragment of what appears to be the default constructor's initializer list
// (the signature line is not visible here -- confirm against the full file).
172 template<
typename executor,
typename clock >
// Execution buffer starts at 50 ms. The execution loop in this file runs the
// earliest event once the time remaining until it drops below this value.
174 f_exe_buffer(
std::chrono::milliseconds(50) ),
// Cycle time starts at 500 ms. The execution loop uses it as the wait_for
// timeout, e.g. when the schedule is empty.
175 f_cycle_time(
std::chrono::milliseconds(500) ),
// Fragment of what appears to be the move constructor's initializer list
// (signature not visible): each listed member is move-constructed from the
// corresponding member of a_orig.
// NOTE(review): f_events is moved wholesale, but the repeating-event wrapper
// built elsewhere in this file captures `this`. Wrappers already stored in
// a_orig.f_events would still refer to a_orig after the move -- confirm
// whether moving a scheduler that holds repeating events is supported.
184 template<
typename executor,
typename clock >
186 f_exe_buffer( std::move(a_orig.f_exe_buffer) ),
187 f_cycle_time( std::move(a_orig.f_cycle_time) ),
189 f_events( std::move(a_orig.f_events) ),
// The thread handle is transferred as well; a_orig is left without a thread.
193 f_scheduler_thread( std::move(a_orig.f_scheduler_thread) )
196 template<
typename executor,
typename clock >
// Fragment of what appears to be the move-assignment operator (signature and
// return statement not visible here).
200 template<
typename executor,
typename clock >
// Hold both schedulers' mutexes while state is transferred: this object's
// first, then the source's.
// NOTE(review): the locks are taken one after the other in a fixed
// this-then-source order. Two threads move-assigning a pair of schedulers
// into each other could deadlock; std::lock would avoid that -- confirm
// whether that situation can arise.
203 std::unique_lock< std::recursive_mutex >t_this_lock( f_scheduler_mutex );
204 std::unique_lock< std::recursive_mutex >t_orig_lock( a_orig.f_scheduler_mutex );
// Transfer the timing settings and the pending events.
206 f_exe_buffer = std::move(a_orig.f_exe_buffer);
207 f_cycle_time = std::move(a_orig.f_cycle_time);
208 f_events = std::move(a_orig.f_events);
// NOTE(review): std::thread move assignment calls std::terminate() when the
// destination thread is joinable -- confirm f_scheduler_thread can never be
// joinable at this point.
209 f_scheduler_thread = std::move(a_orig.f_scheduler_thread);
// Fragment of the one-off scheduling routine (signature not visible here):
// stores an_executable in f_events keyed by an_exe_time and tags it with a
// freshly allocated ID.
214 template<
typename executor,
typename clock >
// Records whether the new event precedes everything already scheduled
// (the line that sets it to true is not visible in this fragment).
217 bool t_new_first =
false;
// Guard f_events while it is inspected and modified.
218 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
// The new event is the earliest if the map is empty or its time is before the
// current first key.
219 if( f_events.empty() || an_exe_time < f_events.begin()->first )
221 LDEBUG(
dlog_sh,
"New first event" );
224 LDEBUG(
dlog_sh,
"Inserting new event" );
// Fill in the event: the callable plus the next ID from the static counter
// (post-increment, so the stored ID is the value before the bump).
// NOTE(review): s_curr_id is static, while the mutex held here appears to be
// a per-object member; with several schedulers of the same instantiation on
// different threads this increment would be unsynchronised -- confirm.
226 t_event.f_executable = an_executable;
227 t_event.f_id = s_curr_id++;
228 f_events.insert( std::make_pair( an_exe_time, t_event ) );
// Presumably reached only when t_new_first is set; neither the condition nor
// the wake-up call itself is visible in this fragment.
232 LDEBUG(
dlog_sh,
"That event was first; waking execution thread" );
// Fragment of the repeating-schedule entry point (signature and return
// statement not visible here): validates an_interval, reserves an ID and
// delegates to schedule_repeating.
239 template<
typename executor,
typename clock >
// Reject intervals shorter than twice the execution buffer.
243 if( an_interval < 2*f_exe_buffer )
// Throws dripline_error carrying the minimum allowed interval.
// NOTE(review): duration_cast to std::chrono::seconds truncates toward zero.
// f_exe_buffer is initialised to 50 ms elsewhere in this file, so the limit is
// 100 ms and this message would report "0 seconds". Reporting the value in
// milliseconds would give a meaningful number.
245 throw dripline_error() <<
"Cannot schedule executions with an interval of less than " << std::chrono::duration_cast<std::chrono::seconds>(2*f_exe_buffer).count() <<
" seconds";
// Reserve the next ID under the scheduler lock (post-increment of the static
// counter), then hand off with an_exe_time as the first execution time.
248 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
249 int t_id = scheduler< executor, clock >::s_curr_id++;
250 schedule_repeating( an_executable, an_interval, t_id, an_exe_time );
// Fragment of schedule_repeating (signature not visible here). Wraps
// an_executable in a callable that queues the next occurrence one interval
// later, then inserts that wrapper into f_events at a_rep_start under an_id.
256 template<
typename executor,
typename clock >
259 LDEBUG(
dlog_sh,
"Scheduling a repeating event" );
// The wrapper captures `this` plus copies of the arguments. When invoked it
// re-schedules itself at a_rep_start + an_interval with the same ID, so the
// cadence is anchored to the planned start time rather than to the moment the
// run actually happened.
// NOTE(review): `this` is captured by pointer and the wrapper is stored in
// f_events, so it must not be run after this scheduler is destroyed or after
// f_events has been moved into another scheduler.
262 executable_t t_wrapped_executable = [
this, an_executable, an_interval, an_id, a_rep_start](){
263 LDEBUG(
dlog_sh,
"wrapped execution" );
265 this->schedule_repeating( an_executable, an_interval, an_id, a_rep_start + an_interval );
// Presumably followed by the call to an_executable itself (not visible here).
267 LDEBUG(
dlog_sh,
"executing the wrapped executable" );
// Build the event from the wrapper and the caller-supplied ID (no new ID is
// allocated here, so every repetition keeps the same ID).
273 t_event.f_executable = t_wrapped_executable;
274 t_event.f_id = an_id;
// Same "is this the new earliest event?" bookkeeping as the one-off path;
// the line that sets the flag to true is not visible in this fragment.
277 bool t_new_first =
false;
278 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
279 if( f_events.empty() || a_rep_start < f_events.begin()->first )
281 LDEBUG(
dlog_sh,
"New first event" );
286 f_events.insert( std::make_pair( a_rep_start, t_event ) );
// Presumably reached only when t_new_first is set; the wake-up call itself is
// not visible in this fragment.
290 LDEBUG(
dlog_sh,
"That event was first; waking execution thread" );
// Fragment of unschedule (signature not visible here; the class declares it as
// void unschedule( int an_id )). Performs a linear search of f_events for an
// event whose f_id equals an_id and erases the first match.
297 template<
typename executor,
typename clock >
// Hold the scheduler lock for the whole search-and-erase.
300 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
301 auto i_event = f_events.begin();
// NOTE(review): the loop condition reads i_event->second.f_id BEFORE testing
// i_event != f_events.end(). When f_events is empty, or an_id is not present,
// this dereferences the end iterator, which is undefined behaviour. The end()
// test should be the first operand of the &&.
302 for( ; i_event->second.f_id != an_id && i_event != f_events.end(); ++i_event ) {
303 LDEBUG(
dlog_sh,
"Looking for event with id <" << an_id <<
">; found one with id <" << i_event->second.f_id <<
">" );
// NOTE(review): same hazard -- if the loop ran off the end, i_event equals
// f_events.end() and this dereference is invalid. The check should be guarded
// by i_event != f_events.end().
306 if( i_event->second.f_id == an_id )
308 LDEBUG(
dlog_sh,
"Found event with id <" << i_event->second.f_id <<
">; erasing it now" );
309 f_events.erase( i_event );
310 LDEBUG(
dlog_sh,
"Removed event <" << an_id <<
"> from the schedule" );
// Presumably the not-found branch (the else line is not visible here).
314 LDEBUG(
dlog_sh,
"No event with id <" << an_id <<
"> found" );
// Fragment of the scheduler's execution loop (signature not visible here).
// Repeats until is_canceled() returns true: waits while nothing is due, and
// runs the earliest event through the executor once it is due.
320 template<
typename executor,
typename clock >
323 LDEBUG(
dlog_sh,
"Starting scheduler" );
324 while( ! is_canceled() )
// The lock is taken afresh on each iteration and handed to the
// condition-variable waits below, which release it while blocked.
326 std::unique_lock< std::recursive_mutex > t_lock( f_scheduler_mutex );
// Nothing scheduled: block for up to one cycle time, or until f_cv is notified
// (the rest of this branch is not visible in this fragment).
327 if( f_events.empty() )
330 f_cv.wait_for( t_lock, f_cycle_time );
// Take the first map entry and compute how long remains until its time point.
335 auto t_first_event = f_events.begin();
337 duration_t t_to_earliest = t_first_event->first - clock::now();
// Due, or due within the execution buffer: run it and drop it from the map.
338 if( t_to_earliest < f_exe_buffer )
341 LDEBUG(
dlog_sh,
"Executing first event from the map" );
// A separate mutex is held around the executor call.
// NOTE(review): the executable runs while t_lock is still held. Repeating
// wrappers call schedule_repeating from inside this call, which locks
// f_scheduler_mutex again on the same thread -- presumably the reason that
// mutex is recursive.
342 std::unique_lock< std::mutex > t_exe_lock( f_executor_mutex );
343 f_the_executor( t_first_event->second.f_executable );
344 f_events.erase( t_first_event );
// Not yet due: if the event falls within one cycle, sleep until its time...
347 if( t_to_earliest < f_cycle_time )
350 f_cv.wait_until( t_lock, t_first_event->first );
// ...otherwise (the else line is not visible here) sleep one cycle and
// re-evaluate on the next iteration.
354 f_cv.wait_for( t_lock, f_cycle_time );
358 LDEBUG(
dlog_sh,
"Scheduler exiting" );
std::condition_variable_any f_cv
virtual ~simple_executor()
snake_case_mv_accessible_static(int, curr_id) protected std::recursive_mutex f_scheduler_mutex
The ID to be used for the next scheduled event.
Definition of an event, including the executable object and the scheduler ID.
Dripline-specific errors.
typename std::chrono::system_clock ::time_point time_point_t
Executes scheduled events.
std::multimap< time_point_t, event > events_map_t
std::thread f_scheduler_thread
std::chrono::system_clock clock_t
std::function< void() > executable_t
virtual void operator()(std::function< void() > an_executable)
std::mutex f_executor_mutex
Base class for executors.
Given an executable function object, uses operator() to execute it.
static scarab::logger dlog_sh("scheduler")
executable_t f_executable
typename std::chrono::system_clock ::duration duration_t