From e2c2a5a62cf69590b04fe5bb202cf3d235517f00 Mon Sep 17 00:00:00 2001 From: Lorent Lempereur Date: Fri, 22 May 2026 17:05:35 +0200 Subject: [PATCH] fix(core): Honor chunkSize when streaming S3-backed binary data (#30919) --- .../__tests__/object-store.manager.test.ts | 17 ++++- .../src/binary-data/__tests__/utils.test.ts | 55 +++++++++++++- .../src/binary-data/object-store.manager.ts | 4 +- .../__tests__/object-store.service.test.ts | 73 +++++++++++++++++++ .../object-store/object-store.service.ee.ts | 23 ++++-- packages/core/src/binary-data/utils.ts | 65 ++++++++++++++++- 6 files changed, 227 insertions(+), 10 deletions(-) diff --git a/packages/core/src/binary-data/__tests__/object-store.manager.test.ts b/packages/core/src/binary-data/__tests__/object-store.manager.test.ts index 5a47e724af0..dd091eab8b7 100644 --- a/packages/core/src/binary-data/__tests__/object-store.manager.test.ts +++ b/packages/core/src/binary-data/__tests__/object-store.manager.test.ts @@ -72,7 +72,22 @@ describe('getAsStream()', () => { const stream = await objectStoreManager.getAsStream(fileId); expect(stream).toBeInstanceOf(Readable); - expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { mode: 'stream' }); + expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { + mode: 'stream', + chunkSize: undefined, + }); + }); + + it('should forward chunkSize to the service', async () => { + objectStoreService.get.mockResolvedValue(mockStream); + + const providedChunkSize = 5 * 1024 * 1024; + await objectStoreManager.getAsStream(fileId, providedChunkSize); + + expect(objectStoreService.get).toHaveBeenCalledWith(fileId, { + mode: 'stream', + chunkSize: providedChunkSize, + }); }); }); diff --git a/packages/core/src/binary-data/__tests__/utils.test.ts b/packages/core/src/binary-data/__tests__/utils.test.ts index ece7739e0f5..1b9e7321544 100644 --- a/packages/core/src/binary-data/__tests__/utils.test.ts +++ b/packages/core/src/binary-data/__tests__/utils.test.ts @@ -2,7 +2,7 @@ import { UnexpectedError } from 'n8n-workflow'; import { Readable } from 'node:stream'; import { createGunzip } from 'node:zlib'; -import { binaryToBuffer } from '@/binary-data/utils'; +import { binaryToBuffer, createFixedSizeChunker } from '@/binary-data/utils'; describe('BinaryData/utils', () => { describe('binaryToBuffer', () => { @@ -32,4 +32,57 @@ describe('BinaryData/utils', () => { await expect(promise).rejects.toThrow('Failed to decompress response'); }); }); + + describe('createFixedSizeChunker', () => { + const drain = async (source: Readable): Promise => { + return await new Promise((resolve, reject) => { + const out: Buffer[] = []; + source.on('data', (chunk: Buffer) => out.push(Buffer.from(chunk))); + source.on('end', () => resolve(out)); + source.on('error', reject); + }); + }; + + it('should emit chunks of exactly chunkSize with a smaller final chunk', async () => { + const source = Readable.from([Buffer.from([1, 2]), Buffer.from([3, 4, 5, 6, 7, 8])]); + const chunks = await drain(source.pipe(createFixedSizeChunker(3))); + + expect(chunks.map((chunk) => chunk.length)).toEqual([3, 3, 2]); + expect(Buffer.concat(chunks)).toEqual(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8])); + }); + + it('should split a single large input into multiple sized chunks', async () => { + const source = Readable.from([Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9])]); + const chunks = await drain(source.pipe(createFixedSizeChunker(4))); + + expect(chunks.map((chunk) => chunk.length)).toEqual([4, 4, 1]); + expect(Buffer.concat(chunks)).toEqual(Buffer.from([1, 2, 3, 4, 5, 6, 7, 8, 9])); + }); + + it('should accumulate many small inputs into one full chunk', async () => { + const source = Readable.from(Array.from({ length: 5 }, (_, i) => Buffer.from([i + 1]))); + const chunks = await drain(source.pipe(createFixedSizeChunker(5))); + + expect(chunks.map((chunk) => chunk.length)).toEqual([5]); + expect(Buffer.concat(chunks)).toEqual(Buffer.from([1, 2, 3, 4, 5])); + }); + + it('should emit one undersized chunk when total bytes are less than chunkSize', async () => { + const source = Readable.from([Buffer.from([1, 2, 3])]); + const chunks = await drain(source.pipe(createFixedSizeChunker(10))); + + expect(chunks.map((chunk) => chunk.length)).toEqual([3]); + }); + + it('should emit nothing for an empty source', async () => { + const source = Readable.from([] as Buffer[]); + const chunks = await drain(source.pipe(createFixedSizeChunker(4))); + + expect(chunks).toEqual([]); + }); + + it.each([0, -1])('should throw when chunkSize is %s', (chunkSize) => { + expect(() => createFixedSizeChunker(chunkSize)).toThrow(UnexpectedError); + }); + }); }); diff --git a/packages/core/src/binary-data/object-store.manager.ts b/packages/core/src/binary-data/object-store.manager.ts index bd980d33aea..63806702116 100644 --- a/packages/core/src/binary-data/object-store.manager.ts +++ b/packages/core/src/binary-data/object-store.manager.ts @@ -36,8 +36,8 @@ export class ObjectStoreManager implements BinaryData.Manager { return await this.objectStoreService.get(fileId, { mode: 'buffer' }); } - async getAsStream(fileId: string) { - return await this.objectStoreService.get(fileId, { mode: 'stream' }); + async getAsStream(fileId: string, chunkSize?: number) { + return await this.objectStoreService.get(fileId, { mode: 'stream', chunkSize }); } async getMetadata(fileId: string): Promise { diff --git a/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts b/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts index f5d82cb2960..3a481a71949 100644 --- a/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts +++ b/packages/core/src/binary-data/object-store/__tests__/object-store.service.test.ts @@ -328,6 +328,79 @@ describe('ObjectStoreService', () => { await expect(promise).rejects.toThrowError(FAILED_REQUEST_ERROR_MESSAGE); }); + + it('should re-emit stream data as chunks of exactly chunkSize (last chunk smaller)', async () => { + // Source pushes 4 x 2 bytes; chunkSize=3 should rewrite boundaries to [3, 3, 2]. + // We listen to 'data' (which fires once per push) rather than iterate with `for await`, + // because the async iterator coalesces buffered pushes when all the data lands before the consumer starts reading. + // This is a quirk that only affects synthetic tests, not real streaming. + let pushes = 0; + const body = new Readable({ + read() { + if (pushes < 4) { + this.push(Buffer.from([pushes * 2, pushes * 2 + 1])); + pushes++; + } else { + this.push(null); + } + }, + }); + mockS3Send.mockResolvedValueOnce({ Body: body }); + + const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 3 }); + + const chunks: Buffer[] = await new Promise((resolve, reject) => { + const out: Buffer[] = []; + result.on('data', (chunk: Buffer) => out.push(Buffer.from(chunk))); + result.on('end', () => resolve(out)); + result.on('error', reject); + }); + + expect(chunks.map((c) => c.length)).toEqual([3, 3, 2]); + expect(Buffer.concat(chunks)).toEqual(Buffer.from([0, 1, 2, 3, 4, 5, 6, 7])); + }); + + it('should not rechunk when chunkSize is omitted or zero', async () => { + const body = new Readable({ read() {} }); + mockS3Send.mockResolvedValueOnce({ Body: body }); + + const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 0 }); + + expect(result).toBeInstanceOf(PassThrough); + }); + + it('should propagate body errors to the rechunked consumer', async () => { + const failure = new Error('boom'); + const body = new Readable({ + read() { + this.destroy(failure); + }, + }); + mockS3Send.mockResolvedValueOnce({ Body: body }); + + const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 4 }); + + await expect( + new Promise((_, reject) => { + result.on('error', reject); + result.resume(); + }), + ).rejects.toThrow('boom'); + }); + + it('should abort the S3 request when the rechunked consumer is destroyed', async () => { + const body = new Readable({ read() {} }); + mockS3Send.mockResolvedValueOnce({ Body: body }); + + const result = await objectStoreService.get(fileId, { mode: 'stream', chunkSize: 4 }); + const abortSignal = mockS3Send.mock.calls[0][1].abortSignal as AbortSignal; + expect(abortSignal.aborted).toBe(false); + + result.destroy(); + await new Promise((resolve) => result.on('close', resolve)); + + expect(abortSignal.aborted).toBe(true); + }); }); describe('deleteOne()', () => { diff --git a/packages/core/src/binary-data/object-store/object-store.service.ee.ts b/packages/core/src/binary-data/object-store/object-store.service.ee.ts index a70bf3e9219..6079ab08006 100644 --- a/packages/core/src/binary-data/object-store/object-store.service.ee.ts +++ b/packages/core/src/binary-data/object-store/object-store.service.ee.ts @@ -18,12 +18,12 @@ import { Logger } from '@n8n/backend-common'; import { Service } from '@n8n/di'; import { ensureError, UnexpectedError } from 'n8n-workflow'; import { createHash } from 'node:crypto'; -import { PassThrough, Readable } from 'node:stream'; +import { PassThrough, Readable, pipeline } from 'node:stream'; import { ObjectStoreConfig } from './object-store.config'; import type { MetadataResponseHeaders } from './types'; import type { BinaryData } from '../types'; -import { streamToBuffer } from '../utils'; +import { createFixedSizeChunker, streamToBuffer } from '../utils'; @Service() export class ObjectStoreService { @@ -126,10 +126,18 @@ export class ObjectStoreService { /** * Download an object as a stream or buffer from the configured bucket. + * + * In `stream` mode, pass `chunkSize` to guarantee that the returned stream + * emits chunks of exactly that many bytes (the final chunk may be smaller). + * Without it, chunk boundaries follow whatever the underlying socket emits, + * which can break consumers like S3 multipart upload that treat each emitted chunk as a fixed-size unit. */ - async get(fileId: string, { mode }: { mode: 'buffer' }): Promise; - async get(fileId: string, { mode }: { mode: 'stream' }): Promise; - async get(fileId: string, { mode }: { mode: 'stream' | 'buffer' }): Promise { + async get(fileId: string, opts: { mode: 'buffer' }): Promise; + async get(fileId: string, opts: { mode: 'stream'; chunkSize?: number }): Promise; + async get( + fileId: string, + { mode, chunkSize = 0 }: { mode: 'stream' | 'buffer'; chunkSize?: number }, + ): Promise { this.logger.debug('Sending GET request to S3', { bucket: this.bucket, key: fileId }); const command = new GetObjectCommand({ @@ -171,6 +179,11 @@ export class ObjectStoreService { body.on('error', (error) => wrapper.destroy(error)); body.pipe(wrapper); + if (chunkSize > 0) { + const rechunker = createFixedSizeChunker(chunkSize); + pipeline(wrapper, rechunker, () => {}); // Error/destroy propagation is handled via stream events on `rechunker` + return rechunker; + } return wrapper; } diff --git a/packages/core/src/binary-data/utils.ts b/packages/core/src/binary-data/utils.ts index c6557130b74..cb9cadd420f 100644 --- a/packages/core/src/binary-data/utils.ts +++ b/packages/core/src/binary-data/utils.ts @@ -1,5 +1,5 @@ import { UnexpectedError } from 'n8n-workflow'; -import type { Readable } from 'node:stream'; +import { Transform, type Readable } from 'node:stream'; import type { BinaryData } from './types'; @@ -31,6 +31,69 @@ export async function binaryToBuffer(body: Buffer | Readable) { return await streamToBuffer(body); } +/** + * A `Transform` that re-emits its input as chunks of exactly `chunkSize` bytes, with a possibly smaller final chunk. + * `chunkSize` must be a positive integer: values `<= 0` throws an `UnexpectedError`. + * + * Between transforms the internal queue carries at most one partial chunk (< `chunkSize` bytes). + * + * Wire the upstream source into the chunker with `node:stream.pipeline()`, not plain `.pipe()`. + * `pipeline()` propagates errors from upstream to the chunker + * (so consumers see them) and propagates destroy from the chunker to upstream + * (so sockets don't dangle when the consumer aborts). + * `.pipe()` does neither. + */ +export function createFixedSizeChunker(chunkSize: number): Transform { + if (chunkSize <= 0) { + throw new UnexpectedError(`createFixedSizeChunker requires chunkSize > 0, got ${chunkSize}`); + } + + const queue: Buffer[] = []; + let queued = 0; + + const take = (size: number): Buffer => { + const out = Buffer.allocUnsafe(size); + let written = 0; + while (written < size) { + const head = queue[0]; + const need = size - written; + if (head.length <= need) { + head.copy(out, written); + written += head.length; + queue.shift(); + } else { + head.copy(out, written, 0, need); + queue[0] = head.subarray(need); + written += need; + } + } + queued -= size; + return out; + }; + + return new Transform({ + transform(chunk: Buffer, _encoding, done) { + queue.push(chunk); + queued += chunk.length; + while (queued >= chunkSize) { + this.push(take(chunkSize)); + } + done(); + }, + flush(done) { + if (queued > 0) { + this.push(take(queued)); + } + done(); + }, + destroy(error, done) { + queue.length = 0; + queued = 0; + done(error); + }, + }); +} + export const FileLocation = { ofExecution: (workflowId: string, executionId: string): BinaryData.FileLocation => ({ type: 'execution',