mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-28 15:27:03 +02:00
Add upsert endpoint
This commit is contained in:
parent
70cb3b6d8b
commit
b1a2b053e6
|
|
@ -0,0 +1,13 @@
|
|||
import { z } from 'zod';
|
||||
import { Z } from 'zod-class';
|
||||
|
||||
import { dataStoreColumnNameSchema } from 'schemas/data-store.schema';
|
||||
|
||||
const dataStoreValueSchema = z.union([z.string(), z.number(), z.boolean(), z.date(), z.null()]);
|
||||
|
||||
const upsertDataStoreRowsShape = {
|
||||
rows: z.array(z.record(dataStoreValueSchema)),
|
||||
matchFields: z.array(dataStoreColumnNameSchema).min(1),
|
||||
};
|
||||
|
||||
export class UpsertDataStoreRowsDto extends Z.class(upsertDataStoreRowsShape) {}
|
||||
|
|
@ -82,6 +82,7 @@ export { OidcConfigDto } from './oidc/config.dto';
|
|||
|
||||
export { CreateDataStoreDto } from './data-store/create-data-store.dto';
|
||||
export { UpdateDataStoreDto } from './data-store/update-data-store.dto';
|
||||
export { UpsertDataStoreRowsDto } from './data-store/upsert-data-store-rows.dto';
|
||||
export { ListDataStoreQueryDto } from './data-store/list-data-store-query.dto';
|
||||
export { ListDataStoreContentQueryDto } from './data-store/list-data-store-content-query.dto';
|
||||
export { CreateDataStoreColumnDto } from './data-store/create-data-store-column.dto';
|
||||
|
|
|
|||
|
|
@ -611,8 +611,6 @@ describe('dataStore', () => {
|
|||
{},
|
||||
);
|
||||
|
||||
console.log(data);
|
||||
|
||||
expect(count).toEqual(2);
|
||||
expect(data).toEqual([
|
||||
{ c1: 1, c2: 'foo', id: 1 },
|
||||
|
|
@ -714,6 +712,69 @@ describe('dataStore', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('upsertRows', () => {
|
||||
it('updates a row if filter matches', async () => {
|
||||
// ARRANGE
|
||||
await dataStoreService.addColumn(dataStore1.id, { name: 'pid', type: 'string' });
|
||||
await dataStoreService.addColumn(dataStore1.id, { name: 'fullName', type: 'string' });
|
||||
await dataStoreService.addColumn(dataStore1.id, { name: 'age', type: 'number' });
|
||||
|
||||
// Insert initial row
|
||||
await dataStoreService.insertRows(dataStore1.id, [
|
||||
{ pid: '1995-111a', fullName: 'Alice', age: 30 },
|
||||
]);
|
||||
|
||||
// ACT
|
||||
const result = await dataStoreService.upsertRows(dataStore1.id, {
|
||||
rows: [{ pid: '1995-111a', fullName: 'Alicia', age: 31 }],
|
||||
matchFields: ['pid'],
|
||||
});
|
||||
|
||||
// ASSERT
|
||||
expect(result).toBe(true);
|
||||
|
||||
const { count, data } = await dataStoreRowsRepository.getManyAndCount(
|
||||
toTableName(dataStore1.id),
|
||||
{},
|
||||
);
|
||||
|
||||
expect(count).toEqual(1);
|
||||
expect(data).toEqual([{ fullName: 'Alicia', age: 31, id: 1, pid: '1995-111a' }]);
|
||||
});
|
||||
|
||||
it('inserts a row if filter does not match', async () => {
|
||||
// ARRANGE
|
||||
await dataStoreService.addColumn(dataStore1.id, { name: 'pid', type: 'string' });
|
||||
await dataStoreService.addColumn(dataStore1.id, { name: 'fullName', type: 'string' });
|
||||
await dataStoreService.addColumn(dataStore1.id, { name: 'age', type: 'number' });
|
||||
|
||||
// Insert initial row
|
||||
await dataStoreService.insertRows(dataStore1.id, [
|
||||
{ pid: '1995-111a', fullName: 'Alice', age: 30 },
|
||||
]);
|
||||
|
||||
// ACT
|
||||
const result = await dataStoreService.upsertRows(dataStore1.id, {
|
||||
rows: [{ pid: '1992-222b', fullName: 'Alice', age: 30 }],
|
||||
matchFields: ['pid'],
|
||||
});
|
||||
|
||||
// ASSERT
|
||||
expect(result).toBe(true);
|
||||
|
||||
const { count, data } = await dataStoreRowsRepository.getManyAndCount(
|
||||
toTableName(dataStore1.id),
|
||||
{},
|
||||
);
|
||||
|
||||
expect(count).toEqual(2);
|
||||
expect(data).toEqual([
|
||||
{ fullName: 'Alice', age: 30, id: 1, pid: '1995-111a' },
|
||||
{ fullName: 'Alice', age: 30, id: 2, pid: '1992-222b' },
|
||||
]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('getManyRowsAndCount', () => {
|
||||
it('retrieves rows correctly', async () => {
|
||||
// ARRANGE
|
||||
|
|
|
|||
|
|
@ -1,10 +1,12 @@
|
|||
import type { DataStoreColumn } from '@n8n/api-types';
|
||||
import type { DataStoreColumn, DataStoreRows } from '@n8n/api-types';
|
||||
|
||||
import {
|
||||
createUserTableQuery,
|
||||
addColumnQuery,
|
||||
deleteColumnQuery,
|
||||
buildInsertQuery,
|
||||
buildUpdateQuery,
|
||||
splitRowsByExistence,
|
||||
} from '../utils/sql-utils';
|
||||
|
||||
describe('sql-utils', () => {
|
||||
|
|
@ -90,4 +92,130 @@ describe('sql-utils', () => {
|
|||
expect(parameters).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('buildUpdateQuery', () => {
|
||||
it('should generate a valid SQL update query with one match field', () => {
|
||||
const tableName = 'data_store_user_abc';
|
||||
const row = { name: 'Alice', age: 30, city: 'Paris' };
|
||||
const matchFields = ['name'];
|
||||
|
||||
const [query, parameters] = buildUpdateQuery(tableName, row, matchFields);
|
||||
|
||||
expect(query).toBe('UPDATE data_store_user_abc SET "age" = ?, "city" = ? WHERE "name" = ?');
|
||||
expect(parameters).toEqual([30, 'Paris', 'Alice']);
|
||||
});
|
||||
|
||||
it('should generate a valid SQL update query with multiple match fields', () => {
|
||||
const tableName = 'data_store_user_abc';
|
||||
const row = { name: 'Alice', age: 30, city: 'Paris' };
|
||||
const matchFields = ['name', 'city'];
|
||||
|
||||
const [query, parameters] = buildUpdateQuery(tableName, row, matchFields);
|
||||
|
||||
expect(query).toBe(
|
||||
'UPDATE data_store_user_abc SET "age" = ? WHERE "name" = ? AND "city" = ?',
|
||||
);
|
||||
expect(parameters).toEqual([30, 'Alice', 'Paris']);
|
||||
});
|
||||
|
||||
it('should return empty query and parameters if row is empty', () => {
|
||||
const tableName = 'data_store_user_abc';
|
||||
const row = {};
|
||||
const matchFields = ['id'];
|
||||
|
||||
const [query, parameters] = buildUpdateQuery(tableName, row, matchFields);
|
||||
|
||||
expect(query).toBe('');
|
||||
expect(parameters).toEqual([]);
|
||||
});
|
||||
|
||||
it('should return empty query and parameters if matchFields is empty', () => {
|
||||
const tableName = 'data_store_user_abc';
|
||||
const row = { name: 'Alice', age: 30 };
|
||||
const matchFields: string[] = [];
|
||||
|
||||
const [query, parameters] = buildUpdateQuery(tableName, row, matchFields);
|
||||
|
||||
expect(query).toBe('');
|
||||
expect(parameters).toEqual([]);
|
||||
});
|
||||
|
||||
it('should return empty query and parameters if only matchFields are present in row', () => {
|
||||
const tableName = 'data_store_user_abc';
|
||||
const row = { id: 1 };
|
||||
const matchFields = ['id'];
|
||||
|
||||
const [query, parameters] = buildUpdateQuery(tableName, row, matchFields);
|
||||
|
||||
expect(query).toBe('');
|
||||
expect(parameters).toEqual([]);
|
||||
});
|
||||
});
|
||||
|
||||
describe('splitRowsByExistence', () => {
|
||||
it('should correctly separate rows into insert and update based on matchFields', () => {
|
||||
const existing = [
|
||||
{ name: 'Alice', age: 30 },
|
||||
{ name: 'Bob', age: 25 },
|
||||
];
|
||||
const matchFields = ['name'];
|
||||
const rows: DataStoreRows = [
|
||||
{ name: 'Alice', age: 30 },
|
||||
{ name: 'Bob', age: 26 },
|
||||
{ name: 'Charlie', age: 35 },
|
||||
];
|
||||
|
||||
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
|
||||
|
||||
expect(rowsToUpdate).toEqual([
|
||||
{ name: 'Alice', age: 30 },
|
||||
{ name: 'Bob', age: 26 },
|
||||
]);
|
||||
expect(rowsToInsert).toEqual([{ name: 'Charlie', age: 35 }]);
|
||||
});
|
||||
|
||||
it('should treat rows as new if matchFields combination does not exist', () => {
|
||||
const existing = [{ name: 'Bob', city: 'Berlin' }];
|
||||
const matchFields = ['name', 'city'];
|
||||
const rows: DataStoreRows = [{ name: 'Alice', city: 'Berlin' }];
|
||||
|
||||
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
|
||||
|
||||
expect(rowsToUpdate).toEqual([]);
|
||||
expect(rowsToInsert).toEqual([{ name: 'Alice', city: 'Berlin' }]);
|
||||
});
|
||||
|
||||
it('should insert all rows if existing set is empty', () => {
|
||||
const existing: Array<Record<string, unknown>> = [];
|
||||
const matchFields = ['name'];
|
||||
const rows: DataStoreRows = [{ name: 'Alice' }, { name: 'Bob' }];
|
||||
|
||||
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
|
||||
|
||||
expect(rowsToUpdate).toEqual([]);
|
||||
expect(rowsToInsert).toEqual(rows);
|
||||
});
|
||||
|
||||
it('should update all rows if all keys match existing', () => {
|
||||
const existing = [{ name: 'Alice' }, { name: 'Bob' }];
|
||||
const matchFields = ['name'];
|
||||
const rows: DataStoreRows = [{ name: 'Alice' }, { name: 'Bob' }];
|
||||
|
||||
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
|
||||
|
||||
expect(rowsToInsert).toEqual([]);
|
||||
expect(rowsToUpdate).toEqual(rows);
|
||||
});
|
||||
|
||||
it('should handle empty input rows', () => {
|
||||
const existing = [{ name: 'Alice' }];
|
||||
const matchFields = ['name'];
|
||||
const rows: DataStoreRows = [];
|
||||
|
||||
const { rowsToInsert, rowsToUpdate } = splitRowsByExistence(existing, matchFields, rows);
|
||||
|
||||
expect(rowsToInsert).toEqual([]);
|
||||
expect(rowsToUpdate).toEqual([]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -2,11 +2,17 @@ import type {
|
|||
ListDataStoreContentQueryDto,
|
||||
DataStoreUserTableName,
|
||||
DataStoreRows,
|
||||
UpsertDataStoreRowsDto,
|
||||
} from '@n8n/api-types';
|
||||
import { Service } from '@n8n/di';
|
||||
import { DataSource, DataSourceOptions, SelectQueryBuilder } from '@n8n/typeorm';
|
||||
|
||||
import { buildInsertQuery, quoteIdentifier } from './utils/sql-utils';
|
||||
import {
|
||||
buildInsertQuery,
|
||||
buildUpdateQuery,
|
||||
quoteIdentifier,
|
||||
splitRowsByExistence,
|
||||
} from './utils/sql-utils';
|
||||
|
||||
// type QueryBuilder = SelectQueryBuilder<Record<PropertyKey, unknown>>;
|
||||
type QueryBuilder = SelectQueryBuilder<any>;
|
||||
|
|
@ -33,7 +39,36 @@ export class DataStoreRowsRepository {
|
|||
constructor(private dataSource: DataSource) {}
|
||||
|
||||
async insertRows(tableName: DataStoreUserTableName, rows: DataStoreRows) {
|
||||
await this.dataSource.query(...buildInsertQuery(tableName, rows));
|
||||
const dbType = this.dataSource.options.type;
|
||||
await this.dataSource.query(...buildInsertQuery(tableName, rows, dbType));
|
||||
return true;
|
||||
}
|
||||
|
||||
async upsertRows(tableName: DataStoreUserTableName, dto: UpsertDataStoreRowsDto) {
|
||||
const dbType = this.dataSource.options.type;
|
||||
const { rows, matchFields } = dto;
|
||||
|
||||
if (rows.length === 0) {
|
||||
return false;
|
||||
}
|
||||
|
||||
const { rowsToInsert, rowsToUpdate } = await this.fetchAndSplitRowsByExistence(
|
||||
tableName,
|
||||
matchFields,
|
||||
rows,
|
||||
);
|
||||
|
||||
if (rowsToInsert.length > 0) {
|
||||
await this.insertRows(tableName, rowsToInsert);
|
||||
}
|
||||
|
||||
if (rowsToUpdate.length > 0) {
|
||||
for (const row of rowsToUpdate) {
|
||||
const [query, parameters] = buildUpdateQuery(tableName, row, matchFields, dbType);
|
||||
await this.dataSource.query(query, parameters);
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
@ -108,4 +143,33 @@ export class DataStoreRowsRepository {
|
|||
query.skip(dto.skip);
|
||||
query.take(dto.take);
|
||||
}
|
||||
|
||||
private async fetchAndSplitRowsByExistence(
|
||||
tableName: string,
|
||||
matchFields: string[],
|
||||
rows: DataStoreRows,
|
||||
): Promise<{ rowsToInsert: DataStoreRows; rowsToUpdate: DataStoreRows }> {
|
||||
const whereClauses: string[] = [];
|
||||
const params: unknown[] = [];
|
||||
|
||||
for (const row of rows) {
|
||||
const clause = matchFields
|
||||
.map((field) => {
|
||||
params.push(row[field]);
|
||||
return `"${field}" = $${params.length}`;
|
||||
})
|
||||
.join(' AND ');
|
||||
whereClauses.push(`(${clause})`);
|
||||
}
|
||||
|
||||
const query = `
|
||||
SELECT ${matchFields.map((field) => `"${field}"`).join(', ')}
|
||||
FROM "${tableName}"
|
||||
WHERE ${whereClauses.join(' OR ')}
|
||||
`;
|
||||
|
||||
const existing: Array<Record<string, unknown>> = await this.dataSource.query(query, params);
|
||||
|
||||
return splitRowsByExistence(existing, matchFields, rows);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ import type {
|
|||
MoveDataStoreColumnDto,
|
||||
UpdateDataStoreDto,
|
||||
DataStoreRows,
|
||||
UpsertDataStoreRowsDto,
|
||||
} from '@n8n/api-types';
|
||||
import { AuthenticatedRequest } from '@n8n/db';
|
||||
import {
|
||||
|
|
@ -136,4 +137,15 @@ export class DataStoreController {
|
|||
) {
|
||||
return await this.dataStoreService.insertRows(dataStoreId, dto);
|
||||
}
|
||||
|
||||
@Post('/:dataStoreId/upsert')
|
||||
@ProjectScope('dataStore:writeRow')
|
||||
async upsertDataStoreRows(
|
||||
_req: AuthenticatedRequest<{ projectId: string }>,
|
||||
_res: Response,
|
||||
@Param('dataStoreId') dataStoreId: string,
|
||||
@Body dto: UpsertDataStoreRowsDto,
|
||||
) {
|
||||
return await this.dataStoreService.upsertRows(dataStoreId, dto);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import type {
|
|||
DataStoreRows,
|
||||
DataStoreUserTableName,
|
||||
IDataStoreService,
|
||||
UpsertDataStoreRowsDto,
|
||||
} from '@n8n/api-types';
|
||||
import { UpdateDataStoreDto } from '@n8n/api-types/src/dto/data-store/update-data-store.dto';
|
||||
import { Logger } from '@n8n/backend-common';
|
||||
|
|
@ -320,4 +321,13 @@ export class DataStoreService implements IDataStoreService {
|
|||
|
||||
return await this.dataStoreRowsRepository.insertRows(toTableName(dataStoreId), rows);
|
||||
}
|
||||
|
||||
async upsertRows(dataStoreId: string, dto: UpsertDataStoreRowsDto) {
|
||||
const validationResult = await this.validateRows(dataStoreId, dto.rows as DataStoreRows);
|
||||
if (!validationResult) {
|
||||
return validationResult;
|
||||
}
|
||||
|
||||
return await this.dataStoreRowsRepository.upsertRows(toTableName(dataStoreId), dto);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -113,6 +113,61 @@ export function buildInsertQuery(
|
|||
return [query, parameters];
|
||||
}
|
||||
|
||||
export function buildUpdateQuery(
|
||||
tableName: DataStoreUserTableName,
|
||||
row: Record<string, unknown>,
|
||||
matchFields: string[],
|
||||
dbType: DataSourceOptions['type'] = 'sqlite',
|
||||
): [string, unknown[]] {
|
||||
if (Object.keys(row).length === 0 || matchFields.length === 0) {
|
||||
return ['', []];
|
||||
}
|
||||
|
||||
const allKeys = Object.keys(row);
|
||||
const updateKeys = allKeys.filter((key) => !matchFields.includes(key));
|
||||
|
||||
if (updateKeys.length === 0) {
|
||||
return ['', []];
|
||||
}
|
||||
|
||||
const setClause = updateKeys.map((key) => `${quoteIdentifier(key, dbType)} = ?`).join(', ');
|
||||
|
||||
const whereClause = matchFields.map((key) => `${quoteIdentifier(key, dbType)} = ?`).join(' AND ');
|
||||
|
||||
const parameters = [...updateKeys.map((k) => row[k]), ...matchFields.map((k) => row[k])];
|
||||
|
||||
const query = `UPDATE ${tableName} SET ${setClause} WHERE ${whereClause}`;
|
||||
|
||||
return [query, parameters];
|
||||
}
|
||||
|
||||
export function splitRowsByExistence(
|
||||
existing: Array<Record<string, unknown>>,
|
||||
matchFields: string[],
|
||||
rows: DataStoreRows,
|
||||
): { rowsToInsert: DataStoreRows; rowsToUpdate: DataStoreRows } {
|
||||
// Extracts only the fields relevant to matching and serializes them for comparison
|
||||
const getMatchKey = (row: Record<string, unknown>): string =>
|
||||
JSON.stringify(Object.fromEntries(matchFields.map((field) => [field, row[field]])));
|
||||
|
||||
const existingSet = new Set(existing.map((row) => getMatchKey(row)));
|
||||
|
||||
const rowsToUpdate: DataStoreRows = [];
|
||||
const rowsToInsert: DataStoreRows = [];
|
||||
|
||||
for (const row of rows) {
|
||||
const key = getMatchKey(row);
|
||||
|
||||
if (existingSet.has(key)) {
|
||||
rowsToUpdate.push(row);
|
||||
} else {
|
||||
rowsToInsert.push(row);
|
||||
}
|
||||
}
|
||||
|
||||
return { rowsToInsert, rowsToUpdate };
|
||||
}
|
||||
|
||||
export function quoteIdentifier(name: string, dbType: DataSourceOptions['type']): string {
|
||||
switch (dbType) {
|
||||
case 'postgres':
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user