Add S3 overflow support for compilation workers (#8091)

This commit is contained in:
Patrick Quist
2025-09-12 00:05:43 +02:00
committed by GitHub
parent 6b6efecb92
commit 0c5b4a5fd6
2 changed files with 134 additions and 7 deletions

View File

@@ -69,11 +69,17 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
- `--instance-color <color>`: Optional command-line parameter to differentiate deployment instances. When specified (blue or green), modifies the queue URL by appending the color to the queue name (e.g., `staging-compilation-queue-blue.fifo`)
- **Implementation**: Located in `/lib/compilation/sqs-compilation-queue.ts` with shared parsing utilities in `/lib/compilation/compilation-request-parser.ts`
- **Queue Architecture**: Uses single AWS SQS FIFO queue for reliable message delivery, messages contain isCMake flag to distinguish compilation types
- **S3 Overflow Support**: Large compilation requests exceeding SQS message size limits (256KB) are automatically stored in S3
- Messages exceeding the limit are stored in S3 bucket `compiler-explorer-sqs-overflow`
- SQS receives a lightweight reference message with type `s3-overflow` containing the S3 location
- Workers automatically detect overflow messages and fetch the full request from S3
- S3 objects are automatically deleted after 1 day via lifecycle policy
- **Result Delivery**: Uses WebSocket-based communication via `PersistentEventsSender` for improved performance with persistent connections
- **Message Production**: Queue messages are produced by external Lambda functions, not by the main Compiler Explorer server
- **Shared Parsing**: Common request parsing logic is shared between web handlers and SQS workers for consistency
- **Remote Compiler Support**: Workers automatically detect and proxy requests to remote compilers using HTTP, maintaining compatibility with existing remote compiler infrastructure
- **S3 Storage Integration**: Compilation results include an `s3Key` property containing the cache key hash for S3 storage reference. Large results (>31KiB) can be stored in S3 and referenced by this key. The s3Key is removed from API responses before sending to users.
- **Metrics & Statistics**: SQS workers track separate Prometheus metrics (`ce_sqs_compilations_total`, `ce_sqs_executions_total`, `ce_sqs_cmake_compilations_total`, `ce_sqs_cmake_executions_total`) and record compilation statistics via `statsNoter.noteCompilation` for Grafana monitoring, mirroring the regular API route behavior.
## Testing Guidelines
- Use Vitest for unit tests (compatible with Jest syntax)

View File

@@ -22,15 +22,22 @@
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.
import {S3} from '@aws-sdk/client-s3';
import {SQS} from '@aws-sdk/client-sqs';
import {Counter} from 'prom-client';
import {CompilationResult, WEBSOCKET_SIZE_THRESHOLD} from '../../types/compilation/compilation.interfaces.js';
import {
CompilationResult,
FiledataPair,
WEBSOCKET_SIZE_THRESHOLD,
} from '../../types/compilation/compilation.interfaces.js';
import {CompilationEnvironment} from '../compilation-env.js';
import {PersistentEventsSender} from '../execution/events-websocket.js';
import {CompileHandler} from '../handlers/compile.js';
import {logger} from '../logger.js';
import {PropertyGetter} from '../properties.interfaces.js';
import {SentryCapture} from '../sentry.js';
import {KnownBuildMethod} from '../stats.js';
export type RemoteCompilationRequest = {
guid: string;
@@ -51,8 +58,43 @@ export type RemoteCompilationRequest = {
queryStringParameters: Record<string, string>;
};
// Lightweight reference message placed on SQS when a compilation request's
// JSON payload exceeds the SQS message size limit. The full
// RemoteCompilationRequest is stored in S3 at s3Bucket/s3Key, and workers
// recognise this shape via the `type` discriminant (see isS3OverflowMessage).
export type S3OverflowMessage = {
// Literal discriminant distinguishing overflow references from inline requests.
type: 's3-overflow';
// GUID of the original compilation request; matches the payload stored in S3.
guid: string;
// Compiler the request targets, duplicated here so it is visible without an S3 fetch.
compilerId: string;
// S3 bucket holding the full request body.
s3Bucket: string;
// S3 object key of the full request body.
s3Key: string;
// Size in bytes of the original (oversized) message, for logging/diagnostics.
originalSize: number;
// When the overflow object was written — presumably ISO-8601; TODO confirm producer format.
timestamp: string;
};
// Prometheus counters for SQS-driven work, labelled by language id. These
// mirror the regular API route metrics so Grafana dashboards can compare the
// SQS worker path against the HTTP path.
// Incremented once per non-CMake compilation handled by a worker.
const sqsCompileCounter = new Counter({
name: 'ce_sqs_compilations_total',
help: 'Number of SQS compilations',
labelNames: ['language'],
});
// Incremented when a non-CMake compilation also executed the binary.
const sqsExecuteCounter = new Counter({
name: 'ce_sqs_executions_total',
help: 'Number of SQS executions',
labelNames: ['language'],
});
// Incremented once per CMake compilation handled by a worker.
const sqsCmakeCounter = new Counter({
name: 'ce_sqs_cmake_compilations_total',
help: 'Number of SQS CMake compilations',
labelNames: ['language'],
});
// Incremented when a CMake build also executed the resulting binary.
const sqsCmakeExecuteCounter = new Counter({
name: 'ce_sqs_cmake_executions_total',
help: 'Number of SQS executions after CMake',
labelNames: ['language'],
});
export class SqsCompilationQueueBase {
protected sqs: SQS;
protected s3: S3;
protected readonly queue_url: string;
constructor(props: PropertyGetter, awsProps: PropertyGetter, appArgs?: {instanceColor?: string}) {
@@ -86,6 +128,7 @@ export class SqsCompilationQueueBase {
}
this.sqs = new SQS({region: region});
this.s3 = new S3({region: region});
}
}
@@ -104,6 +147,35 @@ export class SqsCompilationWorkerMode extends SqsCompilationQueueBase {
}
}
// Type guard recognising an S3 overflow reference message (see S3OverflowMessage).
// Returns a real boolean: the original `msg && … && msg.s3Key` expression
// evaluated to the last truthy operand (a string) or to `null`/`undefined`,
// not to true/false, and only checked the S3 fields for truthiness. Here we
// also verify s3Bucket/s3Key are non-empty strings before trusting the shape.
private isS3OverflowMessage(msg: any): msg is S3OverflowMessage {
    return (
        !!msg &&
        msg.type === 's3-overflow' &&
        typeof msg.s3Bucket === 'string' &&
        msg.s3Bucket.length > 0 &&
        typeof msg.s3Key === 'string' &&
        msg.s3Key.length > 0
    );
}
// Downloads and parses a full compilation request that overflowed to S3.
// Returns undefined when the object unexpectedly has no body; rethrows any
// S3/parse error after logging it so the caller can surface the failure.
// NOTE(review): the JSON.parse result is cast, not validated — assumes the
// producer wrote a well-formed RemoteCompilationRequest; verify upstream.
private async fetchFromS3(bucket: string, key: string): Promise<RemoteCompilationRequest | undefined> {
    try {
        logger.info(`Fetching overflow message from S3: ${bucket}/${key}`);
        const object = await this.s3.getObject({Bucket: bucket, Key: key});
        const body = object.Body;
        if (!body) {
            logger.error(`S3 object ${bucket}/${key} has no body`);
            return undefined;
        }
        const raw = await body.transformToString();
        const request = JSON.parse(raw) as RemoteCompilationRequest;
        logger.info(`Successfully fetched overflow message for ${request.guid} from S3`);
        return request;
    } catch (error) {
        const detail = error instanceof Error ? error.message : String(error);
        logger.error(`Failed to fetch overflow message from S3: ${detail}`);
        throw error;
    }
}
async pop(): Promise<RemoteCompilationRequest | undefined> {
const url = this.queue_url;
@@ -133,7 +205,34 @@ export class SqsCompilationWorkerMode extends SqsCompilationQueueBase {
throw parseError;
}
// Calculate queue time from SentTimestamp
if (this.isS3OverflowMessage(parsed)) {
logger.info(
`Received S3 overflow message for ${parsed.guid}, original size: ${parsed.originalSize} bytes`,
);
try {
const compilationRequest = await this.fetchFromS3(parsed.s3Bucket, parsed.s3Key);
if (compilationRequest) {
const sentTimestamp = queued_message.Attributes?.SentTimestamp;
if (sentTimestamp) {
const queueTimeMs = Date.now() - Number.parseInt(sentTimestamp, 10);
compilationRequest.queueTimeMs = queueTimeMs;
}
return compilationRequest;
}
} catch (s3Error) {
logger.error(
`Failed to fetch S3 overflow message for ${parsed.guid}: ${s3Error instanceof Error ? s3Error.message : String(s3Error)}`,
);
throw new Error(
`S3 overflow fetch failed for ${parsed.guid}: ${s3Error instanceof Error ? s3Error.message : String(s3Error)}`,
);
}
return undefined;
}
const sentTimestamp = queued_message.Attributes?.SentTimestamp;
if (sentTimestamp) {
const queueTimeMs = Date.now() - Number.parseInt(sentTimestamp, 10);
@@ -225,11 +324,31 @@ async function doOneCompilation(
);
let result: CompilationResult;
const files = (msg.files || []) as FiledataPair[];
// Local compilation only
if (msg.isCMake) {
result = await compiler.cmake(msg.files || [], parsedRequest, parsedRequest.bypassCache);
sqsCmakeCounter.inc({language: compiler.lang.id});
compilationEnvironment.statsNoter.noteCompilation(
compiler.getInfo().id,
parsedRequest,
files,
KnownBuildMethod.CMake,
);
result = await compiler.cmake(files, parsedRequest, parsedRequest.bypassCache);
if (result.didExecute || result.execResult?.didExecute) {
sqsCmakeExecuteCounter.inc({language: compiler.lang.id});
}
} else {
sqsCompileCounter.inc({language: compiler.lang.id});
compilationEnvironment.statsNoter.noteCompilation(
compiler.getInfo().id,
parsedRequest,
files,
KnownBuildMethod.Compile,
);
result = await compiler.compile(
parsedRequest.source,
parsedRequest.options,
@@ -239,11 +358,14 @@ async function doOneCompilation(
parsedRequest.tools,
parsedRequest.executeParameters,
parsedRequest.libraries,
msg.files || [],
files,
);
if (result.didExecute || result.execResult?.didExecute) {
sqsExecuteCounter.inc({language: compiler.lang.id});
}
}
// Add queue time to result if available
if (msg.queueTimeMs !== undefined) {
result.queueTime = msg.queueTimeMs;
}
@@ -278,7 +400,6 @@ async function doOneCompilation(
tools: [],
};
// Add queue time to error result if available
if (msg.queueTimeMs !== undefined) {
errorResult.queueTime = msg.queueTimeMs;
}