Skip to content
Snippets Groups Projects
Unverified Commit 20f5dbc0 authored by Diego Sampaio's avatar Diego Sampaio
Browse files

feat: add support for `MONOLITH_TRANSPORTER` env var (#29373)

parent 7f2e2730
No related branches found
No related tags found
No related merge requests found
---
'@rocket.chat/meteor': minor
---
feat: *Enterprise* Add support for different transporters to connect multiple monolith instances.
To use that, you can use the `TRANSPORTER` env var adding "monolith+" to the transporter value. To use NATS for example, your env var should be:
```bash
export TRANSPORTER="monolith+nats://localhost:4222"
```
export function getTransporter({ transporter, port }: { transporter?: string; port?: string } = {}) {
if (transporter) {
if (!transporter.match(/^(?:monolith\+)/)) {
throw new Error('invalid transporter');
}
const [, ...url] = transporter.split('+');
return url.join('');
}
return {
port: port ? port.trim() : 0,
udpDiscovery: false,
};
}
import os from 'os';
import type { BrokerNode } from 'moleculer';
import { ServiceBroker } from 'moleculer';
import { ServiceBroker, Transporters } from 'moleculer';
import { License, ServiceClassInternal } from '@rocket.chat/core-services';
import { InstanceStatus as InstanceStatusRaw } from '@rocket.chat/models';
import { InstanceStatus } from '@rocket.chat/instance-status';
import { StreamerCentral } from '../../../../server/modules/streamer/streamer.module';
import type { IInstanceService } from '../../sdk/types/IInstanceService';
import { getTransporter } from './getTransporter';
export class InstanceService extends ServiceClassInternal implements IInstanceService {
protected name = 'instance';
private broadcastStarted = false;
private transporter: Transporters.TCP | Transporters.NATS;
private isTransporterTCP = true;
private broker: ServiceBroker;
private troubleshootDisableInstanceBroadcast = false;
......@@ -21,15 +26,25 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
constructor() {
super();
this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => {
if (clientAction === 'removed') {
return;
}
const tx = getTransporter({ transporter: process.env.TRANSPORTER, port: process.env.TCP_PORT });
if (typeof tx === 'string') {
this.transporter = new Transporters.NATS({ url: tx });
this.isTransporterTCP = false;
} else {
this.transporter = new Transporters.TCP(tx);
}
if (clientAction === 'inserted' && data?.extraInformation?.port) {
this.connectNode(data);
}
});
if (this.isTransporterTCP) {
this.onEvent('watch.instanceStatus', async ({ clientAction, data }): Promise<void> => {
if (clientAction === 'removed') {
return;
}
if (clientAction === 'inserted' && data?.extraInformation?.tcpPort) {
this.connectNode(data);
}
});
}
this.onEvent('license.module', async ({ module, valid }) => {
if (module === 'scalability' && valid) {
......@@ -60,17 +75,9 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
}
async created() {
const port = process.env.TCP_PORT ? String(process.env.TCP_PORT).trim() : 0;
this.broker = new ServiceBroker({
nodeID: InstanceStatus.id(),
transporter: {
type: 'TCP',
options: {
port,
udpDiscovery: false,
},
},
transporter: this.transporter,
});
this.broker.createService({
......@@ -135,18 +142,20 @@ export class InstanceService extends ServiceClassInternal implements IInstanceSe
StreamerCentral.on('broadcast', this.sendBroadcast.bind(this));
await InstanceStatusRaw.find(
{
'extraInformation.tcpPort': {
$exists: true,
if (this.isTransporterTCP) {
await InstanceStatusRaw.find(
{
'extraInformation.tcpPort': {
$exists: true,
},
},
},
{
sort: {
_createdAt: -1,
{
sort: {
_createdAt: -1,
},
},
},
).forEach(this.connectNode.bind(this));
).forEach(this.connectNode.bind(this));
}
}
private connectNode(record: any) {
......
import { expect } from 'chai';
import { getTransporter } from '../../../../../../../../server/local-services/instance/getTransporter';
describe('getTransporter', () => {
it('should return TCP with port 0 by default', () => {
expect(getTransporter()).to.deep.equal({ port: 0, udpDiscovery: false });
});
it('should return TCP with port set via env var', () => {
expect(getTransporter({ port: '1234' })).to.deep.equal({ port: '1234', udpDiscovery: false });
expect(getTransporter({ port: ' 1234' })).to.deep.equal({ port: '1234', udpDiscovery: false });
expect(getTransporter({ port: ' 1234 ' })).to.deep.equal({ port: '1234', udpDiscovery: false });
});
it('should throw if transporter set incorrectly', () => {
expect(() => getTransporter({ transporter: 'something' })).to.throw('invalid transporter');
});
it('should return transporter if set correctly', () => {
expect(getTransporter({ transporter: 'monolith+nats://address' })).to.equal('nats://address');
});
});
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment