diff --git a/enterprise/app/helpers/captain/chat_generation_recorder.rb b/enterprise/app/helpers/captain/chat_generation_recorder.rb index cd631fb16c0..63bcdc2762e 100644 --- a/enterprise/app/helpers/captain/chat_generation_recorder.rb +++ b/enterprise/app/helpers/captain/chat_generation_recorder.rb @@ -10,6 +10,7 @@ module Captain::ChatGenerationRecorder # Create a generation span with model and token info for Langfuse cost calculation. # Note: span duration will be near-zero since we create and end it immediately, but token counts are what Langfuse uses for cost calculation. tracer.in_span("llm.captain.#{feature_name}.generation") do |span| + apply_current_langfuse_attributes(span) set_generation_span_attributes(span, chat, message) end rescue StandardError => e @@ -37,11 +38,23 @@ module Captain::ChatGenerationRecorder ATTR_GEN_AI_USAGE_INPUT_TOKENS => message.input_tokens, ATTR_GEN_AI_USAGE_OUTPUT_TOKENS => message.respond_to?(:output_tokens) ? message.output_tokens : nil, ATTR_LANGFUSE_OBSERVATION_INPUT => format_input_messages(chat), - ATTR_LANGFUSE_OBSERVATION_OUTPUT => message.respond_to?(:content) ? message.content.to_s : nil + ATTR_LANGFUSE_OBSERVATION_OUTPUT => message.respond_to?(:content) ? message.content.to_s : nil, + format(ATTR_LANGFUSE_OBSERVATION_METADATA, 'generation_stage') => generation_stage(message) } end def format_input_messages(chat) chat.messages[0...-1].map { |m| { role: m.role.to_s, content: m.content.to_s } }.to_json end + + def generation_stage(message) + message_has_tool_calls?(message) ? 'tool_call' : 'final_response' + end + + def message_has_tool_calls?(message) + return false unless message.respond_to?(:tool_calls) + + tool_calls = message.tool_calls + tool_calls.respond_to?(:any?) && tool_calls.any? + end end diff --git a/lib/captain/tool_instrumentation.rb b/lib/captain/tool_instrumentation.rb index 157aab82986..3c4a1c76ea5 100644 --- a/lib/captain/tool_instrumentation.rb +++ b/lib/captain/tool_instrumentation.rb @@ -10,12 +10,14 @@ module Captain::ToolInstrumentation response = nil executed = false - tracer.in_span(params[:span_name]) do |span| - set_tool_session_attributes(span, params) - response = yield - executed = true - span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, response[:message] || response.to_json) - set_tool_session_error_attributes(span, response) if response.is_a?(Hash) + with_propagated_langfuse_attributes(params) do + tracer.in_span(params[:span_name]) do |span| + set_tool_session_attributes(span, params) + response = yield + executed = true + span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, response[:message] || response.to_json) + set_tool_session_error_attributes(span, response) if response.is_a?(Hash) + end end response rescue StandardError => e @@ -24,9 +26,7 @@ module Captain::ToolInstrumentation end def set_tool_session_attributes(span, params) - span.set_attribute(ATTR_LANGFUSE_USER_ID, params[:account_id].to_s) if params[:account_id] - span.set_attribute(ATTR_LANGFUSE_SESSION_ID, "#{params[:account_id]}_#{params[:conversation_id]}") if params[:conversation_id].present? - span.set_attribute(ATTR_LANGFUSE_TAGS, [params[:feature_name]].to_json) + set_metadata_attributes(span, params) span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:messages].to_json) end @@ -43,6 +43,7 @@ module Captain::ToolInstrumentation return unless message.respond_to?(:role) && message.role.to_s == 'assistant' tracer.in_span("llm.#{event_name}.generation") do |span| + apply_current_langfuse_attributes(span) span.set_attribute(ATTR_GEN_AI_PROVIDER, 'openai') span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, model) span.set_attribute(ATTR_GEN_AI_USAGE_INPUT_TOKENS, message.input_tokens) diff --git a/lib/integrations/llm_instrumentation.rb b/lib/integrations/llm_instrumentation.rb index 326bb901e46..0257f5c3a0d 100644 --- a/lib/integrations/llm_instrumentation.rb +++ b/lib/integrations/llm_instrumentation.rb @@ -29,16 +29,18 @@ module Integrations::LlmInstrumentation result = nil executed = false - tracer.in_span(params[:span_name]) do |span| - set_metadata_attributes(span, params) + with_propagated_langfuse_attributes(params) do + tracer.in_span(params[:span_name]) do |span| + set_metadata_attributes(span, params) - # By default, the input and output of a trace are set from the root observation - span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:messages].to_json) - result = yield - executed = true - span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, result.to_json) - set_error_attributes(span, result) if result.is_a?(Hash) - result + # By default, the input and output of a trace are set from the root observation + span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:messages].to_json) + result = yield + executed = true + span.set_attribute(ATTR_LANGFUSE_OBSERVATION_OUTPUT, result.to_json) + set_error_attributes(span, result) if result.is_a?(Hash) + result + end end rescue StandardError => e ChatwootExceptionTracker.new(e, account: resolve_account(params)).capture_exception @@ -51,6 +53,7 @@ module Integrations::LlmInstrumentation return yield unless ChatwootApp.otel_enabled? tracer.in_span(format(TOOL_SPAN_NAME, tool_name)) do |span| + apply_current_langfuse_attributes(span) span.set_attribute(ATTR_LANGFUSE_OBSERVATION_TYPE, 'tool') span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, arguments.to_json) result = yield @@ -96,23 +99,6 @@ module Integrations::LlmInstrumentation end end - def instrument_with_span(span_name, params, &) - result = nil - executed = false - tracer.in_span(span_name) do |span| - track_result = lambda do |r| - executed = true - result = r - end - yield(span, track_result) - end - rescue StandardError => e - ChatwootExceptionTracker.new(e, account: resolve_account(params)).capture_exception - raise unless executed - - result - end - private def resolve_account(params) diff --git a/lib/integrations/llm_instrumentation_completion_helpers.rb b/lib/integrations/llm_instrumentation_completion_helpers.rb index 551d0780f15..26af2aae15e 100644 --- a/lib/integrations/llm_instrumentation_completion_helpers.rb +++ b/lib/integrations/llm_instrumentation_completion_helpers.rb @@ -10,7 +10,6 @@ module Integrations::LlmInstrumentationCompletionHelpers span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, params[:model]) span.set_attribute('embedding.input_length', params[:input]&.length || 0) span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:input].to_s) - set_common_span_metadata(span, params) end def set_audio_transcription_span_attributes(span, params) @@ -18,7 +17,6 @@ module Integrations::LlmInstrumentationCompletionHelpers span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, params[:model] || 'whisper-1') span.set_attribute('audio.duration_seconds', params[:duration]) if params[:duration] span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:file_path].to_s) if params[:file_path] - set_common_span_metadata(span, params) end def set_moderation_span_attributes(span, params) @@ -26,12 +24,6 @@ module Integrations::LlmInstrumentationCompletionHelpers span.set_attribute(ATTR_GEN_AI_REQUEST_MODEL, params[:model] || 'text-moderation-latest') span.set_attribute('moderation.input_length', params[:input]&.length || 0) span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, params[:input].to_s) - set_common_span_metadata(span, params) - end - - def set_common_span_metadata(span, params) - span.set_attribute(ATTR_LANGFUSE_USER_ID, params[:account_id].to_s) if params[:account_id] - span.set_attribute(ATTR_LANGFUSE_TAGS, [params[:feature_name]].to_json) if params[:feature_name] end def set_embedding_result_attributes(span, result) diff --git a/lib/integrations/llm_instrumentation_constants.rb b/lib/integrations/llm_instrumentation_constants.rb index dfe1e7704a8..f274d114582 100644 --- a/lib/integrations/llm_instrumentation_constants.rb +++ b/lib/integrations/llm_instrumentation_constants.rb @@ -29,4 +29,5 @@ module Integrations::LlmInstrumentationConstants ATTR_LANGFUSE_OBSERVATION_TYPE = 'langfuse.observation.type' ATTR_LANGFUSE_OBSERVATION_INPUT = 'langfuse.observation.input' ATTR_LANGFUSE_OBSERVATION_OUTPUT = 'langfuse.observation.output' + ATTR_LANGFUSE_OBSERVATION_METADATA = 'langfuse.observation.metadata.%s' end diff --git a/lib/integrations/llm_instrumentation_context.rb b/lib/integrations/llm_instrumentation_context.rb new file mode 100644 index 00000000000..27b1eb2b252 --- /dev/null +++ b/lib/integrations/llm_instrumentation_context.rb @@ -0,0 +1,41 @@ +# frozen_string_literal: true + +module Integrations::LlmInstrumentationContext + LANGFUSE_ATTRIBUTES_KEY = :llm_instrumentation_langfuse_attributes + LANGFUSE_OBSERVATION_METADATA_KEY = :llm_instrumentation_langfuse_observation_metadata_attributes + + private + + def with_propagated_langfuse_attributes(params) + previous_attributes = current_langfuse_attributes + previous_observation_metadata_attributes = current_observation_metadata_attributes + self.current_langfuse_attributes = previous_attributes.merge(propagated_langfuse_attributes(params)) + self.current_observation_metadata_attributes = previous_observation_metadata_attributes.merge(propagated_observation_metadata_attributes(params)) + + yield + ensure + self.current_langfuse_attributes = previous_attributes + self.current_observation_metadata_attributes = previous_observation_metadata_attributes + end + + def apply_current_langfuse_attributes(span) + set_langfuse_attributes(span, current_langfuse_attributes) + set_langfuse_attributes(span, current_observation_metadata_attributes) + end + + def current_langfuse_attributes + ActiveSupport::IsolatedExecutionState[LANGFUSE_ATTRIBUTES_KEY] || {} + end + + def current_langfuse_attributes=(attrs) + ActiveSupport::IsolatedExecutionState[LANGFUSE_ATTRIBUTES_KEY] = attrs + end + + def current_observation_metadata_attributes + ActiveSupport::IsolatedExecutionState[LANGFUSE_OBSERVATION_METADATA_KEY] || {} + end + + def current_observation_metadata_attributes=(attrs) + ActiveSupport::IsolatedExecutionState[LANGFUSE_OBSERVATION_METADATA_KEY] = attrs + end +end diff --git a/lib/integrations/llm_instrumentation_helpers.rb b/lib/integrations/llm_instrumentation_helpers.rb index 129092ed4b6..debbfaeda0a 100644 --- a/lib/integrations/llm_instrumentation_helpers.rb +++ b/lib/integrations/llm_instrumentation_helpers.rb @@ -2,6 +2,7 @@ module Integrations::LlmInstrumentationHelpers include Integrations::LlmInstrumentationConstants + include Integrations::LlmInstrumentationContext include Integrations::LlmInstrumentationCompletionHelpers def determine_provider(model_name) @@ -51,15 +52,55 @@ module Integrations::LlmInstrumentationHelpers end def set_metadata_attributes(span, params) - session_id = params[:conversation_id].present? ? "#{params[:account_id]}_#{params[:conversation_id]}" : nil - span.set_attribute(ATTR_LANGFUSE_USER_ID, params[:account_id].to_s) if params[:account_id] - span.set_attribute(ATTR_LANGFUSE_SESSION_ID, session_id) if session_id.present? - span.set_attribute(ATTR_LANGFUSE_TAGS, [params[:feature_name]].to_json) + set_langfuse_attributes(span, current_langfuse_attributes.merge(propagated_langfuse_attributes(params))) + set_langfuse_attributes(span, current_observation_metadata_attributes.merge(propagated_observation_metadata_attributes(params))) + end - return unless params[:metadata].is_a?(Hash) + def propagated_langfuse_attributes(params) + attrs = {} + session_id = params[:conversation_id].present? ? "#{params[:account_id]}_#{params[:conversation_id]}" : nil + + attrs[ATTR_LANGFUSE_USER_ID] = params[:account_id].to_s if params[:account_id] + attrs[ATTR_LANGFUSE_SESSION_ID] = session_id if session_id.present? + attrs[ATTR_LANGFUSE_TAGS] = [params[:feature_name].to_s] if params[:feature_name].present? + + return attrs unless params[:metadata].is_a?(Hash) params[:metadata].each do |key, value| - span.set_attribute(format(ATTR_LANGFUSE_METADATA, key), value.to_s) + attrs[format(ATTR_LANGFUSE_METADATA, key)] = value.to_s + end + + attrs + end + + def propagated_observation_metadata_attributes(params) + attrs = {} + session_id = params[:conversation_id].present? ? "#{params[:account_id]}_#{params[:conversation_id]}" : nil + + add_observation_metadata(attrs, 'user_id', params[:account_id]) + add_observation_metadata(attrs, 'account_id', params[:account_id]) + add_observation_metadata(attrs, 'session_id', session_id) + add_observation_metadata(attrs, 'trace_tags', [params[:feature_name]].to_json) + add_observation_metadata(attrs, 'feature_name', params[:feature_name]) + + return attrs unless params[:metadata].is_a?(Hash) + + params[:metadata].each do |key, value| + add_observation_metadata(attrs, key, value) + end + + attrs + end + + def add_observation_metadata(attrs, key, value) + return if value.blank? + + attrs[format(ATTR_LANGFUSE_OBSERVATION_METADATA, key)] = value.to_s + end + + def set_langfuse_attributes(span, attrs) + attrs.each do |key, value| + span.set_attribute(key, value) end end end diff --git a/lib/integrations/llm_instrumentation_spans.rb b/lib/integrations/llm_instrumentation_spans.rb index 85ea599f8e4..2def9749d72 100644 --- a/lib/integrations/llm_instrumentation_spans.rb +++ b/lib/integrations/llm_instrumentation_spans.rb @@ -39,6 +39,7 @@ module Integrations::LlmInstrumentationSpans tool_name = tool_call.name.to_s span = tracer.start_span(format(TOOL_SPAN_NAME, tool_name)) + apply_current_langfuse_attributes(span) span.set_attribute(ATTR_LANGFUSE_OBSERVATION_TYPE, 'tool') span.set_attribute(ATTR_LANGFUSE_OBSERVATION_INPUT, tool_call.arguments.to_json) @@ -61,6 +62,24 @@ module Integrations::LlmInstrumentationSpans Rails.logger.warn "Failed to end tool span: #{e.message}" end + def instrument_with_span(span_name, params, &) + result = nil + executed = false + tracer.in_span(span_name) do |span| + set_metadata_attributes(span, params) + track_result = lambda do |r| + executed = true + result = r + end + yield(span, track_result) + end + rescue StandardError => e + ChatwootExceptionTracker.new(e, account: resolve_account(params)).capture_exception + raise unless executed + + result + end + private def set_llm_turn_request_attributes(span, params) diff --git a/lib/opentelemetry_config.rb b/lib/opentelemetry_config.rb index 5ed17e0982c..32be413d093 100644 --- a/lib/opentelemetry_config.rb +++ b/lib/opentelemetry_config.rb @@ -72,7 +72,10 @@ module OpentelemetryConfig config = { endpoint: traces_endpoint, - headers: { 'Authorization' => "Basic #{auth_header}" } + headers: { + 'Authorization' => "Basic #{auth_header}", + 'x-langfuse-ingestion-version' => '4' + } } config[:ssl_verify_mode] = OpenSSL::SSL::VERIFY_NONE if Rails.env.development? diff --git a/spec/enterprise/services/captain/llm/assistant_chat_service_spec.rb b/spec/enterprise/services/captain/llm/assistant_chat_service_spec.rb index 9d233943e38..6b2cc55c8c6 100644 --- a/spec/enterprise/services/captain/llm/assistant_chat_service_spec.rb +++ b/spec/enterprise/services/captain/llm/assistant_chat_service_spec.rb @@ -39,6 +39,30 @@ RSpec.describe Captain::Llm::AssistantChatService do allow(mock_chat).to receive(:ask).and_return(mock_response) service.generate_response(message_history: [{ role: 'user', content: 'Hello' }]) end + + it 'marks final response generations for observation-level evaluators' do + service = described_class.new(assistant: assistant, conversation: conversation) + message = instance_double(RubyLLM::Message, content: 'Final answer', input_tokens: 10, output_tokens: 20, tool_calls: {}) + + attributes = service.send(:generation_attributes, mock_chat, message) + + expect(attributes['langfuse.observation.metadata.generation_stage']).to eq('final_response') + end + + it 'marks tool call generations separately from final responses' do + service = described_class.new(assistant: assistant, conversation: conversation) + message = instance_double( + RubyLLM::Message, + content: '', + input_tokens: 10, + output_tokens: 20, + tool_calls: { 'call_1' => instance_double(RubyLLM::ToolCall) } + ) + + attributes = service.send(:generation_attributes, mock_chat, message) + + expect(attributes['langfuse.observation.metadata.generation_stage']).to eq('tool_call') + end end describe 'image analysis' do diff --git a/spec/lib/integrations/llm_instrumentation_spec.rb b/spec/lib/integrations/llm_instrumentation_spec.rb index 0be62f4377a..f291cd7b24b 100644 --- a/spec/lib/integrations/llm_instrumentation_spec.rb +++ b/spec/lib/integrations/llm_instrumentation_spec.rb @@ -144,7 +144,10 @@ RSpec.describe Integrations::LlmInstrumentation do expect(mock_span).to have_received(:set_attribute).with('langfuse.user.id', '123') expect(mock_span).to have_received(:set_attribute).with('langfuse.session.id', '123_456') - expect(mock_span).to have_received(:set_attribute).with('langfuse.trace.tags', '["reply_suggestion"]') + expect(mock_span).to have_received(:set_attribute).with('langfuse.trace.tags', ['reply_suggestion']) + expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.metadata.user_id', '123') + expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456') + expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'reply_suggestion') end it 'sets completion message attributes when result contains message' do @@ -253,6 +256,76 @@ RSpec.describe Integrations::LlmInstrumentation do expect(mock_span).to have_received(:set_attribute).with('langfuse.observation.output', result_data.to_json) end + it 'propagates trace attributes as observation metadata to child tool spans' do + root_span = instance_double(OpenTelemetry::Trace::Span) + tool_span = instance_double(OpenTelemetry::Trace::Span) + tool_instance = test_class.new + allow(root_span).to receive(:set_attribute) + allow(tool_span).to receive(:set_attribute) + allow(instance).to receive(:tracer).and_return(mock_tracer) + allow(tool_instance).to receive(:tracer).and_return(mock_tracer) + allow(mock_tracer).to receive(:in_span).with('llm.test').and_yield(root_span) + allow(mock_tracer).to receive(:in_span).with('tool.search').and_yield(tool_span) + + instance.instrument_agent_session(params) do + tool_instance.instrument_tool_call('search', { query: 'test' }) { 'tool result' } + end + + expect(tool_span).to have_received(:set_attribute).with('langfuse.observation.metadata.user_id', '123') + expect(tool_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456') + expect(tool_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'reply_suggestion') + end + + it 'keeps inherited session metadata for nested service spans with their own feature tag' do + root_span = instance_double(OpenTelemetry::Trace::Span) + nested_span = instance_double(OpenTelemetry::Trace::Span) + nested_instance = test_class.new + nested_params = params.merge(span_name: 'llm.translate_query', conversation_id: nil, feature_name: 'translate_query') + allow(root_span).to receive(:set_attribute) + allow(nested_span).to receive(:set_attribute) + allow(instance).to receive(:tracer).and_return(mock_tracer) + allow(nested_instance).to receive(:tracer).and_return(mock_tracer) + allow(mock_tracer).to receive(:in_span).with('llm.test').and_yield(root_span) + allow(mock_tracer).to receive(:in_span).with('llm.translate_query').and_yield(nested_span) + + instance.instrument_agent_session(params) do + nested_instance.instrument_llm_call(nested_params) { 'translated query' } + end + + expect(nested_span).to have_received(:set_attribute).with('langfuse.session.id', '123_456') + expect(nested_span).to have_received(:set_attribute).with('langfuse.trace.tags', ['translate_query']) + expect(nested_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456') + expect(nested_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'translate_query') + end + + it 'propagates session metadata to nested embedding spans' do + root_span = instance_double(OpenTelemetry::Trace::Span) + embedding_span = instance_double(OpenTelemetry::Trace::Span) + embedding_instance = test_class.new + embedding_params = { + span_name: 'llm.captain.embedding', + account_id: 123, + feature_name: 'embedding', + model: 'text-embedding-3-small', + input: 'search result' + } + allow(root_span).to receive(:set_attribute) + allow(embedding_span).to receive(:set_attribute) + allow(instance).to receive(:tracer).and_return(mock_tracer) + allow(embedding_instance).to receive(:tracer).and_return(mock_tracer) + allow(mock_tracer).to receive(:in_span).with('llm.test').and_yield(root_span) + allow(mock_tracer).to receive(:in_span).with('llm.captain.embedding').and_yield(embedding_span) + + instance.instrument_agent_session(params) do + embedding_instance.instrument_embedding_call(embedding_params) { [0.1, 0.2, 0.3] } + end + + expect(embedding_span).to have_received(:set_attribute).with('langfuse.session.id', '123_456') + expect(embedding_span).to have_received(:set_attribute).with('langfuse.trace.tags', ['embedding']) + expect(embedding_span).to have_received(:set_attribute).with('langfuse.observation.metadata.session_id', '123_456') + expect(embedding_span).to have_received(:set_attribute).with('langfuse.observation.metadata.feature_name', 'embedding') + end + # Regression test for Langfuse double-counting bug. # Setting gen_ai.request.model on parent spans causes Langfuse to classify them as # GENERATIONs instead of SPANs, resulting in cost being counted multiple times