Skip to content
Snippets Groups Projects
Unverified Commit 3e932e16 authored by Rodrigo Nascimento's avatar Rodrigo Nascimento
Browse files

Convert server/stream to JS

parent f7b6474a
No related branches found
No related tags found
No related merge requests found
@msgStream = new Meteor.Streamer 'room-messages'
msgStream.allowWrite('none')
msgStream.allowRead (eventName) ->
try
room = Meteor.call 'canAccessRoom', eventName, this.userId
if not room
return false
if room.t is 'c' and not RocketChat.authz.hasPermission(this.userId, 'preview-c-room') and room.usernames.indexOf(room.username) is -1
return false
return true
catch e
return false
msgStream.allowRead('__my_messages__', 'all')
msgStream.allowEmit '__my_messages__', (eventName, msg, options) ->
try
room = Meteor.call 'canAccessRoom', msg.rid, this.userId
if not room
return false
options.roomParticipant = room.usernames.indexOf(room.username) > -1
options.roomType = room.t
return true
catch e
return false
Meteor.startup ->
fields = undefined
if not RocketChat.settings.get 'Message_ShowEditedStatus'
fields = { 'editedAt': 0 }
publishMessage = (type, record) ->
if record._hidden isnt true and not record.imported?
msgStream.emitWithoutBroadcast '__my_messages__', record, {}
msgStream.emitWithoutBroadcast record.rid, record
query =
collection: RocketChat.models.Messages.collectionName
RocketChat.models.Messages._db.on 'change', ({action, id, data, oplog}) =>
switch action
when 'insert'
data._id = id;
publishMessage 'inserted', data
break;
when 'update:record'
publishMessage 'updated', data;
break;
when 'update:diff'
publishMessage 'updated', RocketChat.models.Messages.findOne({_id: id})
break;
const msgStream = new Meteor.Streamer('room-messages');
this.msgStream = msgStream;
msgStream.allowWrite('none');
msgStream.allowRead(function(eventName) {
try {
const room = Meteor.call('canAccessRoom', eventName, this.userId);
if (!room) {
return false;
}
if (room.t === 'c' && !RocketChat.authz.hasPermission(this.userId, 'preview-c-room') && room.usernames.indexOf(room.username) === -1) {
return false;
}
return true;
} catch (error) {
/*error*/
return false;
}
});
msgStream.allowRead('__my_messages__', 'all');
msgStream.allowEmit('__my_messages__', function(eventName, msg, options) {
try {
const room = Meteor.call('canAccessRoom', msg.rid, this.userId);
if (!room) {
return false;
}
options.roomParticipant = room.usernames.indexOf(room.username) > -1;
options.roomType = room.t;
return true;
} catch (error) {
/*error*/
return false;
}
});
Meteor.startup(function() {
function publishMessage(type, record) {
if (record._hidden !== true && (record.imported == null)) {
msgStream.emitWithoutBroadcast('__my_messages__', record, {});
return msgStream.emitWithoutBroadcast(record.rid, record);
}
}
return RocketChat.models.Messages._db.on('change', function({action, id, data/*, oplog*/}) {
switch (action) {
case 'insert':
data._id = id;
publishMessage('inserted', data);
break;
case 'update:record':
publishMessage('updated', data);
break;
case 'update:diff':
publishMessage('updated', RocketChat.models.Messages.findOne({
_id: id
}));
break;
}
});
});
`import {DDPCommon} from 'meteor/ddp-common'`
logger = new Logger 'StreamBroadcast',
sections:
connection: 'Connection'
auth: 'Auth'
stream: 'Stream'
_authorizeConnection = (instance) ->
logger.auth.info "Authorizing with #{instance}"
connections[instance].call 'broadcastAuth', InstanceStatus.id(), connections[instance].instanceId, (err, ok) ->
if err?
return logger.auth.error "broadcastAuth error #{instance} #{InstanceStatus.id()} #{connections[instance].instanceId}", err
connections[instance].broadcastAuth = ok
logger.auth.info "broadcastAuth with #{instance}", ok
authorizeConnection = (instance) ->
if not InstanceStatus.getCollection().findOne({_id: InstanceStatus.id()})?
return Meteor.setTimeout ->
authorizeConnection(instance)
, 500
_authorizeConnection(instance)
startMatrixBroadcast = ->
InstanceStatus.getCollection().find({'extraInformation.port': {$exists: true}}, {sort: {_createdAt: -1}}).observe
added: (record) ->
instance = "#{record.extraInformation.host}:#{record.extraInformation.port}"
if record.extraInformation.port is process.env.PORT and record.extraInformation.host is process.env.INSTANCE_IP
logger.auth.info "prevent self connect", instance
return
if record.extraInformation.host is process.env.INSTANCE_IP and RocketChat.isDocker() is false
instance = "localhost:#{record.extraInformation.port}"
if connections[instance]?.instanceRecord?
if connections[instance].instanceRecord._createdAt < record._createdAt
connections[instance].disconnect()
delete connections[instance]
else
return
logger.connection.info 'connecting in', instance
connections[instance] = DDP.connect(instance, {_dontPrintErrors: LoggerManager.logLevel < 2})
connections[instance].instanceRecord = record;
connections[instance].instanceId = record._id;
connections[instance].onReconnect = ->
authorizeConnection(instance)
removed: (record) ->
instance = "#{record.extraInformation.host}:#{record.extraInformation.port}"
if record.extraInformation.host is process.env.INSTANCE_IP and RocketChat.isDocker() is false
instance = "localhost:#{record.extraInformation.port}"
if connections[instance]? and not InstanceStatus.getCollection().findOne({'extraInformation.host': record.extraInformation.host, 'extraInformation.port': record.extraInformation.port})?
logger.connection.info 'disconnecting from', instance
connections[instance].disconnect()
delete connections[instance]
Meteor.methods
broadcastAuth: (remoteId, selfId) ->
check selfId, String
check remoteId, String
@unblock()
if selfId is InstanceStatus.id() and remoteId isnt InstanceStatus.id() and InstanceStatus.getCollection().findOne({_id: remoteId})?
@connection.broadcastAuth = true
return @connection.broadcastAuth is true
stream: (streamName, eventName, args) ->
# Prevent call from self and client
if not @connection?
return 'self-not-authorized'
# Prevent call from unauthrorized connections
if @connection.broadcastAuth isnt true
return 'not-authorized'
if not Meteor.StreamerCentral.instances[streamName]?
return 'stream-not-exists'
Meteor.StreamerCentral.instances[streamName]._emit(eventName, args)
return undefined
startStreamCastBroadcast = (value) ->
instance = 'StreamCast'
logger.connection.info 'connecting in', instance, value
connection = DDP.connect(value, {_dontPrintErrors: LoggerManager.logLevel < 2})
connections[instance] = connection
connection.instanceId = instance
connection.onReconnect = ->
authorizeConnection(instance)
connection._stream.on 'message', (raw_msg) ->
msg = DDPCommon.parseDDP(raw_msg)
if not msg or msg.msg isnt 'changed' or not msg.collection? or not msg.fields?
return
{streamName, eventName, args} = msg.fields
if not streamName? or not eventName? or not args?
return
if connection.broadcastAuth isnt true
return 'not-authorized'
if not Meteor.StreamerCentral.instances[streamName]?
return 'stream-not-exists'
Meteor.StreamerCentral.instances[streamName]._emit(eventName, args)
connection.subscribe 'stream'
@connections = {}
@startStreamBroadcast = () ->
process.env.INSTANCE_IP ?= 'localhost'
logger.info 'startStreamBroadcast'
RocketChat.settings.get 'Stream_Cast_Address', (key, value) ->
for instance, connection of connections
do (instance, connection) ->
connection.disconnect()
delete connections[instance]
if value?.trim() isnt ''
startStreamCastBroadcast(value)
else
startMatrixBroadcast()
broadcast = (streamName, eventName, args, userId) ->
fromInstance = process.env.INSTANCE_IP + ':' + process.env.PORT
for instance, connection of connections
do (instance, connection) ->
if connection.status().connected is true
connection.call 'stream', streamName, eventName, args, (error, response) ->
if error?
logger.error "Stream broadcast error", error
switch response
when 'self-not-authorized'
logger.stream.error "Stream broadcast from '#{fromInstance}' to '#{connection._stream.endpoint}' with name #{streamName} to self is not authorized".red
logger.stream.debug " -> connection authorized".red, connection.broadcastAuth
logger.stream.debug " -> connection status".red, connection.status()
logger.stream.debug " -> arguments".red, eventName, args
when 'not-authorized'
logger.stream.error "Stream broadcast from '#{fromInstance}' to '#{connection._stream.endpoint}' with name #{streamName} not authorized".red
logger.stream.debug " -> connection authorized".red, connection.broadcastAuth
logger.stream.debug " -> connection status".red, connection.status()
logger.stream.debug " -> arguments".red, eventName, args
authorizeConnection(instance);
when 'stream-not-exists'
logger.stream.error "Stream broadcast from '#{fromInstance}' to '#{connection._stream.endpoint}' with name #{streamName} does not exist".red
logger.stream.debug " -> connection authorized".red, connection.broadcastAuth
logger.stream.debug " -> connection status".red, connection.status()
logger.stream.debug " -> arguments".red, eventName, args
Meteor.StreamerCentral.on 'broadcast', (streamName, eventName, args) ->
broadcast streamName, eventName, args
Meteor.startup ->
startStreamBroadcast()
/* global InstanceStatus, DDP, LoggerManager */
import {DDPCommon} from 'meteor/ddp-common';
const connections = {};
this.connections = connections;
const logger = new Logger('StreamBroadcast', {
sections: {
connection: 'Connection',
auth: 'Auth',
stream: 'Stream'
}
});
function _authorizeConnection(instance) {
logger.auth.info(`Authorizing with ${instance}`);
return connections[instance].call('broadcastAuth', InstanceStatus.id(), connections[instance].instanceId, function(err, ok) {
if (err != null) {
return logger.auth.error(`broadcastAuth error ${instance} ${InstanceStatus.id()} ${connections[instance].instanceId}`, err);
}
connections[instance].broadcastAuth = ok;
return logger.auth.info(`broadcastAuth with ${instance}`, ok);
});
}
function authorizeConnection(instance) {
const query = {
_id: InstanceStatus.id()
};
if (!InstanceStatus.getCollection().findOne(query)) {
return Meteor.setTimeout(function() {
return authorizeConnection(instance);
}, 500);
}
return _authorizeConnection(instance);
}
function startMatrixBroadcast() {
const query = {
'extraInformation.port': {
$exists: true
}
};
const options = {
sort: {
_createdAt: -1
}
};
return InstanceStatus.getCollection().find(query, options).observe({
added(record) {
let instance = `${record.extraInformation.host}:${record.extraInformation.port}`;
if (record.extraInformation.port === process.env.PORT && record.extraInformation.host === process.env.INSTANCE_IP) {
logger.auth.info('prevent self connect', instance);
return;
}
if (record.extraInformation.host === process.env.INSTANCE_IP && RocketChat.isDocker() === false) {
instance = `localhost:${record.extraInformation.port}`;
}
if (connections[instance] && connections[instance].instanceRecord) {
if (connections[instance].instanceRecord._createdAt < record._createdAt) {
connections[instance].disconnect();
delete connections[instance];
} else {
return;
}
}
logger.connection.info('connecting in', instance);
connections[instance] = DDP.connect(instance, {
_dontPrintErrors: LoggerManager.logLevel < 2
});
connections[instance].instanceRecord = record;
connections[instance].instanceId = record._id;
return connections[instance].onReconnect = function() {
return authorizeConnection(instance);
};
},
removed(record) {
let instance = `${record.extraInformation.host}:${record.extraInformation.port}`;
if (record.extraInformation.host === process.env.INSTANCE_IP && RocketChat.isDocker() === false) {
instance = 'localhost:' + record.extraInformation.port;
}
const query = {
'extraInformation.host': record.extraInformation.host,
'extraInformation.port': record.extraInformation.port
};
if (connections[instance] && !InstanceStatus.getCollection().findOne(query)) {
logger.connection.info('disconnecting from', instance);
connections[instance].disconnect();
return delete connections[instance];
}
}
});
}
Meteor.methods({
broadcastAuth(remoteId, selfId) {
check(selfId, String);
check(remoteId, String);
this.unblock();
const query = {
_id: remoteId
};
if (selfId === InstanceStatus.id() && remoteId !== InstanceStatus.id() && (InstanceStatus.getCollection().findOne(query))) {
this.connection.broadcastAuth = true;
}
return this.connection.broadcastAuth === true;
},
stream(streamName, eventName, args) {
if (!this.connection) {
return 'self-not-authorized';
}
if (this.connection.broadcastAuth !== true) {
return 'not-authorized';
}
if (!Meteor.StreamerCentral.instances[streamName]) {
return 'stream-not-exists';
}
Meteor.StreamerCentral.instances[streamName]._emit(eventName, args);
}
});
function startStreamCastBroadcast(value) {
const instance = 'StreamCast';
logger.connection.info('connecting in', instance, value);
const connection = DDP.connect(value, {
_dontPrintErrors: LoggerManager.logLevel < 2
});
connections[instance] = connection;
connection.instanceId = instance;
connection.onReconnect = function() {
return authorizeConnection(instance);
};
connection._stream.on('message', function(raw_msg) {
const msg = DDPCommon.parseDDP(raw_msg);
if (!msg || msg.msg !== 'changed' || !msg.collection || !msg.fields) {
return;
}
const {streamName, eventName, args} = msg.fields;
if (!streamName || !eventName || !args) {
return;
}
if (connection.broadcastAuth !== true) {
return 'not-authorized';
}
if (!Meteor.StreamerCentral.instances[streamName]) {
return 'stream-not-exists';
}
return Meteor.StreamerCentral.instances[streamName]._emit(eventName, args);
});
return connection.subscribe('stream');
}
function startStreamBroadcast() {
if (!process.env.INSTANCE_IP) {
process.env.INSTANCE_IP = 'localhost';
}
logger.info('startStreamBroadcast');
RocketChat.settings.get('Stream_Cast_Address', function(key, value) {
// var connection, fn, instance;
const fn = function(instance, connection) {
connection.disconnect();
return delete connections[instance];
};
for (const instance of Object.keys(connections)) {
const connection = connections[instance];
fn(instance, connection);
}
if (value && value.trim() !== '') {
return startStreamCastBroadcast(value);
} else {
return startMatrixBroadcast();
}
});
function broadcast(streamName, eventName, args/*, userId*/) {
const fromInstance = `${process.env.INSTANCE_IP}:${process.env.PORT}`;
const results = [];
for (const instance of Object.keys(connections)) {
const connection = connections[instance];
if (connection.status().connected === true) {
connection.call('stream', streamName, eventName, args, function(error, response) {
if (error) {
logger.error('Stream broadcast error', error);
}
switch (response) {
case 'self-not-authorized':
logger.stream.error((`Stream broadcast from '${fromInstance}' to '${connection._stream.endpoint}' with name ${streamName} to self is not authorized`).red);
logger.stream.debug(' -> connection authorized'.red, connection.broadcastAuth);
logger.stream.debug(' -> connection status'.red, connection.status());
return logger.stream.debug(' -> arguments'.red, eventName, args);
case 'not-authorized':
logger.stream.error((`Stream broadcast from '${fromInstance}' to '${connection._stream.endpoint}' with name ${streamName} not authorized`).red);
logger.stream.debug(' -> connection authorized'.red, connection.broadcastAuth);
logger.stream.debug(' -> connection status'.red, connection.status());
logger.stream.debug(' -> arguments'.red, eventName, args);
return authorizeConnection(instance);
case 'stream-not-exists':
logger.stream.error((`Stream broadcast from '${fromInstance}' to '${connection._stream.endpoint}' with name ${streamName} does not exist`).red);
logger.stream.debug(' -> connection authorized'.red, connection.broadcastAuth);
logger.stream.debug(' -> connection status'.red, connection.status());
return logger.stream.debug(' -> arguments'.red, eventName, args);
}
});
}
}
return results;
}
return Meteor.StreamerCentral.on('broadcast', function(streamName, eventName, args) {
return broadcast(streamName, eventName, args);
});
}
Meteor.startup(function() {
return startStreamBroadcast();
});
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment