class Mosquito::Runners::Executor

Overview

The executor is the center of work in Mosquito, and it is the demarcation point between the Mosquito framework and application code. Everything above the Executor is Mosquito, and everything below it is application code.

An Executor is responsible for hydrating Job classes with deserialized parameters and calling Mosquito::Job#run on them. It measures the time it takes to run a job and provides detailed log messages about the current status.

An executor is a Mosquito::Runnable and should be interacted with according to the Runnable API.

To build an executor, provide a job input channel and an idle bell channel. These channels can be shared between all available executors.

The executor will ring the idle bell when it is ready to accept work and then wait for work to show up on the job pipeline. After the job is finished it will ring the bell again and wait for more work.
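The bell-and-pipeline handshake described above can be sketched with plain Crystal channels. This is a minimal illustration, not Mosquito's implementation: `FakeWorkUnit`, the local channel names, and the overseer-side loop are all stand-ins invented for the example.

```crystal
# Hypothetical stand-in for Mosquito's WorkUnit; the real type is not reproduced here.
record FakeWorkUnit, id : Int32

job_pipeline = Channel(FakeWorkUnit).new   # work flows toward the executor
idle_bell    = Channel(FakeWorkUnit?).new  # executor reports idleness here

# Executor side (sketch): ring the bell, wait for work, run it, repeat.
spawn do
  finished : FakeWorkUnit? = nil
  loop do
    idle_bell.send finished              # nil on startup, otherwise the unit just finished
    work = job_pipeline.receive?         # blocks until work arrives or the channel closes
    break unless work
    # ... the job would be run here ...
    finished = work
  end
end

# Overseer side (sketch): hand out one unit each time the bell rings.
completed = [] of Int32
3.times do |i|
  done = idle_bell.receive
  completed << done.id if done
  job_pipeline.send FakeWorkUnit.new(i + 1)
end

# Collect the final bell, then close the pipeline so the executor loop exits.
last = idle_bell.receive
completed << last.id if last
job_pipeline.close
```

Because both channels are unbuffered, the executor cannot be handed a second unit until it has rung the bell for the first.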

Included Modules

Mosquito::Runnable
Mosquito::Runners::RunAtMost

Defined in:

mosquito/runners/executor.cr

Instance methods inherited from module Mosquito::Runnable

dead? : Bool, each_run : Nil, fiber : Fiber | Nil, my_name : String, post_run : Nil, pre_run : Nil, run(*, spawn spawn_fiber = true), runnable_name : String, state : State, stop(wait_group : WaitGroup = WaitGroup.new(1)) : WaitGroup

Instance methods inherited from module Mosquito::Runners::RunAtMost

execution_timestamps

Constructor Detail

def self.new(overseer : Overseer)

Instance Method Detail

def decommission!

Marks this executor for graceful shutdown. It will stop after completing its current job (if any).


def decommissioned? : Bool

def execute

Runs a job from a Queue.

Execution time is measured and logged, and the job is either forgotten or, if it fails, rescheduled.
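As a rough illustration of that flow, the sketch below times a block with `Time.measure` and reports whether the job would be forgotten or rescheduled. The `Outcome` enum and `execute_sketch` method are hypothetical names for this example only; the real method also handles logging and the actual rescheduling, which are omitted here.

```crystal
# Hypothetical outcome labels; not part of Mosquito's API.
enum Outcome
  Forgotten
  Rescheduled
end

# Runs the given block, measuring how long it takes.
# A raised exception is treated as a failed job.
def execute_sketch(&) : {Outcome, Time::Span}
  success = true
  elapsed = Time.measure do
    begin
      yield
    rescue
      success = false
    end
  end
  {success ? Outcome::Forgotten : Outcome::Rescheduled, elapsed}
end

ok_outcome, ok_elapsed = execute_sketch { 1 + 1 }
failed_outcome, _ = execute_sketch { raise "boom" }
```

The elapsed time is returned in both cases, so a failed job can be logged with its duration just like a successful one.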


def failed_job_ttl : Int32

How long a job config is persisted after failure.


def failed_job_ttl=(failed_job_ttl : Int32)

How long a job config is persisted after failure.


def finished_bell : Channel(WorkUnit | Nil)

Used to notify the overseer when this executor is idle. Sends the {JobRun, Queue} tuple that was just finished, or nil when the executor first starts up.


def job_pipeline : Channel(WorkUnit)

Where work is received from the overseer.


def observer : Observability::Executor

def overseer : Overseer

def stop(wait_group : WaitGroup = WaitGroup.new(1)) : WaitGroup
Description copied from module Mosquito::Runnable

Requests that the run loop exit the next time it cycles. The runnable doesn't exit immediately, so #stop spawns a fiber to monitor the state transition.

Returns the WaitGroup, which will be decremented when the runnable has finished. This enables runnable.stop.wait.

If a WaitGroup is provided, it will be decremented when the runnable has finished. This is useful when stopping multiple runnables and waiting for all of them to finish.

Calling stop on a runnable that has already finished or crashed is a no-op (the wait_group is signaled immediately).
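The stop-and-wait pattern can be sketched with the standard library's WaitGroup (assumed here to be Crystal 1.13 or later). `SketchRunnable` is a hypothetical stand-in that only mimics the documented behaviour of #stop; it is not Mosquito::Runnable.

```crystal
require "wait_group"

# Hypothetical stand-in for a runnable; mirrors only the documented #stop contract.
class SketchRunnable
  getter? finished = false
  @stop_requested = false

  # Starts a run loop that exits the next time it cycles after a stop request.
  def run
    spawn do
      until @stop_requested
        sleep 1.millisecond
      end
      @finished = true
    end
  end

  # Requests a stop and spawns a monitor fiber that signals the
  # wait group once the run loop has actually finished.
  def stop(wait_group : WaitGroup = WaitGroup.new(1)) : WaitGroup
    @stop_requested = true
    spawn do
      until @finished
        sleep 1.millisecond
      end
      wait_group.done
    end
    wait_group
  end
end

# Stopping a single runnable and waiting for it.
single = SketchRunnable.new
single.run
single.stop.wait

# Stopping several runnables with one shared wait group.
runnables = Array.new(3) { SketchRunnable.new.tap(&.run) }
group = WaitGroup.new(runnables.size)
runnables.each &.stop(group)
group.wait
```

Sizing the shared group to the number of runnables means `group.wait` returns only after every one of them has finished.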


def successful_job_ttl : Int32

How long a job config is persisted after success.


def successful_job_ttl=(successful_job_ttl : Int32)

How long a job config is persisted after success.


def work_unit : WorkUnit

def work_unit? : WorkUnit | Nil