class Mosquito::Runners::Overseer

Overview

The Overseer is responsible for managing:

An overseer manages the loop that each thread or process runs.

Included Modules

Defined in:

mosquito/runners/overseer.cr

Constructors

Instance Method Summary

Instance methods inherited from module Mosquito::Runnable

dead? : Bool dead?, each_run : Nil each_run, fiber : Fiber | Nil fiber, my_name : String my_name, post_run : Nil post_run, pre_run : Nil pre_run, run(*, spawn spawn_fiber = true) run, runnable_name : String runnable_name, state : State state, stop(wait_group : WaitGroup = WaitGroup.new(1)) : WaitGroup stop

Instance methods inherited from module Mosquito::Runners::RunAtMost

execution_timestamps execution_timestamps

Instance methods inherited from module Mosquito::Runners::IdleWait

with_idle_wait(idle_wait : Time::Span, &) with_idle_wait

Constructor Detail

def self.new #

Instance Method Detail

def build_executor : Executor #

def check_for_deceased_runners : Nil #

When a job fails any exceptions are caught and logged. If a job causes something more catastrophic we can try to recover by spawning a new executor.

This happens, for example, when a new version of a worker is deployed and work is still in the queue that references job classes that no longer exist.

When a dead executor is found, any job it was working on has its failure counter incremented and follows the standard retry logic.


def cleanup_orphaned_pending_jobs : Nil #

Scans pending queues for jobs owned by overseers that are no longer alive. Each orphaned job has its failure counter incremented and follows the standard retry logic.

An overseer is considered alive if it has registered a heartbeat within the configured dead_overseer_threshold. Jobs with no overseer_id (pre- dating this feature) are claimed by this overseer so they become recoverable when this overseer later dies. :nodoc:


def coordinator : Mosquito::Runners::Coordinator #

def dequeue_adapter : Mosquito::DequeueAdapter #

def dequeue_job? : Tuple(JobRun, Queue) | Nil #

Delegates job dequeue to the configured DequeueAdapter.

The adapter can be swapped via Mosquito.configuration.dequeue_adapter to implement custom strategies (priority, round-robin, rate limiting, etc).


def each_run : Nil #

The goal for the overseer is to:

  • Ensure that the coordinator gets run frequently to schedule delayed/periodic jobs.
  • Wait for an executor to be idle, and dequeue work if possible.
  • Monitor the executor pool for unexpected termination and respawn.

def executor_count : Int32 #

The number of executors to start.


def executors : Array(Mosquito::Runners::Executor) #

def idle_notifier : Channel(Bool) #

When an executor transitions to idle it will send a True here. The Overseer uses this as a signal to check the queues for more work.


def idle_wait : Time::Span #

def observer : Observability::Overseer #

def post_run : Nil #

Notify all subprocesses to stop, and wait until they do. After executors finish, any jobs left in the pending queue are moved back to waiting so another worker can pick them up.


def pre_run : Nil #

Starts all the subprocesses.


def queue_list : QueueList #

def runnable_name : String #
Description copied from module Mosquito::Runnable

Used to print a pretty name for logging.


def sleep #

def stop(wait_group : WaitGroup = WaitGroup.new(1)) : WaitGroup #
Description copied from module Mosquito::Runnable

Request that the next time the run loop cycles it should exit instead. The runnable doesn't exit immediately so #stop spawns a fiber to monitor the state transition.

Returns the WaitGroup, which will be decremented when the runnable has finished. This enables runnable.stop.wait.

If a WaitGroup is provided, it will be decremented when the runnable has finished. This is useful when stopping multiple runnables and waiting for all of them to finish.

Calling stop on a runnable that has already finished or crashed is a no-op (the wait_group is signaled immediately).


def work_handout : Channel({Mosquito::JobRun, Mosquito::Queue}) #

The channel where job runs which have been dequeued are sent to executors.