Skip to content
Snippets Groups Projects
Commit c43220dc authored by Douglas Gubert's avatar Douglas Gubert Committed by Guilherme Gazzo
Browse files

fix: decoder error preventing succesfull app subprocess restart (#34858)

parent 00e7f110
No related branches found
No related tags found
No related merge requests found
---
'@rocket.chat/omnichannel-transcript': patch
'@rocket.chat/authorization-service': patch
'@rocket.chat/stream-hub-service': patch
'@rocket.chat/presence-service': patch
'@rocket.chat/fuselage-ui-kit': patch
'@rocket.chat/account-service': patch
'@rocket.chat/mock-providers': patch
'@rocket.chat/ui-theming': patch
'@rocket.chat/uikit-playground': patch
'@rocket.chat/ddp-streamer': patch
'@rocket.chat/queue-worker': patch
'@rocket.chat/apps-engine': patch
'@rocket.chat/ui-composer': patch
'@rocket.chat/ui-contexts': patch
'@rocket.chat/ui-client': patch
'@rocket.chat/models': patch
'@rocket.chat/sha256': patch
'@rocket.chat/meteor': patch
---
Fixes an issue that prevented the apps-engine from reestablishing communications with subprocesses in some cases
......@@ -8,7 +8,7 @@ import * as jsonrpc from 'jsonrpc-lite';
import { LivenessManager } from './LivenessManager';
import { ProcessMessenger } from './ProcessMessenger';
import { bundleLegacyApp } from './bundler';
import { decoder } from './codec';
import { newDecoder } from './codec';
import { AppStatus, AppStatusUtils } from '../../../definition/AppStatus';
import type { AppMethod } from '../../../definition/metadata';
import type { AppManager } from '../../AppManager';
......@@ -389,6 +389,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
console.error(`Failed to startup Deno subprocess for app ${this.getAppId()}`, err);
});
this.once('ready', this.onReady.bind(this));
this.parseStdout(this.deno.stdout);
}
......@@ -610,43 +611,50 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
}
private async parseStdout(stream: Readable): Promise<void> {
for await (const message of decoder.decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}
const JSONRPCMessage = jsonrpc.parseObject(message);
if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}
if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}
if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) => console.error(`[${this.getAppId()}] Error executing handler`, reason, message));
continue;
}
console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
try {
for await (const message of newDecoder().decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}
const JSONRPCMessage = jsonrpc.parseObject(message);
if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}
if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}
if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}
console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
}
console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}
console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}
} catch (e) {
console.error(`[${this.getAppId()}]`, e);
this.emit('error', new Error('DECODE_ERROR'));
}
}
......@@ -654,7 +662,7 @@ export class DenoRuntimeSubprocessController extends EventEmitter {
try {
const data = JSON.parse(chunk.toString());
this.debug('Metrics received from subprocess: %o', data);
this.debug('Metrics received from subprocess (via stderr): %o', data);
} catch (e) {
console.error('Subprocess stderr', chunk.toString());
}
......
......@@ -67,6 +67,13 @@ export class LivenessManager {
this.pingAbortController = new EventEmitter();
this.options = Object.assign({}, defaultOptions, options);
this.controller.on('ready', () => this.ping());
this.controller.on('error', async (reason) => {
if (reason instanceof Error && reason.message.startsWith('DECODE_ERROR')) {
await this.restartProcess('Decode error', 'controller');
}
})
}
public getRuntimeData() {
......@@ -84,7 +91,6 @@ export class LivenessManager {
this.pingTimeoutConsecutiveCount = 0;
this.controller.once('ready', () => this.ping());
this.subprocess.once('exit', this.handleExit.bind(this));
this.subprocess.once('error', this.handleError.bind(this));
}
......@@ -188,7 +194,7 @@ export class LivenessManager {
this.restartProcess(reason);
}
private async restartProcess(reason: string) {
private async restartProcess(reason: string, source = 'liveness-manager') {
if (this.restartCount >= this.options.maxRestarts) {
this.debug('Limit of restarts reached (%d). Aborting restart...', this.options.maxRestarts);
this.controller.stopApp();
......@@ -197,8 +203,8 @@ export class LivenessManager {
this.restartLog.push({
reason,
source,
restartedAt: new Date(),
source: 'liveness-manager',
pid: this.subprocess.pid,
});
......
......@@ -2,10 +2,11 @@ import type { ChildProcess } from 'child_process';
import type { JsonRpc } from 'jsonrpc-lite';
import { encoder } from './codec';
import { Encoder, newEncoder } from './codec';
export class ProcessMessenger {
private deno: ChildProcess | undefined;
private encoder: Encoder | undefined;
private _sendStrategy: (message: JsonRpc) => void;
......@@ -25,6 +26,7 @@ export class ProcessMessenger {
public clearReceiver() {
delete this.deno;
delete this.encoder;
this.switchStrategy();
}
......@@ -32,6 +34,9 @@ export class ProcessMessenger {
private switchStrategy() {
if (this.deno?.stdin?.writable) {
this._sendStrategy = this.strategySend.bind(this);
// Get a clean encoder
this.encoder = newEncoder();
} else {
this._sendStrategy = this.strategyError.bind(this);
}
......@@ -43,6 +48,6 @@ export class ProcessMessenger {
private strategySend(message: JsonRpc) {
this.debug('Sending message to subprocess %o', message);
this.deno.stdin.write(encoder.encode(message));
this.deno.stdin.write(this.encoder.encode(message));
}
}
import { Decoder, Encoder, ExtensionCodec } from '@msgpack/msgpack';
import { Decoder as _Decoder, Encoder as _Encoder, ExtensionCodec } from '@msgpack/msgpack';
const extensionCodec = new ExtensionCodec();
......@@ -10,6 +10,7 @@ extensionCodec.register({
return new Uint8Array([0]);
}
},
decode: (_data: Uint8Array) => undefined,
});
......@@ -21,9 +22,24 @@ extensionCodec.register({
return new Uint8Array(object.buffer, object.byteOffset, object.byteLength);
}
},
// msgpack will reuse the Uint8Array instance, so WE NEED to copy it instead of simply creating a view
decode: (data: Uint8Array) => Buffer.from(data),
});
export const encoder = new Encoder({ extensionCodec });
export const decoder = new Decoder({ extensionCodec });
/**
* The Encoder and Decoder classes perform "stateful" operations, i.e. they read from a
* stream, store the data locally and decode it from its buffer.
*
* In practice, this affects the decoder when there is decode error. After an error, the decoder
* keeps the malformed data in its buffer, and even if we try to decode from another source (e.g. different stream)
* it will fail again as there's still data in the buffer.
*
* For that reason, we can't have a singleton instance of Encoder and Decoder, but rather one
* instance for each time we create a new subprocess
*/
export const newEncoder = () => new _Encoder({ extensionCodec });
export const newDecoder = () => new _Decoder({ extensionCodec });
export type Encoder = _Encoder;
export type Decoder = _Decoder;
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment