fix(core): Honor chunkSize when streaming S3-backed binary data (#30919)

This commit is contained in:
Lorent Lempereur 2026-05-22 17:05:35 +02:00 committed by GitHub
parent 6bdd9656b5
commit e2c2a5a62c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 227 additions and 10 deletions

View File

@ -72,7 +72,22 @@ describe('getAsStream()', () => {
const stream = await objectStoreManager.getAsStream(fileId);
expect(stream).toBeInstanceOf(Readable);
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'stream' });
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, {
mode: 'stream',
chunkSize: undefined,
});
});
it('should forward chunkSize to the service', async () => {
objectStoreService.get.mockResolvedValue(mockStream);
const providedChunkSize = 5 * 1024 * 1024;
await objectStoreManager.getAsStream(fileId, providedChunkSize);
expect(objectStoreService.get).toHaveBeenCalledWith(fileId, {
mode: 'stream',
chunkSize: providedChunkSize,
});
});
});

View File

@ -2,7 +2,7 @@ import { UnexpectedError } from 'n8n-workflow';
import { Readable } from 'node:stream';
import { createGunzip } from 'node:zlib';
import { binaryToBuffer } from '@/binary-data/utils';
import { binaryToBuffer, createFixedSizeChunker } from '@/binary-data/utils';
describe('BinaryData/utils', () => {
describe('binaryToBuffer', () => {
@ -32,4 +32,57 @@ describe('BinaryData/utils', () => {
await expect(promise).rejects.toThrow('Failed to decompress response');
});
});
describe('createFixedSizeChunker', () => {
const drain = async (source: Readable): Promise<Buffer[]> => {
return await new Promise((resolve, reject) => {
const out: Buffer[] = [];
source.on('data', (chunk: Buffer) => out.push(Buffer.from(chunk)));
source.on('end', () => resolve(out));
source.on('error', reject);
});
};
it('should emit chunks of exactly chunkSize with a smaller final chunk', async () => {
const source = Readable.from([Buffer.from([1, 2]), Buffer.from([3, 4, 5, 6, 7, 8])]);
const chunks = await drain(source.pipe(createFixedSizeChunker(3)));
expect(chunks.map((chunk) => chunk.length)).toEqual([3, 3, 2]);
expect(Buffer.concat(chunks)).toEqual(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8]));
});
it('should split a single large input into multiple sized chunks', async () => {
const source = Readable.from([Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9])]);
const chunks = await drain(source.pipe(createFixedSizeChunker(4)));
expect(chunks.map((chunk) => chunk.length)).toEqual([4, 4, 1]);
expect(Buffer.concat(chunks)).toEqual(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9]));
});
it('should accumulate many small inputs into one full chunk', async () => {
const source = Readable.from(Array.from({ length: 5 }, (_, i) => Buffer.from([i + 1])));
const chunks = await drain(source.pipe(createFixedSizeChunker(5)));
expect(chunks.map((chunk) => chunk.length)).toEqual([5]);
expect(Buffer.concat(chunks)).toEqual(Buffer.from([1, 2, 3, 4, 5]));
});
it('should emit one undersized chunk when total bytes are less than chunkSize', async () => {
const source = Readable.from([Buffer.from([1, 2, 3])]);
const chunks = await drain(source.pipe(createFixedSizeChunker(10)));
expect(chunks.map((chunk) => chunk.length)).toEqual([3]);
});
it('should emit nothing for an empty source', async () => {
const source = Readable.from([] as Buffer[]);
const chunks = await drain(source.pipe(createFixedSizeChunker(4)));
expect(chunks).toEqual([]);
});
it.each([0, -1])('should throw when chunkSize is %s', (chunkSize) => {
expect(() => createFixedSizeChunker(chunkSize)).toThrow(UnexpectedError);
});
});
});

View File

@ -36,8 +36,8 @@ export class ObjectStoreManager implements BinaryData.Manager {
return await this.objectStoreService.get(fileId, { mode: 'buffer' });
}
async getAsStream(fileId: string) {
return await this.objectStoreService.get(fileId, { mode: 'stream' });
async getAsStream(fileId: string, chunkSize?: number) {
return await this.objectStoreService.get(fileId, { mode: 'stream', chunkSize });
}
async getMetadata(fileId: string): Promise<BinaryData.Metadata> {

View File

@ -328,6 +328,79 @@ describe('ObjectStoreService', () => {
await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE);
});
it('should re-emit stream data as chunks of exactly chunkSize (last chunk smaller)', async () => {
// Source pushes 4 x 2 bytes; chunkSize=3 should rewrite boundaries to [3, 3, 2].
// We listen to 'data' (which fires once per push) rather than iterate with `for await`,
// because the async iterator coalesces buffered pushes when all the data lands before the consumer starts reading.
// This is a quirk that only affects synthetic tests, not real streaming.
let pushes = 0;
const body = new Readable({
read() {
if (pushes < 4) {
this.push(Buffer.from([pushes * 2, pushes * 2 + 1]));
pushes++;
} else {
this.push(null);
}
},
});
mockS3Send.mockResolvedValueOnce({ Body: body });
const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 3 });
const chunks: Buffer[] = await new Promise((resolve, reject) => {
const out: Buffer[] = [];
result.on('data', (chunk: Buffer) => out.push(Buffer.from(chunk)));
result.on('end', () => resolve(out));
result.on('error', reject);
});
expect(chunks.map((c) => c.length)).toEqual([3, 3, 2]);
expect(Buffer.concat(chunks)).toEqual(Buffer.from([0, 1, 2, 3, 4, 5, 6, 7]));
});
it('should not rechunk when chunkSize is omitted or zero', async () => {
const body = new Readable({ read() {} });
mockS3Send.mockResolvedValueOnce({ Body: body });
const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 0 });
expect(result).toBeInstanceOf(PassThrough);
});
it('should propagate body errors to the rechunked consumer', async () => {
const failure = new Error('boom');
const body = new Readable({
read() {
this.destroy(failure);
},
});
mockS3Send.mockResolvedValueOnce({ Body: body });
const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 4 });
await expect(
new Promise((_, reject) => {
result.on('error', reject);
result.resume();
}),
).rejects.toThrow('boom');
});
it('should abort the S3 request when the rechunked consumer is destroyed', async () => {
const body = new Readable({ read() {} });
mockS3Send.mockResolvedValueOnce({ Body: body });
const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 4 });
const abortSignal = mockS3Send.mock.calls[0][1].abortSignal as AbortSignal;
expect(abortSignal.aborted).toBe(false);
result.destroy();
await new Promise((resolve) => result.on('close', resolve));
expect(abortSignal.aborted).toBe(true);
});
});
describe('deleteOne()', () => {

View File

@ -18,12 +18,12 @@ import { Logger } from '@n8n/backend-common';
import { Service } from '@n8n/di';
import { ensureError, UnexpectedError } from 'n8n-workflow';
import { createHash } from 'node:crypto';
import { PassThrough, Readable } from 'node:stream';
import { PassThrough, Readable, pipeline } from 'node:stream';
import { ObjectStoreConfig } from './object-store.config';
import type { MetadataResponseHeaders } from './types';
import type { BinaryData } from '../types';
import { streamToBuffer } from '../utils';
import { createFixedSizeChunker, streamToBuffer } from '../utils';
@Service()
export class ObjectStoreService {
@ -126,10 +126,18 @@ export class ObjectStoreService {
/**
* Download an object as a stream or buffer from the configured bucket.
*
* In `stream` mode, pass `chunkSize` to guarantee that the returned stream
* emits chunks of exactly that many bytes (the final chunk may be smaller).
* Without it, chunk boundaries follow whatever the underlying socket emits,
* which can break consumers like S3 multipart upload that treat each emitted chunk as a fixed-size unit.
*/
async get(fileId: string, { mode }: { mode: 'buffer' }): Promise<Buffer>;
async get(fileId: string, { mode }: { mode: 'stream' }): Promise<Readable>;
async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }): Promise<Buffer | Readable> {
async get(fileId: string, opts: { mode: 'buffer' }): Promise<Buffer>;
async get(fileId: string, opts: { mode: 'stream'; chunkSize?: number }): Promise<Readable>;
async get(
fileId: string,
{ mode, chunkSize = 0 }: { mode: 'stream' | 'buffer'; chunkSize?: number },
): Promise<Buffer | Readable> {
this.logger.debug('Sending GET request to S3', { bucket: this.bucket, key: fileId });
const command = new GetObjectCommand({
@ -171,6 +179,11 @@ export class ObjectStoreService {
body.on('error', (error) => wrapper.destroy(error));
body.pipe(wrapper);
if (chunkSize > 0) {
const rechunker = createFixedSizeChunker(chunkSize);
pipeline(wrapper, rechunker, () => {}); // Error/destroy propagation is handled via stream events on `rechunker`
return rechunker;
}
return wrapper;
}

View File

@ -1,5 +1,5 @@
import { UnexpectedError } from 'n8n-workflow';
import type { Readable } from 'node:stream';
import { Transform, type Readable } from 'node:stream';
import type { BinaryData } from './types';
@ -31,6 +31,69 @@ export async function binaryToBuffer(body: Buffer | Readable) {
return await streamToBuffer(body);
}
/**
* A `Transform` that re-emits its input as chunks of exactly `chunkSize` bytes, with a possibly smaller final chunk.
* `chunkSize` must be a positive integer: values `<= 0` throws an `UnexpectedError`.
*
* Between transforms the internal queue carries at most one partial chunk (< `chunkSize` bytes).
*
* Wire the upstream source into the chunker with `node:stream.pipeline()`, not plain `.pipe()`.
* `pipeline()` propagates errors from upstream to the chunker
* (so consumers see them) and propagates destroy from the chunker to upstream
* (so sockets don't dangle when the consumer aborts).
* `.pipe()` does neither.
*/
export function createFixedSizeChunker(chunkSize: number): Transform {
if (chunkSize <= 0) {
throw new UnexpectedError(`createFixedSizeChunker requires chunkSize > 0, got ${chunkSize}`);
}
const queue: Buffer[] = [];
let queued = 0;
const take = (size: number): Buffer => {
const out = Buffer.allocUnsafe(size);
let written = 0;
while (written < size) {
const head = queue[0];
const need = size - written;
if (head.length <= need) {
head.copy(out, written);
written += head.length;
queue.shift();
} else {
head.copy(out, written, 0, need);
queue[0] = head.subarray(need);
written += need;
}
}
queued -= size;
return out;
};
return new Transform({
transform(chunk: Buffer, _encoding, done) {
queue.push(chunk);
queued += chunk.length;
while (queued >= chunkSize) {
this.push(take(chunkSize));
}
done();
},
flush(done) {
if (queued > 0) {
this.push(take(queued));
}
done();
},
destroy(error, done) {
queue.length = 0;
queued = 0;
done(error);
},
});
}
export const FileLocation = {
ofExecution: (workflowId: string, executionId: string): BinaryData.FileLocation => ({
type: 'execution',