Skip to content

Commit 8113aff

Browse files
tclemCopilot
andcommitted
dotnet: emit JSON-RPC error on pending-buffer overflow + guard drop
Carries forward the Rust SDK PR #1394 follow-up review feedback into the .NET port: 1. Cap the per-session inbound-request parked-waiter list at 128. When exceeded, reject the oldest waiter with 'pending session buffer overflow'. The custom JsonRpc dispatcher translates the thrown exception into a JSON-RPC error response (-32603) back to the runtime so the request id isn't left hanging — silently dropping it would leave the runtime waiting on the response until its own timeout. Mirrors Rust commit 491b442 and TS commit c167bc3. 2. Use a distinct message ('pending session routing ended before session was registered') when the pending guard drops without registration. Lets debugging tell the overflow path from the create-failed path. Also fixes the pre-existing bug where the waiter-fault loop ran inside _pendingLock despite the comment saying otherwise — moved it outside the lock so TCS continuations can't deadlock against the lock. Mirrors Rust commit e0ff254. Adds two tests: - overflow path: 129 early inbound requests → oldest gets error with 'pending session buffer overflow', remaining 128 resolve after registration - guard-drop path: session.create fails with parked request → error response with 'pending session routing ended before session was registered' Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent b27f91b commit 8113aff

2 files changed

Lines changed: 217 additions & 10 deletions

File tree

dotnet/src/Client.cs

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -904,23 +904,29 @@ private PendingSessionRoutingGuard BeginPendingSessionRouting()
904904

905905
private void EndPendingSessionRouting()
906906
{
907+
List<TaskCompletionSource<CopilotSession>>? waiters = null;
907908
lock (_pendingLock)
908909
{
909910
_pendingRoutingCount--;
910911
if (_pendingRoutingCount == 0)
911912
{
912913
_pendingSessionEvents.Clear();
913-
var waiters = new List<TaskCompletionSource<CopilotSession>>();
914+
waiters = new List<TaskCompletionSource<CopilotSession>>();
914915
foreach (var list in _pendingSessionWaiters.Values)
915916
waiters.AddRange(list);
916917
_pendingSessionWaiters.Clear();
918+
}
919+
}
917920

918-
// Fault pending waiters outside the lock so TCS continuations don't run under it.
919-
foreach (var tcs in waiters)
920-
{
921-
tcs.TrySetException(new InvalidOperationException(
922-
"Cloud session.create completed without registering this sessionId; request dropped."));
923-
}
921+
// Fault pending waiters outside the lock so TCS continuations don't run under it.
922+
// Distinct phrasing from the overflow-eviction path so the runtime / debugging can tell
923+
// the two cases apart. Matches the Rust SDK message in PR #1394 (commit e0ff254f).
924+
if (waiters != null)
925+
{
926+
foreach (var tcs in waiters)
927+
{
928+
tcs.TrySetException(new InvalidOperationException(
929+
"pending session routing ended before session was registered"));
924930
}
925931
}
926932
}
@@ -979,6 +985,18 @@ private Task<CopilotSession> ResolveSessionAsync(string sessionId)
979985
list = [];
980986
_pendingSessionWaiters[sessionId] = list;
981987
}
988+
989+
// Cap parked waiters per session id. When exceeded, reject the oldest with a
990+
// distinct message so the runtime isn't left waiting on a hung request id.
991+
// RunContinuationsAsynchronously ensures the continuation won't run under this lock.
992+
// Matches the Rust SDK fix in PR #1394 (commit 491b4427).
993+
if (list.Count >= PendingSessionBufferLimit)
994+
{
995+
var oldest = list[0];
996+
list.RemoveAt(0);
997+
oldest.TrySetException(new InvalidOperationException("pending session buffer overflow"));
998+
}
999+
9821000
list.Add(tcs);
9831001
return tcs.Task;
9841002
}

dotnet/test/Unit/CloudSessionTests.cs

Lines changed: 192 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,93 @@ public async Task CreateCloudSessionAsync_Parks_Inbound_Requests_Until_Registrat
267267
Assert.Equal("blue", response!["answer"]?.ToString());
268268
}
269269

270+
// -------------------------------------------------------------------------
271+
// 8. Pending-waiter overflow: oldest is rejected, remaining 128 succeed
272+
// -------------------------------------------------------------------------
273+
274+
[Fact]
275+
public async Task PendingRequestWaiterOverflow_RejectsOldestWithOverflowMessage()
276+
{
277+
const string cloudId = "overflow-session";
278+
const int requestCount = 129; // one beyond the 128-waiter cap
279+
280+
await using var server = await FakeCloudServer.StartAsync(
281+
cloudSessionId: cloudId,
282+
earlyInboundRequestCount: requestCount);
283+
284+
await using var client = new CopilotClient(new CopilotClientOptions
285+
{ Connection = RuntimeConnection.ForUri(server.Url) });
286+
287+
await using var _ = await client.CreateCloudSessionAsync(new SessionConfig
288+
{
289+
OnPermissionRequest = PermissionHandler.ApproveAll,
290+
Cloud = new CloudSessionOptions
291+
{
292+
Repository = new CloudSessionRepository { Owner = "github", Name = "copilot-sdk" }
293+
},
294+
OnUserInputRequest = (_, _) => Task.FromResult(new UserInputResponse { Answer = "yes", WasFreeform = false })
295+
});
296+
297+
var responses = await server.WaitForAllInboundResponses(requestCount, TimeSpan.FromSeconds(10));
298+
299+
// Exactly one overflow eviction, 128 successful completions.
300+
Assert.Equal(1, responses.Count(r => r.IsError));
301+
Assert.Equal(128, responses.Count(r => !r.IsError));
302+
303+
var err = responses.Single(r => r.IsError);
304+
Assert.Contains("pending session buffer overflow", err.ErrorMessage ?? "");
305+
}
306+
307+
// -------------------------------------------------------------------------
308+
// 9. Guard-drop path: parked requests are rejected with distinct message
309+
// -------------------------------------------------------------------------
310+
311+
[Fact]
312+
public async Task PendingSessionGuardDrop_RejectsParkedRequestWithDistinctMessage()
313+
{
314+
const string cloudId = "guard-drop-session";
315+
const int inboundRequestId = 500;
316+
317+
await using var server = await FakeCloudServer.StartAsync(
318+
cloudSessionId: cloudId,
319+
failSessionCreate: true,
320+
earlyInboundRequest: new Dictionary<string, object?>
321+
{
322+
["jsonrpc"] = "2.0",
323+
["id"] = inboundRequestId,
324+
["method"] = "userInput.request",
325+
["params"] = new Dictionary<string, object?>
326+
{
327+
["sessionId"] = cloudId,
328+
["question"] = "Color?",
329+
["choices"] = new object?[] { "red", "blue" },
330+
["allowFreeform"] = false
331+
}
332+
});
333+
334+
await using var client = new CopilotClient(new CopilotClientOptions
335+
{ Connection = RuntimeConnection.ForUri(server.Url) });
336+
337+
await Assert.ThrowsAnyAsync<Exception>(() =>
338+
client.CreateCloudSessionAsync(new SessionConfig
339+
{
340+
OnPermissionRequest = PermissionHandler.ApproveAll,
341+
Cloud = new CloudSessionOptions
342+
{
343+
Repository = new CloudSessionRepository { Owner = "github", Name = "copilot-sdk" }
344+
}
345+
}));
346+
347+
// The parked request must have been rejected with the guard-drop message (not the overflow message).
348+
var responses = await server.WaitForAllInboundResponses(1, TimeSpan.FromSeconds(5));
349+
350+
Assert.Single(responses);
351+
Assert.True(responses[0].IsError);
352+
Assert.Contains(
353+
"pending session routing ended before session was registered",
354+
responses[0].ErrorMessage ?? "");
355+
}
356+
270357
// =========================================================================
271358
// Fake server infrastructure
272359
// =========================================================================
@@ -280,21 +367,35 @@ private sealed class FakeCloudServer : IAsyncDisposable
280367
private readonly string _cloudSessionId;
281368
private readonly Dictionary<string, object?>? _earlyNotification;
282369
private readonly Dictionary<string, object?>? _earlyInboundRequest;
370+
private readonly int _earlyInboundRequestCount;
371+
private readonly bool _failSessionCreate;
283372
private readonly TaskCompletionSource<Dictionary<string, object?>?> _userInputResponseTcs =
284373
new(TaskCreationOptions.RunContinuationsAsynchronously);
285374

375+
// Response tracking for overflow / guard-drop tests.
376+
private readonly object _inboundResponsesLock = new();
377+
private readonly List<InboundResponse> _collectedInboundResponses = [];
378+
private int _waitForInboundResponseCount;
379+
private TaskCompletionSource<IReadOnlyList<InboundResponse>>? _allInboundResponsesTcs;
380+
286381
public JsonElement? LastCreatePayload { get; private set; }
287382

383+
public record InboundResponse(int RequestId, bool IsError, string? ErrorMessage);
384+
288385
private FakeCloudServer(
289386
TcpListener listener,
290387
string cloudSessionId,
291388
Dictionary<string, object?>? earlyNotification,
292-
Dictionary<string, object?>? earlyInboundRequest)
389+
Dictionary<string, object?>? earlyInboundRequest,
390+
int earlyInboundRequestCount,
391+
bool failSessionCreate)
293392
{
294393
_listener = listener;
295394
_cloudSessionId = cloudSessionId;
296395
_earlyNotification = earlyNotification;
297396
_earlyInboundRequest = earlyInboundRequest;
397+
_earlyInboundRequestCount = earlyInboundRequestCount;
398+
_failSessionCreate = failSessionCreate;
298399
_serverTask = RunAsync();
299400
}
300401

@@ -310,16 +411,55 @@ public string Url
310411
public static Task<FakeCloudServer> StartAsync(
311412
string cloudSessionId = "cloud-session-id",
312413
Dictionary<string, object?>? earlyNotification = null,
313-
Dictionary<string, object?>? earlyInboundRequest = null)
414+
Dictionary<string, object?>? earlyInboundRequest = null,
415+
int earlyInboundRequestCount = 0,
416+
bool failSessionCreate = false)
314417
{
315418
var listener = new TcpListener(IPAddress.Loopback, 0);
316419
listener.Start();
317-
return Task.FromResult(new FakeCloudServer(listener, cloudSessionId, earlyNotification, earlyInboundRequest));
420+
return Task.FromResult(new FakeCloudServer(
421+
listener, cloudSessionId, earlyNotification, earlyInboundRequest,
422+
earlyInboundRequestCount, failSessionCreate));
318423
}
319424

320425
public Task<Dictionary<string, object?>?> WaitForUserInputResponse(TimeSpan timeout)
321426
=> _userInputResponseTcs.Task.WaitAsync(timeout);
322427

428+
/// <summary>
429+
/// Waits until the server has collected <paramref name="count"/> responses (error or success)
430+
/// from the client for inbound requests. Used by overflow and guard-drop tests.
431+
/// </summary>
432+
public Task<IReadOnlyList<InboundResponse>> WaitForAllInboundResponses(int count, TimeSpan timeout)
433+
{
434+
var tcs = new TaskCompletionSource<IReadOnlyList<InboundResponse>>(
435+
TaskCreationOptions.RunContinuationsAsynchronously);
436+
lock (_inboundResponsesLock)
437+
{
438+
_waitForInboundResponseCount = count;
439+
_allInboundResponsesTcs = tcs;
440+
if (_collectedInboundResponses.Count >= count)
441+
tcs.TrySetResult(new List<InboundResponse>(_collectedInboundResponses));
442+
}
443+
return tcs.Task.WaitAsync(timeout);
444+
}
445+
446+
private void RecordInboundResponse(InboundResponse response)
447+
{
448+
TaskCompletionSource<IReadOnlyList<InboundResponse>>? tcs = null;
449+
IReadOnlyList<InboundResponse>? snapshot = null;
450+
lock (_inboundResponsesLock)
451+
{
452+
_collectedInboundResponses.Add(response);
453+
if (_allInboundResponsesTcs != null &&
454+
_collectedInboundResponses.Count >= _waitForInboundResponseCount)
455+
{
456+
tcs = _allInboundResponsesTcs;
457+
snapshot = new List<InboundResponse>(_collectedInboundResponses);
458+
}
459+
}
460+
tcs?.TrySetResult(snapshot!);
461+
}
462+
323463
public async ValueTask DisposeAsync()
324464
{
325465
_cts.Cancel();
@@ -373,6 +513,16 @@ private async Task HandleRequestAsync(Stream stream, JsonElement request, Cancel
373513
};
374514
}
375515
_userInputResponseTcs.TrySetResult(dict);
516+
517+
if (idElement.ValueKind == JsonValueKind.Number && idElement.TryGetInt32(out var successId))
518+
RecordInboundResponse(new InboundResponse(successId, IsError: false, null));
519+
}
520+
else if (request.TryGetProperty("error", out var errorEl))
521+
{
522+
var requestId = idElement.ValueKind == JsonValueKind.Number && idElement.TryGetInt32(out var errId)
523+
? errId : -1;
524+
var msg = errorEl.TryGetProperty("message", out var msgEl) ? msgEl.GetString() : null;
525+
RecordInboundResponse(new InboundResponse(requestId, IsError: true, msg));
376526
}
377527
return;
378528
}
@@ -424,6 +574,45 @@ private async Task HandleRequestAsync(Stream stream, JsonElement request, Cancel
424574
await Task.Delay(50, cancellationToken);
425575
}
426576

577+
// For overflow tests: send N inbound requests to exercise the buffer cap.
578+
if (_earlyInboundRequestCount > 0)
579+
{
580+
for (var i = 1; i <= _earlyInboundRequestCount; i++)
581+
{
582+
await WriteMessageAsync(stream, new Dictionary<string, object?>
583+
{
584+
["jsonrpc"] = "2.0",
585+
["id"] = i,
586+
["method"] = "userInput.request",
587+
["params"] = new Dictionary<string, object?>
588+
{
589+
["sessionId"] = _cloudSessionId,
590+
["question"] = $"Question {i}",
591+
["choices"] = new object?[] { "yes", "no" },
592+
["allowFreeform"] = false
593+
}
594+
}, cancellationToken);
595+
}
596+
597+
// Give the client time to park/overflow all requests before responding.
598+
await Task.Delay(100, cancellationToken);
599+
}
600+
601+
if (_failSessionCreate)
602+
{
603+
await WriteMessageAsync(stream, new Dictionary<string, object?>
604+
{
605+
["jsonrpc"] = "2.0",
606+
["id"] = id,
607+
["error"] = new Dictionary<string, object?>
608+
{
609+
["code"] = -32603,
610+
["message"] = "session.create failed (test-induced failure)"
611+
}
612+
}, cancellationToken);
613+
return;
614+
}
615+
427616
await WriteMessageAsync(stream, new Dictionary<string, object?>
428617
{
429618
["jsonrpc"] = "2.0",

0 commit comments

Comments
 (0)