Skip to content

Commit 6c7a2cd

Browse files
committed
watched_tasks: also handle threads
Execution should stop if any of the important threads ends, not just one of the tasks. Signed-off-by: Leonard Göhrs <[email protected]>
1 parent cf7bd77 commit 6c7a2cd

File tree

13 files changed

+275
-72
lines changed

13 files changed

+275
-72
lines changed

src/adc.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,8 @@ pub struct Adc {
7979

8080
impl Adc {
8181
pub async fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
82-
let stm32_thread = IioThread::new_stm32().await?;
83-
let powerboard_thread = IioThread::new_powerboard().await?;
82+
let stm32_thread = IioThread::new_stm32(wtb).await?;
83+
let powerboard_thread = IioThread::new_powerboard(wtb).await?;
8484

8585
let adc = Self {
8686
usb_host_curr: AdcChannel {

src/adc/iio/demo_mode.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ pub struct IioThread {
160160
}
161161

162162
impl IioThread {
163-
pub async fn new_stm32() -> Result<Arc<Self>> {
163+
pub async fn new_stm32<W>(_wtb: &W) -> Result<Arc<Self>> {
164164
let mut demo_magic = block_on(DEMO_MAGIC_STM32.lock());
165165

166166
// Only ever set up a single demo_mode "IioThread" per ADC
@@ -195,7 +195,7 @@ impl IioThread {
195195
Ok(this)
196196
}
197197

198-
pub async fn new_powerboard() -> Result<Arc<Self>> {
198+
pub async fn new_powerboard<W>(_wtb: &W) -> Result<Arc<Self>> {
199199
let mut demo_magic = block_on(DEMO_MAGIC_POWERBOARD.lock());
200200

201201
// Only ever set up a single demo_mode "IioThread" per ADC

src/adc/iio/hardware.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,6 @@ use std::fs::create_dir;
2020
use std::io::Read;
2121
use std::path::Path;
2222
use std::sync::atomic::{AtomicU16, AtomicU64, Ordering};
23-
use std::sync::Mutex;
24-
use std::thread;
25-
use std::thread::JoinHandle;
2623
use std::time::{Duration, Instant};
2724

2825
use anyhow::{anyhow, Context, Error, Result};
@@ -35,6 +32,7 @@ use log::{debug, error, warn};
3532
use thread_priority::*;
3633

3734
use crate::measurement::{Measurement, Timestamp};
35+
use crate::watched_tasks::WatchedTasksBuilder;
3836

3937
struct ChannelDesc {
4038
kernel_name: &'static str,
@@ -255,7 +253,6 @@ pub struct IioThread {
255253
ref_instant: Instant,
256254
timestamp: AtomicU64,
257255
values: Vec<AtomicU16>,
258-
join: Mutex<Option<JoinHandle<()>>>,
259256
channel_descs: &'static [ChannelDesc],
260257
}
261258

@@ -325,7 +322,8 @@ impl IioThread {
325322
}
326323

327324
async fn new(
328-
thread_name: &str,
325+
wtb: &mut WatchedTasksBuilder,
326+
thread_name: &'static str,
329327
adc_name: &'static str,
330328
trigger_name: &'static str,
331329
sample_rate: i64,
@@ -342,9 +340,8 @@ impl IioThread {
342340
let (thread_res_tx, thread_res_rx) = bounded(1);
343341

344342
// Spawn a high priority thread that updates the atomic values in `thread`.
345-
let join = thread::Builder::new()
346-
.name(format!("tacd {thread_name} iio"))
347-
.spawn(move || {
343+
wtb.spawn_thread(thread_name, move || {
344+
{
348345
let adc_setup_res = Self::adc_setup(
349346
adc_name,
350347
trigger_name,
@@ -358,17 +355,15 @@ impl IioThread {
358355
ref_instant: Instant::now(),
359356
timestamp: AtomicU64::new(TIMESTAMP_ERROR),
360357
values: channels.iter().map(|_| AtomicU16::new(0)).collect(),
361-
join: Mutex::new(None),
362358
channel_descs,
363359
});
364-
365360
(thread, channels, buf)
366361
}
367362
Err(e) => {
368363
// Can not fail in practice as the queue is known to be empty
369364
// at this point.
370365
thread_res_tx.try_send(Err(e)).unwrap();
371-
return;
366+
return Ok(());
372367
}
373368
};
374369

@@ -423,21 +418,20 @@ impl IioThread {
423418
tx.try_send(Ok(content)).unwrap();
424419
}
425420
}
426-
})?;
421+
};
427422

428-
let thread = thread_res_rx.recv().await??;
423+
Ok(())
424+
})?;
429425

430-
// Locking the Mutex could only fail if the Mutex was poisoned by
431-
// a thread that held the lock and panicked.
432-
// At this point the Mutex has not yet been locked in another thread.
433-
*thread.join.lock().unwrap() = Some(join);
426+
let thread = thread_res_rx.recv().await??;
434427

435428
Ok(thread)
436429
}
437430

438-
pub async fn new_stm32() -> Result<Arc<Self>> {
431+
pub async fn new_stm32(wtb: &mut WatchedTasksBuilder) -> Result<Arc<Self>> {
439432
Self::new(
440-
"stm32",
433+
wtb,
434+
"adc-stm32",
441435
"48003000.adc:adc@0",
442436
"tim4_trgo",
443437
80,
@@ -447,14 +441,23 @@ impl IioThread {
447441
.await
448442
}
449443

450-
pub async fn new_powerboard() -> Result<Arc<Self>> {
444+
pub async fn new_powerboard(wtb: &mut WatchedTasksBuilder) -> Result<Arc<Self>> {
451445
let hr_trigger_path = Path::new(TRIGGER_HR_PWR_DIR);
452446

453447
if !hr_trigger_path.is_dir() {
454448
create_dir(hr_trigger_path)?;
455449
}
456450

457-
Self::new("powerboard", "lmp92064", "tacd-pwr", 20, CHANNELS_PWR, 1).await
451+
Self::new(
452+
wtb,
453+
"adc-powerboard",
454+
"lmp92064",
455+
"tacd-pwr",
456+
20,
457+
CHANNELS_PWR,
458+
1,
459+
)
460+
.await
458461
}
459462

460463
/// Use the channel names defined at the top of the file to get a reference

src/adc/iio/test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ pub struct IioThread {
107107
}
108108

109109
impl IioThread {
110-
pub async fn new_stm32() -> Result<Arc<Self>> {
110+
pub async fn new_stm32<W>(_wtb: &W) -> Result<Arc<Self>> {
111111
let mut channels = Vec::new();
112112

113113
for name in CHANNELS_STM32 {
@@ -117,7 +117,7 @@ impl IioThread {
117117
Ok(Arc::new(Self { channels }))
118118
}
119119

120-
pub async fn new_powerboard() -> Result<Arc<Self>> {
120+
pub async fn new_powerboard<W>(_wtb: &W) -> Result<Arc<Self>> {
121121
let mut channels = Vec::new();
122122

123123
for name in CHANNELS_PWR {

src/digital_io/gpio/demo_mode.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ impl LineHandle {
3232
// It is just a hack to let adc/iio/demo_mode.rs
3333
// communicate with this function so that toggling an output
3434
// has an effect on the measured values.
35-
let iio_thread_stm32 = block_on(IioThread::new_stm32()).unwrap();
36-
let iio_thread_pwr = block_on(IioThread::new_powerboard()).unwrap();
35+
let iio_thread_stm32 = block_on(IioThread::new_stm32(&())).unwrap();
36+
let iio_thread_pwr = block_on(IioThread::new_powerboard(&())).unwrap();
3737

3838
match self.name.as_str() {
3939
"OUT_0" => iio_thread_stm32

src/dut_power.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -320,9 +320,8 @@ impl DutPwrThread {
320320

321321
// Spawn a high priority thread that handles the power status
322322
// in a realtimey fashion.
323-
thread::Builder::new()
324-
.name("tacd power".into())
325-
.spawn(move || {
323+
wtb.spawn_thread("power-thread", move || {
324+
{
326325
let mut last_ts: Option<Instant> = None;
327326

328327
// There may be transients in the measured voltage/current, e.g. due to EMI or
@@ -482,7 +481,10 @@ impl DutPwrThread {
482481

483482
// Make sure to enter fail safe mode before leaving the thread
484483
turn_off_with_reason(OutputState::Off, &pwr_line, &discharge_line, &state);
485-
})?;
484+
};
485+
486+
Ok(())
487+
})?;
486488

487489
let (tick, request, state) = thread_res_rx.next().await.unwrap()?;
488490

src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ async fn init() -> Result<(Ui, HttpServer, WatchedTasksBuilder)> {
8282
.await?;
8383
let dig_io = DigitalIo::new(&mut bb, &mut wtb, led.out_0.clone(), led.out_1.clone());
8484
let regulators = Regulators::new(&mut bb, &mut wtb);
85-
let temperatures = Temperatures::new(&mut bb);
85+
let temperatures = Temperatures::new(&mut bb, &mut wtb)?;
8686
let usb_hub = UsbHub::new(
8787
&mut bb,
8888
&mut wtb,

src/regulators.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ mod reg {
3131

3232
pub fn regulator_set(name: &str, state: bool) -> Result<()> {
3333
if name == "output_iobus_12v" {
34-
let iio_thread = block_on(IioThread::new_stm32()).unwrap();
34+
let iio_thread = block_on(IioThread::new_stm32(&())).unwrap();
3535

3636
iio_thread
3737
.clone()

src/temperatures.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,34 +19,37 @@ use std::sync::atomic::{AtomicBool, Ordering};
1919
use std::thread::sleep;
2020
use std::time::Duration;
2121

22+
use anyhow::Result;
2223
use async_std::sync::Arc;
23-
use async_std::task::spawn_blocking;
2424
use serde::{Deserialize, Serialize};
2525

2626
use crate::broker::{BrokerBuilder, Topic};
2727
use crate::measurement::Measurement;
28+
use crate::watched_tasks::WatchedTasksBuilder;
2829

2930
#[cfg(feature = "demo_mode")]
3031
mod hw {
32+
use anyhow::Result;
33+
3134
pub(super) trait SysClass {
32-
fn input(&self) -> Result<u32, ()>;
35+
fn input(&self) -> Result<u32>;
3336
}
3437

3538
pub(super) struct HwMon;
3639
pub(super) struct TempDecoy;
3740

3841
impl SysClass for TempDecoy {
39-
fn input(&self) -> Result<u32, ()> {
42+
fn input(&self) -> Result<u32> {
4043
Ok(30_000)
4144
}
4245
}
4346

4447
impl HwMon {
45-
pub(super) fn new(_: &'static str) -> Result<Self, ()> {
48+
pub(super) fn new(_: &'static str) -> Result<Self> {
4649
Ok(Self)
4750
}
4851

49-
pub(super) fn temp(&self, _: u64) -> Result<TempDecoy, ()> {
52+
pub(super) fn temp(&self, _: u64) -> Result<TempDecoy> {
5053
Ok(TempDecoy)
5154
}
5255
}
@@ -89,7 +92,7 @@ pub struct Temperatures {
8992
}
9093

9194
impl Temperatures {
92-
pub fn new(bb: &mut BrokerBuilder) -> Self {
95+
pub fn new(bb: &mut BrokerBuilder, wtb: &mut WatchedTasksBuilder) -> Result<Self> {
9396
let run = Arc::new(AtomicBool::new(true));
9497
let soc_temperature = bb.topic_ro("/v1/tac/temperatures/soc", None);
9598
let warning = bb.topic_ro("/v1/tac/temperatures/warning", None);
@@ -98,14 +101,9 @@ impl Temperatures {
98101
let soc_temperature_thread = soc_temperature.clone();
99102
let warning_thread = warning.clone();
100103

101-
spawn_blocking(move || {
104+
wtb.spawn_thread("temperature-update", move || {
102105
while run_thread.load(Ordering::Relaxed) {
103-
let val = HwMon::new("hwmon0")
104-
.unwrap()
105-
.temp(1)
106-
.unwrap()
107-
.input()
108-
.unwrap();
106+
let val = HwMon::new("hwmon0")?.temp(1)?.input()?;
109107

110108
let val = val as f32 / 1000.0;
111109

@@ -120,13 +118,15 @@ impl Temperatures {
120118

121119
sleep(UPDATE_INTERVAL);
122120
}
123-
});
124121

125-
Self {
122+
Ok(())
123+
})?;
124+
125+
Ok(Self {
126126
soc_temperature,
127127
warning,
128128
run: Some(run),
129-
}
129+
})
130130
}
131131
}
132132

src/ui.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,10 @@ impl Ui {
141141
let screens = screens::init(wtb, &res, &alerts, &buttons, &reboot_message, &locator);
142142

143143
handle_buttons(
144+
wtb,
144145
"/dev/input/by-path/platform-gpio-keys-event",
145146
buttons.clone(),
146-
);
147+
)?;
147148

148149
// Blink the status LED when locator is active
149150
let led_status_pattern = res.led.status.clone();

src/ui/buttons.rs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,13 @@
1717

1818
use std::time::Duration;
1919

20+
use anyhow::Result;
2021
use async_std::sync::Arc;
21-
use async_std::task::{block_on, sleep, spawn, spawn_blocking, JoinHandle};
22+
use async_std::task::{block_on, sleep, spawn, JoinHandle};
2223
use serde::{Deserialize, Serialize};
2324

2425
use crate::broker::Topic;
26+
use crate::watched_tasks::WatchedTasksBuilder;
2527

2628
pub const LONG_PRESS: Duration = Duration::from_millis(500);
2729

@@ -133,8 +135,12 @@ impl ButtonEvent {
133135

134136
/// Spawn a thread that blockingly reads user input and pushes them into
135137
/// a broker framework topic.
136-
pub fn handle_buttons(path: &'static str, topic: Arc<Topic<ButtonEvent>>) {
137-
spawn_blocking(move || {
138+
pub fn handle_buttons(
139+
wtb: &mut WatchedTasksBuilder,
140+
path: &'static str,
141+
topic: Arc<Topic<ButtonEvent>>,
142+
) -> Result<()> {
143+
wtb.spawn_thread("button-input-thread", move || {
138144
let mut device = Device::open(path).unwrap();
139145
let mut press_task: [Option<JoinHandle<()>>; 2] = [None, None];
140146
let mut start_time = [None, None];
@@ -179,5 +185,7 @@ pub fn handle_buttons(path: &'static str, topic: Arc<Topic<ButtonEvent>>) {
179185
}
180186
}
181187
}
182-
});
188+
})?;
189+
190+
Ok(())
183191
}

src/usb_hub.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ mod rw {
9494

9595
for (path_tail, iio_channel) in DISABLE_CHANNELS {
9696
if path.ends_with(path_tail) {
97-
let iio_thread = block_on(IioThread::new_stm32()).unwrap();
97+
let iio_thread = block_on(IioThread::new_stm32(&())).unwrap();
9898

9999
iio_thread
100100
.get_channel(iio_channel)

0 commit comments

Comments
 (0)