n8n/packages/cli/test/integration/log-streaming.controller.test.ts

207 lines
6.3 KiB
TypeScript

import { mockInstance } from '@n8n/backend-test-utils';
import { InstanceSettingsLoaderConfig } from '@n8n/config';
import { GLOBAL_OWNER_ROLE, type User } from '@n8n/db';
import { Container } from '@n8n/di';
import { MessageEventBus } from '@/eventbus/message-event-bus/message-event-bus';
import { ExecutionRecoveryService } from '@/executions/execution-recovery.service';
import { LogStreamingDestinationService } from '@/modules/log-streaming.ee/log-streaming-destination.service';
import { Publisher } from '@/scaling/pubsub/publisher.service';
import { createUser } from './shared/db/users';
import type { SuperAgentTest } from './shared/types';
import * as utils from './shared/utils';
// Opt this suite out of any Jest mock for the message event bus module, so the
// `MessageEventBus` resolved from the container below is never a mocked version.
// NOTE(review): presumably the module is mocked by the shared test setup — confirm.
jest.unmock('@/eventbus/message-event-bus/message-event-bus');
// NOTE(review): presumably registers mock instances of these services so that no
// real pub/sub publishing or execution recovery runs — confirm against `mockInstance`.
mockInstance(Publisher);
mockInstance(ExecutionRecoveryService);
// Test server for this suite, set up with the `eventBus` endpoint group, the
// `feat:logStreaming` feature and the `log-streaming` module.
const testServer = utils.setupTestServer({
endpointGroups: ['eventBus'],
enabledFeatures: ['feat:logStreaming'],
modules: ['log-streaming'],
});
// Shared fixtures; both are assigned once in the top-level `beforeAll`.
// `owner` is the user created with the global owner role.
let owner: User;
// Agent obtained from `testServer.authAgentFor(owner)`; every request in this file goes through it.
let authOwnerAgent: SuperAgentTest;
/**
 * One-time suite setup: creates the owner user and its authenticated agent,
 * then initializes the event bus followed by the log-streaming destination service.
 */
beforeAll(async () => {
	// The authenticated agent is derived from the owner, so the user is created first.
	owner = await createUser({ role: GLOBAL_OWNER_ROLE });
	authOwnerAgent = testServer.authAgentFor(owner);

	// Keep this order: the event bus is initialized before the destination service.
	await Container.get(MessageEventBus).initialize();
	await Container.get(LogStreamingDestinationService).initialize();
});
describe('POST /eventbus/destination', () => {
describe('Valid destination creation', () => {
	/**
	 * Posts `payload` to `POST /eventbus/destination` as the owner and asserts that
	 * the response is a 200 whose `data` has an `id` and echoes back the submitted
	 * `__type` and `label`.
	 */
	const expectDestinationCreated = async (payload: {
		__type: string;
		label: string;
		[key: string]: unknown;
	}) => {
		const { statusCode, body } = await authOwnerAgent
			.post('/eventbus/destination')
			.send(payload);

		expect(statusCode).toBe(200);
		expect(body.data).toHaveProperty('id');
		expect(body.data.__type).toBe(payload.__type);
		expect(body.data.label).toBe(payload.label);
	};

	test('should create a webhook destination', async () => {
		await expectDestinationCreated({
			__type: '$$MessageEventBusDestinationWebhook',
			url: 'http://localhost:3456',
			method: 'POST',
			label: 'Test Webhook',
			enabled: false,
			subscribedEvents: ['n8n.test.message'],
			options: {},
		});
	});

	test('should create a sentry destination', async () => {
		await expectDestinationCreated({
			__type: '$$MessageEventBusDestinationSentry',
			dsn: 'http://localhost:3000',
			label: 'Test Sentry',
			enabled: false,
			subscribedEvents: ['n8n.test.message'],
		});
	});

	test('should create a syslog destination', async () => {
		await expectDestinationCreated({
			__type: '$$MessageEventBusDestinationSyslog',
			protocol: 'udp',
			host: 'localhost',
			port: 514,
			label: 'Test Syslog',
			enabled: false,
			subscribedEvents: ['n8n.test.message'],
		});
	});
});
describe('Invalid payloads', () => {
	/**
	 * Posts `payload` to `POST /eventbus/destination` as the owner and asserts
	 * that the request is answered with a 400.
	 */
	const expectBadRequest = async (payload: Record<string, unknown>) => {
		const { statusCode } = await authOwnerAgent.post('/eventbus/destination').send(payload);

		expect(statusCode).toBe(400);
	};

	test('should return 400 for missing __type', async () => {
		await expectBadRequest({ label: 'Test' });
	});

	test('should return 400 for invalid __type', async () => {
		await expectBadRequest({
			__type: 'InvalidType',
			label: 'Test',
		});
	});

	// Note: the "missing url" / "missing dsn" cases below send the field as an
	// empty string; they do not omit the key from the payload.
	test('should return 400 for webhook with missing url', async () => {
		await expectBadRequest({
			__type: '$$MessageEventBusDestinationWebhook',
			url: '',
			label: 'Test Webhook',
			options: {},
		});
	});

	test('should return 400 for sentry with missing dsn', async () => {
		await expectBadRequest({
			__type: '$$MessageEventBusDestinationSentry',
			dsn: '',
			label: 'Test Sentry',
		});
	});

	test('should return 400 for syslog with invalid protocol', async () => {
		await expectBadRequest({
			__type: '$$MessageEventBusDestinationSyslog',
			protocol: 'invalid-protocol',
			host: 'localhost',
			label: 'Test Syslog',
		});
	});
});
});
describe('when log streaming is managed by env', () => {
// Value `logStreamingManagedByEnv` had before this suite overrode it. Captured so
// teardown restores the real prior state instead of assuming it was `false`.
let previousManagedByEnv: InstanceSettingsLoaderConfig['logStreamingManagedByEnv'];

/** Turns on env-managed log streaming for every test in this `describe`. */
beforeAll(() => {
	const loaderConfig = Container.get(InstanceSettingsLoaderConfig);
	previousManagedByEnv = loaderConfig.logStreamingManagedByEnv;
	loaderConfig.logStreamingManagedByEnv = true;
});

/** Puts the flag back to whatever it was before this `describe` ran. */
afterAll(() => {
	Container.get(InstanceSettingsLoaderConfig).logStreamingManagedByEnv = previousManagedByEnv;
});
// Fixed UUID-shaped value sent as the `id` query parameter by the tests below.
const placeholderDestinationId = '11111111-1111-4111-8111-111111111111';

// Write endpoints: expected to be refused with 403 while the flag is on.
test('POST /eventbus/destination is rejected with 403', async () => {
	const { statusCode } = await authOwnerAgent.post('/eventbus/destination').send({
		__type: '$$MessageEventBusDestinationWebhook',
		url: 'http://localhost:3456',
		method: 'POST',
		label: 'Should not be created',
		enabled: false,
		subscribedEvents: ['n8n.test.message'],
		options: {},
	});

	expect(statusCode).toBe(403);
});

test('DELETE /eventbus/destination is rejected with 403', async () => {
	const { statusCode } = await authOwnerAgent
		.delete('/eventbus/destination')
		.query({ id: placeholderDestinationId });

	expect(statusCode).toBe(403);
});

// GET endpoints: expected to keep answering 200 while the flag is on.
test('GET /eventbus/destination still succeeds', async () => {
	const { statusCode } = await authOwnerAgent.get('/eventbus/destination');

	expect(statusCode).toBe(200);
});

test('GET /eventbus/testmessage still succeeds', async () => {
	const { statusCode } = await authOwnerAgent
		.get('/eventbus/testmessage')
		.query({ id: placeholderDestinationId });

	expect(statusCode).toBe(200);
});
});