Skip to content

Commit

Permalink
Merge pull request #523 from messense/signal
Browse files Browse the repository at this point in the history
Invalidate directory cache on SIGHUP
  • Loading branch information
messense authored Jul 21, 2022
2 parents 37b9b70 + b29c644 commit b804c5f
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 13 deletions.
30 changes: 22 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ rustls-pemfile = { version = "1.0.0", optional = true }
tls-listener = { version = "0.5.1", features = ["hyper-h1", "hyper-h2", "rt"], optional = true }
tokio-rustls = { version = "0.23.2", optional = true }

# Unix signal support
[target.'cfg(unix)'.dependencies]
signal-hook = "0.3.14"
signal-hook-tokio = { version = "0.3.1", features = ["futures-v0_3"] }

[features]
default = ["rustls-tls", "atomic64"]
rustls-tls = ["reqwest/rustls-tls", "rustls-pemfile", "tls-listener/rustls", "hyper/stream", "tokio-rustls", "self_update/rustls"]
Expand Down
37 changes: 36 additions & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,17 @@ use std::{env, io};
use anyhow::Context as _;
use clap::{Parser, Subcommand};
use dav_server::{body::Body, memls::MemLs, DavConfig, DavHandler};
#[cfg(any(unix, feature = "rustls-tls"))]
use futures_util::stream::StreamExt;
use headers::{authorization::Basic, Authorization, HeaderMapExt};
use hyper::{service::Service, Request, Response};
use self_update::cargo_crate_version;
use tracing::{debug, error, info, warn};
#[cfg(unix)]
use {signal_hook::consts::signal::*, signal_hook_tokio::Signals};

#[cfg(feature = "rustls-tls")]
use {
futures_util::stream::StreamExt,
hyper::server::accept,
hyper::server::conn::AddrIncoming,
std::fs::File,
Expand All @@ -27,6 +30,7 @@ use {
tokio_rustls::TlsAcceptor,
};

use cache::Cache;
use drive::{parse_refresh_token, read_refresh_token, AliyunDrive, ClientType, DriveConfig};
use vfs::AliyunDriveFileSystem;

Expand Down Expand Up @@ -265,6 +269,9 @@ async fn main() -> anyhow::Result<()> {
)?;
debug!("aliyundrive file system initialized");

#[cfg(unix)]
let dir_cache = fs.dir_cache.clone();

let mut dav_server_builder = DavHandler::builder()
.filesystem(Box::new(fs))
.locksystem(MemLs::new())
Expand Down Expand Up @@ -318,10 +325,38 @@ async fn main() -> anyhow::Result<()> {
handler: dav_server,
});
info!("listening on http://{}", server.local_addr());

#[cfg(not(unix))]
let _ = server.await.map_err(|e| error!("server error: {}", e));

#[cfg(unix)]
{
let signals = Signals::new(&[SIGHUP])?;
let handle = signals.handle();
let signals_task = tokio::spawn(handle_signals(signals, dir_cache));

let _ = server.await.map_err(|e| error!("server error: {}", e));

// Terminate the signal stream.
handle.close();
signals_task.await?;
}
Ok(())
}

#[cfg(unix)]
async fn handle_signals(mut signals: Signals, dir_cache: Cache) {
while let Some(signal) = signals.next().await {
match signal {
SIGHUP => {
dir_cache.invalidate_all();
info!("directory cache invalidated by SIGHUP");
}
_ => unreachable!(),
}
}
}

#[derive(Clone)]
struct AliyunDriveWebDav {
auth_user: Option<String>,
Expand Down
12 changes: 8 additions & 4 deletions src/vfs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::{
#[derive(Clone)]
pub struct AliyunDriveFileSystem {
drive: AliyunDrive,
dir_cache: Cache,
pub(crate) dir_cache: Cache,
uploading: Arc<DashMap<String, Vec<AliyunFile>>>,
root: PathBuf,
no_trash: bool,
Expand Down Expand Up @@ -124,7 +124,6 @@ impl AliyunDriveFileSystem {

async fn read_dir_and_cache(&self, path: PathBuf) -> Result<Vec<AliyunFile>, FsError> {
let path_str = path.to_slash_lossy();
debug!(path = %path_str, "read_dir and cache");
let parent_file_id = if path_str == "/" {
"root".to_string()
} else {
Expand All @@ -141,16 +140,21 @@ impl AliyunDriveFileSystem {
}
};
let mut files = if let Some(files) = self.dir_cache.get(&path_str) {
debug!(path = %path_str, "read_dir cache hit");
files
} else {
let res = self
.list_files_and_cache(path_str.to_string(), parent_file_id.clone())
.await;
match res {
Ok(files) => files,
Ok(files) => {
debug!(path = %path_str, "read_dir cache miss");
files
}
Err(err) => {
if let Some(req_err) = err.downcast_ref::<reqwest::Error>() {
if matches!(req_err.status(), Some(reqwest::StatusCode::NOT_FOUND)) {
debug!(path = %path_str, "read_dir not found");
return Err(FsError::NotFound);
} else {
error!(path = %path_str, error = %err, "list_files_and_cache failed");
Expand All @@ -166,8 +170,8 @@ impl AliyunDriveFileSystem {
let uploading_files = self.list_uploading_files(&parent_file_id);
if !uploading_files.is_empty() {
debug!("added {} uploading files", uploading_files.len());
files.extend(uploading_files);
}
files.extend(uploading_files);
Ok(files)
}

Expand Down

0 comments on commit b804c5f

Please sign in to comment.