feat: User streams for compression / decompression (#20289)

This commit is contained in:
Stephen Wright 2025-10-06 10:14:08 +01:00 committed by GitHub
parent 208027b171
commit efffc75329
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 132 additions and 181 deletions

View File

@ -5,8 +5,18 @@ import { mkdir, rm, readdir, appendFile } from 'fs/promises';
import { mock } from 'jest-mock-extended';
import type { Cipher } from 'n8n-core';
// Mock fs/promises
jest.mock('fs/promises');
// Mock fs/promises with proper implementations
jest.mock('fs/promises', () => ({
mkdir: jest.fn(),
rm: jest.fn(),
readdir: jest.fn(),
appendFile: jest.fn(),
}));
// Mock compression utility
jest.mock('@/utils/compression.util', () => ({
compressFolder: jest.fn(),
}));
// Mock validateDbTypeForExportEntities
jest.mock('@/utils/validate-database-type', () => ({
@ -69,6 +79,16 @@ describe('ExportService', () => {
return [];
});
// Set up proper mock implementations for fs/promises
jest.mocked(mkdir).mockResolvedValue(undefined);
jest.mocked(rm).mockResolvedValue(undefined);
jest.mocked(readdir).mockResolvedValue([]);
jest.mocked(appendFile).mockResolvedValue(undefined);
// Mock the compression utility
const { compressFolder } = require('@/utils/compression.util');
jest.mocked(compressFolder).mockResolvedValue(undefined);
exportService = new ExportService(mockLogger, mockDataSource, mockCipher);
});

View File

@ -91,6 +91,7 @@ export class ExportService {
this.logger.info('\n🚀 Starting entity export...');
this.logger.info(`📁 Output directory: ${outputDir}`);
await rm(outputDir, { recursive: true }).catch(() => {});
// Ensure output directory exists
await mkdir(outputDir, { recursive: true });

View File

@ -1,119 +0,0 @@
import { compressFolder, decompressFolder } from '../compression.util';
import { mkdir, writeFile, readFile, rm } from 'fs/promises';
import * as path from 'path';
import { existsSync } from 'fs';
/**
 * Round-trip tests for the folder compression helpers.
 *
 * Each test works on a small fixture tree (two files at the root and one in
 * `subdir`) that is rebuilt before every test and removed afterwards, so the
 * tests are independent of each other.
 */
describe('CompressionUtil', () => {
	const testDir = path.join(__dirname, 'test-data');
	const outputDir = path.join(__dirname, 'test-output');

	/**
	 * Remove both scratch directories.
	 * `force: true` makes `rm` a no-op for missing paths, so no existence check is needed.
	 */
	const removeScratchDirs = async (): Promise<void> => {
		await rm(testDir, { recursive: true, force: true });
		await rm(outputDir, { recursive: true, force: true });
	};

	beforeEach(async () => {
		// Start from a clean slate in case a previous run was interrupted
		await removeScratchDirs();

		// Create test directory structure
		await mkdir(path.join(testDir, 'subdir'), { recursive: true });

		// Create test files
		await writeFile(path.join(testDir, 'file1.txt'), 'Hello World!');
		await writeFile(path.join(testDir, 'file2.json'), JSON.stringify({ test: 'data' }));
		await writeFile(path.join(testDir, 'subdir', 'file3.txt'), 'Nested file content');
	});

	afterEach(removeScratchDirs);

	describe('compressFolder', () => {
		it('should compress a folder into a ZIP archive', async () => {
			const zipPath = path.join(outputDir, 'test.zip');

			await compressFolder(testDir, zipPath);

			expect(existsSync(zipPath)).toBe(true);
		});

		it('should compress with exclusion patterns', async () => {
			const zipPath = path.join(outputDir, 'test-exclude.zip');

			await compressFolder(testDir, zipPath, {
				exclude: ['*.txt'],
			});

			expect(existsSync(zipPath)).toBe(true);

			// Extract the archive so its contents can be inspected on disk
			const extractDir = path.join(outputDir, 'extracted-exclude');
			await decompressFolder(zipPath, extractDir);

			// Verify that .txt files are excluded, at the root and in nested folders
			expect(existsSync(path.join(extractDir, 'file1.txt'))).toBe(false);
			expect(existsSync(path.join(extractDir, 'subdir', 'file3.txt'))).toBe(false);

			// Verify that .json files are included
			expect(existsSync(path.join(extractDir, 'file2.json'))).toBe(true);
		});

		it('should compress with custom compression level', async () => {
			const zipPath = path.join(outputDir, 'test-level.zip');

			await compressFolder(testDir, zipPath, {
				level: 1,
			});

			expect(existsSync(zipPath)).toBe(true);
		});
	});

	describe('decompressFolder', () => {
		it('should decompress a ZIP archive to a folder', async () => {
			const zipPath = path.join(outputDir, 'test.zip');
			const extractDir = path.join(outputDir, 'extracted');

			// First compress
			await compressFolder(testDir, zipPath);

			// Then decompress
			await decompressFolder(zipPath, extractDir);

			// Verify files exist
			expect(existsSync(path.join(extractDir, 'file1.txt'))).toBe(true);
			expect(existsSync(path.join(extractDir, 'file2.json'))).toBe(true);
			expect(existsSync(path.join(extractDir, 'subdir', 'file3.txt'))).toBe(true);

			// Verify content survived the round trip unchanged
			const content1 = await readFile(path.join(extractDir, 'file1.txt'), 'utf-8');
			expect(content1).toBe('Hello World!');

			const content2 = await readFile(path.join(extractDir, 'file2.json'), 'utf-8');
			expect(JSON.parse(content2)).toEqual({ test: 'data' });
		});

		it('should decompress with exclusion patterns', async () => {
			const zipPath = path.join(outputDir, 'test.zip');
			const extractDir = path.join(outputDir, 'extracted');

			// The archive contains every file; filtering happens on extraction
			await compressFolder(testDir, zipPath);
			await decompressFolder(zipPath, extractDir, {
				exclude: ['*.txt'],
			});

			expect(existsSync(path.join(extractDir, 'file1.txt'))).toBe(false);
			expect(existsSync(path.join(extractDir, 'subdir', 'file3.txt'))).toBe(false);
			expect(existsSync(path.join(extractDir, 'file2.json'))).toBe(true);
		});
	});
});

View File

@ -1,10 +1,7 @@
import * as fflate from 'fflate';
import { promisify } from 'util';
import { readFile, readdir, writeFile, mkdir, stat } from 'fs/promises';
import { readFile, readdir, writeFile, mkdir } from 'fs/promises';
import * as path from 'path';
const unzip = promisify(fflate.unzip);
const zip = promisify(fflate.zip);
import { createWriteStream, createReadStream } from 'fs';
// Reuse the same compression levels as the Compression node
const ALREADY_COMPRESSED = [
@ -71,8 +68,8 @@ function sanitizePath(fileName: string, outputDir: string): string {
}
/**
* Compress a folder into a ZIP archive
* Reuses the same patterns as the Compression node
* Compress a folder into a ZIP archive using streaming
* Based on fflate documentation: https://github.com/101arrowz/fflate
*/
export async function compressFolder(
sourceDir: string,
@ -81,78 +78,126 @@ export async function compressFolder(
): Promise<void> {
const { level = 6, exclude = [], includeHidden = false } = options;
const zipData: fflate.Zippable = {};
await addDirectoryToZip(sourceDir, '', zipData, { exclude, includeHidden, level });
// Ensure output directory exists
const outputDir = path.dirname(outputPath);
await mkdir(outputDir, { recursive: true });
const buffer = await zip(zipData);
await writeFile(outputPath, buffer);
// Create write stream for the output ZIP file
const outputStream = createWriteStream(outputPath);
// Create streaming ZIP using fflate
const zip = new fflate.Zip();
// Handle data from the ZIP stream
zip.ondata = (error, data, final) => {
if (error) {
outputStream.destroy(error);
return;
}
outputStream.write(Buffer.from(data));
if (final) {
outputStream.end();
}
};
// Add directory contents to ZIP using streaming
await addDirectoryToZipStreaming(sourceDir, '', zip, { exclude, includeHidden, level });
// Finalize the ZIP
zip.end();
// Wait for the stream to finish
return await new Promise<void>((resolve, reject) => {
outputStream.on('finish', resolve);
outputStream.on('error', reject);
});
}
/**
 * Decompress a ZIP archive into a folder, reading the archive as a stream.
 *
 * The archive is fed to fflate chunk by chunk, so it is never held in memory
 * as a whole. Each entry's decompressed data is buffered and written once the
 * entry is complete.
 *
 * @param sourcePath - Path of the ZIP archive to read.
 * @param outputDir - Directory to extract into; created if it does not exist.
 * @returns A promise that resolves once every file entry has been written.
 * @throws If the archive cannot be read, an entry fails to decompress, an
 *   entry name is rejected by `sanitizePath`, or a file cannot be written.
 */
export async function decompressFolder(sourcePath: string, outputDir: string): Promise<void> {
	// Ensure output directory exists
	await mkdir(outputDir, { recursive: true });

	// One promise per file entry. Waiting on all of them (instead of counting
	// down to zero) cannot complete early when one entry finishes before the
	// next entry's header has been read from the stream.
	const pendingWrites: Array<Promise<void>> = [];

	const unzip = new fflate.Unzip((stream) => {
		// Entries whose name ends in "/" are directories and carry no data
		if (stream.name.endsWith('/')) {
			return;
		}

		// Sanitize path to prevent zip slip attacks. A throw here propagates out
		// of `unzip.push` below and rejects the returned promise.
		const filePath = sanitizePath(stream.name, outputDir);

		const entryWritten = new Promise<Uint8Array[]>((resolve, reject) => {
			const chunks: Uint8Array[] = [];

			stream.ondata = (error, chunk, final) => {
				if (error) {
					reject(error);
					return;
				}

				chunks.push(chunk);
				if (final) {
					resolve(chunks);
				}
			};
		}).then(async (chunks) => {
			// The parent directory must exist before the file is written
			await mkdir(path.dirname(filePath), { recursive: true });
			await writeFile(filePath, Buffer.concat(chunks));
		});

		// Mark the promise as handled so an early failure is not reported as an
		// unhandled rejection while the archive is still being read; the error
		// itself is surfaced by `Promise.all` below.
		entryWritten.catch(() => {});
		pendingWrites.push(entryWritten);

		stream.start();
	});

	unzip.register(fflate.AsyncUnzipInflate);

	// Read errors (e.g. missing file) are thrown by the async iterator and
	// therefore reject this function's promise.
	for await (const chunk of createReadStream(sourcePath)) {
		unzip.push(chunk as Uint8Array);
	}

	// Signal end of input so a truncated archive raises an error instead of
	// leaving an entry waiting for data forever.
	unzip.push(new Uint8Array(0), true);

	// Resolves immediately when the archive contained no file entries
	await Promise.all(pendingWrites);
}
/**
* Add directory contents to zip data structure
* Follows the same pattern as Compression node
* Add directory contents to zip using streaming approach
* This version processes files one at a time instead of loading everything into memory
*/
async function addDirectoryToZip(
async function addDirectoryToZipStreaming(
dirPath: string,
zipPath: string,
zipData: fflate.Zippable,
zip: fflate.Zip,
options: { exclude: string[]; includeHidden: boolean; level: fflate.ZipOptions['level'] },
): Promise<void> {
const { exclude, includeHidden, level } = options;
@ -180,19 +225,23 @@ async function addDirectoryToZip(
const zipEntryPath = zipPath ? `${zipPath}/${entry.name}` : entry.name;
if (entry.isDirectory()) {
await addDirectoryToZip(fullPath, zipEntryPath, zipData, options);
await addDirectoryToZipStreaming(fullPath, zipEntryPath, zip, options);
} else {
const fileContent = await readFile(fullPath);
// Determine compression level based on file extension
const fileExtension = path.extname(entry.name).toLowerCase().slice(1);
// Use same compression logic as Compression node
const compressionLevel: fflate.ZipOptions['level'] = ALREADY_COMPRESSED.includes(
fileExtension,
)
? 0
: level;
zipData[zipEntryPath] = [new Uint8Array(fileContent), { level: compressionLevel }];
// Create a ZIP stream for this specific file
const zipStream = new fflate.ZipDeflate(zipEntryPath, { level: compressionLevel });
zip.add(zipStream);
// Read file content and stream it
const fileContent = await readFile(fullPath);
zipStream.push(new Uint8Array(fileContent), true);
}
}
}