Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 122 additions & 11 deletions crates/edgezero-core/src/key_value_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use web_time::Instant;

use crate::error::EdgeError;

Expand Down Expand Up @@ -404,14 +405,62 @@ impl KvHandle {
.transpose()
}

fn kv_timing_start() -> Option<Instant> {
log::log_enabled!(log::Level::Debug).then(Instant::now)
}

fn log_kv_timing<T, F>(
started_at: Option<Instant>,
operation: &str,
result: &Result<T, KvError>,
metadata: F,
) where
F: FnOnce() -> String,
{
if let Some(started_at) = started_at {
let status = if result.is_ok() { "ok" } else { "error" };
log::debug!(
"kv operation={} elapsed_ms={:.3} status={} {}",
operation,
started_at.elapsed().as_secs_f64() * 1000.0,
status,
metadata()
);
}
}

fn kv_hit_metadata(result: &Result<Option<Bytes>, KvError>) -> String {
match result.as_ref() {
Ok(Some(bytes)) => format!("hit=true bytes={}", bytes.len()),
Ok(None) => "hit=false bytes=0".to_string(),
Err(_) => "hit=unknown bytes=unknown".to_string(),
}
}

fn kv_write_metadata(key_len: usize, bytes_len: usize, ttl: Option<Duration>) -> String {
match ttl {
Some(ttl) => format!(
"key_len={key_len} bytes={bytes_len} ttl_secs={}",
ttl.as_secs()
),
None => format!("key_len={key_len} bytes={bytes_len}"),
}
}

// -- Typed helpers (JSON) -----------------------------------------------

/// Get a value by key, deserializing from JSON.
///
/// Returns `Ok(None)` if the key does not exist.
pub async fn get<T: DeserializeOwned>(&self, key: &str) -> Result<Option<T>, KvError> {
Self::validate_key(key)?;
match self.store.get_bytes(key).await? {
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::log_kv_timing(started_at, "get", &result, || {
format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result))
});

match result? {
Some(bytes) => {
let val = serde_json::from_slice(&bytes)?;
Ok(Some(val))
Expand All @@ -430,7 +479,13 @@ impl KvHandle {
Self::validate_key(key)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store.put_bytes(key, Bytes::from(bytes)).await
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, Bytes::from(bytes)).await;
Self::log_kv_timing(started_at, "put", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put a value with a TTL, serializing it to JSON.
Expand All @@ -444,9 +499,16 @@ impl KvHandle {
Self::validate_ttl(ttl)?;
let bytes = serde_json::to_vec(value)?;
Self::validate_value(&bytes)?;
self.store
let bytes_len = bytes.len();
let started_at = Self::kv_timing_start();
let result = self
.store
.put_bytes_with_ttl(key, Bytes::from(bytes), ttl)
.await
.await;
Self::log_kv_timing(started_at, "put_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

/// Read-modify-write: get the current value (or `default`),
Expand Down Expand Up @@ -478,14 +540,25 @@ impl KvHandle {
/// Get raw bytes for a key.
pub async fn get_bytes(&self, key: &str) -> Result<Option<Bytes>, KvError> {
Self::validate_key(key)?;
self.store.get_bytes(key).await
let started_at = Self::kv_timing_start();
let result = self.store.get_bytes(key).await;
Self::log_kv_timing(started_at, "get_bytes", &result, || {
format!("key_len={} {}", key.len(), Self::kv_hit_metadata(&result))
});
result
}

/// Put raw bytes for a key.
pub async fn put_bytes(&self, key: &str, value: Bytes) -> Result<(), KvError> {
Self::validate_key(key)?;
Self::validate_value(&value)?;
self.store.put_bytes(key, value).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes(key, value).await;
Self::log_kv_timing(started_at, "put_bytes", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, None)
});
result
}

/// Put raw bytes with a TTL.
Expand All @@ -498,21 +571,41 @@ impl KvHandle {
Self::validate_key(key)?;
Self::validate_ttl(ttl)?;
Self::validate_value(&value)?;
self.store.put_bytes_with_ttl(key, value, ttl).await
let bytes_len = value.len();
let started_at = Self::kv_timing_start();
let result = self.store.put_bytes_with_ttl(key, value, ttl).await;
Self::log_kv_timing(started_at, "put_bytes_with_ttl", &result, || {
Self::kv_write_metadata(key.len(), bytes_len, Some(ttl))
});
result
}

// -- Other operations ---------------------------------------------------

/// Check whether a key exists without deserializing its value.
pub async fn exists(&self, key: &str) -> Result<bool, KvError> {
Self::validate_key(key)?;
self.store.exists(key).await
let started_at = Self::kv_timing_start();
let result = self.store.exists(key).await;
Self::log_kv_timing(started_at, "exists", &result, || {
let exists = result
.as_ref()
.map(|exists| exists.to_string())
.unwrap_or_else(|_| "unknown".to_string());
format!("key_len={} exists={exists}", key.len())
});
result
}

/// Delete a key.
pub async fn delete(&self, key: &str) -> Result<(), KvError> {
Self::validate_key(key)?;
self.store.delete(key).await
let started_at = Self::kv_timing_start();
let result = self.store.delete(key).await;
Self::log_kv_timing(started_at, "delete", &result, || {
format!("key_len={}", key.len())
});
result
}

/// List keys in a bounded, paginated fashion.
Expand All @@ -530,10 +623,28 @@ impl KvHandle {
Self::validate_prefix(prefix)?;
Self::validate_list_limit(limit)?;
let decoded_cursor = Self::decode_list_cursor(prefix, cursor)?;
let page = self
let started_at = Self::kv_timing_start();
let result = self
.store
.list_keys_page(prefix, decoded_cursor.as_deref(), limit)
.await?;
.await;
Self::log_kv_timing(started_at, "list_keys_page", &result, || {
let (count, next_cursor) = result
.as_ref()
.map(|page| {
(
page.keys.len().to_string(),
page.cursor.is_some().to_string(),
)
})
.unwrap_or_else(|_| ("unknown".to_string(), "unknown".to_string()));
format!(
"prefix_len={} cursor_present={} limit={limit} count={count} next_cursor_present={next_cursor}",
prefix.len(),
cursor.is_some()
)
});
let page = result?;

Ok(KvPage {
keys: page.keys,
Expand Down
6 changes: 6 additions & 0 deletions docs/guide/kv.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ For strict correctness, use a transactional data store.

Key listing is paginated by design. This avoids buffering an unbounded number of keys in memory and matches the underlying provider APIs. The Spin adapter returns `KvError::Validation` for key listing because Spin's current `Store::get_keys()` API is unbounded.

## Operation Timing / Observability

`KvHandle` emits debug-level timing logs for backend KV operations across all adapters. Logs include safe metadata such as operation name, elapsed milliseconds, success/error status, key or prefix length, hit/miss, byte counts, TTL seconds, and list page counts.

Raw keys, prefixes, cursors, and values are never logged. Typed helper timings measure only the backend call after validation/serialization and before JSON deserialization. `read_modify_write` performs separate read and write calls, so it emits separate operation logs.

## Platform Specifics

### Local Development
Expand Down
Loading