From 582b6ae9eaaef6a616233e9bd4eda7230c36eb0a Mon Sep 17 00:00:00 2001 From: Michael Kret <88898367+michael-radency@users.noreply.github.com> Date: Mon, 11 May 2026 13:16:14 +0300 Subject: [PATCH] fix(MongoDB Node): Resolve collection parameter per item in write operations (#29956) --- .../nodes-base/nodes/MongoDb/MongoDb.node.ts | 581 +++++++++++++----- .../nodes/MongoDb/test/MongoDB.test.ts | 506 ++++++++++++++- 2 files changed, 914 insertions(+), 173 deletions(-) diff --git a/packages/nodes-base/nodes/MongoDb/MongoDb.node.ts b/packages/nodes-base/nodes/MongoDb/MongoDb.node.ts index a4f3e5587ea..8d827605409 100644 --- a/packages/nodes-base/nodes/MongoDb/MongoDb.node.ts +++ b/packages/nodes-base/nodes/MongoDb/MongoDb.node.ts @@ -5,7 +5,7 @@ import type { Sort, } from 'mongodb'; import { ObjectId } from 'mongodb'; -import { ApplicationError, NodeConnectionTypes } from 'n8n-workflow'; +import { ApplicationError, NodeConnectionTypes, NodeOperationError } from 'n8n-workflow'; import type { IExecuteFunctions, ICredentialsDecrypted, @@ -37,7 +37,7 @@ export class MongoDb implements INodeType { name: 'mongoDb', icon: 'file:mongodb.svg', group: ['input'], - version: [1, 1.1, 1.2], + version: [1, 1.1, 1.2, 1.3], description: 'Find, insert and update documents in MongoDB', defaults: { name: 'MongoDB', @@ -239,100 +239,66 @@ export class MongoDb implements INodeType { if (operation === 'findOneAndReplace') { fallbackPairedItems = fallbackPairedItems ?? generatePairedItemData(items.length); - const fields = prepareFields(this.getNodeParameter('fields', 0) as string); - const useDotNotation = this.getNodeParameter('options.useDotNotation', 0, false) as boolean; - const dateFields = prepareFields( - this.getNodeParameter('options.dateFields', 0, '') as string, - ); + if (nodeVersion >= 1.3) { + for (let i = 0; i < itemsLength; i++) { + const fields = prepareFields(this.getNodeParameter('fields', i) as string); + const useDotNotation = this.getNodeParameter( + 'options.useDotNotation', + i, + false, + ) as boolean; + const dateFields = prepareFields( + this.getNodeParameter('options.dateFields', i, '') as string, + ); + const updateKey = ((this.getNodeParameter('updateKey', i) as string) || '').trim(); + const updateOptions = (this.getNodeParameter('upsert', i) as boolean) + ? { upsert: true } + : undefined; + const [item] = prepareItems({ + items: [items[i]], + fields, + updateKey, + useDotNotation, + dateFields, + }); - const updateKey = ((this.getNodeParameter('updateKey', 0) as string) || '').trim(); - - const updateOptions = (this.getNodeParameter('upsert', 0) as boolean) - ? { upsert: true } - : undefined; - - const updateItems = prepareItems({ items, fields, updateKey, useDotNotation, dateFields }); - - for (const item of updateItems) { - try { - const filter = { [updateKey]: item[updateKey] }; - if (updateKey === '_id') { - filter[updateKey] = new ObjectId(item[updateKey] as string); - delete item._id; + if (!item) { + if (this.continueOnFail()) { + returnData.push({ + json: { error: 'Item is missing the updateKey field' }, + pairedItem: { item: i }, + }); + continue; + } + throw new NodeOperationError(this.getNode(), 'Item is missing the updateKey field', { + itemIndex: i, + }); } - await mdb - .collection(this.getNodeParameter('collection', 0) as string) - .findOneAndReplace(filter, item, updateOptions as FindOneAndReplaceOptions); - } catch (error) { - if (this.continueOnFail()) { - item.json = { error: (error as JsonObject).message }; - continue; + try { + const filter = { [updateKey]: item[updateKey] }; + if (updateKey === '_id') { + filter[updateKey] = new ObjectId(item[updateKey] as string); + delete item._id; + } + + await mdb + .collection(this.getNodeParameter('collection', i) as string) + .findOneAndReplace(filter, item, updateOptions as FindOneAndReplaceOptions); + + returnData.push({ json: item, pairedItem: { item: i } }); + } catch (error) { + if (this.continueOnFail()) { + returnData.push({ + json: { error: (error as JsonObject).message }, + pairedItem: { item: i }, + }); + continue; + } + throw error; } - throw error; } - } - - returnData = this.helpers.constructExecutionMetaData( - this.helpers.returnJsonArray(updateItems), - { itemData: fallbackPairedItems }, - ); - } - - if (operation === 'findOneAndUpdate') { - fallbackPairedItems = fallbackPairedItems ?? generatePairedItemData(items.length); - const fields = prepareFields(this.getNodeParameter('fields', 0) as string); - const useDotNotation = this.getNodeParameter('options.useDotNotation', 0, false) as boolean; - const dateFields = prepareFields( - this.getNodeParameter('options.dateFields', 0, '') as string, - ); - - const updateKey = ((this.getNodeParameter('updateKey', 0) as string) || '').trim(); - - const updateOptions = (this.getNodeParameter('upsert', 0) as boolean) - ? { upsert: true } - : undefined; - - const updateItems = prepareItems({ - items, - fields, - updateKey, - useDotNotation, - dateFields, - isUpdate: nodeVersion >= 1.2, - }); - - for (const item of updateItems) { - try { - const filter = { [updateKey]: item[updateKey] }; - if (updateKey === '_id') { - filter[updateKey] = new ObjectId(item[updateKey] as string); - delete item._id; - } - - await mdb - .collection(this.getNodeParameter('collection', 0) as string) - .findOneAndUpdate(filter, { $set: item }, updateOptions as FindOneAndUpdateOptions); - } catch (error) { - if (this.continueOnFail()) { - item.json = { error: (error as JsonObject).message }; - continue; - } - throw error; - } - } - - returnData = this.helpers.constructExecutionMetaData( - this.helpers.returnJsonArray(updateItems), - { itemData: fallbackPairedItems }, - ); - } - - if (operation === 'insert') { - fallbackPairedItems = fallbackPairedItems ?? generatePairedItemData(items.length); - let responseData: IDataObject[] = []; - try { - // Prepare the data to insert and copy it to be returned + } else { const fields = prepareFields(this.getNodeParameter('fields', 0) as string); const useDotNotation = this.getNodeParameter( 'options.useDotNotation', @@ -343,86 +309,399 @@ export class MongoDb implements INodeType { this.getNodeParameter('options.dateFields', 0, '') as string, ); - const insertItems = prepareItems({ + const updateKey = ((this.getNodeParameter('updateKey', 0) as string) || '').trim(); + + const updateOptions = (this.getNodeParameter('upsert', 0) as boolean) + ? { upsert: true } + : undefined; + + const updateItems = prepareItems({ items, fields, - updateKey: '', + updateKey, useDotNotation, dateFields, }); - const { insertedIds } = await mdb - .collection(this.getNodeParameter('collection', 0) as string) - .insertMany(insertItems); + for (const item of updateItems) { + try { + const filter = { [updateKey]: item[updateKey] }; + if (updateKey === '_id') { + filter[updateKey] = new ObjectId(item[updateKey] as string); + delete item._id; + } - // Add the id to the data - for (const i of Object.keys(insertedIds)) { - responseData.push({ - ...insertItems[parseInt(i, 10)], - id: insertedIds[parseInt(i, 10)] as unknown as string, - }); - } - } catch (error) { - if (this.continueOnFail()) { - responseData = [{ error: (error as JsonObject).message }]; - } else { - throw error; + await mdb + .collection(this.getNodeParameter('collection', 0) as string) + .findOneAndReplace(filter, item, updateOptions as FindOneAndReplaceOptions); + } catch (error) { + if (this.continueOnFail()) { + item.json = { error: (error as JsonObject).message }; + continue; + } + throw error; + } } + + returnData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(updateItems), + { itemData: fallbackPairedItems }, + ); } + } - returnData = this.helpers.constructExecutionMetaData( - this.helpers.returnJsonArray(responseData), - { itemData: fallbackPairedItems }, - ); + if (operation === 'findOneAndUpdate') { + fallbackPairedItems = fallbackPairedItems ?? generatePairedItemData(items.length); + if (nodeVersion >= 1.3) { + for (let i = 0; i < itemsLength; i++) { + const fields = prepareFields(this.getNodeParameter('fields', i) as string); + const useDotNotation = this.getNodeParameter( + 'options.useDotNotation', + i, + false, + ) as boolean; + const dateFields = prepareFields( + this.getNodeParameter('options.dateFields', i, '') as string, + ); + const updateKey = ((this.getNodeParameter('updateKey', i) as string) || '').trim(); + const updateOptions = (this.getNodeParameter('upsert', i) as boolean) + ? { upsert: true } + : undefined; + const [item] = prepareItems({ + items: [items[i]], + fields, + updateKey, + useDotNotation, + dateFields, + isUpdate: true, + }); + + if (!item) { + if (this.continueOnFail()) { + returnData.push({ + json: { error: 'Item is missing the updateKey field' }, + pairedItem: { item: i }, + }); + continue; + } + throw new NodeOperationError(this.getNode(), 'Item is missing the updateKey field', { + itemIndex: i, + }); + } + + try { + const filter = { [updateKey]: item[updateKey] }; + if (updateKey === '_id') { + filter[updateKey] = new ObjectId(item[updateKey] as string); + delete item._id; + } + + await mdb + .collection(this.getNodeParameter('collection', i) as string) + .findOneAndUpdate(filter, { $set: item }, updateOptions as FindOneAndUpdateOptions); + + returnData.push({ json: item, pairedItem: { item: i } }); + } catch (error) { + if (this.continueOnFail()) { + returnData.push({ + json: { error: (error as JsonObject).message }, + pairedItem: { item: i }, + }); + continue; + } + throw error; + } + } + } else { + const fields = prepareFields(this.getNodeParameter('fields', 0) as string); + const useDotNotation = this.getNodeParameter( + 'options.useDotNotation', + 0, + false, + ) as boolean; + const dateFields = prepareFields( + this.getNodeParameter('options.dateFields', 0, '') as string, + ); + + const updateKey = ((this.getNodeParameter('updateKey', 0) as string) || '').trim(); + + const updateOptions = (this.getNodeParameter('upsert', 0) as boolean) + ? { upsert: true } + : undefined; + + const updateItems = prepareItems({ + items, + fields, + updateKey, + useDotNotation, + dateFields, + isUpdate: nodeVersion >= 1.2, + }); + + for (const item of updateItems) { + try { + const filter = { [updateKey]: item[updateKey] }; + if (updateKey === '_id') { + filter[updateKey] = new ObjectId(item[updateKey] as string); + delete item._id; + } + + await mdb + .collection(this.getNodeParameter('collection', 0) as string) + .findOneAndUpdate(filter, { $set: item }, updateOptions as FindOneAndUpdateOptions); + } catch (error) { + if (this.continueOnFail()) { + item.json = { error: (error as JsonObject).message }; + continue; + } + throw error; + } + } + + returnData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(updateItems), + { itemData: fallbackPairedItems }, + ); + } + } + + if (operation === 'insert') { + fallbackPairedItems = fallbackPairedItems ?? generatePairedItemData(items.length); + if (nodeVersion >= 1.3) { + // Phase 1: prepare items and group by collection name + const groups = new Map>(); + + for (let i = 0; i < itemsLength; i++) { + try { + const fields = prepareFields(this.getNodeParameter('fields', i) as string); + const useDotNotation = this.getNodeParameter( + 'options.useDotNotation', + i, + false, + ) as boolean; + const dateFields = prepareFields( + this.getNodeParameter('options.dateFields', i, '') as string, + ); + const [insertItem] = prepareItems({ + items: [items[i]], + fields, + updateKey: '', + useDotNotation, + dateFields, + }); + + if (!insertItem) continue; + + const collection = this.getNodeParameter('collection', i) as string; + const group = groups.get(collection) ?? []; + groups.set(collection, group); + group.push({ item: insertItem, originalIndex: i }); + } catch (error) { + if (this.continueOnFail()) { + returnData.push({ + json: { error: (error as JsonObject).message }, + pairedItem: { item: i }, + }); + } else { + throw error; + } + } + } + + // Phase 2: insertMany per collection group + for (const [collection, groupItems] of groups) { + try { + const { insertedIds } = await mdb + .collection(collection) + .insertMany(groupItems.map((g) => g.item)); + + for (let idx = 0; idx < groupItems.length; idx++) { + const g = groupItems[idx]; + returnData.push({ + json: { ...g.item, id: insertedIds[idx] as unknown as string }, + pairedItem: { item: g.originalIndex }, + }); + } + } catch (error) { + if (this.continueOnFail()) { + for (const g of groupItems) { + returnData.push({ + json: { error: (error as JsonObject).message }, + pairedItem: { item: g.originalIndex }, + }); + } + continue; + } + throw error; + } + } + + returnData.sort((a, b) => { + const aIdx = (a.pairedItem as { item: number }).item; + const bIdx = (b.pairedItem as { item: number }).item; + return aIdx - bIdx; + }); + } else { + let responseData: IDataObject[] = []; + try { + // Prepare the data to insert and copy it to be returned + const fields = prepareFields(this.getNodeParameter('fields', 0) as string); + const useDotNotation = this.getNodeParameter( + 'options.useDotNotation', + 0, + false, + ) as boolean; + const dateFields = prepareFields( + this.getNodeParameter('options.dateFields', 0, '') as string, + ); + + const insertItems = prepareItems({ + items, + fields, + updateKey: '', + useDotNotation, + dateFields, + }); + + const { insertedIds } = await mdb + .collection(this.getNodeParameter('collection', 0) as string) + .insertMany(insertItems); + + // Add the id to the data + for (const i of Object.keys(insertedIds)) { + responseData.push({ + ...insertItems[parseInt(i, 10)], + id: insertedIds[parseInt(i, 10)] as unknown as string, + }); + } + } catch (error) { + if (this.continueOnFail()) { + responseData = [{ error: (error as JsonObject).message }]; + } else { + throw error; + } + } + + returnData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(responseData), + { itemData: fallbackPairedItems }, + ); + } } if (operation === 'update') { fallbackPairedItems = fallbackPairedItems ?? generatePairedItemData(items.length); - const fields = prepareFields(this.getNodeParameter('fields', 0) as string); - const useDotNotation = this.getNodeParameter('options.useDotNotation', 0, false) as boolean; - const dateFields = prepareFields( - this.getNodeParameter('options.dateFields', 0, '') as string, - ); + if (nodeVersion >= 1.3) { + for (let i = 0; i < itemsLength; i++) { + const fields = prepareFields(this.getNodeParameter('fields', i) as string); + const useDotNotation = this.getNodeParameter( + 'options.useDotNotation', + i, + false, + ) as boolean; + const dateFields = prepareFields( + this.getNodeParameter('options.dateFields', i, '') as string, + ); + const updateKey = ((this.getNodeParameter('updateKey', i) as string) || '').trim(); + const updateOptions = (this.getNodeParameter('upsert', i) as boolean) + ? { upsert: true } + : undefined; + const [item] = prepareItems({ + items: [items[i]], + fields, + updateKey, + useDotNotation, + dateFields, + isUpdate: true, + }); - const updateKey = ((this.getNodeParameter('updateKey', 0) as string) || '').trim(); - - const updateOptions = (this.getNodeParameter('upsert', 0) as boolean) - ? { upsert: true } - : undefined; - - const updateItems = prepareItems({ - items, - fields, - updateKey, - useDotNotation, - dateFields, - isUpdate: nodeVersion >= 1.2, - }); - - for (const item of updateItems) { - try { - const filter = { [updateKey]: item[updateKey] }; - if (updateKey === '_id') { - filter[updateKey] = new ObjectId(item[updateKey] as string); - delete item._id; + if (!item) { + if (this.continueOnFail()) { + returnData.push({ + json: { error: 'Item is missing the updateKey field' }, + pairedItem: { item: i }, + }); + continue; + } + throw new NodeOperationError(this.getNode(), 'Item is missing the updateKey field', { + itemIndex: i, + }); } - await mdb - .collection(this.getNodeParameter('collection', 0) as string) - .updateOne(filter, { $set: item }, updateOptions as UpdateOptions); - } catch (error) { - if (this.continueOnFail()) { - item.json = { error: (error as JsonObject).message }; - continue; + try { + const filter = { [updateKey]: item[updateKey] }; + if (updateKey === '_id') { + filter[updateKey] = new ObjectId(item[updateKey] as string); + delete item._id; + } + + await mdb + .collection(this.getNodeParameter('collection', i) as string) + .updateOne(filter, { $set: item }, updateOptions as UpdateOptions); + + returnData.push({ json: item, pairedItem: { item: i } }); + } catch (error) { + if (this.continueOnFail()) { + returnData.push({ + json: { error: (error as JsonObject).message }, + pairedItem: { item: i }, + }); + continue; + } + throw error; } - throw error; } - } + } else { + const fields = prepareFields(this.getNodeParameter('fields', 0) as string); + const useDotNotation = this.getNodeParameter( + 'options.useDotNotation', + 0, + false, + ) as boolean; + const dateFields = prepareFields( + this.getNodeParameter('options.dateFields', 0, '') as string, + ); - returnData = this.helpers.constructExecutionMetaData( - this.helpers.returnJsonArray(updateItems), - { itemData: fallbackPairedItems }, - ); + const updateKey = ((this.getNodeParameter('updateKey', 0) as string) || '').trim(); + + const updateOptions = (this.getNodeParameter('upsert', 0) as boolean) + ? { upsert: true } + : undefined; + + const updateItems = prepareItems({ + items, + fields, + updateKey, + useDotNotation, + dateFields, + isUpdate: nodeVersion >= 1.2, + }); + + for (const item of updateItems) { + try { + const filter = { [updateKey]: item[updateKey] }; + if (updateKey === '_id') { + filter[updateKey] = new ObjectId(item[updateKey] as string); + delete item._id; + } + + await mdb + .collection(this.getNodeParameter('collection', 0) as string) + .updateOne(filter, { $set: item }, updateOptions as UpdateOptions); + } catch (error) { + if (this.continueOnFail()) { + item.json = { error: (error as JsonObject).message }; + continue; + } + throw error; + } + } + + returnData = this.helpers.constructExecutionMetaData( + this.helpers.returnJsonArray(updateItems), + { itemData: fallbackPairedItems }, + ); + } } if (operation === 'listSearchIndexes') { diff --git a/packages/nodes-base/nodes/MongoDb/test/MongoDB.test.ts b/packages/nodes-base/nodes/MongoDb/test/MongoDB.test.ts index 6e24a372c92..207b4f80488 100644 --- a/packages/nodes-base/nodes/MongoDb/test/MongoDB.test.ts +++ b/packages/nodes-base/nodes/MongoDb/test/MongoDB.test.ts @@ -1,6 +1,19 @@ import { NodeTestHarness } from '@nodes-testing/node-test-harness'; -import { Collection, MongoClient } from 'mongodb'; -import type { INodeParameters, WorkflowTestData } from 'n8n-workflow'; +import { mockDeep } from 'jest-mock-extended'; +import { Collection, Db, MongoClient, ObjectId } from 'mongodb'; +import { constructExecutionMetaData, returnJsonArray } from 'n8n-core'; +import type { + IExecuteFunctions, + INode, + INodeParameters, + NodeParameterValueType, + WorkflowTestData, +} from 'n8n-workflow'; + +import { MongoDb } from '../MongoDb.node'; + +const manualTriggerName = 'When clicking "Execute Workflow"'; +const searchIndexName = 'my-index'; MongoClient.connect = async function () { const driverInfo = { @@ -8,7 +21,7 @@ MongoClient.connect = async function () { version: '1.2', }; const client = new MongoClient('mongodb://localhost:27017', { driverInfo }); - return client; + return await Promise.resolve(client); }; function buildWorkflow({ @@ -23,7 +36,7 @@ function buildWorkflow({ { parameters: {}, id: '8b7bb389-e4ef-424a-bca1-e7ead60e43eb', - name: 'When clicking "Execute Workflow"', + name: manualTriggerName, type: 'n8n-nodes-base.manualTrigger', typeVersion: 1, position: [740, 380], @@ -44,7 +57,7 @@ function buildWorkflow({ }, ], connections: { - 'When clicking "Execute Workflow"': { + [manualTriggerName]: { main: [ [ { @@ -69,14 +82,462 @@ function buildWorkflow({ return test; } +const inputItems = [ + { json: { id: '1', value: 'first', collection: 'collection-1' } }, + { json: { id: '2', value: 'second', collection: 'collection-2' } }, + { json: { id: '3', value: 'third', collection: 'collection-3' } }, +]; + +function mockExecuteFunctions(typeVersion: number, operation: string) { + const executeFunctions = mockDeep(); + + executeFunctions.getCredentials.mockResolvedValue({ + configurationType: 'connectionString', + connectionString: 'mongodb://localhost:27017', + database: 'test', + }); + executeFunctions.getNode.mockReturnValue({ typeVersion } as INode); + executeFunctions.getInputData.mockReturnValue(inputItems); + executeFunctions.continueOnFail.mockReturnValue(false); + executeFunctions.helpers.returnJsonArray.mockImplementation(returnJsonArray); + executeFunctions.helpers.constructExecutionMetaData.mockImplementation( + constructExecutionMetaData, + ); + executeFunctions.getNodeParameter.mockImplementation( + (parameterName: string, itemIndex = 0, fallbackValue?: NodeParameterValueType) => { + switch (parameterName) { + case 'operation': + return operation; + case 'collection': + return inputItems[itemIndex].json.collection; + case 'fields': + return 'id,value'; + case 'updateKey': + return 'id'; + case 'upsert': + return false; + case 'options.useDotNotation': + return false; + case 'options.dateFields': + return ''; + default: + return fallbackValue; + } + }, + ); + + return executeFunctions; +} + +function collectionNames(collectionSpy: jest.SpyInstance): string[] { + return collectionSpy.mock.calls.reduce((names, call) => { + const [collectionName] = call as unknown[]; + + if (typeof collectionName === 'string') { + names.push(collectionName); + } + + return names; + }, []); +} + +function searchIndexOperationResult(indexName: string) { + return { json: { [indexName]: true } }; +} + describe('MongoDB CRUD Node', () => { const testHarness = new NodeTestHarness(); + describe('document operations in version 1.3', () => { + let collectionSpy: jest.SpyInstance; + const node = new MongoDb(); + + beforeEach(() => { + collectionSpy = jest.spyOn(Db.prototype, 'collection'); + }); + + afterEach(() => { + collectionSpy.mockRestore(); + jest.clearAllMocks(); + }); + + it('groups insert items by collection and uses insertMany per group', async () => { + const insertOneSpy = jest.spyOn(Collection.prototype, 'insertOne'); + const insertManySpy = jest.spyOn(Collection.prototype, 'insertMany'); + insertManySpy.mockResolvedValue({ + acknowledged: true, + insertedCount: 1, + insertedIds: { 0: new ObjectId() }, + }); + + await node.execute.call(mockExecuteFunctions(1.3, 'insert')); + + // Each item goes to a different collection → 3 groups → 3 insertMany calls + expect(collectionNames(collectionSpy)).toEqual([ + 'collection-1', + 'collection-2', + 'collection-3', + ]); + expect(insertManySpy).toHaveBeenCalledTimes(3); + expect(insertOneSpy).not.toHaveBeenCalled(); + }); + + it('uses a single insertMany when all items share the same collection', async () => { + const insertManySpy = jest.spyOn(Collection.prototype, 'insertMany'); + insertManySpy.mockResolvedValue({ + acknowledged: true, + insertedCount: 3, + insertedIds: Object.fromEntries( + [new ObjectId(), new ObjectId(), new ObjectId()].map((id, index) => [index, id]), + ), + }); + + const sameCollectionMock = mockExecuteFunctions(1.3, 'insert'); + sameCollectionMock.getNodeParameter.mockImplementation( + (parameterName: string, _itemIndex = 0, fallbackValue?: NodeParameterValueType) => { + switch (parameterName) { + case 'operation': + return 'insert'; + case 'collection': + return 'shared-collection'; + case 'fields': + return 'id,value'; + case 'options.useDotNotation': + return false; + case 'options.dateFields': + return ''; + default: + return fallbackValue; + } + }, + ); + + await node.execute.call(sameCollectionMock); + + expect(insertManySpy).toHaveBeenCalledTimes(1); + expect(insertManySpy).toHaveBeenCalledWith(expect.arrayContaining([expect.any(Object)])); + }); + + it('pairs each insert output item to its input item', async () => { + const insertManySpy = jest.spyOn(Collection.prototype, 'insertMany'); + insertManySpy.mockResolvedValue({ + acknowledged: true, + insertedCount: 1, + insertedIds: { 0: new ObjectId() }, + }); + + const [items] = await node.execute.call(mockExecuteFunctions(1.3, 'insert')); + + expect(items[0].pairedItem).toEqual({ item: 0 }); + expect(items[1].pairedItem).toEqual({ item: 1 }); + expect(items[2].pairedItem).toEqual({ item: 2 }); + }); + + it('preserves input order when insert items span multiple collections', async () => { + // items[0] → col1, items[1] → col2, items[2] → col1 + // groups: {col1: [0,2], col2: [1]} — without sort output would be [0,2,1] + const interleavedItems = [ + { json: { id: '1', value: 'first', collection: 'col1' } }, + { json: { id: '2', value: 'second', collection: 'col2' } }, + { json: { id: '3', value: 'third', collection: 'col1' } }, + ]; + + const insertManySpy = jest.spyOn(Collection.prototype, 'insertMany'); + insertManySpy + .mockResolvedValueOnce({ + acknowledged: true, + insertedCount: 2, + insertedIds: { 0: new ObjectId(), 1: new ObjectId() }, + }) + .mockResolvedValueOnce({ + acknowledged: true, + insertedCount: 1, + insertedIds: { 0: new ObjectId() }, + }); + + const mock = mockExecuteFunctions(1.3, 'insert'); + mock.getInputData.mockReturnValue(interleavedItems); + mock.getNodeParameter.mockImplementation( + (parameterName: string, itemIndex = 0, fallbackValue?: NodeParameterValueType) => { + switch (parameterName) { + case 'operation': + return 'insert'; + case 'collection': + return interleavedItems[itemIndex].json.collection; + case 'fields': + return 'id,value'; + case 'options.useDotNotation': + return false; + case 'options.dateFields': + return ''; + default: + return fallbackValue; + } + }, + ); + + const [items] = await node.execute.call(mock); + + expect(items[0].pairedItem).toEqual({ item: 0 }); + expect(items[1].pairedItem).toEqual({ item: 1 }); + expect(items[2].pairedItem).toEqual({ item: 2 }); + }); + + it('resolves update collections against each input item', async () => { + const updateOneSpy = jest.spyOn(Collection.prototype, 'updateOne'); + updateOneSpy.mockResolvedValue({ + acknowledged: true, + matchedCount: 1, + modifiedCount: 1, + upsertedCount: 0, + upsertedId: null, + }); + + await node.execute.call(mockExecuteFunctions(1.3, 'update')); + + expect(collectionNames(collectionSpy)).toEqual([ + 'collection-1', + 'collection-2', + 'collection-3', + ]); + expect(updateOneSpy).toHaveBeenCalledTimes(3); + }); + + it('pairs each update output item to its input item', async () => { + const updateOneSpy = jest.spyOn(Collection.prototype, 'updateOne'); + updateOneSpy.mockResolvedValue({ + acknowledged: true, + matchedCount: 1, + modifiedCount: 1, + upsertedCount: 0, + upsertedId: null, + }); + + const [items] = await node.execute.call(mockExecuteFunctions(1.3, 'update')); + + expect(items[0].pairedItem).toEqual({ item: 0 }); + expect(items[1].pairedItem).toEqual({ item: 1 }); + expect(items[2].pairedItem).toEqual({ item: 2 }); + }); + + it('resolves find-and-update collections against each input item', async () => { + const findOneAndUpdateSpy = jest.spyOn(Collection.prototype, 'findOneAndUpdate'); + findOneAndUpdateSpy.mockResolvedValue(null); + + await node.execute.call(mockExecuteFunctions(1.3, 'findOneAndUpdate')); + + expect(collectionNames(collectionSpy)).toEqual([ + 'collection-1', + 'collection-2', + 'collection-3', + ]); + expect(findOneAndUpdateSpy).toHaveBeenCalledTimes(3); + }); + + it('pairs each find-and-update output item to its input item', async () => { + const findOneAndUpdateSpy = jest.spyOn(Collection.prototype, 'findOneAndUpdate'); + findOneAndUpdateSpy.mockResolvedValue(null); + + const [items] = await node.execute.call(mockExecuteFunctions(1.3, 'findOneAndUpdate')); + + expect(items[0].pairedItem).toEqual({ item: 0 }); + expect(items[1].pairedItem).toEqual({ item: 1 }); + expect(items[2].pairedItem).toEqual({ item: 2 }); + }); + + it('resolves find-and-replace collections against each input item', async () => { + const findOneAndReplaceSpy = jest.spyOn(Collection.prototype, 'findOneAndReplace'); + findOneAndReplaceSpy.mockResolvedValue(null); + + await node.execute.call(mockExecuteFunctions(1.3, 'findOneAndReplace')); + + expect(collectionNames(collectionSpy)).toEqual([ + 'collection-1', + 'collection-2', + 'collection-3', + ]); + expect(findOneAndReplaceSpy).toHaveBeenCalledTimes(3); + }); + + it('pairs each find-and-replace output item to its input item', async () => { + const findOneAndReplaceSpy = jest.spyOn(Collection.prototype, 'findOneAndReplace'); + findOneAndReplaceSpy.mockResolvedValue(null); + + const [items] = await node.execute.call(mockExecuteFunctions(1.3, 'findOneAndReplace')); + + expect(items[0].pairedItem).toEqual({ item: 0 }); + expect(items[1].pairedItem).toEqual({ item: 1 }); + expect(items[2].pairedItem).toEqual({ item: 2 }); + }); + + describe.each(['findOneAndReplace', 'findOneAndUpdate', 'update'])( + '%s: item missing the updateKey field', + (operation) => { + const itemsMissingKey = [{ json: { value: 'no-id-field', collection: 'col1' } }]; + + function mockMissingKey(continueOnFail: boolean) { + const mock = mockExecuteFunctions(1.3, operation); + mock.getInputData.mockReturnValue(itemsMissingKey); + mock.continueOnFail.mockReturnValue(continueOnFail); + mock.getNodeParameter.mockImplementation( + (parameterName: string, _itemIndex = 0, fallbackValue?: NodeParameterValueType) => { + switch (parameterName) { + case 'operation': + return operation; + case 'collection': + return 'col1'; + case 'fields': + return 'value'; + case 'updateKey': + return 'id'; + case 'upsert': + return false; + case 'options.useDotNotation': + return false; + case 'options.dateFields': + return ''; + default: + return fallbackValue; + } + }, + ); + return mock; + } + + // The !item check fires before any DB call, so no collection spy is needed + it('throws NodeOperationError when continueOnFail is off', async () => { + await expect(node.execute.call(mockMissingKey(false))).rejects.toThrow( + 'Item is missing the updateKey field', + ); + }); + + it('pushes error item with pairedItem when continueOnFail is on', async () => { + const [items] = await node.execute.call(mockMissingKey(true)); + expect(items).toHaveLength(1); + expect(items[0].json.error).toBe('Item is missing the updateKey field'); + expect(items[0].pairedItem).toEqual({ item: 0 }); + }); + }, + ); + }); + + describe('document operations in version 1.2', () => { + let collectionSpy: jest.SpyInstance; + const node = new MongoDb(); + + beforeEach(() => { + collectionSpy = jest.spyOn(Db.prototype, 'collection'); + }); + + afterEach(() => { + collectionSpy.mockRestore(); + jest.clearAllMocks(); + }); + + it('keeps insert using the first item collection', async () => { + const insertOneSpy = jest.spyOn(Collection.prototype, 'insertOne'); + const insertManySpy = jest.spyOn(Collection.prototype, 'insertMany'); + insertManySpy.mockResolvedValue({ + acknowledged: true, + insertedCount: 3, + insertedIds: Object.fromEntries( + [new ObjectId(), new ObjectId(), new ObjectId()].map((id, index) => [index, id]), + ), + }); + + await node.execute.call(mockExecuteFunctions(1.2, 'insert')); + + expect(collectionNames(collectionSpy)).toEqual(['collection-1']); + expect(insertManySpy).toHaveBeenCalledTimes(1); + expect(insertOneSpy).not.toHaveBeenCalled(); + }); + + it('pairs all insert output items to all input items as fallback', async () => { + const insertManySpy = jest.spyOn(Collection.prototype, 'insertMany'); + insertManySpy.mockResolvedValue({ + acknowledged: true, + insertedCount: 3, + insertedIds: Object.fromEntries( + [new ObjectId(), new ObjectId(), new ObjectId()].map((id, index) => [index, id]), + ), + }); + + const [items] = await node.execute.call(mockExecuteFunctions(1.2, 'insert')); + + const fallbackPairedItems = [{ item: 0 }, { item: 1 }, { item: 2 }]; + expect(items[0].pairedItem).toEqual(fallbackPairedItems); + expect(items[1].pairedItem).toEqual(fallbackPairedItems); + expect(items[2].pairedItem).toEqual(fallbackPairedItems); + }); + + it('keeps update using the first item collection', async () => { + const updateOneSpy = jest.spyOn(Collection.prototype, 'updateOne'); + updateOneSpy.mockResolvedValue({ + acknowledged: true, + matchedCount: 1, + modifiedCount: 1, + upsertedCount: 0, + upsertedId: null, + }); + + await node.execute.call(mockExecuteFunctions(1.2, 'update')); + + expect(collectionNames(collectionSpy)).toEqual([ + 'collection-1', + 'collection-1', + 'collection-1', + ]); + expect(updateOneSpy).toHaveBeenCalledTimes(3); + }); + + it('pairs all update output items to all input items as fallback', async () => { + const updateOneSpy = jest.spyOn(Collection.prototype, 'updateOne'); + updateOneSpy.mockResolvedValue({ + acknowledged: true, + matchedCount: 1, + modifiedCount: 1, + upsertedCount: 0, + upsertedId: null, + }); + + const [items] = await node.execute.call(mockExecuteFunctions(1.2, 'update')); + + const fallbackPairedItems = [{ item: 0 }, { item: 1 }, { item: 2 }]; + expect(items[0].pairedItem).toEqual(fallbackPairedItems); + expect(items[1].pairedItem).toEqual(fallbackPairedItems); + expect(items[2].pairedItem).toEqual(fallbackPairedItems); + }); + + it('pairs all find-and-update output items to all input items as fallback', async () => { + const findOneAndUpdateSpy = jest.spyOn(Collection.prototype, 'findOneAndUpdate'); + findOneAndUpdateSpy.mockResolvedValue(null); + + const [items] = await node.execute.call(mockExecuteFunctions(1.2, 'findOneAndUpdate')); + + const fallbackPairedItems = [{ item: 0 }, { item: 1 }, { item: 2 }]; + expect(items[0].pairedItem).toEqual(fallbackPairedItems); + expect(items[1].pairedItem).toEqual(fallbackPairedItems); + expect(items[2].pairedItem).toEqual(fallbackPairedItems); + }); + + it('pairs all find-and-replace output items to all input items as fallback', async () => { + const findOneAndReplaceSpy = jest.spyOn(Collection.prototype, 'findOneAndReplace'); + findOneAndReplaceSpy.mockResolvedValue(null); + + const [items] = await node.execute.call(mockExecuteFunctions(1.2, 'findOneAndReplace')); + + const fallbackPairedItems = [{ item: 0 }, { item: 1 }, { item: 2 }]; + expect(items[0].pairedItem).toEqual(fallbackPairedItems); + expect(items[1].pairedItem).toEqual(fallbackPairedItems); + expect(items[2].pairedItem).toEqual(fallbackPairedItems); + }); + }); + describe('createSearchIndex operation', () => { const spy: jest.SpyInstance = jest.spyOn(Collection.prototype, 'createSearchIndex'); afterAll(() => jest.restoreAllMocks()); beforeAll(() => { - spy.mockResolvedValueOnce('my-index'); + spy.mockResolvedValueOnce(searchIndexName); }); testHarness.setupTest( @@ -87,15 +548,15 @@ describe('MongoDB CRUD Node', () => { collection: 'foo', indexType: 'vectorSearch', indexDefinition: JSON.stringify({ mappings: {} }), - indexNameRequired: 'my-index', + indexNameRequired: searchIndexName, }, - expectedResult: [{ json: { indexName: 'my-index' } }], + expectedResult: [{ json: { indexName: searchIndexName } }], }), ); it('calls the spy with the expected arguments', function () { expect(spy).toBeCalledWith({ - name: 'my-index', + name: searchIndexName, definition: { mappings: {} }, type: 'vectorSearch', }); @@ -108,7 +569,7 @@ describe('MongoDB CRUD Node', () => { beforeAll(() => { spy = jest.spyOn(Collection.prototype, 'listSearchIndexes'); const mockCursor = { - toArray: async () => [], + toArray: async () => await Promise.resolve([]), }; spy.mockReturnValue(mockCursor); }); @@ -136,7 +597,7 @@ describe('MongoDB CRUD Node', () => { beforeAll(() => { spy = jest.spyOn(Collection.prototype, 'listSearchIndexes'); const mockCursor = { - toArray: async () => [], + toArray: async () => await Promise.resolve([]), }; spy.mockReturnValue(mockCursor); }); @@ -149,14 +610,14 @@ describe('MongoDB CRUD Node', () => { resource: 'searchIndexes', operation: 'listSearchIndexes', collection: 'foo', - indexName: 'my-index', + indexName: searchIndexName, }, expectedResult: [], }), ); it('calls the spy with the expected arguments', function () { - expect(spy).toHaveBeenCalledWith('my-index'); + expect(spy).toHaveBeenCalledWith(searchIndexName); }); }); @@ -165,7 +626,8 @@ describe('MongoDB CRUD Node', () => { beforeAll(() => { spy = jest.spyOn(Collection.prototype, 'listSearchIndexes'); const mockCursor = { - toArray: async () => [{ name: 'my-index' }, { name: 'my-index-2' }], + toArray: async () => + await Promise.resolve([{ name: searchIndexName }, { name: 'my-index-2' }]), }; spy.mockReturnValue(mockCursor); }); @@ -178,11 +640,11 @@ describe('MongoDB CRUD Node', () => { operation: 'listSearchIndexes', resource: 'searchIndexes', collection: 'foo', - indexName: 'my-index', + indexName: searchIndexName, }, expectedResult: [ { - json: { name: 'my-index' }, + json: { name: searchIndexName }, }, { json: { name: 'my-index-2' }, @@ -207,14 +669,14 @@ describe('MongoDB CRUD Node', () => { operation: 'dropSearchIndex', resource: 'searchIndexes', collection: 'foo', - indexNameRequired: 'my-index', + indexNameRequired: searchIndexName, }, - expectedResult: [{ json: { 'my-index': true } }], + expectedResult: [searchIndexOperationResult(searchIndexName)], }), ); it('calls the spy with the expected arguments', function () { - expect(spy).toBeCalledWith('my-index'); + expect(spy).toBeCalledWith(searchIndexName); }); }); @@ -232,19 +694,19 @@ describe('MongoDB CRUD Node', () => { operation: 'updateSearchIndex', resource: 'searchIndexes', collection: 'foo', - indexNameRequired: 'my-index', + indexNameRequired: searchIndexName, indexDefinition: JSON.stringify({ mappings: { dynamic: true, }, }), }, - expectedResult: [{ json: { 'my-index': true } }], + expectedResult: [searchIndexOperationResult(searchIndexName)], }), ); it('calls the spy with the expected arguments', function () { - expect(spy).toBeCalledWith('my-index', { mappings: { dynamic: true } }); + expect(spy).toBeCalledWith(searchIndexName, { mappings: { dynamic: true } }); }); }); });