diff --git a/dev/src/main/java/com/google/adk/web/config/OpenTelemetryConfig.java b/dev/src/main/java/com/google/adk/web/config/OpenTelemetryConfig.java index 407c0ac40..3be4af156 100644 --- a/dev/src/main/java/com/google/adk/web/config/OpenTelemetryConfig.java +++ b/dev/src/main/java/com/google/adk/web/config/OpenTelemetryConfig.java @@ -17,6 +17,7 @@ package com.google.adk.web.config; import com.google.adk.web.service.ApiServerSpanExporter; +import com.google.adk.web.service.ApiServerSpanExporterConfig; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.common.Attributes; @@ -24,8 +25,10 @@ import io.opentelemetry.sdk.resources.Resource; import io.opentelemetry.sdk.trace.SdkTracerProvider; import io.opentelemetry.sdk.trace.export.SimpleSpanProcessor; +import java.util.Optional; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @@ -35,8 +38,14 @@ public class OpenTelemetryConfig { private static final Logger otelLog = LoggerFactory.getLogger(OpenTelemetryConfig.class); @Bean - public ApiServerSpanExporter apiServerSpanExporter() { - return new ApiServerSpanExporter(); + public ApiServerSpanExporterConfig apiServerSpanExporterConfig( + @Value("${adk.debug.trace.max-spans:#{null}}") Optional maxSpansToKeep) { + return ApiServerSpanExporterConfig.builder().maxSpansToKeep(maxSpansToKeep).build(); + } + + @Bean + public ApiServerSpanExporter apiServerSpanExporter(ApiServerSpanExporterConfig config) { + return new ApiServerSpanExporter(config); } @Bean(destroyMethod = "shutdown") diff --git a/dev/src/main/java/com/google/adk/web/service/ApiServerSpanExporter.java b/dev/src/main/java/com/google/adk/web/service/ApiServerSpanExporter.java index bc0aa73a7..fb8b028ce 100644 --- a/dev/src/main/java/com/google/adk/web/service/ApiServerSpanExporter.java +++ b/dev/src/main/java/com/google/adk/web/service/ApiServerSpanExporter.java @@ -20,13 +20,13 @@ import io.opentelemetry.sdk.common.CompletableResultCode; import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.export.SpanExporter; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; +import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,74 +41,131 @@ public class ApiServerSpanExporter implements SpanExporter { private static final Logger exporterLog = LoggerFactory.getLogger(ApiServerSpanExporter.class); - private final Map> eventIdTraceStorage = new ConcurrentHashMap<>(); + private final ApiServerSpanExporterConfig config; + + private final Map eventIdRefCount = new HashMap<>(); + private final Map> eventIdTraceStorage = new HashMap<>(); // Session ID -> Trace IDs -> Trace Object - private final Map> sessionToTraceIdsMap = new ConcurrentHashMap<>(); + private final Map> sessionToTraceIdsMap = new HashMap<>(); + + private final Deque allExportedSpans = new ArrayDeque<>(); - private final List allExportedSpans = Collections.synchronizedList(new ArrayList<>()); + public ApiServerSpanExporter() { + this(ApiServerSpanExporterConfig.builder().build()); + } - public ApiServerSpanExporter() {} + public ApiServerSpanExporter(ApiServerSpanExporterConfig config) { + this.config = config; + } public Map getEventTraceAttributes(String eventId) { - return this.eventIdTraceStorage.get(eventId); + synchronized (allExportedSpans) { + return this.eventIdTraceStorage.get(eventId); + } } public Map> getSessionToTraceIdsMap() { - return this.sessionToTraceIdsMap; + synchronized (allExportedSpans) { + return new HashMap<>(this.sessionToTraceIdsMap); + } } public List getAllExportedSpans() { - return this.allExportedSpans; + synchronized (allExportedSpans) { + return new ArrayList<>(this.allExportedSpans); + } } @Override public CompletableResultCode export(Collection spans) { exporterLog.debug("ApiServerSpanExporter received {} spans to export.", spans.size()); - List currentBatch = new ArrayList<>(spans); - allExportedSpans.addAll(currentBatch); - - for (SpanData span : currentBatch) { - String spanName = span.getName(); - if ("call_llm".equals(spanName) - || "send_data".equals(spanName) - || (spanName != null && spanName.startsWith("tool_response"))) { - String eventId = - span.getAttributes().get(AttributeKey.stringKey("gcp.vertex.agent.event_id")); - if (eventId != null && !eventId.isEmpty()) { - Map attributesMap = new HashMap<>(); - span.getAttributes().forEach((key, value) -> attributesMap.put(key.getKey(), value)); - attributesMap.put("trace_id", span.getSpanContext().getTraceId()); - attributesMap.put("span_id", span.getSpanContext().getSpanId()); - attributesMap.putIfAbsent("gcp.vertex.agent.event_id", eventId); - exporterLog.debug("Storing event-based trace attributes for event_id: {}", eventId); - this.eventIdTraceStorage.put(eventId, attributesMap); // Use internal storage - } else { - exporterLog.trace( - "Span {} for event-based trace did not have 'gcp.vertex.agent.event_id'" - + " attribute or it was empty.", - spanName); + + synchronized (allExportedSpans) { + for (SpanData span : spans) { + if (config.maxSpansToKeep().isPresent() + && allExportedSpans.size() >= config.maxSpansToKeep().get()) { + SpanData evicted = allExportedSpans.pollFirst(); + if (evicted != null) { + handleEviction(evicted); + } } + allExportedSpans.addLast(span); + handleAddition(span); } + } + return CompletableResultCode.ofSuccess(); + } - if ("call_llm".equals(spanName)) { - String sessionId = - span.getAttributes().get(AttributeKey.stringKey("gcp.vertex.agent.session_id")); - if (sessionId != null && !sessionId.isEmpty()) { - String traceId = span.getSpanContext().getTraceId(); - sessionToTraceIdsMap - .computeIfAbsent(sessionId, k -> Collections.synchronizedList(new ArrayList<>())) - .add(traceId); - exporterLog.trace( - "Associated trace_id {} with session_id {} for session tracing", traceId, sessionId); + private void handleAddition(SpanData span) { + String spanName = span.getName(); + String eventId = span.getAttributes().get(AttributeKey.stringKey("gcp.vertex.agent.event_id")); + boolean isEventTraceSpan = + "call_llm".equals(spanName) + || "send_data".equals(spanName) + || (spanName != null && spanName.startsWith("tool_response")); + if (eventId != null && !eventId.isEmpty()) { + eventIdRefCount.merge(eventId, 1, Integer::sum); + if (isEventTraceSpan) { + Map attributesMap = new HashMap<>(); + span.getAttributes().forEach((key, value) -> attributesMap.put(key.getKey(), value)); + attributesMap.put("trace_id", span.getSpanContext().getTraceId()); + attributesMap.put("span_id", span.getSpanContext().getSpanId()); + attributesMap.putIfAbsent("gcp.vertex.agent.event_id", eventId); + exporterLog.debug("Storing event-based trace attributes for event_id: {}", eventId); + eventIdTraceStorage.put(eventId, attributesMap); + } + } else if (isEventTraceSpan) { + exporterLog.trace( + "Span {} for event-based trace did not have 'gcp.vertex.agent.event_id'" + + " attribute or it was empty.", + spanName); + } + + if ("call_llm".equals(spanName)) { + String sessionId = + span.getAttributes().get(AttributeKey.stringKey("gcp.vertex.agent.session_id")); + if (sessionId != null && !sessionId.isEmpty()) { + String traceId = span.getSpanContext().getTraceId(); + sessionToTraceIdsMap.computeIfAbsent(sessionId, k -> new ArrayList<>()).add(traceId); + exporterLog.trace( + "Associated trace_id {} with session_id {} for session tracing", traceId, sessionId); + } else { + exporterLog.trace( + "Span {} for session trace did not have 'gcp.vertex.agent.session_id' attribute.", + spanName); + } + } + } + + private void handleEviction(SpanData span) { + String spanName = span.getName(); + String eventId = span.getAttributes().get(AttributeKey.stringKey("gcp.vertex.agent.event_id")); + if (eventId != null && !eventId.isEmpty()) { + Integer count = eventIdRefCount.get(eventId); + if (count != null) { + if (count <= 1) { + eventIdRefCount.remove(eventId); + eventIdTraceStorage.remove(eventId); } else { - exporterLog.trace( - "Span {} for session trace did not have 'gcp.vertex.agent.session_id' attribute.", - spanName); + eventIdRefCount.put(eventId, count - 1); + } + } + } + + if ("call_llm".equals(spanName)) { + String sessionId = + span.getAttributes().get(AttributeKey.stringKey("gcp.vertex.agent.session_id")); + if (sessionId != null && !sessionId.isEmpty()) { + List traceIds = sessionToTraceIdsMap.get(sessionId); + if (traceIds != null) { + traceIds.remove(span.getSpanContext().getTraceId()); + if (traceIds.isEmpty()) { + sessionToTraceIdsMap.remove(sessionId); + } } } } - return CompletableResultCode.ofSuccess(); } @Override @@ -119,7 +176,12 @@ public CompletableResultCode flush() { @Override public CompletableResultCode shutdown() { exporterLog.debug("Shutting down ApiServerSpanExporter."); - // no need to clear storage on shutdown, as everything is currently stored in memory. + synchronized (allExportedSpans) { + allExportedSpans.clear(); + eventIdRefCount.clear(); + eventIdTraceStorage.clear(); + sessionToTraceIdsMap.clear(); + } return CompletableResultCode.ofSuccess(); } } diff --git a/dev/src/main/java/com/google/adk/web/service/ApiServerSpanExporterConfig.java b/dev/src/main/java/com/google/adk/web/service/ApiServerSpanExporterConfig.java new file mode 100644 index 000000000..c1721dbf8 --- /dev/null +++ b/dev/src/main/java/com/google/adk/web/service/ApiServerSpanExporterConfig.java @@ -0,0 +1,59 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.web.service; + +import com.google.auto.value.AutoValue; +import java.util.Optional; + +/** Configuration for {@link ApiServerSpanExporter}. */ +@AutoValue +public abstract class ApiServerSpanExporterConfig { + + /** + * The maximum number of spans to keep in memory. When the limit is reached, the oldest spans are + * evicted (FIFO). If empty, no limit is enforced and spans accumulate without bound. + * + *

When set, the value must be a positive integer ({@code >= 1}). + */ + public abstract Optional maxSpansToKeep(); + + public static Builder builder() { + return new AutoValue_ApiServerSpanExporterConfig.Builder(); + } + + /** Builder for {@link ApiServerSpanExporterConfig}. */ + @AutoValue.Builder + public abstract static class Builder { + public abstract Builder maxSpansToKeep(Optional maxSpansToKeep); + + abstract ApiServerSpanExporterConfig autoBuild(); + + public final ApiServerSpanExporterConfig build() { + ApiServerSpanExporterConfig config = autoBuild(); + config + .maxSpansToKeep() + .ifPresent( + max -> { + if (max < 1) { + throw new IllegalArgumentException( + "maxSpansToKeep must be >= 1 when set, got: " + max); + } + }); + return config; + } + } +} diff --git a/dev/src/test/java/com/google/adk/web/service/ApiServerSpanExporterTest.java b/dev/src/test/java/com/google/adk/web/service/ApiServerSpanExporterTest.java new file mode 100644 index 000000000..dc5a31f25 --- /dev/null +++ b/dev/src/test/java/com/google/adk/web/service/ApiServerSpanExporterTest.java @@ -0,0 +1,200 @@ +/* + * Copyright 2025 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.adk.web.service; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.Attributes; +import io.opentelemetry.api.trace.SpanContext; +import io.opentelemetry.sdk.trace.data.SpanData; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.jupiter.api.Test; + +class ApiServerSpanExporterTest { + + private static final AtomicLong ID_COUNTER = new AtomicLong(); + + private SpanData mockSpan() { + return mockSpan("some-span", null, null); + } + + private SpanData mockSpan(String name, String eventId, String sessionId) { + SpanData span = mock(SpanData.class); + Attributes attrs = mock(Attributes.class); + SpanContext spanContext = mock(SpanContext.class); + + long id = ID_COUNTER.incrementAndGet(); + when(span.getName()).thenReturn(name); + when(span.getAttributes()).thenReturn(attrs); + when(span.getSpanContext()).thenReturn(spanContext); + when(spanContext.getTraceId()).thenReturn("trace-" + id); + when(spanContext.getSpanId()).thenReturn("span-" + id); + + when(attrs.get(any())) + .thenAnswer( + invocation -> { + AttributeKey key = invocation.getArgument(0); + if ("gcp.vertex.agent.event_id".equals(key.getKey())) { + return eventId; + } + if ("gcp.vertex.agent.session_id".equals(key.getKey())) { + return sessionId; + } + return null; + }); + + return span; + } + + @Test + void standardUsage_shouldStoreAndRetrieveData() { + ApiServerSpanExporter exporter = new ApiServerSpanExporter(); + String eventId = "test-event"; + String sessionId = "test-session"; + + SpanData callLlm = mockSpan("call_llm", eventId, sessionId); + exporter.export(Collections.singletonList(callLlm)); + + assertEquals(1, exporter.getAllExportedSpans().size()); + Map eventAttrs = exporter.getEventTraceAttributes(eventId); + assertEquals(eventId, eventAttrs.get("gcp.vertex.agent.event_id")); + assertEquals(callLlm.getSpanContext().getTraceId(), eventAttrs.get("trace_id")); + + Map> sessionMap = exporter.getSessionToTraceIdsMap(); + assertTrue(sessionMap.containsKey(sessionId)); + assertTrue(sessionMap.get(sessionId).contains(callLlm.getSpanContext().getTraceId())); + } + + @Test + void eviction_shouldRespectRefCount() { + // Limit to 2 spans + ApiServerSpanExporter exporter = + new ApiServerSpanExporter( + ApiServerSpanExporterConfig.builder().maxSpansToKeep(Optional.of(2)).build()); + String eventId = "shared-event"; + + // Export two spans with same eventId + SpanData span1 = mockSpan("call_llm", eventId, "session1"); + SpanData span2 = mockSpan("tool_response", eventId, null); + exporter.export(Collections.singletonList(span1)); + exporter.export(Collections.singletonList(span2)); + + // Verify storage is present + assertNotNull(exporter.getEventTraceAttributes(eventId)); + + // Export 3rd span, triggering eviction of span1 + exporter.export(Collections.singletonList(mockSpan("other", null, null))); + + // eventId should still be there because span2 is still in memory (refCount=1) + assertNotNull(exporter.getEventTraceAttributes(eventId)); + + // Export 4th span, triggering eviction of span2 + exporter.export(Collections.singletonList(mockSpan("another", null, null))); + + // Now eventId storage should be gone + assertNull(exporter.getEventTraceAttributes(eventId)); + } + + @Test + void noArgConstructor_shouldKeepAllSpansByDefault() { + ApiServerSpanExporter exporter = new ApiServerSpanExporter(); + // Default is unlimited; verify no eviction occurs. + for (int i = 0; i < 1000; i++) { + exporter.export(Collections.singletonList(mockSpan())); + } + assertEquals(1000, exporter.getAllExportedSpans().size()); + } + + @Test + void export_shouldLimitSpans() { + int maxSpans = 5; + ApiServerSpanExporter exporter = + new ApiServerSpanExporter( + ApiServerSpanExporterConfig.builder().maxSpansToKeep(Optional.of(maxSpans)).build()); + + for (int i = 0; i < 10; i++) { + exporter.export(Collections.singletonList(mockSpan())); + } + + assertEquals(maxSpans, exporter.getAllExportedSpans().size()); + } + + @Test + void export_noLimit_shouldKeepAllSpans() { + ApiServerSpanExporter exporter = + new ApiServerSpanExporter(ApiServerSpanExporterConfig.builder().build()); + + for (int i = 0; i < 100; i++) { + exporter.export(Collections.singletonList(mockSpan())); + } + + assertEquals(100, exporter.getAllExportedSpans().size()); + } + + @Test + void configConstructor_shouldUseConfiguredLimits() { + int maxSpans = 3; + ApiServerSpanExporterConfig config = + ApiServerSpanExporterConfig.builder().maxSpansToKeep(Optional.of(maxSpans)).build(); + ApiServerSpanExporter exporter = new ApiServerSpanExporter(config); + + for (int i = 0; i < 10; i++) { + exporter.export(Collections.singletonList(mockSpan())); + } + + assertEquals(maxSpans, exporter.getAllExportedSpans().size()); + } + + @Test + void configBuilder_shouldRejectNonPositiveMaxSpans() { + assertThrows( + IllegalArgumentException.class, + () -> ApiServerSpanExporterConfig.builder().maxSpansToKeep(Optional.of(0)).build()); + assertThrows( + IllegalArgumentException.class, + () -> ApiServerSpanExporterConfig.builder().maxSpansToKeep(Optional.of(-1)).build()); + } + + @Test + void shutdown_shouldClearStorage() { + ApiServerSpanExporter exporter = + new ApiServerSpanExporter( + ApiServerSpanExporterConfig.builder().maxSpansToKeep(Optional.of(10)).build()); + exporter.export(Collections.singletonList(mockSpan())); + + assertEquals(1, exporter.getAllExportedSpans().size()); + + exporter.shutdown(); + + assertTrue(exporter.getAllExportedSpans().isEmpty()); + assertTrue(exporter.getSessionToTraceIdsMap().isEmpty()); + // eventIdTraceStorage is private but we can infer it should be cleared if we had access or + // tests for it + } +}