Skip to content
Snippets Groups Projects
Unverified Commit c95cad24 authored by Marcos Spessatto Defendi's avatar Marcos Spessatto Defendi Committed by GitHub
Browse files

chore: Add the possibility for removing the watcher for the message collection (#30381)

parent f340139d
No related branches found
No related tags found
No related merge requests found
Showing
with 146 additions and 29 deletions
......@@ -41,9 +41,11 @@ export async function deleteUser(userId: string, confirmRelinquish = false, dele
// Users without username can't do anything, so there is nothing to remove
if (user.username != null) {
let userToReplaceWhenUnlinking: IUser | null = null;
const nameAlias = i18n.t('Removed_User');
await relinquishRoomOwnerships(userId, subscribedRooms);
const messageErasureType = settings.get('Message_ErasureType');
const messageErasureType = settings.get<'Delete' | 'Unlink' | 'Keep'>('Message_ErasureType');
switch (messageErasureType) {
case 'Delete':
const store = FileUpload.getStore('Uploads');
......@@ -68,12 +70,11 @@ export async function deleteUser(userId: string, confirmRelinquish = false, dele
break;
case 'Unlink':
const rocketCat = await Users.findOneById('rocket.cat');
const nameAlias = i18n.t('Removed_User');
if (!rocketCat?._id || !rocketCat?.username) {
userToReplaceWhenUnlinking = await Users.findOneById('rocket.cat');
if (!userToReplaceWhenUnlinking?._id || !userToReplaceWhenUnlinking?.username) {
break;
}
await Messages.unlinkUserId(userId, rocketCat?._id, rocketCat?.username, nameAlias);
await Messages.unlinkUserId(userId, userToReplaceWhenUnlinking?._id, userToReplaceWhenUnlinking?.username, nameAlias);
break;
}
......@@ -104,8 +105,16 @@ export async function deleteUser(userId: string, confirmRelinquish = false, dele
await Integrations.disableByUserId(userId); // Disables all the integrations which rely on the user being deleted.
// Don't broadcast user.deleted for Erasure Type of 'Keep' so that messages don't disappear from logged in sessions
if (messageErasureType !== 'Keep') {
void api.broadcast('user.deleted', user);
if (messageErasureType === 'Delete') {
void api.broadcast('user.deleted', user, {
messageErasureType,
});
}
if (messageErasureType === 'Unlink' && userToReplaceWhenUnlinking) {
void api.broadcast('user.deleted', user, {
messageErasureType,
replaceByUser: { _id: userToReplaceWhenUnlinking._id, username: userToReplaceWhenUnlinking?.username, alias: nameAlias },
});
}
}
......
......@@ -4,7 +4,23 @@ import { ChatMessage } from '../../app/models/client';
import { Notifications } from '../../app/notifications/client';
Meteor.startup(() => {
Notifications.onLogged('Users:Deleted', ({ userId }) => {
Notifications.onLogged('Users:Deleted', ({ userId, messageErasureType, replaceByUser }) => {
if (messageErasureType === 'Unlink' && replaceByUser) {
return ChatMessage.update(
{
'u._id': userId,
},
{
$set: {
'alias': replaceByUser.alias,
'u._id': replaceByUser._id,
'u.username': replaceByUser.username,
'u.name': undefined,
},
},
{ multi: true },
);
}
ChatMessage.remove({
'u._id': userId,
});
......
......@@ -99,9 +99,10 @@ export class ListenersModule {
});
});
service.onEvent('user.deleted', ({ _id: userId }) => {
service.onEvent('user.deleted', ({ _id: userId }, data) => {
notifications.notifyLoggedInThisInstance('Users:Deleted', {
userId,
...data,
});
});
......
import type { IMessage, SettingValue, IUser } from '@rocket.chat/core-typings';
import { Messages, Settings, Users } from '@rocket.chat/models';
import mem from 'mem';
const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });
const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);
export const broadcastMessageSentEvent = async ({
id,
data,
broadcastCallback,
}: {
id: IMessage['_id'];
broadcastCallback: (message: IMessage) => Promise<void>;
data?: IMessage;
}): Promise<void> => {
const message = data ?? (await Messages.findOneById(id));
if (!message) {
return;
}
if (message._hidden !== true && message.imported == null) {
const UseRealName = (await getSettingCached('UI_Use_Real_Name')) === true;
if (UseRealName) {
if (message.u?._id) {
const name = await getUserNameCached(message.u._id);
if (name) {
message.u.name = name;
}
}
if (message.mentions?.length) {
for await (const mention of message.mentions) {
const name = await getUserNameCached(mention._id);
if (name) {
mention.name = name;
}
}
}
}
void broadcastCallback(message);
}
};
import type { EventSignatures } from '@rocket.chat/core-services';
import { dbWatchersDisabled } from '@rocket.chat/core-services';
import type {
ISubscription,
IUser,
......@@ -64,17 +65,17 @@ export function isWatcherRunning(): boolean {
return watcherStarted;
}
export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallback): void {
const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });
const getSettingCached = mem(async (setting: string): Promise<SettingValue> => Settings.getValueById(setting), { maxAge: 10000 });
const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);
const getUserNameCached = mem(
async (userId: string): Promise<string | undefined> => {
const user = await Users.findOne<Pick<IUser, 'name'>>(userId, { projection: { name: 1 } });
return user?.name;
},
{ maxAge: 10000 },
);
const messageWatcher = (watcher: DatabaseWatcher, broadcast: BroadcastCallback): void => {
watcher.on<IMessage>(Messages.getCollectionName(), async ({ clientAction, id, data }) => {
switch (clientAction) {
case 'inserted':
......@@ -110,6 +111,13 @@ export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallb
break;
}
});
};
export function initWatchers(watcher: DatabaseWatcher, broadcast: BroadcastCallback): void {
const dbWatchersEnabled = !dbWatchersDisabled;
if (dbWatchersEnabled) {
messageWatcher(watcher, broadcast);
}
watcher.on<ISubscription>(Subscriptions.getCollectionName(), async ({ clientAction, id, data, diff }) => {
switch (clientAction) {
......
......@@ -226,9 +226,16 @@ export interface StreamerEvents {
{
key: 'Users:Deleted';
args: [
{
userId: IUser['_id'];
},
| {
userId: IUser['_id'];
messageErasureType: 'Delete';
replaceByUser?: never;
}
| {
userId: IUser['_id'];
messageErasureType: 'Unlink';
replaceByUser?: { _id: IUser['_id']; username: IUser['username']; alias: string };
},
];
},
{
......
......@@ -3,7 +3,7 @@ import { EventEmitter } from 'events';
import { InstanceStatus } from '@rocket.chat/models';
import { asyncLocalStorage } from '.';
import type { EventSignatures } from './Events';
import type { EventSignatures } from './events/Events';
import type { IBroker, IBrokerNode } from './types/IBroker';
import type { ServiceClass, IServiceClass } from './types/ServiceClass';
......
......@@ -36,7 +36,7 @@ import type {
} from '@rocket.chat/core-typings';
import type { LicenseLimitKind } from '@rocket.chat/license';
import type { AutoUpdateRecord } from './types/IMeteor';
import type { AutoUpdateRecord } from '../types/IMeteor';
type ClientAction = 'inserted' | 'updated' | 'removed' | 'changed';
......@@ -100,7 +100,17 @@ export type EventSignatures = {
'stream'([streamer, eventName, payload]: [string, string, any[]]): void;
'subscription'(data: { action: string; subscription: Partial<ISubscription> }): void;
'user.avatarUpdate'(user: Partial<IUser>): void;
'user.deleted'(user: Pick<IUser, '_id'>): void;
'user.deleted'(
user: Pick<IUser, '_id'>,
data:
| {
messageErasureType: 'Delete';
}
| {
messageErasureType: 'Unlink';
replaceByUser: { _id: IUser['_id']; username: IUser['username']; alias: string };
},
): void;
'user.deleteCustomStatus'(userStatus: IUserStatus): void;
'user.nameChanged'(user: Pick<IUser, '_id' | 'name' | 'username'>): void;
'user.realNameChanged'(user: Partial<IUser>): void;
......@@ -273,4 +283,5 @@ export type EventSignatures = {
'command.updated'(command: string): void;
'command.removed'(command: string): void;
'actions.changed'(): void;
'message.sent'(message: IMessage): void;
};
import type { IMessage } from '@rocket.chat/core-typings';
import type { IServiceClass } from '../types/ServiceClass';
export const dbWatchersDisabled = ['yes', 'true'].includes(String(process.env.DISABLE_DB_WATCHERS).toLowerCase());
export const listenToMessageSentEvent = (service: IServiceClass, action: (message: IMessage) => Promise<void>): void => {
if (dbWatchersDisabled) {
return service.onEvent('message.sent', (message: IMessage) => action(message));
}
return service.onEvent('watch.messages', ({ message }) => action(message));
};
......@@ -50,7 +50,8 @@ import type { IVoipService } from './types/IVoipService';
export { asyncLocalStorage } from './lib/asyncLocalStorage';
export { MeteorError, isMeteorError } from './MeteorError';
export { api } from './api';
export { EventSignatures } from './Events';
export { EventSignatures } from './events/Events';
export { listenToMessageSentEvent, dbWatchersDisabled } from './events/listeners';
export { LocalBroker } from './LocalBroker';
export { IBroker, IBrokerNode, BaseMetricOptions, IServiceMetrics } from './types/IBroker';
......
import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import type { IApiService } from '../types/IApiService';
import type { IBroker, IBrokerNode } from '../types/IBroker';
import type { IServiceClass } from '../types/ServiceClass';
......
import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import type { IBroker, IBrokerNode } from './IBroker';
import type { IServiceClass } from './ServiceClass';
......
import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import type { IServiceClass } from './ServiceClass';
export interface IBrokerNode {
......
import { EventEmitter } from 'events';
import type { EventSignatures } from '../Events';
import type { EventSignatures } from '../events/Events';
import { asyncLocalStorage } from '../lib/asyncLocalStorage';
import type { IApiService } from './IApiService';
import type { IBroker, IBrokerNode } from './IBroker';
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment