diff --git a/.github/workflows/test-workflows.yml b/.github/workflows/test-workflows.yml new file mode 100644 index 00000000000..8250a5a9b44 --- /dev/null +++ b/.github/workflows/test-workflows.yml @@ -0,0 +1,85 @@ +name: Run test workflows + +on: + schedule: + - cron: "0 2 * * *" + workflow_dispatch: + + +jobs: + run-test-workflows: + + runs-on: ubuntu-latest + + strategy: + matrix: + node-version: [14.x] + steps: + - + name: Checkout + uses: actions/checkout@v2 + with: + path: n8n + - + name: Checkout workflows repo + uses: actions/checkout@v2 + with: + repository: n8n-io/test-workflows + path: test-workflows + - + name: Use Node.js ${{ matrix.node-version }} + uses: actions/setup-node@v1 + with: + node-version: ${{ matrix.node-version }} + - + name: npm install and build + run: | + cd n8n + npm install + npm run bootstrap + npm run build --if-present + env: + CI: true + shell: bash + - + name: Import credentials + run: n8n/packages/cli/bin/n8n import:credentials --input=test-workflows/credentials.json + shell: bash + env: + N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}} + - + name: Import workflows + run: n8n/packages/cli/bin/n8n import:workflow --separate --input=test-workflows/workflows + shell: bash + env: + N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}} + - + name: Copy static assets + run: | + cp n8n/assets/n8n-logo.png /tmp/n8n-logo.png + cp n8n/assets/n8n-screenshot.png /tmp/n8n-screenshot.png + cp n8n/node_modules/pdf-parse/test/data/05-versions-space.pdf /tmp/05-versions-space.pdf + cp n8n/node_modules/pdf-parse/test/data/04-valid.pdf /tmp/04-valid.pdf + shell: bash + - + name: Run tests + run: n8n/packages/cli/bin/n8n executeBatch --shallow --skipList=test-workflows/skipList.txt --shortOutput --concurrency=16 --compare=test-workflows/snapshots + shell: bash + env: + N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}} + - + name: Export credentials + if: always() + run: n8n/packages/cli/bin/n8n export:credentials --output=test-workflows/credentials.json --all --pretty + shell: bash + env: + N8N_ENCRYPTION_KEY: ${{secrets.ENCRYPTION_KEY}} + - + name: Commit and push credential changes + if: always() + run: | + cd test-workflows + git config --global user.name 'n8n test bot' + git config --global user.email 'n8n-test-bot@users.noreply.github.com' + git commit -am "Automated credential update" + git push --force --quiet "https://janober:${{ secrets.TOKEN }}@github.com/n8n-io/test-workflows.git" main:main diff --git a/packages/cli/BREAKING-CHANGES.md b/packages/cli/BREAKING-CHANGES.md index 2a762e04768..a7ac4db33b7 100644 --- a/packages/cli/BREAKING-CHANGES.md +++ b/packages/cli/BREAKING-CHANGES.md @@ -2,6 +2,16 @@ This list shows all the versions which include breaking changes and how to upgrade. +## 0.127.0 + +### What changed? + +For the Zoho node, the `lead:create` operation now requires a "Company" parameter, the parameter "Address" is now inside "Additional Options", and the parameters "Title" and "Is Duplicate Record" were removed. Also, the `lead:delete` operation now returns only the `id` of the deleted lead. + +### When is action necessary? + +If you are using `lead:create` with "Company" or "Address", reset the parameters; for the other two parameters, no action needed. If you are using the response from `lead:delete`, reselect the `id` key. + ## 0.118.0 ### What changed? diff --git a/packages/cli/commands/Interfaces.d.ts b/packages/cli/commands/Interfaces.d.ts new file mode 100644 index 00000000000..aedd1945397 --- /dev/null +++ b/packages/cli/commands/Interfaces.d.ts @@ -0,0 +1,54 @@ +interface IResult { + totalWorkflows: number; + summary: { + failedExecutions: number, + successfulExecutions: number, + warningExecutions: number, + errors: IExecutionError[], + warnings: IExecutionError[], + }; + coveredNodes: { + [nodeType: string]: number + }; + executions: IExecutionResult[]; +} +interface IExecutionResult { + workflowId: string | number; + workflowName: string; + executionTime: number; // Given in seconds with decimals for milisseconds + finished: boolean; + executionStatus: ExecutionStatus; + error?: string; + changes?: string; + coveredNodes: { + [nodeType: string]: number + }; +} + +interface IExecutionError { + workflowId: string | number; + error: string; +} + +interface IWorkflowExecutionProgress { + workflowId: string | number; + status: ExecutionStatus; +} + +interface INodeSpecialCases { + [nodeName: string]: INodeSpecialCase; +} + +interface INodeSpecialCase { + ignoredProperties?: string[]; + capResults?: number; +} + +type ExecutionStatus = 'success' | 'error' | 'warning' | 'running'; + +declare module 'json-diff' { + interface IDiffOptions { + keysOnly?: boolean; + } + export function diff(obj1: unknown, obj2: unknown, diffOptions: IDiffOptions): string; +} diff --git a/packages/cli/commands/execute.ts b/packages/cli/commands/execute.ts index 9eafd28c9e9..48ae6a19f00 100644 --- a/packages/cli/commands/execute.ts +++ b/packages/cli/commands/execute.ts @@ -22,7 +22,7 @@ import { WorkflowRunner, } from '../src'; -import { +import { getLogger, } from '../src/Logger'; @@ -46,6 +46,9 @@ export class Execute extends Command { id: flags.string({ description: 'id of the workflow to execute', }), + rawOutput: flags.boolean({ + description: 'Outputs only JSON data, with no other text', + }), }; @@ -183,10 +186,11 @@ export class Execute extends Command { stack: error.stack, }; } - - console.info('Execution was successful:'); - console.info('===================================='); - console.info(JSON.stringify(data, null, 2)); + if (flags.rawOutput === undefined) { + this.log('Execution was successful:'); + this.log('===================================='); + } + this.log(JSON.stringify(data, null, 2)); } catch (e) { console.error('Error executing workflow. See log messages for details.'); logger.error('\nExecution error:'); diff --git a/packages/cli/commands/executeBatch.ts b/packages/cli/commands/executeBatch.ts new file mode 100644 index 00000000000..7bc6bb858d3 --- /dev/null +++ b/packages/cli/commands/executeBatch.ts @@ -0,0 +1,796 @@ +import * as fs from 'fs'; +import { + Command, + flags, +} from '@oclif/command'; + +import { + UserSettings, +} from 'n8n-core'; + +import { + INode, + INodeExecutionData, + ITaskData, +} from 'n8n-workflow'; + +import { + ActiveExecutions, + CredentialsOverwrites, + CredentialTypes, + Db, + ExternalHooks, + IExecutionsCurrentSummary, + IWorkflowDb, + IWorkflowExecutionDataProcess, + LoadNodesAndCredentials, + NodeTypes, + WorkflowCredentials, + WorkflowRunner, +} from '../src'; + +import { + sep, +} from 'path'; + +import { + diff, +} from 'json-diff'; + +import { + getLogger, +} from '../src/Logger'; + +import { + LoggerProxy, +} from 'n8n-workflow'; + +export class ExecuteBatch extends Command { + static description = '\nExecutes multiple workflows once'; + + static cancelled = false; + + static workflowExecutionsProgress: IWorkflowExecutionProgress[][]; + + static shallow = false; + + static compare: string; + + static snapshot: string; + + static concurrency = 1; + + static debug = false; + + static executionTimeout = 3 * 60 * 1000; + + static examples = [ + `$ n8n executeAll`, + `$ n8n executeAll --concurrency=10 --skipList=/data/skipList.txt`, + `$ n8n executeAll --debug --output=/data/output.json`, + `$ n8n executeAll --ids=10,13,15 --shortOutput`, + `$ n8n executeAll --snapshot=/data/snapshots --shallow`, + `$ n8n executeAll --compare=/data/previousExecutionData --retries=2`, + ]; + + static flags = { + help: flags.help({ char: 'h' }), + debug: flags.boolean({ + description: 'Toggles on displaying all errors and debug messages.', + }), + ids: flags.string({ + description: 'Specifies workflow IDs to get executed, separated by a comma.', + }), + concurrency: flags.integer({ + default: 1, + description: 'How many workflows can run in parallel. Defaults to 1 which means no concurrency.', + }), + output: flags.string({ + description: 'Enable execution saving, You must inform an existing folder to save execution via this param', + }), + snapshot: flags.string({ + description: 'Enables snapshot saving. You must inform an existing folder to save snapshots via this param.', + }), + compare: flags.string({ + description: 'Compares current execution with an existing snapshot. You must inform an existing folder where the snapshots are saved.', + }), + shallow: flags.boolean({ + description: 'Compares only if attributes output from node are the same, with no regards to neste JSON objects.', + }), + skipList: flags.string({ + description: 'File containing a comma separated list of workflow IDs to skip.', + }), + retries: flags.integer({ + description: 'Retries failed workflows up to N tries. Default is 1. Set 0 to disable.', + default: 1, + }), + shortOutput: flags.boolean({ + description: 'Omits the full execution information from output, displaying only summary.', + }), + }; + + /** + * Gracefully handles exit. + * @param {boolean} skipExit Whether to skip exit or number according to received signal + */ + static async stopProcess(skipExit: boolean | number = false) { + + if (ExecuteBatch.cancelled === true) { + process.exit(0); + } + + ExecuteBatch.cancelled = true; + const activeExecutionsInstance = ActiveExecutions.getInstance(); + const stopPromises = activeExecutionsInstance.getActiveExecutions().map(async execution => { + activeExecutionsInstance.stopExecution(execution.id); + }); + + await Promise.allSettled(stopPromises); + + setTimeout(() => { + process.exit(0); + }, 30000); + + let executingWorkflows = activeExecutionsInstance.getActiveExecutions() as IExecutionsCurrentSummary[]; + + let count = 0; + while (executingWorkflows.length !== 0) { + if (count++ % 4 === 0) { + console.log(`Waiting for ${executingWorkflows.length} active executions to finish...`); + executingWorkflows.map(execution => { + console.log(` - Execution ID ${execution.id}, workflow ID: ${execution.workflowId}`); + }); + } + await new Promise((resolve) => { + setTimeout(resolve, 500); + }); + executingWorkflows = activeExecutionsInstance.getActiveExecutions(); + } + // We may receive true but when called from `process.on` + // we get the signal (SIGNIT, etc.) + if (skipExit !== true) { + process.exit(0); + } + } + + formatJsonOutput(data: object) { + return JSON.stringify(data, null, 2); + } + + shouldBeConsideredAsWarning(errorMessage: string) { + + const warningStrings = [ + 'refresh token is invalid', + 'unable to connect to', + 'econnreset', + '429', + 'econnrefused', + 'missing a required parameter', + ]; + + errorMessage = errorMessage.toLowerCase(); + + for (let i = 0; i < warningStrings.length; i++) { + if (errorMessage.includes(warningStrings[i])) { + return true; + } + } + + return false; + } + + + async run() { + + process.on('SIGTERM', ExecuteBatch.stopProcess); + process.on('SIGINT', ExecuteBatch.stopProcess); + + const logger = getLogger(); + LoggerProxy.init(logger); + + const { flags } = this.parse(ExecuteBatch); + + ExecuteBatch.debug = flags.debug === true; + ExecuteBatch.concurrency = flags.concurrency || 1; + + const ids: number[] = []; + const skipIds: number[] = []; + + if (flags.snapshot !== undefined) { + if (fs.existsSync(flags.snapshot)) { + if (!fs.lstatSync(flags.snapshot).isDirectory()) { + console.log(`The parameter --snapshot must be an existing directory`); + return; + } + } else { + console.log(`The parameter --snapshot must be an existing directory`); + return; + } + + ExecuteBatch.snapshot = flags.snapshot; + } + if (flags.compare !== undefined) { + if (fs.existsSync(flags.compare)) { + if (!fs.lstatSync(flags.compare).isDirectory()) { + console.log(`The parameter --compare must be an existing directory`); + return; + } + } else { + console.log(`The parameter --compare must be an existing directory`); + return; + } + + ExecuteBatch.compare = flags.compare; + } + + if (flags.output !== undefined) { + if (fs.existsSync(flags.output)) { + if (fs.lstatSync(flags.output).isDirectory()) { + console.log(`The parameter --output must be a writable file`); + return; + } + } + } + + if (flags.ids !== undefined) { + const paramIds = flags.ids.split(','); + const re = /\d+/; + const matchedIds = paramIds.filter(id => id.match(re)).map(id => parseInt(id.trim(), 10)); + + if (matchedIds.length === 0) { + console.log(`The parameter --ids must be a list of numeric IDs separated by a comma.`); + return; + } + + ids.push(...matchedIds); + } + + if (flags.skipList !== undefined) { + if (fs.existsSync(flags.skipList)) { + const contents = fs.readFileSync(flags.skipList, { encoding: 'utf-8' }); + skipIds.push(...contents.split(',').map(id => parseInt(id.trim(), 10))); + } else { + console.log('Skip list file not found. Exiting.'); + return; + } + } + + if (flags.shallow === true) { + ExecuteBatch.shallow = true; + } + + + // Start directly with the init of the database to improve startup time + const startDbInitPromise = Db.init(); + + // Load all node and credential types + const loadNodesAndCredentials = LoadNodesAndCredentials(); + const loadNodesAndCredentialsPromise = loadNodesAndCredentials.init(); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); + + // Wait till the database is ready + await startDbInitPromise; + + let allWorkflows; + + const query = Db.collections!.Workflow!.createQueryBuilder('workflows'); + + if (ids.length > 0) { + query.andWhere(`workflows.id in (:...ids)`, { ids }); + } + + if (skipIds.length > 0) { + query.andWhere(`workflows.id not in (:...skipIds)`, { skipIds }); + } + + allWorkflows = await query.getMany() as IWorkflowDb[]; + + if (ExecuteBatch.debug === true) { + process.stdout.write(`Found ${allWorkflows.length} workflows to execute.\n`); + } + + // Wait till the n8n-packages have been read + await loadNodesAndCredentialsPromise; + + // Load the credentials overwrites if any exist + await CredentialsOverwrites().init(); + + // Load all external hooks + const externalHooks = ExternalHooks(); + await externalHooks.init(); + + // Add the found types to an instance other parts of the application can use + const nodeTypes = NodeTypes(); + await nodeTypes.init(loadNodesAndCredentials.nodeTypes); + const credentialTypes = CredentialTypes(); + await credentialTypes.init(loadNodesAndCredentials.credentialTypes); + + // Send a shallow copy of allWorkflows so we still have all workflow data. + const results = await this.runTests([...allWorkflows]); + + let { retries } = flags; + + while (retries > 0 && (results.summary.warningExecutions + results.summary.failedExecutions > 0) && ExecuteBatch.cancelled === false) { + const failedWorkflowIds = results.summary.errors.map(execution => execution.workflowId); + failedWorkflowIds.push(...results.summary.warnings.map(execution => execution.workflowId)); + + const newWorkflowList = allWorkflows.filter(workflow => failedWorkflowIds.includes(workflow.id)); + + const retryResults = await this.runTests(newWorkflowList); + + this.mergeResults(results, retryResults); + // By now, `results` has been updated with the new successful executions. + retries--; + } + + if (flags.output !== undefined) { + fs.writeFileSync(flags.output, this.formatJsonOutput(results)); + console.log('\nExecution finished.'); + console.log('Summary:'); + console.log(`\tSuccess: ${results.summary.successfulExecutions}`); + console.log(`\tFailures: ${results.summary.failedExecutions}`); + console.log(`\tWarnings: ${results.summary.warningExecutions}`); + console.log('\nNodes successfully tested:'); + Object.entries(results.coveredNodes).forEach(([nodeName, nodeCount]) => { + console.log(`\t${nodeName}: ${nodeCount}`); + }); + console.log('\nCheck the JSON file for more details.'); + } else { + if (flags.shortOutput === true) { + console.log(this.formatJsonOutput({ ...results, executions: results.executions.filter(execution => execution.executionStatus !== 'success') })); + } else { + console.log(this.formatJsonOutput(results)); + } + } + + await ExecuteBatch.stopProcess(true); + + if (results.summary.failedExecutions > 0) { + this.exit(1); + } + this.exit(0); + + } + + mergeResults(results: IResult, retryResults: IResult) { + + if (retryResults.summary.successfulExecutions === 0) { + // Nothing to replace. + return; + } + + // Find successful executions and replace them on previous result. + retryResults.executions.forEach(newExecution => { + if (newExecution.executionStatus === 'success') { + // Remove previous execution from list. + results.executions = results.executions.filter(previousExecutions => previousExecutions.workflowId !== newExecution.workflowId); + + const errorIndex = results.summary.errors.findIndex(summaryInformation => summaryInformation.workflowId === newExecution.workflowId); + if (errorIndex !== -1) { + // This workflow errored previously. Decrement error count. + results.summary.failedExecutions--; + // Remove from the list of errors. + results.summary.errors.splice(errorIndex, 1); + } + + const warningIndex = results.summary.warnings.findIndex(summaryInformation => summaryInformation.workflowId === newExecution.workflowId); + if (warningIndex !== -1) { + // This workflow errored previously. Decrement error count. + results.summary.warningExecutions--; + // Remove from the list of errors. + results.summary.warnings.splice(warningIndex, 1); + } + // Increment successful executions count and push it to all executions array. + results.summary.successfulExecutions++; + results.executions.push(newExecution); + } + }); + } + + async runTests(allWorkflows: IWorkflowDb[]): Promise { + const result: IResult = { + totalWorkflows: allWorkflows.length, + summary: { + failedExecutions: 0, + warningExecutions: 0, + successfulExecutions: 0, + errors: [], + warnings: [], + }, + coveredNodes: {}, + executions: [], + }; + + if (ExecuteBatch.debug) { + this.initializeLogs(); + } + + return new Promise(async (res) => { + const promisesArray = []; + for (let i = 0; i < ExecuteBatch.concurrency; i++) { + const promise = new Promise(async (resolve) => { + let workflow: IWorkflowDb | undefined; + while (allWorkflows.length > 0) { + workflow = allWorkflows.shift(); + if (ExecuteBatch.cancelled === true) { + process.stdout.write(`Thread ${i + 1} resolving and quitting.`); + resolve(true); + break; + } + // This if shouldn't be really needed + // but it's a concurrency precaution. + if (workflow === undefined) { + resolve(true); + return; + } + + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow.id, + status: 'running', + }); + this.updateStatus(); + } + + await this.startThread(workflow).then((executionResult) => { + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].pop(); + } + result.executions.push(executionResult); + if (executionResult.executionStatus === 'success') { + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow!.id, + status: 'success', + }); + this.updateStatus(); + } + result.summary.successfulExecutions++; + const nodeNames = Object.keys(executionResult.coveredNodes); + + nodeNames.map(nodeName => { + if (result.coveredNodes[nodeName] === undefined) { + result.coveredNodes[nodeName] = 0; + } + result.coveredNodes[nodeName] += executionResult.coveredNodes[nodeName]; + }); + } else if (executionResult.executionStatus === 'warning') { + result.summary.warningExecutions++; + result.summary.warnings.push({ + workflowId: executionResult.workflowId, + error: executionResult.error!, + }); + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow!.id, + status: 'warning', + }); + this.updateStatus(); + } + } else if (executionResult.executionStatus === 'error') { + result.summary.failedExecutions++; + result.summary.errors.push({ + workflowId: executionResult.workflowId, + error: executionResult.error!, + }); + if (ExecuteBatch.debug) { + ExecuteBatch.workflowExecutionsProgress[i].push({ + workflowId: workflow!.id, + status: 'error', + }); + this.updateStatus(); + } + } else { + throw new Error('Wrong execution status - cannot proceed'); + } + }); + } + + resolve(true); + }); + + promisesArray.push(promise); + } + + await Promise.allSettled(promisesArray); + + res(result); + }); + } + + updateStatus() { + + if (ExecuteBatch.cancelled === true) { + return; + } + + if (process.stdout.isTTY === true) { + process.stdout.moveCursor(0, - (ExecuteBatch.concurrency)); + process.stdout.cursorTo(0); + process.stdout.clearLine(0); + } + + + ExecuteBatch.workflowExecutionsProgress.map((concurrentThread, index) => { + let message = `${index + 1}: `; + concurrentThread.map((executionItem, workflowIndex) => { + let openColor = '\x1b[0m'; + const closeColor = '\x1b[0m'; + switch (executionItem.status) { + case 'success': + openColor = '\x1b[32m'; + break; + case 'error': + openColor = '\x1b[31m'; + break; + case 'warning': + openColor = '\x1b[33m'; + break; + default: + break; + } + message += (workflowIndex > 0 ? ', ' : '') + `${openColor}${executionItem.workflowId}${closeColor}`; + }); + if (process.stdout.isTTY === true) { + process.stdout.cursorTo(0); + process.stdout.clearLine(0); + } + process.stdout.write(message + '\n'); + }); + } + + initializeLogs() { + process.stdout.write('**********************************************\n'); + process.stdout.write(' n8n test workflows\n'); + process.stdout.write('**********************************************\n'); + process.stdout.write('\n'); + process.stdout.write('Batch number:\n'); + ExecuteBatch.workflowExecutionsProgress = []; + for (let i = 0; i < ExecuteBatch.concurrency; i++) { + ExecuteBatch.workflowExecutionsProgress.push([]); + process.stdout.write(`${i + 1}: \n`); + } + } + + startThread(workflowData: IWorkflowDb): Promise { + // This will be the object returned by the promise. + // It will be updated according to execution progress below. + const executionResult: IExecutionResult = { + workflowId: workflowData.id, + workflowName: workflowData.name, + executionTime: 0, + finished: false, + executionStatus: 'running', + coveredNodes: {}, + }; + + + + const requiredNodeTypes = ['n8n-nodes-base.start']; + let startNode: INode | undefined = undefined; + for (const node of workflowData.nodes) { + if (requiredNodeTypes.includes(node.type)) { + startNode = node; + break; + } + } + + // We have a cool feature here. + // On each node, on the Settings tab in the node editor you can change + // the `Notes` field to add special cases for comparison and snapshots. + // You need to set one configuration per line with the following possible keys: + // CAP_RESULTS_LENGTH=x where x is a number. Cap the number of rows from this node to x. + // This means if you set CAP_RESULTS_LENGTH=1 we will have only 1 row in the output + // IGNORED_PROPERTIES=x,y,z where x, y and z are JSON property names. Removes these + // properties from the JSON object (useful for optional properties that can + // cause the comparison to detect changes when not true). + const nodeEdgeCases = {} as INodeSpecialCases; + workflowData.nodes.forEach(node => { + executionResult.coveredNodes[node.type] = (executionResult.coveredNodes[node.type] || 0) + 1; + if (node.notes !== undefined && node.notes !== '') { + node.notes.split('\n').forEach(note => { + const parts = note.split('='); + if (parts.length === 2) { + if (nodeEdgeCases[node.name] === undefined) { + nodeEdgeCases[node.name] = {} as INodeSpecialCase; + } + if (parts[0] === 'CAP_RESULTS_LENGTH') { + nodeEdgeCases[node.name].capResults = parseInt(parts[1], 10); + } else if (parts[0] === 'IGNORED_PROPERTIES') { + nodeEdgeCases[node.name].ignoredProperties = parts[1].split(',').map(property => property.trim()); + } + } + }); + } + }); + + return new Promise(async (resolve) => { + if (startNode === undefined) { + // If the workflow does not contain a start-node we can not know what + // should be executed and with which data to start. + executionResult.error = 'Workflow cannot be started as it does not contain a "Start" node.'; + executionResult.executionStatus = 'warning'; + resolve(executionResult); + } + + let gotCancel = false; + + // Timeouts execution after 5 minutes. + const timeoutTimer = setTimeout(() => { + gotCancel = true; + executionResult.error = 'Workflow execution timed out.'; + executionResult.executionStatus = 'warning'; + resolve(executionResult); + }, ExecuteBatch.executionTimeout); + + + try { + const credentials = await WorkflowCredentials(workflowData!.nodes); + + const runData: IWorkflowExecutionDataProcess = { + credentials, + executionMode: 'cli', + startNodes: [startNode!.name], + workflowData: workflowData!, + }; + + const workflowRunner = new WorkflowRunner(); + const executionId = await workflowRunner.run(runData); + + const activeExecutions = ActiveExecutions.getInstance(); + const data = await activeExecutions.getPostExecutePromise(executionId); + if (gotCancel || ExecuteBatch.cancelled === true) { + clearTimeout(timeoutTimer); + // The promise was settled already so we simply ignore. + return; + } + + if (data === undefined) { + executionResult.error = 'Workflow did not return any data.'; + executionResult.executionStatus = 'error'; + } else { + executionResult.executionTime = (Date.parse(data.stoppedAt as unknown as string) - Date.parse(data.startedAt as unknown as string)) / 1000; + executionResult.finished = (data?.finished !== undefined) as boolean; + + if (data.data.resultData.error) { + executionResult.error = + data.data.resultData.error.hasOwnProperty('description') ? + // @ts-ignore + data.data.resultData.error.description : data.data.resultData.error.message; + if (data.data.resultData.lastNodeExecuted !== undefined) { + executionResult.error += ` on node ${data.data.resultData.lastNodeExecuted}`; + } + executionResult.executionStatus = 'error'; + + if (this.shouldBeConsideredAsWarning(executionResult.error || '')) { + executionResult.executionStatus = 'warning'; + } + } else { + if (ExecuteBatch.shallow === true) { + // What this does is guarantee that top-level attributes + // from the JSON are kept and the are the same type. + + // We convert nested JSON objects to a simple {object:true} + // and we convert nested arrays to ['json array'] + + // This reduces the chance of false positives but may + // result in not detecting deeper changes. + Object.keys(data.data.resultData.runData).map((nodeName: string) => { + data.data.resultData.runData[nodeName].map((taskData: ITaskData) => { + if (taskData.data === undefined) { + return; + } + Object.keys(taskData.data).map(connectionName => { + const connection = taskData.data![connectionName] as Array; + connection.map(executionDataArray => { + if (executionDataArray === null) { + return; + } + + if (nodeEdgeCases[nodeName] !== undefined && nodeEdgeCases[nodeName].capResults !== undefined) { + executionDataArray.splice(nodeEdgeCases[nodeName].capResults!); + } + + executionDataArray.map(executionData => { + if (executionData.json === undefined) { + return; + } + if (nodeEdgeCases[nodeName] !== undefined && nodeEdgeCases[nodeName].ignoredProperties !== undefined) { + nodeEdgeCases[nodeName].ignoredProperties!.forEach(ignoredProperty => delete executionData.json[ignoredProperty]); + } + + const jsonProperties = executionData.json; + + const nodeOutputAttributes = Object.keys(jsonProperties); + nodeOutputAttributes.map(attributeName => { + if (Array.isArray(jsonProperties[attributeName])) { + jsonProperties[attributeName] = ['json array']; + } else if (typeof jsonProperties[attributeName] === 'object') { + jsonProperties[attributeName] = { object: true }; + } + }); + }); + }); + + }); + }); + }); + } else { + // If not using shallow comparison then we only treat nodeEdgeCases. + const specialCases = Object.keys(nodeEdgeCases); + + specialCases.forEach(nodeName => { + data.data.resultData.runData[nodeName].map((taskData: ITaskData) => { + if (taskData.data === undefined) { + return; + } + Object.keys(taskData.data).map(connectionName => { + const connection = taskData.data![connectionName] as Array; + connection.map(executionDataArray => { + if (executionDataArray === null) { + return; + } + + if (nodeEdgeCases[nodeName].capResults !== undefined) { + executionDataArray.splice(nodeEdgeCases[nodeName].capResults!); + } + + if (nodeEdgeCases[nodeName].ignoredProperties !== undefined) { + executionDataArray.map(executionData => { + if (executionData.json === undefined) { + return; + } + nodeEdgeCases[nodeName].ignoredProperties!.forEach(ignoredProperty => delete executionData.json[ignoredProperty]); + }); + } + }); + + }); + }); + }); + } + + const serializedData = this.formatJsonOutput(data); + if (ExecuteBatch.compare === undefined) { + executionResult.executionStatus = 'success'; + } else { + const fileName = (ExecuteBatch.compare.endsWith(sep) ? ExecuteBatch.compare : ExecuteBatch.compare + sep) + `${workflowData.id}-snapshot.json`; + if (fs.existsSync(fileName) === true) { + + const contents = fs.readFileSync(fileName, { encoding: 'utf-8' }); + + const changes = diff(JSON.parse(contents), data, { keysOnly: true }); + + if (changes !== undefined) { + // we have structural changes. Report them. + executionResult.error = `Workflow may contain breaking changes`; + executionResult.changes = changes; + executionResult.executionStatus = 'error'; + } else { + executionResult.executionStatus = 'success'; + } + } else { + executionResult.error = 'Snapshot for not found.'; + executionResult.executionStatus = 'warning'; + } + } + // Save snapshots only after comparing - this is to make sure we're updating + // After comparing to existing verion. + if (ExecuteBatch.snapshot !== undefined) { + const fileName = (ExecuteBatch.snapshot.endsWith(sep) ? ExecuteBatch.snapshot : ExecuteBatch.snapshot + sep) + `${workflowData.id}-snapshot.json`; + fs.writeFileSync(fileName, serializedData); + } + } + } + } catch (e) { + executionResult.error = 'Workflow failed to execute.'; + executionResult.executionStatus = 'error'; + } + clearTimeout(timeoutTimer); + resolve(executionResult); + }); + } + +} diff --git a/packages/cli/commands/import/credentials.ts b/packages/cli/commands/import/credentials.ts index b038f33693e..af2d7e0d46d 100644 --- a/packages/cli/commands/import/credentials.ts +++ b/packages/cli/commands/import/credentials.ts @@ -65,6 +65,9 @@ export class ImportCredentialsCommand extends Command { try { await Db.init(); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); let i; const encryptionKey = await UserSettings.getEncryptionKey(); diff --git a/packages/cli/commands/import/workflow.ts b/packages/cli/commands/import/workflow.ts index 5b31041a44d..65ddb770009 100644 --- a/packages/cli/commands/import/workflow.ts +++ b/packages/cli/commands/import/workflow.ts @@ -18,6 +18,9 @@ import { import * as fs from 'fs'; import * as glob from 'glob-promise'; import * as path from 'path'; +import { + UserSettings, +} from 'n8n-core'; export class ImportWorkflowsCommand extends Command { static description = 'Import workflows'; @@ -60,6 +63,9 @@ export class ImportWorkflowsCommand extends Command { try { await Db.init(); + + // Make sure the settings exist + await UserSettings.prepareUserSettings(); let i; if (flags.separate) { const files = await glob((flags.input.endsWith(path.sep) ? flags.input : flags.input + path.sep) + '*.json'); diff --git a/packages/cli/commands/list/workflow.ts b/packages/cli/commands/list/workflow.ts new file mode 100644 index 00000000000..6fdca2e2533 --- /dev/null +++ b/packages/cli/commands/list/workflow.ts @@ -0,0 +1,67 @@ +import { + Command, + flags, +} from '@oclif/command'; + +import { + IDataObject +} from 'n8n-workflow'; + +import { + Db, +} from "../../src"; + + +export class ListWorkflowCommand extends Command { + static description = '\nList workflows'; + + static examples = [ + '$ n8n list:workflow', + '$ n8n list:workflow --active=true --onlyId', + '$ n8n list:workflow --active=false', + ]; + + static flags = { + help: flags.help({ char: 'h' }), + active: flags.string({ + description: 'Filters workflows by active status. Can be true or false', + }), + onlyId: flags.boolean({ + description: 'Outputs workflow IDs only, one per line.', + }), + }; + + async run() { + const { flags } = this.parse(ListWorkflowCommand); + + if (flags.active !== undefined && !['true', 'false'].includes(flags.active)) { + this.error('The --active flag has to be passed using true or false'); + } + + try { + await Db.init(); + + const findQuery: IDataObject = {}; + if (flags.active !== undefined) { + findQuery.active = flags.active === 'true'; + } + + const workflows = await Db.collections.Workflow!.find(findQuery); + if (flags.onlyId) { + workflows.forEach(workflow => console.log(workflow.id)); + } else { + workflows.forEach(workflow => console.log(workflow.id + "|" + workflow.name)); + } + + + } catch (e) { + console.error('\nGOT ERROR'); + console.log('===================================='); + console.error(e.message); + console.error(e.stack); + this.exit(1); + } + + this.exit(); + } +} diff --git a/packages/cli/package.json b/packages/cli/package.json index ca4a198df18..4d866ec7293 100644 --- a/packages/cli/package.json +++ b/packages/cli/package.json @@ -1,6 +1,6 @@ { "name": "n8n", - "version": "0.126.1", + "version": "0.127.0", "description": "n8n Workflow Automation Tool", "license": "SEE LICENSE IN LICENSE.md", "homepage": "https://n8n.io", @@ -82,6 +82,7 @@ "dependencies": { "@oclif/command": "^1.5.18", "@oclif/errors": "^1.2.2", + "@types/json-diff": "^0.5.1", "@types/jsonwebtoken": "^8.5.2", "basic-auth": "^2.0.1", "bcryptjs": "^2.4.3", @@ -101,15 +102,16 @@ "glob-promise": "^3.4.0", "google-timezones-json": "^1.0.2", "inquirer": "^7.0.1", + "json-diff": "^0.5.4", "jsonwebtoken": "^8.5.1", "jwks-rsa": "~1.12.1", "localtunnel": "^2.0.0", "lodash.get": "^4.4.2", "mysql2": "~2.2.0", - "n8n-core": "~0.75.0", - "n8n-editor-ui": "~0.96.1", - "n8n-nodes-base": "~0.123.1", - "n8n-workflow": "~0.62.0", + "n8n-core": "~0.76.0", + "n8n-editor-ui": "~0.97.0", + "n8n-nodes-base": "~0.124.0", + "n8n-workflow": "~0.63.0", "oauth-1.0a": "^2.2.6", "open": "^7.0.0", "pg": "^8.3.0", diff --git a/packages/cli/src/Interfaces.ts b/packages/cli/src/Interfaces.ts index df965c50e74..e1c09cf55e0 100644 --- a/packages/cli/src/Interfaces.ts +++ b/packages/cli/src/Interfaces.ts @@ -188,6 +188,7 @@ export interface IExecutionsListResponse { count: number; // results: IExecutionShortResponse[]; results: IExecutionsSummary[]; + estimated: boolean; } export interface IExecutionsStopData { diff --git a/packages/cli/src/Server.ts b/packages/cli/src/Server.ts index 0a31144999d..ee2f50f2cfc 100644 --- a/packages/cli/src/Server.ts +++ b/packages/cli/src/Server.ts @@ -33,6 +33,7 @@ import { CredentialsHelper, CredentialsOverwrites, CredentialTypes, + DatabaseType, Db, ExternalHooks, GenericHelpers, @@ -88,6 +89,7 @@ import { IRunData, IWorkflowBase, IWorkflowCredentials, + LoggerProxy, Workflow, WorkflowExecuteMode, } from 'n8n-workflow'; @@ -1612,8 +1614,7 @@ class App { executingWorkflowIds.push(...this.activeExecutionsInstance.getActiveExecutions().map(execution => execution.id.toString()) as string[]); const countFilter = JSON.parse(JSON.stringify(filter)); - countFilter.select = ['id']; - countFilter.where = {id: Not(In(executingWorkflowIds))}; + countFilter.id = Not(In(executingWorkflowIds)); const resultsQuery = await Db.collections.Execution! .createQueryBuilder("execution") @@ -1645,10 +1646,10 @@ class App { const resultsPromise = resultsQuery.getMany(); - const countPromise = Db.collections.Execution!.count(countFilter); + const countPromise = getExecutionsCount(countFilter); const results: IExecutionFlattedDb[] = await resultsPromise; - const count = await countPromise; + const countedObjects = await countPromise; const returnResults: IExecutionsSummary[] = []; @@ -1667,8 +1668,9 @@ class App { } return { - count, + count: countedObjects.count, results: returnResults, + estimated: countedObjects.estimate, }; })); @@ -2161,3 +2163,35 @@ export async function start(): Promise { await app.externalHooks.run('n8n.ready', [app]); }); } + +async function getExecutionsCount(countFilter: IDataObject): Promise<{ count: number; estimate: boolean; }> { + + const dbType = await GenericHelpers.getConfigValue('database.type') as DatabaseType; + const filteredFields = Object.keys(countFilter).filter(field => field !== 'id'); + + // Do regular count for other databases than pgsql and + // if we are filtering based on workflowId or finished fields. + if (dbType !== 'postgresdb' || filteredFields.length > 0) { + const count = await Db.collections.Execution!.count(countFilter); + return { count, estimate: false }; + } + + try { + // Get an estimate of rows count. + const estimateRowsNumberSql = "SELECT n_live_tup FROM pg_stat_all_tables WHERE relname = 'execution_entity';"; + const rows: Array<{ n_live_tup: string }> = await Db.collections.Execution!.query(estimateRowsNumberSql); + + const estimate = parseInt(rows[0].n_live_tup, 10); + // If over 100k, return just an estimate. + if (estimate > 100000) { + // if less than 100k, we get the real count as even a full + // table scan should not take so long. + return { count: estimate, estimate: true }; + } + } catch (err) { + LoggerProxy.warn('Unable to get executions count from postgres: ' + err); + } + + const count = await Db.collections.Execution!.count(countFilter); + return { count, estimate: false }; +} diff --git a/packages/cli/tsconfig.json b/packages/cli/tsconfig.json index 4aaf4747fc6..aa44bc610f2 100644 --- a/packages/cli/tsconfig.json +++ b/packages/cli/tsconfig.json @@ -1,7 +1,8 @@ { "compilerOptions": { "lib": [ - "es2017" + "es2017", + "ES2020.Promise" ], "types": [ "node", diff --git a/packages/core/package.json b/packages/core/package.json index 71871514bfd..5151bfbb5ba 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -1,6 +1,6 @@ { "name": "n8n-core", - "version": "0.75.0", + "version": "0.76.0", "description": "Core functionality of n8n", "license": "SEE LICENSE IN LICENSE.md", "homepage": "https://n8n.io", @@ -47,7 +47,7 @@ "file-type": "^14.6.2", "lodash.get": "^4.4.2", "mime-types": "^2.1.27", - "n8n-workflow": "~0.62.0", + "n8n-workflow": "~0.63.0", "oauth-1.0a": "^2.2.6", "p-cancelable": "^2.0.0", "request": "^2.88.2", diff --git a/packages/editor-ui/package.json b/packages/editor-ui/package.json index 26fbc7c7731..b987c32da0f 100644 --- a/packages/editor-ui/package.json +++ b/packages/editor-ui/package.json @@ -1,6 +1,6 @@ { "name": "n8n-editor-ui", - "version": "0.96.1", + "version": "0.97.0", "description": "Workflow Editor UI for n8n", "license": "SEE LICENSE IN LICENSE.md", "homepage": "https://n8n.io", @@ -69,7 +69,7 @@ "lodash.debounce": "^4.0.8", "lodash.get": "^4.4.2", "lodash.set": "^4.3.2", - "n8n-workflow": "~0.62.0", + "n8n-workflow": "~0.63.0", "node-sass": "^4.12.0", "normalize-wheel": "^1.0.1", "prismjs": "^1.17.1", diff --git a/packages/editor-ui/src/Interface.ts b/packages/editor-ui/src/Interface.ts index 5acbaae9989..3a0790cf48e 100644 --- a/packages/editor-ui/src/Interface.ts +++ b/packages/editor-ui/src/Interface.ts @@ -325,6 +325,7 @@ export interface IExecutionShortResponse { export interface IExecutionsListResponse { count: number; results: IExecutionsSummary[]; + estimated: boolean; } export interface IExecutionsCurrentSummaryExtended { diff --git a/packages/editor-ui/src/components/BinaryDataDisplay.vue b/packages/editor-ui/src/components/BinaryDataDisplay.vue index c6ab2aa7485..3a737c1f0fd 100644 --- a/packages/editor-ui/src/components/BinaryDataDisplay.vue +++ b/packages/editor-ui/src/components/BinaryDataDisplay.vue @@ -14,6 +14,10 @@
Data to display did not get found
+ diff --git a/packages/editor-ui/src/components/ExecutionsList.vue b/packages/editor-ui/src/components/ExecutionsList.vue index e0e129a7374..1ecfc4caf84 100644 --- a/packages/editor-ui/src/components/ExecutionsList.vue +++ b/packages/editor-ui/src/components/ExecutionsList.vue @@ -1,6 +1,6 @@