Skip to content
Snippets Groups Projects
Unverified Commit 4aa95b61 authored by Kevin Aleman's avatar Kevin Aleman Committed by GitHub
Browse files

fix: Start queue worker with service start instead of first call (#34903)

parent c7be829f
No related branches found
No related tags found
No related merge requests found
---
"@rocket.chat/omnichannel-services": patch
---
Fixes a behavior when running microservices that caused queue worker to process just the first 60 seconds of request.
This was due to a mistakenly bound context. Queue Worker was changed to start doing work only after it received the first request.
However, with the introduction of ASL and actual context on calls, the worker registration was absorbing the context of the call that created them, causing service calls happening inside the callbacks to fail because of a timeout.
...@@ -11,14 +11,12 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ...@@ -11,14 +11,12 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
protected retryCount = 5; protected retryCount = 5;
// Default delay is 5 seconds // Default delay is 5 seconds
protected retryDelay = 5000; protected retryDelay = Number(process.env.RETRY_DELAY) || 5000;
protected queue: MessageQueue; protected queue: MessageQueue;
private logger: Logger; private logger: Logger;
private queueStarted = false;
constructor( constructor(
private readonly db: Db, private readonly db: Db,
loggerClass: typeof Logger, loggerClass: typeof Logger,
...@@ -28,7 +26,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ...@@ -28,7 +26,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
// eslint-disable-next-line new-cap // eslint-disable-next-line new-cap
this.logger = new loggerClass('QueueWorker'); this.logger = new loggerClass('QueueWorker');
this.queue = new MessageQueue(); this.queue = new MessageQueue();
this.queue.pollingInterval = 5000; this.queue.pollingInterval = Number(process.env.POLLING_INTERVAL) || 5000;
} }
isServiceNotFoundMessage(message: string): boolean { isServiceNotFoundMessage(message: string): boolean {
...@@ -46,6 +44,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ...@@ -46,6 +44,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
try { try {
await this.createIndexes(); await this.createIndexes();
this.registerWorkers();
} catch (e) { } catch (e) {
this.logger.fatal(e, 'Fatal error occurred when registering workers'); this.logger.fatal(e, 'Fatal error occurred when registering workers');
process.exit(1); process.exit(1);
...@@ -55,7 +54,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ...@@ -55,7 +54,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
async createIndexes(): Promise<void> { async createIndexes(): Promise<void> {
this.logger.info('Creating indexes for queue worker'); this.logger.info('Creating indexes for queue worker');
// Library doesnt create indexes by itself, for some reason // Library doesn't create indexes by itself, for some reason
// This should create the indexes we need and improve queue perf on reading // This should create the indexes we need and improve queue perf on reading
await this.db.collection(this.queue.collectionName).createIndex({ type: 1 }); await this.db.collection(this.queue.collectionName).createIndex({ type: 1 });
await this.db.collection(this.queue.collectionName).createIndex({ rejectedTime: 1 }, { sparse: true }); await this.db.collection(this.queue.collectionName).createIndex({ rejectedTime: 1 }, { sparse: true });
...@@ -105,8 +104,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ...@@ -105,8 +104,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
this.logger.info('Registering workers of type "workComplete"'); this.logger.info('Registering workers of type "workComplete"');
this.queue.registerWorker('workComplete', this.workerCallback.bind(this)); this.queue.registerWorker('workComplete', this.workerCallback.bind(this));
this.queueStarted = true;
} }
private matchServiceCall(service: string): boolean { private matchServiceCall(service: string): boolean {
...@@ -123,10 +120,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ...@@ -123,10 +120,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
// This is a "generic" job that allows you to call any service // This is a "generic" job that allows you to call any service
async queueWork<T extends Record<string, unknown>>(queue: Actions, to: string, data: T): Promise<void> { async queueWork<T extends Record<string, unknown>>(queue: Actions, to: string, data: T): Promise<void> {
this.logger.info(`Queueing work for ${to}`); this.logger.info(`Queueing work for ${to}`);
if (!this.queueStarted) {
this.registerWorkers();
}
if (!this.matchServiceCall(to)) { if (!this.matchServiceCall(to)) {
// We don't want to queue calls to invalid service names // We don't want to queue calls to invalid service names
throw new Error(`Invalid service name ${to}`); throw new Error(`Invalid service name ${to}`);
...@@ -150,8 +143,4 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ...@@ -150,8 +143,4 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService {
]) ])
.toArray(); .toArray();
} }
async isQueueStarted(): Promise<boolean> {
return this.queueStarted;
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment