class Mosquito::RemoteConfigDequeueAdapter
- Mosquito::RemoteConfigDequeueAdapter
- Mosquito::DequeueAdapter
- Reference
- Object
Overview
A dequeue adapter that wraps ConcurrencyLimitedDequeueAdapter with
remotely configurable concurrency limits stored in the Mosquito backend
(e.g. Redis).
Limits are refreshed by polling the backend at a configurable interval.
When the remote key is absent or empty the adapter falls back to the
#defaults hash provided at construction time.
Remote values are merged on top of defaults: a queue present only in defaults keeps its value, a queue present only in the remote config is added, and a queue present in both uses the remote value.
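The three merge cases above can be illustrated with plain hashes. This is a minimal sketch that assumes the merge behaves like Crystal's standard `Hash#merge`; the variable names are illustrative and no backend is involved.

```crystal
# Illustrative only: plain hashes stand in for the adapter's defaults and
# for the limits read from the backend.
defaults = {"queue_a" => 3, "queue_b" => 5}
remote   = {"queue_b" => 10, "queue_c" => 2}

# Hash#merge returns a new hash; on a key collision the argument's value wins.
effective = defaults.merge(remote)

puts effective["queue_a"] # only in defaults: keeps 3
puts effective["queue_b"] # in both: remote value 10 wins
puts effective["queue_c"] # only in remote: added with 2

# An absent or empty remote config leaves the defaults untouched.
fallback = defaults.merge({} of String => Int32)
puts fallback == defaults
```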
Per-overseer configuration
When #overseer_id is set, the adapter reads from both the global key
and a per-overseer key. The merge order is:
defaults → global remote → per-overseer remote
This lets you run overseers on asymmetric hardware and tune each one independently while still sharing a common baseline.
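As a rough sketch of how that merge order plays out across two overseers, the snippet below layers three plain hashes. `effective_limits`, the `Limits` alias, and the `cpu-worker-1` id are hypothetical names introduced for illustration; the adapter's actual implementation may differ.

```crystal
alias Limits = Hash(String, Int32)

# Hypothetical helper mirroring the documented merge order:
# defaults -> global remote -> per-overseer remote.
def effective_limits(defaults : Limits, global : Limits, per_overseer : Limits) : Limits
  defaults.merge(global).merge(per_overseer)
end

defaults = {"queue_a" => 3, "queue_b" => 5}
global   = {"queue_b" => 10}

# Stand-in for the per-overseer keys in the backend, keyed by overseer id.
per_overseer = {
  "gpu-worker-1" => {"queue_a" => 1},
  "cpu-worker-1" => Limits.new, # no overrides: shares the common baseline
}

per_overseer.each do |overseer_id, overrides|
  puts "#{overseer_id}: #{effective_limits(defaults, global, overrides)}"
end
```

Here `gpu-worker-1` ends up with a limit of 1 on `queue_a`, while `cpu-worker-1` keeps the default of 3; both pick up the global limit of 10 on `queue_b`.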
Setting limits remotely
Use Mosquito::Api.set_concurrency_limits to write global limits:
Mosquito::Api.set_concurrency_limits({"queue_a" => 2, "queue_b" => 10})
Or target a specific overseer:
Mosquito::Api.set_concurrency_limits({"queue_a" => 1}, overseer_id: "gpu-worker-1")
Example
Mosquito.configure do |settings|
  settings.dequeue_adapter = Mosquito::RemoteConfigDequeueAdapter.new(
    defaults: {"queue_a" => 3, "queue_b" => 5},
    overseer_id: "gpu-worker-1",
    refresh_interval: 5.seconds,
  )
end
In this configuration the adapter starts with the given defaults. Limits
written to the backend via the API take effect within one
#refresh_interval (5 seconds here). Per-overseer limits override global
limits, which in turn override defaults.
Defined in:
mosquito/dequeue_adapters/remote_config_dequeue_adapter.cr
Constant Summary
-
CONFIG_KEY = "concurrency_limits"
Constructors
Class Method Summary
-
.clear_limits(overseer_id : String) : Nil
Removes stored concurrency limits for a specific overseer.
-
.clear_limits : Nil
Removes all globally stored concurrency limits, causing adapters to fall back to their defaults (or per-overseer limits if set).
-
.store_limits(limits : Hash(String, Int32), overseer_id : String) : Nil
Overwrites the concurrency limits for a specific overseer with limits.
-
.store_limits(limits : Hash(String, Int32)) : Nil
Overwrites the global concurrency limits with limits.
-
.stored_limits(overseer_id : String) : Hash(String, Int32)
Reads the concurrency limits for a specific overseer.
-
.stored_limits : Hash(String, Int32)
Reads the global concurrency limits hash stored in the backend.
Instance Method Summary
-
#active_count(queue_name : String) : Int32
Returns the current in-flight count for queue_name, delegated to the inner adapter.
- #defaults : Hash(String, Int32)
-
#dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil
Attempt to dequeue a job from one of the queues managed by queue_list.
-
#finished_with(job_run : JobRun, queue : Queue) : Nil
Called by the Overseer when a job run has finished executing.
- #inner : ConcurrencyLimitedDequeueAdapter
-
#limits : Hash(String, Int32)
Returns the current effective concurrency limits (defaults merged with any remote overrides).
- #overseer_id : String | Nil
- #refresh_interval : Time::Span
-
#refresh_limits : Nil
Force an immediate refresh from the backend, ignoring the
#refresh_interval timer.
Instance methods inherited from class Mosquito::DequeueAdapter
dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil,
finished_with(job_run : JobRun, queue : Queue) : Nil
Constructor Detail
Class Method Detail
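.clear_limits(overseer_id : String) : Nil
Removes stored concurrency limits for a specific overseer.
.clear_limits : Nil
Removes all globally stored concurrency limits, causing adapters to fall back to their defaults (or per-overseer limits if set).
.store_limits(limits : Hash(String, Int32), overseer_id : String) : Nil
Overwrites the concurrency limits for a specific overseer with limits.
.store_limits(limits : Hash(String, Int32)) : Nil
Overwrites the global concurrency limits with limits. Any previously stored queue entries not present in limits are removed.
.stored_limits(overseer_id : String) : Hash(String, Int32)
Reads the concurrency limits for a specific overseer.
.stored_limits : Hash(String, Int32)
Reads the global concurrency limits hash stored in the backend.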
Instance Method Detail
#active_count(queue_name : String) : Int32
Returns the current in-flight count for queue_name, delegated to the inner adapter.
#dequeue(queue_list : Runners::QueueList) : WorkUnit | Nil
Attempt to dequeue a job from one of the queues managed by queue_list.
Returns a WorkUnit when a job is available, or nil
when all queues are empty.
#finished_with(job_run : JobRun, queue : Queue) : Nil
Called by the Overseer when a job run has finished executing. Override this to react to completed jobs (e.g. update internal counters or rebalance queue weights).
#limits : Hash(String, Int32)
Returns the current effective concurrency limits (defaults merged with any remote overrides).
#refresh_limits : Nil
Force an immediate refresh from the backend, ignoring the
#refresh_interval timer.
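The difference between interval-gated polling and a forced refresh can be sketched with a small stand-in class. `PolledLimits`, `maybe_refresh`, and the injected `now` timestamp are hypothetical and exist only for illustration; this is not Mosquito's implementation, and a Proc replaces the backend read.

```crystal
# Hypothetical stand-in for the adapter's refresh behaviour.
class PolledLimits
  getter limits : Hash(String, Int32)
  getter fetches = 0

  @last_refresh : Time::Span? = nil

  def initialize(@defaults : Hash(String, Int32),
                 @refresh_interval : Time::Span,
                 @fetch : Proc(Hash(String, Int32)))
    @limits = @defaults.dup
  end

  # Refresh only if no fetch has happened yet or the interval has elapsed.
  def maybe_refresh(now : Time::Span) : Nil
    last = @last_refresh
    return if last && now - last < @refresh_interval
    refresh(now)
  end

  # Refresh unconditionally, analogous to #refresh_limits.
  def refresh(now : Time::Span) : Nil
    @limits = @defaults.merge(@fetch.call)
    @last_refresh = now
    @fetches += 1
  end
end

remote = {"queue_a" => 1}
poller = PolledLimits.new({"queue_a" => 3, "queue_b" => 5}, 5.seconds, ->{ remote })

poller.maybe_refresh(0.seconds) # first call always fetches
remote["queue_a"] = 2           # simulate a remote update
poller.maybe_refresh(2.seconds) # inside the interval: cached limits are kept
stale = poller.limits["queue_a"]
poller.refresh(3.seconds)       # forced: picks up the change immediately
fresh = poller.limits["queue_a"]
puts "stale=#{stale} fresh=#{fresh} fetches=#{poller.fetches}"
```

Passing the timestamp in explicitly keeps the sketch deterministic; a real poller would read a monotonic clock instead.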