Unverified Commit 38c7d8a8 authored by Diego Mello's avatar Diego Mello Committed by GitHub
Browse files

[IMPROVEMENT] Memoize and batch subscriptions updates (#1642)

parent 155df774
...@@ -9,6 +9,7 @@ import random from '../../../utils/random'; ...@@ -9,6 +9,7 @@ import random from '../../../utils/random';
import store from '../../createStore'; import store from '../../createStore';
import { roomsRequest } from '../../../actions/rooms'; import { roomsRequest } from '../../../actions/rooms';
import { notificationReceived } from '../../../actions/notification'; import { notificationReceived } from '../../../actions/notification';
import buildMessage from '../helpers/buildMessage';
const removeListener = listener => listener.stop(); const removeListener = listener => listener.stop();
...@@ -16,8 +17,12 @@ let connectedListener; ...@@ -16,8 +17,12 @@ let connectedListener;
let disconnectedListener; let disconnectedListener;
let streamListener; let streamListener;
let subServer; let subServer;
let subQueue = {};
let subTimer = null;
let roomQueue = {};
let roomTimer = null;
const WINDOW_TIME = 1000;
// TODO: batch execution
const createOrUpdateSubscription = async(subscription, room) => { const createOrUpdateSubscription = async(subscription, room) => {
try { try {
const db = database.active; const db = database.active;
...@@ -128,32 +133,32 @@ const createOrUpdateSubscription = async(subscription, room) => { ...@@ -128,32 +133,32 @@ const createOrUpdateSubscription = async(subscription, room) => {
} }
} }
// if (tmp.lastMessage) { if (tmp.lastMessage) {
// const lastMessage = buildMessage(tmp.lastMessage); const lastMessage = buildMessage(tmp.lastMessage);
// const messagesCollection = db.collections.get('messages'); const messagesCollection = db.collections.get('messages');
// let messageRecord; let messageRecord;
// try { try {
// messageRecord = await messagesCollection.find(lastMessage._id); messageRecord = await messagesCollection.find(lastMessage._id);
// } catch (error) { } catch (error) {
// // Do nothing // Do nothing
// } }
// if (messageRecord) { if (messageRecord) {
// batch.push( batch.push(
// messageRecord.prepareUpdate(() => { messageRecord.prepareUpdate(() => {
// Object.assign(messageRecord, lastMessage); Object.assign(messageRecord, lastMessage);
// }) })
// ); );
// } else { } else {
// batch.push( batch.push(
// messagesCollection.prepareCreate((m) => { messagesCollection.prepareCreate((m) => {
// m._raw = sanitizedRaw({ id: lastMessage._id }, messagesCollection.schema); m._raw = sanitizedRaw({ id: lastMessage._id }, messagesCollection.schema);
// m.subscription.id = lastMessage.rid; m.subscription.id = lastMessage.rid;
// return Object.assign(m, lastMessage); return Object.assign(m, lastMessage);
// }) })
// ); );
// } }
// } }
await db.batch(...batch); await db.batch(...batch);
}); });
...@@ -162,6 +167,34 @@ const createOrUpdateSubscription = async(subscription, room) => { ...@@ -162,6 +167,34 @@ const createOrUpdateSubscription = async(subscription, room) => {
} }
}; };
const debouncedUpdateSub = (subscription) => {
if (!subTimer) {
subTimer = setTimeout(() => {
const subBatch = subQueue;
subQueue = {};
subTimer = null;
Object.keys(subBatch).forEach((key) => {
createOrUpdateSubscription(subBatch[key]);
});
}, WINDOW_TIME);
}
subQueue[subscription.rid] = subscription;
};
const debouncedUpdateRoom = (room) => {
if (!roomTimer) {
roomTimer = setTimeout(() => {
const roomBatch = roomQueue;
roomQueue = {};
roomTimer = null;
Object.keys(roomBatch).forEach((key) => {
createOrUpdateSubscription(null, roomBatch[key]);
});
}, WINDOW_TIME);
}
roomQueue[room._id] = room;
};
export default function subscribeRooms() { export default function subscribeRooms() {
const handleConnection = () => { const handleConnection = () => {
store.dispatch(roomsRequest()); store.dispatch(roomsRequest());
...@@ -202,12 +235,12 @@ export default function subscribeRooms() { ...@@ -202,12 +235,12 @@ export default function subscribeRooms() {
log(e); log(e);
} }
} else { } else {
await createOrUpdateSubscription(data); debouncedUpdateSub(data);
} }
} }
if (/rooms/.test(ev)) { if (/rooms/.test(ev)) {
if (type === 'updated' || type === 'inserted') { if (type === 'updated' || type === 'inserted') {
await createOrUpdateSubscription(null, data); debouncedUpdateRoom(data);
} }
} }
if (/message/.test(ev)) { if (/message/.test(ev)) {
...@@ -257,6 +290,16 @@ export default function subscribeRooms() { ...@@ -257,6 +290,16 @@ export default function subscribeRooms() {
streamListener.then(removeListener); streamListener.then(removeListener);
streamListener = false; streamListener = false;
} }
subQueue = {};
roomQueue = {};
if (subTimer) {
clearTimeout(subTimer);
subTimer = false;
}
if (roomTimer) {
clearTimeout(roomTimer);
roomTimer = false;
}
}; };
connectedListener = this.sdk.onStreamData('connected', handleConnection); connectedListener = this.sdk.onStreamData('connected', handleConnection);
......
...@@ -11,6 +11,8 @@ import database from '../lib/database'; ...@@ -11,6 +11,8 @@ import database from '../lib/database';
import log from '../utils/log'; import log from '../utils/log';
import mergeSubscriptionsRooms from '../lib/methods/helpers/mergeSubscriptionsRooms'; import mergeSubscriptionsRooms from '../lib/methods/helpers/mergeSubscriptionsRooms';
import RocketChat from '../lib/rocketchat'; import RocketChat from '../lib/rocketchat';
import buildMessage from '../lib/methods/helpers/buildMessage';
import protectedFunction from '../lib/methods/helpers/protectedFunction';
const updateRooms = function* updateRooms({ server, newRoomsUpdatedAt }) { const updateRooms = function* updateRooms({ server, newRoomsUpdatedAt }) {
const serversDB = database.servers; const serversDB = database.servers;
...@@ -38,6 +40,7 @@ const handleRoomsRequest = function* handleRoomsRequest() { ...@@ -38,6 +40,7 @@ const handleRoomsRequest = function* handleRoomsRequest() {
const db = database.active; const db = database.active;
const subCollection = db.collections.get('subscriptions'); const subCollection = db.collections.get('subscriptions');
const messagesCollection = db.collections.get('messages');
if (subscriptions.length) { if (subscriptions.length) {
const subsIds = subscriptions.map(sub => sub.rid); const subsIds = subscriptions.map(sub => sub.rid);
...@@ -46,6 +49,14 @@ const handleRoomsRequest = function* handleRoomsRequest() { ...@@ -46,6 +49,14 @@ const handleRoomsRequest = function* handleRoomsRequest() {
const subsToCreate = subscriptions.filter(i1 => !existingSubs.find(i2 => i1._id === i2._id)); const subsToCreate = subscriptions.filter(i1 => !existingSubs.find(i2 => i1._id === i2._id));
// TODO: subsToDelete? // TODO: subsToDelete?
const lastMessages = subscriptions
.map(sub => sub.lastMessage && buildMessage(sub.lastMessage))
.filter(lm => lm);
const lastMessagesIds = lastMessages.map(lm => lm._id);
const existingMessages = yield messagesCollection.query(Q.where('id', Q.oneOf(lastMessagesIds))).fetch();
const messagesToUpdate = existingMessages.filter(i1 => lastMessages.find(i2 => i1.id === i2._id));
const messagesToCreate = lastMessages.filter(i1 => !existingMessages.find(i2 => i1._id === i2.id));
const allRecords = [ const allRecords = [
...subsToCreate.map(subscription => subCollection.prepareCreate((s) => { ...subsToCreate.map(subscription => subCollection.prepareCreate((s) => {
s._raw = sanitizedRaw({ id: subscription.rid }, subCollection.schema); s._raw = sanitizedRaw({ id: subscription.rid }, subCollection.schema);
...@@ -56,6 +67,17 @@ const handleRoomsRequest = function* handleRoomsRequest() { ...@@ -56,6 +67,17 @@ const handleRoomsRequest = function* handleRoomsRequest() {
return subscription.prepareUpdate(() => { return subscription.prepareUpdate(() => {
Object.assign(subscription, newSub); Object.assign(subscription, newSub);
}); });
}),
...messagesToCreate.map(message => messagesCollection.prepareCreate(protectedFunction((m) => {
m._raw = sanitizedRaw({ id: message._id }, messagesCollection.schema);
m.subscription.id = message.rid;
return Object.assign(m, message);
}))),
...messagesToUpdate.map((message) => {
const newMessage = lastMessages.find(m => m._id === message.id);
return message.prepareUpdate(protectedFunction(() => {
Object.assign(message, newMessage);
}));
}) })
]; ];
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment