execution worker fix

This commit is contained in:
Partouf
2025-09-10 08:19:40 +02:00
parent 098356e50b
commit 6b6efecb92
2 changed files with 27 additions and 10 deletions

View File

@@ -102,13 +102,18 @@ export class PersistentEventsSender extends EventsWsBase {
>();
private maxRetries = 3;
private ackTimeoutMs = 3000;
private requireAcknowledgments = true;
constructor(props: PropertyGetter) {
constructor(props: PropertyGetter, requireAcknowledgments = true) {
super(props);
this.requireAcknowledgments = requireAcknowledgments;
this.connect();
}
isReadyForNewMessages(): boolean {
if (!this.requireAcknowledgments) {
return this.isConnected && this.ws?.readyState === WebSocket.OPEN;
}
return this.isConnected && this.ws?.readyState === WebSocket.OPEN && this.pendingAcks.size === 0;
}
@@ -374,16 +379,28 @@ export class PersistentEventsSender extends EventsWsBase {
guid: guid,
...result,
};
this.setupAckTimeout(guid, messageData, resolve, reject);
if (this.requireAcknowledgments) {
this.setupAckTimeout(guid, messageData, resolve, reject);
}
try {
this.ws.send(JSON.stringify(messageData));
if (!this.requireAcknowledgments) {
resolve();
}
} catch (error) {
// Don't immediately fail - let the acknowledgment timeout handle retries
// A failed send means we won't get an acknowledgment, so the timeout will trigger retries
logger.warn(
`Initial send failed for ${guid}, letting acknowledgment timeout handle retries:`,
error,
);
if (this.requireAcknowledgments) {
// Don't immediately fail - let the acknowledgment timeout handle retries
// A failed send means we won't get an acknowledgment, so the timeout will trigger retries
logger.warn(
`Initial send failed for ${guid}, letting acknowledgment timeout handle retries:`,
error,
);
} else {
logger.warn(`Send failed for ${guid} (no acks required):`, error);
reject(error);
}
}
} else {
// Queue the message for when connection is available

View File

@@ -201,8 +201,8 @@ export function startExecutionWorkerThread(
): () => boolean {
const queue = new SqsWorkerMode(ceProps, awsProps);
// Create persistent WebSocket sender
const persistentSender = new PersistentEventsSender(compilationEnvironment.ceProps);
// Create persistent WebSocket sender (execution workers don't require acknowledgments)
const persistentSender = new PersistentEventsSender(compilationEnvironment.ceProps, false);
// Handle graceful shutdown
const shutdown = async () => {