Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ on:
pull_request:
workflow_dispatch:

env:
SUFFIX: ${{ (github.head_ref || github.ref_name) == 'main' && '' || format('-{0}', github.head_ref || github.ref_name) }}

jobs:
checks:
runs-on: ubuntu-latest
Expand Down Expand Up @@ -72,7 +75,7 @@ jobs:
run: |
LAYER_ARN=$(
aws lambda publish-layer-version \
--layer-name diet-lambda-${{ matrix.arch }} \
--layer-name diet-lambda-${{ matrix.arch }}$SUFFIX \
--license-info "Apache 2.0" \
--compatible-architectures ${{ matrix.arch == 'x86_64' && 'x86_64' || 'arm64' }} \
--zip-file fileb://${{ steps.download.outputs.download-path }}/diet-lambda-${{ matrix.arch }}.zip \
Expand Down Expand Up @@ -112,7 +115,7 @@ jobs:
with:
context: .
push: true
tags: ghcr.io/${{ github.repository }}:${{ matrix.arch }}
tags: ghcr.io/${{ github.repository }}:${{ matrix.arch }}${{ env.SUFFIX }}

multiarch:
needs: docker
Expand All @@ -134,6 +137,6 @@ jobs:

- run: |
docker buildx imagetools create \
--tag ghcr.io/${{ github.repository }}:latest \
ghcr.io/${{ github.repository }}:x86_64 \
ghcr.io/${{ github.repository }}:aarch64
--tag ghcr.io/${{ github.repository }}:latest$SUFFIX \
ghcr.io/${{ github.repository }}:x86_64$SUFFIX \
ghcr.io/${{ github.repository }}:aarch64$SUFFIX
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ bytes = "1"
cfg-if = "1"
chrono = { version = "0.4.43", features = ["serde"] }
const-hex = { version = "1.17.0", features = ["serde"] }
envy = "0.4.2"
flate2 = "1"
futures-util = "0.3"
http-body-util = { version = "0.1.3", features = [] }
Expand Down
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@ The collector listens for `HTTP/OTLP`, `HTTP/JSON` and `gRPC/OTLP` traces, metri

This collector uses the same `SW_APM_API_TOKEN` and `SW_APM_DATA_CENTER` environment variables as the full collector. It does not support configuration via a `yaml` file, or any custom processing logic.

- Service name - `OTEL_SERVICE_NAME` -> `SW_APM_SERVICE_KEY` -> `AWS_LAMBDA_FUNCTION_NAME`
- API token - `SW_APM_API_TOKEN` -> `SW_APM_SERVICE_KEY`
- APM collector endpoint - `SW_APM_COLLECTOR` -> `SW_APM_DATA_CENTER`
- OTLP exporter endpoints - `SW_EXPORTER_OTLP_$signal_ENDPOINT` -> `SW_EXPORTER_OTLP_ENDPOINT` -> `SW_APM_COLLECTOR` -> `SW_APM_DATA_CENTER`
Comment thread
raphael-theriault-swi marked this conversation as resolved.

## Docker Images

These are also updated on new commits to main. The collector is built on Amazon Linux and expects OpenSSL to be available in the image.
Expand Down
120 changes: 86 additions & 34 deletions src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,46 @@ use std::{
};

use anyhow::{Context, Error};
use serde::Deserialize;

#[derive(Deserialize, Default)]
pub struct Env {
otel_service_name: Option<String>,
aws_lambda_function_name: Option<String>,

aws_lambda_initialization_type: Option<String>,
aws_lambda_runtime_api: Option<String>,
sw_exporter_compression: Option<String>,

sw_apm_api_token: Option<String>,
sw_apm_service_key: Option<String>,

sw_apm_data_center: Option<String>,
sw_apm_collector: Option<String>,
sw_exporter_otlp_endpoint: Option<String>,
sw_exporter_otlp_traces_endpoint: Option<String>,
sw_exporter_otlp_metrics_endpoint: Option<String>,
sw_exporter_otlp_logs_endpoint: Option<String>,
sw_exporter_otlp_profiles_endpoint: Option<String>,
}

pub struct Config {
pub _service: String,
pub token: String,

pub executable: String,
pub managed: bool,
pub compression: Compression,

pub urls: UrlsConfig,
}

#[derive(Clone, Copy)]
pub enum Compression {
Gzip,
Zstd,
}

pub struct UrlsConfig {
pub settings: String,
pub exporters: ExportersUrlsConfig,
Expand Down Expand Up @@ -65,28 +94,55 @@ impl Config {
const LOCAL_HOST: &str = "sandbox.localdomain";

pub fn parse() -> Result<Arc<Self>, Error> {
let service_key = env::var("SW_APM_SERVICE_KEY").ok();
let service_key = service_key.as_ref().and_then(|s| s.split_once(':'));
let Env {
otel_service_name,
aws_lambda_function_name,

let service_name = env::var("OTEL_SERVICE_NAME")
.ok()
aws_lambda_initialization_type,
aws_lambda_runtime_api,
sw_exporter_compression,

sw_apm_api_token,
sw_apm_service_key,

sw_apm_data_center,
sw_apm_collector,
sw_exporter_otlp_endpoint,
sw_exporter_otlp_traces_endpoint,
sw_exporter_otlp_metrics_endpoint,
sw_exporter_otlp_logs_endpoint,
sw_exporter_otlp_profiles_endpoint,
} = envy::from_env().unwrap_or_default();

let service_key = sw_apm_service_key.as_ref().and_then(|s| s.split_once(':'));

let service_name = otel_service_name
.or_else(|| service_key.map(|(name, _)| name.to_string()))
.or_else(|| env::var("AWS_LAMBDA_FUNCTION_NAME").ok())
.or(aws_lambda_function_name)
.context("missing service name")?;

let api_token = env::var("SW_APM_API_TOKEN")
.ok()
.or_else(|| service_key.map(|(_, token)| token.to_string())).unwrap_or_else(|| {
let managed =
aws_lambda_initialization_type.is_some_and(|v| v == "lambda-managed-instances");
let api_host = aws_lambda_runtime_api.unwrap_or_else(|| Self::API_HOST.to_string());

let api_token = sw_apm_api_token
.or_else(|| service_key.map(|(_, token)| token.to_string()))
.unwrap_or_else(|| {
eprintln!("Missing SolarWinds APM API token. Please set the `SW_APM_API_TOKEN` environment variable to enable sampling.");
"missing".to_string()
});

let data_center = env::var("SW_APM_DATA_CENTER")
.ok()
.unwrap_or_else(|| "na-01".to_string());
let data_center = sw_apm_data_center.unwrap_or_else(|| "na-01".to_string());
let mut collector = sw_apm_collector
.unwrap_or_else(|| format!("https://apm.collector.{data_center}.cloud.solarwinds.com"));
let mut exporter = sw_exporter_otlp_endpoint
.unwrap_or_else(|| collector.replace("apm.collector", "otel.collector"));

let api_host =
env::var("AWS_LAMBDA_RUNTIME_API").unwrap_or_else(|_| Self::API_HOST.to_string());
for url in [&mut collector, &mut exporter] {
if !url.starts_with("https://") && !url.starts_with("http://") {
*url = format!("https://{url}");
}
}

let executable = env::current_exe()
.ok()
Expand All @@ -96,31 +152,26 @@ impl Config {
})
.unwrap_or_else(|| env!("CARGO_PKG_NAME").to_string());

let managed = env::var("AWS_LAMBDA_INITIALIZATION_TYPE")
.is_ok_and(|v| v == "lambda-managed-instances");
let compression = sw_exporter_compression
.and_then(|c| match c.to_lowercase().trim() {
"gzip" | "gz" => Some(Compression::Gzip),
"zstd" => Some(Compression::Zstd),
_ => None,
})
.unwrap_or(Compression::Gzip);
Comment thread
raphael-theriault-swi marked this conversation as resolved.

Ok(Arc::new(Self {
urls: UrlsConfig {
settings: format!(
"https://apm.collector.{data_center}.cloud.solarwinds.com/v1/settings/{service_name}/{service_name}",
),
settings: format!("{collector}/v1/settings/{service_name}/{service_name}",),
exporters: ExportersUrlsConfig {
traces: format!(
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
Self::TRACES_ROUTE
),
metrics: format!(
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
Self::METRICS_ROUTE
),
logs: format!(
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
Self::LOGS_ROUTE
),
profiles: format!(
"https://otel.collector.{data_center}.cloud.solarwinds.com{}",
Self::PROFILES_ROUTE
),
traces: sw_exporter_otlp_traces_endpoint
.unwrap_or_else(|| format!("{exporter}{}", Self::TRACES_ROUTE)),
metrics: sw_exporter_otlp_metrics_endpoint
.unwrap_or_else(|| format!("{exporter}{}", Self::METRICS_ROUTE)),
logs: sw_exporter_otlp_logs_endpoint
.unwrap_or_else(|| format!("{exporter}{}", Self::LOGS_ROUTE)),
profiles: sw_exporter_otlp_profiles_endpoint
.unwrap_or_else(|| format!("{exporter}{}", Self::PROFILES_ROUTE)),
},
extension: ExtensionUrlsConfig {
register: format!(
Expand Down Expand Up @@ -151,6 +202,7 @@ impl Config {

executable,
managed,
compression,
}))
}
}
39 changes: 29 additions & 10 deletions src/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::{
};

use anyhow::Error;
use async_compression::{Level, tokio::bufread::GzipEncoder};
use async_compression::{
Level,
tokio::bufread::{GzipEncoder, ZstdEncoder},
};
use bytes::BytesMut;
use futures_util::TryStreamExt;
use http_body_util::{BodyExt, StreamBody};
Expand All @@ -26,6 +29,7 @@ use opentelemetry_proto::tonic::{
};
use prost::Message;
use tokio::{
io::AsyncRead,
sync::{mpsc, watch},
task::JoinSet,
time::{self, MissedTickBehavior},
Expand All @@ -37,7 +41,7 @@ use uuid::Uuid;

use crate::{
ServiceRequest,
env::Config,
env::{Compression, Config},
util::{Client, body, flatten},
};

Expand Down Expand Up @@ -185,6 +189,7 @@ async fn send<R>(
mut client: FollowRedirect<Client>,
url: String,
token: String,
compression: Compression,
instance_id: Uuid,
attributes: Arc<[KeyValue]>,
) -> Result<(), Error>
Expand All @@ -202,13 +207,23 @@ where
let mut buf = BytesMut::with_capacity(request.encoded_len());
request.encode(&mut buf)?;

let compressed = StreamBody::new(
ReaderStream::new(GzipEncoder::with_quality(
Cursor::new(buf),
Level::Precise(6),
))
.map_ok(Frame::data),
);
let (boxed, encoding): (Box<dyn AsyncRead + Unpin + Send>, &str) = match compression {
Compression::Zstd => (
Box::new(ZstdEncoder::with_quality(
Cursor::new(buf),
Level::Precise(4),
)),
"zstd",
),
Compression::Gzip => (
Box::new(GzipEncoder::with_quality(
Cursor::new(buf),
Level::Precise(6),
)),
"gzip",
),
};
let compressed = StreamBody::new(ReaderStream::new(boxed).map_ok(Frame::data));

future::poll_fn(|cx| client.poll_ready(cx)).await?;
let response = client
Expand All @@ -217,7 +232,7 @@ where
.method("POST")
.uri(url)
.header(CONTENT_TYPE, "application/x-protobuf")
.header(CONTENT_ENCODING, "gzip")
.header(CONTENT_ENCODING, encoding)
.header(AUTHORIZATION, format!("Bearer {token}"))
.header(USER_AGENT, Config::USER_AGENT)
.body(body(compressed))?,
Expand Down Expand Up @@ -296,6 +311,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
state.client.clone(),
config.urls.exporters.traces.clone(),
config.token.clone(),
config.compression,
state.instance_id,
state.attributes.clone(),
));
Expand All @@ -305,6 +321,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
state.client.clone(),
config.urls.exporters.metrics.clone(),
config.token.clone(),
config.compression,
state.instance_id,
state.attributes.clone(),
));
Expand All @@ -314,6 +331,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
state.client.clone(),
config.urls.exporters.logs.clone(),
config.token.clone(),
config.compression,
state.instance_id,
state.attributes.clone(),
));
Expand All @@ -324,6 +342,7 @@ fn export(state: &mut State, config: &Config, id: Option<String>) {
state.client.clone(),
config.urls.exporters.profiles.clone(),
config.token.clone(),
config.compression,
state.instance_id,
state.attributes.clone(),
));
Expand Down
Loading