class Mosquito::Runners::Overseer

Overview

The Overseer is responsible for managing:

An overseer manages the loop that each thread or process runs.

Included Modules

Defined in:

mosquito/runners/overseer.cr

Constant Summary

Log = ::Log.for(self)

Constructors

Instance Method Summary

Instance methods inherited from module Mosquito::Runnable

dead? : Bool dead?, each_run : Nil each_run, fiber : Fiber | Nil fiber, my_name : String my_name, post_run : Nil post_run, pre_run : Nil pre_run, run run, runnable_name : String runnable_name, state : State state, stop : Channel(Bool) stop

Instance methods inherited from module Mosquito::Runners::RunAtMost

execution_timestamps execution_timestamps

Instance methods inherited from module Mosquito::Runners::IdleWait

with_idle_wait(idle_wait : Time::Span, &) with_idle_wait

Constructor Detail

def self.new #

Instance Method Detail

def build_executor : Executor #

def check_for_deceased_runners : Nil #

If an executor dies, it's probably because a bug exists somewhere in Mosquito itself.

When a job fails any exceptions are caught and logged. If a job causes something more catastrophic we can try to recover by spawning a new executor.


def coordinator : Mosquito::Runners::Coordinator #

def dequeue_job? : Tuple(JobRun, Queue) | Nil #

Weaknesses: This implementation sometimes starves queues because it doesn't round robin, prioritize queues, or anything else.


def each_run : Nil #

The goal for the overseer is to:

  • Ensure that the coordinator gets run frequently to schedule delayed/periodic jobs.
  • Wait for an executor to be idle, and dequeue work if possible.
  • Monitor the executor pool for unexpected termination and respawn.

def executor_count : Int32 #

The number of executors to start.


def executors : Array(Mosquito::Runners::Executor) #

def idle_notifier : Channel(Bool) #

When an executor transitions to idle it will send a True here. The Overseer uses this as a signal to check the queues for more work.


def idle_wait : Time::Span #

def post_run : Nil #

Notify all subprocesses to stop, and wait until they do.


def pre_run : Nil #

Starts all the subprocesses.


def queue_list : QueueList #

def runnable_name : String #
Description copied from module Mosquito::Runnable

Used to print a pretty name for logging.


def sleep #

def work_handout : Channel({Mosquito::JobRun, Mosquito::Queue}) #

The channel where job runs which have been dequeued are sent to executors.