Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
191 changes: 188 additions & 3 deletions bottlecap/src/traces/stats_concentrator_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,68 @@ use tracing::error;
const S_TO_NS: u64 = 1_000_000_000;
const BUCKET_DURATION_NS: u64 = 10 * S_TO_NS; // 10 seconds

/// Span kinds eligible for stats computation, matching the Go agent's default
/// `ComputeStatsBySpanKind: true` behavior.
/// Reference: `datadog-agent/pkg/trace/stats/span_concentrator.go` (`KindsComputed`)
///
/// TODO: The source of truth is the Go agent's `KindsComputed`; this list is hand-copied here
/// and in other Rust repos. Refactor so they stay in sync with the Go agent instead of each
/// keeping its own copy.
const STATS_ELIGIBLE_SPAN_KINDS: &[&str] = &["client", "consumer", "producer", "server"];

/// Default peer tag keys for stats aggregation, matching the Go agent's `basePeerTags`
/// derived from pkg/trace/semantics/mappings.json via the 16 peer tag concepts.
/// Reference: `datadog-agent/pkg/trace/config/peer_tags.go` (`peerTagConcepts` + `basePeerTags`)
///
/// TODO: The source of truth is the Go agent's `basePeerTags` (derived from
/// pkg/trace/semantics/mappings.json); this list is hand-copied here and in other Rust repos.
/// Refactor so they stay in sync with the Go agent instead of each keeping its own copy.
const DEFAULT_PEER_TAG_KEYS: &[&str] = &[
"_dd.base_service",
"active_record.db.vendor",
"amqp.destination",
"amqp.exchange",
"amqp.queue",
"aws.queue.name",
"aws.s3.bucket",
"bucketname",
"cassandra.keyspace",
"db.cassandra.contact.points",
"db.couchbase.seed.nodes",
"db.hostname",
"db.instance",
"db.name",
"db.namespace",
"db.system",
"db.type",
"dns.hostname",
"grpc.host",
"hostname",
"http.host",
"http.server_name",
"messaging.destination",
"messaging.destination.name",
"messaging.kafka.bootstrap.servers",
"messaging.rabbitmq.exchange",
"messaging.system",
"mongodb.db",
"msmq.queue.path",
"net.peer.name",
"network.destination.ip",
"network.destination.name",
"out.host",
"peer.hostname",
"peer.service",
"queuename",
"rpc.service",
"rpc.system",
"sequel.db.vendor",
"server.address",
"streamname",
"tablename",
"topicname",
];

#[derive(Debug, thiserror::Error)]
pub enum StatsError {
#[error("Failed to send command to concentrator: {0}")]
Expand Down Expand Up @@ -113,12 +175,17 @@ impl StatsConcentratorService {
pub fn new(config: Arc<Config>) -> (Self, StatsConcentratorHandle) {
let (tx, rx) = mpsc::unbounded_channel();
let handle = StatsConcentratorHandle::new(tx);
// TODO: set span_kinds_stats_computed and peer_tag_keys
let concentrator = SpanConcentrator::new(
Duration::from_nanos(BUCKET_DURATION_NS),
SystemTime::now(),
vec![],
vec![],
STATS_ELIGIBLE_SPAN_KINDS
.iter()
.map(ToString::to_string)
.collect(),
DEFAULT_PEER_TAG_KEYS
.iter()
.map(ToString::to_string)
.collect(),
);
let service: StatsConcentratorService = Self {
concentrator,
Expand Down Expand Up @@ -192,3 +259,121 @@ impl StatsConcentratorService {
}
}
}

#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use std::collections::HashMap;

/// Create a `pb::Span` with the given meta tags and metrics.
/// The span is non-root (`parent_id=1`) and not measured, so it will only be
/// eligible for stats if `span_kinds_stats_computed` includes its `span.kind`.
fn create_span_kind_span(span_kind: &str, meta: Vec<(&str, &str)>) -> pb::Span {
let now_ns = i64::try_from(
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap()
.as_nanos(),
)
.unwrap();
let mut meta_map: HashMap<String, String> = meta
.into_iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
meta_map.insert("span.kind".to_string(), span_kind.to_string());
pb::Span {
service: "test-service".to_string(),
name: "test-op".to_string(),
resource: "test-resource".to_string(),
trace_id: 1,
span_id: 2,
parent_id: 1, // non-root
start: now_ns,
duration: 100,
error: 0,
r#type: "web".to_string(),
meta: meta_map,
metrics: HashMap::new(), // no _top_level, no _dd.measured
meta_struct: HashMap::new(),
span_links: vec![],
span_events: vec![],
}
}

/// A non-root, non-measured span with `span.kind`="client" should produce stats
/// because `span_kinds_stats_computed` is populated with the eligible span kinds.
#[tokio::test]
async fn test_span_kind_stats_computed() {
let config = Arc::new(Config::default());
let (service, handle) = StatsConcentratorService::new(config);
tokio::spawn(service.run());

let span = create_span_kind_span("client", vec![]);
handle.add(&span).unwrap();

let result = handle.flush(true).await.unwrap();

assert!(
result.is_some(),
"Expected stats for a client span, but got None. \
span.kind-based eligibility is not working."
);
let payload = result.unwrap();
let all_stats: Vec<_> = payload.stats.iter().flat_map(|b| &b.stats).collect();
assert!(
!all_stats.is_empty(),
"Expected at least one grouped stats entry for the client span."
);
let client_stats: Vec<_> = all_stats
.iter()
.filter(|s| s.span_kind == "client")
.collect();
assert!(
!client_stats.is_empty(),
"Expected a stats entry with span_kind='client'."
);
}

/// A client span with peer tag meta keys (`db.instance`, `db.system`) should produce
/// stats with non-empty `peer_tags` because `peer_tag_keys` is configured.
#[tokio::test]
async fn test_peer_tags_populated() {
let config = Arc::new(Config::default());
let (service, handle) = StatsConcentratorService::new(config);
tokio::spawn(service.run());

let span = create_span_kind_span(
"client",
vec![("db.instance", "i-1234"), ("db.system", "postgres")],
);
handle.add(&span).unwrap();

let result = handle.flush(true).await.unwrap();

assert!(
result.is_some(),
"Expected stats for a client span with peer tags, but got None."
);
let payload = result.unwrap();
let all_stats: Vec<_> = payload.stats.iter().flat_map(|b| &b.stats).collect();
let stats_with_peer_tags: Vec<_> = all_stats
.iter()
.filter(|s| !s.peer_tags.is_empty())
.collect();
assert!(
!stats_with_peer_tags.is_empty(),
"Expected at least one stats entry with non-empty peer_tags, \
but all entries have empty peer_tags."
);
let peer_tags = &stats_with_peer_tags[0].peer_tags;
assert!(
peer_tags.iter().any(|t| t.starts_with("db.instance:")),
"Expected peer_tags to contain db.instance, got: {peer_tags:?}"
);
assert!(
peer_tags.iter().any(|t| t.starts_with("db.system:")),
"Expected peer_tags to contain db.system, got: {peer_tags:?}"
);
}
}
147 changes: 147 additions & 0 deletions bottlecap/tests/apm_integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@
//! This is what APMSVLS-496 phase 1 unblocks: regression coverage for
//! payload-level changes that `body_contains`-style mocks can't catch.

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

use bottlecap::LAMBDA_RUNTIME_SLUG;
use bottlecap::config::Config;
Expand Down Expand Up @@ -498,3 +500,148 @@ async fn e2e_client_computed_stats_absent_meta_and_no_stats() {
);
assert!(outcome.stats.is_empty(), "no stats payloads must be sent",);
}

/// Build a non-root, non-measured span eligible for stats only via its `span.kind`.
///
/// `parent_id` is non-zero (non-root) and `metrics` is empty (no `_top_level` /
/// `_dd.measured`), so the concentrator will only compute stats for it when its
/// `span.kind` is in `span_kinds_stats_computed`. `start` is set to "now" so the
/// span lands in the current bucket and a forced flush returns it.
fn make_eligible_span(span_kind: &str, peer_meta: &[(&str, &str)]) -> pb::Span {
let now_ns = i64::try_from(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("system time before unix epoch")
.as_nanos(),
)
.expect("nanos since epoch must fit in i64");

let mut meta: HashMap<String, String> = peer_meta
.iter()
.map(|(k, v)| ((*k).to_string(), (*v).to_string()))
.collect();
meta.insert("span.kind".to_string(), span_kind.to_string());

pb::Span {
service: "fake-intake-stats-service".to_string(),
name: "test-op".to_string(),
resource: "test-resource".to_string(),
trace_id: 1,
span_id: 2,
parent_id: 1, // non-root
start: now_ns,
duration: 100,
error: 0,
r#type: "web".to_string(),
meta,
metrics: HashMap::new(), // no _top_level, no _dd.measured
..pb::Span::default()
}
}

/// Wire concentrator -> aggregator -> flusher pointed at the fake intake, feed in
/// `spans`, force a flush, and return the single captured `StatsPayload`.
async fn flush_spans_to_fake_intake(
fake_intake: &FakeIntake,
spans: &[pb::Span],
) -> pb::StatsPayload {
let config = test_config();
let http_client = create_client(None, None, false).expect("failed to create http client");

let (concentrator_service, concentrator_handle) =
StatsConcentratorService::new(Arc::clone(&config));
tokio::spawn(concentrator_service.run());

let aggregator = Arc::new(Mutex::new(StatsAggregator::new_with_concentrator(
concentrator_handle.clone(),
)));

for span in spans {
concentrator_handle
.add(span)
.expect("concentrator add must succeed");
}

let api_key_factory = Arc::new(ApiKeyFactory::new(DD_API_KEY));
let flusher = StatsFlusher::new(
api_key_factory,
aggregator,
config,
http_client,
fake_intake.stats_url(),
);

let failed = flusher.flush(true, None).await;
assert!(
failed.is_none(),
"stats flush reported a retry-able failure: {failed:?}",
);

let captured = fake_intake.stats_payloads();
assert_eq!(captured.len(), 1, "expected exactly one StatsPayload");
captured.into_iter().next().expect("captured payload")
}

/// End-to-end: a non-root, non-measured `span.kind="server"` span fed through the
/// concentrator must survive aggregation + msgpack/gzip serialization and arrive
/// at the intake as a grouped-stats entry with `span_kind="server"`. This closes
/// the gap left by the in-memory concentrator unit tests, which never serialize.
#[tokio::test]
async fn stats_span_kind_through_fake_intake() {
let fake_intake = FakeIntake::start().await;
let span = make_eligible_span("server", &[]);

let payload = flush_spans_to_fake_intake(&fake_intake, &[span]).await;

let grouped: Vec<_> = payload
.stats
.iter()
.flat_map(|p| &p.stats)
.flat_map(|b| &b.stats)
.collect();
assert!(
!grouped.is_empty(),
"expected at least one grouped-stats entry for the server span",
);
assert!(
grouped.iter().any(|s| s.span_kind == "server"),
"expected a grouped-stats entry with span_kind='server', got: {:?}",
grouped.iter().map(|s| &s.span_kind).collect::<Vec<_>>(),
);
}

/// End-to-end: a `span.kind="client"` span carrying peer-tag meta keys
/// (`db.instance`, `db.system`) must arrive at the intake with those keys
/// populated in `peer_tags`, proving peer-tags survive serialization through
/// the concentrator -> flusher -> intake path.
#[tokio::test]
async fn stats_peer_tags_through_fake_intake() {
let fake_intake = FakeIntake::start().await;
let span = make_eligible_span(
"client",
&[("db.instance", "i-1234"), ("db.system", "postgres")],
);

let payload = flush_spans_to_fake_intake(&fake_intake, &[span]).await;

let with_peer_tags: Vec<_> = payload
.stats
.iter()
.flat_map(|p| &p.stats)
.flat_map(|b| &b.stats)
.filter(|s| !s.peer_tags.is_empty())
.collect();
assert!(
!with_peer_tags.is_empty(),
"expected at least one grouped-stats entry with non-empty peer_tags",
);
let peer_tags = &with_peer_tags[0].peer_tags;
assert!(
peer_tags.iter().any(|t| t.starts_with("db.instance:")),
"expected peer_tags to contain db.instance, got: {peer_tags:?}",
);
assert!(
peer_tags.iter().any(|t| t.starts_with("db.system:")),
"expected peer_tags to contain db.system, got: {peer_tags:?}",
);
}
Loading