Add compilation worker mode infrastructure (#7864)

Patrick Quist
2025-09-08 22:07:47 +02:00
committed by GitHub
parent 082c4304f2
commit b3c6a321f1
21 changed files with 900 additions and 51 deletions

View File

@@ -59,6 +59,22 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
- This separation is enforced by pre-commit hooks (`npm run check-frontend-imports`)
- Violations will cause build failures and prevent commits
## Worker Mode Configuration
- **Compilation Workers**: New feature for offloading compilation tasks to dedicated worker instances
- `compilequeue.is_worker=true`: Enables compilation worker mode (similar to execution workers)
- `compilequeue.queue_url`: SQS queue URL for compilation requests (both regular and CMake)
- `compilequeue.events_url`: WebSocket URL for sending compilation results
- `compilequeue.worker_threads=2`: Number of concurrent worker threads
- `compilequeue.poll_interval_ms=1000`: Interval between poll attempts after a message is processed or an error occurs (default: 1000ms). Note: because SQS long polling is used, the actual wait can be up to 20 seconds when the queue is empty
- `--instance-color <color>`: Optional command-line parameter to differentiate deployment instances. When specified (blue or green), modifies the queue URL by appending the color to the queue name (e.g., `staging-compilation-queue-blue.fifo`)
- **Implementation**: Located in `/lib/compilation/sqs-compilation-queue.ts` with shared parsing utilities in `/lib/compilation/compilation-request-parser.ts`
- **Queue Architecture**: Uses a single AWS SQS FIFO queue for reliable, ordered message delivery; messages carry an `isCMake` flag to distinguish regular compilations from CMake builds
- **Result Delivery**: Results are sent over a persistent WebSocket connection via `PersistentEventsSender`, avoiding per-request connection setup
- **Message Production**: Queue messages are produced by external Lambda functions, not by the main Compiler Explorer server
- **Shared Parsing**: Common request parsing logic is shared between web handlers and SQS workers for consistency
- **Remote Compiler Support**: Workers automatically detect and proxy requests to remote compilers using HTTP, maintaining compatibility with existing remote compiler infrastructure
- **S3 Storage Integration**: Compilation results include an `s3Key` property containing the cache key hash for S3 storage reference. Large results (>31 KiB) can be stored in S3 and referenced by this key. The `s3Key` is removed from API responses before sending to users.
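The interplay between the 31 KiB WebSocket limit and the temporary S3 storage can be compressed into an illustrative sketch (not the commit's actual control flow, which is split between `base-compiler.ts` and `sqs-compilation-queue.ts`; `env`, `hash`, and `deliverableResult` are stand-ins):

```ts
const WEBSOCKET_SIZE_THRESHOLD = 31 * 1024; // AWS API Gateway WebSocket frames cap at 32 KiB
const TEMP_STORAGE_TTL_DAYS = 1;

// Decide whether a worker result fits over the events WebSocket or must be
// parked in S3 under temp/ with a 1-day TTL and referenced by key only.
async function deliverableResult(env: any, cacheKey: object, hash: string, result: any) {
    const size = JSON.stringify(result).length;
    if (!result.okToCache && size > WEBSOCKET_SIZE_THRESHOLD) {
        await env.tempCachePutWithTTL(cacheKey, result, TEMP_STORAGE_TTL_DAYS, undefined);
        return {s3Key: `temp/${hash}`, okToCache: false, execTime: result.execTime};
    }
    return {...result, s3Key: hash};
}
```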
## Testing Guidelines
- Use Vitest for unit tests (compatible with Jest syntax)
- Tests are in the `/test` directory, typically named like the source files they test

View File

@@ -77,7 +77,15 @@ compilationStaleAfterMs=60000
compilerVersionsUrl=https://api.compiler-explorer.com/get_deployed_exe_version
# Execution worker mode configuration
execqueue.is_worker=false
execqueue.remote_archs_url=https://api.compiler-explorer.com/get_remote_execution_archs
execqueue.queue_url=https://sqs.us-east-1.amazonaws.com/052730242331/prod-execqueue
execqueue.events_url=wss://events.compiler-explorer.com/prod
# Compilation worker mode configuration
compilequeue.is_worker=true
compilequeue.queue_url=https://sqs.us-east-1.amazonaws.com/052730242331/prod-compilation-queue.fifo
compilequeue.events_url=wss://events.compiler-explorer.com/prod
compilequeue.worker_threads=2
compilequeue.poll_interval_ms=1000

View File

@@ -3,3 +3,11 @@ httpRoot=/beta
storageSolution=s3
motdUrl=/motd/motd-beta.json
sentryEnvironment=beta
# Claude Explain API endpoint, for beta
explainApiEndpoint=https://api.compiler-explorer.com/explain
# Compilation worker mode configuration
compilequeue.is_worker=true
compilequeue.queue_url=https://sqs.us-east-1.amazonaws.com/052730242331/beta-compilation-queue.fifo
compilequeue.events_url=wss://events.compiler-explorer.com/beta

View File

@@ -63,3 +63,10 @@ statusTrackingEnabled=true
# adds endpoint '/localexecution/:hash' for testing of remote execution
localexecutionEndpoint=false
# Compilation worker mode configuration
compilequeue.is_worker=false
compilequeue.queue_url=
compilequeue.events_url=
compilequeue.worker_threads=2
compilequeue.poll_interval_ms=1000

View File

@@ -3,5 +3,11 @@ httpRoot=/staging
motdUrl=/motd/motd-staging.json
sentryEnvironment=staging
# Execution worker mode configuration
execqueue.queue_url=https://sqs.us-east-1.amazonaws.com/052730242331/staging-execqueue
execqueue.events_url=wss://events.compiler-explorer.com/staging
# Compilation worker mode configuration
compilequeue.is_worker=true
compilequeue.queue_url=https://sqs.us-east-1.amazonaws.com/052730242331/staging-compilation-queue.fifo
compilequeue.events_url=wss://events.compiler-explorer.com/staging

View File

@@ -45,4 +45,5 @@ export type AppArguments = {
loggingOptions: LoggingOptions;
isWsl: boolean;
devMode: boolean;
instanceColor?: string;
};

View File

@@ -76,6 +76,7 @@ export interface CompilerExplorerOptions {
local: boolean;
version: boolean;
devMode: boolean;
instanceColor?: string;
}
/**
@@ -115,7 +116,8 @@ export function parseCommandLine(argv: string[]): CompilerExplorerOptions {
'--dev-mode',
'Run in dev mode (default if NODE_ENV is not production)',
process.env.NODE_ENV !== 'production',
);
)
.option('--instance-color <color>', 'Instance color (blue or green) for deployment differentiation');
program.parse(argv);
return program.opts() as CompilerExplorerOptions;
@@ -195,6 +197,7 @@ export function convertOptionsToAppArguments(
tmpDir: options.tmpDir,
isWsl: isWsl,
devMode: options.devMode,
instanceColor: options.instanceColor,
loggingOptions: {
debug: options.debug || false,
logHost: options.logHost,

View File

@@ -50,6 +50,7 @@ export interface ApiControllers {
* @param compilationQueue - The compilation queue instance
* @param healthCheckFilePath - Optional path to health check file
* @param isExecutionWorker - Whether the server is running as an execution worker
* @param isCompilationWorker - Whether the server is running as a compilation worker
* @param formDataHandler - Handler for form data
* @returns Object containing all initialized controllers
*/
@@ -59,6 +60,7 @@ export function setupControllersAndHandlers(
compilationQueue: CompilationQueue,
healthCheckFilePath: string | null,
isExecutionWorker: boolean,
isCompilationWorker: boolean,
formDataHandler: express.Handler,
): ApiControllers {
// Initialize API controllers

View File

@@ -27,6 +27,7 @@ import path from 'node:path';
import type {AppArguments} from '../app.interfaces.js';
import * as aws from '../aws.js';
import {startCompilationWorkerThread} from '../compilation/sqs-compilation-queue.js';
import {CompilerFinder} from '../compiler-finder.js';
import {startWineInit} from '../exec.js';
import {RemoteExecutionQuery} from '../execution/execution-query.js';
@@ -79,6 +80,7 @@ export async function initialiseApplication(options: ApplicationOptions): Promis
const compilerFinder = new CompilerFinder(compileHandler, compilerProps, appArgs, clientOptionsHandler);
const isExecutionWorker = ceProps<boolean>('execqueue.is_worker', false);
const isCompilationWorker = ceProps<boolean>('compilequeue.is_worker', false);
const healthCheckFilePath = ceProps('healthCheckFilePath', null) as string | null;
const formDataHandler = createFormDataHandler();
@@ -89,6 +91,7 @@ export async function initialiseApplication(options: ApplicationOptions): Promis
compilationEnvironment.compilationQueue,
healthCheckFilePath,
isExecutionWorker,
isCompilationWorker,
formDataHandler,
);
@@ -162,6 +165,10 @@ export async function initialiseApplication(options: ApplicationOptions): Promis
startExecutionWorkerThread(ceProps, awsProps, compilationEnvironment);
}
if (isCompilationWorker) {
startCompilationWorkerThread(ceProps, awsProps, compilationEnvironment, appArgs);
}
startListening(webServer, appArgs);
return {webServer};

View File

@@ -52,6 +52,8 @@ import {
FiledataPair,
GccDumpOptions,
LibsAndOptions,
TEMP_STORAGE_TTL_DAYS,
WEBSOCKET_SIZE_THRESHOLD,
} from '../types/compilation/compilation.interfaces.js';
import {
CompilerOverrideOption,
@@ -218,6 +220,7 @@ export class BaseCompiler {
});
protected executionEnvironmentClass: any;
protected readonly argParser: BaseParser;
protected readonly isCompilationWorker: boolean;
constructor(compilerInfo: PreliminaryCompilerInfo & {disabledFilters?: string[]}, env: CompilationEnvironment) {
// Information about our compiler
@@ -235,6 +238,7 @@ export class BaseCompiler {
this.alwaysResetLdPath = this.env.ceProps('alwaysResetLdPath');
this.delayCleanupTemp = this.env.ceProps('delayCleanupTemp', false);
this.isCompilationWorker = this.env.ceProps('compilequeue.is_worker', false);
this.stubRe = new RegExp(this.compilerProps('stubRe', ''));
this.stubText = this.compilerProps('stubText', '');
this.compilerWrapper = this.compilerProps('compiler-wrapper');
@@ -656,7 +660,7 @@ export class BaseCompiler {
if (this.externalparser) {
const objResult = await this.externalparser.objdumpAndParseAssembly(result.dirPath, args, filters);
if (objResult.parsingTime !== undefined) {
objResult.objdumpTime = Number.parseInt(result.execTime) - Number.parseInt(result.parsingTime);
objResult.objdumpTime = (objResult.execTime ?? 0) - (objResult.parsingTime ?? 0);
delete objResult.execTime;
}
@@ -2727,6 +2731,7 @@ export class BaseCompiler {
: await this.loadPackageWithExecutable(cacheKey, executablePackageHash, dirPath);
if (fullResult) {
fullResult.retreivedFromCache = true;
fullResult.s3Key = BaseCache.hash(cacheKey);
delete fullResult.inputFilename;
delete fullResult.dirPath;
@@ -2884,8 +2889,8 @@ export class BaseCompiler {
const optOutput = undefined;
const stackUsageOutput = undefined;
await this.afterCompilation(
fullResult.result,
await this.afterCmakeCompilation(
fullResult,
false,
cacheKey,
executeOptions,
@@ -2902,6 +2907,20 @@ export class BaseCompiler {
if (fullResult.result) delete fullResult.result.dirPath;
this.cleanupResult(fullResult);
fullResult.s3Key = BaseCache.hash(cacheKey);
// In worker mode, store large non-cacheable results with short TTL
if (this.isCompilationWorker && !fullResult.result?.okToCache && fullResult) {
// Check if result is large enough to require S3 storage
const resultSize = JSON.stringify(fullResult).length;
if (resultSize > WEBSOCKET_SIZE_THRESHOLD) {
// Store with 1-day TTL for temporary retrieval in temp/ subdirectory
await this.env.tempCachePutWithTTL(cacheKey, fullResult, TEMP_STORAGE_TTL_DAYS, undefined);
// Set s3Key with temp/ prefix to reflect storage location
fullResult.s3Key = `temp/${BaseCache.hash(cacheKey)}`;
}
}
return fullResult;
}
@@ -2994,6 +3013,7 @@ export class BaseCompiler {
const cacheRetrieveTimeEnd = process.hrtime.bigint();
result.retreivedFromCacheTime = utils.deltaTimeNanoToMili(cacheRetrieveTimeStart, cacheRetrieveTimeEnd);
result.retreivedFromCache = true;
result.s3Key = BaseCache.hash(key);
if (doExecute) {
const queueTime = performance.now();
result.execResult = await this.env.enqueue(
@@ -3085,6 +3105,7 @@ export class BaseCompiler {
stackUsageOutput: StackUsage.StackUsageInfo[] | undefined,
bypassCache: BypassCache,
customBuildPath?: string,
delayCaching?: boolean,
) {
// Start the execution as soon as we can, but only await it at the end.
const execPromise =
@@ -3158,7 +3179,7 @@ export class BaseCompiler {
];
}
if (result.okToCache) {
if (result.okToCache && !delayCaching) {
await this.env.cachePut(key, result, undefined);
}
@@ -3171,10 +3192,66 @@ export class BaseCompiler {
}
this.cleanupResult(result);
result.s3Key = BaseCache.hash(key);
// In worker mode, store large non-cacheable results with short TTL
if (this.isCompilationWorker && !result.okToCache && !delayCaching) {
// Check if result is large enough to require S3 storage
const resultSize = JSON.stringify(result).length;
if (resultSize > WEBSOCKET_SIZE_THRESHOLD) {
// Store with 1-day TTL for temporary retrieval in temp/ subdirectory
await this.env.tempCachePutWithTTL(key, result, TEMP_STORAGE_TTL_DAYS, undefined);
// Set s3Key with temp/ prefix to reflect storage location
result.s3Key = `temp/${BaseCache.hash(key)}`;
}
}
return result;
}
async afterCmakeCompilation(
fullResult: CompilationResult,
doExecute: boolean,
key: CacheKey,
executeOptions: ExecutableExecutionOptions,
tools: ActiveTool[],
backendOptions: Record<string, any>,
filters: ParseFiltersAndOutputOptions,
options: string[],
optOutput: OptRemark[] | undefined,
stackUsageOutput: StackUsage.StackUsageInfo[] | undefined,
bypassCache: BypassCache,
customBuildPath?: string,
) {
// Process the inner result using existing afterCompilation logic, but skip caching
const processedResult = await this.afterCompilation(
fullResult.result,
doExecute,
key,
executeOptions,
tools,
backendOptions,
filters,
options,
optOutput,
stackUsageOutput,
bypassCache,
customBuildPath,
true, // delayCaching = true
);
// Recombine the processed result back into fullResult
fullResult.result = processedResult;
// Cache the complete fullResult (including buildsteps) instead of just the inner result
if (fullResult.result?.okToCache) {
await this.env.cachePut(key, fullResult, undefined);
}
return fullResult;
}
cleanupResult(result: CompilationResult) {
if (result.compilationOptions) {
result.compilationOptions = this.maskPathsInArgumentsForUser(result.compilationOptions);

lib/cache/s3.ts
View File

@@ -82,4 +82,39 @@ export class S3Cache extends BaseCache {
this.onError(e, 'write');
}
}
async putWithTTL(key: string, value: Buffer, ttlDays: number, creator?: string): Promise<void> {
const expiresDate = new Date(Date.now() + ttlDays * 24 * 60 * 60 * 1000);
const options: S3HandlerOptions = {
metadata: creator ? {CreatedBy: creator} : {},
redundancy: StorageClass.REDUCED_REDUNDANCY,
expires: expiresDate,
};
try {
await this.s3.put(key, value, this.path, options);
} catch (e) {
this.onError(e, 'write');
}
}
async putWithTTLAndPath(
key: string,
value: Buffer,
ttlDays: number,
pathPrefix: string,
creator?: string,
): Promise<void> {
const expiresDate = new Date(Date.now() + ttlDays * 24 * 60 * 60 * 1000);
const options: S3HandlerOptions = {
metadata: creator ? {CreatedBy: creator} : {},
redundancy: StorageClass.REDUCED_REDUNDANCY,
expires: expiresDate,
};
try {
const customPath = `${this.path}/${pathPrefix}`;
await this.s3.put(key, value, customPath, options);
} catch (e) {
this.onError(e, 'write');
}
}
}

View File

@@ -36,6 +36,7 @@ import {BaseCompiler} from './base-compiler.js';
import type {Cache} from './cache/base.interfaces.js';
import {BaseCache} from './cache/base.js';
import {createCacheFromConfig} from './cache/from-config.js';
import {S3Cache} from './cache/s3.js';
import {CompilationQueue, EnqueueOptions, Job} from './compilation-queue.js';
import {FormattingService} from './formatting-service.js';
import {logger} from './logger.js';
@@ -168,6 +169,32 @@ export class CompilationEnvironment {
return this.compilerCache.put(key, JSON.stringify(result), creator);
}
async cachePutWithTTL(object: CacheableValue, result: object, ttlDays: number, creator: string | undefined) {
const key = BaseCache.hash(object);
const jsonData = JSON.stringify(result);
// Check if cache is S3Cache to use TTL functionality
if (this.cache instanceof S3Cache) {
return this.cache.putWithTTL(key, Buffer.from(jsonData), ttlDays, creator);
} else {
// Fallback to regular put for non-S3 caches
return this.cache.put(key, jsonData, creator);
}
}
async tempCachePutWithTTL(object: CacheableValue, result: object, ttlDays: number, creator: string | undefined) {
const key = BaseCache.hash(object);
const jsonData = JSON.stringify(result);
// Check if cache is S3Cache to use TTL functionality with temp path
if (this.cache instanceof S3Cache) {
return this.cache.putWithTTLAndPath(key, Buffer.from(jsonData), ttlDays, 'temp', creator);
} else {
// Fallback to regular put for non-S3 caches
return this.cache.put(key, jsonData, creator);
}
}
getExecutableHash(object: CacheableValue): string {
return BaseCache.hash(object) + '_exec';
}
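A hedged usage sketch of the new TTL helper (the cache-key object and result below are placeholders; in this commit the caller is `BaseCompiler`, passing its compilation `CacheKey` and the full result):

```ts
// Sketch: hash the same CacheKey object cachePut would use. With an S3-backed
// cache the entry lands under <path>/temp/ and expires after ttlDays; any
// other cache backend silently falls back to a plain put.
const cacheKey = {compiler: {id: 'g132'}, source: 'int main() {}', options: ['-O2']}; // placeholder
await compilationEnvironment.tempCachePutWithTTL(cacheKey, compilationResult, 1, undefined);
```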

View File

@@ -0,0 +1,103 @@
// Copyright (c) 2025, Compiler Explorer Authors
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
import {splitArguments} from '../../shared/common-utils.js';
import {
ActiveTool,
BypassCache,
ExecutionParams,
LegacyCompatibleActiveTool,
UnparsedExecutionParams,
} from '../../types/compilation/compilation.interfaces.js';
import {ParseFiltersAndOutputOptions} from '../../types/features/filters.interfaces.js';
import {SelectedLibraryVersion} from '../../types/libraries/libraries.interfaces.js';
import {BaseCompiler} from '../base-compiler.js';
/**
* Core compilation request data that can come from various sources (JSON requests, SQS messages, etc.)
*/
export type CompilationRequestData = {
source: string;
userArguments?: string | string[];
compilerOptions?: Record<string, any>;
executeParameters?: UnparsedExecutionParams;
filters?: Record<string, boolean>;
tools?: LegacyCompatibleActiveTool[];
libraries?: SelectedLibraryVersion[];
bypassCache?: BypassCache;
};
/**
* Shared utility functions for parsing compilation parameters
* These can be used by both web handlers and SQS workers for consistency
*/
/**
* Parse user arguments from various formats into a string array
*/
export function parseUserArguments(userArgs: string | string[] | undefined): string[] {
if (Array.isArray(userArgs)) {
return userArgs;
}
if (typeof userArgs === 'string') {
return splitArguments(userArgs);
}
return [];
}
/**
* Parse execution parameters with proper defaults
*/
export function parseExecutionParameters(execParams: UnparsedExecutionParams = {}): ExecutionParams {
return {
args: Array.isArray(execParams.args) ? execParams.args : splitArguments(execParams.args || ''),
stdin: execParams.stdin || '',
runtimeTools: execParams.runtimeTools || [],
};
}
/**
* Parse tools array and ensure args are properly split
*/
export function parseTools(tools: LegacyCompatibleActiveTool[] = []): ActiveTool[] {
return tools.map(tool => {
if (typeof tool.args === 'string') {
return {...tool, args: splitArguments(tool.args)};
}
return tool as ActiveTool;
});
}
/**
* Merge compiler default filters with request filters
*/
export function parseFilters(
compiler: BaseCompiler,
requestFilters: Record<string, boolean> = {},
): ParseFiltersAndOutputOptions {
return {
...compiler.getDefaultFilters(),
...requestFilters,
};
}
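To show how the shared helpers are meant to be consumed by both web handlers and SQS workers (request values below are made up; `compiler` is assumed to be an already-resolved `BaseCompiler`):

```ts
import {
    parseExecutionParameters,
    parseFilters,
    parseTools,
    parseUserArguments,
} from './compilation-request-parser.js';

// A request body as it might arrive from a JSON POST or an SQS message (values illustrative).
const request = {
    userArguments: '-O2 -Wall',
    executeParameters: {args: 'first second', stdin: ''},
    tools: [{id: 'clangtidytrunk', args: '--checks=*'}],
    filters: {demangle: true},
};

const options = parseUserArguments(request.userArguments);              // ['-O2', '-Wall']
const executeParameters = parseExecutionParameters(request.executeParameters);
const tools = parseTools(request.tools);                                // string args split into arrays
const filters = parseFilters(compiler, request.filters);                // compiler defaults merged with request
```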

View File

@@ -0,0 +1,336 @@
// Copyright (c) 2025, Compiler Explorer Authors
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
import {SQS} from '@aws-sdk/client-sqs';
import {CompilationResult, WEBSOCKET_SIZE_THRESHOLD} from '../../types/compilation/compilation.interfaces.js';
import {CompilationEnvironment} from '../compilation-env.js';
import {PersistentEventsSender} from '../execution/events-websocket.js';
import {CompileHandler} from '../handlers/compile.js';
import {logger} from '../logger.js';
import {PropertyGetter} from '../properties.interfaces.js';
import {SentryCapture} from '../sentry.js';
export type RemoteCompilationRequest = {
guid: string;
compilerId: string;
source: string;
options: any;
backendOptions: any;
filters: any;
bypassCache: any;
tools: any;
executeParameters: any;
libraries: any[];
lang: string;
files: any[];
isCMake?: boolean;
queueTimeMs?: number;
headers: Record<string, string>;
queryStringParameters: Record<string, string>;
};
export class SqsCompilationQueueBase {
protected sqs: SQS;
protected readonly queue_url: string;
constructor(props: PropertyGetter, awsProps: PropertyGetter, appArgs?: {instanceColor?: string}) {
let queue_url = props<string>('compilequeue.queue_url', '');
// If instance color is provided, modify the queue URL to include the color
if (appArgs?.instanceColor && queue_url) {
// Replace the queue name with color suffix
// e.g., "staging-compilation-queue.fifo" becomes "staging-compilation-queue-blue.fifo"
queue_url = queue_url.replace(
'-compilation-queue.fifo',
`-compilation-queue-${appArgs.instanceColor}.fifo`,
);
}
this.queue_url = queue_url;
if (!this.queue_url) {
throw new Error(
'Configuration error: compilequeue.queue_url is required when compilequeue.is_worker=true. ' +
'Please set the SQS queue URL in your configuration.',
);
}
const region = awsProps<string>('region', '');
if (!region) {
throw new Error(
'Configuration error: AWS region is required when compilequeue.is_worker=true. ' +
'Please set the AWS region in your configuration.',
);
}
this.sqs = new SQS({region: region});
}
}
export class SqsCompilationWorkerMode extends SqsCompilationQueueBase {
private async receiveMsg(url: string) {
try {
return await this.sqs.receiveMessage({
QueueUrl: url,
MaxNumberOfMessages: 1,
WaitTimeSeconds: 20, // Long polling - wait up to 20 seconds for a message
MessageSystemAttributeNames: ['SentTimestamp'],
});
} catch (e) {
logger.error(`Error retrieving compilation message from queue with URL: ${url}`);
throw e;
}
}
async pop(): Promise<RemoteCompilationRequest | undefined> {
const url = this.queue_url;
let queued_messages;
try {
queued_messages = await this.receiveMsg(url);
} catch (receiveError) {
logger.error(
`SQS receiveMsg failed: ${receiveError instanceof Error ? receiveError.message : String(receiveError)}`,
);
throw receiveError;
}
if (queued_messages.Messages && queued_messages.Messages.length === 1) {
const queued_message = queued_messages.Messages[0];
try {
if (queued_message.Body) {
const json = queued_message.Body;
let parsed;
try {
parsed = JSON.parse(json);
} catch (parseError) {
logger.error(
`JSON.parse failed: ${parseError instanceof Error ? parseError.message : String(parseError)}`,
);
throw parseError;
}
// Calculate queue time from SentTimestamp
const sentTimestamp = queued_message.Attributes?.SentTimestamp;
if (sentTimestamp) {
const queueTimeMs = Date.now() - Number.parseInt(sentTimestamp, 10);
parsed.queueTimeMs = queueTimeMs;
}
return parsed as RemoteCompilationRequest;
}
return undefined;
} finally {
if (queued_message.ReceiptHandle) {
await this.sqs.deleteMessage({
QueueUrl: url,
ReceiptHandle: queued_message.ReceiptHandle,
});
}
}
}
return undefined;
}
}
async function sendCompilationResultViaWebsocket(
persistentSender: PersistentEventsSender,
guid: string,
result: CompilationResult,
totalTimeMs: number,
) {
try {
const basicResult = {
...result,
okToCache: result.okToCache ?? false,
execTime: result.execTime !== undefined ? result.execTime : totalTimeMs,
};
const resultSize = JSON.stringify(basicResult).length;
let webResult;
if (result.s3Key && resultSize > WEBSOCKET_SIZE_THRESHOLD) {
webResult = {
s3Key: result.s3Key,
okToCache: result.okToCache ?? false,
execTime: result.execTime !== undefined ? result.execTime : totalTimeMs,
};
} else {
webResult = basicResult;
}
await persistentSender.send(guid, webResult);
logger.info(`Successfully sent compilation result for ${guid} via WebSocket (total time: ${totalTimeMs}ms)`);
} catch (error) {
logger.error('WebSocket send error:', error);
}
}
async function doOneCompilation(
queue: SqsCompilationWorkerMode,
compilationEnvironment: CompilationEnvironment,
persistentSender: PersistentEventsSender,
) {
const msg = await queue.pop();
if (msg?.guid) {
const startTime = Date.now();
const compilationType = msg.isCMake ? 'cmake' : 'compile';
try {
const compiler = compilationEnvironment.findCompiler(msg.lang as any, msg.compilerId);
if (!compiler) {
throw new Error(`Compiler with ID ${msg.compilerId} not found for language ${msg.lang}`);
}
const isJson = msg.headers['content-type'] === 'application/json';
const query = msg.queryStringParameters;
const parsedRequest = CompileHandler.parseRequestReusable(
isJson,
query,
isJson ? msg : msg.source,
compiler,
);
let result: CompilationResult;
// Local compilation only
if (msg.isCMake) {
result = await compiler.cmake(msg.files || [], parsedRequest, parsedRequest.bypassCache);
} else {
result = await compiler.compile(
parsedRequest.source,
parsedRequest.options,
parsedRequest.backendOptions,
parsedRequest.filters,
parsedRequest.bypassCache,
parsedRequest.tools,
parsedRequest.executeParameters,
parsedRequest.libraries,
msg.files || [],
);
}
// Add queue time to result if available
if (msg.queueTimeMs !== undefined) {
result.queueTime = msg.queueTimeMs;
}
const endTime = Date.now();
const duration = endTime - startTime;
await sendCompilationResultViaWebsocket(persistentSender, msg.guid, result, duration);
logger.info(`Completed ${compilationType} request ${msg.guid} in ${duration}ms`);
} catch (e: any) {
const endTime = Date.now();
const duration = endTime - startTime;
logger.error(`Failed ${compilationType} request ${msg.guid} after ${duration}ms:`, e);
// Create a more descriptive error message
let errorMessage = 'Internal error during compilation';
if (e.message) {
errorMessage = e.message;
} else if (typeof e === 'string') {
errorMessage = e;
}
const errorResult: CompilationResult = {
code: -1,
stderr: [{text: errorMessage}],
stdout: [],
okToCache: false,
timedOut: false,
inputFilename: '',
asm: [],
tools: [],
};
// Add queue time to error result if available
if (msg.queueTimeMs !== undefined) {
errorResult.queueTime = msg.queueTimeMs;
}
await sendCompilationResultViaWebsocket(persistentSender, msg.guid, errorResult, duration);
}
}
}
export function startCompilationWorkerThread(
ceProps: PropertyGetter,
awsProps: PropertyGetter,
compilationEnvironment: CompilationEnvironment,
appArgs?: {instanceColor?: string},
) {
const queue = new SqsCompilationWorkerMode(ceProps, awsProps, appArgs);
const numThreads = ceProps<number>('compilequeue.worker_threads', 2);
const pollIntervalMs = ceProps<number>('compilequeue.poll_interval_ms', 50);
// Create persistent WebSocket sender
const execqueueEventsUrl = compilationEnvironment.ceProps('execqueue.events_url', '');
const compilequeueEventsUrl = compilationEnvironment.ceProps('compilequeue.events_url', '');
const eventsUrl = compilequeueEventsUrl || execqueueEventsUrl;
if (!eventsUrl) {
throw new Error('No events URL configured - need either compilequeue.events_url or execqueue.events_url');
}
const compilationEventsProps = (key: string, defaultValue?: any) => {
if (key === 'execqueue.events_url') {
return eventsUrl;
}
return compilationEnvironment.ceProps(key, defaultValue);
};
const persistentSender = new PersistentEventsSender(compilationEventsProps);
// Handle graceful shutdown
const shutdown = async () => {
logger.info('Shutting down compilation worker - closing persistent WebSocket connection');
await persistentSender.close();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
logger.info(`Starting ${numThreads} compilation worker threads with ${pollIntervalMs}ms poll interval`);
for (let i = 0; i < numThreads; i++) {
const doCompilationWork = async () => {
try {
await doOneCompilation(queue, compilationEnvironment, persistentSender);
} catch (error) {
logger.error('Error in compilation worker thread:', error);
SentryCapture(error, 'compilation worker thread error');
}
setTimeout(doCompilationWork, pollIntervalMs);
};
setTimeout(doCompilationWork, 1500 + i * 30);
}
}
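For reference, a message consumed by `SqsCompilationWorkerMode.pop()` follows the `RemoteCompilationRequest` shape above; a minimal example (all values illustrative, produced in practice by the external Lambda rather than this codebase) might look like:

```ts
const exampleMessage: RemoteCompilationRequest = {
    guid: 'b2f7c0de-0000-0000-0000-000000000000', // correlation id echoed back over the events WebSocket
    compilerId: 'g132',
    source: 'int main() { return 0; }',
    // With a JSON content-type the message itself is treated as the request body,
    // so `options` follows the JSON request shape (userArguments, filters, ...).
    options: {userArguments: '-O2', filters: {demangle: true}},
    backendOptions: {},
    filters: {},
    bypassCache: 0,
    tools: [],
    executeParameters: {},
    libraries: [],
    lang: 'c++',
    files: [],
    isCMake: false, // true would route the request through compiler.cmake()
    headers: {'content-type': 'application/json'},
    queryStringParameters: {},
};
```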

View File

@@ -23,7 +23,7 @@
// POSSIBILITY OF SUCH DAMAGE.
import {WebSocket} from 'ws';
import {CompilationResult} from '../../types/compilation/compilation.interfaces.js';
import {BasicExecutionResult} from '../../types/execution/execution.interfaces.js';
import {logger} from '../logger.js';
import {PropertyGetter} from '../properties.interfaces.js';
@@ -59,7 +59,7 @@ export class EventsWsBase {
}
export class EventsWsSender extends EventsWsBase {
async send(guid: string, result: BasicExecutionResult): Promise<void> {
async send(guid: string, result: CompilationResult): Promise<void> {
this.connect();
return new Promise(resolve => {
this.ws!.on('open', async () => {
@@ -75,6 +75,171 @@ export class EventsWsSender extends EventsWsBase {
}
}
export class PersistentEventsSender extends EventsWsBase {
private messageQueue: Array<{
guid: string;
result: CompilationResult;
resolve: () => void;
reject: (error: any) => void;
}> = [];
private isConnected = false;
private isConnecting = false;
private reconnectAttempts = 0;
private maxReconnectAttempts = 5;
private reconnectDelay = 1000; // Start with 1 second
private heartbeatInterval: NodeJS.Timeout | undefined;
private heartbeatIntervalMs = 30000; // 30 seconds
constructor(props: PropertyGetter) {
super(props);
this.connect();
}
protected override connect(): void {
if (this.isConnecting || this.isConnected) {
return;
}
this.isConnecting = true;
this.ws = new WebSocket(this.events_url);
this.ws.on('open', () => {
this.isConnected = true;
this.isConnecting = false;
this.reconnectAttempts = 0;
this.reconnectDelay = 1000;
logger.info(`Persistent WebSocket connection established to ${this.events_url}`);
this.startHeartbeat();
this.processQueuedMessages();
});
this.ws.on('error', (error: any) => {
this.got_error = true;
this.isConnected = false;
this.isConnecting = false;
logger.error(`Persistent WebSocket error for URL ${this.events_url}:`, error);
this.scheduleReconnect();
});
this.ws.on('close', () => {
this.isConnected = false;
this.isConnecting = false;
this.stopHeartbeat();
if (!this.expectClose) {
logger.warn(`Persistent WebSocket connection closed unexpectedly for ${this.events_url}`);
this.scheduleReconnect();
}
});
this.ws.on('pong', () => {});
}
private startHeartbeat(): void {
this.heartbeatInterval = setInterval(() => {
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws.ping();
}
}, this.heartbeatIntervalMs);
}
private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = undefined;
}
}
private scheduleReconnect(): void {
if (this.expectClose || this.reconnectAttempts >= this.maxReconnectAttempts) {
logger.error(`Max reconnection attempts (${this.maxReconnectAttempts}) reached for ${this.events_url}`);
this.rejectQueuedMessages(new Error('WebSocket connection failed permanently'));
return;
}
const delay = this.reconnectDelay * 2 ** this.reconnectAttempts; // Exponential backoff
this.reconnectAttempts++;
logger.info(
`Scheduling reconnect attempt ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms`,
);
setTimeout(() => {
if (!this.expectClose) {
this.connect();
}
}, delay);
}
private processQueuedMessages(): void {
while (this.messageQueue.length > 0 && this.isConnected) {
const message = this.messageQueue.shift();
if (message && this.ws?.readyState === WebSocket.OPEN) {
try {
this.ws.send(
JSON.stringify({
guid: message.guid,
...message.result,
}),
);
message.resolve();
} catch (error) {
message.reject(error);
}
}
}
}
private rejectQueuedMessages(error: Error): void {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
if (message) {
message.reject(error);
}
}
}
async send(guid: string, result: CompilationResult): Promise<void> {
return new Promise((resolve, reject) => {
if (this.isConnected && this.ws?.readyState === WebSocket.OPEN) {
try {
this.ws.send(
JSON.stringify({
guid: guid,
...result,
}),
);
resolve();
} catch (error) {
reject(error);
}
} else {
// Queue the message for when connection is available
this.messageQueue.push({guid, result, resolve, reject});
// Ensure we're trying to connect
if (!this.isConnecting && !this.isConnected) {
this.connect();
}
}
});
}
override async close(): Promise<void> {
this.expectClose = true;
this.stopHeartbeat();
// Reject any queued messages
this.rejectQueuedMessages(new Error('WebSocket connection closing'));
if (this.ws) {
this.ws.close();
this.ws = undefined;
}
}
}
export class EventsWsWaiter extends EventsWsBase {
private timeout: number;
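A usage sketch of `PersistentEventsSender` as the worker threads use it (`ceProps`, `guid`, and `compilationResult` are assumed to be in scope, with `ceProps` resolving the events URL):

```ts
// One long-lived connection per worker process, reused for every result.
const sender = new PersistentEventsSender(ceProps); // connects immediately, reconnects with exponential backoff
await sender.send(guid, compilationResult);         // queued and resent automatically if the socket is down
// On shutdown (the workers' SIGINT/SIGTERM handlers):
await sender.close();                               // rejects anything still queued, then closes the socket
```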

View File

@@ -33,7 +33,7 @@ import {getHash} from '../utils.js';
import {LocalExecutionEnvironment} from './_all.js';
import {BaseExecutionTriple} from './base-execution-triple.js';
import {EventsWsSender} from './events-websocket.js';
import {PersistentEventsSender} from './events-websocket.js';
import {getExecutionTriplesForCurrentHost} from './execution-triple.js';
export type RemoteExecutionMessage = {
@@ -131,33 +131,48 @@ export class SqsWorkerMode extends SqsExecuteQueueBase {
}
async function sendResultViaWebsocket(
compilationEnvironment: CompilationEnvironment,
persistentSender: PersistentEventsSender,
guid: string,
result: BasicExecutionResult,
totalTimeMs?: number,
) {
try {
const sender = new EventsWsSender(compilationEnvironment.ceProps);
await sender.send(guid, result);
await sender.close();
await persistentSender.send(guid, result);
const timingInfo = totalTimeMs !== undefined ? ` (total time: ${totalTimeMs}ms)` : '';
logger.info(`Successfully sent execution result for ${guid} via WebSocket${timingInfo}`);
} catch (error) {
logger.error(error);
logger.error('WebSocket send error:', error);
}
}
async function doOneExecution(queue: SqsWorkerMode, compilationEnvironment: CompilationEnvironment) {
async function doOneExecution(
queue: SqsWorkerMode,
compilationEnvironment: CompilationEnvironment,
persistentSender: PersistentEventsSender,
) {
const msg = await queue.pop();
if (msg?.guid) {
const startTime = Date.now();
try {
const executor = new LocalExecutionEnvironment(compilationEnvironment);
await executor.downloadExecutablePackage(msg.hash);
const result = await executor.execute(msg.params);
await sendResultViaWebsocket(compilationEnvironment, msg.guid, result);
const endTime = Date.now();
const duration = endTime - startTime;
await sendResultViaWebsocket(persistentSender, msg.guid, result, duration);
} catch (e) {
// todo: e is undefined somehow?
logger.error(e);
await sendResultViaWebsocket(compilationEnvironment, msg.guid, {
const endTime = Date.now();
const duration = endTime - startTime;
await sendResultViaWebsocket(
persistentSender,
msg.guid,
{
code: -1,
stderr: [{text: 'Internal error when remotely executing'}],
stdout: [],
@@ -165,7 +180,9 @@ async function doOneExecution(queue: SqsWorkerMode, compilationEnvironment: Comp
timedOut: false,
filenameTransform: f => f,
execTime: 0,
});
},
duration,
);
}
}
}
@@ -177,18 +194,31 @@ export function startExecutionWorkerThread(
) {
const queue = new SqsWorkerMode(ceProps, awsProps);
// Create persistent WebSocket sender
const persistentSender = new PersistentEventsSender(compilationEnvironment.ceProps);
// Handle graceful shutdown
const shutdown = async () => {
logger.info('Shutting down execution worker - closing persistent WebSocket connection');
await persistentSender.close();
process.exit(0);
};
process.on('SIGINT', shutdown);
process.on('SIGTERM', shutdown);
// allow 2 executions at the same time
// Note: With WaitTimeSeconds=20, the receiveMessage call will wait up to 20 seconds
// for a message to arrive. The 100ms timeout only applies between successful message
// processing, providing immediate response when messages are available.
const doExecutionWork1 = async () => {
await doOneExecution(queue, compilationEnvironment);
await doOneExecution(queue, compilationEnvironment, persistentSender);
setTimeout(doExecutionWork1, 100);
};
const doExecutionWork2 = async () => {
await doOneExecution(queue, compilationEnvironment);
await doOneExecution(queue, compilationEnvironment, persistentSender);
setTimeout(doExecutionWork2, 100);
};

View File

@@ -47,6 +47,7 @@ import {LanguageKey} from '../../types/languages.interfaces.js';
import {SelectedLibraryVersion} from '../../types/libraries/libraries.interfaces.js';
import {ResultLine} from '../../types/resultline/resultline.interfaces.js';
import {BaseCompiler} from '../base-compiler.js';
import {parseExecutionParameters, parseTools, parseUserArguments} from '../compilation/compilation-request-parser.js';
import {CompilationEnvironment} from '../compilation-env.js';
import {getCompilerTypeByKey} from '../compilers/index.js';
import {logger} from '../logger.js';
@@ -383,13 +384,13 @@ export class CompileHandler implements ICompileHandler {
return compiler;
}
checkRequestRequirements(req: express.Request): CompileRequestJsonBody {
if (req.body.options === undefined) throw new Error('Missing options property');
if (req.body.source === undefined) throw new Error('Missing source property');
return req.body;
static checkRequestRequirements(body: any): CompileRequestJsonBody {
if (body.options === undefined) throw new Error('Missing options property');
if (body.source === undefined) throw new Error('Missing source property');
return body;
}
parseRequest(req: express.Request, compiler: BaseCompiler): ParsedRequest {
static parseRequestReusable(isJson: boolean, query: any, body: any, compiler: BaseCompiler): ParsedRequest {
let source: string;
let options: string;
let backendOptions: Record<string, any> = {};
@@ -399,9 +400,9 @@ export class CompileHandler implements ICompileHandler {
const execReqParams: UnparsedExecutionParams = {};
let libraries: any[] = [];
// IF YOU MODIFY ANYTHING HERE PLEASE UPDATE THE DOCUMENTATION!
if (req.is('json')) {
if (isJson) {
// JSON-style request
const jsonRequest = this.checkRequestRequirements(req);
const jsonRequest = CompileHandler.checkRequestRequirements(body);
const requestOptions = jsonRequest.options;
source = jsonRequest.source;
if (jsonRequest.bypassCache) bypassCache = jsonRequest.bypassCache;
@@ -414,8 +415,8 @@ export class CompileHandler implements ICompileHandler {
filters = {...compiler.getDefaultFilters(), ...requestOptions.filters};
inputTools = requestOptions.tools || [];
libraries = requestOptions.libraries || [];
} else if (req.body?.compiler) {
const textRequest = req.body as CompileRequestTextBody;
} else if (body?.compiler) {
const textRequest = body as CompileRequestTextBody;
source = textRequest.source;
if (textRequest.bypassCache) bypassCache = textRequest.bypassCache;
options = textRequest.userArguments;
@@ -431,8 +432,7 @@ export class CompileHandler implements ICompileHandler {
backendOptions.skipAsm = textRequest.skipAsm === 'true';
} else {
// API-style
source = req.body;
const query = req.query as CompileRequestQueryArgs;
source = body || '';
options = query.options || '';
// By default we get the default filters.
filters = compiler.getDefaultFilters();
@@ -453,18 +453,11 @@ export class CompileHandler implements ICompileHandler {
// Ask for asm not to be returned
backendOptions.skipAsm = query.skipAsm === 'true';
backendOptions.skipPopArgs = query.skipPopArgs === 'true';
backendOptions.filterAnsi = query.filterAnsi === 'true';
}
const executeParameters: ExecutionParams = {
args: Array.isArray(execReqParams.args) ? execReqParams.args || '' : splitArguments(execReqParams.args),
stdin: execReqParams.stdin || '',
runtimeTools: execReqParams.runtimeTools || [],
};
const tools: ActiveTool[] = inputTools.map(tool => {
// expand tools.args to an array using utils.splitArguments if it was a string
if (typeof tool.args === 'string') tool.args = splitArguments(tool.args);
return tool as ActiveTool;
});
// Use shared parsing utilities for consistency with SQS workers
const executeParameters = parseExecutionParameters(execReqParams);
const tools = parseTools(inputTools);
// Backwards compatibility: bypassCache used to be a boolean.
// Convert a boolean input to an enum's underlying numeric value
@@ -472,7 +465,7 @@ export class CompileHandler implements ICompileHandler {
return {
source,
options: splitArguments(options),
options: parseUserArguments(options), // Use shared utility for consistency
backendOptions,
filters,
bypassCache,
@@ -482,6 +475,14 @@ export class CompileHandler implements ICompileHandler {
};
}
parseRequest(req: express.Request, compiler: BaseCompiler): ParsedRequest {
const isJson = !!req.is('json');
const query = req.query as CompileRequestQueryArgs;
const body = req.body;
return CompileHandler.parseRequestReusable(isJson, query, body, compiler);
}
handlePopularArguments(req: express.Request, res: express.Response) {
const compiler = this.compilerFor(req);
if (!compiler) {
@@ -558,6 +559,7 @@ export class CompileHandler implements ICompileHandler {
.then(result => {
if (result.didExecute || result.execResult?.didExecute)
this.cmakeExecuteCounter.inc({language: compiler.lang.id});
delete result.s3Key; // Remove s3Key before sending to user
res.send(result);
})
.catch(e => {
@@ -629,6 +631,7 @@ export class CompileHandler implements ICompileHandler {
if (result.didExecute || result.execResult?.didExecute)
this.executeCounter.inc({language: compiler.lang.id});
if (req.accepts(['text', 'json']) === 'json') {
delete result.s3Key; // Remove s3Key before sending to user
res.send(result);
} else {
res.set('Content-Type', 'text/plain');

View File

@@ -27,4 +27,5 @@ import {StorageClass} from '@aws-sdk/client-s3';
export type S3HandlerOptions = {
redundancy?: StorageClass;
metadata?: Record<string, string>;
expires?: Date;
};

View File

@@ -77,6 +77,7 @@ export class S3Bucket {
Body: value,
StorageClass: options.redundancy || 'STANDARD',
Metadata: options.metadata || {},
Expires: options.expires,
});
}
}

View File

@@ -226,12 +226,15 @@ export type CompilationResult = {
processExecutionResultTime?: number;
objdumpTime?: number;
parsingTime?: number;
queueTime?: number;
source?: string; // todo: this is a crazy hack, we should get rid of it
instructionSet?: InstructionSet;
popularArguments?: PossibleArguments;
s3Key?: string; // Cache key hash for S3 storage reference
};
export type ExecutionOptions = {
@@ -316,3 +319,13 @@ export type FiledataPair = {
};
export type BufferOkFunc = (buffer: Buffer) => boolean;
// Maximum safe WebSocket message size for AWS API Gateway and ALB
// AWS API Gateway has a 32 KiB frame size limit for WebSocket messages
// We use 31 KiB as a conservative threshold to account for protocol overhead
export const WEBSOCKET_SIZE_THRESHOLD = 31 * 1024;
// TTL for temporary S3 storage of large compilation results in worker mode
// Set to 1 day to provide sufficient time for retrieval while ensuring
// temporary data doesn't persist indefinitely
export const TEMP_STORAGE_TTL_DAYS = 1;

View File

@@ -54,7 +54,7 @@ const hasGit = fs.existsSync(path.resolve(__dirname, '.git'));
// Hack alert: due to a variety of issues, sometimes we need to change
// the name here. Mostly it's things like webpack changes that affect
// how minification is done, even though that's supposed not to matter.
const webpackJsHack = '.v61.';
const webpackJsHack = '.v62.';
const plugins: Webpack.WebpackPluginInstance[] = [
new MonacoEditorWebpackPlugin({
languages: [