Merge pull request 'Switch from Sidekiq to Solid Queue' (#219) from dev/sidekiq_to_solidqueue into master
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #219
Reviewed-by: Greg <greg@noreply.kosmos.org>
This commit is contained in:
Râu Cao 2025-05-05 11:24:56 +00:00
commit e686cf42e8
21 changed files with 243 additions and 63 deletions

View File

@ -4,7 +4,7 @@ git_source(:github) { |repo| "https://github.com/#{repo}.git" }
# Bundle edge Rails instead: gem 'rails', github: 'rails/rails'
gem 'rails', '~> 8.0'
# Use Puma as the app server
gem 'puma', '~> 4.1'
gem 'puma', '~> 6.6'
# View components
gem "view_component"
# Asset bundler
@ -19,8 +19,6 @@ gem "turbo-rails"
gem "stimulus-rails"
# Build JSON APIs with ease. Read more: https://github.com/rails/jbuilder
gem 'jbuilder', '~> 2.7'
# Use Redis adapter to run Action Cable in production
# gem 'redis', '~> 4.0'
# Use Active Model has_secure_password
gem 'bcrypt', '~> 3.1'
@ -53,8 +51,8 @@ gem 'down'
gem 'aws-sdk-s3', require: false
# Background/scheduled jobs
gem 'sidekiq', '< 7'
gem 'sidekiq-scheduler'
gem 'solid_queue'
gem "mission_control-jobs"
# Monitoring
gem "sentry-ruby"
@ -65,6 +63,7 @@ gem 'discourse_api'
gem "lnurl"
gem 'manifique', '~> 1.1.0'
gem 'nostr', '~> 0.6.0'
gem "redis", "~> 5.4"
group :development, :test do
# Use sqlite3 as the database for Active Record

View File

@ -265,6 +265,16 @@ GEM
mini_mime (1.1.5)
mini_portile2 (2.8.8)
minitest (5.25.5)
mission_control-jobs (1.0.2)
actioncable (>= 7.1)
actionpack (>= 7.1)
activejob (>= 7.1)
activerecord (>= 7.1)
importmap-rails (>= 1.2.1)
irb (~> 1.13)
railties (>= 7.1)
stimulus-rails
turbo-rails
multipart-post (2.4.1)
net-http (0.6.0)
uri
@ -315,7 +325,7 @@ GEM
date
stringio
public_suffix (6.0.1)
puma (4.3.12)
puma (6.6.0)
nio4r (~> 2.0)
raabro (1.4.0)
racc (1.8.1)
@ -375,7 +385,10 @@ GEM
logger
rdoc (6.13.1)
psych (>= 4.0.0)
redis (4.8.1)
redis (5.4.0)
redis-client (>= 0.22.0)
redis-client (0.24.0)
connection_pool
regexp_parser (2.10.0)
reline (0.6.1)
io-console (~> 0.5)
@ -424,8 +437,6 @@ GEM
ruby-vips (2.2.3)
ffi (~> 1.12)
logger
rufus-scheduler (3.9.2)
fugit (~> 1.1, >= 1.11.1)
sanitize (7.0.0)
crass (~> 1.0.2)
nokogiri (>= 1.16.8)
@ -436,14 +447,6 @@ GEM
sentry-ruby (5.23.0)
bigdecimal
concurrent-ruby (~> 1.0, >= 1.0.2)
sidekiq (6.5.12)
connection_pool (>= 2.2.5, < 3)
rack (~> 2.0)
redis (>= 4.5.0, < 5)
sidekiq-scheduler (5.0.6)
rufus-scheduler (~> 3.2)
sidekiq (>= 6, < 8)
tilt (>= 1.4.0, < 3)
solargraph (0.54.2)
backport (~> 1.2)
benchmark (~> 0.4)
@ -463,6 +466,13 @@ GEM
tilt (~> 2.0)
yard (~> 0.9, >= 0.9.24)
yard-solargraph (~> 0.1)
solid_queue (1.1.5)
activejob (>= 7.1)
activerecord (>= 7.1)
concurrent-ruby (>= 1.3.1)
fugit (~> 1.11.0)
railties (>= 7.1)
thor (~> 1.3.1)
sqlite3 (2.6.0)
mini_portile2 (~> 2.8.0)
sqlite3 (2.6.0-arm64-darwin)
@ -543,22 +553,23 @@ DEPENDENCIES
lnurl
lockbox
manifique (~> 1.1.0)
mission_control-jobs
net-ldap
nostr (~> 0.6.0)
pagy (~> 6.0, >= 6.0.2)
pg (~> 1.5)
propshaft
puma (~> 4.1)
puma (~> 6.6)
rails (~> 8.0)
rails-controller-testing
rails-settings-cached (~> 2.8.3)
redis (~> 5.4)
rqrcode (~> 2.0)
rspec-rails
sentry-rails
sentry-ruby
sidekiq (< 7)
sidekiq-scheduler
solargraph
solid_queue
sqlite3 (>= 2.1)
stimulus-rails
turbo-rails

View File

@ -57,7 +57,7 @@ Running the test suite:
Running the test suite with Docker Compose requires overriding the Rails
environment:
docker-compose run -e "RAILS_ENV=test" web rspec
docker-compose exec -e "RAILS_ENV=test" web rspec
### Docker Compose

View File

@ -3,8 +3,6 @@ class RemoteStorageExpireAuthorizationJob < ApplicationJob
def perform(rs_auth_id)
rs_auth = RemoteStorageAuthorization.find rs_auth_id
return unless rs_auth.expire_at.nil? || rs_auth.expire_at <= DateTime.now
rs_auth.destroy!
end
end

View File

@ -69,11 +69,19 @@ class RemoteStorageAuthorization < ApplicationRecord
end
def remove_token_expiry_job
queue = Sidekiq::Queue.new(RemoteStorageExpireAuthorizationJob.queue_name)
queue.each do |job|
next unless job.display_class == "RemoteStorageExpireAuthorizationJob"
job.delete if job.display_args == [id]
end
job_class = RemoteStorageExpireAuthorizationJob
job_args = [id]
query = SolidQueue::Job.where(class_name: job_class.to_s)
case ActiveRecord::Base.connection.adapter_name.downcase
when /sqlite/
query.where("json_extract(arguments, '$.arguments') = ?", job_args.to_json)
when /postgres/
query.where("arguments->>'arguments' = ?", job_args.to_json)
else
raise "Unsupported database adapter"
end.destroy_all
end
def find_or_create_web_app

6
bin/jobs Executable file
View File

@ -0,0 +1,6 @@
#!/usr/bin/env ruby
require_relative "../config/environment"
require "solid_queue/cli"
SolidQueue::Cli.start(ARGV)

View File

@ -47,7 +47,8 @@ module Akkounts
g.stylesheets false
end
config.active_job.queue_adapter = :sidekiq
config.active_job.queue_adapter = :solid_queue
config.mission_control.jobs.http_basic_auth_enabled = false
config.action_mailer.deliver_later_queue_name = nil # use "default" queue

View File

@ -7,6 +7,10 @@ development:
primary:
<<: *default
database: db/development.sqlite3
queue:
<<: *default
database: db/development_queue.sqlite3
migrations_paths: db/queue_migrate
lndhub:
<<: *default
adapter: postgresql

View File

@ -54,6 +54,9 @@ Rails.application.configure do
# Highlight code that enqueued background job in logs.
config.active_job.verbose_enqueue_logs = true
# Solid Queue database
config.solid_queue.connects_to = { database: { writing: :queue } }
# Suppress logger output for asset requests.
# config.assets.quiet = true

View File

@ -54,8 +54,9 @@ Rails.application.configure do
# Replace the default in-process memory cache store with a durable alternative.
# config.cache_store = :mem_cache_store
# Replace the default in-process and non-durable queuing backend for Active Job.
# config.active_job.queue_adapter = :resque
# Solid Queue database
config.solid_queue.connects_to = { database: { writing: :queue } }
# E-mail settings, adapted from https://github.com/mastodon/mastodon
outgoing_email_address = ENV.fetch('SMTP_FROM_ADDRESS', 'accounts@localhost')

View File

@ -0,0 +1,11 @@
# See https://alvincrespo.hashnode.dev/rails-8s-lazy-route-loading-devise
# TODO remove when Devise is fixed
require 'devise'
Devise # make sure it's already loaded
module Devise
def self.mappings
Rails.application.try(:reload_routes_unless_loaded)
@@mappings
end
end

View File

@ -1,5 +0,0 @@
require_relative "../../app/models/setting"
Sidekiq.configure_server do |config|
config.redis = { url: Setting.redis_url }
end

21
config/queue.yml Normal file
View File

@ -0,0 +1,21 @@
default: &default
dispatchers:
- polling_interval: 1
batch_size: 500
workers:
- queues: "*"
threads: 3
processes: <%= ENV.fetch("JOB_CONCURRENCY", 1) %>
polling_interval: 0.1
development:
<<: *default
workers:
- queues: "*"
threads: 1
test:
<<: *default
production:
<<: *default

10
config/recurring.yml Normal file
View File

@ -0,0 +1,10 @@
# production:
# periodic_cleanup:
# class: CleanSoftDeletedRecordsJob
# queue: background
# args: [ 1000, { batch_size: 500 } ]
# schedule: every hour
# periodic_command:
# command: "SoftDeletedRecord.due.delete_all"
# priority: 2
# schedule: at 5am every day

View File

@ -1,5 +1,3 @@
require 'sidekiq/web'
Rails.application.routes.draw do
devise_for :users, controllers: {
confirmations: 'users/confirmations',
@ -123,7 +121,7 @@ Rails.application.routes.draw do
end
authenticate :user, ->(user) { user.is_admin? } do
mount Sidekiq::Web, at: '/sidekiq'
mount MissionControl::Jobs::Engine, at: "/jobs"
mount Flipper::UI.app(Flipper), at: '/flipper'
end

129
db/queue_schema.rb Normal file
View File

@ -0,0 +1,129 @@
ActiveRecord::Schema[7.1].define(version: 1) do
create_table "solid_queue_blocked_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.string "concurrency_key", null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.index [ "concurrency_key", "priority", "job_id" ], name: "index_solid_queue_blocked_executions_for_release"
t.index [ "expires_at", "concurrency_key" ], name: "index_solid_queue_blocked_executions_for_maintenance"
t.index [ "job_id" ], name: "index_solid_queue_blocked_executions_on_job_id", unique: true
end
create_table "solid_queue_claimed_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.bigint "process_id"
t.datetime "created_at", null: false
t.index [ "job_id" ], name: "index_solid_queue_claimed_executions_on_job_id", unique: true
t.index [ "process_id", "job_id" ], name: "index_solid_queue_claimed_executions_on_process_id_and_job_id"
end
create_table "solid_queue_failed_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.text "error"
t.datetime "created_at", null: false
t.index [ "job_id" ], name: "index_solid_queue_failed_executions_on_job_id", unique: true
end
create_table "solid_queue_jobs", force: :cascade do |t|
t.string "queue_name", null: false
t.string "class_name", null: false
t.text "arguments"
t.integer "priority", default: 0, null: false
t.string "active_job_id"
t.datetime "scheduled_at"
t.datetime "finished_at"
t.string "concurrency_key"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index [ "active_job_id" ], name: "index_solid_queue_jobs_on_active_job_id"
t.index [ "class_name" ], name: "index_solid_queue_jobs_on_class_name"
t.index [ "finished_at" ], name: "index_solid_queue_jobs_on_finished_at"
t.index [ "queue_name", "finished_at" ], name: "index_solid_queue_jobs_for_filtering"
t.index [ "scheduled_at", "finished_at" ], name: "index_solid_queue_jobs_for_alerting"
end
create_table "solid_queue_pauses", force: :cascade do |t|
t.string "queue_name", null: false
t.datetime "created_at", null: false
t.index [ "queue_name" ], name: "index_solid_queue_pauses_on_queue_name", unique: true
end
create_table "solid_queue_processes", force: :cascade do |t|
t.string "kind", null: false
t.datetime "last_heartbeat_at", null: false
t.bigint "supervisor_id"
t.integer "pid", null: false
t.string "hostname"
t.text "metadata"
t.datetime "created_at", null: false
t.string "name", null: false
t.index [ "last_heartbeat_at" ], name: "index_solid_queue_processes_on_last_heartbeat_at"
t.index [ "name", "supervisor_id" ], name: "index_solid_queue_processes_on_name_and_supervisor_id", unique: true
t.index [ "supervisor_id" ], name: "index_solid_queue_processes_on_supervisor_id"
end
create_table "solid_queue_ready_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "created_at", null: false
t.index [ "job_id" ], name: "index_solid_queue_ready_executions_on_job_id", unique: true
t.index [ "priority", "job_id" ], name: "index_solid_queue_poll_all"
t.index [ "queue_name", "priority", "job_id" ], name: "index_solid_queue_poll_by_queue"
end
create_table "solid_queue_recurring_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "task_key", null: false
t.datetime "run_at", null: false
t.datetime "created_at", null: false
t.index [ "job_id" ], name: "index_solid_queue_recurring_executions_on_job_id", unique: true
t.index [ "task_key", "run_at" ], name: "index_solid_queue_recurring_executions_on_task_key_and_run_at", unique: true
end
create_table "solid_queue_recurring_tasks", force: :cascade do |t|
t.string "key", null: false
t.string "schedule", null: false
t.string "command", limit: 2048
t.string "class_name"
t.text "arguments"
t.string "queue_name"
t.integer "priority", default: 0
t.boolean "static", default: true, null: false
t.text "description"
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index [ "key" ], name: "index_solid_queue_recurring_tasks_on_key", unique: true
t.index [ "static" ], name: "index_solid_queue_recurring_tasks_on_static"
end
create_table "solid_queue_scheduled_executions", force: :cascade do |t|
t.bigint "job_id", null: false
t.string "queue_name", null: false
t.integer "priority", default: 0, null: false
t.datetime "scheduled_at", null: false
t.datetime "created_at", null: false
t.index [ "job_id" ], name: "index_solid_queue_scheduled_executions_on_job_id", unique: true
t.index [ "scheduled_at", "priority", "job_id" ], name: "index_solid_queue_dispatch_all"
end
create_table "solid_queue_semaphores", force: :cascade do |t|
t.string "key", null: false
t.integer "value", default: 1, null: false
t.datetime "expires_at", null: false
t.datetime "created_at", null: false
t.datetime "updated_at", null: false
t.index [ "expires_at" ], name: "index_solid_queue_semaphores_on_expires_at"
t.index [ "key", "value" ], name: "index_solid_queue_semaphores_on_key_and_value"
t.index [ "key" ], name: "index_solid_queue_semaphores_on_key", unique: true
end
add_foreign_key "solid_queue_blocked_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_claimed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_failed_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_ready_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_recurring_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
add_foreign_key "solid_queue_scheduled_executions", "solid_queue_jobs", column: "job_id", on_delete: :cascade
end

View File

@ -37,6 +37,9 @@ services:
- "3000:3000"
environment:
RAILS_ENV: development
SOLID_QUEUE_IN_PUMA: true
LAUNCHY_DRY_RUN: true
BROWSER: /dev/null
PRIMARY_DOMAIN: kosmos.org
LDAP_HOST: ldap
LDAP_PORT: 3389
@ -54,30 +57,6 @@ services:
- ldap
- redis
sidekiq:
build: .
command: bash -c "bundle exec sidekiq -C config/sidekiq.yml"
volumes:
- .:/akkounts
networks:
- internal_network
environment:
RAILS_ENV: development
PRIMARY_DOMAIN: kosmos.org
LDAP_HOST: ldap
LDAP_PORT: 3389
LDAP_ADMIN_PASSWORD: passthebutter
LDAP_USE_TLS: "false"
LAUNCHY_DRY_RUN: true
BROWSER: /dev/null
REDIS_URL: redis://redis:6379/0
RS_REDIS_URL: redis://redis:6379/1
RS_STORAGE_URL: "http://localhost:4567"
S3_ENABLED: false
depends_on:
- ldap
- redis
minio:
image: quay.io/minio/minio:latest
command: "server /data --console-address ':9001'"

View File

@ -5,6 +5,7 @@ RSpec.describe Rs::OauthController, type: :controller do
before do
allow_any_instance_of(AppCatalog::WebApp).to receive(:update_metadata).and_return(true)
allow_any_instance_of(RemoteStorageAuthorization).to receive(:remove_token_expiry_job).and_return(nil)
end
describe "GET /rs/oauth/:username" do

View File

@ -5,6 +5,7 @@ RSpec.describe Services::RsAuthsController, type: :controller do
before do
allow_any_instance_of(AppCatalog::WebApp).to receive(:update_metadata).and_return(true)
allow_any_instance_of(RemoteStorageAuthorization).to receive(:remove_token_expiry_job).and_return(nil)
allow_any_instance_of(Flipper).to receive(:enabled?).and_return(true)
end

View File

@ -5,6 +5,9 @@ RSpec.describe RemoteStorageExpireAuthorizationJob, type: :job do
allow_any_instance_of(AppCatalog::WebApp).to(
receive(:update_metadata).and_return(true)
)
allow_any_instance_of(RemoteStorageAuthorization).to(
receive(:remove_token_expiry_job).and_return(nil)
)
@user = create :user, cn: "ronald", ou: "kosmos.org"
@rs_authorization = create :remote_storage_authorization,

View File

@ -7,6 +7,7 @@ RSpec.describe RemoteStorageAuthorization, type: :model do
before do
allow_any_instance_of(AppCatalog::WebApp).to receive(:update_metadata).and_return(true)
allow_any_instance_of(RemoteStorageAuthorization).to receive(:remove_token_expiry_job).and_return(nil)
end
describe "#create" do