feat(langfuse): propagate observation metadata for evals (#14634)

# Pull Request Template

## Description

We need to pass on trace level attributes down to the spans inside them
like tool calls, observations, etc.
This way, we can filter observations based on trace level attributes.


## Type of change

- [x] Bug fix (non-breaking change which fixes an issue)

## How Has This Been Tested?

Please describe the tests that you ran to verify your changes. Provide
instructions so we can reproduce. Please also list any relevant details
for your test configuration.

Attributes added to observation metadata for easy filtering
<img width="1327" height="708" alt="image"
src="https://github.com/user-attachments/assets/8f1d1bf8-cde4-481d-a2c2-7920ad2fc52e"
/>

added a `generation_stage` to differentiate llm_calls that call tools vs
those that generate a `final_response`
<img width="1806" height="968" alt="CleanShot 2026-06-03 at 15 11 09@2x"
src="https://github.com/user-attachments/assets/db1fa8e0-7f2d-404b-a719-27a16d400442"
/>


propagated attributes to tool calls for future use
<img width="903" height="517" alt="image"
src="https://github.com/user-attachments/assets/edc61ce8-93db-465c-a66e-043138e2dc15"
/>



## Checklist:

- [x] My code follows the style guidelines of this project
- [x] I have performed a self-review of my code
- [x] I have commented on my code, particularly in hard-to-understand
areas
- [ ] I have made corresponding changes to the documentation
- [x] My changes generate no new warnings
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] New and existing unit tests pass locally with my changes
- [x] Any dependent changes have been merged and published in downstream
modules
This commit is contained in:
Aakash Bakhle 2026-06-03 16:45:19 +05:30 committed by GitHub
parent 18ef019cd4
commit eaffad12e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 246 additions and 52 deletions

View File

@ -10,6 +10,7 @@ module Captain::ChatGenerationRecorder
# Create a generation span with model and token info for Langfuse cost calculation.
# Note: span duration will be near-zero since we create and end it immediately, but token counts are what Langfuse uses for cost calculation.
tracer.in_span("llm.captain.#{feature_name}.generation") do |span|
apply_current_langfuse_attributes(span)
set_generation_span_attributes(span, chat, message)
end
rescue StandardError => e
@ -37,11 +38,23 @@ module Captain::ChatGenerationRecorder
ATTR_GEN_AI_USAGE_INPUT_TOKENS => message.input_tokens,
ATTR_GEN_AI_USAGE_OUTPUT_TOKENS => message.respond_to?(:output_tokens) ? message.output_tokens : nil,
ATTR_LANGFUSE_OBSERVATION_INPUT => format_input_messages(chat),
ATTR_LANGFUSE_OBSERVATION_OUTPUT => message.respond_to?(:content) ? message.content.to_s : nil
ATTR_LANGFUSE_OBSERVATION_OUTPUT => message.respond_to?(:content) ? message.content.to_s : nil,
format(ATTR_LANGFUSE_OBSERVATION_METADATA, 'generation_stage') => generation_stage(message)
}
end
def format_input_messages(chat)
chat.messages[0...-1].map { |m| { role: m.role.to_s, content: m.content.to_s } }.to_json
end
def generation_stage(message)
message_has_tool_calls?(message) ? 'tool_call' : 'final_response'
end
def message_has_tool_calls?(message)
return false unless message.respond_to?(:tool_calls)
tool_calls = message.tool_calls
tool_calls.respond_to?(:any?) && tool_calls.any?
end
end

View File

@ -10,12 +10,14 @@ module Captain::ToolInstrumentation
response = nil
executed = false
tracer.in_span(params[:span_name]) do |span|
set_tool_session_attributes(span, params)
response = yield
executed = true
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, response[:message] || response.to_json)
set_tool_session_error_attributes(span, response) if response.is_a?(Hash)
with_propagated_langfuse_attributes(params) do
tracer.in_span(params[:span_name]) do |span|
set_tool_session_attributes(span, params)
response = yield
executed = true
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, response[:message] || response.to_json)
set_tool_session_error_attributes(span, response) if response.is_a?(Hash)
end
end
response
rescue StandardError => e
@ -24,9 +26,7 @@ module Captain::ToolInstrumentation
end
def set_tool_session_attributes(span, params)
span.set_attribute(ATTR_LANGFUSE_USER_ID, params[:account_id].to_s) if params[:account_id]
span.set_attribute(ATTR_LANGFUSE_SESSION_ID, "#{params[:account_id]}_#{params[:conversation_id]}") if params[:conversation_id].present?
span.set_attribute(ATTR_LANGFUSE_TAGS, [params[:feature_name]].to_json)
set_metadata_attributes(span, params)
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:messages].to_json)
end
@ -43,6 +43,7 @@ module Captain::ToolInstrumentation
return unless message.respond_to?(:role) && message.role.to_s == 'assistant'
tracer.in_span("llm.#{event_name}.generation") do |span|
apply_current_langfuse_attributes(span)
span.set_attribute(ATTR_GEN_AI_PROVIDER, 'openai')
span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, model)
span.set_attribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, message.input_tokens)

View File

@ -29,16 +29,18 @@ module Integrations::LlmInstrumentation
result = nil
executed = false
tracer.in_span(params[:span_name]) do |span|
set_metadata_attributes(span, params)
with_propagated_langfuse_attributes(params) do
tracer.in_span(params[:span_name]) do |span|
set_metadata_attributes(span, params)
# By default, the input and output of a trace are set from the root observation
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:messages].to_json)
result = yield
executed = true
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, result.to_json)
set_error_attributes(span, result) if result.is_a?(Hash)
result
# By default, the input and output of a trace are set from the root observation
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:messages].to_json)
result = yield
executed = true
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, result.to_json)
set_error_attributes(span, result) if result.is_a?(Hash)
result
end
end
rescue StandardError => e
ChatwootExceptionTracker.new(e, account: resolve_account(params)).capture_exception
@ -51,6 +53,7 @@ module Integrations::LlmInstrumentation
return yield unless ChatwootApp.otel_enabled?
tracer.in_span(format(TOOL_SPAN_NAME, tool_name)) do |span|
apply_current_langfuse_attributes(span)
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_TYPE, 'tool')
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, arguments.to_json)
result = yield
@ -96,23 +99,6 @@ module Integrations::LlmInstrumentation
end
end
def instrument_with_span(span_name, params, &)
result = nil
executed = false
tracer.in_span(span_name) do |span|
track_result = lambda do |r|
executed = true
result = r
end
yield(span, track_result)
end
rescue StandardError => e
ChatwootExceptionTracker.new(e, account: resolve_account(params)).capture_exception
raise unless executed
result
end
private
def resolve_account(params)

View File

@ -10,7 +10,6 @@ module Integrations::LlmInstrumentationCompletionHelpers
span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, params[:model])
span.set_attribute('embedding.input_length', params[:input]&.length || 0)
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:input].to_s)
set_common_span_metadata(span, params)
end
def set_audio_transcription_span_attributes(span, params)
@ -18,7 +17,6 @@ module Integrations::LlmInstrumentationCompletionHelpers
span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, params[:model] || 'whisper-1')
span.set_attribute('audio.duration_seconds', params[:duration]) if params[:duration]
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:file_path].to_s) if params[:file_path]
set_common_span_metadata(span, params)
end
def set_moderation_span_attributes(span, params)
@ -26,12 +24,6 @@ module Integrations::LlmInstrumentationCompletionHelpers
span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, params[:model] || 'text-moderation-latest')
span.set_attribute('moderation.input_length', params[:input]&.length || 0)
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:input].to_s)
set_common_span_metadata(span, params)
end
def set_common_span_metadata(span, params)
span.set_attribute(ATTR_LANGFUSE_USER_ID, params[:account_id].to_s) if params[:account_id]
span.set_attribute(ATTR_LANGFUSE_TAGS, [params[:feature_name]].to_json) if params[:feature_name]
end
def set_embedding_result_attributes(span, result)

View File

@ -29,4 +29,5 @@ module Integrations::LlmInstrumentationConstants
ATTR_LANGFUSE_OBSERVATION_TYPE = 'langfuse.observation.type'
ATTR_LANGFUSE_OBSERVATION_INPUT = 'langfuse.observation.input'
ATTR_LANGFUSE_OBSERVATION_OUTPUT = 'langfuse.observation.output'
ATTR_LANGFUSE_OBSERVATION_METADATA = 'langfuse.observation.metadata.%s'
end

View File

@ -0,0 +1,41 @@
# frozen_string_literal: true
module Integrations::LlmInstrumentationContext
LANGFUSE_ATTRIBUTES_KEY = :llm_instrumentation_langfuse_attributes
LANGFUSE_OBSERVATION_METADATA_KEY = :llm_instrumentation_langfuse_observation_metadata_attributes
private
def with_propagated_langfuse_attributes(params)
previous_attributes = current_langfuse_attributes
previous_observation_metadata_attributes = current_observation_metadata_attributes
self.current_langfuse_attributes = previous_attributes.merge(propagated_langfuse_attributes(params))
self.current_observation_metadata_attributes = previous_observation_metadata_attributes.merge(propagated_observation_metadata_attributes(params))
yield
ensure
self.current_langfuse_attributes = previous_attributes
self.current_observation_metadata_attributes = previous_observation_metadata_attributes
end
def apply_current_langfuse_attributes(span)
set_langfuse_attributes(span, current_langfuse_attributes)
set_langfuse_attributes(span, current_observation_metadata_attributes)
end
def current_langfuse_attributes
ActiveSupport::IsolatedExecutionState[LANGFUSE_ATTRIBUTES_KEY] || {}
end
def current_langfuse_attributes=(attrs)
ActiveSupport::IsolatedExecutionState[LANGFUSE_ATTRIBUTES_KEY] = attrs
end
def current_observation_metadata_attributes
ActiveSupport::IsolatedExecutionState[LANGFUSE_OBSERVATION_METADATA_KEY] || {}
end
def current_observation_metadata_attributes=(attrs)
ActiveSupport::IsolatedExecutionState[LANGFUSE_OBSERVATION_METADATA_KEY] = attrs
end
end

View File

@ -2,6 +2,7 @@
module Integrations::LlmInstrumentationHelpers
include Integrations::LlmInstrumentationConstants
include Integrations::LlmInstrumentationContext
include Integrations::LlmInstrumentationCompletionHelpers
def determine_provider(model_name)
@ -51,15 +52,55 @@ module Integrations::LlmInstrumentationHelpers
end
def set_metadata_attributes(span, params)
session_id = params[:conversation_id].present? ? "#{params[:account_id]}_#{params[:conversation_id]}" : nil
span.set_attribute(ATTR_LANGFUSE_USER_ID, params[:account_id].to_s) if params[:account_id]
span.set_attribute(ATTR_LANGFUSE_SESSION_ID, session_id) if session_id.present?
span.set_attribute(ATTR_LANGFUSE_TAGS, [params[:feature_name]].to_json)
set_langfuse_attributes(span, current_langfuse_attributes.merge(propagated_langfuse_attributes(params)))
set_langfuse_attributes(span, current_observation_metadata_attributes.merge(propagated_observation_metadata_attributes(params)))
end
return unless params[:metadata].is_a?(Hash)
def propagated_langfuse_attributes(params)
attrs = {}
session_id = params[:conversation_id].present? ? "#{params[:account_id]}_#{params[:conversation_id]}" : nil
attrs[ATTR_LANGFUSE_USER_ID] = params[:account_id].to_s if params[:account_id]
attrs[ATTR_LANGFUSE_SESSION_ID] = session_id if session_id.present?
attrs[ATTR_LANGFUSE_TAGS] = [params[:feature_name].to_s] if params[:feature_name].present?
return attrs unless params[:metadata].is_a?(Hash)
params[:metadata].each do |key, value|
span.set_attribute(format(ATTR_LANGFUSE_METADATA, key), value.to_s)
attrs[format(ATTR_LANGFUSE_METADATA, key)] = value.to_s
end
attrs
end
def propagated_observation_metadata_attributes(params)
attrs = {}
session_id = params[:conversation_id].present? ? "#{params[:account_id]}_#{params[:conversation_id]}" : nil
add_observation_metadata(attrs, 'user_id', params[:account_id])
add_observation_metadata(attrs, 'account_id', params[:account_id])
add_observation_metadata(attrs, 'session_id', session_id)
add_observation_metadata(attrs, 'trace_tags', [params[:feature_name]].to_json)
add_observation_metadata(attrs, 'feature_name', params[:feature_name])
return attrs unless params[:metadata].is_a?(Hash)
params[:metadata].each do |key, value|
add_observation_metadata(attrs, key, value)
end
attrs
end
def add_observation_metadata(attrs, key, value)
return if value.blank?
attrs[format(ATTR_LANGFUSE_OBSERVATION_METADATA, key)] = value.to_s
end
def set_langfuse_attributes(span, attrs)
attrs.each do |key, value|
span.set_attribute(key, value)
end
end
end

View File

@ -39,6 +39,7 @@ module Integrations::LlmInstrumentationSpans
tool_name = tool_call.name.to_s
span = tracer.start_span(format(TOOL_SPAN_NAME, tool_name))
apply_current_langfuse_attributes(span)
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_TYPE, 'tool')
span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, tool_call.arguments.to_json)
@ -61,6 +62,24 @@ module Integrations::LlmInstrumentationSpans
Rails.logger.warn "Failed to end tool span: #{e.message}"
end
def instrument_with_span(span_name, params, &)
result = nil
executed = false
tracer.in_span(span_name) do |span|
set_metadata_attributes(span, params)
track_result = lambda do |r|
executed = true
result = r
end
yield(span, track_result)
end
rescue StandardError => e
ChatwootExceptionTracker.new(e, account: resolve_account(params)).capture_exception
raise unless executed
result
end
private
def set_llm_turn_request_attributes(span, params)

View File

@ -72,7 +72,10 @@ module OpentelemetryConfig
config = {
endpoint: traces_endpoint,
headers: { 'Authorization' => "Basic #{auth_header}" }
headers: {
'Authorization' => "Basic #{auth_header}",
'x-langfuse-ingestion-version' => '4'
}
}
config[:ssl_verify_mode] = OpenSSL::SSL::VERIFY_NONE if Rails.env.development?

View File

@ -39,6 +39,30 @@ RSpec.describe Captain::Llm::AssistantChatService do
allow(mock_chat).to receive(:ask).and_return(mock_response)
service.generate_response(message_history: [{ role: 'user', content: 'Hello' }])
end
it 'marks final response generations for observation-level evaluators' do
service = described_class.new(assistant: assistant, conversation: conversation)
message = instance_double(RubyLLM::Message, content: 'Final answer', input_tokens: 10, output_tokens: 20, tool_calls: {})
attributes = service.send(:generation_attributes, mock_chat, message)
expect(attributes['langfuse.observation.metadata.generation_stage']).to eq('final_response')
end
it 'marks tool call generations separately from final responses' do
service = described_class.new(assistant: assistant, conversation: conversation)
message = instance_double(
RubyLLM::Message,
content: '',
input_tokens: 10,
output_tokens: 20,
tool_calls: { 'call_1' => instance_double(RubyLLM::ToolCall) }
)
attributes = service.send(:generation_attributes, mock_chat, message)
expect(attributes['langfuse.observation.metadata.generation_stage']).to eq('tool_call')
end
end
describe 'image analysis' do

View File

@ -144,7 +144,10 @@ RSpec.describe Integrations::LlmInstrumentation do
expect(mock_span).to have_received(:set_attribute).with('langfuse.user.id', '123')
expect(mock_span).to have_received(:set_attribute).with('langfuse.session.id', '123_456')
expect(mock_span).to have_received(:set_attribute).with('langfuse.trace.tags', '["reply_suggestion"]')
expect(mock_span).to have_received(:set_attribute).with('langfuse.trace.tags', ['reply_suggestion'])
expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.metadata.user_id', '123')
expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456')
expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'reply_suggestion')
end
it 'sets completion message attributes when result contains message' do
@ -253,6 +256,76 @@ RSpec.describe Integrations::LlmInstrumentation do
expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.output', result_data.to_json)
end
it 'propagates trace attributes as observation metadata to child tool spans' do
root_span = instance_double(OpenTelemetry::Trace::Span)
tool_span = instance_double(OpenTelemetry::Trace::Span)
tool_instance = test_class.new
allow(root_span).to receive(:set_attribute)
allow(tool_span).to receive(:set_attribute)
allow(instance).to receive(:tracer).and_return(mock_tracer)
allow(tool_instance).to receive(:tracer).and_return(mock_tracer)
allow(mock_tracer).to receive(:in_span).with('llm.test').and_yield(root_span)
allow(mock_tracer).to receive(:in_span).with('tool.search').and_yield(tool_span)
instance.instrument_agent_session(params) do
tool_instance.instrument_tool_call('search', { query: 'test' }) { 'tool result' }
end
expect(tool_span).to have_received(:set_attribute).with('langfuse.observation.metadata.user_id', '123')
expect(tool_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456')
expect(tool_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'reply_suggestion')
end
it 'keeps inherited session metadata for nested service spans with their own feature tag' do
root_span = instance_double(OpenTelemetry::Trace::Span)
nested_span = instance_double(OpenTelemetry::Trace::Span)
nested_instance = test_class.new
nested_params = params.merge(span_name: 'llm.translate_query', conversation_id: nil, feature_name: 'translate_query')
allow(root_span).to receive(:set_attribute)
allow(nested_span).to receive(:set_attribute)
allow(instance).to receive(:tracer).and_return(mock_tracer)
allow(nested_instance).to receive(:tracer).and_return(mock_tracer)
allow(mock_tracer).to receive(:in_span).with('llm.test').and_yield(root_span)
allow(mock_tracer).to receive(:in_span).with('llm.translate_query').and_yield(nested_span)
instance.instrument_agent_session(params) do
nested_instance.instrument_llm_call(nested_params) { 'translated query' }
end
expect(nested_span).to have_received(:set_attribute).with('langfuse.session.id', '123_456')
expect(nested_span).to have_received(:set_attribute).with('langfuse.trace.tags', ['translate_query'])
expect(nested_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456')
expect(nested_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'translate_query')
end
it 'propagates session metadata to nested embedding spans' do
root_span = instance_double(OpenTelemetry::Trace::Span)
embedding_span = instance_double(OpenTelemetry::Trace::Span)
embedding_instance = test_class.new
embedding_params = {
span_name: 'llm.captain.embedding',
account_id: 123,
feature_name: 'embedding',
model: 'text-embedding-3-small',
input: 'search result'
}
allow(root_span).to receive(:set_attribute)
allow(embedding_span).to receive(:set_attribute)
allow(instance).to receive(:tracer).and_return(mock_tracer)
allow(embedding_instance).to receive(:tracer).and_return(mock_tracer)
allow(mock_tracer).to receive(:in_span).with('llm.test').and_yield(root_span)
allow(mock_tracer).to receive(:in_span).with('llm.captain.embedding').and_yield(embedding_span)
instance.instrument_agent_session(params) do
embedding_instance.instrument_embedding_call(embedding_params) { [0.1, 0.2, 0.3] }
end
expect(embedding_span).to have_received(:set_attribute).with('langfuse.session.id', '123_456')
expect(embedding_span).to have_received(:set_attribute).with('langfuse.trace.tags', ['embedding'])
expect(embedding_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456')
expect(embedding_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'embedding')
end
# Regression test for Langfuse double-counting bug.
# Setting gen_ai.request.model on parent spans causes Langfuse to classify them as
# GENERATIONs instead of SPANs, resulting in cost being counted multiple times