Unverified Commit 408c4060 authored by Mikael Mello's avatar Mikael Mello
Browse files

Add working clientCommand handler that pauses and resumes stream

parent cc139910
......@@ -57,7 +57,8 @@ export interface IUserOptions {
export interface ISubscription {
stop: () => void,
ready: Promise<IReady>,
id?: string
id?: string,
_name?: string
}
// Asteroid v1 only
......
/** @todo contribute these to @types/rocketchat and require */
export interface IClientCommand {
_id: string // generated by Random.id()
u: {
_id: string,
username: string
}
cmd: {
msg: string,
options: object
}
ts: Date
}
......@@ -13,6 +13,7 @@ import { Message } from './message'
import { IConnectOptions, IRespondOptions, ICallback, ILogger } from '../config/driverInterfaces'
import { IAsteroid, ICredentials, ISubscription, ICollection } from '../config/asteroidInterfaces'
import { IMessage } from '../config/messageInterfaces'
import { IClientCommand } from '../config/commandInterfaces'
import { logger, replaceLog } from './log'
/** Collection names */
......@@ -32,8 +33,9 @@ const Asteroid: IAsteroid = createClass([immutableCollectionMixin])
// CONNECTION SETUP AND CONFIGURE
// -----------------------------------------------------------------------------
/** Internal for comparing message update timestamps */
export let lastReadTime: Date
/** Internal for comparing message and command update timestamps */
export let messageLastReadTime: Date
export let commandLastReadTime: Date
/**
* The integration property is applied as an ID on sent messages `bot.i` param
......@@ -83,6 +85,11 @@ export let messages: ICollection
*/
export let clientCommands: ICollection
/**
* Map of command handlers added by the client of the sdk
*/
export let commandHandlers = {}
/**
* Allow override of default logging with adapter's log instance
*/
......@@ -264,15 +271,7 @@ export function login (credentials: ICredentials = {
return subscribeToCommands()
})
.then(() => {
return respondToCommands((err, cmd) => {
console.log(cmd);
callMethod('replyClientCommand', [cmd._id, { msg: 'OK' }]);
if (cmd.msg == 'pauseSubscriptions') {
console.log(cmd);
} else if (cmd.msg == 'resumeSubscriptions') {
console.log(cmd);
}
})
return respondToCommands()
})
.catch((err: Error) => {
logger.info('[login] Error:', err)
......@@ -349,88 +348,6 @@ export function subscribeToMessages (): Promise<ISubscription> {
})
}
/**
* Begin subscription to client commands for user
* Adapters might register callbacks to certain commands
*/
export function subscribeToCommands (): Promise<ISubscription> {
return subscribe(_clientCommandsSubscriptionName, '')
.then((subscription) => {
clientCommands = asteroid.getCollection(_clientCommandsCollectionName)
// v2
// messages = asteroid.collections.get(_clientCommandsCollectionName) || Map()
return subscription
})
}
/**
* Once a subscription is created, using `subscribeToCommands` this method
* can be used to attach a callback to changes in the commands stream.
*
* @todo `reactToCommands` should call `subscribeToCommands` if not already
* done, so it's not required as an arbitrary step for simpler adapters.
* Also make `login` call `connect` for the same reason, the way
* `respondToCommands` calls `respondToCommands`, so all that's really
* required is:
* `driver.login(credentials).then(() => driver.respondToCommands(callback))`
* @param callback Function called with every change in subscription of clientCOmmands.
* - Uses error-first callback pattern
* - Second argument is the the command received
*/
export function reactToCommands (callback: ICallback): void {
logger.info(`[reactive] Listening for change events in collection ${clientCommands.name}`)
clientCommands.reactiveQuery({}).on('change', (_id: string) => {
const changedCommandQuery = clientCommands.reactiveQuery({ _id })
if (changedCommandQuery.result && changedCommandQuery.result.length > 0) {
const changedCommand = changedCommandQuery.result[0]
callback(null, changedCommand)
} else {
logger.debug('[received] Reactive query at ID ${ _id } without results')
}
})
}
/**
* Begin subscription to client commands for user
* Adapters might register callbacks to certain commands
*/
export function respondToCommands (callback: ICallback, options: IRespondOptions = {}): Promise<void | void[]> {
const config = Object.assign({}, settings, options)
let promise: Promise<void | void[]> = Promise.resolve() // return value, may be replaced by async ops
lastReadTime = new Date() // init before any message read
reactToCommands(async (err, command) => {
if (err) {
logger.error(`Unable to receive commands ${JSON.stringify(err)}`)
callback(err) // bubble errors back to adapter
}
// Set current time for comparison to incoming
let currentReadTime = new Date(command.ts.$date)
// Ignore commands in stream that aren't new
if (currentReadTime <= lastReadTime) return
// At this point, command has passed checks and can be responded to
logger.info(`Command receive callback ID ${command._id} at ${currentReadTime}`)
logger.info(`[Incoming] ${command.u.username}: ${command.cmd.msg}`)
lastReadTime = currentReadTime
/**
* @todo Fix below by adding to meta from Rocket.Chat instead of getting on
* each command event. It's inefficient and throws off tests that
* await on send completion, because the callback has not yet fired.
* Then re-enable last two `.respondToMessages` tests.
*/
// Add room name to meta, is useful for some adapters (is promise)
// if (!isDM && !isLC) meta.roomName = await getRoomName(command.rid)
// Processing completed, call callback to respond to command
callback(null, command)
})
return promise
}
/**
* Once a subscription is created, using `subscribeToMessages` this method
* can be used to attach a callback to changes in the message stream.
......@@ -499,7 +416,7 @@ export function respondToMessages (callback: ICallback, options: IRespondOptions
})
}
lastReadTime = new Date() // init before any message read
messageLastReadTime = new Date() // init before any message read
reactToMessages(async (err, message, meta) => {
if (err) {
logger.error(`Unable to receive messages ${JSON.stringify(err)}`)
......@@ -532,12 +449,12 @@ export function respondToMessages (callback: ICallback, options: IRespondOptions
}
// Ignore messages in stream that aren't new
if (currentReadTime <= lastReadTime) return
if (currentReadTime <= messageLastReadTime) return
// At this point, message has passed checks and can be responded to
logger.info(`Message receive callback ID ${message._id} at ${currentReadTime}`)
logger.info(`[Incoming] ${message.u.username}: ${(message.file !== undefined) ? message.attachments[0].title : message.msg}`)
lastReadTime = currentReadTime
messageLastReadTime = currentReadTime
/**
* @todo Fix below by adding to meta from Rocket.Chat instead of getting on
......@@ -554,6 +471,104 @@ export function respondToMessages (callback: ICallback, options: IRespondOptions
return promise
}
/**
* Begin subscription to client commands for user
* Adapters might register callbacks to certain commands
*/
function subscribeToCommands (): Promise<ISubscription> {
return subscribe(_clientCommandsSubscriptionName)
.then((subscription) => {
clientCommands = asteroid.getCollection(_clientCommandsCollectionName)
// v2
// messages = asteroid.collections.get(_clientCommandsCollectionName) || Map()
return subscription
})
}
/**
* Once a subscription is created, using `subscribeToCommands` this method
* can be used to attach a callback to changes in the commands stream.
*
* @todo `reactToCommands` should call `subscribeToCommands` if not already
* done, so it's not required as an arbitrary step for simpler adapters.
* Also make `login` call `connect` for the same reason, the way
* `respondToCommands` calls `respondToCommands`, so all that's really
* required is:
* `driver.login(credentials).then(() => driver.respondToCommands(callback))`
* @param callback Function called with every change in subscription of clientCommands.
* - Uses error-first callback pattern
* - Second argument is the the command received
*/
function reactToCommands (callback: ICallback): void {
logger.info(`[reactive] Listening for change events in collection ${clientCommands.name}`)
clientCommands.reactiveQuery({}).on('change', (_id: string) => {
const changedCommandQuery = clientCommands.reactiveQuery({ _id })
if (changedCommandQuery.result && changedCommandQuery.result.length > 0) {
const changedCommand = changedCommandQuery.result[0]
callback(null, changedCommand)
} else {
logger.debug('[received] Reactive query at command ${ _id } without results')
}
})
}
/**
* Proxy for `reacToCommands` filtering commands based on their timestamp
*/
function respondToCommands (): Promise<void | void[]> {
let promise: Promise<void | void[]> = Promise.resolve() // return value, may be replaced by async ops
commandLastReadTime = new Date() // init before any message read
reactToCommands(async (err, command) => {
if (err) {
logger.error(`Unable to receive commands ${JSON.stringify(err)}`)
return commandHandler(err, command) // bubble errors back to adapter
}
// Set current time for comparison to incoming
let currentReadTime = new Date(command.ts.$date)
// Ignore commands in stream that aren't new
if (currentReadTime <= commandLastReadTime) return
// At this point, command has passed checks and can be responded to
logger.info(`[Command] Received command ID ${command._id} at ${currentReadTime}`)
commandLastReadTime = currentReadTime
// Processing completed, call callback to respond to command
commandHandler(null, command)
})
return promise
}
function commandHandler (err: Error | null, command: IClientCommand): Promise<void | void[]> {
let promise: Promise<void | void[]> = Promise.resolve()
if (err) {
logger.error(`Unable to receive commands ${JSON.stringify(err)}`)
}
switch (command.cmd.msg) {
case 'pauseMessageStream':
subscriptions.map((s: ISubscription) => (s._name === _messageCollectionName ? unsubscribe(s) : undefined))
asyncCall('replyClientCommand', [command._id, { msg: 'OK' }])
break
case 'resumeMessageStream':
subscribeToMessages()
.then(() => {
messageLastReadTime = new Date() // reset time of last read message
asyncCall('replyClientCommand', [command._id, { msg: 'OK' }])
})
break
default:
// result = commandHandlers[command.cmd.msg](command)
}
return promise
}
/**
* Get every new element added to DDP in Asteroid (v2)
* @todo Resolve this functionality within Rocket.Chat with team
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment