Skip to content

Commit

Permalink
varchar only :(
Browse files Browse the repository at this point in the history
  • Loading branch information
lmangani committed Dec 7, 2024
1 parent 151cf23 commit f46e813
Showing 1 changed file with 68 additions and 52 deletions.
120 changes: 68 additions & 52 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ impl VTab for PcapVTab {
type BindData = PcapBindData;

unsafe fn bind(bind: &BindInfo, data: *mut PcapBindData) -> Result<(), Box<dyn Error>> {
bind.add_result_column("timestamp", LogicalTypeHandle::from(LogicalTypeId::Timestamp));
bind.add_result_column("timestamp", LogicalTypeHandle::from(LogicalTypeId::Varchar));
bind.add_result_column("src_ip", LogicalTypeHandle::from(LogicalTypeId::Varchar));
bind.add_result_column("dst_ip", LogicalTypeHandle::from(LogicalTypeId::Varchar));
bind.add_result_column("src_port", LogicalTypeHandle::from(LogicalTypeId::Integer));
bind.add_result_column("dst_port", LogicalTypeHandle::from(LogicalTypeId::Integer));
bind.add_result_column("src_port", LogicalTypeHandle::from(LogicalTypeId::Varchar));
bind.add_result_column("dst_port", LogicalTypeHandle::from(LogicalTypeId::Varchar));
bind.add_result_column("protocol", LogicalTypeHandle::from(LogicalTypeId::Varchar));
bind.add_result_column("length", LogicalTypeHandle::from(LogicalTypeId::Integer));
bind.add_result_column("payload", LogicalTypeHandle::from(LogicalTypeId::Blob));
bind.add_result_column("length", LogicalTypeHandle::from(LogicalTypeId::Varchar));
bind.add_result_column("payload", LogicalTypeHandle::from(LogicalTypeId::Varchar));

let filepath = bind.get_parameter(0).to_string();
unsafe {
Expand Down Expand Up @@ -124,54 +124,70 @@ impl VTab for PcapVTab {
}

match next_result {
Ok((offset, block)) => {
let (timestamp, length_str, src_ip, dst_ip, src_port, dst_port, protocol, payload) = match block {
PcapBlockOwned::Legacy(packet) => {
let parsed = Self::parse_packet(&packet.data)?;
let (src_ip, dst_ip, src_port, dst_port, protocol, payload) = parsed;

let timestamp_micros = packet.ts_sec as i64 * 1_000_000 + packet.ts_usec as i64;

(timestamp_micros, packet.origlen.to_string(),
src_ip, dst_ip, src_port, dst_port,
protocol, payload)
},
PcapBlockOwned::LegacyHeader(_) => {
(0, "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(),
0, 0, "UNKNOWN".to_string(), Vec::new())
},
_ => {
(0, "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(),
0, 0, "UNKNOWN".to_string(), Vec::new())
}
};
Ok((offset, block)) => {
let (ts_sec_str, length_str, src_ip, dst_ip, src_port, dst_port, protocol, payload) = match block {
PcapBlockOwned::Legacy(packet) => {
let parsed = Self::parse_packet(&packet.data)?;
let (src_ip, dst_ip, src_port, dst_port, protocol, payload) = parsed;

let payload_str = if !payload.is_empty() {
if let Ok(utf8_str) = std::str::from_utf8(&payload) {
if utf8_str.chars().all(|c| c.is_ascii_graphic() || c.is_ascii_whitespace()) {
format!("{}", utf8_str)
} else {
let hex_str: Vec<String> = payload.iter()
.take(32)
.map(|b| format!("{:02x}", b))
.collect();
format!("{}{}", hex_str.join(" "),
if payload.len() > 32 { " ..." } else { "" })
}
} else {
let hex_str: Vec<String> = payload.iter()
.take(32)
.map(|b| format!("{:02x}", b))
.collect();
format!("{}{}", hex_str.join(" "),
if payload.len() > 32 { " ..." } else { "" })
}
} else {
"empty".to_string()
};

(packet.ts_sec.to_string(), packet.origlen.to_string(),
src_ip, dst_ip, src_port.to_string(), dst_port.to_string(),
protocol, payload_str)
},
PcapBlockOwned::LegacyHeader(_) => {
("0".to_string(), "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(),
"0".to_string(), "0".to_string(), "UNKNOWN".to_string(), "empty".to_string())
},
_ => {
("0".to_string(), "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(),
"0".to_string(), "0".to_string(), "UNKNOWN".to_string(), "empty".to_string())
}
};

output.flat_vector(0).insert(count, CString::new(ts_sec_str)?);
output.flat_vector(1).insert(count, CString::new(src_ip)?);
output.flat_vector(2).insert(count, CString::new(dst_ip)?);
output.flat_vector(3).insert(count, CString::new(src_port)?);
output.flat_vector(4).insert(count, CString::new(dst_port)?);
output.flat_vector(5).insert(count, CString::new(protocol)?);
output.flat_vector(6).insert(count, CString::new(length_str)?);
output.flat_vector(7).insert(count, CString::new(payload)?);

count += 1;
unsafe { (*init_data).reader.as_mut() }.unwrap().consume(offset);
},
Err(PcapError::Eof) => {
unsafe { (*init_data).done = true; }
output.set_len(count);
return Ok(());
},
Err(e) => return Err(Box::new(e)),
}

debug_print!("Processing packet: timestamp={}, src={}:{}, dst={}:{}, proto={}, len={}",
timestamp, src_ip, src_port, dst_ip, dst_port, protocol, length_str);

output.flat_vector(0).insert(count, CString::new(timestamp.to_string())?);
output.flat_vector(1).insert(count, CString::new(src_ip)?);
output.flat_vector(2).insert(count, CString::new(dst_ip)?);
output.flat_vector(3).insert(count, CString::new(src_port.to_string())?);
output.flat_vector(4).insert(count, CString::new(dst_port.to_string())?);
output.flat_vector(5).insert(count, CString::new(protocol)?);
output.flat_vector(6).insert(count, CString::new(length_str)?);

let hex: String = payload.iter()
.map(|b| format!("{:02x}", b))
.collect();
output.flat_vector(7).insert(count, CString::new(hex)?);

count += 1;
unsafe { (*init_data).reader.as_mut() }.unwrap().consume(offset);
},
Err(PcapError::Eof) => {
unsafe { (*init_data).done = true; }
output.set_len(count);
return Ok(());
},
Err(e) => return Err(Box::new(e)),
}

output.set_len(count);
Ok(())
Expand Down

0 comments on commit f46e813

Please sign in to comment.