mirror of
https://github.com/chatwoot/chatwoot.git
synced 2026-06-04 21:02:35 +08:00
feat: scheduler fairness [AI-159] (#14425)
# Pull Request Template ## Description Better scheduling and queueing mechanics for document auto-sync - add jitter plan wise for document sync - move auto-sync documents to purgeable queue ## Type of change Please delete options that are not relevant. - [x] New feature (non-breaking change which adds functionality) ## How Has This Been Tested? Please describe the tests that you ran to verify your changes. Provide instructions so we can reproduce. Please also list any relevant details for your test configuration. locally tested and with specs ## Checklist: - [x] My code follows the style guidelines of this project - [x] I have performed a self-review of my code - [x] I have commented on my code, particularly in hard-to-understand areas - [ ] I have made corresponding changes to the documentation - [x] My changes generate no new warnings - [x] I have added tests that prove my fix is effective or that my feature works - [x] New and existing unit tests pass locally with my changes - [x] Any dependent changes have been merged and published in downstream modules --------- Co-authored-by: Sivin Varghese <64252451+iamsivin@users.noreply.github.com> Co-authored-by: iamsivin <iamsivin@gmail.com> Co-authored-by: Muhsin Keloth <muhsinkeramam@gmail.com> Co-authored-by: Sony Mathew <sony@chatwoot.com> Co-authored-by: Vishnu Narayanan <iamwishnu@gmail.com>
This commit is contained in:
parent
9c17a6f302
commit
d20950c5b4
@ -25,7 +25,7 @@ const store = useStore();
|
||||
const bulkDeleteDialog = ref(null);
|
||||
|
||||
const isSyncableDocument = doc =>
|
||||
!doc.pdf_document && doc.status === 'available' && !doc.sync_in_progress;
|
||||
!doc.pdf_document && doc.status === 'available';
|
||||
|
||||
const syncableSelectedIds = computed(() => {
|
||||
if (!props.selectedIds.size) return [];
|
||||
|
||||
@ -127,7 +127,6 @@ const menuItems = computed(() => {
|
||||
value: 'sync',
|
||||
action: 'sync',
|
||||
icon: 'i-lucide-refresh-cw',
|
||||
disabled: props.syncInProgress,
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@ -23,3 +23,5 @@ class Internal::TriggerDailyScheduledItemsJob < ApplicationJob
|
||||
@designated_minute ||= Digest::MD5.hexdigest(ChatwootHub.installation_identifier).hex % 1440
|
||||
end
|
||||
end
|
||||
|
||||
Internal::TriggerDailyScheduledItemsJob.prepend_mod_with('Internal::TriggerDailyScheduledItemsJob')
|
||||
|
||||
@ -215,6 +215,18 @@
|
||||
value:
|
||||
locked: false
|
||||
type: code
|
||||
- name: CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT
|
||||
display_title: 'Captain Document Auto Sync Per Account Batch Limit'
|
||||
description: 'Maximum syncable Captain documents to enqueue per account in one scheduler run. Defaults to 50.'
|
||||
value: 50
|
||||
locked: false
|
||||
type: number
|
||||
- name: CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT
|
||||
display_title: 'Captain Document Auto Sync Global Batch Limit'
|
||||
description: 'Maximum syncable Captain documents to enqueue globally in one scheduler run. Defaults to 1000.'
|
||||
value: 1000
|
||||
locked: false
|
||||
type: number
|
||||
# End of Captain Config
|
||||
|
||||
# ------- Context.dev Config ------- #
|
||||
|
||||
@ -75,7 +75,6 @@ class Api::V1::Accounts::Captain::BulkActionsController < Api::V1::Accounts::Bas
|
||||
Current.account.captain_documents.where(id: params[:ids]).find_each(batch_size: 100) do |document|
|
||||
next unless document.syncable?
|
||||
next unless document.available?
|
||||
next if document.sync_in_progress?
|
||||
|
||||
document.update!(
|
||||
sync_status: :syncing,
|
||||
|
||||
@ -37,7 +37,6 @@ class Api::V1::Accounts::Captain::DocumentsController < Api::V1::Accounts::BaseC
|
||||
def sync
|
||||
return render_could_not_create_error(I18n.t('captain.documents.sync_not_supported_for_pdf')) unless @document.syncable?
|
||||
return render_could_not_create_error(I18n.t('captain.documents.sync_only_available_documents')) unless @document.available?
|
||||
return render_could_not_create_error(I18n.t('captain.documents.sync_already_in_progress')) if @document.sync_in_progress?
|
||||
|
||||
@document.update!(
|
||||
sync_status: :syncing,
|
||||
|
||||
@ -48,9 +48,7 @@ class Captain::Documents::PerformSyncJob < MutexApplicationJob
|
||||
return if document.pdf_document?
|
||||
|
||||
with_lock(lock_key(document), LOCK_TIMEOUT) do
|
||||
mark_sync_started(document)
|
||||
result = Captain::Documents::SyncService.new(document.reload).perform
|
||||
log_sync_outcome(document, result: result, duration_ms: duration_ms_since(start_time))
|
||||
perform_sync(document, start_time)
|
||||
end
|
||||
rescue LockAcquisitionError
|
||||
log_sync_outcome(document, result: :already_syncing)
|
||||
@ -64,6 +62,12 @@ class Captain::Documents::PerformSyncJob < MutexApplicationJob
|
||||
|
||||
private
|
||||
|
||||
def perform_sync(document, start_time)
|
||||
mark_sync_started(document)
|
||||
result = Captain::Documents::SyncService.new(document.reload).perform
|
||||
log_sync_outcome(document, result: result, duration_ms: duration_ms_since(start_time))
|
||||
end
|
||||
|
||||
def log_sync_outcome(document, **fields)
|
||||
payload = {
|
||||
document_id: document.id,
|
||||
|
||||
@ -1,21 +1,27 @@
|
||||
class Captain::Documents::ScheduleSyncsJob < ApplicationJob
|
||||
queue_as :scheduled_jobs
|
||||
|
||||
PER_ACCOUNT_HOURLY_CAP = 50
|
||||
GLOBAL_HOURLY_CAP = 1000
|
||||
DUE_DOCUMENT_BATCH_SIZE = PER_ACCOUNT_HOURLY_CAP * 2 # Inspite of skipping, we should at least reach the hourly cap
|
||||
DEFAULT_PER_ACCOUNT_BATCH_LIMIT = 50
|
||||
DEFAULT_GLOBAL_BATCH_LIMIT = 1000
|
||||
SYNC_STALE_TIMEOUT = Captain::Document::SYNC_STALE_TIMEOUT
|
||||
DAILY_SYNC_JITTER = 4.hours
|
||||
WEEKLY_SYNC_JITTER = 1.day
|
||||
MONTHLY_SYNC_JITTER = 4.days
|
||||
|
||||
def perform
|
||||
@remaining_global_capacity = GLOBAL_HOURLY_CAP
|
||||
def perform(plan_name = nil)
|
||||
@per_account_batch_limit = configured_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', DEFAULT_PER_ACCOUNT_BATCH_LIMIT)
|
||||
@global_batch_limit = configured_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT', DEFAULT_GLOBAL_BATCH_LIMIT)
|
||||
@remaining_global_capacity = @global_batch_limit
|
||||
@plan_name = plan_name.to_s.downcase.presence
|
||||
sync_intervals = Enterprise::Account.captain_document_sync_intervals
|
||||
stats = { accounts_scanned: 0, accounts_enabled: 0, accounts_scheduled: 0, documents_enqueued: 0, documents_skipped: 0 }
|
||||
stats = { accounts_scanned: 0, accounts_enabled: 0, accounts_scheduled: 0, documents_enqueued: 0 }
|
||||
|
||||
Account.joins(:captain_documents).distinct.find_each(batch_size: 100) do |account|
|
||||
break if @remaining_global_capacity <= 0
|
||||
|
||||
stats[:accounts_scanned] += 1
|
||||
next unless account.feature_enabled?('captain_document_auto_sync')
|
||||
next unless account_in_selected_plan?(account)
|
||||
|
||||
stats[:accounts_enabled] += 1
|
||||
interval = account.captain_document_sync_interval(sync_intervals)
|
||||
@ -24,7 +30,6 @@ class Captain::Documents::ScheduleSyncsJob < ApplicationJob
|
||||
stats[:accounts_scheduled] += 1
|
||||
result = enqueue_due_documents(account, interval)
|
||||
stats[:documents_enqueued] += result[:enqueued]
|
||||
stats[:documents_skipped] += result[:skipped]
|
||||
end
|
||||
|
||||
log_scheduler_summary(stats)
|
||||
@ -33,92 +38,85 @@ class Captain::Documents::ScheduleSyncsJob < ApplicationJob
|
||||
private
|
||||
|
||||
def enqueue_due_documents(account, interval)
|
||||
per_account_limit = [PER_ACCOUNT_HOURLY_CAP, @remaining_global_capacity].min
|
||||
result = { enqueued: 0, skipped: 0 }
|
||||
skipped_document_ids = []
|
||||
per_account_limit = [@per_account_batch_limit, @remaining_global_capacity].min
|
||||
result = { enqueued: 0 }
|
||||
|
||||
while result[:enqueued] < per_account_limit
|
||||
|
||||
documents = due_documents(account, interval, skipped_document_ids).limit(DUE_DOCUMENT_BATCH_SIZE).to_a
|
||||
break if documents.empty?
|
||||
|
||||
documents.each do |document|
|
||||
break if result[:enqueued] >= per_account_limit
|
||||
|
||||
process_due_document(document, result, skipped_document_ids)
|
||||
end
|
||||
due_documents(account, interval).limit(per_account_limit).to_a.each do |document|
|
||||
process_due_document(document, interval, result)
|
||||
end
|
||||
|
||||
result
|
||||
end
|
||||
|
||||
def process_due_document(document, result, skipped_document_ids)
|
||||
return unless document.syncable?
|
||||
def process_due_document(document, interval, result)
|
||||
sync_execution_delay = sync_jitter(interval)
|
||||
|
||||
# Reserve the sync slot before enqueueing so later scheduler runs skip this document while the job is queued.
|
||||
unless reserve_sync_slot(document)
|
||||
result[:skipped] += 1
|
||||
skipped_document_ids << document.id
|
||||
return
|
||||
end
|
||||
|
||||
Captain::Documents::PerformSyncJob.perform_later(document)
|
||||
Captain::Documents::PerformSyncJob.set(queue: :purgable, wait: sync_execution_delay).perform_later(document)
|
||||
@remaining_global_capacity -= 1
|
||||
result[:enqueued] += 1
|
||||
end
|
||||
|
||||
def due_documents(account, interval, skipped_document_ids)
|
||||
def due_documents(account, interval)
|
||||
syncing = Captain::Document.sync_statuses[:syncing]
|
||||
synced = Captain::Document.sync_statuses[:synced]
|
||||
failed = Captain::Document.sync_statuses[:failed]
|
||||
stale_cutoff = SYNC_STALE_TIMEOUT.ago
|
||||
# The scheduler runs at predictable plan windows. Use a wider due window so
|
||||
# jittered executions do not miss the next window just because they finished later.
|
||||
sync_due_before = due_window(interval).ago
|
||||
|
||||
documents = account.captain_documents.syncable.where(status: :available).where(
|
||||
'(sync_status = ? AND last_synced_at < ?) OR (sync_status = ? AND last_sync_attempted_at < ?) OR ' \
|
||||
'(sync_status = ? AND last_sync_attempted_at < ?)',
|
||||
synced, interval.ago, failed, interval.ago, syncing, SYNC_STALE_TIMEOUT.ago
|
||||
synced, sync_due_before, failed, sync_due_before, syncing, stale_cutoff
|
||||
)
|
||||
documents = documents.where.not(id: skipped_document_ids) if skipped_document_ids.present?
|
||||
documents.order(Arel.sql('last_sync_attempted_at ASC NULLS FIRST'), :id)
|
||||
end
|
||||
|
||||
def reserve_sync_slot(document)
|
||||
mark_sync_started(document)
|
||||
true
|
||||
rescue ActiveRecord::RecordInvalid => e
|
||||
log_document_skip(document, e)
|
||||
false
|
||||
def configured_sync_limit(config_key, default)
|
||||
configured_value = InstallationConfig.find_by(name: config_key)&.value
|
||||
limit = configured_value.to_s.to_i
|
||||
limit.positive? ? limit : default
|
||||
end
|
||||
|
||||
def log_document_skip(document, error)
|
||||
payload = {
|
||||
event: 'document_skipped',
|
||||
document_id: document.id,
|
||||
account_id: document.account_id,
|
||||
assistant_id: document.assistant_id,
|
||||
error_class: error.class.name,
|
||||
error_message: error.message,
|
||||
validation_errors: document.errors.full_messages
|
||||
}
|
||||
def account_in_selected_plan?(account)
|
||||
return true if @plan_name.blank?
|
||||
|
||||
Rails.logger.warn("[Captain::Documents::ScheduleSyncsJob] #{payload.to_json}")
|
||||
account_sync_plan(account) == @plan_name
|
||||
end
|
||||
|
||||
def account_sync_plan(account)
|
||||
plan = account.custom_attributes['plan_name']
|
||||
plan = 'enterprise' if plan.blank? && ChatwootApp.self_hosted_enterprise?
|
||||
plan.to_s.downcase.presence
|
||||
end
|
||||
|
||||
def sync_jitter(interval)
|
||||
jitter_window = if interval <= 1.day
|
||||
DAILY_SYNC_JITTER
|
||||
elsif interval <= 1.week
|
||||
WEEKLY_SYNC_JITTER
|
||||
else
|
||||
MONTHLY_SYNC_JITTER
|
||||
end
|
||||
|
||||
rand(0..jitter_window.to_i).seconds
|
||||
end
|
||||
|
||||
def due_window(interval)
|
||||
(interval.to_i / 2).seconds
|
||||
end
|
||||
|
||||
def log_scheduler_summary(stats)
|
||||
payload = {
|
||||
event: 'completed',
|
||||
plan_name: @plan_name,
|
||||
global_cap_hit: @remaining_global_capacity <= 0,
|
||||
per_account_batch_limit: @per_account_batch_limit,
|
||||
global_batch_limit: @global_batch_limit,
|
||||
remaining_global_capacity: @remaining_global_capacity
|
||||
}.merge(stats)
|
||||
|
||||
Rails.logger.info("[Captain::Documents::ScheduleSyncsJob] #{payload.to_json}")
|
||||
end
|
||||
|
||||
def mark_sync_started(document)
|
||||
document.update!(
|
||||
sync_status: :syncing,
|
||||
sync_step: nil,
|
||||
last_sync_error_code: nil,
|
||||
last_sync_attempted_at: Time.current
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
@ -0,0 +1,19 @@
|
||||
module Enterprise::Internal::TriggerDailyScheduledItemsJob
|
||||
def perform
|
||||
super
|
||||
|
||||
Captain::Documents::ScheduleSyncsJob.perform_later('enterprise')
|
||||
Captain::Documents::ScheduleSyncsJob.perform_later('business') if business_auto_sync_due?
|
||||
Captain::Documents::ScheduleSyncsJob.perform_later('startups') if startup_auto_sync_due?
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def business_auto_sync_due?
|
||||
Time.current.utc.sunday?
|
||||
end
|
||||
|
||||
def startup_auto_sync_due?
|
||||
Time.current.utc.day == 1
|
||||
end
|
||||
end
|
||||
@ -1,7 +0,0 @@
|
||||
module Enterprise::Internal::TriggerHourlyScheduledItemsJob
|
||||
def perform
|
||||
super
|
||||
|
||||
Captain::Documents::ScheduleSyncsJob.perform_later
|
||||
end
|
||||
end
|
||||
@ -147,7 +147,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::BulkActions', type: :request do
|
||||
params: sync_params,
|
||||
headers: admin.create_new_auth_token,
|
||||
as: :json
|
||||
end.to have_enqueued_job(Captain::Documents::PerformSyncJob).exactly(documents.size).times
|
||||
end.to have_enqueued_job(Captain::Documents::PerformSyncJob).on_queue('low').exactly(documents.size).times
|
||||
|
||||
documents.each do |document|
|
||||
expect(document.reload).to have_attributes(
|
||||
@ -190,7 +190,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::BulkActions', type: :request do
|
||||
expect(response).to have_http_status(:ok)
|
||||
end
|
||||
|
||||
it 'skips documents that already have a sync in progress' do
|
||||
it 'queues documents that already have a sync in progress' do
|
||||
syncing_document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
syncing_document.update!(sync_status: :syncing, last_sync_attempted_at: 1.minute.ago)
|
||||
|
||||
@ -199,9 +199,10 @@ RSpec.describe 'Api::V1::Accounts::Captain::BulkActions', type: :request do
|
||||
params: sync_params.merge(ids: [syncing_document.id]),
|
||||
headers: admin.create_new_auth_token,
|
||||
as: :json
|
||||
end.not_to have_enqueued_job(Captain::Documents::PerformSyncJob)
|
||||
end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(syncing_document).on_queue('low')
|
||||
|
||||
expect(response).to have_http_status(:ok)
|
||||
expect(json_response).to eq({ ids: [syncing_document.id], count: 1 })
|
||||
end
|
||||
|
||||
it 'queues stale syncing documents again' do
|
||||
|
||||
@ -243,7 +243,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::Documents', type: :request do
|
||||
before do
|
||||
create_list(:captain_document, 5, assistant: assistant, account: account)
|
||||
|
||||
create(:installation_config, name: 'CAPTAIN_CLOUD_PLAN_LIMITS', value: captain_limits.to_json)
|
||||
InstallationConfig.find_or_initialize_by(name: 'CAPTAIN_CLOUD_PLAN_LIMITS').update!(value: captain_limits.to_json)
|
||||
post "/api/v1/accounts/#{account.id}/captain/documents",
|
||||
params: valid_attributes,
|
||||
headers: admin.create_new_auth_token
|
||||
@ -281,7 +281,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::Documents', type: :request do
|
||||
expect do
|
||||
post "/api/v1/accounts/#{account.id}/captain/documents/#{document.id}/sync",
|
||||
headers: admin.create_new_auth_token, as: :json
|
||||
end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document)
|
||||
end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document).on_queue('low')
|
||||
|
||||
expect(document.reload).to have_attributes(
|
||||
sync_status: 'syncing',
|
||||
@ -292,15 +292,15 @@ RSpec.describe 'Api::V1::Accounts::Captain::Documents', type: :request do
|
||||
expect(response).to have_http_status(:accepted)
|
||||
end
|
||||
|
||||
it 'rejects documents that already have a sync in progress' do
|
||||
it 'queues documents that already have a sync in progress' do
|
||||
document.update!(sync_status: :syncing, last_sync_attempted_at: 1.minute.ago)
|
||||
|
||||
expect do
|
||||
post "/api/v1/accounts/#{account.id}/captain/documents/#{document.id}/sync",
|
||||
headers: admin.create_new_auth_token, as: :json
|
||||
end.not_to have_enqueued_job(Captain::Documents::PerformSyncJob)
|
||||
end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document).on_queue('low')
|
||||
|
||||
expect(response).to have_http_status(:unprocessable_entity)
|
||||
expect(response).to have_http_status(:accepted)
|
||||
end
|
||||
|
||||
it 'queues stale syncing documents again' do
|
||||
|
||||
@ -0,0 +1,60 @@
|
||||
require 'rails_helper'
|
||||
|
||||
RSpec.describe Captain::Documents::PerformSyncJob, type: :job do
|
||||
let(:account) { create(:account) }
|
||||
let(:assistant) { create(:captain_assistant, account: account) }
|
||||
let(:document) { create(:captain_document, assistant: assistant, account: account, status: :available) }
|
||||
|
||||
def stub_lock(job)
|
||||
allow(job).to receive(:with_lock).and_yield
|
||||
end
|
||||
|
||||
def stub_page_fetch(content: 'Updated content')
|
||||
fetch_result = Captain::Documents::SinglePageFetcher::Result.new(
|
||||
success: true,
|
||||
title: 'Updated title',
|
||||
content: content
|
||||
)
|
||||
fetcher = instance_double(Captain::Documents::SinglePageFetcher, fetch: fetch_result)
|
||||
allow(Captain::Documents::SinglePageFetcher).to receive(:new).and_return(fetcher)
|
||||
end
|
||||
|
||||
def stub_page_fetch_failure
|
||||
fetcher = instance_double(Captain::Documents::SinglePageFetcher)
|
||||
allow(fetcher).to receive(:fetch).and_raise(StandardError, 'boom')
|
||||
allow(Captain::Documents::SinglePageFetcher).to receive(:new).and_return(fetcher)
|
||||
end
|
||||
|
||||
it 'syncs the document content' do
|
||||
travel_to Time.zone.local(2026, 5, 18, 10, 0, 0) do
|
||||
job = described_class.new
|
||||
stub_lock(job)
|
||||
stub_page_fetch
|
||||
|
||||
job.perform(document)
|
||||
|
||||
expect(document.reload).to have_attributes(
|
||||
sync_status: 'synced',
|
||||
last_sync_attempted_at: Time.current,
|
||||
last_synced_at: Time.current,
|
||||
content: 'Updated content'
|
||||
)
|
||||
end
|
||||
end
|
||||
|
||||
it 'marks unexpected failures as failed' do
|
||||
travel_to Time.zone.local(2026, 5, 18, 10, 0, 0) do
|
||||
job = described_class.new
|
||||
stub_lock(job)
|
||||
stub_page_fetch_failure
|
||||
|
||||
expect { job.perform(document) }.to raise_error(StandardError, 'boom')
|
||||
|
||||
expect(document.reload).to have_attributes(
|
||||
sync_status: 'failed',
|
||||
last_sync_error_code: 'sync_error',
|
||||
last_sync_attempted_at: Time.current
|
||||
)
|
||||
end
|
||||
end
|
||||
end
|
||||
@ -5,11 +5,28 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do
|
||||
let(:assistant) { create(:captain_assistant, account: account) }
|
||||
|
||||
before do
|
||||
create(:installation_config, name: 'CAPTAIN_DOCUMENT_AUTO_SYNC_INTERVALS', value: { business: 24, hacker: nil }.to_json)
|
||||
set_installation_config('CAPTAIN_DOCUMENT_AUTO_SYNC_INTERVALS', { business: 168, enterprise: 24, startups: 720, hacker: nil }.to_json)
|
||||
set_installation_config('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', 50)
|
||||
set_installation_config('CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT', 1000)
|
||||
account.enable_features!('captain_document_auto_sync')
|
||||
clear_enqueued_jobs
|
||||
end
|
||||
|
||||
def set_installation_config(name, value)
|
||||
InstallationConfig.find_or_initialize_by(name: name).tap do |config|
|
||||
config.value = value
|
||||
config.save!
|
||||
end
|
||||
end
|
||||
|
||||
def update_sync_limit(name, value)
|
||||
InstallationConfig.find_by!(name: name).update!(value: value)
|
||||
end
|
||||
|
||||
def sync_job_for(document)
|
||||
have_enqueued_job(Captain::Documents::PerformSyncJob).with(document)
|
||||
end
|
||||
|
||||
context 'when the account has not enabled auto-sync' do
|
||||
before { account.disable_features!('captain_document_auto_sync') }
|
||||
|
||||
@ -32,6 +49,25 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do
|
||||
end
|
||||
end
|
||||
|
||||
context 'when a plan name is passed' do
|
||||
it 'queues due documents only for that plan' do
|
||||
enterprise_account = create(:account, custom_attributes: { plan_name: 'Enterprise' })
|
||||
enterprise_account.enable_features!('captain_document_auto_sync')
|
||||
enterprise_assistant = create(:captain_assistant, account: enterprise_account)
|
||||
business_document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
enterprise_document = create(:captain_document, assistant: enterprise_assistant, account: enterprise_account, status: :available)
|
||||
|
||||
business_document.update!(sync_status: :synced, last_synced_at: 3.days.ago, last_sync_attempted_at: 3.days.ago)
|
||||
enterprise_document.update!(sync_status: :synced, last_synced_at: 3.days.ago, last_sync_attempted_at: 3.days.ago)
|
||||
clear_enqueued_jobs
|
||||
|
||||
described_class.new.perform('enterprise')
|
||||
|
||||
expect(Captain::Documents::PerformSyncJob).to have_been_enqueued.with(enterprise_document)
|
||||
expect(Captain::Documents::PerformSyncJob).not_to have_been_enqueued.with(business_document)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when an available document has backfilled sync metadata' do
|
||||
it 'leaves it alone when last synced within the plan cadence' do
|
||||
create(
|
||||
@ -54,53 +90,34 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do
|
||||
account: account,
|
||||
status: :available,
|
||||
sync_status: :synced,
|
||||
last_synced_at: 3.days.ago
|
||||
last_synced_at: 8.days.ago
|
||||
)
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document)
|
||||
.to sync_job_for(document).on_queue('purgable')
|
||||
end
|
||||
|
||||
it 'marks the due document as syncing before queueing' do
|
||||
it 'delays only the queued sync job' do
|
||||
travel_to Time.zone.local(2026, 4, 27, 10, 0, 0) do
|
||||
job = described_class.new
|
||||
allow(job).to receive(:rand).and_return(30.minutes.to_i)
|
||||
document = create(
|
||||
:captain_document,
|
||||
assistant: assistant,
|
||||
account: account,
|
||||
status: :available,
|
||||
sync_status: :synced,
|
||||
last_synced_at: 3.days.ago
|
||||
last_synced_at: 8.days.ago
|
||||
)
|
||||
clear_enqueued_jobs
|
||||
|
||||
described_class.new.perform
|
||||
job.perform
|
||||
|
||||
expect(document.reload).to have_attributes(
|
||||
sync_status: 'syncing',
|
||||
last_sync_attempted_at: Time.current
|
||||
)
|
||||
expect(Captain::Documents::PerformSyncJob)
|
||||
.to have_been_enqueued.with(document).at(30.minutes.from_now)
|
||||
end
|
||||
end
|
||||
|
||||
it 'does not queue the same document again while the reserved sync is fresh' do
|
||||
document = create(
|
||||
:captain_document,
|
||||
assistant: assistant,
|
||||
account: account,
|
||||
status: :available,
|
||||
sync_status: :synced,
|
||||
last_synced_at: 2.days.ago
|
||||
)
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document)
|
||||
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }.not_to have_enqueued_job(Captain::Documents::PerformSyncJob)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when an available document was synced within the plan cadence' do
|
||||
@ -116,78 +133,59 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do
|
||||
context 'when an available document was last synced before the plan cadence' do
|
||||
it 'queues a sync for that document' do
|
||||
document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago)
|
||||
document.update!(sync_status: :synced, last_synced_at: 8.days.ago, last_sync_attempted_at: 8.days.ago)
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document)
|
||||
.to sync_job_for(document)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when jitter spreads queued sync execution' do
|
||||
it 'uses a widened due window so jittered syncs do not skip the next plan run' do
|
||||
travel_to Time.zone.local(2026, 4, 27, 10, 0, 0) do
|
||||
job = described_class.new
|
||||
interval = 1.week
|
||||
due_window = (interval.to_i / 2).seconds
|
||||
allow(job).to receive(:rand).and_return(2.hours.to_i)
|
||||
document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
|
||||
document.update!(sync_status: :synced, last_synced_at: (due_window - 1.minute).ago)
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { job.perform }.not_to have_enqueued_job(Captain::Documents::PerformSyncJob)
|
||||
|
||||
document.update!(sync_status: :synced, last_synced_at: (due_window + 1.minute).ago)
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { job.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob)
|
||||
.with(document)
|
||||
.on_queue('purgable')
|
||||
.at(2.hours.from_now)
|
||||
end
|
||||
end
|
||||
|
||||
it 'skips invalid legacy documents without counting them against the account cap' do
|
||||
stub_const("#{described_class}::PER_ACCOUNT_HOURLY_CAP", 1)
|
||||
create(
|
||||
:captain_document,
|
||||
assistant: assistant,
|
||||
account: account,
|
||||
status: :in_progress,
|
||||
content: nil,
|
||||
external_link: 'https://example.com'
|
||||
)
|
||||
invalid_document = build(
|
||||
:captain_document,
|
||||
assistant: assistant,
|
||||
account: account,
|
||||
status: :available,
|
||||
sync_status: :synced,
|
||||
last_synced_at: 2.days.ago,
|
||||
last_sync_attempted_at: 2.days.ago,
|
||||
external_link: 'https://example.com/'
|
||||
)
|
||||
invalid_document.save!(validate: false)
|
||||
valid_document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
valid_document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago)
|
||||
clear_enqueued_jobs
|
||||
it 'uses a random delay inside the cadence window' do
|
||||
travel_to Time.zone.local(2026, 4, 27, 10, 0, 0) do
|
||||
document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
document.update!(sync_status: :synced, last_synced_at: 8.days.ago)
|
||||
job = described_class.new
|
||||
sync_execution_delay = 12_345.seconds
|
||||
|
||||
expect { described_class.new.perform }.not_to raise_error
|
||||
expect(Captain::Documents::PerformSyncJob).not_to have_been_enqueued.with(invalid_document)
|
||||
expect(Captain::Documents::PerformSyncJob).to have_been_enqueued.with(valid_document)
|
||||
end
|
||||
clear_enqueued_jobs
|
||||
allow(job).to receive(:rand).with(0..described_class::WEEKLY_SYNC_JITTER.to_i).and_return(sync_execution_delay.to_i)
|
||||
|
||||
it 'keeps paging due documents when invalid documents fill the first batch' do
|
||||
stub_const("#{described_class}::PER_ACCOUNT_HOURLY_CAP", 1)
|
||||
stub_const("#{described_class}::DUE_DOCUMENT_BATCH_SIZE", 1)
|
||||
create(
|
||||
:captain_document,
|
||||
assistant: assistant,
|
||||
account: account,
|
||||
status: :in_progress,
|
||||
content: nil,
|
||||
external_link: 'https://example.com'
|
||||
)
|
||||
invalid_document = build(
|
||||
:captain_document,
|
||||
assistant: assistant,
|
||||
account: account,
|
||||
status: :available,
|
||||
sync_status: :synced,
|
||||
last_synced_at: 2.days.ago,
|
||||
last_sync_attempted_at: 3.days.ago,
|
||||
external_link: 'https://example.com/'
|
||||
)
|
||||
invalid_document.save!(validate: false)
|
||||
valid_document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
valid_document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago)
|
||||
clear_enqueued_jobs
|
||||
|
||||
described_class.new.perform
|
||||
|
||||
expect(Captain::Documents::PerformSyncJob).to have_been_enqueued.with(valid_document)
|
||||
expect { job.perform }
|
||||
.to sync_job_for(document)
|
||||
.at(sync_execution_delay.from_now)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
context 'when more documents are due than the account cap allows' do
|
||||
before do
|
||||
stub_const("#{described_class}::PER_ACCOUNT_HOURLY_CAP", 2)
|
||||
update_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', 2)
|
||||
end
|
||||
|
||||
it 'queues backfilled and oldest-attempted documents first' do
|
||||
@ -195,26 +193,47 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do
|
||||
oldest_document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
backfilled_document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
|
||||
newest_document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago)
|
||||
oldest_document.update!(sync_status: :synced, last_synced_at: 3.days.ago, last_sync_attempted_at: 3.days.ago)
|
||||
backfilled_document.update!(sync_status: :synced, last_synced_at: 4.days.ago, last_sync_attempted_at: nil)
|
||||
newest_document.update!(sync_status: :synced, last_synced_at: 8.days.ago, last_sync_attempted_at: 8.days.ago)
|
||||
oldest_document.update!(sync_status: :synced, last_synced_at: 9.days.ago, last_sync_attempted_at: 9.days.ago)
|
||||
backfilled_document.update!(sync_status: :synced, last_synced_at: 10.days.ago, last_sync_attempted_at: nil)
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(backfilled_document)
|
||||
.and have_enqueued_job(Captain::Documents::PerformSyncJob).with(oldest_document)
|
||||
.to sync_job_for(backfilled_document)
|
||||
.and sync_job_for(oldest_document)
|
||||
expect(Captain::Documents::PerformSyncJob).not_to have_been_enqueued.with(newest_document)
|
||||
end
|
||||
end
|
||||
|
||||
context 'when sync caps are configured' do
|
||||
it 'uses installation config caps for per-account and global limits' do
|
||||
update_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', 2)
|
||||
update_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT', 3)
|
||||
|
||||
second_account = create(:account, custom_attributes: { plan_name: 'business' })
|
||||
second_account.enable_features!('captain_document_auto_sync')
|
||||
second_assistant = create(:captain_assistant, account: second_account)
|
||||
|
||||
first_account_documents = create_list(:captain_document, 3, assistant: assistant, account: account, status: :available)
|
||||
second_account_documents = create_list(:captain_document, 3, assistant: second_assistant, account: second_account, status: :available)
|
||||
(first_account_documents + second_account_documents).each do |document|
|
||||
document.update!(sync_status: :synced, last_synced_at: 8.days.ago, last_sync_attempted_at: 8.days.ago)
|
||||
end
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob).exactly(3).times
|
||||
end
|
||||
end
|
||||
|
||||
context 'when an available document failed before the plan cadence' do
|
||||
it 'queues a sync for that document' do
|
||||
document = create(:captain_document, assistant: assistant, account: account, status: :available)
|
||||
document.update!(sync_status: :failed, last_sync_attempted_at: 2.days.ago)
|
||||
document.update!(sync_status: :failed, last_sync_attempted_at: 8.days.ago)
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document)
|
||||
.to sync_job_for(document)
|
||||
end
|
||||
end
|
||||
|
||||
@ -228,7 +247,7 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do
|
||||
clear_enqueued_jobs
|
||||
|
||||
expect { described_class.new.perform }
|
||||
.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document)
|
||||
.to sync_job_for(document)
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
@ -0,0 +1,42 @@
|
||||
require 'rails_helper'
|
||||
|
||||
RSpec.describe Internal::TriggerDailyScheduledItemsJob do
|
||||
before do
|
||||
allow(ChatwootHub).to receive(:installation_identifier).and_return('test-installation-id')
|
||||
allow(Captain::Documents::ScheduleSyncsJob).to receive(:perform_later)
|
||||
end
|
||||
|
||||
it 'enqueues enterprise Captain document auto-sync every day' do
|
||||
travel_to Time.zone.parse('2026-05-26 00:00:00 UTC') do
|
||||
described_class.perform_now
|
||||
end
|
||||
|
||||
expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('enterprise')
|
||||
end
|
||||
|
||||
it 'enqueues business Captain document auto-sync weekly' do
|
||||
travel_to Time.zone.parse('2026-05-24 00:00:00 UTC') do
|
||||
described_class.perform_now
|
||||
end
|
||||
|
||||
expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('business')
|
||||
end
|
||||
|
||||
it 'enqueues startup Captain document auto-sync monthly' do
|
||||
travel_to Time.zone.parse('2026-06-01 00:00:00 UTC') do
|
||||
described_class.perform_now
|
||||
end
|
||||
|
||||
expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('startups')
|
||||
end
|
||||
|
||||
it 'does not enqueue business or startup Captain document auto-sync before their plan window' do
|
||||
travel_to Time.zone.parse('2026-05-25 00:00:00 UTC') do
|
||||
described_class.perform_now
|
||||
end
|
||||
|
||||
expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('enterprise')
|
||||
expect(Captain::Documents::ScheduleSyncsJob).not_to have_received(:perform_later).with('business')
|
||||
expect(Captain::Documents::ScheduleSyncsJob).not_to have_received(:perform_later).with('startups')
|
||||
end
|
||||
end
|
||||
Loading…
Reference in New Issue
Block a user