class Mosquito::RemoteConfigDequeueAdapter

Overview

A dequeue adapter that wraps ConcurrencyLimitedDequeueAdapter with remotely configurable concurrency limits stored in the Mosquito backend (e.g. Redis).

Limits are refreshed by polling the backend at a configurable interval. When the remote key is absent or empty the adapter falls back to the #defaults hash provided at construction time.

Remote values are merged on top of defaults: a queue present only in defaults keeps its value, a queue present only in the remote config is added, and a queue present in both uses the remote value.

Per-overseer configuration

When #overseer_id is set, the adapter reads from both the global key and a per-overseer key. The merge order is:

defaults → global remote → per-overseer remote

This lets you run overseers on asymmetric hardware and tune each one independently while still sharing a common baseline.

Setting limits remotely

Use Mosquito::Api.set_concurrency_limits to write global limits:

Mosquito::Api.set_concurrency_limits({"queue_a" => 2, "queue_b" => 10})

Or target a specific overseer:

Mosquito::Api.set_concurrency_limits({"queue_a" => 1}, overseer_id: "gpu-worker-1")

Example

Mosquito.configure do |settings|
  settings.dequeue_adapter = Mosquito::RemoteConfigDequeueAdapter.new(
    defaults: {"queue_a" => 3, "queue_b" => 5},
    overseer_id: "gpu-worker-1",
    refresh_interval: 5.seconds,
  )
end

In this configuration the adapter starts with the given defaults. Any limits written to the backend via the API will take effect within #refresh_interval seconds. Per-overseer limits override global limits which override defaults.

Defined in:

mosquito/dequeue_adapters/remote_config_dequeue_adapter.cr

Constant Summary

CONFIG_KEY = "concurrency_limits"

Constructors

Class Method Summary

Instance Method Summary

Instance methods inherited from class Mosquito::DequeueAdapter

dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil dequeue, finished_with(job_run : JobRun, queue : Queue) : Nil finished_with

Constructor Detail

def self.new(defaults : Hash(String, Int32) = {} of String => Int32, overseer_id : String | Nil = nil, refresh_interval : Time::Span = 5.seconds) #

Class Method Detail

def self.clear_limits(overseer_id : String) : Nil #

Removes stored concurrency limits for a specific overseer.


def self.clear_limits : Nil #

Removes all globally stored concurrency limits, causing adapters to fall back to their defaults (or per-overseer limits if set).


def self.store_limits(limits : Hash(String, Int32), overseer_id : String) : Nil #

Overwrites the concurrency limits for a specific overseer with limits.


def self.store_limits(limits : Hash(String, Int32)) : Nil #

Overwrites the global concurrency limits with limits. Any previously stored queue entries not present in limits are removed.


def self.stored_limits(overseer_id : String) : Hash(String, Int32) #

Reads the concurrency limits for a specific overseer.


def self.stored_limits : Hash(String, Int32) #

Reads the global concurrency limits hash stored in the backend.


Instance Method Detail

def active_count(queue_name : String) : Int32 #

Returns the current in-flight count for queue_name, delegated to the inner adapter.


def defaults : Hash(String, Int32) #

def dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil #
Description copied from class Mosquito::DequeueAdapter

Attempt to dequeue a job from one of the queues managed by queue_list.

Returns a WorkUnit when a job is available, or nil when all queues are empty.


def finished_with(job_run : JobRun, queue : Queue) : Nil #
Description copied from class Mosquito::DequeueAdapter

Called by the Overseer when a job run has finished executing. Override this to react to completed jobs (e.g. update internal counters or rebalance queue weights).



def limits : Hash(String, Int32) #

Returns the current effective concurrency limits (defaults merged with any remote overrides).


def overseer_id : String | Nil #

def refresh_interval : Time::Span #

def refresh_limits : Nil #

Force an immediate refresh from the backend, ignoring the #refresh_interval timer.