Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions lib/internal/blob.js
Original file line number Diff line number Diff line change
Expand Up @@ -453,30 +453,40 @@ function createBlobReaderStream(reader) {
// There really should only be one read at a time so using an
// array here is purely defensive.
this.pendingPulls = [];
// Register a wakeup callback that the C++ side can invoke
// Lazily register a wakeup callback that the C++ side can invoke
// when new data is available after a STATUS_BLOCK.
reader.setWakeup(() => {
this.wakeup = () => {
if (this.pendingPulls.length > 0) {
this.readNext(c);
}
});
};
},
pull(c) {
const { promise, resolve, reject } = PromiseWithResolvers();
if (this.pendingPulls.length === 0) {
reader.setWakeup(this.wakeup);
}
this.pendingPulls.push({ resolve, reject });
this.readNext(c);
return promise;
},
clearWakeupIfIdle() {
if (this.pendingPulls.length === 0) {
reader.setWakeup(undefined);
}
},
readNext(c) {
reader.pull((status, buffer) => {
// If pendingPulls is empty here, the stream had to have
// been canceled, and we don't really care about the result.
// We can simply exit.
if (this.pendingPulls.length === 0) {
reader.setWakeup(undefined);
return;
}
if (status === 0) {
// EOS
reader.setWakeup(undefined);
c.close();
// This is to signal the end for byob readers
// see https://streams.spec.whatwg.org/#example-rbs-pull
Expand All @@ -488,6 +498,7 @@ function createBlobReaderStream(reader) {
// The read could fail for many different reasons when reading
// from a non-memory resident blob part (e.g. file-backed blob).
// The error details the system error code.
reader.setWakeup(undefined);
const error =
lazyDOMException('The blob could not be read',
'NotReadableError');
Expand All @@ -497,7 +508,7 @@ function createBlobReaderStream(reader) {
return;
} else if (status === 2) {
// STATUS_BLOCK: No data available yet. The wakeup callback
// registered in start() will re-invoke readNext when data
// registered in pull() will re-invoke readNext when data
// arrives.
return;
}
Expand All @@ -517,6 +528,7 @@ function createBlobReaderStream(reader) {
if (this.pendingPulls.length !== 0) {
const pending = this.pendingPulls.shift();
pending.resolve();
this.clearWakeupIfIdle();
}
return;
}
Expand All @@ -525,6 +537,7 @@ function createBlobReaderStream(reader) {
});
},
cancel(reason) {
reader.setWakeup(undefined);
// Reject any currently pending pulls here.
for (const pending of this.pendingPulls) {
pending.reject(reason);
Expand Down
56 changes: 56 additions & 0 deletions test/parallel/test-blob-stream-gc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Flags: --expose-gc --no-concurrent-array-buffer-sweeping
'use strict';

const common = require('../common');
const assert = require('assert');
const { setImmediate: setImmediatePromise } = require('timers/promises');

const MiB = 1024 * 1024;
const iterations = 64;
const maxRetained = 16 * MiB;

async function collectArrayBuffers() {
for (let i = 0; i < 3; i++) {
global.gc();
await setImmediatePromise();
}
}

async function assertNoBlobStreamRetention(name, fn) {
const buffer = Buffer.alloc(MiB);

await collectArrayBuffers();
const before = process.memoryUsage().arrayBuffers;

for (let i = 0; i < iterations; i++) {
await fn(buffer);
}

await collectArrayBuffers();
const retained = process.memoryUsage().arrayBuffers - before;

assert(
retained < maxRetained,
`${name} retained ${retained} bytes in arrayBuffers`,
);
}

(async () => {
await assertNoBlobStreamRetention('unused Blob streams',
common.mustCall(async (buffer) => {
new Blob([buffer]).stream();
}, iterations));

await assertNoBlobStreamRetention('cancelled Blob streams',
common.mustCall(async (buffer) => {
await new Blob([buffer]).stream()
.cancel();
}, iterations));

await assertNoBlobStreamRetention('drained Blob streams',
common.mustCall(async (buffer) => {
await new Response(
new Blob([buffer]).stream(),
).arrayBuffer();
}, iterations));
})().then(common.mustCall());
Loading