n8n/packages/cli/src/workflows/workflow.service.ts

1287 lines
41 KiB
TypeScript

import { UpdateWorkflowHistoryVersionDto } from '@n8n/api-types';
import { LicenseState, Logger } from '@n8n/backend-common';
import { GlobalConfig } from '@n8n/config';
import type {
User,
ListQueryDb,
WorkflowFolderUnionFull,
WorkflowHistory,
WorkflowEntity,
} from '@n8n/db';
import {
SharedWorkflow,
ExecutionRepository,
FolderRepository,
WorkflowTagMappingRepository,
SharedWorkflowRepository,
WorkflowRepository,
WorkflowPublishedVersionRepository,
WorkflowPublishHistoryRepository,
ProjectRepository,
} from '@n8n/db';
import { Container, Service } from '@n8n/di';
import type { Scope } from '@n8n/permissions';
import { hasGlobalScope } from '@n8n/permissions';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import type { EntityManager } from '@n8n/typeorm';
// eslint-disable-next-line n8n-local-rules/misplaced-n8n-typeorm-import
import { In } from '@n8n/typeorm';
import type { QueryDeepPartialEntity } from '@n8n/typeorm/query-builder/QueryPartialEntity';
import isEqual from 'lodash/isEqual';
import pick from 'lodash/pick';
import { FileLocation, BinaryDataService } from 'n8n-core';
import type { INode, INodes, IWorkflowSettings, JsonValue, IConnections } from 'n8n-workflow';
import {
PROJECT_ROOT,
Workflow,
assert,
calculateWorkflowChecksum,
ensureError,
} from 'n8n-workflow';
import { v4 as uuid } from 'uuid';
import { getErrorDescription, getErrorNodeId, getRequiredRedactionScopes } from './utils';
import { WorkflowFinderService } from './workflow-finder.service';
import { WorkflowHistoryService } from './workflow-history/workflow-history.service';
import { ActiveWorkflowManager } from '@/active-workflow-manager';
import { FolderNotFoundError } from '@/errors/folder-not-found.error';
import { BadRequestError } from '@/errors/response-errors/bad-request.error';
import { NotFoundError } from '@/errors/response-errors/not-found.error';
import { WorkflowActivationBadRequestError } from '@/errors/response-errors/workflow-activation-bad-request.error';
import { WorkflowValidationError } from '@/errors/response-errors/workflow-validation.error';
import { WorkflowHistoryVersionNotFoundError } from '@/errors/workflow-history-version-not-found.error';
import { EventService } from '@/events/event.service';
import type { WorkflowActionSource } from '@/events/maps/relay.event-map';
import { userHasScopes } from '@/permissions.ee/check-access';
import { ExternalHooks } from '@/external-hooks';
import { validateEntity } from '@/generic-helpers';
import { RedactionEnforcementService } from '@/modules/redaction/redaction-enforcement.service';
import { NodeTypes } from '@/node-types';
import type { ListQuery } from '@/requests';
import { hasSharing } from '@/requests';
import { OwnershipService } from '@/services/ownership.service';
import { ProjectService } from '@/services/project.service.ee';
import { RoleService } from '@/services/role.service';
import { TagService } from '@/services/tag.service';
import * as WorkflowHelpers from '@/workflow-helpers';
import { getBase as getWorkflowExecutionData } from '@/workflow-execute-additional-data';
import { WorkflowValidationService } from './workflow-validation.service';
import { WebhookService } from '@/webhooks/webhook.service';
import { ConflictError } from '@/errors/response-errors/conflict.error';
@Service()
export class WorkflowService {
/**
 * Receives every collaborator of the service as a `private readonly`
 * parameter property; the constructor body is intentionally empty.
 *
 * NOTE(review): the class is decorated with `@Service()`, so these arguments
 * are presumably resolved by the `@n8n/di` container rather than passed by hand.
 */
constructor(
private readonly logger: Logger,
private readonly sharedWorkflowRepository: SharedWorkflowRepository,
private readonly workflowRepository: WorkflowRepository,
private readonly workflowTagMappingRepository: WorkflowTagMappingRepository,
private readonly binaryDataService: BinaryDataService,
private readonly ownershipService: OwnershipService,
private readonly tagService: TagService,
private readonly workflowHistoryService: WorkflowHistoryService,
private readonly externalHooks: ExternalHooks,
private readonly activeWorkflowManager: ActiveWorkflowManager,
private readonly roleService: RoleService,
private readonly projectService: ProjectService,
private readonly executionRepository: ExecutionRepository,
private readonly eventService: EventService,
private readonly globalConfig: GlobalConfig,
private readonly folderRepository: FolderRepository,
private readonly workflowFinderService: WorkflowFinderService,
private readonly workflowPublishedVersionRepository: WorkflowPublishedVersionRepository,
private readonly workflowPublishHistoryRepository: WorkflowPublishHistoryRepository,
private readonly workflowValidationService: WorkflowValidationService,
private readonly nodeTypes: NodeTypes,
private readonly webhookService: WebhookService,
private readonly licenseState: LicenseState,
private readonly projectRepository: ProjectRepository,
private readonly redactionEnforcementService: RedactionEnforcementService,
) {}
/**
 * Lists the workflows (and, when `includeFolders` is set, folders) for `user`.
 *
 * How access is restricted depends on the request:
 * - filter points at a personal project: only its creator, or a user with the
 *   global `workflow:read` scope, gets results;
 * - `onlySharedWithMe`: the repository is asked for shared-with-me entries only;
 * - otherwise: the project and workflow roles granting `requiredScopes` are used.
 *
 * @returns `{ workflows, count }`. An empty result is returned when the filtered
 * project does not exist or the user may not read someone else's personal project.
 * When dynamic credentials are licensed, each entry also carries
 * `hasResolvableCredentials`.
 */
async getMany(
	user: User,
	options?: ListQuery.Options,
	includeScopes?: boolean,
	includeFolders?: boolean,
	onlySharedWithMe?: boolean,
	requiredScopes: Scope[] = ['workflow:read'],
) {
	let count;
	let workflows;
	let workflowsAndFolders: WorkflowFolderUnionFull[] = [];

	// Set only when the filter targets a personal project that has a creator
	let personalOwnerId: string | null = null;
	const filterProjectId = options?.filter?.projectId;
	if (filterProjectId) {
		const project = await this.projectRepository.findOneBy({
			id: filterProjectId as string,
		});
		if (!project) {
			return { workflows: [], count: 0 };
		}
		if (project.type === 'personal') {
			personalOwnerId = project.creatorId;
		}
	}

	// Sharing options handed to the repository's subquery
	const sharingOptions: {
		scopes?: Scope[];
		projectRoles?: string[];
		workflowRoles?: string[];
		isPersonalProject?: boolean;
		personalProjectOwnerId?: string;
		onlySharedWithMe?: boolean;
	} = {};

	if (personalOwnerId) {
		const isOwner = personalOwnerId === user.id;
		if (!isOwner && !hasGlobalScope(user, 'workflow:read')) {
			return { workflows: [], count: 0 };
		}
		sharingOptions.isPersonalProject = true;
		sharingOptions.personalProjectOwnerId = personalOwnerId;
	} else if (onlySharedWithMe) {
		sharingOptions.onlySharedWithMe = true;
	} else {
		// Translate the required scopes into the roles that grant them
		sharingOptions.scopes = requiredScopes;
		sharingOptions.projectRoles = await this.roleService.rolesWithScope(
			'project',
			requiredScopes,
		);
		sharingOptions.workflowRoles = await this.roleService.rolesWithScope(
			'workflow',
			requiredScopes,
		);
	}

	// Query through the subquery-based repository methods
	if (includeFolders) {
		[workflowsAndFolders, count] =
			await this.workflowRepository.getWorkflowsAndFoldersWithCountWithSharingSubquery(
				user,
				sharingOptions,
				options,
			);
		workflows = workflowsAndFolders.filter((wf) => wf.resource === 'workflow');
	} else {
		const listing = await this.workflowRepository.getManyAndCountWithSharingSubquery(
			user,
			sharingOptions,
			options,
		);
		workflows = listing.workflows;
		count = listing.count;
	}

	/*
		Filtering by project ID happens on the sharing relation, which drops all
		other relations of a workflow. A workflow shared into the project would
		then lose its home project, so the full set of relations is loaded here
		even though the field is removed again further down.
	*/
	if (hasSharing(workflows)) {
		workflows = await this.processSharedWorkflows(workflows, options);
	}

	if (includeScopes) {
		workflows = await this.addUserScopes(workflows, user);
	}

	this.cleanupSharedField(workflows);

	if (includeFolders) {
		workflows = this.mergeProcessedWorkflows(workflowsAndFolders, workflows);
	}

	// hasResolvableCredentials is only added when dynamic credentials are licensed
	if (!this.licenseState.isDynamicCredentialsLicensed()) {
		return { workflows, count };
	}

	return {
		workflows: await this.addResolvableCredentialsFlag(workflows),
		count,
	};
}
/**
 * Returns a copy of each workflow extended with `hasResolvableCredentials`,
 * which is true when the enterprise service reports the workflow's id among
 * those with resolvable credentials.
 */
private async addResolvableCredentialsFlag<
	T extends ListQueryDb.Workflow.Plain | ListQueryDb.Workflow.WithSharing,
>(workflows: T[]): Promise<(T & { hasResolvableCredentials: boolean })[]> {
	// Loaded lazily: a static import would create a circular dependency
	const eeModule = await import('./workflow.service.ee');
	const eeService = Container.get(eeModule.EnterpriseWorkflowService);

	const resolvableIds = await eeService.getWorkflowIdsWithResolvableCredentials(
		workflows.map(({ id }) => id),
	);

	return workflows.map((workflow) => {
		const hasResolvableCredentials = resolvableIds.has(workflow.id);
		return { ...workflow, hasResolvableCredentials };
	});
}
/**
 * Passes every workflow through `ownershipService.addOwnedByAndSharedWith`.
 * When the list is filtered by a non-empty string project id, the `shared`
 * relations are reloaded first via `addSharedRelation`.
 */
private async processSharedWorkflows(
	workflows: ListQueryDb.Workflow.WithSharing[],
	options?: ListQuery.Options,
) {
	const filteredProjectId = options?.filter?.projectId;
	const isFilteredByProject =
		typeof filteredProjectId === 'string' && filteredProjectId.length > 0;

	if (isFilteredByProject) {
		await this.addSharedRelation(workflows);
	}

	return workflows.map((entry) => this.ownershipService.addOwnedByAndSharedWith(entry));
}
/**
 * Replaces `shared` on every given workflow with all sharing relations the
 * repository returns for that workflow id. Workflows are mutated in place;
 * a workflow without any relation ends up with an empty array.
 */
private async addSharedRelation(workflows: ListQueryDb.Workflow.WithSharing[]): Promise<void> {
	const workflowIds = workflows.map((workflow) => workflow.id);
	const relations = await this.sharedWorkflowRepository.getAllRelationsForWorkflows(workflowIds);

	// Group once by workflow id instead of re-scanning all relations per workflow
	const relationsByWorkflowId = new Map<string, typeof relations>();
	for (const relation of relations) {
		const bucket = relationsByWorkflowId.get(relation.workflowId);
		if (bucket) {
			bucket.push(relation);
		} else {
			relationsByWorkflowId.set(relation.workflowId, [relation]);
		}
	}

	for (const workflow of workflows) {
		workflow.shared = relationsByWorkflowId.get(workflow.id) ?? [];
	}
}
/**
 * Loads the user's project relations once and hands each workflow, together
 * with those relations, to `roleService.addScopes`.
 */
private async addUserScopes(
	workflows: ListQueryDb.Workflow.Plain[] | ListQueryDb.Workflow.WithSharing[],
	user: User,
) {
	const relationsOfUser = await this.projectService.getProjectRelationsForUser(user);

	return workflows.map((entry) => this.roleService.addScopes(entry, user, relationsOfUser));
}
/** Type guard: true when the workflow object has a `shared` property. */
private isWorkflowWithSharing(
	workflow: ListQueryDb.Workflow.Plain,
): workflow is ListQueryDb.Workflow.WithSharing {
	const hasSharedField = 'shared' in workflow;
	return hasSharedField;
}
/**
 * Deletes the `shared` property from every workflow that has one.
 *
 * `addOwnedByAndSharedWith` used to remove the field itself; it is now kept
 * around because `addScopes` needs it, so it is dropped here instead to avoid
 * leaking the information to the caller.
 */
private cleanupSharedField(
	workflows: ListQueryDb.Workflow.Plain[] | ListQueryDb.Workflow.WithSharing[],
): void {
	for (const entry of workflows) {
		if (!this.isWorkflowWithSharing(entry)) continue;
		delete entry.shared;
	}
}
/**
 * Rebuilds the combined workflow/folder list in its original order, swapping
 * each workflow entry for its processed counterpart (matched by id). Folders,
 * and workflows without a processed counterpart, are passed through unchanged.
 */
private mergeProcessedWorkflows(
	workflowsAndFolders: WorkflowFolderUnionFull[],
	processedWorkflows: ListQueryDb.Workflow.Plain[] | ListQueryDb.Workflow.WithSharing[],
) {
	const processedById = new Map(processedWorkflows.map((processed) => [processed.id, processed]));

	return workflowsAndFolders.map((entry) => {
		if (entry.resource !== 'workflow') {
			return entry;
		}
		return processedById.get(entry.id) ?? entry;
	});
}
/**
 * Updates the workflow content (such as name, nodes, connections, settings, etc.).
 *
 * The active fields (active, activeVersionId) are only written by this method when
 * `publishIfActive` is set and the workflow already has an active version; in that case
 * the version being saved becomes the active one. Independently of that, if settings
 * change and the workflow has an active version, the workflow will be automatically
 * reactivated to ensure the ActiveWorkflowManager uses the updated settings.
 * For explicit activation or deactivation, use the activate/deactivate methods.
 *
 * @param user - The user performing the update; the workflow is looked up with the
 * `workflow:update` scope
 * @param workflowUpdateData - The new workflow content. This object is mutated in place
 * (versionId, nodes, connections, settings and updatedAt are assigned on it)
 * @param workflowId - The ID of the workflow to update
 * @param options - `tagIds` overwrite the taggings (unless tags are disabled);
 * `parentFolderId` moves the workflow (`PROJECT_ROOT` clears the folder);
 * `expectedChecksum` enables conflict detection unless `forceSave` is set;
 * `publishIfActive` republishes as described above; `autosaved` is forwarded to the
 * history service; `publicApi`, `aiBuilderAssisted` and `source` are forwarded to the
 * `workflow-saved` event
 * @returns The workflow as re-read from the repository after the update
 * @throws NotFoundError if no workflow is found for the user with `workflow:update`
 * @throws BadRequestError if the workflow is archived, or cannot be re-read after the update
 * @throws WorkflowValidationError if a credential-to-node binding is not allowed
 * @throws FolderNotFoundError if `parentFolderId` cannot be found in the owning project
 */
// eslint-disable-next-line complexity
async update(
user: User,
workflowUpdateData: WorkflowEntity,
workflowId: string,
options: {
tagIds?: string[];
parentFolderId?: string;
forceSave?: boolean;
publicApi?: boolean;
publishIfActive?: boolean;
aiBuilderAssisted?: boolean;
expectedChecksum?: string;
autosaved?: boolean;
source?: WorkflowActionSource;
} = {},
): Promise<WorkflowEntity> {
const {
expectedChecksum,
tagIds,
parentFolderId,
forceSave = false,
publicApi = false,
publishIfActive = false,
aiBuilderAssisted = false,
autosaved = false,
source = 'ui',
} = options;
const workflow = await this.workflowFinderService.findWorkflowForUser(workflowId, user, [
'workflow:update',
]);
if (!workflow) {
this.logger.warn('User attempted to update a workflow without permissions', {
workflowId,
userId: user.id,
});
throw new NotFoundError(
'You do not have permission to update this workflow. Ask the owner to share it with you.',
);
}
if (workflow.isArchived) {
throw new BadRequestError('Cannot update an archived workflow.');
}
// Compare the stored and the requested redaction policy before anything is modified
await this.redactionEnforcementService.assertPolicyChangeAllowed(
workflow.settings?.redactionPolicy,
workflowUpdateData.settings?.redactionPolicy,
);
// Conflict detection is skipped when forcing the save or when no checksum was supplied
if (!forceSave && expectedChecksum) {
await this._detectConflicts(workflow, expectedChecksum);
}
// Update the workflow's version when changing nodes, connections, or nodeGroups
// A key only counts as changed if it is present in the update data AND differs from the stored value
const hasNodesKey = 'nodes' in workflowUpdateData;
const hasConnectionsKey = 'connections' in workflowUpdateData;
const hasNodeGroupsKey = 'nodeGroups' in workflowUpdateData;
const nodesChanged = hasNodesKey && !isEqual(workflowUpdateData.nodes, workflow.nodes);
const connectionsChanged =
hasConnectionsKey && !isEqual(workflowUpdateData.connections, workflow.connections);
const nodeGroupsChanged =
hasNodeGroupsKey && !isEqual(workflowUpdateData.nodeGroups, workflow.nodeGroups);
const saveNewVersion = nodesChanged || connectionsChanged || nodeGroupsChanged;
if (saveNewVersion) {
workflowUpdateData.versionId = uuid();
this.logger.debug(
`Updating versionId for workflow ${workflowId} for user ${user.id} after saving`,
{
previousVersionId: workflow.versionId,
newVersionId: workflowUpdateData.versionId,
},
);
// To save a version, we need both nodes and connections
workflowUpdateData.nodes = workflowUpdateData.nodes ?? workflow.nodes;
workflowUpdateData.connections = workflowUpdateData.connections ?? workflow.connections;
} else {
// Do not let users change versionId directly
workflowUpdateData.versionId = workflow.versionId;
}
// check credentials for old format - scope to the workflow's owner project
const ownerProject = await this.ownershipService.getWorkflowProjectCached(workflowId);
await WorkflowHelpers.replaceInvalidCredentials(workflowUpdateData, ownerProject.id);
WorkflowHelpers.addNodeIds(workflowUpdateData);
WorkflowHelpers.resolveNodeWebhookIds(workflowUpdateData, this.nodeTypes);
// Validate against the stored value for any part the update data does not provide
WorkflowHelpers.validateWorkflowStructure({
nodes: workflowUpdateData.nodes ?? workflow.nodes,
connections: workflowUpdateData.connections ?? workflow.connections,
});
WorkflowHelpers.validateWorkflowNodeGroups({
nodes: workflowUpdateData.nodes ?? workflow.nodes,
nodeGroups: workflowUpdateData.nodeGroups ?? workflow.nodeGroups,
});
// Strip redactionPolicy if instance lacks data-redaction license
if (
workflowUpdateData.settings?.redactionPolicy !== undefined &&
workflowUpdateData.settings.redactionPolicy !== workflow.settings?.redactionPolicy &&
!this.licenseState.isDataRedactionLicensed()
) {
delete workflowUpdateData.settings.redactionPolicy;
}
// Strip redactionPolicy if user lacks the required directional scope
if (
workflowUpdateData.settings?.redactionPolicy !== undefined &&
workflowUpdateData.settings.redactionPolicy !== workflow.settings?.redactionPolicy
) {
const requiredScopes = getRequiredRedactionScopes(
workflow.settings?.redactionPolicy,
workflowUpdateData.settings.redactionPolicy,
);
const canUpdate = await userHasScopes(user, requiredScopes, false, {
projectId: ownerProject.id,
});
if (!canUpdate) {
delete workflowUpdateData.settings.redactionPolicy;
}
}
// Merge settings to support partial updates
// Stored settings act as the base; keys present in the update data win
if (workflowUpdateData.settings && workflow.settings) {
workflowUpdateData.settings = {
...workflow.settings,
...workflowUpdateData.settings,
};
}
if (workflowUpdateData.settings) {
workflowUpdateData.settings = WorkflowHelpers.removeDefaultValues(
workflowUpdateData.settings,
this.globalConfig.executions.timeout,
);
}
// Check if settings actually changed
const settingsChanged =
workflowUpdateData.settings !== undefined &&
!isEqual(workflow.settings, workflowUpdateData.settings);
// Always set updatedAt to get millisecond precision
workflowUpdateData.updatedAt = new Date();
// Entity validation is only run when the update carries a (non-empty) name
if (workflowUpdateData.name) {
await validateEntity(workflowUpdateData);
}
// Validate pinData size after all mutations are applied
if ('pinData' in workflowUpdateData) {
WorkflowHelpers.validatePinDataSize({ ...workflow, ...workflowUpdateData });
}
// Reject illegal credential-to-node bindings before persisting
const restrictionValidation = this.workflowValidationService.validateCredentialNodeRestrictions(
workflowUpdateData.nodes ?? workflow.nodes,
);
if (!restrictionValidation.isValid) {
throw new WorkflowValidationError(
restrictionValidation.error ?? 'Credential binding is not allowed.',
);
}
// Run external hook after all validation has passed, right before persisting
await this.externalHooks.run('workflow.update', [workflowUpdateData]);
// Only these keys are copied from the update data into the persisted payload
const fieldsToUpdate = [
'name',
'nodes',
'connections',
'nodeGroups',
'meta',
'settings',
'staticData',
'pinData',
'versionId',
'description',
'updatedAt',
// do not update active fields
];
const updatePayload = pick(
workflowUpdateData,
fieldsToUpdate,
) as QueryDeepPartialEntity<WorkflowEntity>;
// Save the workflow to history first, so we can retrieve the complete version object for the update
if (saveNewVersion) {
await this.workflowHistoryService.saveVersion(
user,
workflowUpdateData,
workflowId,
autosaved,
);
}
// Republish only when the workflow already has an active version and the caller asked for it
const publishCurrent = workflow.activeVersionId && publishIfActive;
if (publishCurrent) {
updatePayload.active = true;
updatePayload.activeVersionId = workflowUpdateData.versionId;
}
if (parentFolderId) {
const project = await this.sharedWorkflowRepository.getWorkflowOwningProject(workflow.id);
if (parentFolderId !== PROJECT_ROOT) {
try {
// An empty project id is passed when no owning project was found
await this.folderRepository.findOneOrFailFolderInProject(
parentFolderId,
project?.id ?? '',
);
} catch (e) {
// Any lookup failure is reported as a missing folder
throw new FolderNotFoundError(parentFolderId);
}
}
// PROJECT_ROOT clears the parent folder; any other id sets it
updatePayload.parentFolder = parentFolderId === PROJECT_ROOT ? null : { id: parentFolderId };
}
await this.workflowRepository.update(workflowId, updatePayload);
const tagsDisabled = this.globalConfig.tags.disabled;
if (tagIds && !tagsDisabled) {
await this.workflowTagMappingRepository.overwriteTaggings(workflowId, tagIds);
}
const relations = tagsDisabled ? ['activeVersion'] : ['tags', 'activeVersion'];
// We sadly get nothing back from "update". Neither if it updated a record
// nor the new value. So query now the hopefully updated entry.
const updatedWorkflow = await this.workflowRepository.findOne({
where: { id: workflowId },
relations,
});
if (updatedWorkflow === null) {
throw new BadRequestError(
`Workflow with ID "${workflowId}" could not be found to be updated.`,
);
}
// Return the tags in the order the request listed them
if (updatedWorkflow.tags?.length && tagIds?.length) {
updatedWorkflow.tags = this.tagService.sortByRequestOrder(updatedWorkflow.tags, {
requestOrder: tagIds,
});
}
await this.externalHooks.run('workflow.afterUpdate', [updatedWorkflow]);
const settingsChangesDetail = this.calculateSettingsChanges(
workflow.settings,
updatedWorkflow.settings,
);
// settingsChanged is only attached to the event when calculateSettingsChanges returned a truthy value
this.eventService.emit('workflow-saved', {
user,
workflow: updatedWorkflow,
publicApi,
previousWorkflow: workflow,
aiBuilderAssisted,
...(settingsChangesDetail && { settingsChanged: settingsChangesDetail }),
source,
});
// Activate workflow if requested, or
// Reactivate workflow if settings changed and workflow has an active version
if (updatedWorkflow.activeVersionId && (publishCurrent || settingsChanged)) {
await this.activateWorkflow(user, workflowId, {
versionId: updatedWorkflow.activeVersionId,
source,
});
}
return updatedWorkflow;
}
/**
 * Adds the workflow to the ActiveWorkflowManager and records the publication.
 *
 * On failure the active fields are reset both in the database and on the passed
 * `workflow` object, and the error is rethrown as a
 * `WorkflowActivationBadRequestError`.
 * On success the publish history gets an `activated` record and the
 * `workflow-activated` event is emitted.
 *
 * @param user - The user on whose behalf the workflow is activated
 * @param workflowId - The ID of the workflow to add
 * @param workflow - The workflow entity; mutated when activation fails
 * @param mode - Passed through to `activeWorkflowManager.add`
 * @param tracking - Source of the action; `publicApi` on the event is derived from it
 * @throws WorkflowActivationBadRequestError if the manager rejects the workflow
 */
private async _addToActiveWorkflowManager(
	user: User,
	workflowId: string,
	workflow: WorkflowEntity,
	mode: 'activate' | 'update',
	tracking: { source: WorkflowActionSource } = { source: 'ui' },
): Promise<void> {
	try {
		await this.activeWorkflowManager.add(workflowId, mode);
	} catch (error) {
		const rollbackPayload = {
			active: false,
			activeVersionId: null,
			activeVersion: null,
		};
		await this.workflowRepository.update(workflowId, rollbackPayload);

		// Also set it in the returned data
		workflow.active = rollbackPayload.active;
		workflow.activeVersionId = rollbackPayload.activeVersionId;
		workflow.activeVersion = rollbackPayload.activeVersion;

		// ensureError guards against non-Error throwables, for which a plain
		// cast would produce an undefined message
		throw new WorkflowActivationBadRequestError(ensureError(error).message, {
			nodeId: getErrorNodeId(error),
			description: getErrorDescription(error),
		});
	}

	// Only reached when the manager accepted the workflow
	assert(workflow.activeVersionId !== null);

	// Temporary: In the future, the workflow publication service will
	// manage this mapping. We set it here for now to support incremental
	// development and testing.
	if (this.globalConfig.workflows.useWorkflowPublicationService) {
		await this.workflowPublishedVersionRepository.setPublishedVersion(
			workflowId,
			workflow.activeVersionId,
		);
	}

	await this.workflowPublishHistoryRepository.addRecord({
		workflowId,
		versionId: workflow.activeVersionId,
		event: 'activated',
		userId: user.id,
	});

	this.eventService.emit('workflow-activated', {
		user,
		workflowId,
		workflow,
		publicApi: tracking.source === 'api',
		source: tracking.source,
	});
}
/**
 * Builds a `Workflow` from the nodes and connections of the version to
 * activate and asks the webhook service for conflicts. The expression isolate
 * is acquired for the lookup and released afterwards, even when the lookup throws.
 */
private async _findConflictingWebhooks(
	workflowEntity: WorkflowEntity,
	versionToActivate: WorkflowHistory,
) {
	const { nodes, connections } = versionToActivate;
	const candidate = new Workflow({
		id: workflowEntity.id,
		nodes,
		connections,
		active: Boolean(workflowEntity.activeVersion),
		settings: workflowEntity.settings,
		nodeTypes: this.nodeTypes,
	});

	const additionalData = await getWorkflowExecutionData({
		workflowId: candidate.id,
	});

	await candidate.expression.acquireIsolate();
	try {
		const conflicts = await this.webhookService.findWebhookConflicts(candidate, additionalData);
		return conflicts;
	} finally {
		await candidate.expression.releaseIsolate();
	}
}
/**
 * Throws a `ConflictError` when the version to activate has webhook conflicts.
 * The error carries the `trigger`/`conflict` pairs as a JSON string.
 */
private async _detectWebhookConflicts(
	workflowEntity: WorkflowEntity,
	versionToActivate: WorkflowHistory,
) {
	const conflicts = await this._findConflictingWebhooks(workflowEntity, versionToActivate);
	if (conflicts.length === 0) {
		return;
	}

	const details = conflicts.map(({ trigger, conflict }) => ({
		trigger,
		conflict,
	}));

	throw new ConflictError(
		'There is a conflict with one of the webhooks.',
		JSON.stringify(details),
	);
}
/**
 * Activates a workflow by setting its activeVersionId and adding it to the active workflow manager.
 *
 * If the workflow already has an active version, it is first removed from the active
 * workflow manager and a `deactivated` history record and `workflow-deactivated` event
 * are produced for the previous version.
 *
 * @param user - The user activating the workflow; the workflow is looked up with the
 * `workflow:publish` scope
 * @param workflowId - The ID of the workflow to activate
 * @param options - `versionId` selects the version to activate (defaults to the workflow's
 * current versionId); `name` and `description` are written to that history version after
 * activation; `expectedChecksum` enables conflict detection; `source` identifies the
 * caller (defaults to `'ui'`; `'api'` marks emitted events as public API)
 * @returns The activated workflow, re-read with its active version and publish history
 * @throws NotFoundError if the workflow or the requested version cannot be found
 * @throws BadRequestError if the workflow is archived
 * @throws WorkflowActivationBadRequestError if the `workflow.activate` hook rejects
 */
// eslint-disable-next-line complexity
async activateWorkflow(
user: User,
workflowId: string,
options?: {
versionId?: string;
name?: string;
description?: string;
expectedChecksum?: string;
source?: WorkflowActionSource;
},
): Promise<WorkflowEntity> {
const source = options?.source ?? 'ui';
const publicApi = source === 'api';
const workflow = await this.workflowFinderService.findWorkflowForUser(workflowId, user, [
'workflow:publish',
]);
if (!workflow) {
this.logger.warn('User attempted to activate a workflow without permissions', {
workflowId,
userId: user.id,
});
throw new NotFoundError(
'You do not have permission to activate this workflow. Ask the owner to share it with you.',
);
}
if (workflow.isArchived) {
throw new BadRequestError('Cannot activate an archived workflow.');
}
const versionIdToActivate = options?.versionId ?? workflow.versionId;
// Captured up front: decides below whether a previous version must be deactivated first
const previousActiveVersionId = workflow.activeVersionId;
let versionToActivate: WorkflowHistory;
try {
versionToActivate = await this.workflowHistoryService.getVersion(
user,
workflow.id,
versionIdToActivate,
{
includePublishHistory: false,
},
);
} catch (error) {
// Translate the history-specific error into a NotFoundError; rethrow anything else
if (error instanceof WorkflowHistoryVersionNotFoundError) {
throw new NotFoundError('Version not found');
}
throw error;
}
if (options?.expectedChecksum) {
await this._detectConflicts(workflow, options.expectedChecksum);
}
// All validation runs against the version to activate, before any state is changed
await this._detectWebhookConflicts(workflow, versionToActivate);
this._validateNodes(workflowId, versionToActivate.nodes, versionToActivate.connections);
await this._validateDynamicCredentials(workflowId, versionToActivate.nodes, workflow.settings);
await this._validateSubWorkflowReferences(workflowId, versionToActivate.nodes);
// Run hook before destructive state changes so a rejection leaves
// the previous active version running instead of deactivating it.
const candidateWorkflow = this.workflowRepository.create({
...workflow,
active: true,
activeVersionId: versionIdToActivate,
activeVersion: versionToActivate,
});
try {
await this.externalHooks.run('workflow.activate', [candidateWorkflow]);
} catch (error) {
throw new WorkflowActivationBadRequestError(ensureError(error).message, {
nodeId: getErrorNodeId(error),
description: getErrorDescription(error),
});
}
if (previousActiveVersionId) {
await this.activeWorkflowManager.remove(workflowId);
this.eventService.emit('workflow-deactivated', {
user,
workflowId,
workflow,
publicApi,
deactivatedVersionId: previousActiveVersionId,
source,
});
await this.workflowPublishHistoryRepository.addRecord({
workflowId,
versionId: previousActiveVersionId,
event: 'deactivated',
userId: user.id,
});
}
// 'update' when replacing an already active version, 'activate' otherwise
const activationMode = previousActiveVersionId ? 'update' : 'activate';
await this.workflowRepository.update(workflowId, {
activeVersionId: versionIdToActivate,
active: true,
// workflow content did not change, so we keep updatedAt as is
updatedAt: workflow.updatedAt,
});
const workflowForActivation = await this.workflowRepository.findOne({
where: { id: workflowId },
relations: ['activeVersion'],
});
if (!workflowForActivation) {
throw new NotFoundError(`Workflow with ID "${workflowId}" could not be found.`);
}
await this._addToActiveWorkflowManager(
user,
workflowId,
workflowForActivation,
activationMode,
{ source },
);
// Name/description are applied to the history version only after activation succeeded
if (options?.name !== undefined || options?.description !== undefined) {
const updateFields: UpdateWorkflowHistoryVersionDto = {};
if (options.name !== undefined) updateFields.name = options.name;
if (options.description !== undefined) updateFields.description = options.description;
await this.workflowHistoryService.updateVersion(
workflowId,
versionIdToActivate,
updateFields,
);
}
// Fetch workflow again with workflowPublishHistory after activation to include the new entry
const updatedWorkflow = await this.workflowRepository.findOne({
where: { id: workflowId },
relations: {
activeVersion: {
workflowPublishHistory: true,
},
},
});
if (!updatedWorkflow) {
throw new NotFoundError(`Workflow with ID "${workflowId}" could not be found.`);
}
return updatedWorkflow;
}
/**
 * Deactivates a workflow: removes it from the active workflow manager, clears
 * `active`/`activeVersionId` in the database, records a `deactivated` history
 * entry and emits `workflow-deactivated`.
 *
 * A workflow without an active version is returned unchanged.
 *
 * @param user - The user deactivating the workflow; the workflow is looked up
 * with the `workflow:unpublish` scope
 * @param workflowId - The ID of the workflow to deactivate
 * @param options - `expectedChecksum` enables conflict detection; `source`
 * identifies the caller (defaults to `'ui'`)
 * @returns The workflow with its active fields cleared
 * @throws NotFoundError if no workflow is found for the user
 */
async deactivateWorkflow(
	user: User,
	workflowId: string,
	options?: {
		expectedChecksum?: string;
		source?: WorkflowActionSource;
	},
): Promise<WorkflowEntity> {
	const source = options?.source ?? 'ui';

	const workflow = await this.workflowFinderService.findWorkflowForUser(workflowId, user, [
		'workflow:unpublish',
	]);

	if (!workflow) {
		this.logger.warn('User attempted to deactivate a workflow without permissions', {
			workflowId,
			userId: user.id,
		});
		throw new NotFoundError(
			'You do not have permission to deactivate this workflow. Ask the owner to share it with you.',
		);
	}

	// Nothing to do for a workflow that is not active
	if (workflow.activeVersionId === null) {
		return workflow;
	}

	if (options?.expectedChecksum) {
		await this._detectConflicts(workflow, options.expectedChecksum);
	}

	// Remember which version is being taken offline before the fields are cleared
	const deactivatedVersionId = workflow.activeVersionId;

	await this.activeWorkflowManager.remove(workflowId);

	await this.workflowRepository.update(workflowId, {
		active: false,
		activeVersionId: null,
		// workflow content did not change, so we keep updatedAt as is
		updatedAt: workflow.updatedAt,
	});

	// Temporary: will be removed when the workflow publication service manages this.
	if (this.globalConfig.workflows.useWorkflowPublicationService) {
		await this.workflowPublishedVersionRepository.removePublishedVersion(workflowId);
	}

	await this.workflowPublishHistoryRepository.addRecord({
		workflowId,
		versionId: deactivatedVersionId,
		event: 'deactivated',
		userId: user.id,
	});

	// Mirror the database state on the object handed back to the caller
	workflow.active = false;
	workflow.activeVersionId = null;
	workflow.activeVersion = null;

	this.eventService.emit('workflow-deactivated', {
		user,
		workflowId,
		workflow,
		publicApi: source === 'api',
		deactivatedVersionId,
		source,
	});

	return workflow;
}
/**
 * Deletes a workflow and returns it.
 *
 * The `workflow.delete` external hook runs first, before the permission lookup.
 * If no workflow is found for the user with the `workflow:delete` scope,
 * nothing else happens and `undefined` is returned.
 * An active workflow is removed from the active workflow manager before deletion.
 * After the workflow row is deleted, the binary data of its executions is deleted too.
 *
 * @param force - Allows deleting a workflow that has not been archived
 * @throws BadRequestError if the workflow is not archived and `force` is not set
 */
async delete(user: User, workflowId: string, force = false): Promise<WorkflowEntity | undefined> {
	await this.externalHooks.run('workflow.delete', [workflowId]);

	const workflow = await this.workflowFinderService.findWorkflowForUser(workflowId, user, [
		'workflow:delete',
	]);
	if (!workflow) {
		return;
	}

	const deletionAllowed = workflow.isArchived || force;
	if (!deletionAllowed) {
		throw new BadRequestError('Workflow must be archived before it can be deleted.');
	}

	if (workflow.active) {
		// deactivate before deleting
		await this.activeWorkflowManager.remove(workflowId);
	}

	// Collect the binary-data locations while the execution rows can still be queried
	const executions = await this.executionRepository.find({
		select: ['id'],
		where: { workflowId },
	});
	const idsForDeletion = executions.map((execution) =>
		FileLocation.ofExecution(workflowId, execution.id),
	);

	await this.workflowRepository.delete(workflowId);
	await this.binaryDataService.deleteMany(idsForDeletion);

	this.eventService.emit('workflow-deleted', { user, workflowId, publicApi: false });
	await this.externalHooks.run('workflow.afterDelete', [workflowId]);

	return workflow;
}
/**
 * Archives a workflow. An active workflow is taken out of the active workflow
 * manager first and gets a `deactivated` publish-history record. Archiving
 * assigns a fresh versionId, which is also saved to the workflow history.
 *
 * @param options - `skipArchived` returns an already archived workflow instead
 * of throwing; `expectedChecksum` enables conflict detection; `publicApi` is
 * forwarded to the `workflow-archived` event
 * @returns The archived workflow, or `undefined` if no workflow is found for
 * the user with the `workflow:delete` scope
 * @throws BadRequestError if the workflow is already archived and
 * `skipArchived` is not set
 */
async archive(
	user: User,
	workflowId: string,
	options?: { skipArchived?: boolean; expectedChecksum?: string; publicApi?: boolean },
): Promise<WorkflowEntity | undefined> {
	const workflow = await this.workflowFinderService.findWorkflowForUser(workflowId, user, [
		'workflow:delete',
	]);
	if (!workflow) {
		return;
	}

	if (workflow.isArchived) {
		if (!options?.skipArchived) {
			throw new BadRequestError('Workflow is already archived.');
		}
		return workflow;
	}

	if (options?.expectedChecksum) {
		await this._detectConflicts(workflow, options.expectedChecksum);
	}

	const activeVersionId = workflow.activeVersionId;
	if (activeVersionId !== null) {
		await this.activeWorkflowManager.remove(workflowId);

		// Temporary: will be removed when the workflow publication service manages this.
		if (this.globalConfig.workflows.useWorkflowPublicationService) {
			await this.workflowPublishedVersionRepository.removePublishedVersion(workflowId);
		}

		await this.workflowPublishHistoryRepository.addRecord({
			workflowId,
			versionId: activeVersionId,
			event: 'deactivated',
			userId: user.id,
		});
	}

	// Update the in-memory entity first; it is what gets saved to history and returned
	const versionId = uuid();
	workflow.versionId = versionId;
	workflow.isArchived = true;
	workflow.active = false;
	workflow.activeVersionId = null;
	workflow.activeVersion = null;

	await this.workflowRepository.update(workflowId, {
		isArchived: true,
		active: false,
		activeVersion: null,
		versionId,
	});
	await this.workflowHistoryService.saveVersion(user, workflow, workflowId);

	const publicApi = options?.publicApi ?? false;
	this.eventService.emit('workflow-archived', {
		user,
		workflowId,
		publicApi,
	});
	await this.externalHooks.run('workflow.afterArchive', [workflowId]);

	return workflow;
}
/**
 * Reverses `archive`: clears the archived flag and assigns a fresh versionId,
 * which is also saved to the workflow history. The workflow is not reactivated.
 *
 * @param options - `publicApi` is forwarded to the `workflow-unarchived` event
 * @returns The unarchived workflow, or `undefined` if no workflow is found for
 * the user with the `workflow:delete` scope
 * @throws BadRequestError if the workflow is not archived
 */
async unarchive(
	user: User,
	workflowId: string,
	options?: { publicApi?: boolean },
): Promise<WorkflowEntity | undefined> {
	const workflow = await this.workflowFinderService.findWorkflowForUser(workflowId, user, [
		'workflow:delete',
	]);
	if (!workflow) {
		return;
	}

	if (!workflow.isArchived) {
		throw new BadRequestError('Workflow is not archived.');
	}

	// Update the in-memory entity first; it is what gets saved to history and returned
	const versionId = uuid();
	workflow.versionId = versionId;
	workflow.isArchived = false;

	await this.workflowRepository.update(workflowId, { isArchived: false, versionId });
	await this.workflowHistoryService.saveVersion(user, workflow, workflowId);

	const publicApi = options?.publicApi ?? false;
	this.eventService.emit('workflow-unarchived', {
		user,
		workflowId,
		publicApi,
	});
	await this.externalHooks.run('workflow.afterUnarchive', [workflowId]);

	return workflow;
}
/**
 * Public API variant of `archive`: an already archived workflow is returned
 * as is instead of raising an error, and the event is flagged as public API.
 */
async archiveForPublicApi(user: User, workflowId: string): Promise<WorkflowEntity | undefined> {
	const archived = await this.archive(user, workflowId, {
		skipArchived: true,
		publicApi: true,
	});
	return archived;
}
/**
 * Public-API entry point for unarchiving: delegates to `unarchive` with
 * `publicApi: true` so the emitted event is flagged accordingly.
 */
async unarchiveForPublicApi(user: User, workflowId: string): Promise<WorkflowEntity | undefined> {
	const restoredWorkflow = await this.unarchive(user, workflowId, { publicApi: true });
	return restoredWorkflow;
}
/**
 * Resolves the scopes `user` has on a single workflow.
 *
 * Loads the user's project relations, fetches the sharings of the workflow
 * with any of those projects, and lets the role service combine both into
 * the list of scopes.
 */
async getWorkflowScopes(user: User, workflowId: string): Promise<Scope[]> {
	const projectRelations = await this.projectService.getProjectRelationsForUser(user);

	// De-duplicate project ids before using them in the `IN (...)` filter.
	const projectIds = Array.from(new Set(projectRelations.map(({ projectId }) => projectId)));

	const sharings = await this.sharedWorkflowRepository.find({
		where: { projectId: In(projectIds), workflowId },
	});

	return this.roleService.combineResourceScopes('workflow', user, sharings, projectRelations);
}
/**
 * Transfers all workflows owned by a project to another one.
 * This has only been tested for personal projects. It may need to be amended
 * for team projects.
 *
 * Owner relationships of the from-project are moved to the to-project
 * (overriding whatever relationship the to-project already had). Non-owner
 * relationships are copied to the to-project only for workflows the
 * to-project has no relationship with yet.
 *
 * @param fromProjectId - project whose workflow relationships are transferred
 * @param toProjectId - project receiving the relationships
 * @param trx - entity manager to run the queries with (e.g. an open
 *   transaction); defaults to the workflow repository's manager
 */
async transferAll(fromProjectId: string, toProjectId: string, trx?: EntityManager) {
	// Transferring a project to itself must be a no-op: otherwise the
	// "make owner, then delete the from-project relationship" sequence below
	// would remove the ownership rows it has just written.
	if (fromProjectId === toProjectId) return;

	trx = trx ?? this.workflowRepository.manager;

	// Get all shared workflows for both projects.
	const allSharedWorkflows = await trx.findBy(SharedWorkflow, {
		projectId: In([fromProjectId, toProjectId]),
	});
	const sharedWorkflowsOfFromProject = allSharedWorkflows.filter(
		(sw) => sw.projectId === fromProjectId,
	);

	// For all workflows that the from-project owns transfer the ownership to
	// the to-project.
	// This will override whatever relationship the to-project already has to
	// the resources at the moment.
	const ownedWorkflowIds = sharedWorkflowsOfFromProject
		.filter((sw) => sw.role === 'workflow:owner')
		.map((sw) => sw.workflowId);
	await this.sharedWorkflowRepository.makeOwner(ownedWorkflowIds, toProjectId, trx);

	// Delete the relationship to the from-project.
	await this.sharedWorkflowRepository.deleteByIds(ownedWorkflowIds, fromProjectId, trx);

	// Transfer relationships that are not `workflow:owner`.
	// This will NOT override whatever relationship the to-project already
	// has to the resource at the moment.
	// A Set keeps the membership check below O(1) per workflow.
	const workflowIdsSharedWithToProject = new Set(
		allSharedWorkflows.filter((sw) => sw.projectId === toProjectId).map((sw) => sw.workflowId),
	);

	// All resources that are shared with the from-project, but not with the
	// to-project.
	const sharedWorkflowsToTransfer = sharedWorkflowsOfFromProject.filter(
		(sw) => sw.role !== 'workflow:owner' && !workflowIdsSharedWithToProject.has(sw.workflowId),
	);

	await trx.insert(
		SharedWorkflow,
		sharedWorkflowsToTransfer.map((sw) => ({
			workflowId: sw.workflowId,
			projectId: toProjectId,
			role: sw.role,
		})),
	);
}
/**
 * Lists the workflows that contain at least one node of the given types,
 * enriched with the requesting user's scopes.
 *
 * @param user - user whose scopes are attached to each workflow
 * @param nodeTypes - node type names to search for
 * @param includeNodes - when `true`, each result additionally carries the
 *   `nodes` returned by the node-type lookup (an empty array if the lookup
 *   had none for that workflow); when `false`, no `nodes` key is added
 * @returns the workflows, each tagged with `resourceType: 'workflow'`
 */
async getWorkflowsWithNodesIncluded(user: User, nodeTypes: string[], includeNodes = false) {
	const foundWorkflows = await this.workflowRepository.findWorkflowsWithNodeType(
		nodeTypes,
		includeNodes,
	);
	let { workflows } = await this.workflowRepository.getManyAndCount(
		foundWorkflows.map((w) => w.id),
	);
	if (hasSharing(workflows)) {
		workflows = await this.processSharedWorkflows(workflows);
	}
	const withScopes = await this.addUserScopes(workflows, user);
	this.cleanupSharedField(withScopes);

	// Index the lookup result by id once instead of scanning it for every
	// workflow. The first entry per id wins, matching `Array.prototype.find`.
	const foundById = new Map<string, (typeof foundWorkflows)[number]>();
	if (includeNodes) {
		for (const found of foundWorkflows) {
			if (!foundById.has(found.id)) foundById.set(found.id, found);
		}
	}

	return withScopes.map((workflow) => {
		const nodes = includeNodes ? (foundById.get(workflow.id)?.nodes ?? []) : undefined;
		return { resourceType: 'workflow', ...workflow, ...(includeNodes ? { nodes } : {}) };
	});
}
/**
 * Guards against lost updates: recomputes the checksum of the stored
 * workflow and compares it with the checksum the caller expected.
 *
 * @param dbWorkflow - the workflow as currently stored
 * @param expectedChecksum - checksum the caller based its changes on
 * @throws ConflictError when the two checksums differ
 */
async _detectConflicts(dbWorkflow: WorkflowEntity, expectedChecksum: string) {
	const storedChecksum = await calculateWorkflowChecksum(dbWorkflow);
	if (storedChecksum === expectedChecksum) return;

	throw new ConflictError(
		'Your most recent changes may be lost, because someone else just updated this workflow. Open this workflow in a new tab to see those new updates.',
	);
}
/**
 * Runs the activation validation on a workflow's nodes and connections.
 *
 * @param workflowId - only used for log context
 * @param nodes - nodes to validate; they are keyed by node name before
 *   being handed to the validation service
 * @param connections - connections between the nodes
 * @throws WorkflowValidationError when the validation service reports the
 *   workflow as invalid
 */
_validateNodes(workflowId: string, nodes: INode[], connections: IConnections) {
	// The validation service expects nodes indexed by name.
	const nodesByName: INodes = {};
	for (const node of nodes) {
		nodesByName[node.name] = node;
	}

	const result = this.workflowValidationService.validateForActivation(
		nodesByName,
		connections,
		this.nodeTypes,
	);
	if (result.isValid) return;

	this.logger.warn('Workflow activation failed validation', {
		workflowId,
		error: result.error,
	});
	throw new WorkflowValidationError(result.error ?? 'Workflow validation failed');
}
/**
 * Runs the dynamic-credentials validation for a workflow.
 *
 * @param workflowId - only used for log context
 * @param nodes - nodes handed to the validation service
 * @param workflowSettings - optional workflow settings handed to the
 *   validation service
 * @throws WorkflowValidationError when the validation service reports the
 *   workflow as invalid
 */
private async _validateDynamicCredentials(
	workflowId: string,
	nodes: INode[],
	workflowSettings?: IWorkflowSettings,
) {
	const result = await this.workflowValidationService.validateDynamicCredentials(
		nodes,
		this.nodeTypes,
		workflowSettings,
	);
	if (result.isValid) return;

	this.logger.warn('Workflow activation failed dynamic credentials validation', {
		workflowId,
		error: result.error,
	});
	throw new WorkflowValidationError(result.error ?? 'Dynamic credentials validation failed');
}
/**
 * Computes the difference between two workflow settings objects.
 *
 * Every key present in either object is compared with a deep equality
 * check. A missing or nullish value is reported as `null`.
 *
 * @param previousSettings - settings before the change (`undefined` is treated as empty)
 * @param newSettings - settings after the change (`undefined` is treated as empty)
 * @returns a map of `{ settingKey: { from, to } }` for each changed setting,
 *   or `undefined` if no settings changed
 */
private calculateSettingsChanges(
	previousSettings: IWorkflowSettings | undefined,
	newSettings: IWorkflowSettings | undefined,
): Record<string, { from: JsonValue; to: JsonValue }> | undefined {
	const before = previousSettings ?? {};
	const after = newSettings ?? {};
	const diff: Record<string, { from: JsonValue; to: JsonValue }> = {};

	// Union of the keys of both objects, previous settings first.
	const settingKeys = new Set([...Object.keys(before), ...Object.keys(after)]);

	for (const settingKey of settingKeys) {
		const typedKey = settingKey as keyof IWorkflowSettings;
		const oldValue = before[typedKey];
		const newValue = after[typedKey];
		if (isEqual(oldValue, newValue)) continue;

		const from: JsonValue = oldValue ?? null;
		const to: JsonValue = newValue ?? null;
		diff[settingKey] = { from, to };
	}

	if (Object.keys(diff).length === 0) return undefined;
	return diff;
}
/**
 * Validates that all sub-workflow references in a workflow are published.
 * Prevents publishing a parent workflow that references draft-only sub-workflows.
 *
 * Note: A published workflow could still end up referencing draft-only sub-workflows if:
 * - A referenced sub-workflow gets unpublished after the parent workflow was published
 * - The workflow ID is provided via an expression (e.g., ={{ $json.workflowId }})
 * - The workflow source is not 'database' (e.g., URL, parameter, localFile)
 *
 * In these cases, the invariant is enforced at execution time, where the workflow will
 * fail with a clear error message if the sub-workflow is not published (for production
 * executions) or not found.
 *
 * @param workflowId - id of the parent workflow being validated
 * @param nodes - nodes of the parent workflow
 * @throws WorkflowValidationError when the validation service reports invalid references
 */
private async _validateSubWorkflowReferences(workflowId: string, nodes: INode[]) {
	const result = await this.workflowValidationService.validateSubWorkflowReferences(
		workflowId,
		nodes,
	);
	if (result.isValid) return;

	this.logger.warn('Workflow activation failed sub-workflow validation', {
		workflowId,
		error: result.error,
		invalidReferences: result.invalidReferences,
	});
	throw new WorkflowValidationError(result.error ?? 'Sub-workflow validation failed');
}
}