diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 29cf298..6acfbf5 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -6,6 +6,9 @@ on: pull_request: workflow_dispatch: +env: + SUFFIX: ${{ (github.head_ref || github.ref_name) == 'main' && '' || format('-{0}', github.head_ref || github.ref_name) }} + jobs: checks: runs-on: ubuntu-latest @@ -72,7 +75,7 @@ jobs: run: | LAYER_ARN=$( aws lambda publish-layer-version \ - --layer-name diet-lambda-${{ matrix.arch }} \ + --layer-name diet-lambda-${{ matrix.arch }}$SUFFIX \ --license-info "Apache 2.0" \ --compatible-architectures ${{ matrix.arch == 'x86_64' && 'x86_64' || 'arm64' }} \ --zip-file fileb://${{ steps.download.outputs.download-path }}/diet-lambda-${{ matrix.arch }}.zip \ @@ -112,7 +115,7 @@ jobs: with: context: . push: true - tags: ghcr.io/${{ github.repository }}:${{ matrix.arch }} + tags: ghcr.io/${{ github.repository }}:${{ matrix.arch }}${{ env.SUFFIX }} multiarch: needs: docker @@ -134,6 +137,6 @@ jobs: - run: | docker buildx imagetools create \ - --tag ghcr.io/${{ github.repository }}:latest \ - ghcr.io/${{ github.repository }}:x86_64 \ - ghcr.io/${{ github.repository }}:aarch64 + --tag ghcr.io/${{ github.repository }}:latest$SUFFIX \ + ghcr.io/${{ github.repository }}:x86_64$SUFFIX \ + ghcr.io/${{ github.repository }}:aarch64$SUFFIX diff --git a/Cargo.lock b/Cargo.lock index d8338fa..6b5cfc7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -269,6 +269,7 @@ dependencies = [ "cfg-if", "chrono", "const-hex", + "envy", "flate2", "futures-util", "http-body-util", @@ -307,6 +308,15 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" +[[package]] +name = "envy" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f47e0157f2cb54f5ae1bd371b30a2ae4311e1c028f575cd4e81de7353215965" +dependencies = [ + "serde", +] + [[package]] name = "equivalent" version = "1.0.2" diff --git a/Cargo.toml b/Cargo.toml index 469c94a..a51e72a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,6 +16,7 @@ bytes = "1" cfg-if = "1" chrono = { version = "0.4.43", features = ["serde"] } const-hex = { version = "1.17.0", features = ["serde"] } +envy = "0.4.2" flate2 = "1" futures-util = "0.3" http-body-util = { version = "0.1.3", features = [] } diff --git a/README.md b/README.md index 80c0a6d..115bd2f 100644 --- a/README.md +++ b/README.md @@ -8,6 +8,11 @@ The collector listens for `HTTP/OTLP`, `HTTP/JSON` and `gRPC/OTLP` traces, metri This collector uses the same `SW_APM_API_TOKEN` and `SW_APM_DATA_CENTER` environment variables as the full collector. It does not support configuration via a `yaml` file, or any custom processing logic. +- Service name - `OTEL_SERVICE_NAME` -> `SW_APM_SERVICE_KEY` -> `AWS_LAMBDA_FUNCTION_NAME` +- API token - `SW_APM_API_TOKEN` -> `SW_APM_SERVICE_KEY` +- APM collector endpoint - `SW_APM_COLLECTOR` -> `SW_APM_DATA_CENTER` +- OTLP exporter endpoints - `SW_EXPORTER_OTLP_$signal_ENDPOINT` -> `SW_EXPORTER_OTLP_ENDPOINT` -> `SW_APM_COLLECTOR` -> `SW_APM_DATA_CENTER` + ## Docker Images These are also updated on new commits to main. The collector is built on Amazon Linux and expects OpenSSL to be available in the image. diff --git a/src/env.rs b/src/env.rs index 1b9cb0c..25f150a 100644 --- a/src/env.rs +++ b/src/env.rs @@ -6,6 +6,28 @@ use std::{ }; use anyhow::{Context, Error}; +use serde::Deserialize; + +#[derive(Deserialize, Default)] +pub struct Env { + otel_service_name: Option, + aws_lambda_function_name: Option, + + aws_lambda_initialization_type: Option, + aws_lambda_runtime_api: Option, + sw_exporter_compression: Option, + + sw_apm_api_token: Option, + sw_apm_service_key: Option, + + sw_apm_data_center: Option, + sw_apm_collector: Option, + sw_exporter_otlp_endpoint: Option, + sw_exporter_otlp_traces_endpoint: Option, + sw_exporter_otlp_metrics_endpoint: Option, + sw_exporter_otlp_logs_endpoint: Option, + sw_exporter_otlp_profiles_endpoint: Option, +} pub struct Config { pub _service: String, @@ -13,10 +35,17 @@ pub struct Config { pub executable: String, pub managed: bool, + pub compression: Compression, pub urls: UrlsConfig, } +#[derive(Clone, Copy)] +pub enum Compression { + Gzip, + Zstd, +} + pub struct UrlsConfig { pub settings: String, pub exporters: ExportersUrlsConfig, @@ -65,28 +94,55 @@ impl Config { const LOCAL_HOST: &str = "sandbox.localdomain"; pub fn parse() -> Result, Error> { - let service_key = env::var("SW_APM_SERVICE_KEY").ok(); - let service_key = service_key.as_ref().and_then(|s| s.split_once(':')); + let Env { + otel_service_name, + aws_lambda_function_name, - let service_name = env::var("OTEL_SERVICE_NAME") - .ok() + aws_lambda_initialization_type, + aws_lambda_runtime_api, + sw_exporter_compression, + + sw_apm_api_token, + sw_apm_service_key, + + sw_apm_data_center, + sw_apm_collector, + sw_exporter_otlp_endpoint, + sw_exporter_otlp_traces_endpoint, + sw_exporter_otlp_metrics_endpoint, + sw_exporter_otlp_logs_endpoint, + sw_exporter_otlp_profiles_endpoint, + } = envy::from_env().unwrap_or_default(); + + let service_key = sw_apm_service_key.as_ref().and_then(|s| s.split_once(':')); + + let service_name = otel_service_name .or_else(|| service_key.map(|(name, _)| name.to_string())) - .or_else(|| env::var("AWS_LAMBDA_FUNCTION_NAME").ok()) + .or(aws_lambda_function_name) .context("missing service name")?; - let api_token = env::var("SW_APM_API_TOKEN") - .ok() - .or_else(|| service_key.map(|(_, token)| token.to_string())).unwrap_or_else(|| { + let managed = + aws_lambda_initialization_type.is_some_and(|v| v == "lambda-managed-instances"); + let api_host = aws_lambda_runtime_api.unwrap_or_else(|| Self::API_HOST.to_string()); + + let api_token = sw_apm_api_token + .or_else(|| service_key.map(|(_, token)| token.to_string())) + .unwrap_or_else(|| { eprintln!("Missing SolarWinds APM API token. Please set the `SW_APM_API_TOKEN` environment variable to enable sampling."); "missing".to_string() }); - let data_center = env::var("SW_APM_DATA_CENTER") - .ok() - .unwrap_or_else(|| "na-01".to_string()); + let data_center = sw_apm_data_center.unwrap_or_else(|| "na-01".to_string()); + let mut collector = sw_apm_collector + .unwrap_or_else(|| format!("https://apm.collector.{data_center}.cloud.solarwinds.com")); + let mut exporter = sw_exporter_otlp_endpoint + .unwrap_or_else(|| collector.replace("apm.collector", "otel.collector")); - let api_host = - env::var("AWS_LAMBDA_RUNTIME_API").unwrap_or_else(|_| Self::API_HOST.to_string()); + for url in [&mut collector, &mut exporter] { + if !url.starts_with("https://") && !url.starts_with("http://") { + *url = format!("https://{url}"); + } + } let executable = env::current_exe() .ok() @@ -96,31 +152,26 @@ impl Config { }) .unwrap_or_else(|| env!("CARGO_PKG_NAME").to_string()); - let managed = env::var("AWS_LAMBDA_INITIALIZATION_TYPE") - .is_ok_and(|v| v == "lambda-managed-instances"); + let compression = sw_exporter_compression + .and_then(|c| match c.to_lowercase().trim() { + "gzip" | "gz" => Some(Compression::Gzip), + "zstd" => Some(Compression::Zstd), + _ => None, + }) + .unwrap_or(Compression::Gzip); Ok(Arc::new(Self { urls: UrlsConfig { - settings: format!( - "https://apm.collector.{data_center}.cloud.solarwinds.com/v1/settings/{service_name}/{service_name}", - ), + settings: format!("{collector}/v1/settings/{service_name}/{service_name}",), exporters: ExportersUrlsConfig { - traces: format!( - "https://otel.collector.{data_center}.cloud.solarwinds.com{}", - Self::TRACES_ROUTE - ), - metrics: format!( - "https://otel.collector.{data_center}.cloud.solarwinds.com{}", - Self::METRICS_ROUTE - ), - logs: format!( - "https://otel.collector.{data_center}.cloud.solarwinds.com{}", - Self::LOGS_ROUTE - ), - profiles: format!( - "https://otel.collector.{data_center}.cloud.solarwinds.com{}", - Self::PROFILES_ROUTE - ), + traces: sw_exporter_otlp_traces_endpoint + .unwrap_or_else(|| format!("{exporter}{}", Self::TRACES_ROUTE)), + metrics: sw_exporter_otlp_metrics_endpoint + .unwrap_or_else(|| format!("{exporter}{}", Self::METRICS_ROUTE)), + logs: sw_exporter_otlp_logs_endpoint + .unwrap_or_else(|| format!("{exporter}{}", Self::LOGS_ROUTE)), + profiles: sw_exporter_otlp_profiles_endpoint + .unwrap_or_else(|| format!("{exporter}{}", Self::PROFILES_ROUTE)), }, extension: ExtensionUrlsConfig { register: format!( @@ -151,6 +202,7 @@ impl Config { executable, managed, + compression, })) } } diff --git a/src/exporter.rs b/src/exporter.rs index e43108f..5338c97 100644 --- a/src/exporter.rs +++ b/src/exporter.rs @@ -3,7 +3,10 @@ use std::{ }; use anyhow::Error; -use async_compression::{Level, tokio::bufread::GzipEncoder}; +use async_compression::{ + Level, + tokio::bufread::{GzipEncoder, ZstdEncoder}, +}; use bytes::BytesMut; use futures_util::TryStreamExt; use http_body_util::{BodyExt, StreamBody}; @@ -26,6 +29,7 @@ use opentelemetry_proto::tonic::{ }; use prost::Message; use tokio::{ + io::AsyncRead, sync::{mpsc, watch}, task::JoinSet, time::{self, MissedTickBehavior}, @@ -37,7 +41,7 @@ use uuid::Uuid; use crate::{ ServiceRequest, - env::Config, + env::{Compression, Config}, util::{Client, body, flatten}, }; @@ -185,6 +189,7 @@ async fn send( mut client: FollowRedirect, url: String, token: String, + compression: Compression, instance_id: Uuid, attributes: Arc<[KeyValue]>, ) -> Result<(), Error> @@ -202,13 +207,23 @@ where let mut buf = BytesMut::with_capacity(request.encoded_len()); request.encode(&mut buf)?; - let compressed = StreamBody::new( - ReaderStream::new(GzipEncoder::with_quality( - Cursor::new(buf), - Level::Precise(6), - )) - .map_ok(Frame::data), - ); + let (boxed, encoding): (Box, &str) = match compression { + Compression::Zstd => ( + Box::new(ZstdEncoder::with_quality( + Cursor::new(buf), + Level::Precise(4), + )), + "zstd", + ), + Compression::Gzip => ( + Box::new(GzipEncoder::with_quality( + Cursor::new(buf), + Level::Precise(6), + )), + "gzip", + ), + }; + let compressed = StreamBody::new(ReaderStream::new(boxed).map_ok(Frame::data)); future::poll_fn(|cx| client.poll_ready(cx)).await?; let response = client @@ -217,7 +232,7 @@ where .method("POST") .uri(url) .header(CONTENT_TYPE, "application/x-protobuf") - .header(CONTENT_ENCODING, "gzip") + .header(CONTENT_ENCODING, encoding) .header(AUTHORIZATION, format!("Bearer {token}")) .header(USER_AGENT, Config::USER_AGENT) .body(body(compressed))?, @@ -296,6 +311,7 @@ fn export(state: &mut State, config: &Config, id: Option) { state.client.clone(), config.urls.exporters.traces.clone(), config.token.clone(), + config.compression, state.instance_id, state.attributes.clone(), )); @@ -305,6 +321,7 @@ fn export(state: &mut State, config: &Config, id: Option) { state.client.clone(), config.urls.exporters.metrics.clone(), config.token.clone(), + config.compression, state.instance_id, state.attributes.clone(), )); @@ -314,6 +331,7 @@ fn export(state: &mut State, config: &Config, id: Option) { state.client.clone(), config.urls.exporters.logs.clone(), config.token.clone(), + config.compression, state.instance_id, state.attributes.clone(), )); @@ -324,6 +342,7 @@ fn export(state: &mut State, config: &Config, id: Option) { state.client.clone(), config.urls.exporters.profiles.clone(), config.token.clone(), + config.compression, state.instance_id, state.attributes.clone(), ));