import EventEmitter from 'events';

import type { IRocketChatRecord } from '@rocket.chat/core-typings';
import type { Logger } from '@rocket.chat/logger';
import { escapeRegExp } from '@rocket.chat/string-helpers';
import type { Timestamp, Db, ChangeStreamDeleteDocument, ChangeStreamInsertDocument, ChangeStreamUpdateDocument } from 'mongodb';
import { MongoClient } from 'mongodb';

import { convertChangeStreamPayload } from './convertChangeStreamPayload';
import { convertOplogPayload } from './convertOplogPayload';
import { getWatchCollections } from './watchCollections';

/**
 * Ping interval in milliseconds, read from MULTIPLE_INSTANCES_PING_INTERVAL.
 * Falls back to 10000 when the variable is unset, not a number, or zero.
 */
const instancePing = parseInt(String(process.env.MULTIPLE_INSTANCES_PING_INTERVAL), 10) || 10000;

/** Maximum tolerated gap (ms) between two real time events before they are considered delayed. */
const maxDocMs = instancePing * 4; // 4 times the ping interval

/**
 * Normalized real time event emitted by {@link DatabaseWatcher}, one per watched document change.
 */
export type RealTimeData<T> = {
	id: string;
	action: 'insert' | 'update' | 'remove';
	clientAction: 'inserted' | 'updated' | 'removed';
	data?: T;
	diff?: Record<string, any>;
	unset?: Record<string, number>;
	oplog?: true;
};

/**
 * @returns true when the given environment variable is set to "yes" or "true" (case-insensitive)
 */
const isEnvFlagEnabled = (name: string): boolean => ['yes', 'true'].includes(String(process.env[name]).toLowerCase());

const ignoreChangeStream = isEnvFlagEnabled('IGNORE_CHANGE_STREAM');

const useMeteorOplog = isEnvFlagEnabled('USE_NATIVE_OPLOG');

const useFullDocument = isEnvFlagEnabled('CHANGESTREAM_FULL_DOCUMENT');

/**
 * Watches the database for inserts, updates and deletes on the collections returned by
 * `getWatchCollections()` and re-emits each change as an event named after the collection.
 *
 * Three sources are supported, selected in `watch()`: the Meteor oplog handle, a tailable cursor on
 * `local.oplog.rs`, or MongoDB change streams.
 */
export class DatabaseWatcher extends EventEmitter {
	private db: Db;

	private _oplogHandle?: any;

	private metrics?: any;

	private logger: Logger;

	/** Number of consecutive change stream resume attempts; reset whenever a change event arrives. */
	private resumeRetryCount = 0;

	/**
	 * Last doc timestamp received from a real time event. Undefined until the first event is emitted.
	 */
	private lastDocTS?: Date;

	/** Names of the collections being watched; populated by `watch()`. */
	private watchCollections: string[] = [];

	/**
	 * @param db database handle used for change streams and for the replica set check
	 * @param _oplogHandle Meteor oplog handle, required only when USE_NATIVE_OPLOG is enabled
	 * @param metrics optional metrics object; its `oplog.inc` is called for every emitted document
	 * @param logger Logger class (not an instance); instantiated here with the name 'DatabaseWatcher'
	 */
	// eslint-disable-next-line @typescript-eslint/naming-convention
	constructor({ db, _oplogHandle, metrics, logger: LoggerClass }: { db: Db; _oplogHandle?: any; metrics?: any; logger: typeof Logger }) {
		super();

		this.db = db;
		this._oplogHandle = _oplogHandle;
		this.metrics = metrics;
		this.logger = new LoggerClass('DatabaseWatcher');
	}

	/**
	 * Starts watching using the source selected by the environment:
	 * USE_NATIVE_OPLOG -> Meteor oplog, IGNORE_CHANGE_STREAM -> oplog tailing, otherwise change streams.
	 *
	 * @throws when the selected oplog source is misconfigured (see `watchOplog` / `watchMeteorOplog`)
	 */
	async watch(): Promise<void> {
		this.watchCollections = getWatchCollections();

		if (useMeteorOplog) {
			// TODO remove this when updating to Meteor 2.8
			this.logger.warn(
				'Using USE_NATIVE_OPLOG=true is currently discouraged due to known performance issues. Please use IGNORE_CHANGE_STREAM=true instead.',
			);
			this.watchMeteorOplog();
			return;
		}

		if (ignoreChangeStream) {
			await this.watchOplog();
			return;
		}

		// NOTE(review): watchChangeStream() catches and logs its own setup errors, so this fallback is
		// only reached if that error handling itself throws. Kept as-is to preserve current behavior.
		try {
			this.watchChangeStream();
		} catch (err: unknown) {
			await this.watchOplog();
		}
	}

	/**
	 * Tails `oplog.rs` through a dedicated connection to $MONGO_OPLOG_URL and emits every insert,
	 * update and delete that targets a watched collection of the current database.
	 *
	 * @throws Error when $MONGO_OPLOG_URL is missing, when the main database is not part of a replica
	 * set, or when $MONGO_OPLOG_URL does not point to the 'local' database
	 */
	private async watchOplog(): Promise<void> {
		if (!process.env.MONGO_OPLOG_URL) {
			throw Error('No $MONGO_OPLOG_URL provided');
		}

		const isMasterDoc = await this.db.admin().command({ ismaster: 1 });
		if (!isMasterDoc?.setName) {
			throw Error("$MONGO_URL should be a replica set's URL");
		}

		const dbName = this.db.databaseName;

		const client = new MongoClient(process.env.MONGO_OPLOG_URL, {
			maxPoolSize: 1,
		});

		if (client.db().databaseName !== 'local') {
			throw Error("$MONGO_OPLOG_URL must be set to the 'local' database of a Mongo replica set");
		}

		await client.connect();

		this.logger.startup('Using oplog');

		const db = client.db();

		const oplogCollection = db.collection('oplog.rs');

		// Newest oplog entry at startup; only entries after it are streamed.
		const lastOplogEntry = await oplogCollection.findOne<{ ts: Timestamp }>({}, { sort: { $natural: -1 }, projection: { _id: 0, ts: 1 } });

		const oplogSelector = {
			// Only namespaces that belong to the current database.
			ns: new RegExp(`^(?:${escapeRegExp(`${dbName}.`)})`),
			op: { $in: ['i', 'u', 'd'] },
			...(lastOplogEntry && { ts: { $gt: lastOplogEntry.ts } }),
		};

		const cursor = oplogCollection.find(oplogSelector);

		cursor.addCursorFlag('tailable', true);
		cursor.addCursorFlag('awaitData', true);
		cursor.addCursorFlag('oplogReplay', true);

		const stream = cursor.stream();

		stream.on('data', (doc) => {
			const doesMatter = this.watchCollections.some((collection) => doc.ns === `${dbName}.${collection}`);
			if (!doesMatter) {
				return;
			}

			this.emitDoc(
				// Strip the "<dbName>." prefix to get the collection name.
				doc.ns.slice(dbName.length + 1),
				convertOplogPayload({
					// Updates carry the target document id in `o2`; inserts and deletes carry it in `o`.
					id: doc.op === 'u' ? doc.o2._id : doc.o._id,
					op: doc,
				}),
			);
		});
	}

	/**
	 * Subscribes to the Meteor oplog handle for each watched collection.
	 *
	 * @throws Error('no-oplog-handle') when no `_oplogHandle` was provided to the constructor
	 */
	private watchMeteorOplog(): void {
		if (!this._oplogHandle) {
			throw new Error('no-oplog-handle');
		}

		this.logger.startup('Using Meteor oplog');

		this.watchCollections.forEach((collection) => {
			this._oplogHandle.onOplogEntry({ collection }, (event: any) => {
				this.emitDoc(collection, convertOplogPayload(event));
			});
		});
	}

	/**
	 * Opens a change stream filtered to insert/update/delete operations on the watched collections.
	 *
	 * On a stream error it re-opens the stream after `resumeRetryCount` seconds, up to 5 consecutive
	 * attempts; after that the error is rethrown from the error listener. Setup errors are logged as
	 * fatal and not propagated.
	 *
	 * @param resumeToken token of the last processed event, used as `startAfter` when resuming
	 */
	private watchChangeStream(resumeToken?: unknown): void {
		try {
			const options = {
				...(useFullDocument ? { fullDocument: 'updateLookup' as const } : {}),
				...(resumeToken ? { startAfter: resumeToken } : {}),
			};

			// Seeded with the incoming token so that an error raised before this stream delivers any
			// event still resumes from the last known position instead of starting from "now".
			let lastEvent: unknown = resumeToken;

			const changeStream = this.db.watch<
				IRocketChatRecord,
				| ChangeStreamInsertDocument<IRocketChatRecord>
				| ChangeStreamUpdateDocument<IRocketChatRecord>
				| ChangeStreamDeleteDocument<IRocketChatRecord>
			>(
				[
					{
						$match: {
							'operationType': { $in: ['insert', 'update', 'delete'] },
							'ns.coll': { $in: this.watchCollections },
						},
					},
				],
				options,
			);

			changeStream.on('change', (event) => {
				// reset retry counter
				this.resumeRetryCount = 0;

				// save last event to resume on error
				lastEvent = event._id;

				this.emitDoc(event.ns.coll, convertChangeStreamPayload(event));
			});

			changeStream.on('error', (err) => {
				if (this.resumeRetryCount++ < 5) {
					this.logger.warn({ msg: `Change stream error. Trying resume after ${this.resumeRetryCount} seconds.`, err });

					setTimeout(() => {
						this.watchChangeStream(lastEvent);
					}, this.resumeRetryCount * 1000);
					return;
				}

				throw err;
			});

			this.logger.startup('Using change streams');
		} catch (err: unknown) {
			this.logger.fatal({ msg: 'Cannot resume change stream.', err });
		}
	}

	/**
	 * Records the reception time, bumps the oplog metric (when metrics are configured) and emits the
	 * document on the event named after its collection. Does nothing when `doc` is empty.
	 */
	private emitDoc(collection: string, doc: RealTimeData<IRocketChatRecord> | void): void {
		if (!doc) {
			return;
		}

		this.lastDocTS = new Date();

		this.metrics?.oplog.inc({
			collection,
			op: doc.action,
		});

		this.emit(collection, doc);
	}

	/**
	 * Registers a listener for the real time events of a collection.
	 *
	 * @param collection collection name, which is also the event name
	 * @param callback invoked with each event emitted for that collection
	 */
	on<T>(collection: string, callback: (event: RealTimeData<T>) => void): this {
		return super.on(collection, callback);
	}

	/**
	 * @returns the last timestamp delta in milliseconds received from a real time event, or Infinity
	 * when no event has been received yet
	 */
	getLastDocDelta(): number {
		return this.lastDocTS ? Date.now() - this.lastDocTS.getTime() : Infinity;
	}

	/**
	 * @returns Indicates if the last document received is older than it should be. If that happens, it means that the oplog is not working properly
	 */
	isLastDocDelayed(): boolean {
		return this.getLastDocDelta() > maxDocMs;
	}
}