Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generalize structured log parsers #12

Merged
merged 4 commits into from
Apr 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 37 additions & 87 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
use anyhow::anyhow;
use fxhash::FxHashMap;
use md5::{Digest, Md5};
use std::ffi::{OsStr, OsString};

use regex::Regex;
use std::fs::File;
use std::io::{self, BufRead};
use std::path::Path;
use std::path::PathBuf;
use tinytemplate::TinyTemplate;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
Expand All @@ -15,13 +13,12 @@ use std::time::Instant;

use crate::types::*;
use crate::templates::*;

use crate::parsers::all_parsers;
mod parsers;
mod templates;
mod types;


pub type ParseOutput = Vec<(PathBuf, String)>;

pub struct ParseConfig {
pub strict: bool,
}
Expand Down Expand Up @@ -86,6 +83,8 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
})
.peekable();

let all_parsers = all_parsers(&tt);

while let Some((lineno, line)) = iter.next() {
bytes_read += line.len() as u64;
pb.set_position(bytes_read);
Expand Down Expand Up @@ -140,23 +139,8 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
}
};

let compile_id_dir: PathBuf = e
.compile_id
.as_ref()
.map_or(
format!("unknown_{lineno}"),
|CompileId {
frame_id,
frame_compile_id,
attempt,
}| { format!("{frame_id}_{frame_compile_id}_{attempt}") },
)
.into();

let subdir = &compile_id_dir;

let mut payload = String::new();
if let Some(expect) = e.has_payload {
if let Some(ref expect) = e.has_payload {
let mut first = true;
while let Some((_payload_lineno, payload_line)) =
iter.next_if(|(_, l)| l.starts_with('\t'))
Expand Down Expand Up @@ -185,8 +169,39 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
stats.ok += 1;

// lol this clone, probably shouldn't use entry
// TODO: output should be able to generate this without explicitly creating
let compile_directory = directory.entry(e.compile_id.clone()).or_default();

for parser in &all_parsers {
if let Some(md) = parser.get_metadata(&e) {
let results = parser.parse(lineno, md, e.rank, &e.compile_id, &payload);
match results {
Ok(results) => {
for (filename, out) in results {
output.push((filename.clone(), out));
compile_directory.push(filename);
}
}
Err(err) => {
match parser.name() {
"dynamo_guards" => {
eprintln!("Failed to parse guards json: {}", err);
stats.fail_dynamo_guards_json += 1;
}
name => {
eprintln!("Parser {name} failed: {err}");
stats.fail_parser += 1;
}
}
}
}

}
}

// TODO: implement this as StructuredLogParser
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the abstraction you've introduced is good for artifacts, but I think there's a class of other things that we could potentially get from the trace log which don't create a file. For example, imagine if we started emitting traditional traces (start + end events) to the log, and then wanted to visualize these together. These would be strewn across many different trace messages and you wouldn't want to make a file per each one.

I think my primary ask here is to avoid overabstracting, at least for now. I think there's a decent case to be made for generalizing artifacts (but note that artifacts as currently implemented have some funny problems, e.g., when ddp optimize is on, you'll get multiple copies of the same artifact in the same compile id), but I would hesitate to say that we have an abstraction that works for arbitrary analyses you might want to do. (If you really wanted to design it, you'd probably want some sort of state machine per analysis, with enough structure in the input parsing so you can efficiently dispatch to the correct analyses that actually care about a given token without having to O(N) loop through all analyses... and that's not getting into if there's ever shared information that wants to be saved over other analyses. It probably also matters if we want streaming or if we can just assume everything fits in RAM.)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, it might be that analyses that need to span across multiple envelopes and events, such as this one, won't fit into this model, which is fine. I think the case I want to cover is the most common one where someone wants to log a some type of event that occurs a constant number of times per compilation, that can be rendered per event (i.e. compilation metrics)

Trying to abstract the idea of "log collects global information into a single template or UI" like the one for the stack trie gets to the point of being cumbersome

// This one is hard because it consumes the stack, and
// I don't want to clone the stack when passing it as reference
if let Some(m) = e.dynamo_start {
if let Some(stack) = m.stack {
stack_trie.insert(
Expand All @@ -198,72 +213,6 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
};
};

let mut output_dump =
|filename: &str, sentinel: Option<EmptyMetadata>| -> anyhow::Result<()> {
if sentinel.is_some() {
let f = subdir.join(filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}
Ok(())
};

output_dump("optimize_ddp_split_graph.txt", e.optimize_ddp_split_graph)?;
output_dump("compiled_autograd_graph.txt", e.compiled_autograd_graph)?;
output_dump("aot_forward_graph.txt", e.aot_forward_graph)?;
output_dump("aot_backward_graph.txt", e.aot_backward_graph)?;
output_dump("aot_joint_graph.txt", e.aot_joint_graph)?;
output_dump("inductor_post_grad_graph.txt", e.inductor_post_grad_graph)?;

if e.dynamo_output_graph.is_some() {
// TODO: dump sizes
let filename = "dynamo_output_graph.txt";
let f = subdir.join(filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}

if e.dynamo_guards.is_some() {
let filename = "dynamo_guards.html";
let f = subdir.join(filename);
match serde_json::from_str::<Vec<DynamoGuard>>(payload.as_str()) {
Ok(guards) => {
let guards_context = DynamoGuardsContext { guards };
output.push((f, tt.render("dynamo_guards.html", &guards_context)?));
compile_directory.push(compile_id_dir.join(filename));
}
Err(err) => {
eprintln!("Failed to parse guards json: {}", err);
stats.fail_dynamo_guards_json += 1;
}
}
}

if let Some(metadata) = e.inductor_output_code {
let filename = metadata
.filename
.as_ref()
.and_then(|p| Path::file_stem(p))
.map_or_else(
|| PathBuf::from("inductor_output_code.txt"),
|stem| {
let mut r = OsString::from("inductor_output_code_");
r.push(stem);
r.push(OsStr::new(".txt"));
r.into()
},
);
let f = subdir.join(&filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}

if let Some(metadata) = e.optimize_ddp_split_child {
let filename = format!("optimize_ddp_split_child_{}.txt", metadata.name);
let f = subdir.join(&filename);
output.push((f, payload.clone()));
compile_directory.push(compile_id_dir.join(filename));
}
}
pb.finish_with_message("done");
spinner.finish();
Expand All @@ -288,6 +237,7 @@ pub fn parse_path(path: &PathBuf, config: ParseConfig) -> anyhow::Result<ParseOu
+ stats.fail_payload_md5
+ stats.other_rank
+ stats.fail_dynamo_guards_json
+ stats.fail_parser
> 0)
{
// Report something went wrong
Expand Down
208 changes: 208 additions & 0 deletions src/parsers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
use crate::types::*;
use std::path::PathBuf;
use std::ffi::{OsStr, OsString};
use std::path::Path;
use tinytemplate::TinyTemplate;

/**
* StructuredLogParser
* Parses a structured log and returns a vec of file outputs.
* Implement this trait to add your own analyses.
*
* 'e is the lifetime of the envelope being parsed
*/
pub trait StructuredLogParser {
// If this returns Some value, the parser will be run on that metadata.
// Otherwise, it will be skipped.
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>>;

// Take a log input and the metadata you asked for, return a set of files to write
fn parse<'e>(&self,
lineno: usize, // Line number from log
metadata: Metadata<'e>, // Metadata from get_metadata
rank: Option<u32>, // Rank of the log
compile_id: &Option<CompileId>, // Compile ID of the envelope
payload: &str // Payload from the log (empty string when None)
) -> anyhow::Result<ParseOutput>;

// Name of the parser, for error logging
fn name(&self) -> &'static str;
}

// Takes a filename and a payload and writes that payload into a the file
fn simple_file_output(
filename: &str,
lineno: usize,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
let compile_id_dir: PathBuf = compile_id
.as_ref()
.map_or(
format!("unknown_{lineno}"),
|CompileId {
frame_id,
frame_compile_id,
attempt,
}| { format!("{frame_id}_{frame_compile_id}_{attempt}") },
)
.into();
let subdir = PathBuf::from(compile_id_dir);
let f = subdir.join(filename);
Ok(Vec::from([(f, String::from(payload))]))
}

/**
* Parser for simple output dumps where the metadata is a sentinel {}
*/
pub struct SentinelFileParser {
filename: &'static str,
get_sentinel: fn (&Envelope) -> Option<&EmptyMetadata>,
} impl SentinelFileParser {
pub fn new(filename: &'static str, get_sentinel: fn (&Envelope) -> Option<&EmptyMetadata>) -> Self {
Self { filename, get_sentinel }
}
}
impl StructuredLogParser for SentinelFileParser {
fn name(&self) -> &'static str {
self.filename
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
(self.get_sentinel)(e).map(|m| Metadata::Empty(m))
}
fn parse<'e>(&self,
lineno: usize,
_metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
simple_file_output(&format!("{}.txt",self.filename), lineno, compile_id, payload)
}
}

// Same as SentinelFileParser, but can log the size of the graph
pub struct DynamoOutputGraphParser;
impl StructuredLogParser for DynamoOutputGraphParser {
fn name(&self) -> &'static str {
"dynamo_output_graph"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.dynamo_output_graph.as_ref().map(|m| Metadata::DynamoOutputGraph(m))
}
fn parse<'e>(&self,
lineno: usize,
_metadata: Metadata<'e>, // TODO: log size of graph
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
simple_file_output("dynamo_output_graph.txt", lineno, compile_id, payload)
}
}

pub struct DynamoGuardParser<'t> {
tt: &'t TinyTemplate<'t>,
}
impl StructuredLogParser for DynamoGuardParser<'_> {
fn name(&self) -> &'static str {
"dynamo_guards"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.dynamo_guards.as_ref().map(|m| Metadata::Empty(m))
}
fn parse<'e>(&self,
lineno: usize,
_metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
let filename = format!("{}.html", self.name());
let guards = serde_json::from_str::<Vec<DynamoGuard>>(payload)?;
let guards_context = DynamoGuardsContext { guards };
let output = self.tt.render(&filename, &guards_context)?;
simple_file_output(&filename, lineno, compile_id, &output)
}
}

pub struct InductorOutputCodeParser;
impl StructuredLogParser for InductorOutputCodeParser {
fn name(&self) -> &'static str {
"inductor_output_code"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.inductor_output_code.as_ref().map(|m| Metadata::InductorOutputCode(m))
}

fn parse<'e>(&self,
lineno: usize,
metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
if let Metadata::InductorOutputCode(metadata) = metadata {
let filename = metadata
.filename
.as_ref()
.and_then(|p| Path::file_stem(p))
.map_or_else(
|| PathBuf::from("inductor_output_code.txt"),
|stem| {
let mut r = OsString::from("inductor_output_code_");
r.push(stem);
r.push(OsStr::new(".txt"));
r.into()
},
);
simple_file_output(&filename.to_string_lossy(), lineno, compile_id, payload)
} else {
Err(anyhow::anyhow!("Expected InductorOutputCode metadata"))
}
}
}

pub struct OptimizeDdpSplitChildParser;
impl StructuredLogParser for OptimizeDdpSplitChildParser {
fn name(&self) -> &'static str {
"optimize_ddp_split_child"
}
fn get_metadata<'e>(&self, e: &'e Envelope) -> Option<Metadata<'e>> {
e.optimize_ddp_split_child.as_ref().map(|m| Metadata::OptimizeDdpSplitChild(m))
}

fn parse<'e>(&self,
lineno: usize,
metadata: Metadata<'e>,
_rank: Option<u32>,
compile_id: &Option<CompileId>,
payload: &str
) -> anyhow::Result<ParseOutput> {
if let Metadata::OptimizeDdpSplitChild(m) = metadata {
let filename = format!("optimize_ddp_split_child_{}.txt", m.name);
simple_file_output(&filename, lineno, compile_id, payload)
} else {
Err(anyhow::anyhow!("Expected OptimizeDdpSplitChild metadata"))
}
}
}

// Register your parser here
pub fn all_parsers<'t>(tt: &'t TinyTemplate<'t>) -> Vec<Box<dyn StructuredLogParser + 't>> {
// We need to use Box wrappers here because vecs in Rust need to have known size
let result : Vec<Box<dyn StructuredLogParser>> = vec![
Box::new(SentinelFileParser::new("optimize_ddp_split_graph", |e| e.optimize_ddp_split_graph.as_ref())),
Box::new(SentinelFileParser::new("compiled_autograd_graph", |e| e.compiled_autograd_graph.as_ref())),
Box::new(SentinelFileParser::new("aot_forward_graph", |e| e.aot_forward_graph.as_ref())),
Box::new(SentinelFileParser::new("aot_backward_graph", |e| e.aot_backward_graph.as_ref())),
Box::new(SentinelFileParser::new("aot_joint_graph", |e| e.aot_joint_graph.as_ref())),
Box::new(SentinelFileParser::new("inductor_post_grad_graph", |e| e.inductor_post_grad_graph.as_ref())),
Box::new(DynamoOutputGraphParser),
Box::new(DynamoGuardParser { tt }),
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if the individual analyses should be responsible for their own template instances themselves

Box::new(InductorOutputCodeParser),
Box::new(OptimizeDdpSplitChildParser),
];

result
}
Loading
Loading