feat: Add SharedWorker and DataWorker initialization with SQLite WASM support (no-changelog) (#24308)

Co-authored-by: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Alex Grozav 2026-01-20 14:31:51 +02:00 committed by GitHub
parent 502479ccdb
commit b014357e1f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
43 changed files with 3300 additions and 279 deletions

View File

@ -1,3 +1,5 @@
export { GetNodeTypesByIdentifierRequestDto } from './node-types/get-node-types-by-identifier.dto';
export { AiAskRequestDto } from './ai/ai-ask-request.dto';
export { AiChatRequestDto } from './ai/ai-chat-request.dto';
export { AiBuilderChatRequestDto } from './ai/ai-build-request.dto';

View File

@ -0,0 +1,21 @@
import { z } from 'zod';
import { Z } from 'zod-class';
/**
* Schema for node type identifier in the format "name@version"
* e.g., "n8n-nodes-base.httpRequest@4.2" or "n8n-nodes-base.if@2"
*/
const nodeTypeIdentifierSchema = z
.string()
.regex(
/^[\w.-]+@\d+(\.\d+)?(\.\d+)?$/,
'Invalid node type identifier format. Expected "name@version"',
);
export class GetNodeTypesByIdentifierRequestDto extends Z.class({
/**
* Array of node type identifiers in the format "name@version"
* e.g., ["n8n-nodes-base.httpRequest@4.2", "n8n-nodes-base.if@2"]
*/
identifiers: z.array(nodeTypeIdentifierSchema).min(1).max(1000),
}) {}

View File

@ -39,6 +39,7 @@ describe('AuthService Browser ID Whitelist', () => {
expect(skipEndpoints).toContain('/types/nodes.json');
expect(skipEndpoints).toContain('/types/credentials.json');
expect(skipEndpoints).toContain('/types/node-versions.json');
});
it('should include oauth callback urls in the skip browser ID check endpoints', () => {

View File

@ -85,6 +85,7 @@ export class AuthService {
// Skip browser ID check for type files
'/types/nodes.json',
'/types/credentials.json',
'/types/node-versions.json',
'/mcp-oauth/authorize/',
// Skip browser ID check for chat hub attachments

View File

@ -1,12 +1,31 @@
import { GetNodeTypesByIdentifierRequestDto } from '@n8n/api-types';
import { GlobalConfig } from '@n8n/config';
import { Post, RestController } from '@n8n/decorators';
import { Body, Post, RestController } from '@n8n/decorators';
import { Request } from 'express';
import { readFile } from 'fs/promises';
import get from 'lodash/get';
import type { INodeTypeDescription, INodeTypeNameVersion } from 'n8n-workflow';
import { coerce } from 'semver';
import { NodeTypes } from '@/node-types';
/**
* Parse a node type identifier string (name@version) into name and version
* @param identifier - e.g., "n8n-nodes-base.httpRequest@4.2"
* @returns { name, version } or null if invalid
*/
function parseNodeTypeIdentifier(identifier: string): { name: string; version: number } | null {
const atIndex = identifier.lastIndexOf('@');
if (atIndex === -1) return null;
const name = identifier.substring(0, atIndex);
if (!name) return null;
const versionStr = identifier.substring(atIndex + 1);
if (!coerce(versionStr)) return null;
return { name, version: parseFloat(versionStr) };
}
@RestController('/node-types')
export class NodeTypesController {
constructor(
@ -17,47 +36,44 @@ export class NodeTypesController {
@Post('/')
async getNodeInfo(req: Request) {
const nodeInfos = get(req, 'body.nodeInfos', []) as INodeTypeNameVersion[];
const defaultLocale = this.globalConfig.defaultLocale;
if (defaultLocale === 'en') {
return nodeInfos.reduce<INodeTypeDescription[]>((acc, { name, version }) => {
const { description } = this.nodeTypes.getByNameAndVersion(name, version);
acc.push(description);
return acc;
}, []);
}
const populateTranslation = async (
name: string,
version: number,
nodeTypes: INodeTypeDescription[],
) => {
const { description, sourcePath } = this.nodeTypes.getWithSourcePath(name, version);
const translationPath = await this.nodeTypes.getNodeTranslationPath({
nodeSourcePath: sourcePath,
longNodeType: description.name,
locale: defaultLocale,
});
try {
const translation = await readFile(translationPath, 'utf8');
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
description.translation = JSON.parse(translation);
} catch {
// ignore - no translation exists at path
}
nodeTypes.push(description);
};
const nodeTypes: INodeTypeDescription[] = [];
const promises = nodeInfos.map(
async ({ name, version }) => await populateTranslation(name, version, nodeTypes),
const descriptions = await Promise.all(
nodeInfos.map(async ({ name, version }) => {
return await this.nodeTypes.getDescriptionWithTranslation(name, version, defaultLocale);
}),
);
await Promise.all(promises);
return descriptions;
}
/**
* Get node types by their identifier strings (name@version format)
* This endpoint is optimized for fetching specific node types when doing incremental syncs
*/
@Post('/by-identifier')
async getNodeTypesByIdentifier(
@Body payload: GetNodeTypesByIdentifierRequestDto,
): Promise<INodeTypeDescription[]> {
const { identifiers = [] } = payload;
const defaultLocale = this.globalConfig.defaultLocale;
const nodeTypes: INodeTypeDescription[] = [];
for (const identifier of identifiers) {
const parsed = parseNodeTypeIdentifier(identifier);
if (!parsed) continue;
try {
const description = await this.nodeTypes.getDescriptionWithTranslation(
parsed.name,
parsed.version,
defaultLocale,
);
nodeTypes.push(description);
} catch {
// Skip node types that don't exist (may have been removed)
}
}
return nodeTypes;
}

View File

@ -1,7 +1,7 @@
import { Service } from '@n8n/di';
import type { NeededNodeType } from '@n8n/task-runner';
import type { Dirent } from 'fs';
import { readdir } from 'fs/promises';
import { readdir, readFile } from 'fs/promises';
import { RoutingNode } from 'n8n-core';
import type { ExecuteContext } from 'n8n-core';
import type { INodeType, INodeTypeDescription, INodeTypes, IVersionedNodeType } from 'n8n-workflow';
@ -28,6 +28,35 @@ export class NodeTypes implements INodeTypes {
return { description: { ...description }, sourcePath: nodeType.sourcePath };
}
/**
* Get a node type description with its translation loaded (if available for the locale).
*/
async getDescriptionWithTranslation(
nodeTypeName: string,
version: number,
locale: string,
): Promise<INodeTypeDescription> {
const { description, sourcePath } = this.getWithSourcePath(nodeTypeName, version);
if (locale !== 'en') {
const translationPath = await this.getNodeTranslationPath({
nodeSourcePath: sourcePath,
longNodeType: description.name,
locale,
});
try {
const translation = await readFile(translationPath, 'utf8');
// eslint-disable-next-line @typescript-eslint/no-unsafe-assignment
description.translation = JSON.parse(translation);
} catch {
// ignore - no translation exists at path
}
}
return description;
}
getByName(nodeType: string): INodeType | IVersionedNodeType {
return this.loadNodesAndCredentials.getNode(nodeType).type;
}

View File

@ -288,7 +288,11 @@ export class Server extends AbstractServer {
// Protect type files with authentication regardless of UI availability
const authService = Container.get(AuthService);
const protectedTypeFiles = ['/types/nodes.json', '/types/credentials.json'];
const protectedTypeFiles = [
'/types/nodes.json',
'/types/credentials.json',
'/types/node-versions.json',
];
protectedTypeFiles.forEach((path) => {
this.app.get(
path,

View File

@ -3,6 +3,7 @@ import type { GlobalConfig, SecurityConfig } from '@n8n/config';
import { Container } from '@n8n/di';
import { mock } from 'jest-mock-extended';
import type { BinaryDataConfig, InstanceSettings } from 'n8n-core';
import type { INodeTypeDescription } from 'n8n-workflow';
import type { CredentialTypes } from '@/credential-types';
import type { CredentialsOverwrites } from '@/credentials-overwrites';
@ -398,4 +399,103 @@ describe('FrontendService', () => {
expect(settings.aiBuilder.enabled).toBe(false);
});
});
describe('node version identifiers', () => {
it('should create type@version identifiers for single and multi-version nodes', () => {
const { service } = createMockService();
const getNodeVersionIdentifiers = service.getNodeVersionIdentifiers.bind(service);
const nodes = [
{
name: 'n8n-nodes-base.single',
version: 1,
},
{
name: 'n8n-nodes-base.multi',
version: [1, 2],
},
] as unknown as INodeTypeDescription[];
const identifiers = getNodeVersionIdentifiers(nodes);
expect(identifiers).toEqual(
expect.arrayContaining([
'n8n-nodes-base.single@1',
'n8n-nodes-base.multi@1',
'n8n-nodes-base.multi@2',
]),
);
expect(identifiers).toHaveLength(3);
});
it('should ignore invalid entries and deduplicate identifiers', () => {
const { service } = createMockService();
const getNodeVersionIdentifiers = service.getNodeVersionIdentifiers.bind(service);
const nodes = [
{
name: 'n8n-nodes-base.duplicate',
version: [1, 1, 2],
},
{
name: 'n8n-nodes-base.duplicate',
version: 2,
},
{
name: undefined as unknown as string,
version: 3,
},
{
name: 'n8n-nodes-base.invalidVersion',
},
] as unknown as INodeTypeDescription[];
const identifiers = getNodeVersionIdentifiers(nodes);
expect(identifiers).toEqual(
expect.arrayContaining(['n8n-nodes-base.duplicate@1', 'n8n-nodes-base.duplicate@2']),
);
expect(identifiers).toHaveLength(2);
});
});
describe('generateTypes', () => {
it('should write node versions file with generated identifiers', async () => {
const { service } = createMockService();
const originalNodes = (loadNodesAndCredentials.types as any).nodes;
(loadNodesAndCredentials.types as any).nodes = [
{ name: 'n8n-nodes-base.single', version: 1 },
{ name: 'n8n-nodes-base.multi', version: [1, 2] },
];
const writeStaticJSONSpy = jest
.spyOn(service as any, 'writeStaticJSON')
.mockImplementation(() => {});
try {
await (service as any).generateTypes();
const nodeVersionCall = writeStaticJSONSpy.mock.calls.find(
([name]) => name === 'node-versions',
);
expect(nodeVersionCall).toBeDefined();
const [, identifiers] = nodeVersionCall as [string, string[]];
expect(identifiers).toEqual(
expect.arrayContaining([
'n8n-nodes-base.single@1',
'n8n-nodes-base.multi@1',
'n8n-nodes-base.multi@2',
]),
);
expect(identifiers).toHaveLength(3);
} finally {
writeStaticJSONSpy.mockRestore();
(loadNodesAndCredentials.types as any).nodes = originalNodes;
}
});
});
});

View File

@ -7,7 +7,7 @@ import { createWriteStream } from 'fs';
import { mkdir } from 'fs/promises';
import uniq from 'lodash/uniq';
import { BinaryDataConfig, InstanceSettings } from 'n8n-core';
import type { ICredentialType, INodeTypeBaseDescription } from 'n8n-workflow';
import type { ICredentialType, INodeTypeBaseDescription, INodeTypeDescription } from 'n8n-workflow';
import path from 'path';
import config from '@/config';
@ -374,6 +374,8 @@ export class FrontendService {
await mkdir(path.join(staticCacheDir, 'types'), { recursive: true });
const { credentials, nodes } = this.loadNodesAndCredentials.types;
this.writeStaticJSON('nodes', nodes);
const nodeVersionIdentifiers = this.getNodeVersionIdentifiers(nodes);
this.writeStaticJSON('node-versions', nodeVersionIdentifiers);
this.writeStaticJSON('credentials', credentials);
}
@ -562,7 +564,26 @@ export class FrontendService {
return Object.fromEntries(this.moduleRegistry.settings);
}
private writeStaticJSON(name: string, data: INodeTypeBaseDescription[] | ICredentialType[]) {
getNodeVersionIdentifiers(nodes: INodeTypeDescription[]): string[] {
const identifiers = new Set<string>();
for (const node of nodes) {
if (!node?.name || node.version === undefined) continue;
const versions = Array.isArray(node.version) ? node.version : [node.version];
for (const version of versions) {
if (version === undefined) continue;
identifiers.add(`${node.name}@${String(version)}`);
}
}
return Array.from(identifiers);
}
private writeStaticJSON(
name: string,
data: Array<INodeTypeBaseDescription | ICredentialType | string>,
) {
const { staticCacheDir } = this.instanceSettings;
const filePath = path.join(staticCacheDir, `types/${name}.json`);
const stream = createWriteStream(filePath, 'utf-8');

View File

@ -1,6 +1,7 @@
import type {
ActionResultRequestDto,
CommunityNodeType,
GetNodeTypesByIdentifierRequestDto,
OptionsRequestDto,
ResourceLocatorRequestDto,
ResourceMapperFieldsRequestDto,
@ -38,6 +39,26 @@ export async function getNodeTypes(baseUrl: string) {
return await fetchNodeTypesJsonWithRetry(baseUrl + 'types/nodes.json');
}
export async function getNodeTypeVersions(baseUrl: string): Promise<string[]> {
return await fetchNodeTypesJsonWithRetry(baseUrl + 'types/node-versions.json');
}
/**
* Fetch specific node types by their identifier (name@version format)
* This is useful for incremental syncs where only missing node types need to be fetched
*
* @param context - The REST API context containing base URL and auth info
* @param identifiers - Array of node type identifiers in "name@version" format
* @returns Array of node type descriptions for the requested identifiers
*/
export async function getNodeTypesByIdentifier(
context: IRestApiContext,
identifiers: string[],
): Promise<INodeTypeDescription[]> {
const body: GetNodeTypesByIdentifierRequestDto = { identifiers };
return await makeRestApiRequest(context, 'POST', '/node-types/by-identifier', body);
}
export async function fetchCommunityNodeTypes(
context: IRestApiContext,
): Promise<CommunityNodeType[]> {

View File

@ -50,7 +50,6 @@
"@n8n/utils": "workspace:*",
"@replit/codemirror-indentation-markers": "^6.5.3",
"@sentry/vue": "catalog:frontend",
"@sqlite.org/sqlite-wasm": "3.50.4-build1",
"@types/semver": "^7.7.0",
"@typescript/vfs": "^1.6.0",
"@vue-flow/background": "1.3.2",
@ -105,6 +104,7 @@
"vue-virtual-scroller": "2.0.0-beta.8",
"vue3-touch-events": "^4.1.3",
"vuedraggable": "4.1.0",
"wa-sqlite": "github:rhashimoto/wa-sqlite#779219540f66cecaa159da32b3b8936697ba10a7",
"web-tree-sitter": "0.24.3",
"xss": "catalog:"
},

View File

@ -183,6 +183,40 @@ class Worker {
terminate = vi.fn();
}
class MockMessagePort {
onmessage = vi.fn();
onmessageerror = vi.fn();
postMessage = vi.fn();
start = vi.fn();
close = vi.fn();
addEventListener = vi.fn();
removeEventListener = vi.fn();
dispatchEvent = vi.fn(() => true);
}
class SharedWorker {
port: MockMessagePort;
onerror = vi.fn();
constructor(_url: string | URL, _options?: string | WorkerOptions) {
this.port = new MockMessagePort();
}
addEventListener = vi.fn();
removeEventListener = vi.fn();
dispatchEvent = vi.fn(() => true);
}
class DataTransfer {
private data: Record<string, unknown> = {};
@ -201,6 +235,11 @@ Object.defineProperty(window, 'Worker', {
value: Worker,
});
Object.defineProperty(window, 'SharedWorker', {
writable: true,
value: SharedWorker,
});
Object.defineProperty(window, 'DataTransfer', {
writable: true,
value: DataTransfer,

View File

@ -18,7 +18,7 @@ export const LOCAL_STORAGE_DISMISSED_WHATS_NEW_CALLOUT = 'N8N_DISMISSED_WHATS_NE
export const LOCAL_STORAGE_FOCUS_PANEL = 'N8N_FOCUS_PANEL';
export const LOCAL_STORAGE_EXPERIMENTAL_DISMISSED_SUGGESTED_WORKFLOWS =
'N8N_EXPERIMENTAL_DISMISSED_SUGGESTED_WORKFLOWS';
export const LOCAL_STORAGE_RUN_DATA_WORKER = 'N8N_RUN_DATA_WORKER';
export const LOCAL_STORAGE_DATA_WORKER = 'N8N_DATA_WORKER';
export const LOCAL_STORAGE_CHAT_HUB_SELECTED_MODEL = (userId: string) =>
`${userId}_N8N_CHAT_HUB_SELECTED_MODEL`;
export const LOCAL_STORAGE_CHAT_HUB_CREDENTIALS = (userId: string) =>

View File

@ -3,7 +3,9 @@ import SourceControlInitializationErrorMessage from '@/features/integrations/sou
import { useExternalHooks } from '@/app/composables/useExternalHooks';
import { useTelemetry } from '@/app/composables/useTelemetry';
import { useToast } from '@/app/composables/useToast';
import { LOCAL_STORAGE_DATA_WORKER } from '@/app/constants/localStorage';
import { EnterpriseEditionFeature, VIEWS } from '@/app/constants';
import type { UserManagementAuthenticationMethod } from '@/Interface';
import {
registerModuleModals,
@ -212,6 +214,13 @@ export async function initializeAuthenticatedFeatures(
registerModuleModals();
registerModuleSettingsPages();
// Initialize run data worker and load node types
if (window.localStorage.getItem(LOCAL_STORAGE_DATA_WORKER) === 'true') {
const coordinator = await import('@/app/workers');
await coordinator.initialize();
await coordinator.loadNodeTypes(rootStore.baseUrl);
}
authenticatedFeaturesInitialized = true;
}

View File

@ -7,9 +7,6 @@ import { useSettingsStore } from './settings.store';
import { useRootStore } from '@n8n/stores/useRootStore';
import { useWebSocketClient } from '@/app/push-connection/useWebSocketClient';
import { useEventSourceClient } from '@/app/push-connection/useEventSourceClient';
import { useLocalStorage } from '@vueuse/core';
import { LOCAL_STORAGE_RUN_DATA_WORKER } from '@/app/constants';
import { runDataWorker } from '@/app/workers/run-data/instance';
export type OnPushMessageHandler = (event: PushMessage) => void;
@ -20,8 +17,6 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
const rootStore = useRootStore();
const settingsStore = useSettingsStore();
const isRunDataWorkerEnabled = useLocalStorage<boolean>(LOCAL_STORAGE_RUN_DATA_WORKER, false);
/**
* Queue of messages to be sent to the server. Messages are queued if
* the connection is down.
@ -68,12 +63,7 @@ export const usePushConnectionStore = defineStore(STORES.PUSH, () => {
// The `nodeExecuteAfterData` message is sent as binary data
// to be handled by a web worker in the future.
if (data instanceof ArrayBuffer) {
if (isRunDataWorkerEnabled.value) {
await runDataWorker.onNodeExecuteAfterData(data);
return;
} else {
data = new TextDecoder('utf-8').decode(new Uint8Array(data));
}
data = new TextDecoder('utf-8').decode(new Uint8Array(data));
}
let parsedData: PushMessage;

View File

@ -0,0 +1,128 @@
/**
* Coordinator Module
*
* This module provides the main thread API to communicate with the
* SharedWorker coordinator that manages tab coordination and routes
* queries to the active data worker.
*/
import * as Comlink from 'comlink';
import type { CoordinatorApi } from './worker';
// The SharedWorker coordinator that routes queries
export const sharedWorker = new SharedWorker(new URL('./worker.ts', import.meta.url), {
type: 'module',
name: 'n8n-coordinator',
});
sharedWorker.port.start();
// The dedicated data worker for this tab (only used if this tab becomes active)
export const dataWorker = new Worker(new URL('../data/worker.ts', import.meta.url), {
type: 'module',
name: 'n8n-data',
});
// Wrap the coordinator with Comlink
export const coordinator = Comlink.wrap<CoordinatorApi>(sharedWorker.port);
// State
const tabState: {
tabId: string | null;
isRegistered: boolean;
} = {
tabId: null,
isRegistered: false,
};
/**
* Register this tab with the coordinator
* This should be called once when the app initializes
*/
export async function registerTab(): Promise<string> {
if (tabState.isRegistered && tabState.tabId) {
return tabState.tabId;
}
console.log('[Coordinator] Registering tab...');
// Create a MessageChannel to pass the data worker to the coordinator
const channel = new MessageChannel();
// Connect one end to the data worker
// The data worker will expose its API on this port via its onmessage handler
dataWorker.postMessage({ type: 'connect', port: channel.port1 }, [channel.port1]);
// Register with the coordinator, passing port2 for it to communicate with our data worker
const newTabId = await coordinator.registerTab(Comlink.transfer(channel.port2, [channel.port2]));
tabState.tabId = newTabId;
tabState.isRegistered = true;
console.log(`[Coordinator] Registered as tab: ${newTabId}`);
// Set up cleanup on page unload
setupCleanupHandlers();
return newTabId;
}
/**
* Unregister the current tab and reset state
*/
function unregisterCurrentTab(): void {
if (tabState.tabId) {
coordinator.unregisterTab(tabState.tabId).catch(console.error);
tabState.isRegistered = false;
tabState.tabId = null;
}
}
/**
* Set up cleanup handlers for when the page unloads
*/
function setupCleanupHandlers(): void {
window.addEventListener('beforeunload', unregisterCurrentTab);
// Also use pagehide for mobile browsers
window.addEventListener('pagehide', unregisterCurrentTab);
// Re-register when page is restored from bfcache
window.addEventListener('pageshow', (event) => {
if (event.persisted && !tabState.isRegistered) {
console.log('[Coordinator] Page restored from bfcache, re-registering...');
registerTab().catch(console.error);
}
});
// Re-register when page becomes visible again (e.g., on mobile after switching apps)
document.addEventListener('visibilitychange', () => {
if (document.visibilityState === 'visible' && !tabState.isRegistered) {
console.log('[Coordinator] Page became visible, re-registering...');
registerTab().catch(console.error);
}
});
}
/**
* Get the current tab's ID
*/
export function getTabId(): string | null {
return tabState.tabId;
}
/**
* Get info about the coordinator state (for debugging)
*/
export async function getCoordinatorInfo(): Promise<{
activeTabId: string | null;
tabCount: number;
isInitialized: boolean;
}> {
return {
activeTabId: await coordinator.getActiveTabId(),
tabCount: await coordinator.getTabCount(),
isInitialized: await coordinator.isInitialized(),
};
}
// Re-export types
export type { CoordinatorApi } from './worker';

View File

@ -0,0 +1,36 @@
import { getRequiredActiveDataWorker } from './tabs';
import type { CoordinatorState } from './types';
/**
* Ensure the coordinator is initialized before performing operations
*
* @param state - The coordinator state
*/
export async function ensureInitialized(state: CoordinatorState): Promise<void> {
if (!state.initialized) {
await initialize(state);
}
}
/**
* Initialize the database (routes to active tab's worker)
*
* Promise caching is handled by the data worker itself, so concurrent
* calls are deduplicated at that level. The coordinator just tracks
* whether it has successfully initialized with the current active worker.
*
* @param state - The coordinator state
*/
export async function initialize(state: CoordinatorState): Promise<void> {
console.log('[Coordinator] Initialize requested');
if (state.initialized) {
console.log('[Coordinator] Already initialized');
return;
}
const worker = getRequiredActiveDataWorker(state);
await worker.initialize();
state.initialized = true;
console.log('[Coordinator] Initialization complete');
}

View File

@ -0,0 +1,171 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { CoordinatorState, TabConnection } from '../types';
import type { DataWorkerApi } from '../../data/worker';
import type * as Comlink from 'comlink';
import { loadNodeTypes } from './loadNodeTypes';
describe('Coordinator loadNodeTypes Operation', () => {
beforeEach(() => {
vi.spyOn(console, 'log').mockImplementation(() => {});
vi.spyOn(console, 'error').mockImplementation(() => {});
});
afterEach(() => {
vi.clearAllMocks();
});
function createMockDataWorker(
overrides: Partial<DataWorkerApi> = {},
): Comlink.Remote<DataWorkerApi> {
return {
initialize: vi.fn().mockResolvedValue(undefined),
exec: vi.fn().mockResolvedValue(undefined),
query: vi.fn().mockResolvedValue({ columns: [], rows: [] }),
queryWithParams: vi.fn().mockResolvedValue({ columns: [], rows: [] }),
close: vi.fn().mockResolvedValue(undefined),
isInitialized: vi.fn().mockReturnValue(true),
loadNodeTypes: vi.fn().mockResolvedValue(undefined),
...overrides,
} as unknown as Comlink.Remote<DataWorkerApi>;
}
function createMockTabConnection(overrides: Partial<TabConnection> = {}): TabConnection {
return {
id: 'test-tab-id',
port: {} as MessagePort,
dataWorker: createMockDataWorker(),
isActive: false,
...overrides,
};
}
function createMockState(overrides: Partial<CoordinatorState> = {}): CoordinatorState {
return {
tabs: new Map(),
activeTabId: null,
initialized: false,
...overrides,
};
}
function createStateWithActiveTab(
workerOverrides: Partial<DataWorkerApi> = {},
): CoordinatorState {
const mockDataWorker = createMockDataWorker(workerOverrides);
const tabConnection = createMockTabConnection({
id: 'active-tab',
dataWorker: mockDataWorker,
isActive: true,
});
const state = createMockState({ activeTabId: 'active-tab' });
state.tabs.set('active-tab', tabConnection);
return state;
}
describe('loadNodeTypes', () => {
it('should throw error when no active data worker is available', async () => {
const state = createMockState();
await expect(loadNodeTypes(state, 'http://localhost:5678')).rejects.toThrow(
'[Coordinator] No active data worker available',
);
});
it('should call loadNodeTypes on the active data worker with the correct baseUrl', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
const baseUrl = 'http://localhost:5678';
await loadNodeTypes(state, baseUrl);
expect(worker?.loadNodeTypes).toHaveBeenCalledWith(baseUrl);
});
it('should ensure initialization before loading node types', async () => {
const state = createStateWithActiveTab();
state.initialized = false;
const worker = state.tabs.get('active-tab')?.dataWorker;
await loadNodeTypes(state, 'http://localhost:5678');
expect(worker?.initialize).toHaveBeenCalled();
expect(worker?.loadNodeTypes).toHaveBeenCalled();
});
it('should not call initialize if already initialized', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
await loadNodeTypes(state, 'http://localhost:5678');
expect(worker?.initialize).not.toHaveBeenCalled();
expect(worker?.loadNodeTypes).toHaveBeenCalled();
});
it('should propagate errors from the data worker', async () => {
const state = createStateWithActiveTab({
loadNodeTypes: vi.fn().mockRejectedValue(new Error('Failed to load node types')),
});
state.initialized = true;
await expect(loadNodeTypes(state, 'http://localhost:5678')).rejects.toThrow(
'Failed to load node types',
);
});
it('should handle different base URLs', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
const testUrls = [
'http://localhost:5678',
'https://my-n8n.example.com',
'http://192.168.1.100:5678',
'https://n8n.cloud.example.com/api',
];
for (const url of testUrls) {
vi.clearAllMocks();
await loadNodeTypes(state, url);
expect(worker?.loadNodeTypes).toHaveBeenCalledWith(url);
}
});
it('should handle empty base URL', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
await loadNodeTypes(state, '');
expect(worker?.loadNodeTypes).toHaveBeenCalledWith('');
});
it('should propagate initialization errors', async () => {
const state = createStateWithActiveTab({
initialize: vi.fn().mockRejectedValue(new Error('Initialization failed')),
});
state.initialized = false;
await expect(loadNodeTypes(state, 'http://localhost:5678')).rejects.toThrow(
'Initialization failed',
);
});
it('should not call loadNodeTypes if initialization fails', async () => {
const loadNodeTypesFn = vi.fn();
const state = createStateWithActiveTab({
initialize: vi.fn().mockRejectedValue(new Error('Initialization failed')),
loadNodeTypes: loadNodeTypesFn,
});
state.initialized = false;
await expect(loadNodeTypes(state, 'http://localhost:5678')).rejects.toThrow();
expect(loadNodeTypesFn).not.toHaveBeenCalled();
});
});
});

View File

@ -0,0 +1,20 @@
/**
* Load Node Types Operation
*
* This module provides the operation for loading node types
* from the server through the coordinator to the active tab's data worker.
*/
import { withActiveWorker } from '../utils';
/**
* Load node types from the server (routes to active tab's worker)
*
* @param worker - The active data worker
* @param state - The coordinator state
* @param baseUrl - The base URL for API requests
*/
export const loadNodeTypes = withActiveWorker(async (worker, baseUrl: string) => {
console.log('[Coordinator] loadNodeTypes');
return await worker.loadNodeTypes(baseUrl);
});

View File

@ -0,0 +1,428 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { MockInstance } from 'vitest';
import type { CoordinatorState, TabConnection } from '../types';
import type { DataWorkerApi } from '../../data/worker';
import type * as Comlink from 'comlink';
import { ensureInitialized, initialize } from '../initialize';
import { exec, query, queryWithParams, isInitialized, getActiveTabId, getTabCount } from './query';
describe('Coordinator Query Operations', () => {
let consoleSpy: {
log: MockInstance;
error: MockInstance;
};
beforeEach(() => {
consoleSpy = {
log: vi.spyOn(console, 'log').mockImplementation(() => {}),
error: vi.spyOn(console, 'error').mockImplementation(() => {}),
};
});
afterEach(() => {
vi.clearAllMocks();
});
function createMockDataWorker(
overrides: Partial<DataWorkerApi> = {},
): Comlink.Remote<DataWorkerApi> {
return {
initialize: vi.fn().mockResolvedValue(undefined),
exec: vi.fn().mockResolvedValue(undefined),
query: vi.fn().mockResolvedValue({ columns: [], rows: [] }),
queryWithParams: vi.fn().mockResolvedValue({ columns: [], rows: [] }),
close: vi.fn().mockResolvedValue(undefined),
isInitialized: vi.fn().mockReturnValue(true),
loadNodeTypes: vi.fn().mockResolvedValue(undefined),
...overrides,
} as unknown as Comlink.Remote<DataWorkerApi>;
}
function createMockTabConnection(overrides: Partial<TabConnection> = {}): TabConnection {
return {
id: 'test-tab-id',
port: {} as MessagePort,
dataWorker: createMockDataWorker(),
isActive: false,
...overrides,
};
}
function createMockState(overrides: Partial<CoordinatorState> = {}): CoordinatorState {
return {
tabs: new Map(),
activeTabId: null,
initialized: false,
...overrides,
};
}
function createStateWithActiveTab(
workerOverrides: Partial<DataWorkerApi> = {},
): CoordinatorState {
const mockDataWorker = createMockDataWorker(workerOverrides);
const tabConnection = createMockTabConnection({
id: 'active-tab',
dataWorker: mockDataWorker,
isActive: true,
});
const state = createMockState({ activeTabId: 'active-tab' });
state.tabs.set('active-tab', tabConnection);
return state;
}
describe('ensureInitialized', () => {
it('should call initialize when not initialized', async () => {
const state = createStateWithActiveTab();
state.initialized = false;
const worker = state.tabs.get('active-tab')?.dataWorker;
await ensureInitialized(state);
expect(worker?.initialize).toHaveBeenCalled();
});
it('should not call initialize when already initialized', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
await ensureInitialized(state);
expect(worker?.initialize).not.toHaveBeenCalled();
});
it('should propagate errors from initialize', async () => {
const state = createStateWithActiveTab({
initialize: vi.fn().mockRejectedValue(new Error('Init failed')),
});
state.initialized = false;
await expect(ensureInitialized(state)).rejects.toThrow('Init failed');
});
});
describe('initialize', () => {
it('should throw error when no active data worker is available', async () => {
const state = createMockState();
await expect(initialize(state)).rejects.toThrow(
'[Coordinator] No active data worker available',
);
});
it('should call initialize on the active data worker', async () => {
const state = createStateWithActiveTab();
const worker = state.tabs.get('active-tab')?.dataWorker;
await initialize(state);
expect(worker?.initialize).toHaveBeenCalled();
});
it('should set initialized to true after successful initialization', async () => {
const state = createStateWithActiveTab();
await initialize(state);
expect(state.initialized).toBe(true);
});
it('should not call worker.initialize when already initialized', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
await initialize(state);
expect(worker?.initialize).not.toHaveBeenCalled();
});
it('should delegate concurrent initialization handling to data worker', async () => {
const initializeWorkerFn = vi
.fn()
.mockImplementation(async () => await new Promise((resolve) => setTimeout(resolve, 50)));
const state = createStateWithActiveTab({ initialize: initializeWorkerFn });
// Start two concurrent initializations
// The data worker handles promise caching, coordinator relies on state.initialized
const promise1 = initialize(state);
const promise2 = initialize(state);
await Promise.all([promise1, promise2]);
// Both calls go to worker (which handles deduplication internally)
// First call sets state.initialized=true, second call sees it and returns early
// But since both start before either finishes, both may call worker
expect(initializeWorkerFn).toHaveBeenCalled();
});
it('should log initialization messages', async () => {
const state = createStateWithActiveTab();
await initialize(state);
expect(consoleSpy.log).toHaveBeenCalledWith('[Coordinator] Initialize requested');
expect(consoleSpy.log).toHaveBeenCalledWith('[Coordinator] Initialization complete');
});
});
describe('exec', () => {
it('should throw error when no active data worker is available', async () => {
const state = createMockState();
await expect(exec(state, 'CREATE TABLE test (id INT)')).rejects.toThrow(
'[Coordinator] No active data worker available',
);
});
it('should call exec on the active data worker', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
const sql = 'INSERT INTO test VALUES (1)';
await exec(state, sql);
expect(worker?.exec).toHaveBeenCalledWith(sql);
});
it('should ensure initialization before executing', async () => {
const state = createStateWithActiveTab();
state.initialized = false;
const worker = state.tabs.get('active-tab')?.dataWorker;
await exec(state, 'DELETE FROM test');
expect(worker?.initialize).toHaveBeenCalled();
expect(worker?.exec).toHaveBeenCalled();
});
it('should propagate errors from the data worker', async () => {
const state = createStateWithActiveTab({
exec: vi.fn().mockRejectedValue(new Error('Exec failed')),
});
state.initialized = true;
await expect(exec(state, 'INVALID SQL')).rejects.toThrow('Exec failed');
});
it('should fetch worker after initialization to use current active tab', async () => {
const firstWorker = createMockDataWorker({
initialize: vi.fn().mockImplementation(async () => {
// Simulate active tab change during initialization
state.activeTabId = 'second-tab';
}),
});
const secondWorker = createMockDataWorker();
const firstTab = createMockTabConnection({
id: 'first-tab',
dataWorker: firstWorker,
isActive: true,
});
const secondTab = createMockTabConnection({
id: 'second-tab',
dataWorker: secondWorker,
isActive: false,
});
const state = createMockState({ activeTabId: 'first-tab', initialized: false });
state.tabs.set('first-tab', firstTab);
state.tabs.set('second-tab', secondTab);
await exec(state, 'INSERT INTO test VALUES (1)');
// Should use second worker (current active) not first worker (initial active)
expect(secondWorker.exec).toHaveBeenCalledWith('INSERT INTO test VALUES (1)');
expect(firstWorker.exec).not.toHaveBeenCalled();
});
});
describe('query', () => {
it('should throw error when no active data worker is available', async () => {
const state = createMockState();
await expect(query(state, 'SELECT * FROM test')).rejects.toThrow(
'[Coordinator] No active data worker available',
);
});
it('should call query on the active data worker', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
const sql = 'SELECT * FROM test';
await query(state, sql);
expect(worker?.query).toHaveBeenCalledWith(sql);
});
it('should return query results from the data worker', async () => {
const expectedResult = {
columns: ['id', 'name'],
rows: [
[1, 'Alice'],
[2, 'Bob'],
],
};
const state = createStateWithActiveTab({
query: vi.fn().mockResolvedValue(expectedResult),
});
state.initialized = true;
const result = await query(state, 'SELECT * FROM users');
expect(result).toEqual(expectedResult);
});
it('should ensure initialization before querying', async () => {
const state = createStateWithActiveTab();
state.initialized = false;
const worker = state.tabs.get('active-tab')?.dataWorker;
await query(state, 'SELECT 1');
expect(worker?.initialize).toHaveBeenCalled();
expect(worker?.query).toHaveBeenCalled();
});
it('should propagate errors from the data worker', async () => {
const state = createStateWithActiveTab({
query: vi.fn().mockRejectedValue(new Error('Query failed')),
});
state.initialized = true;
await expect(query(state, 'INVALID SQL')).rejects.toThrow('Query failed');
});
});
describe('queryWithParams', () => {
it('should throw error when no active data worker is available', async () => {
const state = createMockState();
await expect(queryWithParams(state, 'SELECT * FROM test WHERE id = ?', [1])).rejects.toThrow(
'[Coordinator] No active data worker available',
);
});
it('should call queryWithParams on the active data worker', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
const sql = 'SELECT * FROM test WHERE id = ?';
const params = [42];
await queryWithParams(state, sql, params);
expect(worker?.queryWithParams).toHaveBeenCalledWith(sql, params);
});
it('should return query results from the data worker', async () => {
const expectedResult = {
columns: ['id', 'name'],
rows: [[1, 'Alice']],
};
const state = createStateWithActiveTab({
queryWithParams: vi.fn().mockResolvedValue(expectedResult),
});
state.initialized = true;
const result = await queryWithParams(state, 'SELECT * FROM users WHERE id = ?', [1]);
expect(result).toEqual(expectedResult);
});
it('should handle multiple parameters', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
const sql = 'SELECT * FROM test WHERE id = ? AND name = ? AND active = ?';
// SQLite uses 0/1 for boolean values, not true/false
const params = [1, 'test', 1];
await queryWithParams(state, sql, params);
expect(worker?.queryWithParams).toHaveBeenCalledWith(sql, params);
});
it('should handle empty parameters array', async () => {
const state = createStateWithActiveTab();
state.initialized = true;
const worker = state.tabs.get('active-tab')?.dataWorker;
const sql = 'SELECT * FROM test';
await queryWithParams(state, sql, []);
expect(worker?.queryWithParams).toHaveBeenCalledWith(sql, []);
});
it('should ensure initialization before querying', async () => {
const state = createStateWithActiveTab();
state.initialized = false;
const worker = state.tabs.get('active-tab')?.dataWorker;
await queryWithParams(state, 'SELECT ?', [1]);
expect(worker?.initialize).toHaveBeenCalled();
expect(worker?.queryWithParams).toHaveBeenCalled();
});
it('should propagate errors from the data worker', async () => {
const state = createStateWithActiveTab({
queryWithParams: vi.fn().mockRejectedValue(new Error('Query with params failed')),
});
state.initialized = true;
await expect(queryWithParams(state, 'SELECT ?', [1])).rejects.toThrow(
'Query with params failed',
);
});
});
describe('isInitialized', () => {
it('should return false when not initialized', () => {
const state = createMockState({ initialized: false });
expect(isInitialized(state)).toBe(false);
});
it('should return true when initialized', () => {
const state = createMockState({ initialized: true });
expect(isInitialized(state)).toBe(true);
});
});
describe('getActiveTabId', () => {
it('should return null when no active tab', () => {
const state = createMockState({ activeTabId: null });
expect(getActiveTabId(state)).toBeNull();
});
it('should return the active tab ID', () => {
const state = createMockState({ activeTabId: 'tab-123' });
expect(getActiveTabId(state)).toBe('tab-123');
});
});
describe('getTabCount', () => {
it('should return 0 when no tabs are registered', () => {
const state = createMockState();
expect(getTabCount(state)).toBe(0);
});
it('should return the correct count of registered tabs', () => {
const state = createMockState();
state.tabs.set('tab-1', createMockTabConnection({ id: 'tab-1' }));
state.tabs.set('tab-2', createMockTabConnection({ id: 'tab-2' }));
state.tabs.set('tab-3', createMockTabConnection({ id: 'tab-3' }));
expect(getTabCount(state)).toBe(3);
});
});
});

View File

@ -0,0 +1,72 @@
/**
* Query Routing Operations
*
* This module provides operations for routing database queries
* through the coordinator to the active tab's data worker.
*/
import type { QueryResult, SQLiteParam } from '../../data/types';
import type { CoordinatorState } from '../types';
import { withActiveWorker } from '../utils';
/**
* Execute a SQL statement (routes to active tab's worker)
*
* @param state - The coordinator state
* @param sql - The SQL statement to execute
*/
export const exec = withActiveWorker(async (worker, sql: string) => await worker.exec(sql));
/**
* Execute a SQL query (routes to active tab's worker)
*
* @param state - The coordinator state
* @param sql - The SQL query to execute
* @returns Query result with columns and rows
*/
export const query = withActiveWorker(
async (worker, sql: string): Promise<QueryResult> => await worker.query(sql),
);
/**
* Execute a SQL query with parameters (routes to active tab's worker)
*
* @param state - The coordinator state
* @param sql - The SQL query to execute
* @param params - The parameters to bind to the query
* @returns Query result with columns and rows
*/
export const queryWithParams = withActiveWorker(
async (worker, sql: string, params: SQLiteParam[] = []): Promise<QueryResult> =>
await worker.queryWithParams(sql, params),
);
/**
* Check if the coordinator is initialized
*
* @param state - The coordinator state
* @returns Whether the coordinator is initialized
*/
export function isInitialized(state: CoordinatorState): boolean {
return state.initialized;
}
/**
* Get the current active tab ID
*
* @param state - The coordinator state
* @returns The active tab ID or null
*/
export function getActiveTabId(state: CoordinatorState): string | null {
return state.activeTabId;
}
/**
* Get the number of connected tabs
*
* @param state - The coordinator state
* @returns The number of connected tabs
*/
export function getTabCount(state: CoordinatorState): number {
return state.tabs.size;
}

View File

@ -0,0 +1,402 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { MockInstance } from 'vitest';
import * as Comlink from 'comlink';
import type { CoordinatorState, TabConnection } from './types';
import type { DataWorkerApi } from '../data/worker';
import {
generateTabId,
getActiveDataWorker,
selectNewActiveTab,
handleTabDisconnect,
registerTab,
unregisterTab,
} from './tabs';
vi.mock('comlink', () => ({
wrap: vi.fn(),
}));
describe('Coordinator Tab Operations', () => {
let consoleSpy: {
log: MockInstance;
error: MockInstance;
};
beforeEach(() => {
consoleSpy = {
log: vi.spyOn(console, 'log').mockImplementation(() => {}),
error: vi.spyOn(console, 'error').mockImplementation(() => {}),
};
});
afterEach(() => {
vi.clearAllMocks();
});
function createMockState(overrides: Partial<CoordinatorState> = {}): CoordinatorState {
return {
tabs: new Map(),
activeTabId: null,
initialized: false,
...overrides,
};
}
function createMockDataWorker(
overrides: Partial<DataWorkerApi> = {},
): Comlink.Remote<DataWorkerApi> {
return {
initialize: vi.fn().mockResolvedValue(undefined),
exec: vi.fn().mockResolvedValue(undefined),
query: vi.fn().mockResolvedValue({ columns: [], rows: [] }),
queryWithParams: vi.fn().mockResolvedValue({ columns: [], rows: [] }),
close: vi.fn().mockResolvedValue(undefined),
isInitialized: vi.fn().mockReturnValue(true),
loadNodeTypes: vi.fn().mockResolvedValue(undefined),
...overrides,
} as unknown as Comlink.Remote<DataWorkerApi>;
}
function createMockTabConnection(overrides: Partial<TabConnection> = {}): TabConnection {
return {
id: 'test-tab-id',
port: {} as MessagePort,
dataWorker: createMockDataWorker(),
isActive: false,
...overrides,
};
}
describe('generateTabId', () => {
it('should generate a tab ID with correct prefix', () => {
const tabId = generateTabId();
expect(tabId).toMatch(/^tab-/);
});
it('should generate a tab ID containing a timestamp', () => {
const before = Date.now();
const tabId = generateTabId();
const after = Date.now();
const parts = tabId.split('-');
const timestamp = parseInt(parts[1], 10);
expect(timestamp).toBeGreaterThanOrEqual(before);
expect(timestamp).toBeLessThanOrEqual(after);
});
it('should generate unique tab IDs', () => {
const ids = new Set<string>();
for (let i = 0; i < 100; i++) {
ids.add(generateTabId());
}
expect(ids.size).toBe(100);
});
it('should generate a tab ID with random suffix', () => {
const tabId = generateTabId();
const parts = tabId.split('-');
expect(parts.length).toBe(3);
expect(parts[2]).toMatch(/^[a-z0-9]+$/);
});
});
describe('getActiveDataWorker', () => {
it('should return null when there is no active tab ID', () => {
const state = createMockState({ activeTabId: null });
const result = getActiveDataWorker(state);
expect(result).toBeNull();
});
it('should return null when active tab is not in the tabs map', () => {
const state = createMockState({ activeTabId: 'non-existent-tab' });
const result = getActiveDataWorker(state);
expect(result).toBeNull();
});
it('should return null when active tab has no data worker', () => {
const tabConnection = createMockTabConnection({
id: 'tab-1',
dataWorker: null,
});
const state = createMockState({ activeTabId: 'tab-1' });
state.tabs.set('tab-1', tabConnection);
const result = getActiveDataWorker(state);
expect(result).toBeNull();
});
it('should return the data worker for the active tab', () => {
const mockDataWorker = createMockDataWorker();
const tabConnection = createMockTabConnection({
id: 'tab-1',
dataWorker: mockDataWorker,
});
const state = createMockState({ activeTabId: 'tab-1' });
state.tabs.set('tab-1', tabConnection);
const result = getActiveDataWorker(state);
expect(result).toBe(mockDataWorker);
});
});
describe('selectNewActiveTab', () => {
it('should set activeTabId to null when no tabs are available', async () => {
const state = createMockState();
await selectNewActiveTab(state);
expect(state.activeTabId).toBeNull();
expect(state.initialized).toBe(false);
});
it('should skip tabs without data workers', async () => {
const tabWithoutWorker = createMockTabConnection({
id: 'tab-no-worker',
dataWorker: null,
});
const tabWithWorker = createMockTabConnection({
id: 'tab-with-worker',
});
const state = createMockState();
state.tabs.set('tab-no-worker', tabWithoutWorker);
state.tabs.set('tab-with-worker', tabWithWorker);
await selectNewActiveTab(state);
expect(state.activeTabId).toBe('tab-with-worker');
expect(tabWithWorker.isActive).toBe(true);
});
it('should select the first available tab with a data worker', async () => {
const mockDataWorker = createMockDataWorker();
const tabConnection = createMockTabConnection({
id: 'tab-1',
dataWorker: mockDataWorker,
});
const state = createMockState();
state.tabs.set('tab-1', tabConnection);
await selectNewActiveTab(state);
expect(state.activeTabId).toBe('tab-1');
expect(tabConnection.isActive).toBe(true);
expect(mockDataWorker.initialize).toHaveBeenCalled();
});
it('should set initialized to true after successful initialization', async () => {
const tabConnection = createMockTabConnection({ id: 'tab-1' });
const state = createMockState({ initialized: false });
state.tabs.set('tab-1', tabConnection);
await selectNewActiveTab(state);
expect(state.initialized).toBe(true);
});
it('should try next tab if initialization fails', async () => {
const failingWorker = createMockDataWorker({
initialize: vi.fn().mockRejectedValue(new Error('Init failed')),
});
const successWorker = createMockDataWorker();
const failingTab = createMockTabConnection({
id: 'tab-failing',
dataWorker: failingWorker,
});
const successTab = createMockTabConnection({
id: 'tab-success',
dataWorker: successWorker,
});
const state = createMockState();
state.tabs.set('tab-failing', failingTab);
state.tabs.set('tab-success', successTab);
await selectNewActiveTab(state);
expect(failingTab.isActive).toBe(false);
expect(successTab.isActive).toBe(true);
expect(state.activeTabId).toBe('tab-success');
expect(state.initialized).toBe(true);
});
it('should set initialized to false if all tabs fail', async () => {
const failingWorker = createMockDataWorker({
initialize: vi.fn().mockRejectedValue(new Error('Init failed')),
});
const failingTab = createMockTabConnection({
id: 'tab-failing',
dataWorker: failingWorker,
});
const state = createMockState({ initialized: true });
state.tabs.set('tab-failing', failingTab);
await selectNewActiveTab(state);
expect(state.activeTabId).toBeNull();
expect(state.initialized).toBe(false);
});
});
describe('handleTabDisconnect', () => {
it('should do nothing if tab is not in the state', () => {
const state = createMockState();
handleTabDisconnect(state, 'non-existent-tab');
expect(state.tabs.size).toBe(0);
});
it('should remove the tab from the state', () => {
const tabConnection = createMockTabConnection({ id: 'tab-1' });
const state = createMockState();
state.tabs.set('tab-1', tabConnection);
handleTabDisconnect(state, 'tab-1');
expect(state.tabs.has('tab-1')).toBe(false);
});
it('should not change activeTabId if disconnected tab was not active', () => {
const tab1 = createMockTabConnection({ id: 'tab-1' });
const tab2 = createMockTabConnection({ id: 'tab-2' });
const state = createMockState({ activeTabId: 'tab-2' });
state.tabs.set('tab-1', tab1);
state.tabs.set('tab-2', tab2);
handleTabDisconnect(state, 'tab-1');
expect(state.activeTabId).toBe('tab-2');
});
it('should reset activeTabId and initialized when active tab disconnects', () => {
const tabConnection = createMockTabConnection({ id: 'tab-1' });
const state = createMockState({
activeTabId: 'tab-1',
initialized: true,
});
state.tabs.set('tab-1', tabConnection);
handleTabDisconnect(state, 'tab-1');
expect(state.activeTabId).toBeNull();
expect(state.initialized).toBe(false);
});
it('should trigger selectNewActiveTab when active tab disconnects', async () => {
const tab1 = createMockTabConnection({ id: 'tab-1' });
const tab2 = createMockTabConnection({ id: 'tab-2' });
const state = createMockState({
activeTabId: 'tab-1',
initialized: true,
});
state.tabs.set('tab-1', tab1);
state.tabs.set('tab-2', tab2);
handleTabDisconnect(state, 'tab-1');
// Wait for the async selectNewActiveTab to complete
await vi.waitFor(() => {
expect(state.activeTabId).toBe('tab-2');
});
});
});
describe('registerTab', () => {
it('should generate a unique tab ID', () => {
const state = createMockState();
const mockPort = {} as MessagePort;
vi.mocked(Comlink.wrap).mockReturnValue(createMockDataWorker());
const tabId = registerTab(state, mockPort);
expect(tabId).toMatch(/^tab-/);
});
it('should wrap the port with Comlink', () => {
const state = createMockState();
const mockPort = {} as MessagePort;
vi.mocked(Comlink.wrap).mockReturnValue(createMockDataWorker());
registerTab(state, mockPort);
expect(Comlink.wrap).toHaveBeenCalledWith(mockPort);
});
it('should add the tab connection to the state', () => {
const state = createMockState();
const mockPort = {} as MessagePort;
const mockDataWorker = createMockDataWorker();
vi.mocked(Comlink.wrap).mockReturnValue(mockDataWorker);
const tabId = registerTab(state, mockPort);
expect(state.tabs.has(tabId)).toBe(true);
const tabConnection = state.tabs.get(tabId);
expect(tabConnection?.id).toBe(tabId);
expect(tabConnection?.port).toBe(mockPort);
expect(tabConnection?.dataWorker).toBe(mockDataWorker);
});
it('should make the first tab active', () => {
const state = createMockState();
const mockPort = {} as MessagePort;
vi.mocked(Comlink.wrap).mockReturnValue(createMockDataWorker());
const tabId = registerTab(state, mockPort);
expect(state.activeTabId).toBe(tabId);
expect(state.tabs.get(tabId)?.isActive).toBe(true);
});
it('should not change active tab if one already exists', () => {
const existingTab = createMockTabConnection({ id: 'existing-tab', isActive: true });
const state = createMockState({ activeTabId: 'existing-tab' });
state.tabs.set('existing-tab', existingTab);
const mockPort = {} as MessagePort;
vi.mocked(Comlink.wrap).mockReturnValue(createMockDataWorker());
const newTabId = registerTab(state, mockPort);
expect(state.activeTabId).toBe('existing-tab');
expect(state.tabs.get(newTabId)?.isActive).toBe(false);
});
it('should log tab registration', () => {
const state = createMockState();
const mockPort = {} as MessagePort;
vi.mocked(Comlink.wrap).mockReturnValue(createMockDataWorker());
const tabId = registerTab(state, mockPort);
expect(consoleSpy.log).toHaveBeenCalledWith(`[Coordinator] Registering tab: ${tabId}`);
});
});
describe('unregisterTab', () => {
it('should call handleTabDisconnect with correct parameters', () => {
const tabConnection = createMockTabConnection({ id: 'tab-1' });
const state = createMockState();
state.tabs.set('tab-1', tabConnection);
unregisterTab(state, 'tab-1');
expect(state.tabs.has('tab-1')).toBe(false);
});
it('should handle unregistering non-existent tab gracefully', () => {
const state = createMockState();
expect(() => unregisterTab(state, 'non-existent')).not.toThrow();
});
});
});

View File

@ -0,0 +1,145 @@
/**
* Tab Management Operations
*
* This module provides operations for managing tab connections
* in the coordinator SharedWorker.
*/
import * as Comlink from 'comlink';
import type { DataWorkerApi } from '../data/worker';
import type { CoordinatorState, TabConnection } from './types';
/**
* Generate a unique tab ID
*/
export function generateTabId(): string {
return `tab-${Date.now()}-${Math.random().toString(36).substring(2, 9)}`;
}
/**
* Get the active tab's data worker
*
* @param state - The coordinator state
* @returns The active tab's data worker or null if no active tab
*/
export function getActiveDataWorker(state: CoordinatorState): Comlink.Remote<DataWorkerApi> | null {
if (!state.activeTabId) return null;
const tab = state.tabs.get(state.activeTabId);
return tab?.dataWorker ?? null;
}
/**
* Get the active data worker, throwing if not available
*
* @param state - The coordinator state
* @returns The active tab's data worker
* @throws Error if no active data worker is available
*/
export function getRequiredActiveDataWorker(
state: CoordinatorState,
): Comlink.Remote<DataWorkerApi> {
const worker = getActiveDataWorker(state);
if (!worker) {
throw new Error('[Coordinator] No active data worker available');
}
return worker;
}
/**
* Select a new active tab when the current one disconnects
*
* @param state - The coordinator state
*/
export async function selectNewActiveTab(state: CoordinatorState): Promise<void> {
// Find the first connected tab with a data worker
for (const [tabId, tab] of state.tabs) {
if (tab.dataWorker) {
console.log('[Coordinator] Selecting new active tab:', tabId);
state.activeTabId = tabId;
tab.isActive = true;
// Initialize the new active tab's data worker
try {
await tab.dataWorker.initialize();
state.initialized = true;
console.log(`[Coordinator] New active tab ${tabId} initialized`);
} catch (error) {
console.error('[Coordinator] Failed to initialize new active tab:', error);
// Try the next tab
tab.isActive = false;
state.activeTabId = null;
continue;
}
return;
}
}
console.log('[Coordinator] No tabs available to become active');
state.activeTabId = null;
state.initialized = false;
}
/**
* Handle tab disconnection
*
* @param state - The coordinator state
* @param tabId - The ID of the disconnected tab
*/
export function handleTabDisconnect(state: CoordinatorState, tabId: string): void {
console.log(`[Coordinator] Tab disconnected: ${tabId}`);
const tab = state.tabs.get(tabId);
if (tab) {
state.tabs.delete(tabId);
// If the disconnected tab was active, select a new one
if (state.activeTabId === tabId) {
state.activeTabId = null;
state.initialized = false;
selectNewActiveTab(state).catch(console.error);
}
}
}
/**
* Register a tab and its dedicated data worker with the coordinator
*
* @param state - The coordinator state
* @param dataWorkerPort - The MessagePort for communicating with the data worker
* @returns The generated tab ID
*/
export function registerTab(state: CoordinatorState, dataWorkerPort: MessagePort): string {
const tabId = generateTabId();
console.log(`[Coordinator] Registering tab: ${tabId}`);
// Wrap the data worker port with Comlink
const dataWorker = Comlink.wrap<DataWorkerApi>(dataWorkerPort);
const tabConnection: TabConnection = {
id: tabId,
port: dataWorkerPort,
dataWorker,
isActive: false,
};
state.tabs.set(tabId, tabConnection);
// If this is the first tab or no active tab, make it active
if (!state.activeTabId) {
console.log(`[Coordinator] Making tab ${tabId} the active tab`);
state.activeTabId = tabId;
tabConnection.isActive = true;
}
return tabId;
}
/**
* Unregister a tab when it closes
*
* @param state - The coordinator state
* @param tabId - The ID of the tab to unregister
*/
export function unregisterTab(state: CoordinatorState, tabId: string): void {
handleTabDisconnect(state, tabId);
}

View File

@ -0,0 +1,49 @@
/**
* Coordinator Types
*
* This module defines types used by the coordinator SharedWorker.
* These types are specific to the coordination layer that manages
* which tab's dedicated worker is active.
*/
import type * as Comlink from 'comlink';
import type { DataWorkerApi } from '../data/worker';
/**
* Represents a connected tab in the coordinator
*/
export interface TabConnection {
id: string;
port: MessagePort;
dataWorker: Comlink.Remote<DataWorkerApi> | null;
isActive: boolean;
}
/**
* State of the coordinator SharedWorker
*/
export interface CoordinatorState {
tabs: Map<string, TabConnection>;
activeTabId: string | null;
initialized: boolean;
}
/**
* Information about the coordinator state (for debugging)
*/
export interface CoordinatorInfo {
activeTabId: string | null;
tabCount: number;
isInitialized: boolean;
}
/**
* Message types for worker communication
*/
export type WorkerMessageType = 'connect' | 'disconnect' | 'query' | 'exec' | 'result' | 'error';
export interface WorkerMessage {
type: WorkerMessageType;
payload?: unknown;
port?: MessagePort;
}

View File

@ -0,0 +1,33 @@
/**
* Active Worker Utilities
*
* This module provides utilities for creating coordinator operations
* that route to the active tab's data worker.
*/
import type * as Comlink from 'comlink';
import type { DataWorkerApi } from '../data/worker';
import type { CoordinatorState } from './types';
import { ensureInitialized } from './initialize';
import { getRequiredActiveDataWorker } from './tabs';
/**
* Creates a function that ensures initialization and routes to the active worker.
*
* The order of operations is:
* 1. Ensure coordinator is initialized (calls initialize if needed)
* 2. Get the active data worker (AFTER initialization, as active tab may change)
* 3. Execute the provided action on the worker
*
* @param action - Callback that receives the worker and performs the operation
* @returns A function that handles init/routing, then calls the action
*/
export function withActiveWorker<TArgs extends unknown[], TReturn>(
action: (worker: Comlink.Remote<DataWorkerApi>, ...args: TArgs) => Promise<TReturn>,
): (state: CoordinatorState, ...args: TArgs) => Promise<TReturn> {
return async (state: CoordinatorState, ...args: TArgs): Promise<TReturn> => {
await ensureInitialized(state);
const worker = getRequiredActiveDataWorker(state);
return await action(worker, ...args);
};
}

View File

@ -0,0 +1,152 @@
/**
* SharedWorker Coordinator
*
* This SharedWorker coordinates which tab is "active" and routes all queries
* to the active tab's dedicated worker. Based on Notion's architecture.
*
* Architecture:
* - Each tab creates both a connection to this SharedWorker AND its own dedicated data worker
* - Only ONE tab's dedicated worker actually accesses the database at a time
* - This SharedWorker manages which tab is "active" using Web Locks
* - All queries from any tab are routed through here to the active tab's dedicated worker
*/
/// <reference lib="webworker" />
declare const self: SharedWorkerGlobalScope;
import * as Comlink from 'comlink';
import type { CoordinatorState } from './types';
import type { SQLiteParam } from '../data/types';
import {
registerTab as registerTabOp,
unregisterTab as unregisterTabOp,
handleTabDisconnect,
} from './tabs';
import {
exec as execOp,
query as queryOp,
queryWithParams as queryWithParamsOp,
isInitialized as isInitializedOp,
getActiveTabId as getActiveTabIdOp,
getTabCount as getTabCountOp,
} from './operations/query';
import { loadNodeTypes as loadNodeTypesOp } from './operations/loadNodeTypes';
import { initialize as initializeOp } from './initialize';
const state: CoordinatorState = {
tabs: new Map(),
activeTabId: null,
initialized: false,
};
// ============================================================================
// Public API exposed to tabs
// ============================================================================
const coordinatorApi = {
/**
* Register a tab and its dedicated data worker with the coordinator
*/
async registerTab(dataWorkerPort: MessagePort): Promise<string> {
return registerTabOp(state, dataWorkerPort);
},
/**
* Unregister a tab when it closes
*/
async unregisterTab(tabId: string): Promise<void> {
unregisterTabOp(state, tabId);
},
/**
* Initialize the database (routes to active tab's worker)
*/
async initialize(): Promise<void> {
await initializeOp(state);
},
/**
* Execute a SQL statement (routes to active tab's worker)
*/
async exec(sql: string): Promise<void> {
await execOp(state, sql);
},
/**
* Execute a SQL query (routes to active tab's worker)
*/
async query(sql: string) {
return await queryOp(state, sql);
},
/**
* Execute a SQL query with parameters (routes to active tab's worker)
*/
async queryWithParams(sql: string, params: SQLiteParam[] = []) {
return await queryWithParamsOp(state, sql, params);
},
/**
* Check if the coordinator is initialized
*/
isInitialized(): boolean {
return isInitializedOp(state);
},
/**
* Get the current active tab ID
*/
getActiveTabId(): string | null {
return getActiveTabIdOp(state);
},
/**
* Get the number of connected tabs
*/
getTabCount(): number {
return getTabCountOp(state);
},
/**
* Load node types from the server (routes to active tab's worker)
*/
async loadNodeTypes(baseUrl: string): Promise<void> {
await loadNodeTypesOp(state, baseUrl);
},
};
export type CoordinatorApi = typeof coordinatorApi;
// ============================================================================
// SharedWorker connection handling
// ============================================================================
const ports = new Set<MessagePort>();
self.onconnect = (e: MessageEvent) => {
const port = e.ports[0];
ports.add(port);
// Track which tab this port belongs to
let connectedTabId: string | null = null;
// Create a wrapped API that tracks the tab ID on registration
const wrappedApi = {
...coordinatorApi,
async registerTab(dataWorkerPort: MessagePort): Promise<string> {
connectedTabId = await coordinatorApi.registerTab(dataWorkerPort);
return connectedTabId;
},
};
// Handle port close/disconnect
port.onmessageerror = () => {
ports.delete(port);
if (connectedTabId) {
handleTabDisconnect(state, connectedTabId);
}
};
Comlink.expose(wrappedApi, port);
};

View File

@ -0,0 +1,82 @@
import { describe, it, expect } from 'vitest';
import { databaseSchema, getAllTableSchemas, getTableSchema } from './db';
describe('Data Worker Database Schema', () => {
describe('databaseSchema', () => {
it('should have nodeTypes table defined', () => {
expect(databaseSchema.tables).toHaveProperty('nodeTypes');
});
it('should have exactly 1 table', () => {
expect(Object.keys(databaseSchema.tables)).toHaveLength(1);
});
describe('nodeTypes table', () => {
it('should have correct name', () => {
expect(databaseSchema.tables.nodeTypes.name).toBe('nodeTypes');
});
it('should have CREATE TABLE statement', () => {
expect(databaseSchema.tables.nodeTypes.schema).toContain(
'CREATE TABLE IF NOT EXISTS nodeTypes',
);
});
it('should have required columns', () => {
const schema = databaseSchema.tables.nodeTypes.schema;
expect(schema).toContain('id TEXT PRIMARY KEY');
expect(schema).toContain('data TEXT NOT NULL');
expect(schema).toContain('updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP');
});
});
});
describe('getAllTableSchemas', () => {
it('should return an array of schemas', () => {
const schemas = getAllTableSchemas();
expect(Array.isArray(schemas)).toBe(true);
});
it('should return 1 schema (one for each table)', () => {
const schemas = getAllTableSchemas();
expect(schemas).toHaveLength(1);
});
it('should return strings containing CREATE TABLE statements', () => {
const schemas = getAllTableSchemas();
schemas.forEach((schema) => {
expect(typeof schema).toBe('string');
expect(schema).toContain('CREATE TABLE IF NOT EXISTS');
});
});
it('should include schema for nodeTypes table', () => {
const schemas = getAllTableSchemas();
const joinedSchemas = schemas.join(' ');
expect(joinedSchemas).toContain('nodeTypes');
});
it('should return a new array on each call', () => {
const schemas1 = getAllTableSchemas();
const schemas2 = getAllTableSchemas();
expect(schemas1).not.toBe(schemas2);
expect(schemas1).toEqual(schemas2);
});
});
describe('getTableSchema', () => {
it('should return the nodeTypes table schema', () => {
const schema = getTableSchema('nodeTypes');
expect(schema).toContain('CREATE TABLE IF NOT EXISTS nodeTypes');
expect(schema).toBe(databaseSchema.tables.nodeTypes.schema);
});
it('should return a string', () => {
const schema = getTableSchema('nodeTypes');
expect(typeof schema).toBe('string');
});
});
});

View File

@ -0,0 +1,50 @@
/**
* Database Configuration
*
* This module defines the database schema and table configurations
* for the SQLite database used by the run-data workers.
*/
export interface TableSchema {
name: string;
schema: string;
}
export interface DatabaseSchema {
tables: Record<string, TableSchema>;
}
/**
* Database schema definition
*
* Currently only includes nodeTypes table. Additional tables (executions,
* credentials, workflows) will be added when they are needed.
*/
export const databaseSchema: DatabaseSchema = {
tables: {
nodeTypes: {
name: 'nodeTypes',
schema: `
CREATE TABLE IF NOT EXISTS nodeTypes (
id TEXT PRIMARY KEY,
data TEXT NOT NULL,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`,
},
},
} as const;
/**
* Get all table creation SQL statements
*/
export function getAllTableSchemas(): string[] {
return Object.values(databaseSchema.tables).map((table) => table.schema);
}
/**
* Get a specific table schema by name
*/
export function getTableSchema(tableName: keyof typeof databaseSchema.tables): string {
return databaseSchema.tables[tableName].schema;
}

View File

@ -0,0 +1,387 @@
import { describe, it, expect, vi, beforeEach, afterEach } from 'vitest';
import type { MockInstance } from 'vitest';
import type { INodeTypeDescription } from 'n8n-workflow';
import type { DataWorkerState, QueryResult } from '../types';
import { loadNodeTypes, getNodeType, getAllNodeTypes } from './loadNodeTypes';
vi.mock('@n8n/rest-api-client/api/nodeTypes', () => ({
getNodeTypes: vi.fn(),
getNodeTypeVersions: vi.fn(),
getNodeTypesByIdentifier: vi.fn(),
}));
const { execWithParamsMock, queryMock } = vi.hoisted(() => ({
execWithParamsMock: vi.fn(),
queryMock: vi.fn(),
}));
vi.mock('./query', () => ({
execWithParams: execWithParamsMock,
query: queryMock,
withTrx: async <T>(_state: unknown, fn: () => Promise<T>): Promise<T> => {
await execWithParamsMock(_state, 'BEGIN TRANSACTION', []);
try {
const result = await fn();
await execWithParamsMock(_state, 'COMMIT', []);
return result;
} catch (error) {
await execWithParamsMock(_state, 'ROLLBACK', []);
throw error;
}
},
}));
import {
getNodeTypes,
getNodeTypeVersions,
getNodeTypesByIdentifier,
} from '@n8n/rest-api-client/api/nodeTypes';
import { execWithParams, query } from './query';
describe('Data Worker loadNodeTypes Operations', () => {
let consoleSpy: {
log: MockInstance;
error: MockInstance;
};
beforeEach(() => {
consoleSpy = {
log: vi.spyOn(console, 'log').mockImplementation(() => {}),
error: vi.spyOn(console, 'error').mockImplementation(() => {}),
};
});
afterEach(() => {
vi.clearAllMocks();
});
function createMockState(overrides: Partial<DataWorkerState> = {}): DataWorkerState {
return {
initialized: true,
sqlite3: {} as DataWorkerState['sqlite3'],
db: 1,
vfs: null,
initPromise: null,
...overrides,
};
}
function createMockNodeType(overrides: Partial<INodeTypeDescription> = {}): INodeTypeDescription {
return {
displayName: 'Test Node',
name: 'n8n-nodes-base.testNode',
group: ['transform'],
version: 1,
description: 'A test node',
defaults: { name: 'Test Node' },
inputs: ['main'],
outputs: ['main'],
properties: [],
...overrides,
} as INodeTypeDescription;
}
function createQueryResult(rows: unknown[][] = [], columns: string[] = []): QueryResult {
return { columns, rows };
}
describe('loadNodeTypes', () => {
it('should throw error when database is not initialized', async () => {
const state = createMockState({ initialized: false });
await expect(loadNodeTypes(state, 'http://localhost:5678')).rejects.toThrow(
'[DataWorker] Database not initialized',
);
});
describe('initial load (empty database)', () => {
it('should fetch all node types when database is empty', async () => {
const state = createMockState();
const mockNodeTypes = [
createMockNodeType({ name: 'n8n-nodes-base.node1', version: 1 }),
createMockNodeType({ name: 'n8n-nodes-base.node2', version: 1 }),
];
vi.mocked(query).mockResolvedValueOnce(createQueryResult([[0]]));
vi.mocked(getNodeTypes).mockResolvedValueOnce(mockNodeTypes);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
expect(getNodeTypes).toHaveBeenCalledWith('http://localhost:5678');
});
it('should wrap inserts in a transaction', async () => {
const state = createMockState();
const mockNodeTypes = [createMockNodeType({ name: 'n8n-nodes-base.node1', version: 1 })];
vi.mocked(query).mockResolvedValueOnce(createQueryResult([[0]]));
vi.mocked(getNodeTypes).mockResolvedValueOnce(mockNodeTypes);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
expect(execWithParams).toHaveBeenCalledWith(state, 'BEGIN TRANSACTION', []);
expect(execWithParams).toHaveBeenCalledWith(state, 'COMMIT', []);
});
it('should rollback transaction on error during initial load', async () => {
const state = createMockState();
const mockNodeTypes = [createMockNodeType({ name: 'n8n-nodes-base.node1', version: 1 })];
vi.mocked(query).mockResolvedValueOnce(createQueryResult([[0]]));
vi.mocked(getNodeTypes).mockResolvedValueOnce(mockNodeTypes);
vi.mocked(execWithParams)
.mockResolvedValueOnce(undefined) // BEGIN TRANSACTION
.mockRejectedValueOnce(new Error('Insert failed')); // INSERT
await expect(loadNodeTypes(state, 'http://localhost:5678')).rejects.toThrow(
'Insert failed',
);
expect(execWithParams).toHaveBeenCalledWith(state, 'ROLLBACK', []);
});
it('should handle node types with array versions', async () => {
const state = createMockState();
const mockNodeTypes = [
createMockNodeType({ name: 'n8n-nodes-base.multiVersion', version: [1, 2, 3] }),
];
vi.mocked(query).mockResolvedValueOnce(createQueryResult([[0]]));
vi.mocked(getNodeTypes).mockResolvedValueOnce(mockNodeTypes);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
// Should insert for each version with parameterized queries
expect(execWithParams).toHaveBeenCalledWith(
state,
'INSERT OR REPLACE INTO nodeTypes (id, data, updated_at) VALUES (?, ?, datetime(?))',
['n8n-nodes-base.multiVersion@1', expect.any(String), 'now'],
);
expect(execWithParams).toHaveBeenCalledWith(
state,
'INSERT OR REPLACE INTO nodeTypes (id, data, updated_at) VALUES (?, ?, datetime(?))',
['n8n-nodes-base.multiVersion@2', expect.any(String), 'now'],
);
expect(execWithParams).toHaveBeenCalledWith(
state,
'INSERT OR REPLACE INTO nodeTypes (id, data, updated_at) VALUES (?, ?, datetime(?))',
['n8n-nodes-base.multiVersion@3', expect.any(String), 'now'],
);
});
it('should handle single quotes in node type data via parameterized queries', async () => {
const state = createMockState();
const mockNodeTypes = [
createMockNodeType({
name: 'n8n-nodes-base.test',
description: "It's a test node",
version: 1,
}),
];
vi.mocked(query).mockResolvedValueOnce(createQueryResult([[0]]));
vi.mocked(getNodeTypes).mockResolvedValueOnce(mockNodeTypes);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
// With parameterized queries, single quotes are handled by the database driver
expect(execWithParams).toHaveBeenCalledWith(
state,
'INSERT OR REPLACE INTO nodeTypes (id, data, updated_at) VALUES (?, ?, datetime(?))',
['n8n-nodes-base.test@1', expect.stringContaining("It's a test node"), 'now'],
);
});
});
describe('incremental sync (non-empty database)', () => {
it('should check server versions for incremental sync', async () => {
const state = createMockState();
vi.mocked(query)
.mockResolvedValueOnce(createQueryResult([[5]])) // COUNT returns 5
.mockResolvedValueOnce(createQueryResult([['node1@1'], ['node2@1']])); // existing IDs
vi.mocked(getNodeTypeVersions).mockResolvedValueOnce(['node1@1', 'node2@1']);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
expect(getNodeTypeVersions).toHaveBeenCalledWith('http://localhost:5678');
});
it('should delete removed node types', async () => {
const state = createMockState();
vi.mocked(query)
.mockResolvedValueOnce(createQueryResult([[2]])) // COUNT returns 2
.mockResolvedValueOnce(createQueryResult([['node1@1'], ['node2@1']])); // existing IDs
vi.mocked(getNodeTypeVersions).mockResolvedValueOnce(['node1@1']); // node2@1 removed
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
expect(execWithParams).toHaveBeenCalledWith(state, 'DELETE FROM nodeTypes WHERE id = ?', [
'node2@1',
]);
});
it('should fetch and insert added node types', async () => {
const state = createMockState();
const newNodeType = createMockNodeType({ name: 'n8n-nodes-base.newNode', version: 1 });
vi.mocked(query)
.mockResolvedValueOnce(createQueryResult([[1]])) // COUNT returns 1
.mockResolvedValueOnce(createQueryResult([['existingNode@1']])); // existing IDs
vi.mocked(getNodeTypeVersions).mockResolvedValueOnce([
'existingNode@1',
'n8n-nodes-base.newNode@1',
]);
vi.mocked(getNodeTypesByIdentifier).mockResolvedValueOnce([newNodeType]);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
expect(getNodeTypesByIdentifier).toHaveBeenCalledWith(
expect.objectContaining({ baseUrl: 'http://localhost:5678' }),
['n8n-nodes-base.newNode@1'],
);
});
it('should not make changes when no differences detected', async () => {
const state = createMockState();
vi.mocked(query)
.mockResolvedValueOnce(createQueryResult([[2]])) // COUNT returns 2
.mockResolvedValueOnce(createQueryResult([['node1@1'], ['node2@1']])); // existing IDs
vi.mocked(getNodeTypeVersions).mockResolvedValueOnce(['node1@1', 'node2@1']); // same as DB
await loadNodeTypes(state, 'http://localhost:5678');
// Should not start a transaction if no changes
expect(execWithParams).not.toHaveBeenCalledWith(state, 'BEGIN TRANSACTION', []);
});
it('should rollback transaction on error during incremental sync', async () => {
const state = createMockState();
vi.mocked(query)
.mockResolvedValueOnce(createQueryResult([[1]])) // COUNT
.mockResolvedValueOnce(createQueryResult([['node1@1']])); // existing IDs
vi.mocked(getNodeTypeVersions).mockResolvedValueOnce(['node1@1', 'newNode@1']);
vi.mocked(getNodeTypesByIdentifier).mockRejectedValueOnce(new Error('Fetch failed'));
vi.mocked(execWithParams).mockResolvedValue(undefined);
await expect(loadNodeTypes(state, 'http://localhost:5678')).rejects.toThrow('Fetch failed');
expect(execWithParams).toHaveBeenCalledWith(state, 'ROLLBACK', []);
});
it('should strip trailing slash from baseUrl when creating REST API context', async () => {
const state = createMockState();
const newNodeType = createMockNodeType({ name: 'n8n-nodes-base.newNode', version: 1 });
vi.mocked(query)
.mockResolvedValueOnce(createQueryResult([[1]]))
.mockResolvedValueOnce(createQueryResult([['existingNode@1']]));
vi.mocked(getNodeTypeVersions).mockResolvedValueOnce([
'existingNode@1',
'n8n-nodes-base.newNode@1',
]);
vi.mocked(getNodeTypesByIdentifier).mockResolvedValueOnce([newNodeType]);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678/');
expect(getNodeTypesByIdentifier).toHaveBeenCalledWith(
expect.objectContaining({ baseUrl: 'http://localhost:5678' }),
expect.any(Array),
);
});
});
it('should log sync progress', async () => {
const state = createMockState();
vi.mocked(query).mockResolvedValueOnce(createQueryResult([[0]]));
vi.mocked(getNodeTypes).mockResolvedValueOnce([]);
vi.mocked(execWithParams).mockResolvedValue(undefined);
await loadNodeTypes(state, 'http://localhost:5678');
expect(consoleSpy.log).toHaveBeenCalledWith('[DataWorker] Starting node types sync...');
});
});
describe('getNodeType', () => {
it('should return null when node type is not found', async () => {
const state = createMockState();
vi.mocked(query).mockResolvedValueOnce(createQueryResult([]));
const result = await getNodeType(state, 'nonexistent', 1);
expect(result).toBeNull();
});
it('should return parsed node type when found', async () => {
const state = createMockState();
const mockNodeType = createMockNodeType({ name: 'n8n-nodes-base.test', version: 1 });
vi.mocked(query).mockResolvedValueOnce(createQueryResult([[JSON.stringify(mockNodeType)]]));
const result = await getNodeType(state, 'n8n-nodes-base.test', 1);
expect(result).toEqual(mockNodeType);
});
it('should query with correct node type ID format', async () => {
const state = createMockState();
vi.mocked(query).mockResolvedValueOnce(createQueryResult([]));
await getNodeType(state, 'n8n-nodes-base.myNode', 2);
expect(query).toHaveBeenCalledWith(
state,
"SELECT data FROM nodeTypes WHERE id = 'n8n-nodes-base.myNode@2'",
);
});
});
describe('getAllNodeTypes', () => {
it('should return empty array when no node types exist', async () => {
const state = createMockState();
vi.mocked(query).mockResolvedValueOnce(createQueryResult([]));
const result = await getAllNodeTypes(state);
expect(result).toEqual([]);
});
it('should return all parsed node types', async () => {
const state = createMockState();
const mockNodeType1 = createMockNodeType({ name: 'n8n-nodes-base.node1', version: 1 });
const mockNodeType2 = createMockNodeType({ name: 'n8n-nodes-base.node2', version: 1 });
vi.mocked(query).mockResolvedValueOnce(
createQueryResult([[JSON.stringify(mockNodeType1)], [JSON.stringify(mockNodeType2)]]),
);
const result = await getAllNodeTypes(state);
expect(result).toHaveLength(2);
expect(result[0]).toEqual(mockNodeType1);
expect(result[1]).toEqual(mockNodeType2);
});
it('should query the nodeTypes table', async () => {
const state = createMockState();
vi.mocked(query).mockResolvedValueOnce(createQueryResult([]));
await getAllNodeTypes(state);
expect(query).toHaveBeenCalledWith(state, 'SELECT data FROM nodeTypes');
});
});
});

View File

@ -0,0 +1,222 @@
/**
* Load Node Types
*
* This module handles loading and syncing node types from the server
* to the local SQLite database for caching.
*
* The functions accept the data worker state, allowing them to be
* called from within web workers while keeping the code modular.
*/
import {
getNodeTypes,
getNodeTypeVersions,
getNodeTypesByIdentifier,
} from '@n8n/rest-api-client/api/nodeTypes';
import type { INodeTypeDescription } from 'n8n-workflow';
import { jsonParse } from 'n8n-workflow';
import type { DataWorkerState } from '../types';
import { execWithParams, query, withTrx } from './query';
/**
* Generate a unique ID for a node type version
*/
function getNodeTypeId(name: string, version: number): string {
return `${name}@${version}`;
}
/**
* Convert a database row to a node type description
*/
function rowToNodeType(data: string): INodeTypeDescription {
return jsonParse<INodeTypeDescription>(data);
}
/**
* Extract all version numbers from a node type description
*/
function getNodeTypeVersionsFromDescription(nodeType: INodeTypeDescription): number[] {
const version = nodeType.version;
if (typeof version === 'number') {
return [version];
}
if (Array.isArray(version)) {
return version;
}
return [1];
}
/**
* Create a REST API context for making authenticated requests
*/
function createRestApiContext(baseUrl: string) {
return {
baseUrl: baseUrl.endsWith('/') ? baseUrl.slice(0, -1) : baseUrl,
pushRef: '',
};
}
/**
* Upsert a node type into the database
*
* @param state - The data worker state
* @param id - The unique identifier for the node type (name@version)
* @param nodeType - The node type description to store
*/
async function upsertNodeType(
state: DataWorkerState,
id: string,
nodeType: INodeTypeDescription,
): Promise<void> {
await execWithParams(
state,
'INSERT OR REPLACE INTO nodeTypes (id, data, updated_at) VALUES (?, ?, datetime(?))',
[id, JSON.stringify(nodeType), 'now'],
);
}
/**
* Delete a node type from the database
*
* @param state - The data worker state
* @param id - The unique identifier for the node type (name@version)
*/
async function deleteNodeType(state: DataWorkerState, id: string): Promise<void> {
await execWithParams(state, 'DELETE FROM nodeTypes WHERE id = ?', [id]);
}
/**
* Load node types from the server and sync with the local database
*
* @param state - The data worker state
* @param baseUrl - The base URL for API requests
*/
export async function loadNodeTypes(state: DataWorkerState, baseUrl: string): Promise<void> {
console.log('[DataWorker] loadNodeTypes');
if (!state.initialized) {
throw new Error('[DataWorker] Database not initialized');
}
console.log('[DataWorker] Starting node types sync...');
// Check if the database has any node types
const countResult = await query(state, 'SELECT COUNT(*) as count FROM nodeTypes');
const count = countResult.rows[0]?.[0] as number;
const isEmpty = count === 0;
console.log(`[DataWorker] Database has ${count} node types, isEmpty: ${isEmpty}`);
if (isEmpty) {
// Initial load: fetch all node types and store them
console.log('[DataWorker] Performing initial load of all node types...');
const nodeTypes: INodeTypeDescription[] = await getNodeTypes(baseUrl);
console.log(`[DataWorker] Fetched ${nodeTypes.length} node types from server`);
// Use a transaction for better performance
await withTrx(state, async () => {
for (const nodeType of nodeTypes) {
const versions = getNodeTypeVersionsFromDescription(nodeType);
for (const version of versions) {
const id = getNodeTypeId(nodeType.name, version);
await upsertNodeType(state, id, nodeType);
}
}
});
console.log('[DataWorker] Initial load complete');
} else {
// Incremental sync: check for changes
console.log('[DataWorker] Performing incremental sync...');
const serverVersions: string[] = await getNodeTypeVersions(baseUrl);
const serverVersionSet = new Set(serverVersions);
console.log(`[DataWorker] Server has ${serverVersions.length} node type versions`);
// Get existing node type IDs from the database
const existingResult = await query(state, 'SELECT id FROM nodeTypes');
const existingIds = new Set(existingResult.rows.map((row) => row[0] as string));
console.log(`[DataWorker] Database has ${existingIds.size} node type versions`);
// Find added node types (in server but not in DB)
const addedVersions = serverVersions.filter((id) => !existingIds.has(id));
// Find removed node types (in DB but not on server)
const removedVersions = [...existingIds].filter((id) => !serverVersionSet.has(id));
console.log(
`[DataWorker] Changes: ${addedVersions.length} added, ${removedVersions.length} removed`,
);
// Apply changes in a transaction
if (addedVersions.length > 0 || removedVersions.length > 0) {
await withTrx(state, async () => {
// Remove deleted node types
for (const id of removedVersions) {
await deleteNodeType(state, id);
}
// Fetch only the missing node types using the new endpoint
if (addedVersions.length > 0) {
console.log(
`[DataWorker] Fetching ${addedVersions.length} missing node types by identifier...`,
);
const restApiContext = createRestApiContext(baseUrl);
const nodeTypes: INodeTypeDescription[] = await getNodeTypesByIdentifier(
restApiContext,
addedVersions,
);
console.log(`[DataWorker] Received ${nodeTypes.length} node types from server`);
for (const nodeType of nodeTypes) {
const versions = getNodeTypeVersionsFromDescription(nodeType);
for (const version of versions) {
const id = getNodeTypeId(nodeType.name, version);
// Only insert if this is one of the added versions we requested
if (addedVersions.includes(id)) {
await upsertNodeType(state, id, nodeType);
}
}
}
}
});
console.log('[DataWorker] Incremental sync complete');
} else {
console.log('[DataWorker] No changes detected');
}
}
}
/**
* Get a node type from the local database
*
* @param state - The data worker state
* @param name - Node type name
* @param version - Node type version
* @returns The node type description or null if not found
*/
export async function getNodeType(
state: DataWorkerState,
name: string,
version: number,
): Promise<INodeTypeDescription | null> {
const id = getNodeTypeId(name, version);
const result = await query(state, `SELECT data FROM nodeTypes WHERE id = '${id}'`);
if (result.rows.length === 0) {
return null;
}
return rowToNodeType(result.rows[0][0] as string);
}
/**
* Get all node types from the local database
*
* @param state - The data worker state
* @returns Array of all node type descriptions
*/
export async function getAllNodeTypes(state: DataWorkerState): Promise<INodeTypeDescription[]> {
const result = await query(state, 'SELECT data FROM nodeTypes');
return result.rows.map((row) => rowToNodeType(row[0] as string));
}

View File

@ -0,0 +1,171 @@
/**
* Database Operations
*
* This module provides shared database operations that can be used
* across different modules in the data worker. It accepts the worker
* state as a parameter to keep the code modular and testable.
*
* @note wa-sqlite namespace import for SQLite constants (SQLITE_ROW, SQLITE_INTEGER, etc.)
* @docs https://github.com/rhashimoto/wa-sqlite#api
*/
import * as SQLite from 'wa-sqlite';
import type { DataWorkerState, QueryResult, SQLiteParam } from '../types';
export type { QueryResult, SQLiteParam };
/**
* Execute a SQL statement (for INSERT, UPDATE, DELETE, CREATE, etc.)
*
* @param state - The data worker state
* @param sql - The SQL statement to execute
*/
export async function exec(state: DataWorkerState, sql: string): Promise<void> {
console.log('[DataWorker] Executing:', sql.substring(0, 100) + (sql.length > 100 ? '...' : ''));
if (!state.sqlite3 || state.db === null) {
throw new Error('[DataWorker] Database not initialized');
}
await state.sqlite3.exec(state.db, sql);
}
/**
* Execute a SQL query and return results (for SELECT)
*
* @param state - The data worker state
* @param sql - The SQL query to execute
* @returns Query result with columns and rows
*/
export async function query(state: DataWorkerState, sql: string): Promise<QueryResult> {
console.log('[DataWorker] Querying:', sql.substring(0, 100) + (sql.length > 100 ? '...' : ''));
return await queryWithParams(state, sql);
}
/**
* Execute a SQL statement with bound parameters (for INSERT, UPDATE, DELETE)
*
* Uses wa-sqlite's statements() generator and bind_collection() for parameter binding.
* @see https://github.com/rhashimoto/wa-sqlite#api
*
* @param state - The data worker state
* @param sql - The SQL statement to execute
* @param params - The parameters to bind to the statement
*/
export async function execWithParams(
state: DataWorkerState,
sql: string,
params: SQLiteParam[] = [],
): Promise<void> {
if (!state.sqlite3 || state.db === null) {
throw new Error('[DataWorker] Database not initialized');
}
// Use statements() generator to get prepared statement
for await (const stmt of state.sqlite3.statements(state.db, sql)) {
// Bind parameters using bind_collection (handles arrays automatically)
if (params.length > 0) {
state.sqlite3.bind_collection(stmt, params);
}
// Execute the statement
await state.sqlite3.step(stmt);
}
}
/**
* Execute a SQL query with bound parameters
*
* Uses wa-sqlite's statements() generator and bind_collection() for parameter binding.
* The column() method automatically returns typed values.
* @see https://github.com/rhashimoto/wa-sqlite#api
*
* @param state - The data worker state
* @param sql - The SQL query to execute
* @param params - The parameters to bind to the query
* @returns Query result with columns and rows
*/
export async function queryWithParams(
state: DataWorkerState,
sql: string,
params: SQLiteParam[] = [],
): Promise<QueryResult> {
if (!state.sqlite3 || state.db === null) {
throw new Error('[DataWorker] Database not initialized');
}
const result: QueryResult = {
columns: [],
rows: [],
};
// Use statements() generator to get prepared statement
for await (const stmt of state.sqlite3.statements(state.db, sql)) {
// Bind parameters using bind_collection (handles arrays automatically)
if (params.length > 0) {
state.sqlite3.bind_collection(stmt, params);
}
// Get column names
const columnCount = state.sqlite3.column_count(stmt);
for (let i = 0; i < columnCount; i++) {
result.columns.push(state.sqlite3.column_name(stmt, i));
}
// Execute and fetch rows
while ((await state.sqlite3.step(stmt)) === SQLite.SQLITE_ROW) {
const row: unknown[] = [];
for (let i = 0; i < columnCount; i++) {
// column() returns automatically typed values (string, number, null, or Uint8Array)
row.push(state.sqlite3.column(stmt, i));
}
result.rows.push(row);
}
}
return result;
}
/**
* Execute operations within a transaction
*
* Automatically handles BEGIN/COMMIT/ROLLBACK. On success, commits the transaction.
* On error, rolls back and re-throws the error.
*
* @param state - The data worker state
* @param fn - Async function containing operations to execute within the transaction
* @returns The result of the callback function
*/
export async function withTrx<T>(state: DataWorkerState, fn: () => Promise<T>): Promise<T> {
await exec(state, 'BEGIN TRANSACTION');
try {
const result = await fn();
await exec(state, 'COMMIT');
return result;
} catch (error) {
await exec(state, 'ROLLBACK');
throw error;
}
}
/**
* Close the database connection
*
* @param state - The data worker state
*/
export async function close(state: DataWorkerState): Promise<void> {
console.log('[DataWorker] Closing database...');
if (state.sqlite3 && state.db !== null) {
await state.sqlite3.close(state.db);
state.db = null;
}
if (state.vfs) {
await state.vfs.close();
state.vfs = null;
}
state.initialized = false;
console.log('[DataWorker] Database closed');
}

View File

@ -0,0 +1,70 @@
/**
* Data Worker Types
*
* This module defines shared types used across the data worker modules.
* It's separated to avoid circular dependencies between modules.
*/
/// <reference types="wa-sqlite/src/types" />
// wa-sqlite imports - see: https://github.com/rhashimoto/wa-sqlite#api
import type * as SQLite from 'wa-sqlite';
import type { AccessHandlePoolVFS } from 'wa-sqlite/src/examples/AccessHandlePoolVFS.js';
/**
* Prepared statement result from prepare_v2
*/
export interface SQLitePreparedStatement {
stmt: number;
sql: string;
}
/**
* SQLiteAPI type derived from wa-sqlite Factory function
* @see https://github.com/rhashimoto/wa-sqlite#api
*/
export type SQLiteAPI = ReturnType<typeof SQLite.Factory>;
/**
* SQLite compatible parameter type for bind operations
* @see https://github.com/rhashimoto/wa-sqlite#api
*/
export type SQLiteParam = number | string | Uint8Array | number[] | bigint | null;
/**
* Re-export AccessHandlePoolVFS for convenience
*/
export type { AccessHandlePoolVFS };
/**
* Data Worker State
*
* Represents the internal state of the data worker including
* SQLite connection and VFS references.
*/
export interface DataWorkerState {
initialized: boolean;
sqlite3: SQLiteAPI | null;
db: number | null;
vfs: AccessHandlePoolVFS | null;
initPromise: Promise<void> | null;
}
/**
* Query Result
*
* Represents the result of a SQL query with column names and row data.
*/
export interface QueryResult {
columns: string[];
rows: unknown[][];
}
/**
* Database table row types
*/
export interface NodeTypeRow {
id: string;
data: string;
updated_at: string;
}

View File

@ -0,0 +1,224 @@
/**
* Dedicated Data Worker
*
* This worker handles the actual SQLite/OPFS operations.
* It uses wa-sqlite with AccessHandlePoolVFS which works without cross-origin isolation.
*
* Architecture (based on Notion's approach):
* - Only ONE dedicated worker should access the database at a time
* - The SharedWorker coordinates which tab is "active"
* - All queries are routed through the SharedWorker to the active tab's dedicated worker
*/
import * as Comlink from 'comlink';
// wa-sqlite imports - see: https://github.com/rhashimoto/wa-sqlite#api
// SQLiteESMFactory: WebAssembly module factory for SQLite
// SQLite namespace: Contains Factory function and SQLite constants (SQLITE_OK, SQLITE_OPEN_*, etc.)
import SQLiteESMFactory from 'wa-sqlite/dist/wa-sqlite.mjs';
import * as SQLite from 'wa-sqlite';
import { AccessHandlePoolVFS } from 'wa-sqlite/src/examples/AccessHandlePoolVFS.js';
import { getAllTableSchemas } from './db';
import { loadNodeTypes as loadNodeTypesOp } from './operations/loadNodeTypes';
import {
exec as execOp,
query as queryOp,
queryWithParams as queryWithParamsOp,
close as closeOp,
} from './operations/query';
import type { DataWorkerState, QueryResult, SQLiteAPI, SQLiteParam } from './types';
export type { DataWorkerState, QueryResult, SQLiteAPI, SQLiteParam } from './types';
const state: DataWorkerState = {
initialized: false,
sqlite3: null,
db: null,
vfs: null,
initPromise: null,
};
const DB_NAME = 'n8n';
const VFS_NAME = 'n8n-opfs';
const SQLITE_ACCESS_EXISTS =
(SQLite as { SQLITE_ACCESS_EXISTS?: number }).SQLITE_ACCESS_EXISTS ?? 0;
/**
* Check if OPFS is available in the current environment
*/
async function isOpfsAvailable(): Promise<boolean> {
try {
if (typeof navigator === 'undefined' || !navigator.storage?.getDirectory) {
return false;
}
const root = await navigator.storage.getDirectory();
return root !== null && root !== undefined;
} catch {
return false;
}
}
/**
* Determine if the database file already exists within the AccessHandlePoolVFS directory
*/
function databaseAlreadyExists(): boolean {
const vfs = state.vfs;
if (!vfs) {
return false;
}
const resultView = new DataView(new ArrayBuffer(4));
const rc = vfs.jAccess(DB_NAME, SQLITE_ACCESS_EXISTS, resultView);
if (rc !== SQLite.SQLITE_OK) {
console.warn('[DataWorker] Unable to check database existence via VFS, assuming new database');
return false;
}
return resultView.getInt32(0, true) === 1;
}
/**
* Initialize the SQLite database with OPFS persistence
*/
async function initialize(): Promise<void> {
// Return cached promise if initialization is already in progress
if (state.initPromise) {
return await state.initPromise;
}
if (state.initialized) {
console.log('[DataWorker] Already initialized');
return;
}
console.log('[DataWorker] Initializing...');
state.initPromise = (async () => {
const opfsAvailable = await isOpfsAvailable();
console.log('[DataWorker] OPFS available:', opfsAvailable);
console.log(
'[DataWorker] Storage API:',
typeof navigator !== 'undefined' && !!navigator.storage,
);
console.log(
'[DataWorker] getDirectory method:',
typeof navigator !== 'undefined' && !!navigator.storage?.getDirectory,
);
if (!opfsAvailable) {
console.warn(
'[DataWorker] OPFS is not available - database will not persist across sessions',
);
}
const module = await SQLiteESMFactory();
state.sqlite3 = SQLite.Factory(module) as SQLiteAPI;
console.log('[DataWorker] Creating AccessHandlePoolVFS...');
state.vfs = await AccessHandlePoolVFS.create(VFS_NAME, module);
console.log('[DataWorker] AccessHandlePoolVFS created successfully');
state.sqlite3.vfs_register(state.vfs, true);
const databaseExists = databaseAlreadyExists();
console.log('[DataWorker] Database exists:', databaseExists);
const openFlags = SQLite.SQLITE_OPEN_CREATE | SQLite.SQLITE_OPEN_READWRITE;
state.db = await state.sqlite3.open_v2(DB_NAME, openFlags, VFS_NAME);
console.log('[DataWorker] Database opened successfully');
console.log('[DataWorker] SQLite version:', state.sqlite3.libversion());
console.log('[DataWorker] VFS name:', VFS_NAME);
console.log('[DataWorker] Database name:', DB_NAME);
if (!databaseExists) {
console.log('[DataWorker] Creating tables...');
const tableSchemas = getAllTableSchemas();
for (const schema of tableSchemas) {
await state.sqlite3.exec(state.db, schema);
}
console.log('[DataWorker] Tables created successfully');
} else {
console.log('[DataWorker] Skipping table creation - database already exists');
}
state.initialized = true;
})();
try {
await state.initPromise;
} catch (error) {
console.error('[DataWorker] Failed to initialize:', error);
throw error;
} finally {
state.initPromise = null;
}
}
/**
* Execute a SQL statement (for INSERT, UPDATE, DELETE, CREATE, etc.)
*/
async function exec(sql: string): Promise<void> {
await execOp(state, sql);
}
/**
* Execute a SQL query and return results (for SELECT)
*/
async function query(sql: string): Promise<QueryResult> {
return await queryOp(state, sql);
}
/**
* Execute a SQL query with bound parameters
*/
async function queryWithParams(sql: string, params: SQLiteParam[] = []): Promise<QueryResult> {
return await queryWithParamsOp(state, sql, params);
}
/**
* Close the database connection
*/
async function close(): Promise<void> {
await closeOp(state);
}
/**
* Check if the worker is initialized
*/
function isInitialized(): boolean {
return state.initialized;
}
/**
* Load node types from the server and sync with the local database
* This runs entirely within the worker (fetch + SQL operations)
*/
async function loadNodeTypes(baseUrl: string): Promise<void> {
await loadNodeTypesOp(state, baseUrl);
}
// Export the worker API
export const dataWorkerApi = {
initialize,
exec,
query,
queryWithParams,
close,
isInitialized,
loadNodeTypes,
};
export type DataWorkerApi = typeof dataWorkerApi;
// Expose the API via Comlink for direct communication
Comlink.expose(dataWorkerApi);
// Also handle MessageChannel connections from the coordinator
self.onmessage = (event: MessageEvent) => {
if (event.data?.type === 'connect' && event.data.port instanceof MessagePort) {
console.log('[DataWorker] Received connection from coordinator');
Comlink.expose(dataWorkerApi, event.data.port);
}
};

View File

@ -1,48 +0,0 @@
import { sqlite3Worker1Promiser } from '@sqlite.org/sqlite-wasm';
import type { Promiser, DbId } from '@sqlite.org/sqlite-wasm';
export type DatabaseTable = {
name: string;
schema: string;
};
export type DatabaseConfig = {
filename: `file:${string}.sqlite3?vfs=opfs`;
tables: Record<string, DatabaseTable>;
};
export async function initializeDatabase(config: DatabaseConfig) {
// Initialize the SQLite worker
const promiser: Promiser = await new Promise((resolve) => {
const _promiser = sqlite3Worker1Promiser({
onready: () => resolve(_promiser),
});
});
if (!promiser) throw new Error('Failed to initialize promiser');
// Get configuration and open database
const cfg = await promiser('config-get', {});
const openResponse = await promiser('open', {
filename: config.filename,
});
if (openResponse.type === 'error') {
throw new Error(openResponse.result.message);
}
const dbId: DbId = openResponse.result.dbId;
for (const table of Object.values(config.tables)) {
await promiser('exec', {
dbId,
sql: table.schema,
});
}
return {
promiser,
dbId,
cfg,
};
}

View File

@ -0,0 +1,72 @@
/**
* Run Data Workers
*
* Main entry point for the run-data worker system.
*
* Architecture (based on Notion's approach):
* - Each tab creates a dedicated data worker (for actual DB operations)
* - Each tab connects to the SharedWorker coordinator
* - The coordinator routes all queries to the "active" tab's dedicated worker
* - Only one dedicated worker accesses OPFS at a time (prevents corruption)
*/
import { coordinator, registerTab } from './coordinator';
import type { SQLiteParam } from './data/types';
/**
* Ensure the tab is registered before performing operations
*/
async function ensureRegistered(): Promise<void> {
await registerTab();
}
/**
* Initialize the database
* This will route to the active tab's data worker
*/
export async function initialize(): Promise<void> {
await ensureRegistered();
await coordinator.initialize();
}
/**
* Execute a SQL statement (INSERT, UPDATE, DELETE, CREATE, etc.)
*/
export async function exec(sql: string): Promise<void> {
await ensureRegistered();
await coordinator.exec(sql);
}
/**
* Execute a SQL query and return results
*/
export async function query(sql: string): Promise<{ columns: string[]; rows: unknown[][] }> {
await ensureRegistered();
return await coordinator.query(sql);
}
/**
* Execute a SQL query with bound parameters
*/
export async function queryWithParams(
sql: string,
params: SQLiteParam[],
): Promise<{ columns: string[]; rows: unknown[][] }> {
await ensureRegistered();
return await coordinator.queryWithParams(sql, params);
}
/**
* Check if the database is initialized
*/
export async function isInitialized(): Promise<boolean> {
return await coordinator.isInitialized();
}
/**
* Load node types from the server
*/
export async function loadNodeTypes(baseUrl: string): Promise<void> {
await ensureRegistered();
await coordinator.loadNodeTypes(baseUrl);
}

View File

@ -1,19 +0,0 @@
import type { DatabaseConfig } from '@/app/workers/database';
export const databaseConfig: DatabaseConfig = {
filename: 'file:n8n.sqlite3?vfs=opfs',
tables: {
executions: {
name: 'executions',
schema: `
CREATE TABLE IF NOT EXISTS executions (
id INTEGER PRIMARY KEY,
workflow_id INTEGER NOT NULL,
data TEXT CHECK (json_valid(data)) NOT NULL,
workflow TEXT CHECK (json_valid(workflow)) NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
`,
},
},
} as const;

View File

@ -1,8 +0,0 @@
import * as Comlink from 'comlink';
import type { RunDataWorker } from '@/app/workers/run-data/worker';
const worker = new Worker(new URL('./worker.ts', import.meta.url), {
type: 'module',
});
export const runDataWorker = Comlink.wrap<RunDataWorker>(worker);

View File

@ -1,43 +0,0 @@
import * as Comlink from 'comlink';
import { databaseConfig } from '@/app/workers/run-data/db';
import { initializeDatabase } from '@/app/workers/database';
import type { Promiser, DbId } from '@sqlite.org/sqlite-wasm';
import type { NodeExecuteAfterData } from '@n8n/api-types/push/execution';
const state: {
initialized: boolean;
promiser: Promiser | undefined;
dbId: DbId;
} = {
initialized: false,
promiser: undefined,
dbId: undefined,
};
export const actions = {
async initialize() {
if (state.initialized) return;
const { promiser, dbId } = await initializeDatabase(databaseConfig);
state.promiser = promiser;
state.dbId = dbId;
state.initialized = true;
},
onNodeExecuteAfterData(buffer: ArrayBuffer) {
const data = new TextDecoder('utf-8').decode(new Uint8Array(buffer));
let parsedData: NodeExecuteAfterData;
try {
parsedData = JSON.parse(data);
} catch (error) {
return;
}
console.log('nodeExecuteAfterData in worker', parsedData);
},
};
export type RunDataWorker = typeof actions;
Comlink.expose(actions);

View File

@ -1,96 +0,0 @@
import type { Worker } from 'node:worker_threads';
declare module '@sqlite.org/sqlite-wasm' {
type OnreadyFunction = () => void;
export type Sqlite3Worker1PromiserConfig = {
onready?: OnreadyFunction;
worker?: Worker | (() => Worker);
generateMessageId?: (messageObject: unknown) => string;
debug?: (...args: any[]) => void;
onunhandled?: (event: MessageEvent) => void;
};
export type DbId = string | undefined;
export type PromiserMethods = {
'config-get': {
args: Record<string, never>;
result: {
dbID: DbId;
version: {
libVersion: string;
sourceId: string;
libVersionNumber: number;
downloadVersion: number;
};
bigIntEnabled: boolean;
opfsEnabled: boolean;
vfsList: string[];
};
};
open: {
args: Partial<{
filename?: string;
vfs?: string;
}>;
result: {
dbId: DbId;
filename: string;
persistent: boolean;
vfs: string;
};
};
exec: {
args: {
sql: string;
dbId?: DbId;
bind?: unknown[];
returnValue?: string;
};
result: {
dbId: DbId;
sql: string;
bind: unknown[];
returnValue: string;
resultRows?: unknown[][];
};
};
};
export type PromiserResponseSuccess<T extends keyof PromiserMethods> = {
type: T;
result: PromiserMethods[T]['result'];
messageId: string;
dbId: DbId;
workerReceivedTime: number;
workerRespondTime: number;
departureTime: number;
};
export type PromiserResponseError = {
type: 'error';
result: {
operation: string;
message: string;
errorClass: string;
input: object;
stack: unknown[];
};
messageId: string;
dbId: DbId;
};
export type PromiserResponse<T extends keyof PromiserMethods> =
| PromiserResponseSuccess<T>
| PromiserResponseError;
export type Promiser = <T extends keyof PromiserMethods>(
messageType: T,
messageArguments: PromiserMethods[T]['args'],
) => Promise<PromiserResponse<T>>;
export function sqlite3Worker1Promiser(
config?: Sqlite3Worker1PromiserConfig | OnreadyFunction,
): Promiser;
}

View File

@ -1,5 +1,6 @@
/// <reference types="vite/client" />
/// <reference types="vite-plugin-comlink/client" />
/// <reference types="wa-sqlite/src/types" />
import 'vue-router';
import type { VNode, ComponentPublicInstance } from 'vue';

View File

@ -147,3 +147,59 @@ declare module 'virtual:node-popularity-data' {
}>;
export default data;
}
/**
* wa-sqlite AccessHandlePoolVFS module declaration
* This VFS uses the Origin Private File System (OPFS) Access Handle Pool
* @see https://github.com/rhashimoto/wa-sqlite
*/
declare module 'wa-sqlite/src/examples/AccessHandlePoolVFS.js' {
export class AccessHandlePoolVFS implements SQLiteVFS {
name: string;
mxPathName?: number;
/** Create a new AccessHandlePoolVFS instance */
static create(name: string, module: unknown): Promise<AccessHandlePoolVFS>;
/** Close the VFS and release resources */
close(): Promise<void>;
/** Check if a file exists (synchronous access check) */
jAccess(name: string, flags: number, pResOut: DataView): number;
// SQLiteVFS interface methods
isReady(): boolean | Promise<boolean>;
xClose(fileId: number): number | Promise<number>;
xRead(
fileId: number,
pData: number,
iAmt: number,
iOffsetLo: number,
iOffsetHi: number,
): number | Promise<number>;
xWrite(
fileId: number,
pData: number,
iAmt: number,
iOffsetLo: number,
iOffsetHi: number,
): number | Promise<number>;
xTruncate(fileId: number, iSizeLo: number, iSizeHi: number): number | Promise<number>;
xSync(fileId: number, flags: number): number | Promise<number>;
xFileSize(fileId: number, pSize64: number): number | Promise<number>;
xLock(fileId: number, flags: number): number | Promise<number>;
xUnlock(fileId: number, flags: number): number | Promise<number>;
xCheckReservedLock(fileId: number, pResOut: number): number | Promise<number>;
xFileControl(fileId: number, flags: number, pOut: number): number | Promise<number>;
xDeviceCharacteristics(fileId: number): number | Promise<number>;
xOpen(
pVfs: number,
zName: number,
pFile: number,
flags: number,
pOutFlags: number,
): number | Promise<number>;
xDelete(pVfs: number, zName: number, syncDir: number): number | Promise<number>;
xAccess(pVfs: number, zName: number, flags: number, pResOut: number): number | Promise<number>;
}
}

View File

@ -106,6 +106,15 @@ const plugins: UserConfig['plugins'] = [
src: pathPosix.resolve('node_modules/curlconverter/dist/tree-sitter-bash.wasm'),
dest: resolve(__dirname, 'dist'),
},
// wa-sqlite WASM files for OPFS database support (no cross-origin isolation needed)
{
src: pathPosix.resolve('node_modules/wa-sqlite/dist/wa-sqlite.wasm'),
dest: 'assets',
},
{
src: pathPosix.resolve('node_modules/wa-sqlite/dist/wa-sqlite-async.wasm'),
dest: 'assets',
},
],
}),
vue(),
@ -227,6 +236,7 @@ export default mergeConfig(
target,
},
optimizeDeps: {
exclude: ['wa-sqlite'],
esbuildOptions: {
target,
},

View File

@ -2796,9 +2796,6 @@ importers:
'@sentry/vue':
specifier: catalog:frontend
version: 9.42.1(pinia@2.2.4(typescript@5.9.2)(vue@3.5.26(typescript@5.9.2)))(vue@3.5.26(typescript@5.9.2))
'@sqlite.org/sqlite-wasm':
specifier: 3.50.4-build1
version: 3.50.4-build1
'@types/semver':
specifier: ^7.7.0
version: 7.7.0
@ -2961,6 +2958,9 @@ importers:
vuedraggable:
specifier: 4.1.0
version: 4.1.0(vue@3.5.26(typescript@5.9.2))
wa-sqlite:
specifier: github:rhashimoto/wa-sqlite#779219540f66cecaa159da32b3b8936697ba10a7
version: https://codeload.github.com/rhashimoto/wa-sqlite/tar.gz/779219540f66cecaa159da32b3b8936697ba10a7
web-tree-sitter:
specifier: 0.24.3
version: 0.24.3
@ -8251,10 +8251,6 @@ packages:
resolution: {integrity: sha512-4aUIteuyxtBUhVdiQqcDhKFitwfd9hqoSDYY2KRXiWtgoWJ9Bmise+KfEPDiVHWeJepvF8xJO9/9+WDIciMFFw==}
engines: {node: '>=18.0.0'}
'@sqlite.org/sqlite-wasm@3.50.4-build1':
resolution: {integrity: sha512-Qig2Wso7gPkU1PtXwFzndh+CTRzrIFxVGqv6eCetjU7YqxlHItj+GvQYwYTppCRgAPawtRN/4AJcEgB9xDHGug==}
hasBin: true
'@standard-schema/spec@1.1.0':
resolution: {integrity: sha512-l2aFy5jALhniG5HgqrD6jXLi/rUWrKvqN/qJx6yoJsgKhblVd+iqqU4RCXavm/jPityDo5TCvKMnpjKnOriy0w==}
@ -18354,6 +18350,10 @@ packages:
resolution: {integrity: sha512-o8qghlI8NZHU1lLPrpi2+Uq7abh4GGPpYANlalzWxyWteJOCsr/P+oPBA49TOLu5FTZO4d3F9MnWJfiMo4BkmA==}
engines: {node: '>=18'}
wa-sqlite@https://codeload.github.com/rhashimoto/wa-sqlite/tar.gz/779219540f66cecaa159da32b3b8936697ba10a7:
resolution: {tarball: https://codeload.github.com/rhashimoto/wa-sqlite/tar.gz/779219540f66cecaa159da32b3b8936697ba10a7}
version: 1.0.9
walkdir@0.4.1:
resolution: {integrity: sha512-3eBwRyEln6E1MSzcxcVpQIhRG8Q1jLvEqRmCZqS3dsfXEDR/AhOF4d+jHg1qvDCpYaVRZjENPQyrVxAkQqxPgQ==}
engines: {node: '>=6.0.0'}
@ -25163,8 +25163,6 @@ snapshots:
dependencies:
tslib: 2.8.1
'@sqlite.org/sqlite-wasm@3.50.4-build1': {}
'@standard-schema/spec@1.1.0': {}
'@storybook/addon-a11y@10.1.11(storybook@10.1.11(@testing-library/dom@10.4.0)(bufferutil@4.0.9)(prettier@3.6.2)(react-dom@18.2.0(react@18.2.0))(react@18.2.0)(utf-8-validate@5.0.10))':
@ -37626,6 +37624,8 @@ snapshots:
dependencies:
xml-name-validator: 5.0.0
wa-sqlite@https://codeload.github.com/rhashimoto/wa-sqlite/tar.gz/779219540f66cecaa159da32b3b8936697ba10a7: {}
walkdir@0.4.1: {}
walker@1.0.8: