From b1a2b053e64fbbe004cda62523ef2a3c685804cb Mon Sep 17 00:00:00 2001 From: Daria Staferova Date: Mon, 4 Aug 2025 10:55:15 +0300 Subject: [PATCH] Add upsert endpoint --- .../data-store/upsert-data-store-rows.dto.ts | 13 ++ packages/@n8n/api-types/src/dto/index.ts | 1 + .../__tests__/data-store.service.test.ts | 65 ++++++++- .../data-store/__tests__/sql-utils.test.ts | 130 +++++++++++++++++- .../data-store/data-store-rows.repository.ts | 68 ++++++++- .../data-store/data-store.controller.ts | 12 ++ .../modules/data-store/data-store.service.ts | 10 ++ .../src/modules/data-store/utils/sql-utils.ts | 55 ++++++++ 8 files changed, 349 insertions(+), 5 deletions(-) create mode 100644 packages/@n8n/api-types/src/dto/data-store/upsert-data-store-rows.dto.ts diff --git a/packages/@n8n/api-types/src/dto/data-store/upsert-data-store-rows.dto.ts b/packages/@n8n/api-types/src/dto/data-store/upsert-data-store-rows.dto.ts new file mode 100644 index 00000000000..5171c69392d --- /dev/null +++ b/packages/@n8n/api-types/src/dto/data-store/upsert-data-store-rows.dto.ts @@ -0,0 +1,13 @@ +import { z } from 'zod'; +import { Z } from 'zod-class'; + +import { dataStoreColumnNameSchema } from 'schemas/data-store.schema'; + +const dataStoreValueSchema = z.union([z.string(), z.number(), z.boolean(), z.date(), z.null()]); + +const upsertDataStoreRowsShape = { + rows: z.array(z.record(dataStoreValueSchema)), + matchFields: z.array(dataStoreColumnNameSchema).min(1), +}; + +export class UpsertDataStoreRowsDto extends Z.class(upsertDataStoreRowsShape) {} diff --git a/packages/@n8n/api-types/src/dto/index.ts b/packages/@n8n/api-types/src/dto/index.ts index 2f1fea1d2be..27fa6c8ad3f 100644 --- a/packages/@n8n/api-types/src/dto/index.ts +++ b/packages/@n8n/api-types/src/dto/index.ts @@ -82,6 +82,7 @@ export { OidcConfigDto } from './oidc/config.dto'; export { CreateDataStoreDto } from './data-store/create-data-store.dto'; export { UpdateDataStoreDto } from './data-store/update-data-store.dto'; +export { UpsertDataStoreRowsDto } from './data-store/upsert-data-store-rows.dto'; export { ListDataStoreQueryDto } from './data-store/list-data-store-query.dto'; export { ListDataStoreContentQueryDto } from './data-store/list-data-store-content-query.dto'; export { CreateDataStoreColumnDto } from './data-store/create-data-store-column.dto'; diff --git a/packages/cli/src/modules/data-store/__tests__/data-store.service.test.ts b/packages/cli/src/modules/data-store/__tests__/data-store.service.test.ts index 6e52cc29dab..33125444928 100644 --- a/packages/cli/src/modules/data-store/__tests__/data-store.service.test.ts +++ b/packages/cli/src/modules/data-store/__tests__/data-store.service.test.ts @@ -611,8 +611,6 @@ describe('dataStore', () => { {}, ); - console.log(data); - expect(count).toEqual(2); expect(data).toEqual([ { c1: 1, c2: 'foo', id: 1 }, @@ -714,6 +712,69 @@ describe('dataStore', () => { }); }); + describe('upsertRows', () => { + it('updates a row if filter matches', async () => { + // ARRANGE + await dataStoreService.addColumn(dataStore1.id, { name: 'pid', type: 'string' }); + await dataStoreService.addColumn(dataStore1.id, { name: 'fullName', type: 'string' }); + await dataStoreService.addColumn(dataStore1.id, { name: 'age', type: 'number' }); + + // Insert initial row + await dataStoreService.insertRows(dataStore1.id, [ + { pid: '1995-111a', fullName: 'Alice', age: 30 }, + ]); + + // ACT + const result = await dataStoreService.upsertRows(dataStore1.id, { + rows: [{ pid: '1995-111a', fullName: 'Alicia', age: 31 }], + matchFields: ['pid'], + }); + + // ASSERT + expect(result).toBe(true); + + const { count, data } = await dataStoreRowsRepository.getManyAndCount( + toTableName(dataStore1.id), + {}, + ); + + expect(count).toEqual(1); + expect(data).toEqual([{ fullName: 'Alicia', age: 31, id: 1, pid: '1995-111a' }]); + }); + + it('inserts a row if filter does not match', async () => { + // ARRANGE + await dataStoreService.addColumn(dataStore1.id, { name: 'pid', type: 'string' }); + await dataStoreService.addColumn(dataStore1.id, { name: 'fullName', type: 'string' }); + await dataStoreService.addColumn(dataStore1.id, { name: 'age', type: 'number' }); + + // Insert initial row + await dataStoreService.insertRows(dataStore1.id, [ + { pid: '1995-111a', fullName: 'Alice', age: 30 }, + ]); + + // ACT + const result = await dataStoreService.upsertRows(dataStore1.id, { + rows: [{ pid: '1992-222b', fullName: 'Alice', age: 30 }], + matchFields: ['pid'], + }); + + // ASSERT + expect(result).toBe(true); + + const { count, data } = await dataStoreRowsRepository.getManyAndCount( + toTableName(dataStore1.id), + {}, + ); + + expect(count).toEqual(2); + expect(data).toEqual([ + { fullName: 'Alice', age: 30, id: 1, pid: '1995-111a' }, + { fullName: 'Alice', age: 30, id: 2, pid: '1992-222b' }, + ]); + }); + }); + describe('getManyRowsAndCount', () => { it('retrieves rows correctly', async () => { // ARRANGE diff --git a/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts b/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts index 2ffa347d34e..ed34ef54c59 100644 --- a/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts +++ b/packages/cli/src/modules/data-store/__tests__/sql-utils.test.ts @@ -1,10 +1,12 @@ -import type { DataStoreColumn } from '@n8n/api-types'; +import type { DataStoreColumn, DataStoreRows } from '@n8n/api-types'; import { createUserTableQuery, addColumnQuery, deleteColumnQuery, buildInsertQuery, + buildUpdateQuery, + splitRowsByExistence, } from '../utils/sql-utils'; describe('sql-utils', () => { @@ -90,4 +92,130 @@ describe('sql-utils', () => { expect(parameters).toEqual([]); }); }); + + describe('buildUpdateQuery', () => { + it('should generate a valid SQL update query with one match field', () => { + const tableName = 'data_store_user_abc'; + const row = { name: 'Alice', age: 30, city: 'Paris' }; + const matchFields = ['name']; + + const [query, parameters] = buildUpdateQuery(tableName, row, matchFields); + + expect(query).toBe('UPDATE data_store_user_abc SET "age" = ?, "city" = ? WHERE "name" = ?'); + expect(parameters).toEqual([30, 'Paris', 'Alice']); + }); + + it('should generate a valid SQL update query with multiple match fields', () => { + const tableName = 'data_store_user_abc'; + const row = { name: 'Alice', age: 30, city: 'Paris' }; + const matchFields = ['name', 'city']; + + const [query, parameters] = buildUpdateQuery(tableName, row, matchFields); + + expect(query).toBe( + 'UPDATE data_store_user_abc SET "age" = ? WHERE "name" = ? AND "city" = ?', + ); + expect(parameters).toEqual([30, 'Alice', 'Paris']); + }); + + it('should return empty query and parameters if row is empty', () => { + const tableName = 'data_store_user_abc'; + const row = {}; + const matchFields = ['id']; + + const [query, parameters] = buildUpdateQuery(tableName, row, matchFields); + + expect(query).toBe(''); + expect(parameters).toEqual([]); + }); + + it('should return empty query and parameters if matchFields is empty', () => { + const tableName = 'data_store_user_abc'; + const row = { name: 'Alice', age: 30 }; + const matchFields: string[] = []; + + const [query, parameters] = buildUpdateQuery(tableName, row, matchFields); + + expect(query).toBe(''); + expect(parameters).toEqual([]); + }); + + it('should return empty query and parameters if only matchFields are present in row', () => { + const tableName = 'data_store_user_abc'; + const row = { id: 1 }; + const matchFields = ['id']; + + const [query, parameters] = buildUpdateQuery(tableName, row, matchFields); + + expect(query).toBe(''); + expect(parameters).toEqual([]); + }); + }); + + describe('splitRowsByExistence', () => { + it('should correctly separate rows into insert and update based on matchFields', () => { + const existing = [ + { name: 'Alice', age: 30 }, + { name: 'Bob', age: 25 }, + ]; + const matchFields = ['name']; + const rows: DataStoreRows = [ + { name: 'Alice', age: 30 }, + { name: 'Bob', age: 26 }, + { name: 'Charlie', age: 35 }, + ]; + + const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows); + + expect(rowsToUpdate).toEqual([ + { name: 'Alice', age: 30 }, + { name: 'Bob', age: 26 }, + ]); + expect(rowsToInsert).toEqual([{ name: 'Charlie', age: 35 }]); + }); + + it('should treat rows as new if matchFields combination does not exist', () => { + const existing = [{ name: 'Bob', city: 'Berlin' }]; + const matchFields = ['name', 'city']; + const rows: DataStoreRows = [{ name: 'Alice', city: 'Berlin' }]; + + const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows); + + expect(rowsToUpdate).toEqual([]); + expect(rowsToInsert).toEqual([{ name: 'Alice', city: 'Berlin' }]); + }); + + it('should insert all rows if existing set is empty', () => { + const existing: Array> = []; + const matchFields = ['name']; + const rows: DataStoreRows = [{ name: 'Alice' }, { name: 'Bob' }]; + + const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows); + + expect(rowsToUpdate).toEqual([]); + expect(rowsToInsert).toEqual(rows); + }); + + it('should update all rows if all keys match existing', () => { + const existing = [{ name: 'Alice' }, { name: 'Bob' }]; + const matchFields = ['name']; + const rows: DataStoreRows = [{ name: 'Alice' }, { name: 'Bob' }]; + + const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows); + + expect(rowsToInsert).toEqual([]); + expect(rowsToUpdate).toEqual(rows); + }); + + it('should handle empty input rows', () => { + const existing = [{ name: 'Alice' }]; + const matchFields = ['name']; + const rows: DataStoreRows = []; + + const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows); + + expect(rowsToInsert).toEqual([]); + expect(rowsToUpdate).toEqual([]); + }); + }); }); diff --git a/packages/cli/src/modules/data-store/data-store-rows.repository.ts b/packages/cli/src/modules/data-store/data-store-rows.repository.ts index ff167972e1f..a958245b357 100644 --- a/packages/cli/src/modules/data-store/data-store-rows.repository.ts +++ b/packages/cli/src/modules/data-store/data-store-rows.repository.ts @@ -2,11 +2,17 @@ import type { ListDataStoreContentQueryDto, DataStoreUserTableName, DataStoreRows, + UpsertDataStoreRowsDto, } from '@n8n/api-types'; import { Service } from '@n8n/di'; import { DataSource, DataSourceOptions, SelectQueryBuilder } from '@n8n/typeorm'; -import { buildInsertQuery, quoteIdentifier } from './utils/sql-utils'; +import { + buildInsertQuery, + buildUpdateQuery, + quoteIdentifier, + splitRowsByExistence, +} from './utils/sql-utils'; // type QueryBuilder = SelectQueryBuilder>; type QueryBuilder = SelectQueryBuilder; @@ -33,7 +39,36 @@ export class DataStoreRowsRepository { constructor(private dataSource: DataSource) {} async insertRows(tableName: DataStoreUserTableName, rows: DataStoreRows) { - await this.dataSource.query(...buildInsertQuery(tableName, rows)); + const dbType = this.dataSource.options.type; + await this.dataSource.query(...buildInsertQuery(tableName, rows, dbType)); + return true; + } + + async upsertRows(tableName: DataStoreUserTableName, dto: UpsertDataStoreRowsDto) { + const dbType = this.dataSource.options.type; + const { rows, matchFields } = dto; + + if (rows.length === 0) { + return false; + } + + const { rowsToInsert, rowsToUpdate } = await this.fetchAndSplitRowsByExistence( + tableName, + matchFields, + rows, + ); + + if (rowsToInsert.length > 0) { + await this.insertRows(tableName, rowsToInsert); + } + + if (rowsToUpdate.length > 0) { + for (const row of rowsToUpdate) { + const [query, parameters] = buildUpdateQuery(tableName, row, matchFields, dbType); + await this.dataSource.query(query, parameters); + } + } + return true; } @@ -108,4 +143,33 @@ export class DataStoreRowsRepository { query.skip(dto.skip); query.take(dto.take); } + + private async fetchAndSplitRowsByExistence( + tableName: string, + matchFields: string[], + rows: DataStoreRows, + ): Promise<{ rowsToInsert: DataStoreRows; rowsToUpdate: DataStoreRows }> { + const whereClauses: string[] = []; + const params: unknown[] = []; + + for (const row of rows) { + const clause = matchFields + .map((field) => { + params.push(row[field]); + return `"${field}" = $${params.length}`; + }) + .join(' AND '); + whereClauses.push(`(${clause})`); + } + + const query = ` + SELECT ${matchFields.map((field) => `"${field}"`).join(', ')} + FROM "${tableName}" + WHERE ${whereClauses.join(' OR ')} + `; + + const existing: Array> = await this.dataSource.query(query, params); + + return splitRowsByExistence(existing, matchFields, rows); + } } diff --git a/packages/cli/src/modules/data-store/data-store.controller.ts b/packages/cli/src/modules/data-store/data-store.controller.ts index 7a9ccb3931f..01e671b7980 100644 --- a/packages/cli/src/modules/data-store/data-store.controller.ts +++ b/packages/cli/src/modules/data-store/data-store.controller.ts @@ -7,6 +7,7 @@ import type { MoveDataStoreColumnDto, UpdateDataStoreDto, DataStoreRows, + UpsertDataStoreRowsDto, } from '@n8n/api-types'; import { AuthenticatedRequest } from '@n8n/db'; import { @@ -136,4 +137,15 @@ export class DataStoreController { ) { return await this.dataStoreService.insertRows(dataStoreId, dto); } + + @Post('/:dataStoreId/upsert') + @ProjectScope('dataStore:writeRow') + async upsertDataStoreRows( + _req: AuthenticatedRequest<{ projectId: string }>, + _res: Response, + @Param('dataStoreId') dataStoreId: string, + @Body dto: UpsertDataStoreRowsDto, + ) { + return await this.dataStoreService.upsertRows(dataStoreId, dto); + } } diff --git a/packages/cli/src/modules/data-store/data-store.service.ts b/packages/cli/src/modules/data-store/data-store.service.ts index 654f5cca4de..1af26d40a8d 100644 --- a/packages/cli/src/modules/data-store/data-store.service.ts +++ b/packages/cli/src/modules/data-store/data-store.service.ts @@ -8,6 +8,7 @@ import type { DataStoreRows, DataStoreUserTableName, IDataStoreService, + UpsertDataStoreRowsDto, } from '@n8n/api-types'; import { UpdateDataStoreDto } from '@n8n/api-types/src/dto/data-store/update-data-store.dto'; import { Logger } from '@n8n/backend-common'; @@ -320,4 +321,13 @@ export class DataStoreService implements IDataStoreService { return await this.dataStoreRowsRepository.insertRows(toTableName(dataStoreId), rows); } + + async upsertRows(dataStoreId: string, dto: UpsertDataStoreRowsDto) { + const validationResult = await this.validateRows(dataStoreId, dto.rows as DataStoreRows); + if (!validationResult) { + return validationResult; + } + + return await this.dataStoreRowsRepository.upsertRows(toTableName(dataStoreId), dto); + } } diff --git a/packages/cli/src/modules/data-store/utils/sql-utils.ts b/packages/cli/src/modules/data-store/utils/sql-utils.ts index 98dc71c63ad..bf67c22d23c 100644 --- a/packages/cli/src/modules/data-store/utils/sql-utils.ts +++ b/packages/cli/src/modules/data-store/utils/sql-utils.ts @@ -113,6 +113,61 @@ export function buildInsertQuery( return [query, parameters]; } +export function buildUpdateQuery( + tableName: DataStoreUserTableName, + row: Record, + matchFields: string[], + dbType: DataSourceOptions['type'] = 'sqlite', +): [string, unknown[]] { + if (Object.keys(row).length === 0 || matchFields.length === 0) { + return ['', []]; + } + + const allKeys = Object.keys(row); + const updateKeys = allKeys.filter((key) => !matchFields.includes(key)); + + if (updateKeys.length === 0) { + return ['', []]; + } + + const setClause = updateKeys.map((key) => `${quoteIdentifier(key, dbType)} = ?`).join(', '); + + const whereClause = matchFields.map((key) => `${quoteIdentifier(key, dbType)} = ?`).join(' AND '); + + const parameters = [...updateKeys.map((k) => row[k]), ...matchFields.map((k) => row[k])]; + + const query = `UPDATE ${tableName} SET ${setClause} WHERE ${whereClause}`; + + return [query, parameters]; +} + +export function splitRowsByExistence( + existing: Array>, + matchFields: string[], + rows: DataStoreRows, +): { rowsToInsert: DataStoreRows; rowsToUpdate: DataStoreRows } { + // Extracts only the fields relevant to matching and serializes them for comparison + const getMatchKey = (row: Record): string => + JSON.stringify(Object.fromEntries(matchFields.map((field) => [field, row[field]]))); + + const existingSet = new Set(existing.map((row) => getMatchKey(row))); + + const rowsToUpdate: DataStoreRows = []; + const rowsToInsert: DataStoreRows = []; + + for (const row of rows) { + const key = getMatchKey(row); + + if (existingSet.has(key)) { + rowsToUpdate.push(row); + } else { + rowsToInsert.push(row); + } + } + + return { rowsToInsert, rowsToUpdate }; +} + export function quoteIdentifier(name: string, dbType: DataSourceOptions['type']): string { switch (dbType) { case 'postgres':