Skip to content

Commit 3242366

Browse files
committed
Add a little progress_jsonl module
- Use `if let Some()` to deconstruct vs `is_some()` and `unwrap()` - Use RawFd over i32 for clarity - We don't need the Arc<Mutex<>> - Use a `BufWriter` (forgetting this is a big performance footgun in Rust) - Add a unit test of the basic code Signed-off-by: Colin Walters <[email protected]>
1 parent 8d2a95e commit 3242366

File tree

4 files changed

+106
-29
lines changed

4 files changed

+106
-29
lines changed

lib/src/cli.rs

Lines changed: 6 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,9 @@
55
use std::ffi::CString;
66
use std::ffi::OsString;
77
use std::io::Seek;
8-
use std::os::unix::io::FromRawFd;
8+
use std::os::fd::RawFd;
99
use std::os::unix::process::CommandExt;
1010
use std::process::Command;
11-
use std::sync::{Arc, Mutex};
1211

1312
use anyhow::{Context, Result};
1413
use camino::Utf8PathBuf;
@@ -27,6 +26,7 @@ use serde::{Deserialize, Serialize};
2726

2827
use crate::deploy::RequiredHostSpec;
2928
use crate::lints;
29+
use crate::progress_jsonl;
3030
use crate::spec::Host;
3131
use crate::spec::ImageReference;
3232
use crate::utils::sigpolicy_from_opts;
@@ -57,7 +57,7 @@ pub(crate) struct UpgradeOpts {
5757

5858
/// Pipe download progress to this fd in a jsonl format.
5959
#[clap(long)]
60-
pub(crate) json_fd: Option<i32>,
60+
pub(crate) json_fd: Option<RawFd>,
6161
}
6262

6363
/// Perform an switch operation
@@ -110,7 +110,7 @@ pub(crate) struct SwitchOpts {
110110

111111
/// Pipe download progress to this fd in a jsonl format.
112112
#[clap(long)]
113-
pub(crate) json_fd: Option<i32>,
113+
pub(crate) json_fd: Option<RawFd>,
114114
}
115115

116116
/// Options controlling rollback
@@ -624,7 +624,7 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
624624
let (booted_deployment, _deployments, host) =
625625
crate::status::get_status_require_booted(sysroot)?;
626626
let imgref = host.spec.image.as_ref();
627-
let jsonw = unwrap_fd(opts.json_fd);
627+
let jsonw = opts.json_fd.map(progress_jsonl::JsonlWriter::from_raw_fd);
628628

629629
// If there's no specified image, let's be nice and check if the booted system is using rpm-ostree
630630
if imgref.is_none() {
@@ -727,19 +727,6 @@ async fn upgrade(opts: UpgradeOpts) -> Result<()> {
727727
Ok(())
728728
}
729729

730-
#[allow(unsafe_code)]
731-
fn unwrap_fd(fd: Option<i32>) -> Option<Arc<Mutex<dyn std::io::Write + Send>>> {
732-
unsafe {
733-
if !fd.is_none() {
734-
return Some(Arc::new(Mutex::new(std::fs::File::from_raw_fd(
735-
fd.unwrap(),
736-
))));
737-
} else {
738-
return None;
739-
};
740-
}
741-
}
742-
743730
/// Implementation of the `bootc switch` CLI command.
744731
#[context("Switching")]
745732
async fn switch(opts: SwitchOpts) -> Result<()> {
@@ -754,7 +741,7 @@ async fn switch(opts: SwitchOpts) -> Result<()> {
754741
);
755742
let target = ostree_container::OstreeImageReference { sigverify, imgref };
756743
let target = ImageReference::from(target);
757-
let jsonw = unwrap_fd(opts.json_fd);
744+
let jsonw = opts.json_fd.map(progress_jsonl::JsonlWriter::from_raw_fd);
758745

759746
// If we're doing an in-place mutation, we shortcut most of the rest of the work here
760747
if opts.mutate_in_place {

lib/src/deploy.rs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
55
use std::collections::HashSet;
66
use std::io::{BufRead, Write};
7-
use std::sync::{Arc, Mutex};
87

98
use anyhow::Ok;
109
use anyhow::{anyhow, Context, Result};
@@ -22,6 +21,7 @@ use ostree_ext::ostree::{self, Sysroot};
2221
use ostree_ext::sysroot::SysrootLock;
2322
use ostree_ext::tokio_util::spawn_blocking_cancellable_flatten;
2423

24+
use crate::progress_jsonl::JsonlWriter;
2525
use crate::spec::ImageReference;
2626
use crate::spec::{BootOrder, HostSpec};
2727
use crate::status::labels_of_config;
@@ -250,7 +250,7 @@ async fn handle_layer_progress_print_jsonl(
250250
n_layers_to_fetch: usize,
251251
download_bytes: u64,
252252
image_bytes: u64,
253-
jsonw: Arc<Mutex<dyn std::io::Write + Send>>,
253+
mut jsonw: JsonlWriter,
254254
) {
255255
let mut total_read = 0u64;
256256
let mut layers_done: usize = 0;
@@ -284,18 +284,17 @@ async fn handle_layer_progress_print_jsonl(
284284
// They are common enough, anyhow. Debounce on time.
285285
let curr = std::time::Instant::now();
286286
if curr.duration_since(last_json_written).as_secs_f64() > 0.2 {
287-
let json = JsonProgress {
287+
let progress = JsonProgress {
288288
stage: "fetching".to_string(),
289289
done_bytes,
290290
download_bytes,
291291
image_bytes,
292292
n_layers: n_layers_to_fetch,
293293
n_layers_done: layers_done,
294294
};
295-
let json = serde_json::to_string(&json).unwrap();
296-
if let Err(e) = writeln!(jsonw.clone().lock().unwrap(), "{}", json) {
297-
eprintln!("Failed to write JSON progress: {}", e);
298-
break;
295+
if let Err(e) = jsonw.send(&progress) {
296+
tracing::debug!("failed to send progress: {e}");
297+
break
299298
}
300299
last_json_written = curr;
301300
}
@@ -312,7 +311,7 @@ pub(crate) async fn pull(
312311
imgref: &ImageReference,
313312
target_imgref: Option<&OstreeImageReference>,
314313
quiet: bool,
315-
jsonw: Option<Arc<Mutex<dyn std::io::Write + Send>>>,
314+
jsonw: Option<crate::progress_jsonl::JsonlWriter>,
316315
) -> Result<Box<ImageState>> {
317316
let ostree_imgref = &OstreeImageReference::from(imgref.clone());
318317
let mut imp = new_importer(repo, ostree_imgref).await?;
@@ -339,15 +338,15 @@ pub(crate) async fn pull(
339338
let printer = (!quiet || jsonw.is_some()).then(|| {
340339
let layer_progress = imp.request_progress();
341340
let layer_byte_progress = imp.request_layer_progress();
342-
if jsonw.is_some() {
341+
if let Some(jsonw) = jsonw {
343342
tokio::task::spawn(async move {
344343
handle_layer_progress_print_jsonl(
345344
layer_progress,
346345
layer_byte_progress,
347346
n_layers_to_fetch,
348347
download_bytes,
349348
image_bytes,
350-
jsonw.unwrap(),
349+
jsonw,
351350
)
352351
.await
353352
})

lib/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@ pub mod spec;
4141
mod docgen;
4242
mod glyph;
4343
mod imgstorage;
44+
mod progress_jsonl;

lib/src/progress_jsonl.rs

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
//! Output progress data using the json-lines format. For more information
2+
//! see <https://jsonlines.org/>.
3+
4+
use std::fs;
5+
use std::io::{BufWriter, Write};
6+
use std::os::fd::{FromRawFd, RawFd};
7+
8+
use anyhow::Result;
9+
use serde::Serialize;
10+
11+
pub(crate) struct JsonlWriter {
12+
fd: BufWriter<fs::File>,
13+
}
14+
15+
impl From<fs::File> for JsonlWriter {
16+
fn from(value: fs::File) -> Self {
17+
Self {
18+
fd: BufWriter::new(value),
19+
}
20+
}
21+
}
22+
23+
impl JsonlWriter {
24+
/// Given a raw file descriptor, create an instance of a json-lines writer.
25+
#[allow(unsafe_code)]
26+
pub(crate) fn from_raw_fd(fd: RawFd) -> Self {
27+
unsafe { fs::File::from_raw_fd(fd) }.into()
28+
}
29+
30+
/// Serialize the target object to JSON as a single line
31+
pub(crate) fn send<T: Serialize>(&mut self, v: T) -> Result<()> {
32+
// serde is guaranteed not to output newlines here
33+
serde_json::to_writer(&mut self.fd, &v)?;
34+
// We always end in a newline
35+
self.fd.write_all(b"\n")?;
36+
// And flush to ensure the remote side sees updates immediately
37+
self.fd.flush()?;
38+
Ok(())
39+
}
40+
41+
/// Flush remaining data and return the underlying file.
42+
#[allow(dead_code)]
43+
pub(crate) fn into_inner(self) -> Result<fs::File> {
44+
self.fd.into_inner().map_err(Into::into)
45+
}
46+
}
47+
48+
#[cfg(test)]
49+
mod test {
50+
use std::io::{BufRead, BufReader, Seek};
51+
52+
use serde::Deserialize;
53+
54+
use super::*;
55+
56+
#[derive(Serialize, Deserialize, PartialEq, Eq, Debug)]
57+
struct S {
58+
s: String,
59+
v: u32,
60+
}
61+
62+
#[test]
63+
fn test_jsonl() -> Result<()> {
64+
let tf = tempfile::tempfile()?;
65+
let mut w = JsonlWriter::from(tf);
66+
let testvalues = [
67+
S {
68+
s: "foo".into(),
69+
v: 42,
70+
},
71+
S {
72+
// Test with an embedded newline to sanity check that serde doesn't write it literally
73+
s: "foo\nbar".into(),
74+
v: 0,
75+
},
76+
];
77+
for value in &testvalues {
78+
w.send(value).unwrap();
79+
}
80+
let mut tf = w.into_inner().unwrap();
81+
tf.seek(std::io::SeekFrom::Start(0))?;
82+
let tf = BufReader::new(tf);
83+
for (line, expected) in tf.lines().zip(testvalues.iter()) {
84+
let line = line?;
85+
let found: S = serde_json::from_str(&line)?;
86+
assert_eq!(&found, expected);
87+
}
88+
Ok(())
89+
}
90+
}

0 commit comments

Comments
 (0)