Commit cd29e24c authored by Mikael Mello's avatar Mikael Mello Committed by Tim Kinnane
Browse files

feat(driver): Client commands for bot management features (#15)

* Change to reflect moving to streams in RC-side of development

* Replies to commands are temporarily commented

* Stream for clientCommands takes userId as argument

* Reply to commands with a default response

* Improve dealing with ClientCommands

* Better logs

* No more using of a global variable to set a dynamic userId to
  subscribe to the client commands stream, pass it as a parameter
  instead

* SDK will not halt if server does not have client-commands stream

* Improve handling of ClientCommands and add support to getStatistics

* Add new stat to reply to getStatistics ClientCommand

* Add new statistic indicating how many times the bot reconnected

* Add test of custom command handler and fix bug on getStatistics handler

* Add response to getLogs request

* Improve use of logs regarding ClientCommands

* Fix lint error by adding new line to end of file

* Change log tag used on ClientCommand related logs

* Calls setCustomClientData on each client-commands stream reconnect

* Improve comments and logs regarding ClientCommands

* Add stack of clients to send as client info to the server

* Each client has a name and a version

* Gets version of SDK from package.json

* Update documentation of ClientCommands

* Improvoe how the client details are added to the client stack
parent b1887e33
......@@ -268,11 +268,26 @@ but have not been joined yet this method will join to those rooms automatically.
If `allPublic` is true, the `rooms` option will be ignored.
### `driver.addClientToStack(clientDetails)`
Adds details about the client using the SDK to the stack of clients.
It must be called before the `driver.login()` function, since the data is sent right after login.
clientDetails has a `name` and a `version` field, both strings.
```
driver.addClientToStack({
name: 'ExampleBotFramework Rocket.Chat Adapter',
version: '0.4.2'
})
```
### `driver.setCustomClientData(clientData)`
Set additional data about the client using the SDK to be sent to the server.
It must be called before the `driver.login()` function, otherwise it will have no effect.
It must be called before the `driver.login()` function, since the data is sent right after login.
Useful only when adding new features in Rocket.Chat that depend on the client.
......@@ -282,14 +297,19 @@ do not have this feature, so the bot manager interface in Rocket.Chat will have
to differentiate between them, hence the need to define its data.
```
driver.addClientToStack({
name: 'ExampleBotFramework Rocket.Chat Adapter',
version: '0.4.2'
})
driver.setCustomClientData({
framework: 'ExampleBotFramework',
canResetConnection: true
});
})
```
Then, Rocket.Chat's interface will check if the bot is able to reset its connection and
show an UI to allow the admin to do that.
Then, Rocket.Chat's interface can check if the bot is able to reset its connection before enabling
an UI with that feature.
### `driver.registerCommandHandler(key, callback)`
......@@ -310,7 +330,7 @@ The `callback` receives a `ClientCommand` object as the first parameter and retu
`ClientCommandResponse` object structure:
- ClientCommandResponse.success: Boolean indicating the success status of the command
- ClientCommandResponse.msg: Message response to the command
- Along with any other relevant property to the response
### `driver.asyncCall(method, params)`
......@@ -601,6 +621,7 @@ rocketchat.driver.connect({ host: 'localhost:3000' }, function (err, asteroid) {
| `ROOM_CACHE_MAX_AGE` | Max age of cache for room lookups |
| `DM_ROOM_CACHE_SIZE` | Size of cache for Direct Message room lookups |
| `DM_ROOM_CACHE_MAX_AGE`| Max age of cache for DM lookups |
| `WAIT_CLIENT_COMMANDS` | Wait subscription of ClientCommands before login finishes.
| **Test configs** | |
| `ADMIN_USERNAME` | Admin user password for API |
| `ADMIN_PASS` | Admin user password for API |
......
......@@ -17,8 +17,9 @@ export interface IClientCommand {
* Structure of the object to reply to a clientCommand
*/
export interface IClientCommandResponse {
status?: number,
msg: string
success: boolean,
error?: Error,
[key: string]: any
}
/*
......@@ -34,3 +35,16 @@ export interface IClientCommandHandler {
export interface IClientCommandHandlerMap {
[key: string]: IClientCommandHandler
}
/*
* Structure of the object of client data
*/
export interface ICustomClientData {
stack: Array<IClientDetails>,
[key: string]: any
}
export interface IClientDetails {
name: string,
version: string
}
\ No newline at end of file
......@@ -43,3 +43,7 @@ export interface ILogger {
export interface ICallback {
(error: Error | null, ...args: any[]): void
}
export interface ISessionStatistics {
[key: string]: any
}
......@@ -3,7 +3,7 @@ import sinon from 'sinon'
import { expect } from 'chai'
import { silence } from './log'
import { botUser, mockUser, apiUser } from '../utils/config'
import { logout } from './api'
import { get, login, logout } from './api'
import * as utils from '../utils/testing'
import * as driver from './driver'
import * as methodCache from './methodCache'
......@@ -116,6 +116,10 @@ describe('driver', () => {
})
})
describe('.clientCommands', () => {
/**
* Note: For them to work you have to wait for client-commands subscription
* to be fully initialized, so set ROCKETCHAT_WAIT_CLIENT_COMMANDS to true
*/
it('sets customClientData from the SDK with no customizations', async () => {
await driver.connect()
await driver.login()
......@@ -132,6 +136,17 @@ describe('driver', () => {
expect(result.user.customClientData).to.deep.include(driver.customClientData)
expect(result.user.customClientData.framework).to.equal('Testing')
})
it('custom handler is called once', async () => {
driver.setCustomClientData({ framework: 'Testing' })
const callback = sinon.spy()
driver.registerCommandHandler('getStatistics', callback)
await driver.connect()
await driver.login()
// Login as admin and request stats from the bot
await login({ username: apiUser.username, password: apiUser.password })
await get('bots.getLiveStats', { username: botUser.username })
sinon.assert.calledOnce(callback)
})
})
describe('.subscribeToMessages', () => {
it('resolves with subscription object', async () => {
......
import { EventEmitter } from 'events'
import Asteroid from 'asteroid'
import intercept from 'intercept-stdout'
// Asteroid v2 imports
/*
import { createClass } from 'asteroid'
......@@ -10,7 +11,7 @@ import immutableCollectionMixin from 'asteroid-immutable-collections-mixin'
import * as settings from './settings'
import * as methodCache from './methodCache'
import { Message } from './message'
import { IConnectOptions, IRespondOptions, ICallback, ILogger } from '../config/driverInterfaces'
import { IConnectOptions, IRespondOptions, ICallback, ILogger, ISessionStatistics } from '../config/driverInterfaces'
import { IAsteroid, ICredentials, ISubscription, ICollection } from '../config/asteroidInterfaces'
import { IMessage } from '../config/messageInterfaces'
import { logger, replaceLog } from './log'
......@@ -19,14 +20,15 @@ import {
IClientCommand,
IClientCommandResponse,
IClientCommandHandler,
IClientCommandHandlerMap
IClientCommandHandlerMap,
ICustomClientData,
IClientDetails
} from '../config/commandInterfaces'
/** Collection names */
const _messageCollectionName = 'stream-room-messages'
const _messageStreamName = '__my_messages__'
const _clientCommandsCollectionName = 'rocketchat_clientcommand'
const _clientCommandsSubscriptionName = 'clientCommands'
const _clientCommandsStreamName = 'stream-client-commands'
/**
* Asteroid ^v2 interface below, suspended for work on future branch
......@@ -39,6 +41,21 @@ const Asteroid: IAsteroid = createClass([immutableCollectionMixin])
// CONNECTION SETUP AND CONFIGURE
// -----------------------------------------------------------------------------
/**
* Intercept all logging going to stdout and store the last maxLogSize entries
* That is the array sent to the server when the client receives a ClientCommand
* getLogs
*/
export let logs: Array<string> = []
export let maxLogSize: number = 100
intercept((log: string) => {
logs.push(log)
if (logs.length > maxLogSize) {
logs.splice(logs.length - maxLogSize, logs.length)
}
return log
})
/** Internal for comparing message and command update timestamps */
export let messageLastReadTime: Date
export let commandLastReadTime: Date
......@@ -96,13 +113,36 @@ export let clientCommands: ICollection
*/
export let commandHandlers: IClientCommandHandlerMap = {}
/**
* ClientCommands that should not be logged
*/
export let silentClientCommands: Array<string> = ['heartbeat', 'getLogs']
/**
* Method calls that should not be logged
*/
export let silentMethods: Array<string> = ['replyClientCommand']
/**
* Custom Data set by the client that is using the SDK
*/
export let customClientData: object = {
framework: 'Rocket.Chat JS SDK',
export let customClientData: ICustomClientData = {
stack: [{
name: 'Rocket.Chat js.SDK',
version: settings.version
}],
canPauseResumeMsgStream: true,
canListenToHeartbeat: true
canListenToHeartbeat: true,
canGetStatistics: true,
canGetLogs: true
}
/**
* Map of session statistics collected by the SDK
*/
const sessionStatistics: ISessionStatistics = {
Bot_Stats_Read_Messages: 0,
Bot_Stats_Reconnect_Count: 0
}
/**
......@@ -164,6 +204,10 @@ export function connect (options: IConnectOptions = {}, callback?: ICallback): P
if (callback) callback(null, asteroid)
resolve(asteroid)
})
events.on('reconnected', () => {
sessionStatistics.Bot_Stats_Reconnect_Count += 1
})
}
})
}
......@@ -207,16 +251,21 @@ function setupMethodCache (asteroid: IAsteroid): void {
*/
export function asyncCall (method: string, params: any | any[]): Promise<any> {
if (!Array.isArray(params)) params = [params] // cast to array for apply
logger.info(`[${method}] Calling (async): ${JSON.stringify(params)}`)
const shouldLog: boolean = silentMethods.indexOf(method) === -1
if (shouldLog) logger.info(`[${method}] Calling (async): ${JSON.stringify(params)}`)
return Promise.resolve(asteroid.apply(method, params).result)
.catch((err: Error) => {
logger.error(`[${method}] Error:`, err)
throw err // throw after log to stop async chain
})
.then((result: any) => {
(result)
? logger.debug(`[${method}] Success: ${JSON.stringify(result)}`)
: logger.debug(`[${method}] Success`)
if (shouldLog) {
(result)
? logger.debug(`[${method}] Success: ${JSON.stringify(result)}`)
: logger.debug(`[${method}] Success`)
}
return result
})
}
......@@ -280,11 +329,16 @@ export function login (credentials: ICredentials = {
return login
.then((loggedInUserId) => {
userId = loggedInUserId
// Calling function to listen to commands and answer to them
return loggedInUserId
})
.then(() => {
// Calling function to listen to commands and answer to them
return respondToCommands()
.then(async (loggedInUserId) => {
if (settings.waitForClientCommands) {
await respondToCommands(loggedInUserId)
} else {
respondToCommands(loggedInUserId).catch(() => {/**/})
}
return loggedInUserId
})
.catch((err: Error) => {
logger.info('[login] Error:', err)
......@@ -312,7 +366,7 @@ export function subscribe (subscriptionName: string, ...params: any[]): Promise<
const subscription = asteroid.subscribe(subscriptionName, ...params)
subscriptions.push(subscription)
return subscription.ready.then((id) => {
logger.info(`[subscribe] Stream ready: ${id}`)
logger.info(`[subscribe] Subscription ${subscriptionName} ready: ${id}`)
resolve(subscription)
})
// Asteroid ^v2 interface...
......@@ -479,6 +533,7 @@ export function respondToMessages (callback: ICallback, options: IRespondOptions
// if (!isDM && !isLC) meta.roomName = await getRoomName(message.rid)
// Processing completed, call callback to respond to message
sessionStatistics.Bot_Stats_Read_Messages += 1
callback(null, message, meta)
})
return promise
......@@ -487,11 +542,9 @@ export function respondToMessages (callback: ICallback, options: IRespondOptions
/**
* Begin subscription to clientCommands for user and returns the collection
*/
async function subscribeToCommands (): Promise<ICollection> {
await subscribe(_clientCommandsSubscriptionName)
clientCommands = asteroid.getCollection(_clientCommandsCollectionName)
// v2
// clientCommands = asteroid.collections.get(_clientCommandsCollectionName) || Map()
async function subscribeToCommands (userId: string): Promise<ICollection> {
await subscribe(_clientCommandsStreamName, userId, true)
clientCommands = asteroid.getCollection(_clientCommandsStreamName)
return clientCommands
}
......@@ -503,9 +556,8 @@ async function subscribeToCommands (): Promise<ICollection> {
* - Uses error-first callback pattern
* - Second argument is the the command received
*/
async function reactToCommands (callback: ICallback): Promise<void> {
const clientCommands = await subscribeToCommands()
async function reactToCommands (userId: string, callback: ICallback): Promise<void> {
const clientCommands = await subscribeToCommands(userId)
await asyncCall('setCustomClientData', customClientData)
logger.info(`[reactive] Listening for change events in collection ${clientCommands.name}`)
......@@ -513,7 +565,13 @@ async function reactToCommands (callback: ICallback): Promise<void> {
const changedCommandQuery = clientCommands.reactiveQuery({ _id })
if (changedCommandQuery.result && changedCommandQuery.result.length > 0) {
const changedCommand = changedCommandQuery.result[0]
callback(null, changedCommand)
if (Array.isArray(changedCommand.args)) {
callback(null, changedCommand.args[0])
} else {
logger.debug('[ClientCommand] Stream received update without args, probably a reconnect')
logger.debug('[ClientCommand] Recalling setCustomClientData to ensure consistence')
asyncCall('setCustomClientData', customClientData)
}
}
})
}
......@@ -521,11 +579,11 @@ async function reactToCommands (callback: ICallback): Promise<void> {
/**
* Calls reactToCommands with a callback to read latest clientCommands and reply to them
*/
async function respondToCommands (): Promise<void | void[]> {
async function respondToCommands (userId: string): Promise<void | void[]> {
commandLastReadTime = new Date() // init before any message read
await reactToCommands(async (err, command) => {
await reactToCommands(userId, async (err, command) => {
if (err) {
logger.error(`Unable to receive commands ${JSON.stringify(err)}`)
logger.error(`[ClientCommand] Unable to receive command ${command.cmd.key}. ${JSON.stringify(err)}`)
throw err
}
......@@ -535,8 +593,12 @@ async function respondToCommands (): Promise<void | void[]> {
// Ignore commands in stream that aren't new
if (currentReadTime <= commandLastReadTime) return
// Only log the command when needed
if (silentClientCommands.indexOf(command.cmd.key) === -1) {
logger.info(`[ClientCommand] Received '${command.cmd.key}' at ${currentReadTime}`)
}
// At this point, command has passed checks and can be responded to
logger.info(`[Command] Received command '${command.cmd.key}' at ${currentReadTime}`)
commandLastReadTime = currentReadTime
// Processing completed, call callback to respond to command
......@@ -551,32 +613,66 @@ async function respondToCommands (): Promise<void | void[]> {
* @param command Command object
*/
async function commandHandler (command: IClientCommand): Promise<void | void[]> {
switch (command.cmd.key) {
// SDK-level command to pause the message stream, interrupting all messages from the server
case 'pauseMessageStream':
subscriptions.map((s: ISubscription) => (s._name === _messageCollectionName ? unsubscribe(s) : undefined))
await asyncCall('replyClientCommand', [command._id, { msg: 'OK' }])
break
// SDK-level command to resubscribe to the message stream
case 'resumeMessageStream':
await subscribeToMessages()
messageLastReadTime = new Date() // reset time of last read message
await asyncCall('replyClientCommand', [command._id, { msg: 'OK' }])
break
// SDK-level command to check for aliveness of the bot regarding commands
case 'heartbeat':
await asyncCall('replyClientCommand', [command._id, { msg: 'OK' }])
break
// If command is not at the SDK-level, it tries to call a handler added by the user
default:
const handler = commandHandlers[command.cmd.key]
if (handler) {
const result = await handler(command)
await asyncCall('replyClientCommand', [command._id, result])
}
let result: IClientCommandResponse = {
success: true
}
// Only log the command when needed
const shouldLog: boolean = silentClientCommands.indexOf(command.cmd.key) === -1
try {
const handler = commandHandlers[command.cmd.key]
switch (command.cmd.key) {
// SDK-level command to check for aliveness of the bot regarding commands
case 'heartbeat':
break
// SDK-level command to reply with the latest maxLogSize logs
case 'getLogs':
result.logs = logs
break
// SDK-level command to pause the message stream, interrupting all messages from the server
case 'pauseMessageStream':
subscriptions.map((s: ISubscription) => (s._name === _messageCollectionName ? unsubscribe(s) : undefined))
break
// SDK-level command to resubscribe to the message stream
case 'resumeMessageStream':
await subscribeToMessages()
messageLastReadTime = new Date() // reset time of last read message
break
case 'getStatistics':
const statistics: any = {}
statistics.sdk = sessionStatistics
statistics.sdk.Bot_Stats_Latest_Read = messageLastReadTime ? messageLastReadTime.toUTCString() : undefined
if (handler) {
statistics.adapter = await handler(command)
}
result.statistics = statistics
break
// If command is not at the SDK-level, it tries to call a handler added by the user
default:
if (handler) {
if (shouldLog) logger.info(`[ClientCommand] Calling custom handler of command '${command.cmd.key}'`)
result = await handler(command)
} else {
throw Error('Handler not found')
}
}
} catch (err) {
logger.info(`[ClientCommand] Error on handling of '${command.cmd.key}'. ${JSON.stringify(err)}`)
result.success = false
result.error = err
}
try {
if (shouldLog) logger.info(`[ClientCommand] Replying to '${command.cmd.key}' with result ${JSON.stringify(result)}`)
await asyncCall('replyClientCommand', [command._id, result])
if (shouldLog) logger.info(`[ClientCommand] Successful reply to command '${command.cmd.key}'`)
} catch (err) {
logger.info(`[ClientCommand] Failed to reply to command'${command.cmd.key}'. Error: ${JSON.stringify(err)}`)
}
}
......@@ -589,11 +685,11 @@ async function commandHandler (command: IClientCommand): Promise<void | void[]>
export function registerCommandHandler (key: string, callback: IClientCommandHandler) {
const currentHandler = commandHandlers[key]
if (currentHandler) {
logger.error(`[Command] Command '${key}' already has a handler`)
throw Error('Command in use')
logger.error(`[ClientCommand] Command '${key}' already has a handler`)
throw Error(`[ClientCommand] Command '${key}' already has a handler`)
}
logger.info(`[Command] Registering handler for command '${key}'`)
logger.info(`[ClientCommand] Registering handler for command '${key}'`)
commandHandlers[key] = callback
}
......@@ -605,6 +701,14 @@ export function setCustomClientData (clientData: object) {
Object.assign(customClientData, clientData)
}
/**
* Add client information to the client stack
* @param clientData Object containing additional data about the client using the SDK
*/
export function addClientToStack (clientDetails: IClientDetails) {
customClientData.stack.push(clientDetails)
}
/**
* Get every new element added to DDP in Asteroid (v2)
* @todo Resolve this functionality within Rocket.Chat with team
......
// Version of the package
export { version } from '../../package.json'
// Login settings - LDAP needs to be explicitly enabled
export let username = process.env.ROCKETCHAT_USER || 'bot'
......@@ -28,3 +30,5 @@ export let roomCacheMaxSize = parseInt(process.env.ROOM_CACHE_SIZE || '10', 10)
export let roomCacheMaxAge = 1000 * parseInt(process.env.ROOM_CACHE_MAX_AGE || '300', 10)
export let dmCacheMaxSize = parseInt(process.env.DM_ROOM_CACHE_SIZE || '10', 10)
export let dmCacheMaxAge = 1000 * parseInt(process.env.DM_ROOM_CACHE_MAX_AGE || '100', 10)
export let waitForClientCommands = process.env.WAIT_CLIENT_COMMANDS
declare module '*.json' {
export let version: string
}
declare module 'intercept-stdout'
......@@ -23,9 +23,8 @@ async function start () {
edited: true,
livechat: false
})
driver.registerCommandHandler('xdlol', async (command) => {
console.log('testing')
return { msg: 'OK' }
driver.registerCommandHandler('testCommand', async (command) => {
return { success: true, msg: 'OK' }
})
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment