diff --git a/.vscode/settings.json b/.vscode/settings.json index 04c7bdc33b1a207213b3f37700d5beff64caa89b..b6244e79e3df17c3b35bc56b128725093ddbd140 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -14,5 +14,15 @@ } ], "typescript.tsdk": "./node_modules/typescript/lib", - "cSpell.words": ["katex", "listbox", "livechat", "omnichannel", "photoswipe", "searchbox", "tmid"] + "cSpell.words": [ + "autotranslate", + "katex", + "listbox", + "livechat", + "oauthapps", + "omnichannel", + "photoswipe", + "searchbox", + "tmid" + ] } diff --git a/apps/meteor/app/api/server/v1/instances.ts b/apps/meteor/app/api/server/v1/instances.ts index 204f1ceb9440eaf637f0da7167de6223114422f2..e2b3b352580c09da623f3940c9ff9f6f45afa43a 100644 --- a/apps/meteor/app/api/server/v1/instances.ts +++ b/apps/meteor/app/api/server/v1/instances.ts @@ -1,9 +1,17 @@ -import type { IInstanceStatus } from '@rocket.chat/core-typings'; import { InstanceStatus } from '@rocket.chat/models'; -import { getInstanceConnection } from '../../../../server/stream/streamBroadcast'; +import { Instance as InstanceService } from '../../../../ee/server/sdk'; import { hasPermission } from '../../../authorization/server'; import { API } from '../api'; +import { isRunningMs } from '../../../../server/lib/isRunningMs'; + +const getMatrixInstances = (() => { + if (isRunningMs()) { + return () => []; + } + + return () => InstanceService.getInstances(); +})(); API.v1.addRoute( 'instances.get', @@ -14,20 +22,27 @@ API.v1.addRoute( return API.v1.unauthorized(); } - const instances = await InstanceStatus.find().toArray(); + const instanceRecords = await InstanceStatus.find().toArray(); + + const connections = await getMatrixInstances(); + + const result = instanceRecords.map((instanceRecord) => { + const connection = connections.find((c) => c.id === instanceRecord._id); + + return { + address: connection?.ipList[0], + currentStatus: { + connected: connection?.available || false, + lastHeartbeatTime: connection?.lastHeartbeatTime, + local: connection?.local, + }, + instanceRecord, + broadcastAuth: true, + }; + }); return API.v1.success({ - instances: instances.map((instance: IInstanceStatus) => { - const connection = getInstanceConnection(instance); - - if (connection) { - delete connection.instanceRecord; - } - return { - ...instance, - connection, - }; - }), + instances: result, }); }, }, diff --git a/apps/meteor/client/views/admin/info/DeploymentCard.tsx b/apps/meteor/client/views/admin/info/DeploymentCard.tsx index 2181e6183f9dc46371015ac4638b048a5807adf3..1bf5accb489fd90fe8865238833e817baca47813 100644 --- a/apps/meteor/client/views/admin/info/DeploymentCard.tsx +++ b/apps/meteor/client/views/admin/info/DeploymentCard.tsx @@ -1,6 +1,7 @@ -import type { IInstanceStatus, IServerInfo, IStats } from '@rocket.chat/core-typings'; +import type { IServerInfo, IStats, Serialized } from '@rocket.chat/core-typings'; import { ButtonGroup, Button } from '@rocket.chat/fuselage'; import { useMutableCallback } from '@rocket.chat/fuselage-hooks'; +import type { IInstance } from '@rocket.chat/rest-typings'; import { Card } from '@rocket.chat/ui-client'; import { useSetModal, useTranslation } from '@rocket.chat/ui-contexts'; import type { ReactElement } from 'react'; @@ -11,7 +12,7 @@ import InstancesModal from './InstancesModal'; type DeploymentCardProps = { info: IServerInfo; - instances: Array<IInstanceStatus>; + instances: Serialized<IInstance[]>; statistics: IStats; }; diff --git a/apps/meteor/client/views/admin/info/InformationPage.tsx b/apps/meteor/client/views/admin/info/InformationPage.tsx index c25103706fba7551a6b41a17285a964a121c5a30..d192086844550315dcd3f34999c13f9f49f98776 100644 --- a/apps/meteor/client/views/admin/info/InformationPage.tsx +++ b/apps/meteor/client/views/admin/info/InformationPage.tsx @@ -1,5 +1,6 @@ -import type { IInstanceStatus, IServerInfo, IStats } from '@rocket.chat/core-typings'; +import type { IServerInfo, IStats, Serialized } from '@rocket.chat/core-typings'; import { Box, Button, ButtonGroup, Callout, Grid, Icon } from '@rocket.chat/fuselage'; +import type { IInstance } from '@rocket.chat/rest-typings'; import { useTranslation } from '@rocket.chat/ui-contexts'; import React, { memo } from 'react'; @@ -15,7 +16,7 @@ type InformationPageProps = { canViewStatistics: boolean; info: IServerInfo; statistics: IStats; - instances: Array<IInstanceStatus>; + instances: Serialized<IInstance[]>; onClickRefreshButton: () => void; onClickDownloadInfo: () => void; }; diff --git a/apps/meteor/client/views/admin/info/InformationRoute.tsx b/apps/meteor/client/views/admin/info/InformationRoute.tsx index cdc31cfde83a1abf8621223aa4dcb0e97daa9d90..f2d38cc39f56a83ecde6b0686835065a025201ec 100644 --- a/apps/meteor/client/views/admin/info/InformationRoute.tsx +++ b/apps/meteor/client/views/admin/info/InformationRoute.tsx @@ -1,6 +1,7 @@ -import type { IInstanceStatus, IStats } from '@rocket.chat/core-typings'; +import type { IStats, Serialized } from '@rocket.chat/core-typings'; import { Callout, ButtonGroup, Button, Icon } from '@rocket.chat/fuselage'; -import { usePermission, useServerInformation, useEndpoint, useTranslation, useMethod } from '@rocket.chat/ui-contexts'; +import type { IInstance } from '@rocket.chat/rest-typings'; +import { usePermission, useServerInformation, useEndpoint, useTranslation } from '@rocket.chat/ui-contexts'; import type { ReactElement } from 'react'; import React, { useState, useEffect, memo } from 'react'; @@ -19,10 +20,10 @@ const InformationRoute = (): ReactElement => { const [isLoading, setLoading] = useState(true); const [error, setError] = useState(false); const [statistics, setStatistics] = useState<IStats>(); - const [instances, setInstances] = useState<IInstanceStatus[]>([]); + const [instances, setInstances] = useState<Serialized<IInstance[]>>([]); const [fetchStatistics, setFetchStatistics] = useState<fetchStatisticsCallback>(() => (): void => undefined); const getStatistics = useEndpoint('GET', '/v1/statistics'); - const getInstances = useMethod('instances/get'); + const getInstances = useEndpoint('GET', '/v1/instances.get'); useEffect(() => { let didCancel = false; @@ -32,13 +33,13 @@ const InformationRoute = (): ReactElement => { setError(false); try { - const [statistics, instances] = await Promise.all([getStatistics({ refresh: refresh ? 'true' : 'false' }), getInstances()]); + const [statistics, instancesData] = await Promise.all([getStatistics({ refresh: refresh ? 'true' : 'false' }), getInstances()]); if (didCancel) { return; } setStatistics(statistics); - setInstances(instances as IInstanceStatus[]); + setInstances(instancesData.instances); } catch (error) { setError(!!error); } finally { diff --git a/apps/meteor/client/views/admin/info/InstancesModal.js b/apps/meteor/client/views/admin/info/InstancesModal.js index b8c378a27108c01262ba947dffc69c7877f1c7ca..0f484bca5c8dfe2eaf3e7021adcb8da78f63b4e0 100644 --- a/apps/meteor/client/views/admin/info/InstancesModal.js +++ b/apps/meteor/client/views/admin/info/InstancesModal.js @@ -35,20 +35,20 @@ const InstancesModal = ({ instances = [], onClose }) => { <DescriptionList.Entry label={ <> - {t('Current_Status')} > {t('Retry_Count')} + {t('Current_Status')} > {t('Local')} </> } > - {currentStatus.retryCount} + {currentStatus.local ? 'true' : 'false'} </DescriptionList.Entry> <DescriptionList.Entry label={ <> - {t('Current_Status')} > {t('Status')} + {t('Current_Status')} > {t('Last_Heartbeat_Time')} </> } > - {currentStatus.status} + {currentStatus.lastHeartbeatTime} </DescriptionList.Entry> <DescriptionList.Entry label={ diff --git a/apps/meteor/ee/server/local-services/instance/service.ts b/apps/meteor/ee/server/local-services/instance/service.ts new file mode 100644 index 0000000000000000000000000000000000000000..425ff0116c54b21e7b17f3b44a10f1cd667a84b4 --- /dev/null +++ b/apps/meteor/ee/server/local-services/instance/service.ts @@ -0,0 +1,173 @@ +import os from 'os'; + +import type { BrokerNode } from 'moleculer'; +import { ServiceBroker } from 'moleculer'; +import { License, ServiceClassInternal } from '@rocket.chat/core-services'; +import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models'; +import { InstanceStatus } from '@rocket.chat/instance-status'; + +import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module'; +import type { IInstanceService } from '../../sdk/types/IInstanceService'; + +export class InstanceService extends ServiceClassInternal implements IInstanceService { + protected name = 'instance'; + + private broadcastStarted = false; + + private broker: ServiceBroker; + + private troubleshootDisableInstanceBroadcast = false; + + constructor() { + super(); + + this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => { + if (clientAction === 'removed') { + return; + } + + if (clientAction === 'inserted' && data?.extraInformation?.port) { + this.connectNode(data); + } + }); + + this.onEvent('license.module', ({ module, valid }) => { + if (module === 'scalability' && valid) { + this.startBroadcast(); + } + }); + + this.onEvent('watch.settings', async ({ clientAction, setting }): Promise<void> => { + if (clientAction === 'removed') { + return; + } + + const { _id, value } = setting; + if (_id !== 'Troubleshoot_Disable_Instance_Broadcast') { + return; + } + + if (typeof value !== 'boolean') { + return; + } + + if (this.troubleshootDisableInstanceBroadcast === value) { + return; + } + + this.troubleshootDisableInstanceBroadcast = value; + }); + } + + async created() { + const port = process.env.TCP_PORT ? String(process.env.TCP_PORT).trim() : 0; + + this.broker = new ServiceBroker({ + nodeID: InstanceStatus.id(), + transporter: { + type: 'TCP', + options: { + port, + udpDiscovery: false, + }, + }, + }); + + this.broker.createService({ + name: 'matrix', + events: { + broadcast(ctx: any) { + const { eventName, streamName, args } = ctx.params; + + const instance = StreamerCentral.instances[streamName]; + if (!instance) { + return 'stream-not-exists'; + } + + if (instance.serverOnly) { + instance.__emit(eventName, ...args); + } else { + // @ts-expect-error not sure why it thinks _emit needs an extra argument + StreamerCentral.instances[streamName]._emit(eventName, args); + } + }, + }, + }); + } + + async started() { + await this.broker.start(); + + const instance = { + host: process.env.INSTANCE_IP ? String(process.env.INSTANCE_IP).trim() : 'localhost', + port: String(process.env.PORT).trim(), + tcpPort: (this.broker.transit?.tx as any)?.nodes?.localNode?.port, + os: { + type: os.type(), + platform: os.platform(), + arch: os.arch(), + release: os.release(), + uptime: os.uptime(), + loadavg: os.loadavg(), + totalmem: os.totalmem(), + freemem: os.freemem(), + cpus: os.cpus().length, + }, + nodeVersion: process.version, + }; + + InstanceStatus.registerInstance('rocket.chat', instance); + + const hasLicense = await License.hasLicense('scalability'); + if (!hasLicense) { + return; + } + + this.startBroadcast(); + } + + private startBroadcast() { + if (this.broadcastStarted) { + return; + } + + this.broadcastStarted = true; + + StreamerCentral.on('broadcast', this.sendBroadcast.bind(this)); + + InstanceStatusRaw.find( + { + 'extraInformation.tcpPort': { + $exists: true, + }, + }, + { + sort: { + _createdAt: -1, + }, + }, + ).forEach(this.connectNode.bind(this)); + } + + private connectNode(record: any) { + if (record._id === InstanceStatus.id()) { + return; + } + + const { host, tcpPort } = record.extraInformation; + + (this.broker?.transit?.tx as any).addOfflineNode(record._id, host, tcpPort); + } + + private sendBroadcast(streamName: string, eventName: string, args: unknown[]) { + if (this.troubleshootDisableInstanceBroadcast) { + return; + } + + this.broker.broadcast('broadcast', { streamName, eventName, args }); + } + + async getInstances(): Promise<BrokerNode[]> { + return this.broker.call('$node.list', { onlyAvailable: true }); + } +} diff --git a/apps/meteor/ee/server/sdk/index.ts b/apps/meteor/ee/server/sdk/index.ts index 0a463a498843c7ec584ae785ac09a893cd518734..795af715544a96c107a9d1bd8d2b33a7014b8c0b 100644 --- a/apps/meteor/ee/server/sdk/index.ts +++ b/apps/meteor/ee/server/sdk/index.ts @@ -1,5 +1,7 @@ import { proxifyWithWait } from '@rocket.chat/core-services'; import type { ILDAPEEService } from './types/ILDAPEEService'; +import type { IInstanceService } from './types/IInstanceService'; export const LDAPEE = proxifyWithWait<ILDAPEEService>('ldap-enterprise'); +export const Instance = proxifyWithWait<IInstanceService>('instance'); diff --git a/apps/meteor/ee/server/sdk/types/IInstanceService.ts b/apps/meteor/ee/server/sdk/types/IInstanceService.ts new file mode 100644 index 0000000000000000000000000000000000000000..b5c54349dfa16dc11079e3bd2184856c654fac4b --- /dev/null +++ b/apps/meteor/ee/server/sdk/types/IInstanceService.ts @@ -0,0 +1,5 @@ +import type { BrokerNode } from 'moleculer'; + +export interface IInstanceService { + getInstances(): Promise<BrokerNode[]>; +} diff --git a/apps/meteor/ee/server/startup/services.ts b/apps/meteor/ee/server/startup/services.ts index 37ecf9a21a91ce19b8c993ccc75b7544098b124d..bfaca103055b03a8e9cfb7e7979a9380d86f3800 100644 --- a/apps/meteor/ee/server/startup/services.ts +++ b/apps/meteor/ee/server/startup/services.ts @@ -2,9 +2,16 @@ import { api } from '@rocket.chat/core-services'; import { EnterpriseSettings } from '../../app/settings/server/settings.internalService'; import { LDAPEEService } from '../local-services/ldap/service'; +import { InstanceService } from '../local-services/instance/service'; import { LicenseService } from '../../app/license/server/license.internalService'; +import { isRunningMs } from '../../../server/lib/isRunningMs'; // TODO consider registering these services only after a valid license is added api.registerService(new EnterpriseSettings()); api.registerService(new LDAPEEService()); api.registerService(new LicenseService()); + +// when not running micro services we want to start up the instance intercom +if (!isRunningMs()) { + api.registerService(new InstanceService()); +} diff --git a/apps/meteor/packages/rocketchat-i18n/i18n/en.i18n.json b/apps/meteor/packages/rocketchat-i18n/i18n/en.i18n.json index ab6978d3a53bf235cf0bb9c481e7000c5accf967..0ac2ab3b99e98a31f09a188a13802542b9318055 100644 --- a/apps/meteor/packages/rocketchat-i18n/i18n/en.i18n.json +++ b/apps/meteor/packages/rocketchat-i18n/i18n/en.i18n.json @@ -2693,6 +2693,7 @@ "Last_active": "Last active", "Last_Call": "Last Call", "Last_Chat": "Last Chat", + "Last_Heartbeat_Time": "Last Heartbeat Time", "Last_login": "Last login", "Last_Message": "Last Message", "Last_Message_At": "Last Message At", @@ -2999,6 +3000,7 @@ "Loading_more_from_history": "Loading more from history", "Loading_suggestion": "Loading suggestions", "Loading...": "Loading...", + "Local": "Local", "Local_Domains": "Local Domains", "Local_Password": "Local Password", "Local_Time": "Local Time", @@ -4034,7 +4036,6 @@ "RetentionPolicyRoom_OverrideGlobal": "Override global retention policy", "RetentionPolicyRoom_ReadTheDocs": "Watch out! Tweaking these settings without utmost care can destroy all message history. Please read the documentation before turning the feature on <a href='https://rocket.chat/docs/administrator-guides/retention-policies/'>here</a>.", "Retry": "Retry", - "Retry_Count": "Retry Count", "Return_to_home": "Return to home", "Return_to_previous_page": "Return to previous page", "Return_to_the_queue": "Return back to the Queue", diff --git a/apps/meteor/server/main.ts b/apps/meteor/server/main.ts index 806329ae35735ce47ccbe3c97ce05b1cec06a19e..b0a55772071cd4e25d4c51be984458d7a15e5032 100644 --- a/apps/meteor/server/main.ts +++ b/apps/meteor/server/main.ts @@ -79,7 +79,6 @@ import './routes/i18n'; import './routes/timesync'; import './routes/userDataDownload'; import './stream/stdout'; -import './stream/streamBroadcast'; import './settings/index'; import './features/EmailInbox/index'; diff --git a/apps/meteor/server/services/meteor/service.ts b/apps/meteor/server/services/meteor/service.ts index cbc8bb1f3da4c31a5c3bd89f8571b8a1989b1d7d..6e28e1c41f28ab658d2f815434787cfbd1278c6a 100644 --- a/apps/meteor/server/services/meteor/service.ts +++ b/apps/meteor/server/services/meteor/service.ts @@ -11,7 +11,6 @@ import { Livechat } from '../../../app/livechat/server'; import { settings } from '../../../app/settings/server'; import { setValue, updateValue } from '../../../app/settings/server/raw'; import { onlineAgents, monitorAgents } from '../../../app/livechat/server/lib/stream/agentStatus'; -import { matrixBroadCastActions } from '../../stream/streamBroadcast'; import { triggerHandler } from '../../../app/integrations/server/lib/triggerHandler'; import { ListenersModule } from '../../modules/listeners/listeners.module'; import notifications from '../../../app/notifications/server/lib/Notifications'; @@ -143,17 +142,6 @@ export class MeteorService extends ServiceClassInternal implements IMeteor { setValue(setting._id, undefined); }); - this.onEvent('watch.instanceStatus', async ({ clientAction, id, data }): Promise<void> => { - if (clientAction === 'removed') { - matrixBroadCastActions?.removed?.(id); - return; - } - - if (clientAction === 'inserted' && data?.extraInformation?.port) { - matrixBroadCastActions?.added?.(data); - } - }); - if (disableOplog) { this.onEvent('watch.loginServiceConfiguration', ({ clientAction, id, data }) => { if (clientAction === 'removed') { diff --git a/apps/meteor/server/startup/index.ts b/apps/meteor/server/startup/index.ts index 0393d2701445ff9ba99c9d1d5401c6f42761c39f..17f53ae2d63d4aa73f7fdc7da30d484bb1e2d930 100644 --- a/apps/meteor/server/startup/index.ts +++ b/apps/meteor/server/startup/index.ts @@ -3,7 +3,6 @@ import './appcache'; import './callbacks'; import './cron'; import './initialData'; -import './instance'; import './serverRunning'; import './coreApps'; import './presenceTroubleshoot'; diff --git a/apps/meteor/server/startup/instance.ts b/apps/meteor/server/startup/instance.ts deleted file mode 100644 index 2254896894bb808cd618dc0182578e8ebe649007..0000000000000000000000000000000000000000 --- a/apps/meteor/server/startup/instance.ts +++ /dev/null @@ -1,29 +0,0 @@ -import os from 'os'; - -import { Meteor } from 'meteor/meteor'; -import { InstanceStatus } from '@rocket.chat/instance-status'; - -import { startStreamBroadcast } from '../stream/streamBroadcast'; - -Meteor.startup(function () { - const instance = { - host: process.env.INSTANCE_IP ? String(process.env.INSTANCE_IP).trim() : 'localhost', - port: String(process.env.PORT).trim(), - os: { - type: os.type(), - platform: os.platform(), - arch: os.arch(), - release: os.release(), - uptime: os.uptime(), - loadavg: os.loadavg(), - totalmem: os.totalmem(), - freemem: os.freemem(), - cpus: os.cpus().length, - }, - nodeVersion: process.version, - }; - - InstanceStatus.registerInstance('rocket.chat', instance); - - return startStreamBroadcast(); -}); diff --git a/apps/meteor/server/stream/streamBroadcast.js b/apps/meteor/server/stream/streamBroadcast.js deleted file mode 100644 index 060fbf30bd95d08f670d8e09f079b055548ab44c..0000000000000000000000000000000000000000 --- a/apps/meteor/server/stream/streamBroadcast.js +++ /dev/null @@ -1,364 +0,0 @@ -import debounce from 'lodash.debounce'; -import { Meteor } from 'meteor/meteor'; -import { InstanceStatus } from '@rocket.chat/instance-status'; -import { check } from 'meteor/check'; -import { DDP } from 'meteor/ddp'; -import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models'; -import { TAPi18n } from 'meteor/rocketchat:tap-i18n'; - -import { Logger } from '../lib/logger/Logger'; -import { hasPermission } from '../../app/authorization/server'; -import { settings } from '../../app/settings/server'; -import { isDocker, getURL } from '../../app/utils/server'; -import { StreamerCentral } from '../modules/streamer/streamer.module'; -import { isEnterprise } from '../../ee/app/license/server/license'; - -process.env.PORT = String(process.env.PORT).trim(); -process.env.INSTANCE_IP = String(process.env.INSTANCE_IP).trim(); - -const connections = {}; -this.connections = connections; - -const logger = new Logger('StreamBroadcast'); - -export const connLogger = logger.section('Connection'); -export const authLogger = logger.section('Auth'); -export const streamLogger = logger.section('Stream'); - -// show warning debounced, giving an extra time for a license to be fetched -const showMonolithWarning = debounce(function () { - if (!isEnterprise()) { - logger.warn(TAPi18n.__('Multiple_monolith_instances_alert')); - } -}, 10000); - -function _authorizeConnection(instance) { - authLogger.info(`Authorizing with ${instance}`); - - return connections[instance].call('broadcastAuth', InstanceStatus.id(), connections[instance].instanceId, function (err, ok) { - if (err != null) { - return authLogger.error({ - msg: `broadcastAuth error ${instance} ${InstanceStatus.id()} ${connections[instance].instanceId}`, - err, - }); - } - - connections[instance].broadcastAuth = ok; - return authLogger.info({ msg: `broadcastAuth with ${instance}`, ok }); - }); -} - -function authorizeConnection(instance) { - const record = Promise.await(InstanceStatusRaw.findOneById(InstanceStatus.id(), { projection: { _id: 1 } })); - if (!record) { - return Meteor.setTimeout(function () { - return authorizeConnection(instance); - }, 500); - } - - return _authorizeConnection(instance); -} - -const cache = new Map(); -export let matrixBroadCastActions; -function startMatrixBroadcast() { - matrixBroadCastActions = { - added: Meteor.bindEnvironment((record) => { - cache.set(record._id, record); - - const subPath = getURL('', { cdn: false, full: false }); - let instance = `${record.extraInformation.host}:${record.extraInformation.port}${subPath}`; - - if (record.extraInformation.port === process.env.PORT && record.extraInformation.host === process.env.INSTANCE_IP) { - authLogger.info({ msg: 'prevent self connect', instance }); - return; - } - - if (record.extraInformation.host === process.env.INSTANCE_IP && isDocker() === false) { - instance = `localhost:${record.extraInformation.port}${subPath}`; - } - - if (connections[instance] && connections[instance].instanceRecord) { - if (connections[instance].instanceRecord._createdAt < record._createdAt) { - connections[instance].disconnect(); - delete connections[instance]; - } else { - return; - } - } - - connLogger.info({ msg: 'connecting in', instance }); - - connections[instance] = DDP.connect(instance, { - _dontPrintErrors: settings.get('Log_Level') !== '2', - }); - - // remove not relevant info from instance record - delete record.extraInformation.os; - - connections[instance].instanceRecord = record; - connections[instance].instanceId = record._id; - - connections[instance].onReconnect = function () { - return authorizeConnection(instance); - }; - - if (cache.size > 1) { - showMonolithWarning(); - } - }), - - removed(id) { - const record = cache.get(id); - if (!record) { - return; - } - cache.delete(id); - - const subPath = getURL('', { cdn: false, full: false }); - let instance = `${record.extraInformation.host}:${record.extraInformation.port}${subPath}`; - - if (record.extraInformation.host === process.env.INSTANCE_IP && isDocker() === false) { - instance = `localhost:${record.extraInformation.port}${subPath}`; - } - - const query = { - 'extraInformation.host': record.extraInformation.host, - 'extraInformation.port': record.extraInformation.port, - }; - - if (connections[instance]) { - const found = Promise.await(InstanceStatusRaw.findOne(query, { projection: { _id: 1 } })); - if (!found) { - connLogger.info({ msg: 'disconnecting from', instance }); - connections[instance].disconnect(); - return delete connections[instance]; - } - } - }, - }; - - InstanceStatusRaw.find( - { - 'extraInformation.port': { - $exists: true, - }, - }, - { - sort: { - _createdAt: -1, - }, - }, - ).forEach(matrixBroadCastActions.added); -} - -function startStreamCastBroadcast(value) { - const instance = 'StreamCast'; - - connLogger.info({ msg: 'connecting in', instance, value }); - - const connection = DDP.connect(value, { - _dontPrintErrors: settings.get('Log_Level') !== '2', - }); - - connections[instance] = connection; - connection.instanceId = instance; - connection.instanceRecord = {}; - connection.onReconnect = function () { - return authorizeConnection(instance); - }; - - connection.registerStore('broadcast-stream', { - update({ fields }) { - const { streamName, eventName, args } = fields; - - if (!streamName || !eventName || !args) { - return; - } - - if (connection.broadcastAuth !== true) { - return 'not-authorized'; - } - - const instance = StreamerCentral.instances[streamName]; - if (!instance) { - return 'stream-not-exists'; - } - - if (instance.serverOnly) { - return instance.__emit(eventName, ...args); - } - return instance._emit(eventName, args); - }, - }); - - return connection.subscribe('stream'); -} - -export function startStreamBroadcast() { - if (!process.env.INSTANCE_IP) { - process.env.INSTANCE_IP = 'localhost'; - } - - logger.info('startStreamBroadcast'); - - settings.watch('Stream_Cast_Address', function (value) { - // var connection, fn, instance; - const fn = function (instance, connection) { - connection.disconnect(); - return delete connections[instance]; - }; - - for (const instance of Object.keys(connections)) { - const connection = connections[instance]; - fn(instance, connection); - } - - if (value && value.trim() !== '') { - return startStreamCastBroadcast(value); - } - return startMatrixBroadcast(); - }); - - function broadcast(streamName, eventName, args /* , userId*/) { - const fromInstance = `${process.env.INSTANCE_IP}:${process.env.PORT}`; - const results = []; - - for (const instance of Object.keys(connections)) { - const connection = connections[instance]; - - if (connection.status().connected === true) { - connection.call('stream', streamName, eventName, args, function (error, response) { - if (error) { - logger.error({ msg: 'Stream broadcast error', err: error }); - } - - switch (response) { - case 'self-not-authorized': - streamLogger.error( - `Stream broadcast from '${fromInstance}' to '${connection._stream.endpoint}' with name ${streamName} to self is not authorized`, - ); - streamLogger.debug({ - msg: 'self-not-authorized', - broadcastAuth: connection.broadcastAuth, - status: connection.status(), - eventName, - args, - }); - return; - case 'not-authorized': - streamLogger.error( - `Stream broadcast from '${fromInstance}' to '${connection._stream.endpoint}' with name ${streamName} not authorized`, - ); - streamLogger.debug({ - msg: 'not-authorized', - broadcastAuth: connection.broadcastAuth, - status: connection.status(), - eventName, - args, - }); - return authorizeConnection(instance); - case 'stream-not-exists': - streamLogger.error( - `Stream broadcast from '${fromInstance}' to '${connection._stream.endpoint}' with name ${streamName} does not exist`, - ); - streamLogger.debug({ - msg: 'stream-not-exists', - broadcastAuth: connection.broadcastAuth, - status: connection.status(), - eventName, - args, - }); - } - }); - } - } - return results; - } - - const onBroadcast = Meteor.bindEnvironment(broadcast); - - let TroubleshootDisableInstanceBroadcast; - settings.watch('Troubleshoot_Disable_Instance_Broadcast', (value) => { - if (TroubleshootDisableInstanceBroadcast === value) { - return; - } - TroubleshootDisableInstanceBroadcast = value; - - if (value) { - return StreamerCentral.removeListener('broadcast', onBroadcast); - } - - // TODO move to a service and stop using StreamerCentral - StreamerCentral.on('broadcast', onBroadcast); - }); -} - -function getConnection(address) { - const conn = connections[address]; - if (!conn) { - return; - } - - const { instanceRecord, broadcastAuth } = conn; - - return { - address, - currentStatus: conn._stream.currentStatus, - instanceRecord, - broadcastAuth, - }; -} - -export function getInstanceConnection(instance) { - const subPath = getURL('', { cdn: false, full: false }); - const address = `${instance.extraInformation.host}:${instance.extraInformation.port}${subPath}`; - - return getConnection(address); -} - -Meteor.methods({ - 'broadcastAuth'(remoteId, selfId) { - check(selfId, String); - check(remoteId, String); - - if (selfId === InstanceStatus.id() && remoteId !== InstanceStatus.id()) { - const found = Promise.await(InstanceStatusRaw.findOneById(remoteId, { projection: { _id: 1 } })); - if (found) { - this.connection.broadcastAuth = true; - } - } - - return this.connection.broadcastAuth === true; - }, - - 'stream'(streamName, eventName, args) { - if (!this.connection) { - return 'self-not-authorized'; - } - - if (this.connection.broadcastAuth !== true) { - return 'not-authorized'; - } - - const instance = StreamerCentral.instances[streamName]; - if (!instance) { - return 'stream-not-exists'; - } - - if (instance.serverOnly) { - instance.__emit(eventName, ...args); - } else { - StreamerCentral.instances[streamName]._emit(eventName, args); - } - }, - - 'instances/get'() { - if (!hasPermission(Meteor.userId(), 'view-statistics')) { - throw new Meteor.Error('error-action-not-allowed', 'List instances is not allowed', { - method: 'instances/get', - }); - } - return Object.keys(connections).map(getConnection); - }, -}); diff --git a/apps/meteor/tests/end-to-end/api/00-miscellaneous.js b/apps/meteor/tests/end-to-end/api/00-miscellaneous.js index 4c3e411197377e098655742b6c3d0be58f34424a..df30f2b71cd0d08eb9cb9e2c39e4b34bde77cdb4 100644 --- a/apps/meteor/tests/end-to-end/api/00-miscellaneous.js +++ b/apps/meteor/tests/end-to-end/api/00-miscellaneous.js @@ -597,25 +597,37 @@ describe('miscellaneous', function () { .expect((res) => { expect(res.body).to.have.property('success', true); - // ddp-streamer registers itself as an instance, so for EE we have 2 instances - const totalInstances = IS_EE ? 2 : 1; - expect(res.body).to.have.property('instances').and.to.be.an('array').with.lengthOf(totalInstances); + expect(res.body).to.have.property('instances').and.to.be.an('array').with.lengthOf(1); const { instances } = res.body; - const instance = instances.filter((i) => i.name === 'rocket.chat')[0]; + const instanceName = IS_EE ? 'ddp-streamer' : 'rocket.chat'; - expect(instance).to.have.property('_id'); - expect(instance).to.have.property('extraInformation'); - expect(instance).to.have.property('name'); - expect(instance).to.have.property('pid'); + const instance = instances.filter((i) => i.instanceRecord.name === instanceName)[0]; - const { extraInformation } = instance; + expect(instance).to.have.property('instanceRecord'); + expect(instance).to.have.property('currentStatus'); - expect(extraInformation).to.have.property('host'); - expect(extraInformation).to.have.property('port'); - expect(extraInformation).to.have.property('os').and.to.have.property('cpus').to.be.a('number'); - expect(extraInformation).to.have.property('nodeVersion'); + expect(instance.currentStatus).to.have.property('connected'); + + expect(instance.instanceRecord).to.have.property('_id'); + expect(instance.instanceRecord).to.have.property('extraInformation'); + expect(instance.instanceRecord).to.have.property('name'); + expect(instance.instanceRecord).to.have.property('pid'); + + if (!IS_EE) { + expect(instance).to.have.property('address'); + + expect(instance.currentStatus).to.have.property('lastHeartbeatTime'); + expect(instance.currentStatus).to.have.property('local'); + + const { extraInformation } = instance.instanceRecord; + + expect(extraInformation).to.have.property('host'); + expect(extraInformation).to.have.property('port'); + expect(extraInformation).to.have.property('os').and.to.have.property('cpus').to.be.a('number'); + expect(extraInformation).to.have.property('nodeVersion'); + } }) .end(done); }); diff --git a/packages/core-typings/src/IInstanceStatus.ts b/packages/core-typings/src/IInstanceStatus.ts index 687774baeacb83dfab245a348c35c186962061d4..d5fd280280b477550292e46729bfbc2e8fe671b0 100644 --- a/packages/core-typings/src/IInstanceStatus.ts +++ b/packages/core-typings/src/IInstanceStatus.ts @@ -1,29 +1,24 @@ import type { IRocketChatRecord } from './IRocketChatRecord'; export interface IInstanceStatus extends IRocketChatRecord { - extraInformation?: { - port?: number; - }; - - address: string; - currentStatus: { - connected: boolean; - retryCount: number; - retryTime: number; - status: string; - }; - instanceRecord?: { - name: string; - pid: number; - _createdAt: Date; - _id: string; - _updatedAt: Date; - extraInformation: { - host: string; - nodeVersion: string; - port: string; + _createdAt: Date; + name: string; + pid: number; + extraInformation: { + host: string; + nodeVersion: string; + port: string; + tcpPort: number; + os: { + type: string; + platform: string; + arch: string; + release: string; + uptime: number; + loadavg: number[]; + totalmem: number; + freemem: number; + cpus: number; }; }; - - broadcastAuth: boolean; } diff --git a/packages/rest-typings/src/index.ts b/packages/rest-typings/src/index.ts index 429bc1bdcfb965f3742f7cfb0a589f8ff4534cbf..b7fbab50bb0fe98917996461def0bfa3f428c51c 100644 --- a/packages/rest-typings/src/index.ts +++ b/packages/rest-typings/src/index.ts @@ -224,6 +224,7 @@ export * from './helpers/PaginatedResult'; export * from './helpers/ReplacePlaceholders'; export * from './helpers/WithItemCount'; export * from './v1/emojiCustom'; +export * from './v1/instances'; export * from './v1/users'; export * from './v1/users/UsersSetAvatarParamsPOST'; export * from './v1/users/UsersSetPreferenceParamsPOST'; diff --git a/packages/rest-typings/src/v1/instances.ts b/packages/rest-typings/src/v1/instances.ts index faf085e5ae9224e08b18b2da29781e4daec2151c..c24c652996f8d49ddf1da1a7555ebd8bfdda2d66 100644 --- a/packages/rest-typings/src/v1/instances.ts +++ b/packages/rest-typings/src/v1/instances.ts @@ -1,19 +1,21 @@ import type { IInstanceStatus } from '@rocket.chat/core-typings'; +export interface IInstance { + address?: string; + currentStatus: { + connected: boolean; + local?: boolean; + lastHeartbeatTime?: number; + }; + instanceRecord?: IInstanceStatus; + + broadcastAuth: boolean; +} + export type InstancesEndpoints = { '/v1/instances.get': { GET: () => { - instances: ( - | IInstanceStatus - | { - connection: { - address: string; - currentStatus: IInstanceStatus['currentStatus']; - instanceRecord: IInstanceStatus['instanceRecord']; - broadcastAuth: boolean; - }; - } - )[]; + instances: IInstance[]; }; }; };