Skip to content

Commit 29a7959

Browse files
committed
feat: add json output to stderr of upgrade and switch
1 parent 6b4cdaf commit 29a7959

File tree

3 files changed

+95
-9
lines changed

3 files changed

+95
-9
lines changed

lib/src/cli.rs

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,10 @@ pub(crate) struct UpgradeOpts {
5252
/// a userspace-only restart.
5353
#[clap(long, conflicts_with = "check")]
5454
pub(crate) apply: bool,
55+
56+
/// Pipe download progress to stderr in a jsonl format.
57+
#[clap(long)]
58+
pub(crate) json: bool,
5559
}
5660

5761
/// Perform an switch operation
@@ -101,6 +105,10 @@ pub(crate) struct SwitchOpts {
101105

102106
/// Target image to use for the next boot.
103107
pub(crate) target: String,
108+
109+
/// Pipe download progress to stderr in a jsonl format.
110+
#[clap(long)]
111+
pub(crate) json: bool,
104112
}
105113

106114
/// Options controlling rollback
@@ -670,7 +678,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
670678
}
671679
}
672680
} else {
673-
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet).await?;
681+
let fetched = crate::deploy::pull(repo, imgref, None, opts.quiet, opts.json).await?;
674682
let staged_digest = staged_image.map(|s| s.digest().expect("valid digest in status"));
675683
let fetched_digest = &fetched.manifest_digest;
676684
tracing::debug!("staged: {staged_digest:?}");
@@ -764,7 +772,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
764772
}
765773
let new_spec = RequiredHostSpec::from_spec(&new_spec)?;
766774

767-
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet).await?;
775+
let fetched = crate::deploy::pull(repo, &target, None, opts.quiet, opts.json).await?;
768776

769777
if !opts.retain {
770778
// By default, we prune the previous ostree ref so it will go away after later upgrades
@@ -826,7 +834,7 @@ async fn edit(opts: EditOpts) -> Result<()> {
826834
return crate::deploy::rollback(sysroot).await;
827835
}
828836

829-
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet).await?;
837+
let fetched = crate::deploy::pull(repo, new_spec.image, None, opts.quiet, false).await?;
830838

831839
// TODO gc old layers here
832840

lib/src/deploy.rs

Lines changed: 83 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,16 @@ pub(crate) struct ImageState {
4545
pub(crate) ostree_commit: String,
4646
}
4747

48+
/// Download information
49+
#[derive(Debug,serde::Serialize)]
50+
pub struct JsonProgress {
51+
pub done_bytes: u64,
52+
pub download_bytes: u64,
53+
pub image_bytes: u64,
54+
pub n_layers: usize,
55+
pub n_layers_done: usize,
56+
}
57+
4858
impl<'a> RequiredHostSpec<'a> {
4959
/// Given a (borrowed) host specification, "unwrap" its internal
5060
/// options, giving a spec that is required to have a base container image.
@@ -233,13 +243,73 @@ async fn handle_layer_progress_print(
233243
}
234244
}
235245

246+
/// Write container fetch progress to standard output.
247+
async fn handle_layer_progress_print_jsonl(
248+
mut layers: tokio::sync::mpsc::Receiver<ostree_container::store::ImportProgress>,
249+
mut layer_bytes: tokio::sync::watch::Receiver<Option<ostree_container::store::LayerProgress>>,
250+
n_layers_to_fetch: usize,
251+
download_bytes: u64,
252+
image_bytes: u64,
253+
) {
254+
let mut total_read = 0u64;
255+
let mut layers_done: usize = 0;
256+
let mut last_json_written = std::time::Instant::now();
257+
loop {
258+
tokio::select! {
259+
// Always handle layer changes first.
260+
biased;
261+
layer = layers.recv() => {
262+
if let Some(l) = layer {
263+
if !l.is_starting() {
264+
let layer = descriptor_of_progress(&l);
265+
layers_done += 1;
266+
total_read += total_read.saturating_add(layer.size());
267+
}
268+
} else {
269+
// If the receiver is disconnected, then we're done
270+
break
271+
};
272+
},
273+
r = layer_bytes.changed() => {
274+
if r.is_err() {
275+
// If the receiver is disconnected, then we're done
276+
break
277+
}
278+
let bytes = layer_bytes.borrow();
279+
if let Some(bytes) = &*bytes {
280+
let done_bytes = total_read + bytes.fetched;
281+
282+
// Lets update the json output only on bytes fetched
283+
// They are common enough, anyhow. Debounce on time.
284+
let curr = std::time::Instant::now();
285+
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
286+
let json = JsonProgress {
287+
done_bytes,
288+
download_bytes,
289+
image_bytes,
290+
n_layers: n_layers_to_fetch,
291+
n_layers_done: layers_done,
292+
};
293+
let json = serde_json::to_string(&json).unwrap();
294+
// Write to stderr so that consumer can filter this
295+
eprintln!("{}", json);
296+
last_json_written = curr;
297+
}
298+
}
299+
}
300+
}
301+
}
302+
}
303+
304+
236305
/// Wrapper for pulling a container image, wiring up status output.
237306
#[context("Pulling")]
238307
pub(crate) async fn pull(
239308
repo: &ostree::Repo,
240309
imgref: &ImageReference,
241310
target_imgref: Option<&OstreeImageReference>,
242311
quiet: bool,
312+
json: bool,
243313
) -> Result<Box<ImageState>> {
244314
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
245315
let mut imp = new_importer(repo, ostree_imgref).await?;
@@ -261,14 +331,22 @@ pub(crate) async fn pull(
261331
let layers_to_fetch = prep.layers_to_fetch().collect::<Result<Vec<_>>>()?;
262332
let n_layers_to_fetch = layers_to_fetch.len();
263333
let download_bytes: u64 = layers_to_fetch.iter().map(|(l, _)| l.layer.size()).sum();
334+
let image_bytes: u64 = prep.all_layers().map(|l| l.layer.size()).sum();
264335

265-
let printer = (!quiet).then(|| {
336+
let printer = (!quiet || json).then(|| {
266337
let layer_progress = imp.request_progress();
267338
let layer_byte_progress = imp.request_layer_progress();
268-
tokio::task::spawn(async move {
269-
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
270-
.await
271-
})
339+
if json {
340+
tokio::task::spawn(async move {
341+
handle_layer_progress_print_jsonl(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes, image_bytes)
342+
.await
343+
})
344+
} else {
345+
tokio::task::spawn(async move {
346+
handle_layer_progress_print(layer_progress, layer_byte_progress, n_layers_to_fetch, download_bytes)
347+
.await
348+
})
349+
}
272350
});
273351
let import = imp.import(prep).await;
274352
if let Some(printer) = printer {

lib/src/install.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -744,7 +744,7 @@ async fn install_container(
744744
let spec_imgref = ImageReference::from(src_imageref.clone());
745745
let repo = &sysroot.repo();
746746
repo.set_disable_fsync(true);
747-
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false).await?;
747+
let r = crate::deploy::pull(repo, &spec_imgref, Some(&state.target_imgref), false, false).await?;
748748
repo.set_disable_fsync(false);
749749
r
750750
};

0 commit comments

Comments
 (0)