Compare commits
2 Commits
13ccb14826
...
7ad031c6a9
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7ad031c6a9 | ||
|
|
519f75803d |
138
src/main.rs
138
src/main.rs
@@ -38,9 +38,9 @@ struct Args {
|
|||||||
#[arg(short = 'q', long = "query-string-all")]
|
#[arg(short = 'q', long = "query-string-all")]
|
||||||
query_string_all: Option<String>,
|
query_string_all: Option<String>,
|
||||||
|
|
||||||
/// jq expression to evaluate on JSON text messages for logging (e.g., ".message" or ".data.id")
|
/// jq expression(s) to evaluate on JSON text messages for logging (can be specified multiple times)
|
||||||
#[arg(short = 'j', long = "jaq")]
|
#[arg(short = 'j', long = "jaq")]
|
||||||
jaq: Option<String>,
|
jaq: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
@@ -64,35 +64,44 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
fs::create_dir_all(&session_dir)?;
|
fs::create_dir_all(&session_dir)?;
|
||||||
info!("Created session directory: {}", session_dir.display());
|
info!("Created session directory: {}", session_dir.display());
|
||||||
|
|
||||||
// Compile jaq filter if provided
|
// Compile jaq filters if provided
|
||||||
let jaq_filter: Option<Arc<Filter>> = if let Some(ref expr) = args.jaq {
|
let jaq_filters: Arc<Vec<Filter>> = {
|
||||||
let mut defs = ParseCtx::new(Vec::new());
|
let mut filters = Vec::new();
|
||||||
defs.insert_natives(jaq_core::core());
|
|
||||||
defs.insert_defs(jaq_std::std());
|
|
||||||
|
|
||||||
let (parsed, errs) = jaq_parse::parse(expr, jaq_parse::main());
|
for expr in &args.jaq {
|
||||||
if !errs.is_empty() {
|
let mut defs = ParseCtx::new(Vec::new());
|
||||||
let err_msgs: Vec<String> = errs.iter().map(|e| format!("{:?}", e)).collect();
|
defs.insert_natives(jaq_core::core());
|
||||||
return Err(format!("Failed to parse jaq expression: {}", err_msgs.join(", ")).into());
|
defs.insert_defs(jaq_std::std());
|
||||||
}
|
|
||||||
|
|
||||||
let parsed = parsed.ok_or("Failed to parse jaq expression")?;
|
let (parsed, errs) = jaq_parse::parse(expr, jaq_parse::main());
|
||||||
let filter = defs.compile(parsed);
|
if !errs.is_empty() {
|
||||||
|
let err_msgs: Vec<String> = errs.iter().map(|e| format!("{:?}", e)).collect();
|
||||||
|
return Err(format!(
|
||||||
|
"Failed to parse jaq expression '{}': {}",
|
||||||
|
expr,
|
||||||
|
err_msgs.join(", ")
|
||||||
|
)
|
||||||
|
.into());
|
||||||
|
}
|
||||||
|
|
||||||
if !defs.errs.is_empty() {
|
let parsed =
|
||||||
return Err(
|
parsed.ok_or_else(|| format!("Failed to parse jaq expression '{}'", expr))?;
|
||||||
format!(
|
let filter = defs.compile(parsed);
|
||||||
"Failed to compile jaq expression ({} error(s))",
|
|
||||||
|
if !defs.errs.is_empty() {
|
||||||
|
return Err(format!(
|
||||||
|
"Failed to compile jaq expression '{}' ({} error(s))",
|
||||||
|
expr,
|
||||||
defs.errs.len()
|
defs.errs.len()
|
||||||
)
|
)
|
||||||
.into(),
|
.into());
|
||||||
);
|
}
|
||||||
|
|
||||||
|
info!("Using jaq filter: {}", expr);
|
||||||
|
filters.push(filter);
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Using jaq filter: {}", expr);
|
Arc::new(filters)
|
||||||
Some(Arc::new(filter))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
};
|
||||||
|
|
||||||
// Parse extra query params once if specified
|
// Parse extra query params once if specified
|
||||||
@@ -253,7 +262,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
for (letter, url, ws_stream) in connections {
|
for (letter, url, ws_stream) in connections {
|
||||||
let session_dir = session_dir.clone();
|
let session_dir = session_dir.clone();
|
||||||
let jaq_filter = jaq_filter.clone();
|
let jaq_filters = jaq_filters.clone();
|
||||||
|
|
||||||
join_set.spawn(async move {
|
join_set.spawn(async move {
|
||||||
let (_, mut read) = ws_stream.split();
|
let (_, mut read) = ws_stream.split();
|
||||||
@@ -264,35 +273,52 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
Ok(message) => {
|
Ok(message) => {
|
||||||
match message {
|
match message {
|
||||||
Message::Text(text) => {
|
Message::Text(text) => {
|
||||||
// Determine what to log based on jaq filter
|
// Determine what to log based on jaq filters
|
||||||
let log_content = if let Some(ref filter) = jaq_filter {
|
if jaq_filters.is_empty() {
|
||||||
|
let preview: String = text.chars().take(50).collect();
|
||||||
|
let truncated = if text.len() > 50 { "..." } else { "" };
|
||||||
|
info!(
|
||||||
|
"[{}:{}] Text: {}{}",
|
||||||
|
letter, seq_num, preview, truncated
|
||||||
|
);
|
||||||
|
} else {
|
||||||
match serde_json::from_str::<serde_json::Value>(&text) {
|
match serde_json::from_str::<serde_json::Value>(&text) {
|
||||||
Ok(json_val) => {
|
Ok(json_val) => {
|
||||||
let inputs = RcIter::new(core::iter::empty());
|
let mut all_outputs = Vec::new();
|
||||||
let ctx = Ctx::new([], &inputs);
|
|
||||||
let out =
|
|
||||||
filter.run((ctx, Val::from(json_val)));
|
|
||||||
let results: Vec<Result<Val, _>> = out.collect();
|
|
||||||
|
|
||||||
let mut output_parts = Vec::new();
|
for filter in jaq_filters.iter() {
|
||||||
for result in results {
|
let inputs = RcIter::new(core::iter::empty());
|
||||||
match result {
|
let ctx = Ctx::new([], &inputs);
|
||||||
Ok(val) => {
|
let out =
|
||||||
output_parts.push(val.to_string());
|
filter.run((ctx, Val::from(json_val.clone())));
|
||||||
}
|
|
||||||
Err(e) => {
|
let mut filter_outputs = Vec::new();
|
||||||
warn!(
|
for result in out {
|
||||||
"[{}:{}] jaq error: {}",
|
match result {
|
||||||
letter, seq_num, e
|
Ok(val) => {
|
||||||
);
|
filter_outputs.push(val.to_string());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!(
|
||||||
|
"[{}:{}] jaq error: {}",
|
||||||
|
letter, seq_num, e
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if filter_outputs.is_empty() {
|
||||||
|
all_outputs.push("(no output)".to_string());
|
||||||
|
} else {
|
||||||
|
all_outputs.push(filter_outputs.join(", "));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if output_parts.is_empty() {
|
|
||||||
"(no output)".to_string()
|
info!(
|
||||||
} else {
|
"[{}:{}] {}",
|
||||||
output_parts.join(", ")
|
letter,
|
||||||
}
|
seq_num,
|
||||||
|
all_outputs.join(" | ")
|
||||||
|
);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
warn!(
|
warn!(
|
||||||
@@ -300,17 +326,15 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
letter, seq_num, e
|
letter, seq_num, e
|
||||||
);
|
);
|
||||||
let preview: String = text.chars().take(50).collect();
|
let preview: String = text.chars().take(50).collect();
|
||||||
let truncated = if text.len() > 50 { "..." } else { "" };
|
let truncated =
|
||||||
format!("{}{}", preview, truncated)
|
if text.len() > 50 { "..." } else { "" };
|
||||||
|
info!(
|
||||||
|
"[{}:{}] Text: {}{}",
|
||||||
|
letter, seq_num, preview, truncated
|
||||||
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
let preview: String = text.chars().take(50).collect();
|
|
||||||
let truncated = if text.len() > 50 { "..." } else { "" };
|
|
||||||
format!("{}{}", preview, truncated)
|
|
||||||
};
|
|
||||||
|
|
||||||
info!("[{}:{}] Text: {}", letter, seq_num, log_content);
|
|
||||||
|
|
||||||
// Always write full message to file
|
// Always write full message to file
|
||||||
let filename =
|
let filename =
|
||||||
|
|||||||
Reference in New Issue
Block a user