class Mosquito::ConcurrencyLimitedDequeueAdapter

Overview

A dequeue adapter that enforces per-queue concurrency limits.

Each queue can be assigned a maximum number of jobs that may execute concurrently. When a queue has reached its limit, it is skipped during dequeue until an in-flight job finishes.

Queues not present in the limits table have no concurrency ceiling and are bounded only by the total executor pool size.

Among eligible queues, the adapter shuffles the candidate order to provide rough fairness, similar to ShuffleDequeueAdapter.
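The selection rule described above can be sketched in isolation. This is a minimal illustration, not Mosquito's implementation: `eligible_queues` and its three parameters are hypothetical names, and queues are represented by plain strings rather than by Mosquito's own queue objects.

```crystal
# Hypothetical helper, not part of Mosquito: returns the names of queues that
# are still below their limit, in random order.
def eligible_queues(names : Array(String),
                    limits : Hash(String, Int32),
                    active : Hash(String, Int32)) : Array(String)
  names.select { |name|
    in_flight = active.fetch(name, 0)
    if limit = limits[name]?
      # Limited queue: eligible only while it is below its ceiling.
      in_flight < limit
    else
      # No entry in the limits table means no per-queue ceiling.
      true
    end
  }.shuffle
end

limits = {"queue_a" => 3, "queue_b" => 5}
active = {"queue_a" => 3, "queue_b" => 2}

# "queue_a" is at its limit and is skipped; "queue_b" and the unlimited
# "queue_c" come back in a random order.
candidates = eligible_queues(["queue_a", "queue_b", "queue_c"], limits, active)
```

Shuffling after filtering means a saturated queue never takes a slot in the candidate order, so it cannot crowd out queues that still have headroom.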

Example

Mosquito.configure do |settings|
  settings.executor_count = 8

  settings.dequeue_adapter = Mosquito::ConcurrencyLimitedDequeueAdapter.new({
    "queue_a" => 3,
    "queue_b" => 5,
  })
end

In this configuration, at most 3 jobs from "queue_a" and at most 5 jobs from "queue_b" execute at the same time. Other queues have no per-queue limit and are bounded only by the executor pool (8 executors here).

Defined in:

mosquito/dequeue_adapters/concurrency_limited_dequeue_adapter.cr

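Constructors

.new(limits : Hash(String, Int32))

Instance Method Summary

#active_count(queue_name : String) : Int32
Returns the current number of in-flight jobs for the given queue.

#dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil
Attempt to dequeue a job from one of the queues managed by queue_list.

#finished_with(job_run : JobRun, queue : Queue) : Nil
Called by the Overseer when a job from this queue has finished executing.

#limits : Hash(String, Int32)

#limits=(limits : Hash(String, Int32))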

Instance methods inherited from class Mosquito::DequeueAdapter

dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil, finished_with(job_run : JobRun, queue : Queue) : Nil

Constructor Detail

def self.new(limits : Hash(String, Int32))

Instance Method Detail

def active_count(queue_name : String) : Int32

Returns the current number of in-flight jobs for the given queue.


def dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil

Description copied from class Mosquito::DequeueAdapter

Attempt to dequeue a job from one of the queues managed by queue_list.

Returns a WorkUnit when a job is available, or nil when all queues are empty.


def finished_with(job_run : JobRun, queue : Queue) : Nil

Called by the Overseer when a job from this queue has finished executing. Decrements the in-flight counter so the queue becomes eligible for dequeue again.
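The bookkeeping behind this callback can be pictured with a small counter. The sketch below is an assumption about how such accounting might look, not the adapter's actual code: `InFlightCounter`, `started`, and `finished` are hypothetical names, and clamping at zero is a defensive choice made for this example only.

```crystal
# Hypothetical counter, not Mosquito's implementation.
class InFlightCounter
  def initialize
    # Queues that have never been seen read as 0 in-flight jobs.
    @counts = Hash(String, Int32).new(0)
  end

  # Record that a job from the queue was handed to an executor.
  def started(queue_name : String) : Nil
    @counts[queue_name] += 1
  end

  # Record that a job from the queue finished; the count never drops below 0.
  def finished(queue_name : String) : Nil
    @counts[queue_name] = Math.max(@counts[queue_name] - 1, 0)
  end

  # Current number of in-flight jobs for the queue.
  def active_count(queue_name : String) : Int32
    @counts[queue_name]
  end
end

counter = InFlightCounter.new
counter.started("queue_a")
counter.started("queue_a")
counter.finished("queue_a")
remaining = counter.active_count("queue_a")
```

With a limit of 3 on "queue_a", each finished job frees one slot, so the queue becomes eligible again as soon as its count falls below the limit. If the counter is shared across threads, the updates would also need a lock such as `Mutex`.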


def limits : Hash(String, Int32)

def limits=(limits : Hash(String, Int32))