Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion rust/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,25 @@ With the default `CliProgram::Resolve`, `Client::start()` resolves the CLI in th

### Session

Created via `Client::create_session` or `Client::resume_session`. Owns an internal event loop that dispatches CLI callbacks to the focused handler traits you install on `SessionConfig`, and broadcasts session events through `subscribe()`.
Created via `Client::create_session`, `Client::create_cloud_session`, or `Client::resume_session`. Owns an internal event loop that dispatches CLI callbacks to the focused handler traits you install on `SessionConfig`, and broadcasts session events through `subscribe()`.

#### Cloud sessions

`Client::create_cloud_session` creates a Mission Control–backed cloud session. The runtime owns the session ID: do **not** set `session_id` or `provider` on the config (the SDK rejects both with `Error::InvalidConfig`). Build the config with `SessionConfig::with_cloud(...)`; `Client::create_session` will reject any config that has `cloud` set.

```rust,ignore
use github_copilot_sdk::types::{CloudSessionOptions, CloudSessionRepository, SessionConfig};

let cloud = CloudSessionOptions::with_repository(
CloudSessionRepository::new("github", "copilot-sdk").with_branch("main"),
);
let session = client
.create_cloud_session(SessionConfig::default().with_cloud(cloud))
.await?;
println!("cloud session id: {}", session.id());
```

The SDK buffers any `session.event` notifications or inbound JSON-RPC requests that arrive before the `session.create` response (bounded, drop-oldest) and replays them once the runtime-assigned session ID is registered.

```rust,ignore
use github_copilot_sdk::MessageOptions;
Expand Down
54 changes: 53 additions & 1 deletion rust/src/jsonrpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ pub mod error_codes {
/// Invalid method parameters (-32602).
pub const INVALID_PARAMS: i32 = -32602;
/// Internal server error (-32603).
#[allow(dead_code, reason = "standard JSON-RPC code, reserved for future use")]
pub const INTERNAL_ERROR: i32 = -32603;
}

Expand Down Expand Up @@ -490,6 +489,59 @@ impl JsonRpcClient {
))),
}
}

/// Clone a sync handle onto the outbound writer for fire-and-forget
/// frames. Use only for paths that cannot `.await` (currently the
/// session router, which holds a `parking_lot::Mutex` while deciding
/// to discard a buffered request).
pub(crate) fn writer_handle(&self) -> WriterHandle {
WriterHandle {
write_tx: self.write_tx.clone(),
}
}
}

/// Sync, fire-and-forget handle onto the JSON-RPC writer actor. Cloned
/// from [`JsonRpcClient::writer_handle`]; serializes the message on the
/// caller's thread and enqueues it without awaiting an ack. Loss of the
/// ack means we'll never observe a write error here, which is acceptable
/// for the one current caller (error responses to dropped pending
/// requests): if the wire is broken, the runtime will time out the
/// request on its own.
pub(crate) struct WriterHandle {
write_tx: mpsc::UnboundedSender<WriteCommand>,
}

impl Clone for WriterHandle {
fn clone(&self) -> Self {
Self {
write_tx: self.write_tx.clone(),
}
}
}

impl WriterHandle {
/// Serialize and enqueue a JSON-RPC message without waiting for the
/// writer actor to flush it. Drops silently if serialization fails or
/// the writer actor has shut down — both indicate the transport is
/// already unusable.
pub(crate) fn send_fire_and_forget<T: serde::Serialize>(&self, message: &T) {
let body = match serde_json::to_vec(message) {
Ok(body) => body,
Err(e) => {
warn!(error = %e, "WriterHandle failed to serialize fire-and-forget message");
return;
}
};
let mut frame = Vec::with_capacity(CONTENT_LENGTH_HEADER.len() + 16 + body.len() + 4);
frame.extend_from_slice(CONTENT_LENGTH_HEADER.as_bytes());
frame.extend_from_slice(body.len().to_string().as_bytes());
frame.extend_from_slice(b"\r\n\r\n");
frame.extend_from_slice(&body);

let (ack_tx, _ack_rx) = oneshot::channel();
let _ = self.write_tx.send(WriteCommand { frame, ack: ack_tx });
}
}

/// RAII guard that removes a pending-request entry from the map if the
Expand Down
20 changes: 16 additions & 4 deletions rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1201,14 +1201,16 @@ impl Client {
let pid = child.as_ref().and_then(|c| c.id());
info!(pid = ?pid, "copilot CLI client ready");

let client_rpc_writer_handle = rpc.writer_handle();

let client = Self {
inner: Arc::new(ClientInner {
child: parking_lot::Mutex::new(child),
rpc,
cwd,
request_rx: parking_lot::Mutex::new(Some(request_rx)),
notification_tx: notification_broadcast_tx,
router: router::SessionRouter::new(),
router: router::SessionRouter::with_writer(client_rpc_writer_handle),
negotiated_protocol_version: OnceLock::new(),
state: parking_lot::Mutex::new(ConnectionState::Connected),
lifecycle_tx: broadcast::channel(256).0,
Expand All @@ -1221,6 +1223,10 @@ impl Client {
}),
};
client.spawn_lifecycle_dispatcher();
client
.inner
.router
.start(&client.inner.notification_tx, &client.inner.request_rx);
debug!(
elapsed_ms = setup_start.elapsed().as_millis(),
pid = ?pid,
Expand Down Expand Up @@ -1577,12 +1583,18 @@ impl Client {
&self,
session_id: &SessionId,
) -> crate::router::SessionChannels {
self.inner
.router
.ensure_started(&self.inner.notification_tx, &self.inner.request_rx);
self.inner.router.register(session_id)
}

/// Enter pending-routing mode on the router. While the returned guard is
/// alive, notifications and requests addressed to session ids that are
/// not yet registered are buffered instead of being dropped. Used by
/// [`Client::create_cloud_session`] so the SDK can receive events that
/// the runtime emits between `session.create` and the response.
pub(crate) fn begin_pending_session_routing(&self) -> crate::router::PendingSessionRouting {
self.inner.router.begin_pending_session_routing()
}

/// Unregister a session, dropping its per-session channels.
pub(crate) fn unregister_session(&self, session_id: &SessionId) {
self.inner.router.unregister(session_id);
Expand Down
Loading
Loading