From 15c79f6e29fbb0ad31f52e346d3533d4aee53a7d Mon Sep 17 00:00:00 2001 From: Christian Date: Tue, 26 May 2026 13:02:19 -0500 Subject: [PATCH] Instrument KV operation timing logs --- crates/edgezero-core/src/key_value_store.rs | 133 ++++++++++++++++++-- docs/guide/kv.md | 6 + 2 files changed, 128 insertions(+), 11 deletions(-) diff --git a/crates/edgezero-core/src/key_value_store.rs b/crates/edgezero-core/src/key_value_store.rs index 1e7b535f..fe081faf 100644 --- a/crates/edgezero-core/src/key_value_store.rs +++ b/crates/edgezero-core/src/key_value_store.rs @@ -51,6 +51,7 @@ use async_trait::async_trait; use bytes::Bytes; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; +use web_time::Instant; use crate::error::EdgeError; @@ -404,6 +405,48 @@ impl KvHandle { .transpose() } + fn kv_timing_start() -> Option { + log::log_enabled!(log::Level::Debug).then(Instant::now) + } + + fn log_kv_timing( + started_at: Option, + operation: &str, + result: &Result, + metadata: F, + ) where + F: FnOnce() -> String, + { + if let Some(started_at) = started_at { + let status = if result.is_ok() { "ok" } else { "error" }; + log::debug!( + "kv operation={} elapsed_ms={:.3} status={} {}", + operation, + started_at.elapsed().as_secs_f64() * 1000.0, + status, + metadata() + ); + } + } + + fn kv_hit_metadata(result: &Result, KvError>) -> String { + match result.as_ref() { + Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()), + Ok(None) => "hit=false bytes=0".to_string(), + Err(_) => "hit=unknown bytes=unknown".to_string(), + } + } + + fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option) -> String { + match ttl { + Some(ttl) => format!( + "key_len={key_len} bytes={bytes_len} ttl_secs={}", + ttl.as_secs() + ), + None => format!("key_len={key_len} bytes={bytes_len}"), + } + } + // -- Typed helpers (JSON) ----------------------------------------------- /// Get a value by key, deserializing from JSON. @@ -411,7 +454,13 @@ impl KvHandle { /// Returns `Ok(None)` if the key does not exist. pub async fn get(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - match self.store.get_bytes(key).await? { + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::log_kv_timing(started_at, "get", &result, || { + format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + }); + + match result? { Some(bytes) => { let val = serde_json::from_slice(&bytes)?; Ok(Some(val)) @@ -430,7 +479,13 @@ impl KvHandle { Self::validate_key(key)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store.put_bytes(key, Bytes::from(bytes)).await + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, Bytes::from(bytes)).await; + Self::log_kv_timing(started_at, "put", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put a value with a TTL, serializing it to JSON. @@ -444,9 +499,16 @@ impl KvHandle { Self::validate_ttl(ttl)?; let bytes = serde_json::to_vec(value)?; Self::validate_value(&bytes)?; - self.store + let bytes_len = bytes.len(); + let started_at = Self::kv_timing_start(); + let result = self + .store .put_bytes_with_ttl(key, Bytes::from(bytes), ttl) - .await + .await; + Self::log_kv_timing(started_at, "put_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } /// Read-modify-write: get the current value (or `default`), @@ -478,14 +540,25 @@ impl KvHandle { /// Get raw bytes for a key. pub async fn get_bytes(&self, key: &str) -> Result, KvError> { Self::validate_key(key)?; - self.store.get_bytes(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.get_bytes(key).await; + Self::log_kv_timing(started_at, "get_bytes", &result, || { + format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result)) + }); + result } /// Put raw bytes for a key. pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> { Self::validate_key(key)?; Self::validate_value(&value)?; - self.store.put_bytes(key, value).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes(key, value).await; + Self::log_kv_timing(started_at, "put_bytes", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, None) + }); + result } /// Put raw bytes with a TTL. @@ -498,7 +571,13 @@ impl KvHandle { Self::validate_key(key)?; Self::validate_ttl(ttl)?; Self::validate_value(&value)?; - self.store.put_bytes_with_ttl(key, value, ttl).await + let bytes_len = value.len(); + let started_at = Self::kv_timing_start(); + let result = self.store.put_bytes_with_ttl(key, value, ttl).await; + Self::log_kv_timing(started_at, "put_bytes_with_ttl", &result, || { + Self::kv_write_metadata(key.len(), bytes_len, Some(ttl)) + }); + result } // -- Other operations --------------------------------------------------- @@ -506,13 +585,27 @@ impl KvHandle { /// Check whether a key exists without deserializing its value. pub async fn exists(&self, key: &str) -> Result { Self::validate_key(key)?; - self.store.exists(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.exists(key).await; + Self::log_kv_timing(started_at, "exists", &result, || { + let exists = result + .as_ref() + .map(|exists| exists.to_string()) + .unwrap_or_else(|_| "unknown".to_string()); + format!("key_len={} exists={exists}", key.len()) + }); + result } /// Delete a key. pub async fn delete(&self, key: &str) -> Result<(), KvError> { Self::validate_key(key)?; - self.store.delete(key).await + let started_at = Self::kv_timing_start(); + let result = self.store.delete(key).await; + Self::log_kv_timing(started_at, "delete", &result, || { + format!("key_len={}", key.len()) + }); + result } /// List keys in a bounded, paginated fashion. @@ -530,10 +623,28 @@ impl KvHandle { Self::validate_prefix(prefix)?; Self::validate_list_limit(limit)?; let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?; - let page = self + let started_at = Self::kv_timing_start(); + let result = self .store .list_keys_page(prefix, decoded_cursor.as_deref(), limit) - .await?; + .await; + Self::log_kv_timing(started_at, "list_keys_page", &result, || { + let (count, next_cursor) = result + .as_ref() + .map(|page| { + ( + page.keys.len().to_string(), + page.cursor.is_some().to_string(), + ) + }) + .unwrap_or_else(|_| ("unknown".to_string(), "unknown".to_string())); + format!( + "prefix_len={} cursor_present={} limit={limit} count={count} next_cursor_present={next_cursor}", + prefix.len(), + cursor.is_some() + ) + }); + let page = result?; Ok(KvPage { keys: page.keys, diff --git a/docs/guide/kv.md b/docs/guide/kv.md index 8d7cb329..629aeeb0 100644 --- a/docs/guide/kv.md +++ b/docs/guide/kv.md @@ -88,6 +88,12 @@ For strict correctness, use a transactional data store. Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter returns `KvError::Validation` for key listing because Spin's current `Store::get_keys()` API is unbounded. +## Operation Timing / Observability + +`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts. + +Raw keys, prefixes, cursors, and values are never logged. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs. + ## Platform Specifics ### Local Development