From efffc7532976b32e9ca57d8978f4041cb631ebc4 Mon Sep 17 00:00:00 2001 From: Stephen Wright Date: Mon, 6 Oct 2025 10:14:08 +0100 Subject: [PATCH] feat: User streams for compression / decompression (#20289) --- .../services/__tests__/export.service.test.ts | 24 ++- packages/cli/src/services/export.service.ts | 1 + .../utils/__tests__/compression.util.test.ts | 119 ------------ packages/cli/src/utils/compression.util.ts | 169 +++++++++++------- 4 files changed, 132 insertions(+), 181 deletions(-) delete mode 100644 packages/cli/src/utils/__tests__/compression.util.test.ts diff --git a/packages/cli/src/services/__tests__/export.service.test.ts b/packages/cli/src/services/__tests__/export.service.test.ts index f145e42bc65..ad396120cb7 100644 --- a/packages/cli/src/services/__tests__/export.service.test.ts +++ b/packages/cli/src/services/__tests__/export.service.test.ts @@ -5,8 +5,18 @@ import { mkdir, rm, readdir, appendFile } from 'fs/promises'; import { mock } from 'jest-mock-extended'; import type { Cipher } from 'n8n-core'; -// Mock fs/promises -jest.mock('fs/promises'); +// Mock fs/promises with proper implementations +jest.mock('fs/promises', () => ({ + mkdir: jest.fn(), + rm: jest.fn(), + readdir: jest.fn(), + appendFile: jest.fn(), +})); + +// Mock compression utility +jest.mock('@/utils/compression.util', () => ({ + compressFolder: jest.fn(), +})); // Mock validateDbTypeForExportEntities jest.mock('@/utils/validate-database-type', () => ({ @@ -69,6 +79,16 @@ describe('ExportService', () => { return []; }); + // Set up proper mock implementations for fs/promises + jest.mocked(mkdir).mockResolvedValue(undefined); + jest.mocked(rm).mockResolvedValue(undefined); + jest.mocked(readdir).mockResolvedValue([]); + jest.mocked(appendFile).mockResolvedValue(undefined); + + // Mock the compression utility + const { compressFolder } = require('@/utils/compression.util'); + jest.mocked(compressFolder).mockResolvedValue(undefined); + exportService = new 
ExportService(mockLogger, mockDataSource, mockCipher); }); diff --git a/packages/cli/src/services/export.service.ts b/packages/cli/src/services/export.service.ts index d409ee4da10..ffdb8950b2a 100644 --- a/packages/cli/src/services/export.service.ts +++ b/packages/cli/src/services/export.service.ts @@ -91,6 +91,7 @@ export class ExportService { this.logger.info('\n🚀 Starting entity export...'); this.logger.info(`📁 Output directory: ${outputDir}`); + await rm(outputDir, { recursive: true }).catch(() => {}); // Ensure output directory exists await mkdir(outputDir, { recursive: true }); diff --git a/packages/cli/src/utils/__tests__/compression.util.test.ts b/packages/cli/src/utils/__tests__/compression.util.test.ts deleted file mode 100644 index 78dba1a36b8..00000000000 --- a/packages/cli/src/utils/__tests__/compression.util.test.ts +++ /dev/null @@ -1,119 +0,0 @@ -import { compressFolder, decompressFolder } from '../compression.util'; -import { mkdir, writeFile, readFile, rm } from 'fs/promises'; -import * as path from 'path'; -import { existsSync } from 'fs'; - -describe('CompressionUtil', () => { - const testDir = path.join(__dirname, 'test-data'); - const outputDir = path.join(__dirname, 'test-output'); - - beforeEach(async () => { - // Clean up test directories - if (existsSync(testDir)) { - await rm(testDir, { recursive: true, force: true }); - } - if (existsSync(outputDir)) { - await rm(outputDir, { recursive: true, force: true }); - } - - // Create test directory structure - await mkdir(testDir, { recursive: true }); - await mkdir(path.join(testDir, 'subdir'), { recursive: true }); - - // Create test files - await writeFile(path.join(testDir, 'file1.txt'), 'Hello World!'); - await writeFile(path.join(testDir, 'file2.json'), JSON.stringify({ test: 'data' })); - await writeFile(path.join(testDir, 'subdir', 'file3.txt'), 'Nested file content'); - }); - - afterEach(async () => { - // Clean up - if (existsSync(testDir)) { - await rm(testDir, { recursive: true, force: true }); - } - if (existsSync(outputDir)) { - await rm(outputDir, { recursive:
true, force: true }); - } - if (existsSync(outputDir)) { - await rm(outputDir, { recursive: true, force: true }); - } - }); - - describe('compressFolder', () => { - it('should compress a folder into a ZIP archive', async () => { - const zipPath = path.join(outputDir, 'test.zip'); - - await compressFolder(testDir, zipPath); - - expect(existsSync(zipPath)).toBe(true); - }); - - it('should compress with exclusion patterns', async () => { - const zipPath = path.join(outputDir, 'test-exclude.zip'); - - await compressFolder(testDir, zipPath, { - exclude: ['*.txt'], - }); - - expect(existsSync(zipPath)).toBe(true); - - // Extract and verify excluded files are not present - const extractDir = path.join(outputDir, 'extracted-exclude'); - await decompressFolder(zipPath, extractDir); - - // Verify that .txt files are excluded - expect(existsSync(path.join(extractDir, 'file1.txt'))).toBe(false); - expect(existsSync(path.join(extractDir, 'subdir', 'file3.txt'))).toBe(false); - - // Verify that .json files are included - expect(existsSync(path.join(extractDir, 'file2.json'))).toBe(true); - expect(existsSync(path.join(extractDir, 'subdir', 'file3.txt'))).toBe(false); - }); - - it('should compress with custom compression level', async () => { - const zipPath = path.join(outputDir, 'test-level.zip'); - - await compressFolder(testDir, zipPath, { - level: 1, - }); - - expect(existsSync(zipPath)).toBe(true); - }); - }); - - describe('decompressFolder', () => { - it('should decompress a ZIP archive to a folder', async () => { - const zipPath = path.join(outputDir, 'test.zip'); - const extractDir = path.join(outputDir, 'extracted'); - - // First compress - await compressFolder(testDir, zipPath); - - // Then decompress - await decompressFolder(zipPath, extractDir); - - // Verify files exist - expect(existsSync(path.join(extractDir, 'file1.txt'))).toBe(true); - expect(existsSync(path.join(extractDir, 'file2.json'))).toBe(true); - expect(existsSync(path.join(extractDir, 'subdir', 
'file3.txt'))).toBe(true); - - // Verify content - const content1 = await readFile(path.join(extractDir, 'file1.txt'), 'utf-8'); - expect(content1).toBe('Hello World!'); - - const content2 = await readFile(path.join(extractDir, 'file2.json'), 'utf-8'); - expect(JSON.parse(content2)).toEqual({ test: 'data' }); - }); - - it('should decompress with exclusion patterns', async () => { - const zipPath = path.join(outputDir, 'test.zip'); - const extractDir = path.join(outputDir, 'extracted'); - - await compressFolder(testDir, zipPath); - await decompressFolder(zipPath, extractDir, { - exclude: ['*.txt'], - }); - - expect(existsSync(path.join(extractDir, 'file1.txt'))).toBe(false); - expect(existsSync(path.join(extractDir, 'subdir', 'file3.txt'))).toBe(false); - expect(existsSync(path.join(extractDir, 'file2.json'))).toBe(true); - }); - }); -}); diff --git a/packages/cli/src/utils/compression.util.ts b/packages/cli/src/utils/compression.util.ts index 83c6387ded2..c3b6626a1e8 100644 --- a/packages/cli/src/utils/compression.util.ts +++ b/packages/cli/src/utils/compression.util.ts @@ -1,10 +1,7 @@ import * as fflate from 'fflate'; -import { promisify } from 'util'; -import { readFile, readdir, writeFile, mkdir, stat } from 'fs/promises'; +import { readFile, readdir, writeFile, mkdir } from 'fs/promises'; import * as path from 'path'; - -const unzip = promisify(fflate.unzip); -const zip = promisify(fflate.zip); +import { createWriteStream, createReadStream } from 'fs'; // Reuse the same compression levels as the Compression node const ALREADY_COMPRESSED = [ @@ -71,8 +68,8 @@ function sanitizePath(fileName: string, outputDir: string): string { } /** - * Compress a folder into a ZIP archive - * Reuses the same patterns as the Compression node + * Compress a folder into a ZIP archive using streaming + * Based on fflate documentation: https://github.com/101arrowz/fflate */ export async function compressFolder( sourceDir: string, @@ -81,78 +78,126 @@ export async function 
compressFolder( ): Promise { const { level = 6, exclude = [], includeHidden = false } = options; - const zipData: fflate.Zippable = {}; - - await addDirectoryToZip(sourceDir, '', zipData, { exclude, includeHidden, level }); - // Ensure output directory exists const outputDir = path.dirname(outputPath); await mkdir(outputDir, { recursive: true }); - const buffer = await zip(zipData); - await writeFile(outputPath, buffer); + + // Create write stream for the output ZIP file + const outputStream = createWriteStream(outputPath); + + // Create streaming ZIP using fflate + const zip = new fflate.Zip(); + + // Handle data from the ZIP stream + zip.ondata = (error, data, final) => { + if (error) { + outputStream.destroy(error); + return; + } + outputStream.write(Buffer.from(data)); + if (final) { + outputStream.end(); + } + }; + + // Add directory contents to ZIP using streaming + await addDirectoryToZipStreaming(sourceDir, '', zip, { exclude, includeHidden, level }); + + // Finalize the ZIP + zip.end(); + + // Wait for the stream to finish + return await new Promise((resolve, reject) => { + outputStream.on('finish', resolve); + outputStream.on('error', reject); + }); } /** - * Decompress a ZIP archive to a folder - * Reuses the same patterns as the Compression node + * Decompress a ZIP archive to a folder using streaming + * Reuses the same patterns as the Compression node but with streaming approach */ -export async function decompressFolder( - sourcePath: string, - outputDir: string, - options: DecompressionOptions = {}, -): Promise { - const { overwrite = false, exclude = [] } = options; - +export async function decompressFolder(sourcePath: string, outputDir: string): Promise { // Ensure output directory exists await mkdir(outputDir, { recursive: true }); - const zipContent = await readFile(sourcePath); - const files = await unzip(zipContent); + return await new Promise(async (resolve, reject) => { + let filesToProcess = 0; - for (const [fileName, fileData] of 
Object.entries(files)) { - // Skip excluded files - if ( - exclude.some((pattern) => - pattern.startsWith('*.') ? fileName.endsWith(pattern.slice(1)) : fileName.includes(pattern), - ) - ) { - continue; - } + const unzip = new fflate.Unzip((stream) => { + if (!stream.name.endsWith('/')) { + filesToProcess++; - // Skip __MACOSX files (same logic as Compression node) - if (fileName.includes('__MACOSX')) { - continue; - } + const chunks: Uint8Array[] = []; + let totalLength = 0; - // Sanitize path to prevent zip slip attacks - const filePath = sanitizePath(fileName, outputDir); - const dirPath = path.dirname(filePath); + // Sanitize path to prevent zip slip attacks + const filePath = sanitizePath(stream.name, outputDir); + const dirPath = path.dirname(filePath); - // Create directory if it doesn't exist - await mkdir(dirPath, { recursive: true }); + // Create directory if it doesn't exist + mkdir(dirPath, { recursive: true }).catch((error) => { + if (error.code !== 'EEXIST') { + reject(error); + } + }); - // Check if file exists and handle overwrite - try { - await stat(filePath); - if (!overwrite) { - continue; // Skip existing files + stream.ondata = async (error, chunk, final) => { + if (error) { + reject(error); + return; + } + + chunks.push(chunk); + totalLength += chunk.length; + + if (final) { + const finalBuffer = new Uint8Array(totalLength); + let offset = 0; + for (const chunk of chunks) { + finalBuffer.set(chunk, offset); + offset += chunk.length; + } + await writeFile(filePath, Buffer.from(finalBuffer)); + + filesToProcess--; + + if (filesToProcess === 0) { + resolve(); + } + } + }; + + stream.start(); } - } catch { - // File doesn't exist, continue + }); + + unzip.register(fflate.AsyncUnzipInflate); + + // Create readable stream + const zipStream = createReadStream(sourcePath); + + for await (const chunk of zipStream) { + unzip.push(chunk as Uint8Array); } - await writeFile(filePath, Buffer.from(fileData.buffer)); - } + zipStream.on('error', reject); + + 
// If no files were processed (e.g., only directories), resolve immediately + if (filesToProcess === 0) { + resolve(); + } + }); } /** - * Add directory contents to zip data structure - * Follows the same pattern as Compression node + * Add directory contents to zip using streaming approach + * This version processes files one at a time instead of loading everything into memory */ -async function addDirectoryToZip( +async function addDirectoryToZipStreaming( dirPath: string, zipPath: string, - zipData: fflate.Zippable, + zip: fflate.Zip, options: { exclude: string[]; includeHidden: boolean; level: fflate.ZipOptions['level'] }, ): Promise { const { exclude, includeHidden, level } = options; @@ -180,19 +225,23 @@ async function addDirectoryToZip( const zipEntryPath = zipPath ? `${zipPath}/${entry.name}` : entry.name; if (entry.isDirectory()) { - await addDirectoryToZip(fullPath, zipEntryPath, zipData, options); + await addDirectoryToZipStreaming(fullPath, zipEntryPath, zip, options); } else { - const fileContent = await readFile(fullPath); + // Determine compression level based on file extension const fileExtension = path.extname(entry.name).toLowerCase().slice(1); - - // Use same compression logic as Compression node const compressionLevel: fflate.ZipOptions['level'] = ALREADY_COMPRESSED.includes( fileExtension, ) ? 0 : level; - zipData[zipEntryPath] = [new Uint8Array(fileContent), { level: compressionLevel }]; + // Create a ZIP stream for this specific file + const zipStream = new fflate.ZipDeflate(zipEntryPath, { level: compressionLevel }); + zip.add(zipStream); + + // Read file content and stream it + const fileContent = await readFile(fullPath); + zipStream.push(new Uint8Array(fileContent), true); } } }