diff --git a/app/javascript/dashboard/components-next/captain/assistant/DocumentBulkActions.vue b/app/javascript/dashboard/components-next/captain/assistant/DocumentBulkActions.vue index 378860b3e2f..fa48885768c 100644 --- a/app/javascript/dashboard/components-next/captain/assistant/DocumentBulkActions.vue +++ b/app/javascript/dashboard/components-next/captain/assistant/DocumentBulkActions.vue @@ -25,7 +25,7 @@ const store = useStore(); const bulkDeleteDialog = ref(null); const isSyncableDocument = doc => - !doc.pdf_document && doc.status === 'available' && !doc.sync_in_progress; + !doc.pdf_document && doc.status === 'available'; const syncableSelectedIds = computed(() => { if (!props.selectedIds.size) return []; diff --git a/app/javascript/dashboard/components-next/captain/assistant/DocumentCard.vue b/app/javascript/dashboard/components-next/captain/assistant/DocumentCard.vue index c8a011b9ae6..9d6d574ecdd 100644 --- a/app/javascript/dashboard/components-next/captain/assistant/DocumentCard.vue +++ b/app/javascript/dashboard/components-next/captain/assistant/DocumentCard.vue @@ -127,7 +127,6 @@ const menuItems = computed(() => { value: 'sync', action: 'sync', icon: 'i-lucide-refresh-cw', - disabled: props.syncInProgress, }); } diff --git a/app/jobs/internal/trigger_daily_scheduled_items_job.rb b/app/jobs/internal/trigger_daily_scheduled_items_job.rb index 80b92ea6b46..b09193e927b 100644 --- a/app/jobs/internal/trigger_daily_scheduled_items_job.rb +++ b/app/jobs/internal/trigger_daily_scheduled_items_job.rb @@ -23,3 +23,5 @@ class Internal::TriggerDailyScheduledItemsJob < ApplicationJob @designated_minute ||= Digest::MD5.hexdigest(ChatwootHub.installation_identifier).hex % 1440 end end + +Internal::TriggerDailyScheduledItemsJob.prepend_mod_with('Internal::TriggerDailyScheduledItemsJob') diff --git a/config/installation_config.yml b/config/installation_config.yml index e374d0948b6..673c6df4cf1 100644 --- a/config/installation_config.yml +++ b/config/installation_config.yml @@ -215,6 +215,18 @@ value: locked: false type: code +- name: CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT + display_title: 'Captain Document Auto Sync Per Account Batch Limit' + description: 'Maximum syncable Captain documents to enqueue per account in one scheduler run. Defaults to 50.' + value: 50 + locked: false + type: number +- name: CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT + display_title: 'Captain Document Auto Sync Global Batch Limit' + description: 'Maximum syncable Captain documents to enqueue globally in one scheduler run. Defaults to 1000.' + value: 1000 + locked: false + type: number # End of Captain Config # ------- Context.dev Config ------- # diff --git a/enterprise/app/controllers/api/v1/accounts/captain/bulk_actions_controller.rb b/enterprise/app/controllers/api/v1/accounts/captain/bulk_actions_controller.rb index b9a6bbcc5f1..7e2817f69d0 100644 --- a/enterprise/app/controllers/api/v1/accounts/captain/bulk_actions_controller.rb +++ b/enterprise/app/controllers/api/v1/accounts/captain/bulk_actions_controller.rb @@ -75,7 +75,6 @@ class Api::V1::Accounts::Captain::BulkActionsController < Api::V1::Accounts::Bas Current.account.captain_documents.where(id: params[:ids]).find_each(batch_size: 100) do |document| next unless document.syncable? next unless document.available? - next if document.sync_in_progress? document.update!( sync_status: :syncing, diff --git a/enterprise/app/controllers/api/v1/accounts/captain/documents_controller.rb b/enterprise/app/controllers/api/v1/accounts/captain/documents_controller.rb index 23f4104997b..273c082b13e 100644 --- a/enterprise/app/controllers/api/v1/accounts/captain/documents_controller.rb +++ b/enterprise/app/controllers/api/v1/accounts/captain/documents_controller.rb @@ -37,7 +37,6 @@ class Api::V1::Accounts::Captain::DocumentsController < Api::V1::Accounts::BaseC def sync return render_could_not_create_error(I18n.t('captain.documents.sync_not_supported_for_pdf')) unless @document.syncable? return render_could_not_create_error(I18n.t('captain.documents.sync_only_available_documents')) unless @document.available? - return render_could_not_create_error(I18n.t('captain.documents.sync_already_in_progress')) if @document.sync_in_progress? @document.update!( sync_status: :syncing, diff --git a/enterprise/app/jobs/captain/documents/perform_sync_job.rb b/enterprise/app/jobs/captain/documents/perform_sync_job.rb index eaef7d64d9e..2a33db3f322 100644 --- a/enterprise/app/jobs/captain/documents/perform_sync_job.rb +++ b/enterprise/app/jobs/captain/documents/perform_sync_job.rb @@ -48,9 +48,7 @@ class Captain::Documents::PerformSyncJob < MutexApplicationJob return if document.pdf_document? with_lock(lock_key(document), LOCK_TIMEOUT) do - mark_sync_started(document) - result = Captain::Documents::SyncService.new(document.reload).perform - log_sync_outcome(document, result: result, duration_ms: duration_ms_since(start_time)) + perform_sync(document, start_time) end rescue LockAcquisitionError log_sync_outcome(document, result: :already_syncing) @@ -64,6 +62,12 @@ class Captain::Documents::PerformSyncJob < MutexApplicationJob private + def perform_sync(document, start_time) + mark_sync_started(document) + result = Captain::Documents::SyncService.new(document.reload).perform + log_sync_outcome(document, result: result, duration_ms: duration_ms_since(start_time)) + end + def log_sync_outcome(document, **fields) payload = { document_id: document.id, diff --git a/enterprise/app/jobs/captain/documents/schedule_syncs_job.rb b/enterprise/app/jobs/captain/documents/schedule_syncs_job.rb index 393a307db55..82693c0bbde 100644 --- a/enterprise/app/jobs/captain/documents/schedule_syncs_job.rb +++ b/enterprise/app/jobs/captain/documents/schedule_syncs_job.rb @@ -1,21 +1,27 @@ class Captain::Documents::ScheduleSyncsJob < ApplicationJob queue_as :scheduled_jobs - PER_ACCOUNT_HOURLY_CAP = 50 - GLOBAL_HOURLY_CAP = 1000 - DUE_DOCUMENT_BATCH_SIZE = PER_ACCOUNT_HOURLY_CAP * 2 # Inspite of skipping, we should at least reach the hourly cap + DEFAULT_PER_ACCOUNT_BATCH_LIMIT = 50 + DEFAULT_GLOBAL_BATCH_LIMIT = 1000 SYNC_STALE_TIMEOUT = Captain::Document::SYNC_STALE_TIMEOUT + DAILY_SYNC_JITTER = 4.hours + WEEKLY_SYNC_JITTER = 1.day + MONTHLY_SYNC_JITTER = 4.days - def perform - @remaining_global_capacity = GLOBAL_HOURLY_CAP + def perform(plan_name = nil) + @per_account_batch_limit = configured_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', DEFAULT_PER_ACCOUNT_BATCH_LIMIT) + @global_batch_limit = configured_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT', DEFAULT_GLOBAL_BATCH_LIMIT) + @remaining_global_capacity = @global_batch_limit + @plan_name = plan_name.to_s.downcase.presence sync_intervals = Enterprise::Account.captain_document_sync_intervals - stats = { accounts_scanned: 0, accounts_enabled: 0, accounts_scheduled: 0, documents_enqueued: 0, documents_skipped: 0 } + stats = { accounts_scanned: 0, accounts_enabled: 0, accounts_scheduled: 0, documents_enqueued: 0 } Account.joins(:captain_documents).distinct.find_each(batch_size: 100) do |account| break if @remaining_global_capacity <= 0 stats[:accounts_scanned] += 1 next unless account.feature_enabled?('captain_document_auto_sync') + next unless account_in_selected_plan?(account) stats[:accounts_enabled] += 1 interval = account.captain_document_sync_interval(sync_intervals) @@ -24,7 +30,6 @@ class Captain::Documents::ScheduleSyncsJob < ApplicationJob stats[:accounts_scheduled] += 1 result = enqueue_due_documents(account, interval) stats[:documents_enqueued] += result[:enqueued] - stats[:documents_skipped] += result[:skipped] end log_scheduler_summary(stats) @@ -33,92 +38,85 @@ class Captain::Documents::ScheduleSyncsJob < ApplicationJob private def enqueue_due_documents(account, interval) - per_account_limit = [PER_ACCOUNT_HOURLY_CAP, @remaining_global_capacity].min - result = { enqueued: 0, skipped: 0 } - skipped_document_ids = [] + per_account_limit = [@per_account_batch_limit, @remaining_global_capacity].min + result = { enqueued: 0 } - while result[:enqueued] < per_account_limit - - documents = due_documents(account, interval, skipped_document_ids).limit(DUE_DOCUMENT_BATCH_SIZE).to_a - break if documents.empty? - - documents.each do |document| - break if result[:enqueued] >= per_account_limit - - process_due_document(document, result, skipped_document_ids) - end + due_documents(account, interval).limit(per_account_limit).to_a.each do |document| + process_due_document(document, interval, result) end result end - def process_due_document(document, result, skipped_document_ids) - return unless document.syncable? + def process_due_document(document, interval, result) + sync_execution_delay = sync_jitter(interval) - # Reserve the sync slot before enqueueing so later scheduler runs skip this document while the job is queued. - unless reserve_sync_slot(document) - result[:skipped] += 1 - skipped_document_ids << document.id - return - end - - Captain::Documents::PerformSyncJob.perform_later(document) + Captain::Documents::PerformSyncJob.set(queue: :purgable, wait: sync_execution_delay).perform_later(document) @remaining_global_capacity -= 1 result[:enqueued] += 1 end - def due_documents(account, interval, skipped_document_ids) + def due_documents(account, interval) syncing = Captain::Document.sync_statuses[:syncing] synced = Captain::Document.sync_statuses[:synced] failed = Captain::Document.sync_statuses[:failed] + stale_cutoff = SYNC_STALE_TIMEOUT.ago + # The scheduler runs at predictable plan windows. Use a wider due window so + # jittered executions do not miss the next window just because they finished later. + sync_due_before = due_window(interval).ago documents = account.captain_documents.syncable.where(status: :available).where( '(sync_status = ? AND last_synced_at < ?) OR (sync_status = ? AND last_sync_attempted_at < ?) OR ' \ '(sync_status = ? AND last_sync_attempted_at < ?)', - synced, interval.ago, failed, interval.ago, syncing, SYNC_STALE_TIMEOUT.ago + synced, sync_due_before, failed, sync_due_before, syncing, stale_cutoff ) - documents = documents.where.not(id: skipped_document_ids) if skipped_document_ids.present? documents.order(Arel.sql('last_sync_attempted_at ASC NULLS FIRST'), :id) end - def reserve_sync_slot(document) - mark_sync_started(document) - true - rescue ActiveRecord::RecordInvalid => e - log_document_skip(document, e) - false + def configured_sync_limit(config_key, default) + configured_value = InstallationConfig.find_by(name: config_key)&.value + limit = configured_value.to_s.to_i + limit.positive? ? limit : default end - def log_document_skip(document, error) - payload = { - event: 'document_skipped', - document_id: document.id, - account_id: document.account_id, - assistant_id: document.assistant_id, - error_class: error.class.name, - error_message: error.message, - validation_errors: document.errors.full_messages - } + def account_in_selected_plan?(account) + return true if @plan_name.blank? - Rails.logger.warn("[Captain::Documents::ScheduleSyncsJob] #{payload.to_json}") + account_sync_plan(account) == @plan_name + end + + def account_sync_plan(account) + plan = account.custom_attributes['plan_name'] + plan = 'enterprise' if plan.blank? && ChatwootApp.self_hosted_enterprise? + plan.to_s.downcase.presence + end + + def sync_jitter(interval) + jitter_window = if interval <= 1.day + DAILY_SYNC_JITTER + elsif interval <= 1.week + WEEKLY_SYNC_JITTER + else + MONTHLY_SYNC_JITTER + end + + rand(0..jitter_window.to_i).seconds + end + + def due_window(interval) + (interval.to_i / 2).seconds end def log_scheduler_summary(stats) payload = { event: 'completed', + plan_name: @plan_name, global_cap_hit: @remaining_global_capacity <= 0, + per_account_batch_limit: @per_account_batch_limit, + global_batch_limit: @global_batch_limit, remaining_global_capacity: @remaining_global_capacity }.merge(stats) Rails.logger.info("[Captain::Documents::ScheduleSyncsJob] #{payload.to_json}") end - - def mark_sync_started(document) - document.update!( - sync_status: :syncing, - sync_step: nil, - last_sync_error_code: nil, - last_sync_attempted_at: Time.current - ) - end end diff --git a/enterprise/app/jobs/enterprise/internal/trigger_daily_scheduled_items_job.rb b/enterprise/app/jobs/enterprise/internal/trigger_daily_scheduled_items_job.rb new file mode 100644 index 00000000000..aa22310fb9d --- /dev/null +++ b/enterprise/app/jobs/enterprise/internal/trigger_daily_scheduled_items_job.rb @@ -0,0 +1,19 @@ +module Enterprise::Internal::TriggerDailyScheduledItemsJob + def perform + super + + Captain::Documents::ScheduleSyncsJob.perform_later('enterprise') + Captain::Documents::ScheduleSyncsJob.perform_later('business') if business_auto_sync_due? + Captain::Documents::ScheduleSyncsJob.perform_later('startups') if startup_auto_sync_due? + end + + private + + def business_auto_sync_due? + Time.current.utc.sunday? + end + + def startup_auto_sync_due? + Time.current.utc.day == 1 + end +end diff --git a/enterprise/app/jobs/enterprise/internal/trigger_hourly_scheduled_items_job.rb b/enterprise/app/jobs/enterprise/internal/trigger_hourly_scheduled_items_job.rb deleted file mode 100644 index 9d3baa2d6f3..00000000000 --- a/enterprise/app/jobs/enterprise/internal/trigger_hourly_scheduled_items_job.rb +++ /dev/null @@ -1,7 +0,0 @@ -module Enterprise::Internal::TriggerHourlyScheduledItemsJob - def perform - super - - Captain::Documents::ScheduleSyncsJob.perform_later - end -end diff --git a/spec/enterprise/controllers/api/v1/accounts/captain/bulk_actions_controller_spec.rb b/spec/enterprise/controllers/api/v1/accounts/captain/bulk_actions_controller_spec.rb index 744fa4ea808..31a0bc36fa2 100644 --- a/spec/enterprise/controllers/api/v1/accounts/captain/bulk_actions_controller_spec.rb +++ b/spec/enterprise/controllers/api/v1/accounts/captain/bulk_actions_controller_spec.rb @@ -147,7 +147,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::BulkActions', type: :request do params: sync_params, headers: admin.create_new_auth_token, as: :json - end.to have_enqueued_job(Captain::Documents::PerformSyncJob).exactly(documents.size).times + end.to have_enqueued_job(Captain::Documents::PerformSyncJob).on_queue('low').exactly(documents.size).times documents.each do |document| expect(document.reload).to have_attributes( @@ -190,7 +190,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::BulkActions', type: :request do expect(response).to have_http_status(:ok) end - it 'skips documents that already have a sync in progress' do + it 'queues documents that already have a sync in progress' do syncing_document = create(:captain_document, assistant: assistant, account: account, status: :available) syncing_document.update!(sync_status: :syncing, last_sync_attempted_at: 1.minute.ago) @@ -199,9 +199,10 @@ RSpec.describe 'Api::V1::Accounts::Captain::BulkActions', type: :request do params: sync_params.merge(ids: [syncing_document.id]), headers: admin.create_new_auth_token, as: :json - end.not_to have_enqueued_job(Captain::Documents::PerformSyncJob) + end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(syncing_document).on_queue('low') expect(response).to have_http_status(:ok) + expect(json_response).to eq({ ids: [syncing_document.id], count: 1 }) end it 'queues stale syncing documents again' do diff --git a/spec/enterprise/controllers/api/v1/accounts/captain/documents_controller_spec.rb b/spec/enterprise/controllers/api/v1/accounts/captain/documents_controller_spec.rb index cb7731652be..77cb25f493f 100644 --- a/spec/enterprise/controllers/api/v1/accounts/captain/documents_controller_spec.rb +++ b/spec/enterprise/controllers/api/v1/accounts/captain/documents_controller_spec.rb @@ -243,7 +243,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::Documents', type: :request do before do create_list(:captain_document, 5, assistant: assistant, account: account) - create(:installation_config, name: 'CAPTAIN_CLOUD_PLAN_LIMITS', value: captain_limits.to_json) + InstallationConfig.find_or_initialize_by(name: 'CAPTAIN_CLOUD_PLAN_LIMITS').update!(value: captain_limits.to_json) post "/api/v1/accounts/#{account.id}/captain/documents", params: valid_attributes, headers: admin.create_new_auth_token @@ -281,7 +281,7 @@ RSpec.describe 'Api::V1::Accounts::Captain::Documents', type: :request do expect do post "/api/v1/accounts/#{account.id}/captain/documents/#{document.id}/sync", headers: admin.create_new_auth_token, as: :json - end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document) + end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document).on_queue('low') expect(document.reload).to have_attributes( sync_status: 'syncing', @@ -292,15 +292,15 @@ RSpec.describe 'Api::V1::Accounts::Captain::Documents', type: :request do expect(response).to have_http_status(:accepted) end - it 'rejects documents that already have a sync in progress' do + it 'queues documents that already have a sync in progress' do document.update!(sync_status: :syncing, last_sync_attempted_at: 1.minute.ago) expect do post "/api/v1/accounts/#{account.id}/captain/documents/#{document.id}/sync", headers: admin.create_new_auth_token, as: :json - end.not_to have_enqueued_job(Captain::Documents::PerformSyncJob) + end.to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document).on_queue('low') - expect(response).to have_http_status(:unprocessable_entity) + expect(response).to have_http_status(:accepted) end it 'queues stale syncing documents again' do diff --git a/spec/enterprise/jobs/captain/documents/perform_sync_job_spec.rb b/spec/enterprise/jobs/captain/documents/perform_sync_job_spec.rb new file mode 100644 index 00000000000..dfec1c9d75c --- /dev/null +++ b/spec/enterprise/jobs/captain/documents/perform_sync_job_spec.rb @@ -0,0 +1,60 @@ +require 'rails_helper' + +RSpec.describe Captain::Documents::PerformSyncJob, type: :job do + let(:account) { create(:account) } + let(:assistant) { create(:captain_assistant, account: account) } + let(:document) { create(:captain_document, assistant: assistant, account: account, status: :available) } + + def stub_lock(job) + allow(job).to receive(:with_lock).and_yield + end + + def stub_page_fetch(content: 'Updated content') + fetch_result = Captain::Documents::SinglePageFetcher::Result.new( + success: true, + title: 'Updated title', + content: content + ) + fetcher = instance_double(Captain::Documents::SinglePageFetcher, fetch: fetch_result) + allow(Captain::Documents::SinglePageFetcher).to receive(:new).and_return(fetcher) + end + + def stub_page_fetch_failure + fetcher = instance_double(Captain::Documents::SinglePageFetcher) + allow(fetcher).to receive(:fetch).and_raise(StandardError, 'boom') + allow(Captain::Documents::SinglePageFetcher).to receive(:new).and_return(fetcher) + end + + it 'syncs the document content' do + travel_to Time.zone.local(2026, 5, 18, 10, 0, 0) do + job = described_class.new + stub_lock(job) + stub_page_fetch + + job.perform(document) + + expect(document.reload).to have_attributes( + sync_status: 'synced', + last_sync_attempted_at: Time.current, + last_synced_at: Time.current, + content: 'Updated content' + ) + end + end + + it 'marks unexpected failures as failed' do + travel_to Time.zone.local(2026, 5, 18, 10, 0, 0) do + job = described_class.new + stub_lock(job) + stub_page_fetch_failure + + expect { job.perform(document) }.to raise_error(StandardError, 'boom') + + expect(document.reload).to have_attributes( + sync_status: 'failed', + last_sync_error_code: 'sync_error', + last_sync_attempted_at: Time.current + ) + end + end +end diff --git a/spec/enterprise/jobs/captain/documents/schedule_syncs_job_spec.rb b/spec/enterprise/jobs/captain/documents/schedule_syncs_job_spec.rb index 76604a0b5fb..3a241d0a1c2 100644 --- a/spec/enterprise/jobs/captain/documents/schedule_syncs_job_spec.rb +++ b/spec/enterprise/jobs/captain/documents/schedule_syncs_job_spec.rb @@ -5,11 +5,28 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do let(:assistant) { create(:captain_assistant, account: account) } before do - create(:installation_config, name: 'CAPTAIN_DOCUMENT_AUTO_SYNC_INTERVALS', value: { business: 24, hacker: nil }.to_json) + set_installation_config('CAPTAIN_DOCUMENT_AUTO_SYNC_INTERVALS', { business: 168, enterprise: 24, startups: 720, hacker: nil }.to_json) + set_installation_config('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', 50) + set_installation_config('CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT', 1000) account.enable_features!('captain_document_auto_sync') clear_enqueued_jobs end + def set_installation_config(name, value) + InstallationConfig.find_or_initialize_by(name: name).tap do |config| + config.value = value + config.save! + end + end + + def update_sync_limit(name, value) + InstallationConfig.find_by!(name: name).update!(value: value) + end + + def sync_job_for(document) + have_enqueued_job(Captain::Documents::PerformSyncJob).with(document) + end + context 'when the account has not enabled auto-sync' do before { account.disable_features!('captain_document_auto_sync') } @@ -32,6 +49,25 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do end end + context 'when a plan name is passed' do + it 'queues due documents only for that plan' do + enterprise_account = create(:account, custom_attributes: { plan_name: 'Enterprise' }) + enterprise_account.enable_features!('captain_document_auto_sync') + enterprise_assistant = create(:captain_assistant, account: enterprise_account) + business_document = create(:captain_document, assistant: assistant, account: account, status: :available) + enterprise_document = create(:captain_document, assistant: enterprise_assistant, account: enterprise_account, status: :available) + + business_document.update!(sync_status: :synced, last_synced_at: 3.days.ago, last_sync_attempted_at: 3.days.ago) + enterprise_document.update!(sync_status: :synced, last_synced_at: 3.days.ago, last_sync_attempted_at: 3.days.ago) + clear_enqueued_jobs + + described_class.new.perform('enterprise') + + expect(Captain::Documents::PerformSyncJob).to have_been_enqueued.with(enterprise_document) + expect(Captain::Documents::PerformSyncJob).not_to have_been_enqueued.with(business_document) + end + end + context 'when an available document has backfilled sync metadata' do it 'leaves it alone when last synced within the plan cadence' do create( @@ -54,53 +90,34 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do account: account, status: :available, sync_status: :synced, - last_synced_at: 3.days.ago + last_synced_at: 8.days.ago ) clear_enqueued_jobs expect { described_class.new.perform } - .to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document) + .to sync_job_for(document).on_queue('purgable') end - it 'marks the due document as syncing before queueing' do + it 'delays only the queued sync job' do travel_to Time.zone.local(2026, 4, 27, 10, 0, 0) do + job = described_class.new + allow(job).to receive(:rand).and_return(30.minutes.to_i) document = create( :captain_document, assistant: assistant, account: account, status: :available, sync_status: :synced, - last_synced_at: 3.days.ago + last_synced_at: 8.days.ago ) clear_enqueued_jobs - described_class.new.perform + job.perform - expect(document.reload).to have_attributes( - sync_status: 'syncing', - last_sync_attempted_at: Time.current - ) + expect(Captain::Documents::PerformSyncJob) + .to have_been_enqueued.with(document).at(30.minutes.from_now) end end - - it 'does not queue the same document again while the reserved sync is fresh' do - document = create( - :captain_document, - assistant: assistant, - account: account, - status: :available, - sync_status: :synced, - last_synced_at: 2.days.ago - ) - clear_enqueued_jobs - - expect { described_class.new.perform } - .to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document) - - clear_enqueued_jobs - - expect { described_class.new.perform }.not_to have_enqueued_job(Captain::Documents::PerformSyncJob) - end end context 'when an available document was synced within the plan cadence' do @@ -116,78 +133,59 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do context 'when an available document was last synced before the plan cadence' do it 'queues a sync for that document' do document = create(:captain_document, assistant: assistant, account: account, status: :available) - document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago) + document.update!(sync_status: :synced, last_synced_at: 8.days.ago, last_sync_attempted_at: 8.days.ago) clear_enqueued_jobs expect { described_class.new.perform } - .to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document) + .to sync_job_for(document) + end + end + + context 'when jitter spreads queued sync execution' do + it 'uses a widened due window so jittered syncs do not skip the next plan run' do + travel_to Time.zone.local(2026, 4, 27, 10, 0, 0) do + job = described_class.new + interval = 1.week + due_window = (interval.to_i / 2).seconds + allow(job).to receive(:rand).and_return(2.hours.to_i) + document = create(:captain_document, assistant: assistant, account: account, status: :available) + + document.update!(sync_status: :synced, last_synced_at: (due_window - 1.minute).ago) + clear_enqueued_jobs + + expect { job.perform }.not_to have_enqueued_job(Captain::Documents::PerformSyncJob) + + document.update!(sync_status: :synced, last_synced_at: (due_window + 1.minute).ago) + clear_enqueued_jobs + + expect { job.perform } + .to have_enqueued_job(Captain::Documents::PerformSyncJob) + .with(document) + .on_queue('purgable') + .at(2.hours.from_now) + end end - it 'skips invalid legacy documents without counting them against the account cap' do - stub_const("#{described_class}::PER_ACCOUNT_HOURLY_CAP", 1) - create( - :captain_document, - assistant: assistant, - account: account, - status: :in_progress, - content: nil, - external_link: 'https://example.com' - ) - invalid_document = build( - :captain_document, - assistant: assistant, - account: account, - status: :available, - sync_status: :synced, - last_synced_at: 2.days.ago, - last_sync_attempted_at: 2.days.ago, - external_link: 'https://example.com/' - ) - invalid_document.save!(validate: false) - valid_document = create(:captain_document, assistant: assistant, account: account, status: :available) - valid_document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago) - clear_enqueued_jobs + it 'uses a random delay inside the cadence window' do + travel_to Time.zone.local(2026, 4, 27, 10, 0, 0) do + document = create(:captain_document, assistant: assistant, account: account, status: :available) + document.update!(sync_status: :synced, last_synced_at: 8.days.ago) + job = described_class.new + sync_execution_delay = 12_345.seconds - expect { described_class.new.perform }.not_to raise_error - expect(Captain::Documents::PerformSyncJob).not_to have_been_enqueued.with(invalid_document) - expect(Captain::Documents::PerformSyncJob).to have_been_enqueued.with(valid_document) - end + clear_enqueued_jobs + allow(job).to receive(:rand).with(0..described_class::WEEKLY_SYNC_JITTER.to_i).and_return(sync_execution_delay.to_i) - it 'keeps paging due documents when invalid documents fill the first batch' do - stub_const("#{described_class}::PER_ACCOUNT_HOURLY_CAP", 1) - stub_const("#{described_class}::DUE_DOCUMENT_BATCH_SIZE", 1) - create( - :captain_document, - assistant: assistant, - account: account, - status: :in_progress, - content: nil, - external_link: 'https://example.com' - ) - invalid_document = build( - :captain_document, - assistant: assistant, - account: account, - status: :available, - sync_status: :synced, - last_synced_at: 2.days.ago, - last_sync_attempted_at: 3.days.ago, - external_link: 'https://example.com/' - ) - invalid_document.save!(validate: false) - valid_document = create(:captain_document, assistant: assistant, account: account, status: :available) - valid_document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago) - clear_enqueued_jobs - - described_class.new.perform - - expect(Captain::Documents::PerformSyncJob).to have_been_enqueued.with(valid_document) + expect { job.perform } + .to sync_job_for(document) + .at(sync_execution_delay.from_now) + end end end context 'when more documents are due than the account cap allows' do before do - stub_const("#{described_class}::PER_ACCOUNT_HOURLY_CAP", 2) + update_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', 2) end it 'queues backfilled and oldest-attempted documents first' do @@ -195,26 +193,47 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do oldest_document = create(:captain_document, assistant: assistant, account: account, status: :available) backfilled_document = create(:captain_document, assistant: assistant, account: account, status: :available) - newest_document.update!(sync_status: :synced, last_synced_at: 2.days.ago, last_sync_attempted_at: 2.days.ago) - oldest_document.update!(sync_status: :synced, last_synced_at: 3.days.ago, last_sync_attempted_at: 3.days.ago) - backfilled_document.update!(sync_status: :synced, last_synced_at: 4.days.ago, last_sync_attempted_at: nil) + newest_document.update!(sync_status: :synced, last_synced_at: 8.days.ago, last_sync_attempted_at: 8.days.ago) + oldest_document.update!(sync_status: :synced, last_synced_at: 9.days.ago, last_sync_attempted_at: 9.days.ago) + backfilled_document.update!(sync_status: :synced, last_synced_at: 10.days.ago, last_sync_attempted_at: nil) clear_enqueued_jobs expect { described_class.new.perform } - .to have_enqueued_job(Captain::Documents::PerformSyncJob).with(backfilled_document) - .and have_enqueued_job(Captain::Documents::PerformSyncJob).with(oldest_document) + .to sync_job_for(backfilled_document) + .and sync_job_for(oldest_document) expect(Captain::Documents::PerformSyncJob).not_to have_been_enqueued.with(newest_document) end end + context 'when sync caps are configured' do + it 'uses installation config caps for per-account and global limits' do + update_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_PER_ACCOUNT_BATCH_LIMIT', 2) + update_sync_limit('CAPTAIN_DOCUMENT_AUTO_SYNC_GLOBAL_BATCH_LIMIT', 3) + + second_account = create(:account, custom_attributes: { plan_name: 'business' }) + second_account.enable_features!('captain_document_auto_sync') + second_assistant = create(:captain_assistant, account: second_account) + + first_account_documents = create_list(:captain_document, 3, assistant: assistant, account: account, status: :available) + second_account_documents = create_list(:captain_document, 3, assistant: second_assistant, account: second_account, status: :available) + (first_account_documents + second_account_documents).each do |document| + document.update!(sync_status: :synced, last_synced_at: 8.days.ago, last_sync_attempted_at: 8.days.ago) + end + clear_enqueued_jobs + + expect { described_class.new.perform } + .to have_enqueued_job(Captain::Documents::PerformSyncJob).exactly(3).times + end + end + context 'when an available document failed before the plan cadence' do it 'queues a sync for that document' do document = create(:captain_document, assistant: assistant, account: account, status: :available) - document.update!(sync_status: :failed, last_sync_attempted_at: 2.days.ago) + document.update!(sync_status: :failed, last_sync_attempted_at: 8.days.ago) clear_enqueued_jobs expect { described_class.new.perform } - .to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document) + .to sync_job_for(document) end end @@ -228,7 +247,7 @@ RSpec.describe Captain::Documents::ScheduleSyncsJob, type: :job do clear_enqueued_jobs expect { described_class.new.perform } - .to have_enqueued_job(Captain::Documents::PerformSyncJob).with(document) + .to sync_job_for(document) end end diff --git a/spec/enterprise/jobs/enterprise/internal/trigger_daily_scheduled_items_job_spec.rb b/spec/enterprise/jobs/enterprise/internal/trigger_daily_scheduled_items_job_spec.rb new file mode 100644 index 00000000000..3afc20328a9 --- /dev/null +++ b/spec/enterprise/jobs/enterprise/internal/trigger_daily_scheduled_items_job_spec.rb @@ -0,0 +1,42 @@ +require 'rails_helper' + +RSpec.describe Internal::TriggerDailyScheduledItemsJob do + before do + allow(ChatwootHub).to receive(:installation_identifier).and_return('test-installation-id') + allow(Captain::Documents::ScheduleSyncsJob).to receive(:perform_later) + end + + it 'enqueues enterprise Captain document auto-sync every day' do + travel_to Time.zone.parse('2026-05-26 00:00:00 UTC') do + described_class.perform_now + end + + expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('enterprise') + end + + it 'enqueues business Captain document auto-sync weekly' do + travel_to Time.zone.parse('2026-05-24 00:00:00 UTC') do + described_class.perform_now + end + + expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('business') + end + + it 'enqueues startup Captain document auto-sync monthly' do + travel_to Time.zone.parse('2026-06-01 00:00:00 UTC') do + described_class.perform_now + end + + expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('startups') + end + + it 'does not enqueue business or startup Captain document auto-sync before their plan window' do + travel_to Time.zone.parse('2026-05-25 00:00:00 UTC') do + described_class.perform_now + end + + expect(Captain::Documents::ScheduleSyncsJob).to have_received(:perform_later).with('enterprise') + expect(Captain::Documents::ScheduleSyncsJob).not_to have_received(:perform_later).with('business') + expect(Captain::Documents::ScheduleSyncsJob).not_to have_received(:perform_later).with('startups') + end +end