Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ fmt:
check-fuzz:
cargo check --manifest-path crates/composefs/fuzz/Cargo.toml

# Run unit + non-privileged integration tests (no VM, no root)
test-all: test test-integration

# Run all checks (clippy + fmt + test + fuzz build)
check: clippy check-feature-combos fmt-check test check-fuzz

Expand Down
250 changes: 249 additions & 1 deletion crates/composefs-ctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,12 @@ pub use composefs_http;
#[cfg(feature = "oci")]
pub use composefs_oci;

#[cfg(any(feature = "oci", feature = "http"))]
use std::collections::HashMap;
use std::io::Read;
use std::path::Path;
#[cfg(any(feature = "oci", feature = "http"))]
use std::sync::Mutex;
use std::{ffi::OsString, path::PathBuf};

#[cfg(feature = "oci")]
Expand All @@ -35,9 +39,15 @@ use anyhow::{Context as _, Result};
use clap::{Parser, Subcommand, ValueEnum};
#[cfg(feature = "oci")]
use comfy_table::{Table, presets::UTF8_FULL};
#[cfg(any(feature = "oci", feature = "http"))]
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use rustix::fs::{CWD, Mode, OFlags};
use serde::Serialize;

#[cfg(any(feature = "oci", feature = "http"))]
use composefs::progress::{
ComponentId, ProgressEvent, ProgressReporter, ProgressUnit, SharedReporter,
};
use composefs_boot::BootOps;
#[cfg(feature = "oci")]
use composefs_boot::write_boot;
Expand All @@ -53,6 +63,94 @@ use composefs::{
tree::RegularFile,
};

/// An `indicatif`-backed [`ProgressReporter`] for use in the CLI.
///
/// Renders per-component progress bars via [`MultiProgress`]. When a component
/// completes or is skipped the bar is removed; human-readable messages are
/// printed above the bar group via [`MultiProgress::println`].
#[cfg(any(feature = "oci", feature = "http"))]
struct IndicatifReporter {
multi: MultiProgress,
bars: Mutex<HashMap<ComponentId, ProgressBar>>,
}

#[cfg(any(feature = "oci", feature = "http"))]
impl IndicatifReporter {
fn new() -> Self {
IndicatifReporter {
multi: MultiProgress::new(),
bars: Mutex::new(HashMap::new()),
}
}

/// Build a shared reporter from this instance.
fn into_shared(self) -> SharedReporter {
Arc::new(self)
}
}

#[cfg(any(feature = "oci", feature = "http"))]
impl std::fmt::Debug for IndicatifReporter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IndicatifReporter").finish_non_exhaustive()
}
}

#[cfg(any(feature = "oci", feature = "http"))]
impl ProgressReporter for IndicatifReporter {
fn report(&self, event: ProgressEvent) {
match event {
ProgressEvent::Started { id, total, unit } => {
let bar = if let Some(total) = total {
self.multi.add(ProgressBar::new(total))
} else {
self.multi.add(ProgressBar::new_spinner())
};
let style = match unit {
ProgressUnit::Bytes => ProgressStyle::with_template(
"[eta {eta}] {bar:40.cyan/blue} {decimal_bytes:>7}/{decimal_total_bytes:7} {msg}",
),
ProgressUnit::Items => ProgressStyle::with_template(
"[eta {eta}] {bar:40.cyan/blue} {pos:>7}/{len:7} objects {msg}",
),
// Future unit variants fall back to a generic spinner.
_ => ProgressStyle::with_template(
"[eta {eta}] {bar:40.cyan/blue} {pos}/{len} {msg}",
),
};
bar.set_style(
style
.unwrap_or_else(|_| ProgressStyle::default_bar())
.progress_chars("##-"),
);
bar.set_message(id.to_string());
self.bars.lock().unwrap().insert(id, bar);
}
ProgressEvent::Progress { id, fetched, .. } => {
if let Some(bar) = self.bars.lock().unwrap().get(&id) {
bar.set_position(fetched);
}
}
ProgressEvent::Done { id, .. } => {
if let Some(bar) = self.bars.lock().unwrap().remove(&id) {
bar.finish_and_clear();
}
}
ProgressEvent::Skipped { id } => {
if let Some(bar) = self.bars.lock().unwrap().remove(&id) {
bar.finish_with_message("skipped");
}
}
ProgressEvent::Message(msg) => {
let _ = self.multi.println(msg);
}
// `ProgressEvent` is #[non_exhaustive]: new variants added to the library
// will be silently ignored here until cfsctl is updated to handle them.
_ => {}
}
}
}

/// JSON output wrapper for `cfsctl fsck --json`.
#[derive(Serialize)]
struct FsckJsonOutput {
Expand Down Expand Up @@ -986,8 +1084,10 @@ where
// If no explicit name provided, use the image reference as the tag
let tag_name = name.as_deref().unwrap_or(image);

let reporter: SharedReporter = IndicatifReporter::new().into_shared();
let opts = composefs_oci::PullOptions {
local_fetch: local_fetch.into(),
progress: Some(reporter),
..Default::default()
};

Expand Down Expand Up @@ -1244,10 +1344,158 @@ where
}
#[cfg(feature = "http")]
Command::Fetch { url, name } => {
let (digest, verity) = composefs_http::download(&url, &name, Arc::clone(&repo)).await?;
let reporter: SharedReporter = IndicatifReporter::new().into_shared();
let (digest, verity) = composefs_http::download(
&url,
&name,
Arc::clone(&repo),
composefs_http::DownloadOptions {
progress: Some(reporter),
},
)
.await?;
println!("content {digest}");
println!("verity {}", verity.to_hex());
}
}
Ok(())
}

#[cfg(test)]
#[cfg(any(feature = "oci", feature = "http"))]
mod tests {
use super::*;
use composefs::progress::{ProgressEvent, ProgressUnit};

// ── IndicatifReporter ────────────────────────────────────────────────────

/// A complete valid lifecycle (Started → Progress → Done) must not panic,
/// even without a real terminal (indicatif handles headless gracefully).
#[test]
fn test_indicatif_reporter_valid_lifecycle() {
let reporter = IndicatifReporter::new();
// Message before any component
reporter.report(ProgressEvent::Message("starting pull".into()));
// Byte-tracked component
reporter.report(ProgressEvent::Started {
id: "sha256:abc".into(),
total: Some(1_000_000),
unit: ProgressUnit::Bytes,
});
reporter.report(ProgressEvent::Progress {
id: "sha256:abc".into(),
fetched: 500_000,
total: Some(1_000_000),
});
reporter.report(ProgressEvent::Done {
id: "sha256:abc".into(),
transferred: 1_000_000,
});
// Item-counted component (HTTP objects)
reporter.report(ProgressEvent::Started {
id: "objects:stream".into(),
total: Some(200),
unit: ProgressUnit::Items,
});
reporter.report(ProgressEvent::Progress {
id: "objects:stream".into(),
fetched: 100,
total: Some(200),
});
reporter.report(ProgressEvent::Done {
id: "objects:stream".into(),
transferred: 200,
});
// Skipped component
reporter.report(ProgressEvent::Started {
id: "sha256:cached".into(),
total: None,
unit: ProgressUnit::Bytes,
});
reporter.report(ProgressEvent::Skipped {
id: "sha256:cached".into(),
});
}

/// Progress/Done events for an ID that was never `Started` must not panic.
///
/// This guards against error-recovery paths where a `Started` event may
/// have been suppressed or the reporter was attached after the operation
/// began.
#[test]
fn test_indicatif_reporter_unknown_id_no_panic() {
let reporter = IndicatifReporter::new();
// Progress for unknown ID — should silently ignore
reporter.report(ProgressEvent::Progress {
id: "ghost".into(),
fetched: 42,
total: None,
});
// Done for unknown ID — should silently ignore
reporter.report(ProgressEvent::Done {
id: "ghost".into(),
transferred: 42,
});
// Skipped for unknown ID — should silently ignore
reporter.report(ProgressEvent::Skipped { id: "ghost".into() });
}

/// A spinner-style bar (unknown total) must not panic.
#[test]
fn test_indicatif_reporter_spinner_lifecycle() {
let reporter = IndicatifReporter::new();
// Started with unknown total → spinner
reporter.report(ProgressEvent::Started {
id: "layer:unknown-size".into(),
total: None,
unit: ProgressUnit::Bytes,
});
reporter.report(ProgressEvent::Progress {
id: "layer:unknown-size".into(),
fetched: 1024,
total: None,
});
reporter.report(ProgressEvent::Done {
id: "layer:unknown-size".into(),
transferred: 2048,
});
}

/// Multiple concurrent components must not interfere with each other.
#[test]
fn test_indicatif_reporter_multiple_concurrent_components() {
let reporter = IndicatifReporter::new();
// Start two layers in parallel
reporter.report(ProgressEvent::Started {
id: "layer:a".into(),
total: Some(100),
unit: ProgressUnit::Bytes,
});
reporter.report(ProgressEvent::Started {
id: "layer:b".into(),
total: Some(200),
unit: ProgressUnit::Bytes,
});
// Interleaved progress
reporter.report(ProgressEvent::Progress {
id: "layer:a".into(),
fetched: 50,
total: Some(100),
});
reporter.report(ProgressEvent::Progress {
id: "layer:b".into(),
fetched: 100,
total: Some(200),
});
// Layer B finishes first
reporter.report(ProgressEvent::Done {
id: "layer:b".into(),
transferred: 200,
});
// Layer A finishes
reporter.report(ProgressEvent::Done {
id: "layer:a".into(),
transferred: 100,
});
}
}
1 change: 0 additions & 1 deletion crates/composefs-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ anyhow = { version = "1.0.87", default-features = false }
bytes = { version = "1.7.1", default-features = false }
composefs = { workspace = true }
hex = { version = "0.4.0", default-features = false }
indicatif = { version = "0.18.0", default-features = false }
reqwest = { version = "0.13.0", features = ["zstd"] }
sha2 = { version = "0.11.0", default-features = false }
tokio = { version = "1.24.2", default-features = false }
Expand Down
Loading