-
Douglas Gubert authoredDouglas Gubert authored
AppsEngineDenoRuntime.ts 24.16 KiB
import * as child_process from 'child_process';
import * as path from 'path';
import { type Readable, EventEmitter } from 'stream';
import debugFactory from 'debug';
import * as jsonrpc from 'jsonrpc-lite';
import { LivenessManager } from './LivenessManager';
import { ProcessMessenger } from './ProcessMessenger';
import { bundleLegacyApp } from './bundler';
import { newDecoder } from './codec';
import { AppStatus, AppStatusUtils } from '../../../definition/AppStatus';
import type { AppMethod } from '../../../definition/metadata';
import type { AppManager } from '../../AppManager';
import type { AppBridges } from '../../bridges';
import type { IParseAppPackageResult } from '../../compiler';
import { AppConsole, type ILoggerStorageEntry } from '../../logging';
import type { AppAccessorManager, AppApiManager } from '../../managers';
import type { AppLogStorage, IAppStorageItem } from '../../storage';
const baseDebug = debugFactory('appsEngine:runtime:deno');
export const ALLOWED_ACCESSOR_METHODS = [
'getConfigurationExtend',
'getEnvironmentRead',
'getEnvironmentWrite',
'getConfigurationModify',
'getReader',
'getPersistence',
'getHttp',
'getModifier',
] as Array<
keyof Pick<
AppAccessorManager,
| 'getConfigurationExtend'
| 'getEnvironmentRead'
| 'getEnvironmentWrite'
| 'getConfigurationModify'
| 'getReader'
| 'getPersistence'
| 'getHttp'
| 'getModifier'
>
>;
// Trying to access environment variables in Deno throws an error where in vm2 it simply returned `undefined`
// So here we define the allowed envvars to prevent the process (and the compatibility) from breaking
export const ALLOWED_ENVIRONMENT_VARIABLES = [
'NODE_EXTRA_CA_CERTS', // Accessed by the `https` node module
];
const COMMAND_PONG = '_zPONG';
export const JSONRPC_METHOD_NOT_FOUND = -32601;
export function getRuntimeTimeout() {
const defaultTimeout = 30000;
const envValue = isFinite(process.env.APPS_ENGINE_RUNTIME_TIMEOUT as any) ? Number(process.env.APPS_ENGINE_RUNTIME_TIMEOUT) : defaultTimeout;
if (envValue < 0) {
console.log('Environment variable APPS_ENGINE_RUNTIME_TIMEOUT has a negative value, ignoring...');
return defaultTimeout;
}
return envValue;
}
export function isValidOrigin(accessor: string): accessor is (typeof ALLOWED_ACCESSOR_METHODS)[number] {
return ALLOWED_ACCESSOR_METHODS.includes(accessor as any);
}
export function getDenoWrapperPath(): string {
try {
// This path is relative to the compiled version of the Apps-Engine source
return require.resolve('../../../deno-runtime/main.ts');
} catch {
// This path is relative to the original Apps-Engine files
return require.resolve('../../../../deno-runtime/main.ts');
}
}
export type DenoRuntimeOptions = {
timeout: number;
};
export class DenoRuntimeSubprocessController extends EventEmitter {
private deno: child_process.ChildProcess | undefined;
private state: 'uninitialized' | 'ready' | 'invalid' | 'restarting' | 'unknown' | 'stopped';
/**
* Incremental id that keeps track of how many times we've spawned a process for this app
*/
private spawnId = 0;
private readonly debug: debug.Debugger;
private readonly options = {
timeout: getRuntimeTimeout(),
};
private readonly accessors: AppAccessorManager;
private readonly api: AppApiManager;
private readonly logStorage: AppLogStorage;
private readonly bridges: AppBridges;
private readonly messenger: ProcessMessenger;
private readonly livenessManager: LivenessManager;
// We need to keep the appSource around in case the Deno process needs to be restarted
constructor(
manager: AppManager,
private readonly appPackage: IParseAppPackageResult,
private readonly storageItem: IAppStorageItem,
) {
super();
this.debug = baseDebug.extend(appPackage.info.id);
this.messenger = new ProcessMessenger(this.debug);
this.livenessManager = new LivenessManager({
controller: this,
messenger: this.messenger,
debug: this.debug,
});
this.state = 'uninitialized';
this.accessors = manager.getAccessorManager();
this.api = manager.getApiManager();
this.logStorage = manager.getLogStorage();
this.bridges = manager.getBridges();
}
public spawnProcess(): void {
try {
const denoExePath = 'deno';
const denoWrapperPath = getDenoWrapperPath();
// During development, the appsEngineDir is enough to run the deno process
const appsEngineDir = path.dirname(path.join(denoWrapperPath, '..'));
const DENO_DIR = process.env.DENO_DIR ?? path.join(appsEngineDir, '.deno-cache');
// When running in production, we're likely inside a node_modules which the Deno
// process must be able to read in order to include files that use NPM packages
const parentNodeModulesDir = path.dirname(path.join(appsEngineDir, '..'));
const options = [
'run',
`--allow-read=${appsEngineDir},${parentNodeModulesDir}`,
`--allow-env=${ALLOWED_ENVIRONMENT_VARIABLES.join(',')}`,
denoWrapperPath,
'--subprocess',
this.appPackage.info.id,
'--spawnId',
String(this.spawnId++),
];
// If the app doesn't request any permissions, it gets the default set of permissions, which includes "networking"
// If the app requests specific permissions, we need to check whether it requests "networking" or not
if (!this.appPackage.info.permissions || this.appPackage.info.permissions.findIndex((p) => p.name === 'networking') !== -1) {
options.splice(1, 0, '--allow-net');
}
const environment = {
env: {
// We need to pass the PATH, otherwise the shell won't find the deno executable
// But the runtime itself won't have access to the env var because of the parameters
PATH: process.env.PATH,
DENO_DIR,
},
};
this.deno = child_process.spawn(denoExePath, options, environment);
this.messenger.setReceiver(this.deno);
this.livenessManager.attach(this.deno);
this.debug('Started subprocess %d with options %O and env %O', this.deno.pid, options, environment);
this.setupListeners();
} catch (e) {
this.state = 'invalid';
console.error(`Failed to start Deno subprocess for app ${this.getAppId()}`, e);
}
}
/**
* Attempts to kill the process currently controlled by this.deno
*
* @returns boolean - if a process has been killed or not
*/
public async killProcess(): Promise<boolean> {
if (!this.deno) {
this.debug('No child process reference');
return false;
}
let { killed } = this.deno;
// This field is not populated if the process is killed by the OS
if (killed) {
this.debug('App process was already killed');
return killed;
}
// What else should we do?
if (this.deno.kill('SIGKILL')) {
// Let's wait until we get confirmation the process exited
await new Promise<void>((r) => this.deno.on('exit', r));
killed = true;
} else {
this.debug('Tried killing the process but failed. Was it already dead?');
killed = false;
}
delete this.deno;
this.messenger.clearReceiver();
return killed;
}
// Debug purposes, could be deleted later
emit(eventName: string | symbol, ...args: any[]): boolean {
const hadListeners = super.emit(eventName, ...args);
if (!hadListeners) {
this.debug('Emitted but no one listened: ', eventName, args);
}
return hadListeners;
}
public getProcessState() {
return this.state;
}
public async getStatus(): Promise<AppStatus> {
// If the process has been terminated, we can't get the status
if (!this.deno || this.deno.exitCode !== null) {
return AppStatus.UNKNOWN;
}
return this.sendRequest({ method: 'app:getStatus', params: [] }) as Promise<AppStatus>;
}
public async setupApp() {
this.debug('Setting up app subprocess');
this.spawnProcess();
// If there is more than one file in the package, then it is a legacy app that has not been bundled
if (Object.keys(this.appPackage.files).length > 1) {
await bundleLegacyApp(this.appPackage);
}
await this.waitUntilReady();
await this.sendRequest({ method: 'app:construct', params: [this.appPackage] });
}
public async stopApp() {
this.debug('Stopping app subprocess');
this.state = 'stopped';
await this.killProcess();
}
public async restartApp() {
this.debug('Restarting app subprocess');
const logger = new AppConsole('runtime:restart');
logger.info('Starting restart procedure for app subprocess...', this.livenessManager.getRuntimeData());
this.state = 'restarting';
try {
const pid = this.deno?.pid;
const hasKilled = await this.killProcess();
if (hasKilled) {
logger.debug('Process successfully terminated', { pid });
} else {
logger.warn('Could not terminate process. Maybe it was already dead?', { pid });
}
await this.setupApp();
logger.info('New subprocess successfully spawned', { pid: this.deno.pid });
// setupApp() changes the state to 'ready' - we'll need to workaround that for now
this.state = 'restarting';
await this.sendRequest({ method: 'app:initialize' });
await this.sendRequest({ method: 'app:setStatus', params: [this.storageItem.status] });
if (AppStatusUtils.isEnabled(this.storageItem.status)) {
await this.sendRequest({ method: 'app:onEnable' });
}
this.state = 'ready';
logger.info('Successfully restarted app subprocess');
} catch (e) {
logger.error("Failed to restart app's subprocess", { error: e.message || e });
throw e;
} finally {
await this.logStorage.storeEntries(AppConsole.toStorageEntry(this.getAppId(), logger));
}
}
public getAppId(): string {
return this.appPackage.info.id;
}
public async sendRequest(message: Pick<jsonrpc.RequestObject, 'method' | 'params'>, options = this.options): Promise<unknown> {
const id = String(Math.random().toString(36)).substring(2);
const start = Date.now();
const request = jsonrpc.request(id, message.method, message.params);
const promise = this.waitForResponse(request, options).finally(() => {
this.debug('Request %s for method %s took %dms', id, message.method, Date.now() - start);
});
this.messenger.send(request);
return promise;
}
private waitUntilReady(): Promise<void> {
if (this.state === 'ready') {
return;
}
return new Promise((resolve, reject) => {
let timeoutId: NodeJS.Timeout;
const handler = () => {
clearTimeout(timeoutId);
resolve();
};
timeoutId = setTimeout(() => {
this.off('ready', handler);
reject(new Error(`[${this.getAppId()}] Timeout: app process not ready`));
}, this.options.timeout);
this.once('ready', handler);
});
}
private waitForResponse(req: jsonrpc.RequestObject, options = this.options): Promise<unknown> {
return new Promise((resolve, reject) => {
const responseCallback = (result: unknown, error: jsonrpc.IParsedObjectError['payload']['error']) => {
clearTimeout(timeoutId);
if (error) {
reject(error);
}
resolve(result);
};
const eventName = `result:${req.id}`;
const timeoutId = setTimeout(() => {
this.off(eventName, responseCallback);
reject(new Error(`[${this.getAppId()}] Request "${req.id}" for method "${req.method}" timed out`));
}, options.timeout);
this.once(eventName, responseCallback);
});
}
private onReady(): void {
this.state = 'ready';
}
private setupListeners(): void {
if (!this.deno) {
return;
}
this.deno.stderr.on('data', this.parseError.bind(this));
this.deno.on('error', (err) => {
this.state = 'invalid';
console.error(`Failed to startup Deno subprocess for app ${this.getAppId()}`, err);
});
this.once('ready', this.onReady.bind(this));
this.parseStdout(this.deno.stdout);
}
// Probable should extract this to a separate file
private async handleAccessorMessage({ payload: { method, id, params } }: jsonrpc.IParsedObjectRequest): Promise<jsonrpc.SuccessObject> {
const accessorMethods = method.substring(9).split(':'); // First 9 characters are always 'accessor:'
this.debug('Handling accessor message %o with params %o', accessorMethods, params);
const managerOrigin = accessorMethods.shift();
const tailMethodName = accessorMethods.pop();
// If we're restarting the app, we can't register resources again, so we
// hijack requests for the `ConfigurationExtend` accessor and don't let them through
// This needs to be refactored ASAP
if (this.state === 'restarting' && managerOrigin === 'getConfigurationExtend') {
return jsonrpc.success(id, null);
}
if (managerOrigin === 'api' && tailMethodName === 'listApis') {
const result = this.api.listApis(this.appPackage.info.id);
return jsonrpc.success(id, result);
}
/**
* At this point, the accessorMethods array will contain the path to the accessor from the origin (AppAccessorManager)
* The accessor is the one that contains the actual method the app wants to call
*
* Most of the times, it will take one step from origin to accessor
* For example, for the call AppAccessorManager.getEnvironmentRead().getServerSettings().getValueById() we'll have
* the following:
*
* ```
* const managerOrigin = 'getEnvironmentRead'
* const tailMethod = 'getValueById'
* const accessorMethods = ['getServerSettings']
* ```
*
* But sometimes there can be more steps, like in the following example:
* AppAccessorManager.getReader().getEnvironmentReader().getEnvironmentVariables().getValueByName()
* In this case, we'll have:
*
* ```
* const managerOrigin = 'getReader'
* const tailMethod = 'getValueByName'
* const accessorMethods = ['getEnvironmentReader', 'getEnvironmentVariables']
* ```
**/
// Prevent app from trying to get properties from the manager that
// are not intended for public access
if (!isValidOrigin(managerOrigin)) {
throw new Error(`Invalid accessor namespace "${managerOrigin}"`);
}
// Need to fix typing of return value
const getAccessorForOrigin = (
accessorMethods: string[],
managerOrigin: (typeof ALLOWED_ACCESSOR_METHODS)[number],
accessorManager: AppAccessorManager,
) => {
const origin = accessorManager[managerOrigin](this.appPackage.info.id);
if (managerOrigin === 'getHttp' || managerOrigin === 'getPersistence') {
return origin;
}
if (managerOrigin === 'getConfigurationExtend' || managerOrigin === 'getConfigurationModify') {
return origin[accessorMethods[0] as keyof typeof origin];
}
let accessor = origin;
// Call all intermediary objects to "resolve" the accessor
accessorMethods.forEach((methodName) => {
const method = accessor[methodName as keyof typeof accessor] as unknown;
if (typeof method !== 'function') {
throw new Error(`Invalid accessor method "${methodName}"`);
}
accessor = method.apply(accessor);
});
return accessor;
};
const accessor = getAccessorForOrigin(accessorMethods, managerOrigin, this.accessors);
const tailMethod = accessor[tailMethodName as keyof typeof accessor] as unknown;
if (typeof tailMethod !== 'function') {
throw new Error(`Invalid accessor method "${tailMethodName}"`);
}
const result = await tailMethod.apply(accessor, params);
return jsonrpc.success(id, typeof result === 'undefined' ? null : result);
}
private async handleBridgeMessage({ payload: { method, id, params } }: jsonrpc.IParsedObjectRequest): Promise<jsonrpc.SuccessObject | jsonrpc.ErrorObject> {
const [bridgeName, bridgeMethod] = method.substring(8).split(':');
this.debug('Handling bridge message %s().%s() with params %o', bridgeName, bridgeMethod, params);
const bridge = this.bridges[bridgeName as keyof typeof this.bridges];
if (!bridgeMethod.startsWith('do') || typeof bridge !== 'function' || !Array.isArray(params)) {
throw new Error('Invalid bridge request');
}
const bridgeInstance = bridge.call(this.bridges);
const methodRef = bridgeInstance[bridgeMethod as keyof typeof bridge] as unknown;
if (typeof methodRef !== 'function') {
throw new Error('Invalid bridge request');
}
let result;
try {
result = await methodRef.apply(
bridgeInstance,
// Should the protocol expect the placeholder APP_ID value or should the Deno process send the actual appId?
// If we do not expect the APP_ID, the Deno process will be able to impersonate other apps, potentially
params.map((value: unknown) => (value === 'APP_ID' ? this.appPackage.info.id : value)),
);
} catch (error) {
this.debug('Error executing bridge method %s().%s() %o', bridgeName, bridgeMethod, error.message);
const jsonRpcError = new jsonrpc.JsonRpcError(error.message, -32000, error);
return jsonrpc.error(id, jsonRpcError);
}
return jsonrpc.success(id, typeof result === 'undefined' ? null : result);
}
private async handleIncomingMessage(message: jsonrpc.IParsedObjectNotification | jsonrpc.IParsedObjectRequest): Promise<void> {
const { method } = message.payload;
if (method.startsWith('accessor:')) {
let result: jsonrpc.SuccessObject | jsonrpc.ErrorObject;
try {
result = await this.handleAccessorMessage(message as jsonrpc.IParsedObjectRequest);
} catch (e) {
result = jsonrpc.error((message.payload as jsonrpc.RequestObject).id, new jsonrpc.JsonRpcError(e.message, 1000));
}
this.messenger.send(result);
return;
}
if (method.startsWith('bridges:')) {
let result: jsonrpc.SuccessObject | jsonrpc.ErrorObject;
try {
result = await this.handleBridgeMessage(message as jsonrpc.IParsedObjectRequest);
} catch (e) {
result = jsonrpc.error((message.payload as jsonrpc.RequestObject).id, new jsonrpc.JsonRpcError(e.message, 1000));
}
this.messenger.send(result);
return;
}
switch (method) {
case 'ready':
this.emit('ready');
break;
case 'log':
console.log('SUBPROCESS LOG', message);
break;
case 'unhandledRejection':
case 'uncaughtException':
await this.logUnhandledError(`runtime:${method}`, message);
break;
default:
console.warn('Unrecognized method from sub process');
break;
}
}
private async logUnhandledError(
method: `${AppMethod.RUNTIME_UNCAUGHT_EXCEPTION | AppMethod.RUNTIME_UNHANDLED_REJECTION}`,
message: jsonrpc.IParsedObjectRequest | jsonrpc.IParsedObjectNotification,
) {
this.debug('Unhandled error of type "%s" caught in subprocess', method);
const logger = new AppConsole(method);
logger.error(message.payload);
await this.logStorage.storeEntries(AppConsole.toStorageEntry(this.getAppId(), logger));
}
private async handleResultMessage(message: jsonrpc.IParsedObjectError | jsonrpc.IParsedObjectSuccess): Promise<void> {
const { id } = message.payload;
let result: unknown;
let error: jsonrpc.IParsedObjectError['payload']['error'] | undefined;
let logs: ILoggerStorageEntry;
if (message.type === 'success') {
const params = message.payload.result as { value: unknown; logs?: ILoggerStorageEntry };
result = params.value;
logs = params.logs;
} else {
error = message.payload.error;
logs = message.payload.error.data?.logs as ILoggerStorageEntry;
}
// Should we try to make sure all result messages have logs?
if (logs) {
await this.logStorage.storeEntries(logs);
}
this.emit(`result:${id}`, result, error);
}
private async parseStdout(stream: Readable): Promise<void> {
try {
for await (const message of newDecoder().decodeStream(stream)) {
this.debug('Received message from subprocess %o', message);
try {
// Process PONG resonse first as it is not JSON RPC
if (message === COMMAND_PONG) {
this.emit('pong');
continue;
}
const JSONRPCMessage = jsonrpc.parseObject(message);
if (Array.isArray(JSONRPCMessage)) {
throw new Error('Invalid message format');
}
if (JSONRPCMessage.type === 'request' || JSONRPCMessage.type === 'notification') {
this.handleIncomingMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}
if (JSONRPCMessage.type === 'success' || JSONRPCMessage.type === 'error') {
this.handleResultMessage(JSONRPCMessage).catch((reason) =>
console.error(`[${this.getAppId()}] Error executing handler`, reason, message),
);
continue;
}
console.error('Unrecognized message type', JSONRPCMessage);
} catch (e) {
// SyntaxError is thrown when the message is not a valid JSON
if (e instanceof SyntaxError) {
console.error(`[${this.getAppId()}] Failed to parse message`);
continue;
}
console.error(`[${this.getAppId()}] Error executing handler`, e, message);
}
}
} catch (e) {
console.error(`[${this.getAppId()}]`, e);
this.emit('error', new Error('DECODE_ERROR'));
}
}
private async parseError(chunk: Buffer): Promise<void> {
try {
const data = JSON.parse(chunk.toString());
this.debug('Metrics received from subprocess (via stderr): %o', data);
} catch (e) {
console.error('Subprocess stderr', chunk.toString());
}
}
}