class Mosquito::Runners::Executor

Overview

The executor is the center of work in Mosquito, and it's is the demarcation point between Mosquito framework and application code. Above the Executor is entirely Mosquito, and below it is application code.

An Executor is responsible for hydrating Job classes with deserialized parameters and calling Mosquito::Job#run on them. It measures the time it takes to run a job and provides detailed log messages about the current status.

An executor is a Mosquito::Runnable and should be interacted with according to the Runnable API.

To build an executor, provide a job input channel and an idle bell channel. These channels can be shared between all available executors.

The executor will ring the idle bell when it is ready to accept work and then wait for work to show up on the job pipeline. After the job is finished it will ring the bell again and wait for more work.

Included Modules

Defined in:

mosquito/runners/executor.cr

Constant Summary

Log = ::Log.for(self)

Constructors

Instance Method Summary

Instance methods inherited from module Mosquito::Runnable

dead? : Bool dead?, each_run : Nil each_run, fiber : Fiber | Nil fiber, my_name : String my_name, post_run : Nil post_run, pre_run : Nil pre_run, run run, runnable_name : String runnable_name, state : State state, stop : Channel(Bool) stop

Instance methods inherited from module Mosquito::Runners::RunAtMost

execution_timestamps execution_timestamps

Constructor Detail

def self.new(job_pipeline : Channel(Tuple(Mosquito::JobRun, Mosquito::Queue)), idle_bell : Channel(Bool)) #

Instance Method Detail

def execute(job_run : JobRun, from_queue q : Queue) #

Runs a job from a Queue.

Execution time is measured and logged, and the job is either forgotten or, if it fails, rescheduled.


def failed_job_ttl : Int32 #

How long a job config is persisted after failure


def failed_job_ttl=(failed_job_ttl : Int32) #

How long a job config is persisted after failure


def idle_bell : Channel(Bool) #

Used to notify the overseer that this executor is idle.


def job_pipeline : Channel(Tuple(JobRun, Queue)) #

Where work is received from the overseer.


def log : ::Log #

def successful_job_ttl : Int32 #

How long a job config is persisted after success


def successful_job_ttl=(successful_job_ttl : Int32) #

How long a job config is persisted after success