chatwoot/app/services/reports/raw_data_source.rb
Shivam Mishra 6cbddbdb67
feat(rollup): report builder abstraction [2/3] (#13798)
## PR2: Report builder refactor — DataSource abstraction

The existing report builders (timeseries + summary) had their SQL
queries inlined — each builder constructed its own scopes, groupings,
and aggregations directly. This made it hard to swap the underlying data
source without duplicating builder logic.

This PR extracts all raw-event querying into a `Reports::RawDataSource`
behind a `Reports::DataSource` factory. Builders now call
`data_source.timeseries`, `.aggregate`, or `.summary` instead of
constructing queries themselves. Behavior is identical —
`DataSource.for(...)` returns `RawDataSource` in all cases today.

The timeseries path had two separate builders (`CountReportBuilder`,
`AverageReportBuilder`) that were selected via a metric-name case
statement in `Conversations::BaseReportBuilder`. These are replaced by a
single `ReportBuilder` that delegates to the data source. The metric
type (count vs average) is now decided inside the data source, not the
builder.

Summary builders similarly moved their inline SQL into
`RawDataSource#summary`, which returns a unified hash keyed by dimension
ID.
This abstraction lays the groundwork for the rollup read path.

## Flow

### Before

```
ReportsController ──▶ case metric ──▶ AverageReportBuilder ──▶ inline SQL ──▶ DB
                                  └──▶ CountReportBuilder   ──▶ inline SQL ──▶ DB

SummaryController ──▶ AgentSummaryBuilder ──▶ inline SQL ──▶ DB
                  └──▶ InboxSummaryBuilder ──▶ inline SQL ──▶ DB
                  └──▶ TeamSummaryBuilder  ──▶ inline SQL ──▶ DB
```

### After

```
ReportsController ──▶ ReportBuilder  ──┐
                                       ├──▶ DataSource.for ──▶ RawDataSource ──▶ DB
SummaryController ──▶ SummaryBuilder ──┘
```


### Expected (after rollup read path)

```
ReportsController ──▶ ReportBuilder  ──┐
                                       ├──▶ DataSource.for ──▶ RawDataSource    ──▶ reporting_events
SummaryController ──▶ SummaryBuilder ──┘                   └──▶ RollupDataSource ──▶ reporting_events_rollups
```

### What changed

- `Reports::DataSource` factory + `Reports::RawDataSource`
- `TimezoneHelper#timezone_name_from_params` — prefers IANA name, falls
back to offset
- Unified `Timeseries::ReportBuilder` replaces `CountReportBuilder` +
`AverageReportBuilder`
- Summary builders delegate to `DataSource` instead of querying directly

### How to test

This is a pure refactor — all existing report pages (Overview, Agent,
Inbox, Label, Team) should produce identical numbers. No feature flag or
new config needed.

---------

Co-authored-by: Muhsin Keloth <muhsinkeramam@gmail.com>
Co-authored-by: Tanmay Deep Sharma <tanmaydeepsharma21@gmail.com>
Co-authored-by: Tanmay Deep Sharma <32020192+tds-1@users.noreply.github.com>
2026-04-20 11:15:48 +05:30

157 lines
4.4 KiB
Ruby

# Data source that answers report queries straight from the raw records
# (reporting events, conversations and messages).
#
# The following readers are used but not defined in this class; they are
# presumably supplied by Reports::DataSource (verify against the superclass):
# account, scope, metric, range, group_by, timezone, dimension_type,
# raw_event_name, raw_count_strategy, summary_metrics, average_metric?
# and use_business_hours?.
class Reports::RawDataSource < Reports::DataSource
  # Periods a caller may request when bucketing a timeseries.
  PERMITTED_PERIODS = %w[day week month year hour].freeze

  # Grouping column on the reporting-events query, per dimension type.
  # The team column lives on the joined conversations table.
  SUMMARY_GROUP_BY_KEYS = {
    'account' => :account_id,
    'agent' => :user_id,
    'inbox' => :inbox_id,
    'team' => 'conversations.team_id'
  }.freeze

  # Grouping column on the conversations query, per dimension type.
  SUMMARY_CONVERSATION_GROUP_BY_KEYS = {
    'account' => :account_id,
    'agent' => :assignee_id,
    'inbox' => :inbox_id,
    'team' => :team_id
  }.freeze

  # Builds the per-period series for the current metric.
  #
  # @return [Array<Hash>] one hash per period with :value and :timestamp;
  #   average metrics additionally carry :count
  def timeseries
    average_metric? ? average_timeseries : count_timeseries
  end

  # Computes a single value for the whole range: the average of the value
  # column for average metrics, otherwise the row count of the count scope.
  def aggregate
    average_metric? ? average_scope.average(average_value_key) : count_scope.count
  end

  # Builds the summary for every dimension value (account, agent, inbox or
  # team, depending on dimension_type).
  #
  # @return [Hash] dimension id => attributes hash holding
  #   :conversations_count plus one entry per summary metric
  def summary
    metric_results = summary_scope
                     .select(*summary_select_fields)
                     .group(summary_group_by_key)
                     .index_by { |record| record.public_send(summary_index_key) }

    merge_summary_results(metric_results, summary_conversation_counts)
  end

  private

  # Turns the grouped counts into timeseries entries.
  def count_timeseries
    grouped_count.map do |event_date, event_count|
      { value: event_count, timestamp: event_date.in_time_zone(timezone).to_i }
    end
  end

  # Turns the grouped averages into timeseries entries, attaching the number
  # of events behind each average.
  def average_timeseries
    averages_by_period = grouped_average_scope.average(average_value_key)
    counts_by_period = grouped_average_scope.count

    averages_by_period.map do |event_date, average_time|
      {
        value: average_time,
        timestamp: event_date.in_time_zone(timezone).to_i,
        count: counts_by_period[event_date]
      }
    end
  end

  # Average scope bucketed by the requested period.
  def grouped_average_scope
    bucket_by_period(average_scope)
  end

  # Count of the count scope per requested period.
  def grouped_count
    bucket_by_period(count_scope).count
  end

  # Buckets +relation+ on created_at by the requested period. Shared by the
  # average and count paths so both always use the same grouping options.
  def bucket_by_period(relation)
    relation.group_by_period(
      group_by,
      :created_at,
      default_value: 0,
      range: range,
      permit: PERMITTED_PERIODS,
      time_zone: timezone
    )
  end

  # Reporting events of the current metric within the range and account.
  def average_scope
    scope.reporting_events.where(name: raw_event_name, created_at: range, account_id: account.id)
  end

  # Relation whose rows are counted for the current metric. Conversation and
  # message metrics read their own tables; everything else is counted from
  # reporting events.
  def count_scope
    case metric.to_s
    when 'conversations_count'
      scope.conversations.where(account_id: account.id, created_at: range)
    when 'incoming_messages_count'
      # NOTE(review): unscope(:order) presumably strips an ordering that
      # would clash with grouped counts - confirm against the association.
      scope.messages.where(account_id: account.id, created_at: range).incoming.unscope(:order)
    when 'outgoing_messages_count'
      scope.messages.where(account_id: account.id, created_at: range).outgoing.unscope(:order)
    else
      reporting_event_count_scope
    end
  end

  # Reporting events to count for the current metric. With the
  # :distinct_conversation strategy the relation is narrowed to distinct
  # conversation ids of events joined to their conversation.
  def reporting_event_count_scope
    events = scope.reporting_events.where(
      name: raw_event_name,
      account_id: account.id,
      created_at: range
    )
    return events unless raw_count_strategy == :distinct_conversation

    events.joins(:conversation).select(:conversation_id).distinct
  end

  # Account-wide reporting events in the range. The team dimension needs the
  # conversations join because its grouping column is conversations.team_id.
  def summary_scope
    events = account.reporting_events.where(created_at: range)
    return events.joins(:conversation) if dimension_type == 'team'

    events
  end

  # Number of conversations created in the range, keyed by dimension id.
  def summary_conversation_counts
    account.conversations
           .where(created_at: range)
           .group(summary_conversation_group_by_key)
           .count
  end

  # Combines metric rows and conversation counts so that a dimension id
  # present in either input appears in the result.
  def merge_summary_results(metric_results, conversation_counts)
    (metric_results.keys | conversation_counts.keys).each_with_object({}) do |dimension_id, results|
      record = metric_results[dimension_id]
      results[dimension_id] = summary_attributes_for(record, conversation_counts[dimension_id])
    end
  end

  # SELECT list for the summary query: the dimension column aliased to the
  # index key, followed by one aggregate per summary metric.
  def summary_select_fields
    dimension_column = "#{summary_group_by_key} as #{summary_index_key}"
    [dimension_column, *summary_metrics.map { |definition| summary_select_field(definition) }]
  end

  # SQL aggregate for one summary metric, aliased to its summary key.
  #
  # NOTE(review): raw_event_name and summary_key are interpolated into SQL
  # without escaping; presumably they come from internal metric definitions
  # only - confirm they can never carry user input.
  def summary_select_field(definition)
    if definition.count?
      "COUNT(CASE WHEN name = '#{definition.raw_event_name}' THEN 1 END) as #{definition.summary_key}"
    else
      "AVG(CASE WHEN name = '#{definition.raw_event_name}' THEN #{average_value_key} END) as #{definition.summary_key}"
    end
  end

  # Attributes for one dimension id. +record+ may be nil when the dimension
  # has conversations but no reporting events; count metrics then become 0
  # and average metrics stay nil.
  def summary_attributes_for(record, conversations_count = 0)
    summary_metrics.each_with_object({ conversations_count: conversations_count.to_i }) do |definition, attributes|
      value = record&.public_send(definition.summary_key)
      attributes[definition.summary_key] = definition.count? ? value.to_i : value
    end
  end

  # Grouping column for the reporting-events summary query.
  def summary_group_by_key
    SUMMARY_GROUP_BY_KEYS[dimension_type]
  end

  # Grouping column for the conversations count query.
  def summary_conversation_group_by_key
    SUMMARY_CONVERSATION_GROUP_BY_KEYS[dimension_type]
  end

  # Attribute name under which the dimension id is read from a summary row:
  # the grouping column without any table prefix.
  def summary_index_key
    summary_group_by_key.to_s.split('.').last
  end

  # Column averaged for average metrics.
  def average_value_key
    use_business_hours? ? :value_in_business_hours : :value
  end
end