diff --git a/src/main.rs b/src/main.rs index d28ef19..c569ac6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -38,9 +38,9 @@ struct Args { #[arg(short = 'q', long = "query-string-all")] query_string_all: Option, - /// jq expression to evaluate on JSON text messages for logging (e.g., ".message" or ".data.id") + /// jq expression(s) to evaluate on JSON text messages for logging (can be specified multiple times) #[arg(short = 'j', long = "jaq")] - jaq: Option, + jaq: Vec, } #[tokio::main] @@ -64,35 +64,41 @@ async fn main() -> Result<(), Box> { fs::create_dir_all(&session_dir)?; info!("Created session directory: {}", session_dir.display()); - // Compile jaq filter if provided - let jaq_filter: Option> = if let Some(ref expr) = args.jaq { - let mut defs = ParseCtx::new(Vec::new()); - defs.insert_natives(jaq_core::core()); - defs.insert_defs(jaq_std::std()); + // Compile jaq filters if provided + let jaq_filters: Arc> = { + let mut filters = Vec::new(); - let (parsed, errs) = jaq_parse::parse(expr, jaq_parse::main()); - if !errs.is_empty() { - let err_msgs: Vec = errs.iter().map(|e| format!("{:?}", e)).collect(); - return Err(format!("Failed to parse jaq expression: {}", err_msgs.join(", ")).into()); - } + for expr in &args.jaq { + let mut defs = ParseCtx::new(Vec::new()); + defs.insert_natives(jaq_core::core()); + defs.insert_defs(jaq_std::std()); - let parsed = parsed.ok_or("Failed to parse jaq expression")?; - let filter = defs.compile(parsed); + let (parsed, errs) = jaq_parse::parse(expr, jaq_parse::main()); + if !errs.is_empty() { + let err_msgs: Vec = errs.iter().map(|e| format!("{:?}", e)).collect(); + return Err( + format!("Failed to parse jaq expression '{}': {}", expr, err_msgs.join(", ")) + .into(), + ); + } - if !defs.errs.is_empty() { - return Err( - format!( - "Failed to compile jaq expression ({} error(s))", + let parsed = parsed.ok_or_else(|| format!("Failed to parse jaq expression '{}'", expr))?; + let filter = defs.compile(parsed); + + if !defs.errs.is_empty() { + return Err(format!( + "Failed to compile jaq expression '{}' ({} error(s))", + expr, defs.errs.len() ) - .into(), - ); + .into()); + } + + info!("Using jaq filter: {}", expr); + filters.push(filter); } - info!("Using jaq filter: {}", expr); - Some(Arc::new(filter)) - } else { - None + Arc::new(filters) }; // Parse extra query params once if specified @@ -253,7 +259,7 @@ async fn main() -> Result<(), Box> { for (letter, url, ws_stream) in connections { let session_dir = session_dir.clone(); - let jaq_filter = jaq_filter.clone(); + let jaq_filters = jaq_filters.clone(); join_set.spawn(async move { let (_, mut read) = ws_stream.split(); @@ -264,35 +270,43 @@ async fn main() -> Result<(), Box> { Ok(message) => { match message { Message::Text(text) => { - // Determine what to log based on jaq filter - let log_content = if let Some(ref filter) = jaq_filter { + // Determine what to log based on jaq filters + if jaq_filters.is_empty() { + let preview: String = text.chars().take(50).collect(); + let truncated = if text.len() > 50 { "..." } else { "" }; + info!("[{}:{}] Text: {}{}", letter, seq_num, preview, truncated); + } else { match serde_json::from_str::(&text) { Ok(json_val) => { - let inputs = RcIter::new(core::iter::empty()); - let ctx = Ctx::new([], &inputs); - let out = - filter.run((ctx, Val::from(json_val))); - let results: Vec> = out.collect(); + let mut all_outputs = Vec::new(); - let mut output_parts = Vec::new(); - for result in results { - match result { - Ok(val) => { - output_parts.push(val.to_string()); - } - Err(e) => { - warn!( - "[{}:{}] jaq error: {}", - letter, seq_num, e - ); + for filter in jaq_filters.iter() { + let inputs = RcIter::new(core::iter::empty()); + let ctx = Ctx::new([], &inputs); + let out = filter.run((ctx, Val::from(json_val.clone()))); + + let mut filter_outputs = Vec::new(); + for result in out { + match result { + Ok(val) => { + filter_outputs.push(val.to_string()); + } + Err(e) => { + warn!( + "[{}:{}] jaq error: {}", + letter, seq_num, e + ); + } } } + if filter_outputs.is_empty() { + all_outputs.push("(no output)".to_string()); + } else { + all_outputs.push(filter_outputs.join(", ")); + } } - if output_parts.is_empty() { - "(no output)".to_string() - } else { - output_parts.join(", ") - } + + info!("[{}:{}] {}", letter, seq_num, all_outputs.join(" | ")); } Err(e) => { warn!( @@ -301,16 +315,10 @@ async fn main() -> Result<(), Box> { ); let preview: String = text.chars().take(50).collect(); let truncated = if text.len() > 50 { "..." } else { "" }; - format!("{}{}", preview, truncated) + info!("[{}:{}] Text: {}{}", letter, seq_num, preview, truncated); } } - } else { - let preview: String = text.chars().take(50).collect(); - let truncated = if text.len() > 50 { "..." } else { "" }; - format!("{}{}", preview, truncated) - }; - - info!("[{}:{}] Text: {}", letter, seq_num, log_content); + } // Always write full message to file let filename =