diff --git a/database/src/lib.rs b/database/src/lib.rs index 7797edc0d..90dfb87ac 100644 --- a/database/src/lib.rs +++ b/database/src/lib.rs @@ -798,3 +798,175 @@ pub struct ArtifactCollection { pub duration: Duration, pub end_time: DateTime, } + +#[derive(Debug)] +pub enum BenchmarkRequestStatus { + WaitingForArtifacts, + WaitingForParent, + InProgress, + Completed, +} + +impl fmt::Display for BenchmarkRequestStatus { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BenchmarkRequestStatus::WaitingForArtifacts => write!(f, "waiting_for_artifacts"), + BenchmarkRequestStatus::WaitingForParent => write!(f, "waiting_for_parent"), + BenchmarkRequestStatus::InProgress => write!(f, "in_progress"), + BenchmarkRequestStatus::Completed => write!(f, "completed"), + } + } +} + +#[derive(Debug)] +pub enum BenchmarkRequestType { + /// A Try commit + Try { + sha: String, + parent_sha: String, + pr: u32, + }, + /// A Master commit + Master { + sha: String, + parent_sha: String, + pr: u32, + }, + /// A release only has a tag + Release { tag: String }, +} + +impl BenchmarkRequestType { + pub fn commit_type_str(&self) -> &str { + match self { + BenchmarkRequestType::Try { + sha: _, + parent_sha: _, + pr: _, + } => "try", + BenchmarkRequestType::Master { + sha: _, + parent_sha: _, + pr: _, + } => "master", + BenchmarkRequestType::Release { tag: _ } => "release", + } + } +} + +impl fmt::Display for BenchmarkRequestType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + BenchmarkRequestType::Try { .. } => write!(f, "try"), + BenchmarkRequestType::Master { .. } => write!(f, "master"), + BenchmarkRequestType::Release { tag: _ } => write!(f, "release"), + } + } +} + +#[derive(Debug)] +pub struct BenchmarkRequest { + pub commit_type: BenchmarkRequestType, + pub created_at: DateTime, + pub completed_at: Option>, + pub status: BenchmarkRequestStatus, + pub backends: String, + pub profiles: String, +} + +impl BenchmarkRequest { + pub fn create_release( + tag: &str, + created_at: DateTime, + status: BenchmarkRequestStatus, + backends: &str, + profiles: &str, + ) -> Self { + Self { + commit_type: BenchmarkRequestType::Release { + tag: tag.to_string(), + }, + created_at, + completed_at: None, + status, + backends: backends.to_string(), + profiles: profiles.to_string(), + } + } + + pub fn create_try( + sha: &str, + parent_sha: &str, + pr: u32, + created_at: DateTime, + status: BenchmarkRequestStatus, + backends: &str, + profiles: &str, + ) -> Self { + Self { + commit_type: BenchmarkRequestType::Try { + pr, + sha: sha.to_string(), + parent_sha: parent_sha.to_string(), + }, + created_at, + completed_at: None, + status, + backends: backends.to_string(), + profiles: profiles.to_string(), + } + } + + pub fn create_master( + sha: &str, + parent_sha: &str, + pr: u32, + created_at: DateTime, + status: BenchmarkRequestStatus, + backends: &str, + profiles: &str, + ) -> Self { + Self { + commit_type: BenchmarkRequestType::Master { + pr, + sha: sha.to_string(), + parent_sha: parent_sha.to_string(), + }, + created_at, + completed_at: None, + status, + backends: backends.to_string(), + profiles: profiles.to_string(), + } + } + + /// Get either the `sha` for a `try` or `master` commit or a `tag` for a + /// `release` + pub fn tag(&self) -> &str { + match &self.commit_type { + BenchmarkRequestType::Try { sha, .. } | BenchmarkRequestType::Master { sha, .. } => sha, + BenchmarkRequestType::Release { tag } => tag, + } + } + + pub fn pr(&self) -> Option<&u32> { + match &self.commit_type { + BenchmarkRequestType::Try { pr, .. } | BenchmarkRequestType::Master { pr, .. } => { + Some(pr) + } + BenchmarkRequestType::Release { tag: _ } => None, + } + } + + pub fn commit_type(&self) -> &str { + self.commit_type.commit_type_str() + } + + pub fn parent_sha(&self) -> Option<&str> { + match &self.commit_type { + BenchmarkRequestType::Try { parent_sha, .. } + | BenchmarkRequestType::Master { parent_sha, .. } => Some(parent_sha), + BenchmarkRequestType::Release { tag: _ } => None, + } + } +} diff --git a/database/src/pool.rs b/database/src/pool.rs index 71c0855a7..96d1dfc2a 100644 --- a/database/src/pool.rs +++ b/database/src/pool.rs @@ -1,5 +1,6 @@ use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, CodegenBackend, CompileBenchmark, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, BenchmarkRequest, CodegenBackend, + CompileBenchmark, Target, }; use crate::{CollectionId, Index, Profile, QueuedCommit, Scenario, Step}; use chrono::{DateTime, Utc}; @@ -178,6 +179,10 @@ pub trait Connection: Send + Sync { /// Removes all data associated with the given artifact. async fn purge_artifact(&self, aid: &ArtifactId); + + /// Add an item to the `benchmark_requests`, if the `benchmark_request` + /// exists it will be ignored + async fn insert_benchmark_request(&self, benchmark_request: &BenchmarkRequest); } #[async_trait::async_trait] @@ -301,7 +306,10 @@ mod tests { use std::str::FromStr; use super::*; - use crate::{tests::run_db_test, Commit, CommitType, Date}; + use crate::{ + tests::{run_db_test, run_postgres_test}, + BenchmarkRequestStatus, Commit, CommitType, Date, + }; /// Create a Commit fn create_commit(commit_sha: &str, time: chrono::DateTime, r#type: CommitType) -> Commit { @@ -370,4 +378,50 @@ mod tests { }) .await; } + + #[tokio::test] + async fn insert_benchmark_requests() { + run_postgres_test(|ctx| async { + let db = ctx.db_client(); + let time = chrono::DateTime::from_str("2021-09-01T00:00:00.000Z").unwrap(); + let master_benchmark_request = BenchmarkRequest::create_master( + "a-sha-1", + "parent-sha-1", + 42, + time, + BenchmarkRequestStatus::WaitingForParent, + "llvm", + "", + ); + + let try_benchmark_request = BenchmarkRequest::create_try( + "b-sha-2", + "parent-sha-2", + 32, + time, + BenchmarkRequestStatus::WaitingForParent, + "cranelift", + "", + ); + + let release_benchmark_request = BenchmarkRequest::create_release( + "1.8.0", + time, + BenchmarkRequestStatus::WaitingForParent, + "cranelift,llvm", + "", + ); + + let db = db.connection().await; + db.insert_benchmark_request(&master_benchmark_request).await; + db.insert_benchmark_request(&try_benchmark_request).await; + db.insert_benchmark_request(&release_benchmark_request) + .await; + // duplicate insert + db.insert_benchmark_request(&master_benchmark_request).await; + + Ok(ctx) + }) + .await; + } } diff --git a/database/src/pool/postgres.rs b/database/src/pool/postgres.rs index 374b4904f..1da5016c0 100644 --- a/database/src/pool/postgres.rs +++ b/database/src/pool/postgres.rs @@ -1,7 +1,8 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, CodegenBackend, CollectionId, - Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, Scenario, Target, + ArtifactCollection, ArtifactId, ArtifactIdNumber, Benchmark, BenchmarkRequest, CodegenBackend, + CollectionId, Commit, CommitType, CompileBenchmark, Date, Index, Profile, QueuedCommit, + Scenario, Target, }; use anyhow::Context as _; use chrono::{DateTime, TimeZone, Utc}; @@ -285,6 +286,21 @@ static MIGRATIONS: &[&str] = &[ alter table pstat_series drop constraint test_case; alter table pstat_series add constraint test_case UNIQUE(crate, profile, scenario, backend, target, metric); "#, + r#" + CREATE TABLE IF NOT EXISTS benchmark_request ( + id SERIAL PRIMARY KEY, + tag TEXT NOT NULL UNIQUE, + parent_sha TEXT, + commit_type TEXT NOT NULL, + pr INTEGER, + created_at TIMESTAMPTZ NOT NULL, + completed_at TIMESTAMPTZ, + status TEXT NOT NULL, + backends TEXT NOT NULL, + profiles TEXT NOT NULL + ); + CREATE INDEX IF NOT EXISTS benchmark_request_status_idx on benchmark_request (status) WHERE status != 'completed'; + "#, ]; #[async_trait::async_trait] @@ -1365,6 +1381,38 @@ where .await .unwrap(); } + + async fn insert_benchmark_request(&self, benchmark_request: &BenchmarkRequest) { + self.conn() + .execute( + r#" + INSERT INTO benchmark_request( + tag, + parent_sha, + pr, + commit_type, + status, + created_at, + backends, + profiles + ) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8) + ON CONFLICT DO NOTHING; + "#, + &[ + &benchmark_request.tag(), + &benchmark_request.parent_sha(), + &benchmark_request.pr().map(|it| *it as i32), + &benchmark_request.commit_type(), + &benchmark_request.status.to_string(), + &benchmark_request.created_at, + &benchmark_request.backends, + &benchmark_request.profiles, + ], + ) + .await + .unwrap(); + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option>) -> ArtifactId { diff --git a/database/src/pool/sqlite.rs b/database/src/pool/sqlite.rs index 39925072b..9a0614b07 100644 --- a/database/src/pool/sqlite.rs +++ b/database/src/pool/sqlite.rs @@ -1,7 +1,7 @@ use crate::pool::{Connection, ConnectionManager, ManagedConnection, Transaction}; use crate::{ - ArtifactCollection, ArtifactId, Benchmark, CodegenBackend, CollectionId, Commit, CommitType, - CompileBenchmark, Date, Profile, Target, + ArtifactCollection, ArtifactId, Benchmark, BenchmarkRequest, CodegenBackend, CollectionId, + Commit, CommitType, CompileBenchmark, Date, Profile, Target, }; use crate::{ArtifactIdNumber, Index, QueuedCommit}; use chrono::{DateTime, TimeZone, Utc}; @@ -1252,6 +1252,10 @@ impl Connection for SqliteConnection { ) .unwrap(); } + + async fn insert_benchmark_request(&self, _benchmark_request: &BenchmarkRequest) { + panic!("Queueing for SQLite has not been implemented, if you are wanting to test the queueing functionality please use postgres. Presuming you have docker installed, at the root of the repo you can run `make start-postgres` to spin up a postgres database."); + } } fn parse_artifact_id(ty: &str, sha: &str, date: Option) -> ArtifactId { diff --git a/database/src/tests/mod.rs b/database/src/tests/mod.rs index 7f89f77dc..3e9872a9c 100644 --- a/database/src/tests/mod.rs +++ b/database/src/tests/mod.rs @@ -113,9 +113,8 @@ impl TestContext { } } -/// Runs a test against an actual database. -/// Checks both Postgres and SQLite. -pub(crate) async fn run_db_test(f: F) +/// Runs a test against an actual postgres database. +pub(crate) async fn run_postgres_test(f: F) where F: Fn(TestContext) -> Fut, Fut: Future>, @@ -138,7 +137,16 @@ where ); } } +} +/// Runs a test against an actual database. +/// Checks both Postgres and SQLite. +pub(crate) async fn run_db_test(f: F) +where + F: Fn(TestContext) -> Fut + Clone, + Fut: Future>, +{ + run_postgres_test(f.clone()).await; // SQLite eprintln!("Running test with SQLite"); let ctx = TestContext::new_sqlite().await; diff --git a/site/src/job_queue.rs b/site/src/job_queue.rs new file mode 100644 index 000000000..c09fd7857 --- /dev/null +++ b/site/src/job_queue.rs @@ -0,0 +1,58 @@ +use std::{str::FromStr, sync::Arc}; + +use crate::load::SiteCtxt; +use chrono::Utc; +use database::{BenchmarkRequest, BenchmarkRequestStatus}; +use parking_lot::RwLock; +use tokio::time::{self, Duration}; + +/// Store the latest master commits or do nothing if all of them are +/// already in the database +async fn enqueue_master_commits(ctxt: &Arc) { + let conn = ctxt.conn().await; + let master_commits = &ctxt.get_master_commits().commits; + // TODO; delete at some point in the future + let cutoff: chrono::DateTime = + chrono::DateTime::from_str("2025-06-01T00:00:00.000Z").unwrap(); + + for master_commit in master_commits { + // We don't want to add masses of obsolete data + if master_commit.time >= cutoff { + let pr = master_commit.pr.unwrap_or(0); + let benchmark = BenchmarkRequest::create_master( + &master_commit.sha, + &master_commit.parent_sha, + pr, + master_commit.time, + BenchmarkRequestStatus::WaitingForParent, + "", + "", + ); + conn.insert_benchmark_request(&benchmark).await; + } + } +} + +/// For queueing jobs, add the jobs you want to queue to this function +async fn cron_enqueue_jobs(site_ctxt: &Arc) { + // Put the master commits into the `benchmark_requests` queue + enqueue_master_commits(site_ctxt).await; +} + +/// Entry point for the cron +pub async fn cron_main(site_ctxt: Arc>>>, seconds: u64) { + let mut interval = time::interval(Duration::from_secs(seconds)); + let ctxt = site_ctxt.clone(); + + loop { + interval.tick().await; + + if let Some(ctxt_clone) = { + let guard = ctxt.read(); + guard.as_ref().cloned() + } { + cron_enqueue_jobs(&ctxt_clone).await; + log::info!("Cron job executed at: {:?}", std::time::SystemTime::now()); + } + } +} diff --git a/site/src/lib.rs b/site/src/lib.rs index 05bb1e443..2ce6dfb58 100644 --- a/site/src/lib.rs +++ b/site/src/lib.rs @@ -3,6 +3,7 @@ extern crate itertools; pub mod api; pub mod github; +pub mod job_queue; pub mod load; pub mod server; diff --git a/site/src/main.rs b/site/src/main.rs index 9af67b6b5..f4dedf49f 100644 --- a/site/src/main.rs +++ b/site/src/main.rs @@ -3,6 +3,7 @@ use parking_lot::RwLock; use site::load; use std::env; use std::sync::Arc; +use tokio::task; #[cfg(unix)] #[global_allocator] @@ -28,6 +29,15 @@ async fn main() { .ok() .and_then(|x| x.parse().ok()) .unwrap_or(2346); + let queue_update_interval_seconds = env::var("QUEUE_UPDATE_INTERVAL_SECONDS") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(30); + let run_cron_job = env::var("RUN_CRON") + .ok() + .and_then(|x| x.parse().ok()) + .unwrap_or(false); + let fut = tokio::task::spawn_blocking(move || { tokio::task::spawn(async move { let res = Arc::new(load::SiteCtxt::from_db_url(&db_url).await.unwrap()); @@ -50,7 +60,14 @@ async fn main() { .fuse(); println!("Starting server with port={:?}", port); - let server = site::server::start(ctxt, port).fuse(); + let server = site::server::start(ctxt.clone(), port).fuse(); + + if run_cron_job { + task::spawn(async move { + site::job_queue::cron_main(ctxt.clone(), queue_update_interval_seconds).await; + }); + } + futures::pin_mut!(server); futures::pin_mut!(fut); loop {