fix: correctly poll all out-of-order streaming chunks (closes #4326) (#4333)

* fix: correctly poll all out-of-order streaming chunks (closes #4326)

* [autofix.ci] apply automated fixes

* [autofix.ci] apply automated fixes (attempt 2/3)

---------

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Greg Johnston
2025-09-26 07:45:41 -04:00
committed by GitHub
parent 3b9ccdf57e
commit a453b7d1bd

View File

@@ -378,83 +378,91 @@ impl Stream for StreamBuilder {
let next_chunk = this.chunks.pop_front(); let next_chunk = this.chunks.pop_front();
match next_chunk { match next_chunk {
None => { None => {
// now, handle out-of-order chunks if this.pending_ooo.is_empty() {
if let Some(mut pending) = this.pending_ooo.pop_front() { if this.sync_buf.is_empty() {
match pending.as_mut().poll(cx) { Poll::Ready(None)
Poll::Ready(OooChunk { } else {
id, Poll::Ready(Some(mem::take(&mut this.sync_buf)))
chunks, }
replace, } else {
nonce, // check if *any* pending out-of-order chunk is ready
}) => { for mut chunk in mem::take(&mut this.pending_ooo) {
let opening = format!("<!--s-{id}o-->"); match chunk.as_mut().poll(cx) {
let placeholder_at = Poll::Ready(OooChunk {
this.sync_buf.find(&opening); id,
if let Some(start) = placeholder_at { chunks,
let closing = format!("<!--s-{id}c-->"); replace,
let end = nonce,
this.sync_buf.find(&closing).unwrap(); }) => {
let chunks_iter = chunks.into_iter().rev(); let opening = format!("<!--s-{id}o-->");
let placeholder_at =
this.sync_buf.find(&opening);
if let Some(start) = placeholder_at {
let closing = format!("<!--s-{id}c-->");
let end = this
.sync_buf
.find(&closing)
.unwrap();
let chunks_iter =
chunks.into_iter().rev();
// TODO can probably make this more efficient // TODO can probably make this more efficient
let (before, replaced) = let (before, replaced) =
this.sync_buf.split_at(start); this.sync_buf.split_at(start);
let (_, after) = replaced let (_, after) = replaced.split_at(
.split_at(end - start + closing.len()); end - start + closing.len(),
let mut buf = String::new(); );
buf.push_str(before); let mut buf = String::new();
buf.push_str(before);
let mut held_chunks = VecDeque::new(); let mut held_chunks = VecDeque::new();
for chunk in chunks_iter { for chunk in chunks_iter {
if let StreamChunk::Sync(ready) = chunk if let StreamChunk::Sync(ready) =
{ chunk
buf.push_str(&ready); {
} else { buf.push_str(&ready);
held_chunks.push_front(chunk); } else {
held_chunks.push_front(chunk);
}
} }
} buf.push_str(after);
buf.push_str(after); this.sync_buf = buf;
this.sync_buf = buf; for chunk in held_chunks {
for chunk in held_chunks {
this.chunks.push_front(chunk);
}
} else {
OooChunk::push_start(
&id,
&mut this.sync_buf,
);
for chunk in chunks.into_iter().rev() {
if let StreamChunk::Sync(ready) = chunk
{
this.sync_buf.push_str(&ready);
} else {
this.chunks.push_front(chunk); this.chunks.push_front(chunk);
} }
} else {
OooChunk::push_start(
&id,
&mut this.sync_buf,
);
for chunk in chunks.into_iter().rev() {
if let StreamChunk::Sync(ready) =
chunk
{
this.sync_buf.push_str(&ready);
} else {
this.chunks.push_front(chunk);
}
}
OooChunk::push_end_with_nonce(
replace,
&id,
&mut this.sync_buf,
nonce.as_deref(),
);
} }
OooChunk::push_end_with_nonce(
replace,
&id,
&mut this.sync_buf,
nonce.as_deref(),
);
} }
self.poll_next(cx) Poll::Pending => {
} this.pending_ooo.push_back(chunk);
Poll::Pending => {
this.pending_ooo.push_back(pending);
if this.sync_buf.is_empty() {
Poll::Pending
} else {
Poll::Ready(Some(mem::take(
&mut this.sync_buf,
)))
} }
} }
} }
} else if this.sync_buf.is_empty() {
Poll::Ready(None) if this.sync_buf.is_empty() {
} else { Poll::Pending
Poll::Ready(Some(mem::take(&mut this.sync_buf))) } else {
Poll::Ready(Some(mem::take(&mut this.sync_buf)))
}
} }
} }
Some(StreamChunk::Sync(value)) => { Some(StreamChunk::Sync(value)) => {