mirror of
https://github.com/n8n-io/n8n.git
synced 2026-05-27 23:07:12 +02:00
fix(Google BigQuery Node): Prevent infinite loop on job fail (#21741)
Co-authored-by: Michael Kret <michael.k@radency.com>
This commit is contained in:
parent
cb0fa963ca
commit
008cd8d083
|
|
@ -0,0 +1,40 @@
|
|||
import { NodeTestHarness } from '@nodes-testing/node-test-harness';
|
||||
import nock from 'nock';
|
||||
|
||||
jest.mock('jsonwebtoken', () => ({
|
||||
sign: jest.fn().mockReturnValue('signature'),
|
||||
}));
|
||||
|
||||
describe('Test Google BigQuery V2, executeQuery', () => {
|
||||
nock('https://oauth2.googleapis.com')
|
||||
.persist()
|
||||
.post(
|
||||
'/token',
|
||||
'grant_type=urn%3Aietf%3Aparams%3Aoauth%3Agrant-type%3Ajwt-bearer&assertion=signature',
|
||||
)
|
||||
.reply(200, { access_token: 'token' });
|
||||
|
||||
nock('https://bigquery.googleapis.com/bigquery')
|
||||
.post('/v2/projects/test-project/jobs', {
|
||||
configuration: {
|
||||
query: {
|
||||
query: 'SELECT * FROM bigquery_node_dev_test_dataset.test_json;',
|
||||
useLegacySql: false,
|
||||
},
|
||||
},
|
||||
})
|
||||
.reply(200, {
|
||||
jobReference: {
|
||||
jobId: 'job_123',
|
||||
},
|
||||
status: {
|
||||
state: 'RUNNING',
|
||||
},
|
||||
})
|
||||
.get('/v2/projects/test-project/queries/job_123?maxResults=1000&timeoutMs=10000')
|
||||
.replyWithError('Internal server error');
|
||||
|
||||
new NodeTestHarness().setupTests({
|
||||
workflowFiles: ['executeQueryContinueOnJobFail.workflow.json'],
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,69 @@
|
|||
{
|
||||
"name": "My workflow 12",
|
||||
"nodes": [
|
||||
{
|
||||
"parameters": {},
|
||||
"id": "7db7d51a-83c2-4aa0-a736-9c3d1c031b60",
|
||||
"name": "When clicking \"Execute Workflow\"",
|
||||
"type": "n8n-nodes-base.manualTrigger",
|
||||
"typeVersion": 1,
|
||||
"position": [360, 340]
|
||||
},
|
||||
{
|
||||
"parameters": {
|
||||
"authentication": "serviceAccount",
|
||||
"projectId": {
|
||||
"__rl": true,
|
||||
"value": "test-project",
|
||||
"mode": "list",
|
||||
"cachedResultName": "test-project",
|
||||
"cachedResultUrl": "https://console.cloud.google.com/bigquery?project=test-project"
|
||||
},
|
||||
"sqlQuery": "SELECT * FROM bigquery_node_dev_test_dataset.test_json;",
|
||||
"options": {}
|
||||
},
|
||||
"id": "83d00275-0f98-4d5e-a3d6-bbca940ff8ac",
|
||||
"name": "Google BigQuery",
|
||||
"type": "n8n-nodes-base.googleBigQuery",
|
||||
"typeVersion": 2,
|
||||
"position": [620, 340],
|
||||
"credentials": {
|
||||
"googleApi": {
|
||||
"id": "66",
|
||||
"name": "Google account 5"
|
||||
}
|
||||
},
|
||||
"onError": "continueRegularOutput"
|
||||
}
|
||||
],
|
||||
"pinData": {
|
||||
"Google BigQuery": [
|
||||
{
|
||||
"json": {
|
||||
"error": "Internal server error"
|
||||
}
|
||||
}
|
||||
]
|
||||
},
|
||||
"connections": {
|
||||
"When clicking \"Execute Workflow\"": {
|
||||
"main": [
|
||||
[
|
||||
{
|
||||
"node": "Google BigQuery",
|
||||
"type": "main",
|
||||
"index": 0
|
||||
}
|
||||
]
|
||||
]
|
||||
}
|
||||
},
|
||||
"active": false,
|
||||
"settings": {},
|
||||
"versionId": "be2fc126-5d71-4e86-9a4e-eb62ad266860",
|
||||
"id": "156",
|
||||
"meta": {
|
||||
"instanceId": "36203ea1ce3cef713fa25999bd9874ae26b9e4c2c3a90a365f2882a154d031d0"
|
||||
},
|
||||
"tags": []
|
||||
}
|
||||
|
|
@ -219,6 +219,7 @@ const displayOptions = {
|
|||
export const description = updateDisplayOptions(displayOptions, properties);
|
||||
|
||||
export async function execute(this: IExecuteFunctions): Promise<INodeExecutionData[]> {
|
||||
const abortSignal = this.getExecutionCancelSignal();
|
||||
const items = this.getInputData();
|
||||
const length = items.length;
|
||||
|
||||
|
|
@ -400,10 +401,13 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
|
|||
}
|
||||
|
||||
let waitTime = 1000;
|
||||
while (jobs.length > 0) {
|
||||
const completedJobs: string[] = [];
|
||||
outerLoop: while (jobs.length > 0) {
|
||||
const settledJobs: string[] = [];
|
||||
|
||||
for (const job of jobs) {
|
||||
if (abortSignal?.aborted) {
|
||||
break outerLoop;
|
||||
}
|
||||
try {
|
||||
const qs: IDataObject = job.location ? { location: job.location } : {};
|
||||
|
||||
|
|
@ -420,7 +424,7 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
|
|||
);
|
||||
|
||||
if (response.jobComplete) {
|
||||
completedJobs.push(job.jobId);
|
||||
settledJobs.push(job.jobId);
|
||||
|
||||
returnData.push(...prepareOutput.call(this, response, job.i, job.raw, job.includeSchema));
|
||||
}
|
||||
|
|
@ -435,6 +439,7 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
|
|||
}
|
||||
} catch (error) {
|
||||
if (this.continueOnFail()) {
|
||||
settledJobs.push(job.jobId);
|
||||
const executionErrorData = this.helpers.constructExecutionMetaData(
|
||||
this.helpers.returnJsonArray({ error: error.message }),
|
||||
{ itemData: { item: job.i } },
|
||||
|
|
@ -449,7 +454,7 @@ export async function execute(this: IExecuteFunctions): Promise<INodeExecutionDa
|
|||
}
|
||||
}
|
||||
|
||||
jobs = jobs.filter((job) => !completedJobs.includes(job.jobId));
|
||||
jobs = jobs.filter((job) => !settledJobs.includes(job.jobId));
|
||||
|
||||
if (jobs.length > 0) {
|
||||
await sleep(waitTime);
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user