Skip to content

Commit 798f26f

Browse files
authored
fix(test): handle NDJSON chunking in log_stream tests (#4358)
The log endpoint streams NDJSON (newline-delimited JSON) generated by `tracing-subscriber`. However, the log stream tests assume each HTTP body chunk contains a complete JSON object, which isn't necessarily the case. HTTP streaming doesn't guarantee chunk boundaries align with JSON boundaries because TCP can split data anywhere. This causes occasional test failures when chunks split mid-JSON-object in my local machine. Fix by buffering incoming chunks and extracting complete lines (by newline delimiter) before parsing as JSON. We could arguably set the content type header for this endpoint to `application/x-ndjson` to be fully compliant with the NDJSON specs, but that that may be risking a breaking change for little value. Signed-off-by: Alejandro Martinez Ruiz <amr@buoyant.io>
1 parent 730c626 commit 798f26f

1 file changed

Lines changed: 28 additions & 3 deletions

File tree

linkerd/app/integration/src/tests/telemetry/log_stream.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ where
213213
let mut result = Vec::new();
214214
let logs = &mut result;
215215
let fut = async move {
216+
let mut buffer = Vec::new();
216217
while let Some(res) = body.frame().await {
217218
let chunk = match res {
218219
Ok(frame) => {
@@ -227,13 +228,37 @@ where
227228
break;
228229
}
229230
};
230-
let deserialized = serde_json::from_slice(&chunk[..]);
231+
232+
buffer.extend_from_slice(&chunk[..]);
233+
234+
// Process complete lines since the format is newline-delimited JSON (NDJSON)
235+
while let Some(newline_pos) = buffer.iter().position(|&b| b == b'\n') {
236+
let line = buffer.drain(..=newline_pos).collect::<Vec<_>>();
237+
// Skip empty lines
238+
if line.iter().all(|&b| b.is_ascii_whitespace()) {
239+
continue;
240+
}
241+
let deserialized = serde_json::from_slice(&line[..]);
242+
tracing::info!(?deserialized);
243+
match deserialized {
244+
Ok(json) => logs.push(json),
245+
Err(error) => panic!(
246+
"parsing logs as JSON failed\n error: {error}\n line: {:?}",
247+
String::from_utf8_lossy(&line[..])
248+
),
249+
}
250+
}
251+
}
252+
253+
// Handle remaining data in buffer
254+
if !buffer.is_empty() && !buffer.iter().all(|&b| b.is_ascii_whitespace()) {
255+
let deserialized = serde_json::from_slice(&buffer[..]);
231256
tracing::info!(?deserialized);
232257
match deserialized {
233258
Ok(json) => logs.push(json),
234259
Err(error) => panic!(
235-
"parsing logs as JSON failed\n error: {error}\n chunk: {:?}",
236-
String::from_utf8_lossy(&chunk[..])
260+
"parsing logs as JSON failed (incomplete final line)\nerror: {error}\nbuffer: {:?}",
261+
String::from_utf8_lossy(&buffer[..])
237262
),
238263
}
239264
}

0 commit comments

Comments
 (0)