driver.ts 18.8 KB
Newer Older
1
import { EventEmitter } from 'events'
2
3
4
5
import Asteroid from 'asteroid'
// Asteroid v2 imports
/*
import { createClass } from 'asteroid'
6
import WebSocket from 'ws'
7
8
9
import { Map } from 'immutable'
import immutableCollectionMixin from 'asteroid-immutable-collections-mixin'
*/
10
import * as methodCache from './methodCache'
11
import { Message } from './message'
12
import { IConnectOptions, IRespondOptions, ICallback, ILogger } from '../config/driverInterfaces'
13
import { IAsteroid, ICredentials, ISubscription, ICollection } from '../config/asteroidInterfaces'
14
import { IMessage } from '../config/messageInterfaces'
15
import { logger, replaceLog } from './log'
16

17
18
19
/** Collection names */
const _messageCollectionName = 'stream-room-messages'
const _messageStreamName = '__my_messages__'
20

21
/**
22
23
 * Asteroid ^v2 interface below, suspended for work on future branch
 * @todo Upgrade to Asteroid v2 or find a better maintained ddp client
24
 */
25
26
27
28
29
30
/*
const Asteroid: IAsteroid = createClass([immutableCollectionMixin])
*/

// CONNECTION SETUP AND CONFIGURE
// -----------------------------------------------------------------------------
31
32

/**
33
 * Define default config as public, allowing overrides from new connection.
34
 * Enable SSL by default if Rocket.Chat URL contains https.
35
 */
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
export function connectDefaults (): IConnectOptions {
  return {
    host: process.env.ROCKETCHAT_URL || 'localhost:3000',
    useSsl: ((process.env.ROCKETCHAT_URL || '').toString().startsWith('https')),
    timeout: 20 * 1000 // 20 seconds
  }
}

/** Define default config for message respond filters. */
export function respondDefaults (): IRespondOptions {
  return {
    allPublic: (process.env.LISTEN_ON_ALL_PUBLIC || 'false').toLowerCase() === 'true',
    dm: (process.env.RESPOND_TO_DM || 'false').toLowerCase() === 'true',
    livechat: (process.env.RESPOND_TO_LIVECHAT || 'false').toLowerCase() === 'true',
    edited: (process.env.RESPOND_TO_EDITED || 'false').toLowerCase() === 'true'
  }
52
53
}

54
55
56
/** Internal for comparing message update timestamps */
export let lastReadTime: Date

57
58
59
60
61
/**
 * The integration property is applied as an ID on sent messages `bot.i` param
 * Should be replaced when connection is invoked by a package using the SDK
 * e.g. The Hubot adapter would pass its integration ID with credentials, like:
 */
62
export const integrationId = process.env.INTEGRATION_ID || 'js.SDK'
63

64
/**
65
 * Event Emitter for listening to connection.
66
 * @example
67
 *  import { driver } from '@rocket.chat/sdk'
68
69
70
71
72
73
 *  driver.connect()
 *  driver.events.on('connected', () => console.log('driver connected'))
 */
export const events = new EventEmitter()

/**
74
75
 * An Asteroid instance for interacting with Rocket.Chat.
 * Variable not initialised until `connect` called.
76
77
78
79
 */
export let asteroid: IAsteroid

/**
80
81
82
83
84
 * Asteroid subscriptions, exported for direct polling by adapters
 * Variable not initialised until `prepMeteorSubscriptions` called.
 */
export let subscriptions: ISubscription[] = []

85
86
87
88
89
/**
 * Current user object populated from resolved login
 */
export let userId: string

90
91
92
93
94
/**
 * Array of messages received from reactive collection
 */
export let messages: ICollection

95
96
97
98
99
100
101
/**
 * Allow override of default logging with adapter's log instance
 */
export function useLog (externalLog: ILogger) {
  replaceLog(externalLog)
}

102
103
/**
 * Initialise asteroid instance with given options or defaults.
104
105
 * Returns promise, resolved with Asteroid instance. Callback follows
 * error-first-pattern. Error returned or promise rejected on timeout.
106
 * Removes http/s protocol to get connection hostname if taken from URL.
107
 * @example <caption>Use with callback</caption>
108
 *  import { driver } from '@rocket.chat/sdk'
109
 *  driver.connect({}, (err) => {
110
 *    if (err) throw err
111
 *    else console.log('connected')
112
113
 *  })
 * @example <caption>Using promise</caption>
114
 *  import { driver } from '@rocket.chat/sdk'
115
 *  driver.connect()
116
117
 *    .then(() => console.log('connected'))
 *    .catch((err) => console.error(err))
118
 */
119
export function connect (options: IConnectOptions = {}, callback?: ICallback): Promise<IAsteroid> {
120
  return new Promise((resolve, reject) => {
121
    const config = Object.assign({}, connectDefaults(), options) // override defaults
122
123
    config.host = config.host!.replace(/(^\w+:|^)\/\//, '')
    logger.info('[connect] Connecting', config)
124
    asteroid = new Asteroid(config.host, config.useSsl)
125
126
    // Asteroid ^v2 interface...
    /*
127
128
    asteroid = new Asteroid({
      endpoint: `ws://${options.host}/websocket`,
129
      SocketConstructor: WebSocket
130
    })
131
    */
132
    setupMethodCache(asteroid) // init instance for later caching method calls
133
134
    asteroid.on('connected', () => events.emit('connected'))
    asteroid.on('reconnected', () => events.emit('reconnected'))
135
    let cancelled = false
136
    const rejectionTimeout = setTimeout(function () {
137
      logger.info(`[connect] Timeout (${config.timeout})`)
138
      const err = new Error('Asteroid connection timeout')
139
140
      cancelled = true
      events.removeAllListeners('connected')
141
142
      callback ? callback(err, asteroid) : reject(err)
    }, config.timeout)
143
144
145
146
147
148
149
150
151
152
153
154

    // if to avoid condition where timeout happens before listener to 'connected' is added
    // and this listener is not removed (because it was added after the removal)
    if (!cancelled) {
      events.once('connected', () => {
        logger.info('[connect] Connected')
        // if (cancelled) return asteroid.ddp.disconnect() // cancel if already rejected
        clearTimeout(rejectionTimeout)
        if (callback) callback(null, asteroid)
        resolve(asteroid)
      })
    }
155
156
157
158
159
160
161
  })
}

/**
 * Remove all active subscriptions, logout and disconnect from Rocket.Chat
 */
export function disconnect (): Promise<void> {
162
  logger.info('Unsubscribing, logging out, disconnecting')
163
  unsubscribeAll()
164
  return logout().then(() => Promise.resolve()) // asteroid.disconnect()) // v2 only
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
}

// ASYNC AND CACHE METHOD UTILS
// -----------------------------------------------------------------------------

/**
 * Setup method cache configs from env or defaults, before they are called.
 * @param asteroid The asteroid instance to cache method calls
 */
function setupMethodCache (asteroid: IAsteroid): void {
  methodCache.use(asteroid)
  methodCache.create('getRoomIdByNameOrId', {
    max: parseInt(process.env.ROOM_CACHE_SIZE || '10', 10),
    maxAge: 1000 * parseInt(process.env.ROOM_CACHE_MAX_AGE || '300', 10)
  }),
  methodCache.create('getRoomNameById', {
    max: parseInt(process.env.ROOM_CACHE_SIZE || '10', 10),
    maxAge: 1000 * parseInt(process.env.ROOM_CACHE_MAX_AGE || '300', 10)
  })
  methodCache.create('createDirectMessage', {
    max: parseInt(process.env.DM_ROOM_CACHE_SIZE || '10', 10),
    maxAge: 1000 * parseInt(process.env.DM_ROOM_CACHE_MAX_AGE || '100', 10)
  })
}

/**
 * Wraps method calls to ensure they return a Promise with caught exceptions.
 * @param method The Rocket.Chat server method, to call through Asteroid
 * @param params Single or array of parameters of the method to call
 */
export function asyncCall (method: string, params: any | any[]): Promise<any> {
  if (!Array.isArray(params)) params = [params] // cast to array for apply
197
  logger.info(`[${method}] Calling (async): ${JSON.stringify(params)}`)
198
199
  return Promise.resolve(asteroid.apply(method, params).result)
    .catch((err: Error) => {
200
      logger.error(`[${method}] Error:`, err)
201
202
203
204
      throw err // throw after log to stop async chain
    })
    .then((result: any) => {
      (result)
205
206
        ? logger.debug(`[${method}] Success: ${JSON.stringify(result)}`)
        : logger.debug(`[${method}] Success`)
207
208
209
210
      return result
    })
}

211
212
213
214
215
216
217
218
219
/**
 * Call a method as async via Asteroid, or through cache if one is created.
 * @param name The Rocket.Chat server method to call
 * @param params Single or array of parameters of the method to call
 */
export function callMethod (name: string, params: any | any[]): Promise<any> {
  return (methodCache.has(name))
    ? asyncCall(name, params)
    : cacheCall(name, params)
220
221
222
223
224
225
226
227
228
229
}

/**
 * Wraps Asteroid method calls, passed through method cache if cache is valid.
 * @param method The Rocket.Chat server method, to call through Asteroid
 * @param key Single string parameters only, required to use as cache key
 */
export function cacheCall (method: string, key: string): Promise<any> {
  return methodCache.call(method, key)
    .catch((err: Error) => {
230
      logger.error(`[${method}] Error:`, err)
231
232
233
234
      throw err // throw after log to stop async chain
    })
    .then((result: any) => {
      (result)
235
236
        ? logger.debug(`[${method}] Success: ${JSON.stringify(result)}`)
        : logger.debug(`[${method}] Success`)
237
238
239
240
241
242
243
244
245
      return result
    })
}

// LOGIN AND SUBSCRIBE TO ROOMS
// -----------------------------------------------------------------------------

/** Login to Rocket.Chat via Asteroid */
export function login (credentials: ICredentials): Promise<any> {
246
  logger.info(`[login] Logging in ${credentials.username || credentials.email}`)
247
248
249
250
251
252
253
254
255
256
257
258
  let login: Promise<any>
  if (process.env.ROCKETCHAT_AUTH === 'ldap') {
    const params = [
      credentials.username,
      credentials.password,
      { ldap: true, ldapOptions: {} }
    ]
    login = asteroid.loginWithLDAP(...params)
  } else {
    const usernameOrEmail = credentials.username || credentials.email || 'bot'
    login = asteroid.loginWithPassword(usernameOrEmail, credentials.password)
  }
259
260
261
262
263
264
265
266
267
  return login
    .then((loggedInUserId) => {
      userId = loggedInUserId
      return loggedInUserId
    })
    .catch((err: Error) => {
      logger.info('[login] Error:', err)
      throw err // throw after log to stop async chain
    })
268
269
270
271
272
}

/** Logout of Rocket.Chat via Asteroid */
export function logout (): Promise<void | null> {
  return asteroid.logout().catch((err: Error) => {
273
    logger.error('[Logout] Error:', err)
274
275
276
277
278
279
280
281
282
283
284
    throw err // throw after log to stop async chain
  })
}

/**
 * Subscribe to Meteor subscription
 * Resolves with subscription (added to array), with ID property
 * @todo - 3rd param of asteroid.subscribe is deprecated in Rocket.Chat?
 */
export function subscribe (topic: string, roomId: string): Promise<ISubscription> {
  return new Promise((resolve, reject) => {
285
    logger.info(`[subscribe] Preparing subscription: ${topic}: ${roomId}`)
286
287
288
    const subscription = asteroid.subscribe(topic, roomId, true)
    subscriptions.push(subscription)
    return subscription.ready.then((id) => {
289
      logger.info(`[subscribe] Stream ready: ${id}`)
290
291
292
293
294
295
296
297
298
      resolve(subscription)
    })
    // Asteroid ^v2 interface...
    /*
    subscription.on('ready', () => {
      console.log(`[${topic}] Subscribe ready`)
      events.emit('subscription-ready', subscription)
      subscriptions.push(subscription)
      resolve(subscription)
299
    })
300
301
302
303
304
305
    subscription.on('error', (err: Error) => {
      console.error(`[${topic}] Subscribe error:`, err)
      events.emit('subscription-error', roomId, err)
      reject(err)
    })
    */
306
307
  })
}
308

309
310
311
312
313
314
315
/** Unsubscribe from Meteor subscription */
export function unsubscribe (subscription: ISubscription): void {
  const index = subscriptions.indexOf(subscription)
  if (index === -1) return
  subscription.stop()
  // asteroid.unsubscribe(subscription.id) // v2
  subscriptions.splice(index, 1) // remove from collection
316
  logger.info(`[${subscription.id}] Unsubscribed`)
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
}

/** Unsubscribe from all subscriptions in collection */
export function unsubscribeAll (): void {
  subscriptions.map((s: ISubscription) => unsubscribe(s))
}

/**
 * Begin subscription to room events for user.
 * Older adapters used an option for this method but it was always the default.
 */
export function subscribeToMessages (): Promise<ISubscription> {
  return subscribe(_messageCollectionName, _messageStreamName)
    .then((subscription) => {
      messages = asteroid.getCollection(_messageCollectionName)
      // v2
      // messages = asteroid.collections.get(_messageCollectionName) || Map()
      return subscription
    })
}
337

338
339
340
341
342
343
344
345
346
347
348
349
/**
 * Once a subscription is created, using `subscribeToMessages` this method
 * can be used to attach a callback to changes in the message stream.
 * This can be called directly for custom extensions, but for most usage (e.g.
 * for bots) the respondToMessages is more useful to only receive messages
 * matching configuration.
 *
 * @param callback Function called with every change in subscriptions.
 *  - Uses error-first callback pattern
 *  - Second argument is the changed item
 *  - Third argument is additional attributes, such as `roomType`
 */
350
export function reactToMessages (callback: ICallback): void {
351
  logger.info(`[reactive] Listening for change events in collection ${messages.name}`)
352
353
354
355
  messages.reactiveQuery({}).on('change', (_id: string) => {
    const changedMessageQuery = messages.reactiveQuery({ _id })
    if (changedMessageQuery.result && changedMessageQuery.result.length > 0) {
      const changedMessage = changedMessageQuery.result[0]
356
357
      if (Array.isArray(changedMessage.args)) {
        logger.info(`[received] Message in room ${ changedMessage.args[0].rid }`)
358
359
        callback(null, changedMessage.args[0], changedMessage.args[1])
      } else {
360
        logger.debug('[received] Update without message args')
361
362
      }
    } else {
363
      logger.debug('[received] Reactive query at ID ${ _id } without results')
364
365
366
367
    }
  })
}

368
369
370
371
372
373
374
375
376
377
378
379
/**
 * Proxy for `reactToMessages` with some filtering of messages based on config.
 *
 * @param callback Function called after filters run on subscription events.
 *  - Uses error-first callback pattern
 *  - Second argument is the changed item
 *  - Third argument is additional attributes, such as `roomType`
 * @param options Sets filters for different event/message types.
 */
export function respondToMessages (callback: ICallback, options: IRespondOptions = {}): void {
  const config = Object.assign({}, respondDefaults(), options)
  lastReadTime = new Date() // init before any message read
380
  reactToMessages(async (err, message, meta) => {
381
382
383
384
385
386
387
388
389
    if (err) {
      logger.error(`Unable to receive messages ${JSON.stringify(err)}`)
      callback(err) // bubble errors back to adapter
    }

    // Ignore bot's own messages
    if (message.u._id === userId) return

    // Ignore DMs if configured to
390
    const isDM = meta.roomType === 'd'
391
392
393
    if (isDM && !config.dm) return

    // Ignore Livechat if configured to
394
    const isLC = meta.roomType === 'l'
395
396
397
    if (isLC && !config.livechat) return

    // Ignore messages in public rooms not joined by bot if configured to
398
    if (!config.allPublic && !isDM && !meta.roomParticipant) return
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418

    // Set current time for comparison to incoming
    let currentReadTime = new Date(message.ts.$date)

    // Ignore edited messages if configured to
    // unless it's newer than current read time (hasn't been seen before)
    // @todo: test this logic, why not just return if edited and not responding
    if (config.edited && typeof message.editedAt !== 'undefined') {
      let edited = new Date(message.editedAt.$date)
      if (edited > currentReadTime) currentReadTime = edited
    }

    // Ignore messages in stream that aren't new
    if (currentReadTime <= lastReadTime) return

    // At this point, message has passed checks and can be responded to
    logger.info(`Message receive callback ID ${message._id} at ${currentReadTime}`)
    logger.info(`[Incoming] ${message.u.username}: ${(message.file !== undefined) ? message.attachments[0].title : message.msg}`)
    lastReadTime = currentReadTime

419
420
421
422
423
    // Add room name to meta, is useful for some adapters
    if (!isDM && !isLC) meta.roomName = await getRoomName(message.rid)

    // Processing completed, call callback to respond to message
    callback(null, message, meta)
424
425
426
  })
}

427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
/**
 * Get every new element added to DDP in Asteroid (v2)
 * @todo Resolve this functionality within Rocket.Chat with team
 * @param callback Function to call with element details
 */
/*
export function onAdded (callback: ICallback): void {
  console.log('Setting up reactive message list...')
  try {
    asteroid.ddp.on('added', ({ collection, id, fields }) => {
      console.log(`Element added to collection ${ collection }`)
      console.log(id)
      console.log(fields)
      callback(null, id)
    })
  } catch (err) {
    callback(err)
  }
}
*/

// PREPARE AND SEND MESSAGES
// -----------------------------------------------------------------------------

/** Get ID for a room by name (or ID). */
452
export function getRoomId (name: string): Promise<string> {
453
  return cacheCall('getRoomIdByNameOrId', name)
454
455
}

456
/** Get name for a room by ID. */
457
export function getRoomName (id: string): Promise<string> {
458
  return cacheCall('getRoomNameById', id)
459
460
461
}

/**
462
463
 * Get ID for a DM room by its recipient's name.
 * Will create a DM (with the bot) if it doesn't exist already.
464
 * @todo test why create resolves with object instead of simply ID
465
466
 */
export function getDirectMessageRoomId (username: string): Promise<string> {
467
  return cacheCall('createDirectMessage', username).then((DM) => DM.rid)
468
469
470
471
472
}

/** Join the bot into a room by its name or ID */
export function joinRoom (room: string): Promise<void> {
  return getRoomId(room).then((roomId) => asyncCall('joinRoom', roomId))
473
474
}

475
476
477
/** Join a set of rooms by array of names or IDs */
export function joinRooms (rooms: string[]): Promise<void[]> {
  return Promise.all(rooms.map((room) => joinRoom(room)))
478
479
480
}

/**
481
482
 * Structure message content, optionally addressing to room ID.
 * Accepts message text string or a structured message object.
483
 */
484
export function prepareMessage (content: string | IMessage, roomId?: string): Message {
485
  const message = new Message(content, integrationId)
486
487
488
489
490
  if (roomId) message.setRoomId(roomId)
  return message
}

/**
491
492
 * Send a prepared message object (with pre-defined room ID).
 * Usually prepared and called by sendMessageByRoomId or sendMessageByRoom.
493
 */
494
495
export function sendMessage (message: IMessage): Promise<IMessage> {
  return asyncCall('sendMessage', message)
496
497
}

498
/**
499
500
501
 * Prepare and send string/s to specified room ID.
 * @param content Accepts message text string or array of strings.
 * @param roomId  ID of the target room to use in send.
502
 */
503
504
505
506
507
508
509
510
export function sendToRoomId (content: string | string[], roomId: string): Promise<IMessage[] | IMessage> {
  if (!Array.isArray(content)) {
    return sendMessage(prepareMessage(content, roomId))
  } else {
    return Promise.all(content.map((text) => {
      return sendMessage(prepareMessage(text, roomId))
    }))
  }
511
512
}

513
/**
514
515
516
 * Prepare and send string/s to specified room name (or ID).
 * @param content Accepts message text string or array of strings.
 * @param room    A name (or ID) to resolve as ID to use in send.
517
 */
518
519
export function sendToRoom (content: string | string[], room: string): Promise<IMessage[] | IMessage> {
  return getRoomId(room).then((roomId) => sendToRoomId(content, roomId))
520
521
522
}

/**
523
524
525
 * Prepare and send string/s to a user in a DM.
 * @param content   Accepts message text string or array of strings.
 * @param username  Name to create (or get) DM for room ID to use in send.
526
 */
527
528
export function sendDirectToUser (content: string | string[], username: string): Promise<IMessage[] | IMessage> {
  return getDirectMessageRoomId(username).then((rid) => sendToRoomId(content, rid))
529
}