diff --git a/src/lib.rs b/src/lib.rs index ae8c191..2ce7907 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -64,14 +64,14 @@ impl VTab for PcapVTab { type BindData = PcapBindData; unsafe fn bind(bind: &BindInfo, data: *mut PcapBindData) -> Result<(), Box> { - bind.add_result_column("timestamp", LogicalTypeHandle::from(LogicalTypeId::Timestamp)); + bind.add_result_column("timestamp", LogicalTypeHandle::from(LogicalTypeId::Varchar)); bind.add_result_column("src_ip", LogicalTypeHandle::from(LogicalTypeId::Varchar)); bind.add_result_column("dst_ip", LogicalTypeHandle::from(LogicalTypeId::Varchar)); - bind.add_result_column("src_port", LogicalTypeHandle::from(LogicalTypeId::Integer)); - bind.add_result_column("dst_port", LogicalTypeHandle::from(LogicalTypeId::Integer)); + bind.add_result_column("src_port", LogicalTypeHandle::from(LogicalTypeId::Varchar)); + bind.add_result_column("dst_port", LogicalTypeHandle::from(LogicalTypeId::Varchar)); bind.add_result_column("protocol", LogicalTypeHandle::from(LogicalTypeId::Varchar)); - bind.add_result_column("length", LogicalTypeHandle::from(LogicalTypeId::Integer)); - bind.add_result_column("payload", LogicalTypeHandle::from(LogicalTypeId::Blob)); + bind.add_result_column("length", LogicalTypeHandle::from(LogicalTypeId::Varchar)); + bind.add_result_column("payload", LogicalTypeHandle::from(LogicalTypeId::Varchar)); let filepath = bind.get_parameter(0).to_string(); unsafe { @@ -124,54 +124,70 @@ impl VTab for PcapVTab { } match next_result { - Ok((offset, block)) => { - let (timestamp, length_str, src_ip, dst_ip, src_port, dst_port, protocol, payload) = match block { - PcapBlockOwned::Legacy(packet) => { - let parsed = Self::parse_packet(&packet.data)?; - let (src_ip, dst_ip, src_port, dst_port, protocol, payload) = parsed; - - let timestamp_micros = packet.ts_sec as i64 * 1_000_000 + packet.ts_usec as i64; - - (timestamp_micros, packet.origlen.to_string(), - src_ip, dst_ip, src_port, dst_port, - protocol, payload) - }, - PcapBlockOwned::LegacyHeader(_) => { - (0, "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(), - 0, 0, "UNKNOWN".to_string(), Vec::new()) - }, - _ => { - (0, "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(), - 0, 0, "UNKNOWN".to_string(), Vec::new()) - } - }; + Ok((offset, block)) => { + let (ts_sec_str, length_str, src_ip, dst_ip, src_port, dst_port, protocol, payload) = match block { + PcapBlockOwned::Legacy(packet) => { + let parsed = Self::parse_packet(&packet.data)?; + let (src_ip, dst_ip, src_port, dst_port, protocol, payload) = parsed; + + let payload_str = if !payload.is_empty() { + if let Ok(utf8_str) = std::str::from_utf8(&payload) { + if utf8_str.chars().all(|c| c.is_ascii_graphic() || c.is_ascii_whitespace()) { + format!("{}", utf8_str) + } else { + let hex_str: Vec = payload.iter() + .take(32) + .map(|b| format!("{:02x}", b)) + .collect(); + format!("{}{}", hex_str.join(" "), + if payload.len() > 32 { " ..." } else { "" }) + } + } else { + let hex_str: Vec = payload.iter() + .take(32) + .map(|b| format!("{:02x}", b)) + .collect(); + format!("{}{}", hex_str.join(" "), + if payload.len() > 32 { " ..." } else { "" }) + } + } else { + "empty".to_string() + }; + + (packet.ts_sec.to_string(), packet.origlen.to_string(), + src_ip, dst_ip, src_port.to_string(), dst_port.to_string(), + protocol, payload_str) + }, + PcapBlockOwned::LegacyHeader(_) => { + ("0".to_string(), "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(), + "0".to_string(), "0".to_string(), "UNKNOWN".to_string(), "empty".to_string()) + }, + _ => { + ("0".to_string(), "0".to_string(), "0.0.0.0".to_string(), "0.0.0.0".to_string(), + "0".to_string(), "0".to_string(), "UNKNOWN".to_string(), "empty".to_string()) + } + }; + + output.flat_vector(0).insert(count, CString::new(ts_sec_str)?); + output.flat_vector(1).insert(count, CString::new(src_ip)?); + output.flat_vector(2).insert(count, CString::new(dst_ip)?); + output.flat_vector(3).insert(count, CString::new(src_port)?); + output.flat_vector(4).insert(count, CString::new(dst_port)?); + output.flat_vector(5).insert(count, CString::new(protocol)?); + output.flat_vector(6).insert(count, CString::new(length_str)?); + output.flat_vector(7).insert(count, CString::new(payload)?); + + count += 1; + unsafe { (*init_data).reader.as_mut() }.unwrap().consume(offset); + }, + Err(PcapError::Eof) => { + unsafe { (*init_data).done = true; } + output.set_len(count); + return Ok(()); + }, + Err(e) => return Err(Box::new(e)), + } - debug_print!("Processing packet: timestamp={}, src={}:{}, dst={}:{}, proto={}, len={}", - timestamp, src_ip, src_port, dst_ip, dst_port, protocol, length_str); - - output.flat_vector(0).insert(count, CString::new(timestamp.to_string())?); - output.flat_vector(1).insert(count, CString::new(src_ip)?); - output.flat_vector(2).insert(count, CString::new(dst_ip)?); - output.flat_vector(3).insert(count, CString::new(src_port.to_string())?); - output.flat_vector(4).insert(count, CString::new(dst_port.to_string())?); - output.flat_vector(5).insert(count, CString::new(protocol)?); - output.flat_vector(6).insert(count, CString::new(length_str)?); - - let hex: String = payload.iter() - .map(|b| format!("{:02x}", b)) - .collect(); - output.flat_vector(7).insert(count, CString::new(hex)?); - - count += 1; - unsafe { (*init_data).reader.as_mut() }.unwrap().consume(offset); - }, - Err(PcapError::Eof) => { - unsafe { (*init_data).done = true; } - output.set_len(count); - return Ok(()); - }, - Err(e) => return Err(Box::new(e)), - } output.set_len(count); Ok(())