Skip to content
Snippets Groups Projects
Unverified Commit 43cc75b0 authored by Ricardo Garim's avatar Ricardo Garim Committed by GitHub
Browse files

chore: add dependency control on local and network brokers (#34985)

parent 9a0a8ad4
Branches lingohub_language_rocketchat_update_2024-09-02Z
No related tags found
No related merge requests found
......@@ -22,6 +22,8 @@ export class NetworkBroker implements IBroker {
private started: Promise<boolean> = Promise.resolve(false);
private defaultDependencies = ['settings', 'license'];
metrics: IServiceMetrics;
constructor(broker: ServiceBroker) {
......@@ -64,7 +66,7 @@ export class NetworkBroker implements IBroker {
instance.removeAllListeners();
}
createService(instance: IServiceClass, serviceDependencies?: string[]): void {
createService(instance: IServiceClass, serviceDependencies: string[] = []): void {
const methods = (
instance.constructor?.name === 'Object'
? Object.getOwnPropertyNames(instance)
......@@ -83,13 +85,15 @@ export class NetworkBroker implements IBroker {
return;
}
// Allow services to depend on other services too
const dependencies = name !== 'license' ? { dependencies: ['license', ...(serviceDependencies || [])] } : {};
const dependencies = [...serviceDependencies, ...(name === 'settings' ? [] : this.defaultDependencies)].filter(
(dependency) => dependency !== name,
);
const service: ServiceSchema = {
name,
actions: {},
mixins: !instance.isInternal() ? [EnterpriseCheck] : [],
...dependencies,
...(dependencies.length ? { dependencies } : {}),
events: instanceEvents.reduce<Record<string, (ctx: Context) => void>>((map, { eventName }) => {
map[eventName] = /^\$/.test(eventName)
? (ctx: Context): void => {
......
import { EventEmitter } from 'events';
import { Logger } from '@rocket.chat/logger';
import { InstanceStatus } from '@rocket.chat/models';
import { injectCurrentContext, tracerActiveSpan } from '@rocket.chat/tracing';
......@@ -8,6 +9,13 @@ import type { EventSignatures } from './events/Events';
import type { IBroker, IBrokerNode } from './types/IBroker';
import type { ServiceClass, IServiceClass } from './types/ServiceClass';
type ExtendedServiceClass = { instance: IServiceClass; dependencies: string[]; isStarted: boolean };
const logger = new Logger('LocalBroker');
const INTERVAL = 1000;
const TIMEOUT = INTERVAL * 10;
export class LocalBroker implements IBroker {
private started = false;
......@@ -15,7 +23,11 @@ export class LocalBroker implements IBroker {
private events = new EventEmitter();
private services = new Set<IServiceClass>();
private services = new Map<string, ExtendedServiceClass>();
private pendingServices: Set<string> = new Set();
private defaultDependencies = ['settings'];
async call(method: string, data: any): Promise<any> {
return tracerActiveSpan(
......@@ -45,6 +57,7 @@ export class LocalBroker implements IBroker {
instance.constructor?.name === 'Object'
? Object.getOwnPropertyNames(instance)
: Object.getOwnPropertyNames(Object.getPrototypeOf(instance));
for (const method of methods) {
if (method === 'constructor') {
continue;
......@@ -56,10 +69,24 @@ export class LocalBroker implements IBroker {
await instance.stopped();
}
createService(instance: IServiceClass): void {
const namespace = instance.getName();
/**
* Creates a service and adds it to the local broker. In case of the broker is already started, it will start the service automatically.
*/
createService(instance: IServiceClass, serviceDependencies: string[] = []): void {
const serviceName = instance.getName();
if (!serviceName || serviceName === '') {
throw new Error('Service name cannot be empty');
}
if (this.services.has(serviceName)) {
throw new Error(`Service ${serviceName} already exists`);
}
this.services.add(instance);
// TODO: find a better way to handle default dependencies and avoid loops
const dependencies = [...serviceDependencies, ...(serviceName === 'settings' ? [] : this.defaultDependencies)].filter(
(dependency) => dependency !== serviceName,
);
instance.created();
......@@ -69,16 +96,20 @@ export class LocalBroker implements IBroker {
instance.constructor?.name === 'Object'
? Object.getOwnPropertyNames(instance)
: Object.getOwnPropertyNames(Object.getPrototypeOf(instance));
for (const method of methods) {
if (method === 'constructor') {
continue;
}
const i = instance as any;
this.methods.set(`${namespace}.${method}`, i[method].bind(i));
this.methods.set(`${serviceName}.${method}`, i[method].bind(i));
}
this.services.set(serviceName, { instance, dependencies, isStarted: false });
this.registerPendingServices(Array.from(new Set([serviceName, ...dependencies])));
if (this.started) {
void instance.started();
void this.start();
}
}
......@@ -111,8 +142,82 @@ export class LocalBroker implements IBroker {
return instances.map(({ _id }) => ({ id: _id, available: true }));
}
/**
* Registers services to be started. We're assuming that each service will only have one level of dependencies.
*/
private registerPendingServices(services: string[] = []): void {
for (const service of services) {
this.pendingServices.add(service);
}
}
/**
* Removes a service from the pending services set.
*/
private removePendingService(service: string): void {
this.pendingServices.delete(service);
}
private async startService(service: ExtendedServiceClass): Promise<void> {
const serviceName = service.instance.getName();
if (typeof service === 'string') {
logger.debug(`Service ${serviceName} is not in the services map. Bringing it back to queue`);
return;
}
if (service?.isStarted) {
logger.debug(`Service ${serviceName} already started`);
return;
}
const pendingDependencies = service.dependencies.filter((e) => !this.services.has(e) || !this.services.get(e)?.isStarted);
if (pendingDependencies.length > 0) {
logger.debug(
`Service ${serviceName} has dependencies that are not started yet, bringing it back to queue: ${pendingDependencies.join(', ')}`,
);
return;
}
await service.instance.started();
this.services.set(serviceName, { ...service, isStarted: true });
this.removePendingService(serviceName);
logger.debug(`Service ${serviceName} successfully started`);
}
async start(): Promise<void> {
await Promise.all([...this.services].map((service) => service.started()));
this.started = true;
const startTime = Date.now();
return new Promise((resolve, reject) => {
const intervalId = setInterval(async () => {
const elapsed = Date.now() - startTime;
if (this.pendingServices.size === 0) {
const availableServices = Array.from(this.services.values()).filter((service) => service.isStarted);
logger.info(`All ${availableServices.length} services available`);
clearInterval(intervalId);
return resolve();
}
if (elapsed > TIMEOUT) {
clearInterval(intervalId);
const pendingServices = Array.from(this.pendingServices).join(', ');
const error = new Error(`Timeout while waiting for LocalBroker services: ${pendingServices}`);
logger.error(error);
return reject(error);
}
for await (const service of Array.from(this.pendingServices)) {
const serviceInstance = this.services.get(service);
if (serviceInstance) {
await this.startService(serviceInstance);
}
}
logger.debug(`Waiting for ${this.pendingServices.size} pending services`);
}, INTERVAL);
});
}
}
......@@ -26,7 +26,7 @@ export interface IServiceContext {
}
export interface IServiceClass {
getName(): string | undefined;
getName(): string;
onNodeConnected?({ node, reconnected }: { node: IBrokerNode; reconnected: boolean }): void;
onNodeUpdated?({ node }: { node: IBrokerNode }): void;
onNodeDisconnected?({ node, unexpected }: { node: IBrokerNode; unexpected: boolean }): Promise<void>;
......@@ -46,7 +46,7 @@ export interface IServiceClass {
}
export abstract class ServiceClass implements IServiceClass {
protected name?: string;
protected abstract name: string;
protected events = new EventEmitter();
......@@ -73,7 +73,7 @@ export abstract class ServiceClass implements IServiceClass {
this.events.removeAllListeners();
}
getName(): string | undefined {
getName(): string {
return this.name;
}
......
......@@ -6,6 +6,8 @@ describe('LocalBroker', () => {
it('should call all the expected lifecycle hooks when creating a service', () => {
const createdStub = jest.fn();
const instance = new (class extends ServiceClass {
name = 'test';
async created() {
createdStub();
}
......@@ -23,6 +25,8 @@ describe('LocalBroker', () => {
const removeAllListenersStub = jest.fn();
const stoppedStub = jest.fn();
const instance = new (class extends ServiceClass {
name = 'test';
removeAllListeners() {
removeAllListenersStub();
}
......@@ -43,7 +47,9 @@ describe('LocalBroker', () => {
describe('#broadcast()', () => {
it('should call all the ServiceClass instance registered events', () => {
const instance = new (class extends ServiceClass {})();
const instance = new (class extends ServiceClass {
name = 'test';
})();
const testListener = jest.fn();
const testListener2 = jest.fn();
const test2Listener = jest.fn();
......@@ -62,7 +68,9 @@ describe('LocalBroker', () => {
});
it('should NOT call any instance event anymore after the service being destroyed', () => {
const instance = new (class extends ServiceClass {})();
const instance = new (class extends ServiceClass {
name = 'test';
})();
const testListener = jest.fn();
const test2Listener = jest.fn();
instance.onEvent('test' as any, testListener);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment