multiple jq expressions

This commit is contained in:
Richard
2026-02-04 20:23:35 +00:00
parent 13ccb14826
commit 519f75803d

View File

@@ -38,9 +38,9 @@ struct Args {
#[arg(short = 'q', long = "query-string-all")] #[arg(short = 'q', long = "query-string-all")]
query_string_all: Option<String>, query_string_all: Option<String>,
/// jq expression to evaluate on JSON text messages for logging (e.g., ".message" or ".data.id") /// jq expression(s) to evaluate on JSON text messages for logging (can be specified multiple times)
#[arg(short = 'j', long = "jaq")] #[arg(short = 'j', long = "jaq")]
jaq: Option<String>, jaq: Vec<String>,
} }
#[tokio::main] #[tokio::main]
@@ -64,8 +64,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
fs::create_dir_all(&session_dir)?; fs::create_dir_all(&session_dir)?;
info!("Created session directory: {}", session_dir.display()); info!("Created session directory: {}", session_dir.display());
// Compile jaq filter if provided // Compile jaq filters if provided
let jaq_filter: Option<Arc<Filter>> = if let Some(ref expr) = args.jaq { let jaq_filters: Arc<Vec<Filter>> = {
let mut filters = Vec::new();
for expr in &args.jaq {
let mut defs = ParseCtx::new(Vec::new()); let mut defs = ParseCtx::new(Vec::new());
defs.insert_natives(jaq_core::core()); defs.insert_natives(jaq_core::core());
defs.insert_defs(jaq_std::std()); defs.insert_defs(jaq_std::std());
@@ -73,26 +76,29 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let (parsed, errs) = jaq_parse::parse(expr, jaq_parse::main()); let (parsed, errs) = jaq_parse::parse(expr, jaq_parse::main());
if !errs.is_empty() { if !errs.is_empty() {
let err_msgs: Vec<String> = errs.iter().map(|e| format!("{:?}", e)).collect(); let err_msgs: Vec<String> = errs.iter().map(|e| format!("{:?}", e)).collect();
return Err(format!("Failed to parse jaq expression: {}", err_msgs.join(", ")).into());
}
let parsed = parsed.ok_or("Failed to parse jaq expression")?;
let filter = defs.compile(parsed);
if !defs.errs.is_empty() {
return Err( return Err(
format!( format!("Failed to parse jaq expression '{}': {}", expr, err_msgs.join(", "))
"Failed to compile jaq expression ({} error(s))",
defs.errs.len()
)
.into(), .into(),
); );
} }
let parsed = parsed.ok_or_else(|| format!("Failed to parse jaq expression '{}'", expr))?;
let filter = defs.compile(parsed);
if !defs.errs.is_empty() {
return Err(format!(
"Failed to compile jaq expression '{}' ({} error(s))",
expr,
defs.errs.len()
)
.into());
}
info!("Using jaq filter: {}", expr); info!("Using jaq filter: {}", expr);
Some(Arc::new(filter)) filters.push(filter);
} else { }
None
Arc::new(filters)
}; };
// Parse extra query params once if specified // Parse extra query params once if specified
@@ -253,7 +259,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
for (letter, url, ws_stream) in connections { for (letter, url, ws_stream) in connections {
let session_dir = session_dir.clone(); let session_dir = session_dir.clone();
let jaq_filter = jaq_filter.clone(); let jaq_filters = jaq_filters.clone();
join_set.spawn(async move { join_set.spawn(async move {
let (_, mut read) = ws_stream.split(); let (_, mut read) = ws_stream.split();
@@ -264,21 +270,26 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
Ok(message) => { Ok(message) => {
match message { match message {
Message::Text(text) => { Message::Text(text) => {
// Determine what to log based on jaq filter // Determine what to log based on jaq filters
let log_content = if let Some(ref filter) = jaq_filter { if jaq_filters.is_empty() {
let preview: String = text.chars().take(50).collect();
let truncated = if text.len() > 50 { "..." } else { "" };
info!("[{}:{}] Text: {}{}", letter, seq_num, preview, truncated);
} else {
match serde_json::from_str::<serde_json::Value>(&text) { match serde_json::from_str::<serde_json::Value>(&text) {
Ok(json_val) => { Ok(json_val) => {
let mut all_outputs = Vec::new();
for filter in jaq_filters.iter() {
let inputs = RcIter::new(core::iter::empty()); let inputs = RcIter::new(core::iter::empty());
let ctx = Ctx::new([], &inputs); let ctx = Ctx::new([], &inputs);
let out = let out = filter.run((ctx, Val::from(json_val.clone())));
filter.run((ctx, Val::from(json_val)));
let results: Vec<Result<Val, _>> = out.collect();
let mut output_parts = Vec::new(); let mut filter_outputs = Vec::new();
for result in results { for result in out {
match result { match result {
Ok(val) => { Ok(val) => {
output_parts.push(val.to_string()); filter_outputs.push(val.to_string());
} }
Err(e) => { Err(e) => {
warn!( warn!(
@@ -288,12 +299,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
} }
} }
if output_parts.is_empty() { if filter_outputs.is_empty() {
"(no output)".to_string() all_outputs.push("(no output)".to_string());
} else { } else {
output_parts.join(", ") all_outputs.push(filter_outputs.join(", "));
} }
} }
info!("[{}:{}] {}", letter, seq_num, all_outputs.join(" | "));
}
Err(e) => { Err(e) => {
warn!( warn!(
"[{}:{}] JSON parse error: {}", "[{}:{}] JSON parse error: {}",
@@ -301,16 +315,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
); );
let preview: String = text.chars().take(50).collect(); let preview: String = text.chars().take(50).collect();
let truncated = if text.len() > 50 { "..." } else { "" }; let truncated = if text.len() > 50 { "..." } else { "" };
format!("{}{}", preview, truncated) info!("[{}:{}] Text: {}{}", letter, seq_num, preview, truncated);
}
} }
} }
} else {
let preview: String = text.chars().take(50).collect();
let truncated = if text.len() > 50 { "..." } else { "" };
format!("{}{}", preview, truncated)
};
info!("[{}:{}] Text: {}", letter, seq_num, log_content);
// Always write full message to file // Always write full message to file
let filename = let filename =