Skip to content
Snippets Groups Projects
Unverified Commit 07af6a2f authored by Diego Sampaio's avatar Diego Sampaio Committed by GitHub
Browse files

chore: improve change stream error handler (#29195)

parent 8f95d5d2
No related branches found
No related tags found
No related merge requests found
......@@ -37,6 +37,8 @@ export class DatabaseWatcher extends EventEmitter {
private logger: Logger;
private resumeRetryCount = 0;
/**
* Last doc timestamp received from a real time event
*/
......@@ -148,34 +150,55 @@ export class DatabaseWatcher extends EventEmitter {
});
}
private watchChangeStream(): void {
private watchChangeStream(resumeToken?: unknown): void {
try {
const options = resumeToken ? { startAfter: resumeToken } : {};
let lastEvent: unknown;
const changeStream = this.db.watch<
IRocketChatRecord,
| ChangeStreamInsertDocument<IRocketChatRecord>
| ChangeStreamUpdateDocument<IRocketChatRecord>
| ChangeStreamDeleteDocument<IRocketChatRecord>
>([
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] },
'ns.coll': { $in: watchCollections },
>(
[
{
$match: {
'operationType': { $in: ['insert', 'update', 'delete'] },
'ns.coll': { $in: watchCollections },
},
},
},
]);
],
options,
);
changeStream.on('change', (event) => {
// reset retry counter
this.resumeRetryCount = 0;
// save last event to resume on error
lastEvent = event._id;
this.emitDoc(event.ns.coll, convertChangeStreamPayload(event));
});
changeStream.on('error', (err) => {
if (this.resumeRetryCount++ < 5) {
this.logger.warn({ msg: `Change stream error. Trying resume after ${this.resumeRetryCount} seconds.`, err });
setTimeout(() => {
this.watchChangeStream(lastEvent);
}, this.resumeRetryCount * 1000);
return;
}
throw err;
});
this.logger.startup('Using change streams');
} catch (err: unknown) {
this.logger.error(err, 'Change stream error');
throw err;
this.logger.fatal({ msg: 'Cannot resume change stream.', err });
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment