From 6708d3ee66bd33018dfacf17b32260bf0da457c9 Mon Sep 17 00:00:00 2001 From: Anthony-19 Date: Tue, 30 Jun 2026 08:46:00 +0100 Subject: [PATCH] fix(contracts): add stream_count view fn and fix resume_stream status guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #421 — stream_count() -> u64 - Add stream_count() read-only function that returns the global StreamCounter value (monotonically increasing, equals the highest stream ID ever issued, never decrements on cancel/complete). - Returns 0 on a freshly-deployed contract with no streams. - Add TypeScript binding fetchStreamCount() in frontend/src/lib/soroban.ts that simulates the view call without wallet auth. - Unit tests: stream_count_returns_zero_on_fresh_contract, stream_count_increments_by_one_per_create, stream_count_is_not_decremented_by_cancel, stream_count_matches_last_stream_id. Closes #787 — resume_stream status guard - resume_stream now checks stream.status == Paused (not just stream.paused), so a stream cancelled while paused (status=Cancelled, paused=true) correctly returns StreamInactive instead of being resurrected to Active. - cancel_stream now explicitly clears stream.paused = false and stream.paused_at = None when settling a cancellation, ensuring no stale pause state survives into the Cancelled record. - Unit tests: resume_after_cancel_while_paused_returns_stream_inactive, cancel_while_paused_clears_pause_fields, cancel_normal_stream_also_clears_pause_fields. --- contracts/stream_contract/src/lib.rs | 24 +++- contracts/stream_contract/src/test.rs | 171 ++++++++++++++++++++++++++ frontend/src/lib/soroban.ts | 53 ++++++++ 3 files changed, 247 insertions(+), 1 deletion(-) diff --git a/contracts/stream_contract/src/lib.rs b/contracts/stream_contract/src/lib.rs index d0d2dd5d..ff1e23f4 100644 --- a/contracts/stream_contract/src/lib.rs +++ b/contracts/stream_contract/src/lib.rs @@ -512,9 +512,11 @@ impl StreamContract { token_client.transfer(&contract_address, &sender, &refunded_amount); } - // Mark stream as inactive + // Mark stream as inactive and clear any pause state stream.is_active = false; stream.status = StreamStatus::Cancelled; + stream.paused = false; + stream.paused_at = None; stream.last_update_time = now; let recipient = stream.recipient.clone(); @@ -588,6 +590,12 @@ impl StreamContract { let mut stream = load_stream(&env, stream_id)?; Self::validate_stream_ownership(&stream, &sender)?; + // Reject if the stream is not in Paused status — this covers streams + // that were cancelled while paused (is_active=false, paused=true). + if stream.status != StreamStatus::Paused { + return Err(StreamError::StreamInactive); + } + if !stream.paused { return Err(StreamError::StreamInactive); } @@ -624,6 +632,20 @@ impl StreamContract { // ─── Read-only Queries ──────────────────────────────────────────────────── + /// Returns the total number of streams ever created (monotonically increasing). + /// + /// This is the global stream ID counter, not the count of currently active + /// streams. It equals the highest stream ID that has been assigned, making + /// it useful for cursor-based or offset pagination without a full DB scan. + /// + /// Returns `0` on a freshly-deployed contract where no stream has been created. + pub fn stream_count(env: Env) -> u64 { + env.storage() + .instance() + .get(&crate::types::DataKey::StreamCounter) + .unwrap_or(0) + } + /// Returns the stream record for `stream_id`, or `None` if it does not exist. pub fn get_stream(env: Env, stream_id: u64) -> Option { try_load_stream(&env, stream_id) diff --git a/contracts/stream_contract/src/test.rs b/contracts/stream_contract/src/test.rs index 9c20d170..07d51ca4 100644 --- a/contracts/stream_contract/src/test.rs +++ b/contracts/stream_contract/src/test.rs @@ -2406,3 +2406,174 @@ fn test_resume_stream_emits_event() { assert_eq!(payload.sender, sender); assert_eq!(payload.new_end_time, 1150); } + +// ─── #421 stream_count ──────────────────────────────────────────────────────── + +#[test] +fn test_stream_count_returns_zero_on_fresh_contract() { + // A freshly deployed contract with no streams must return 0. + let env = Env::default(); + env.mock_all_auths(); + let client = create_contract(&env); + assert_eq!(client.stream_count(), 0); +} + +#[test] +fn test_stream_count_increments_by_one_per_create() { + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 3_000); + + let client = create_contract(&env); + + assert_eq!(client.stream_count(), 0); + + client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100); + assert_eq!(client.stream_count(), 1); + + client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100); + assert_eq!(client.stream_count(), 2); + + client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100); + assert_eq!(client.stream_count(), 3); +} + +#[test] +fn test_stream_count_is_not_decremented_by_cancel() { + // stream_count counts all streams ever created, not just active ones. + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + assert_eq!(client.stream_count(), 1); + + client.cancel_stream(&sender, &id); + // Cancelling must NOT decrement the counter. + assert_eq!(client.stream_count(), 1); +} + +#[test] +fn test_stream_count_matches_last_stream_id() { + // The counter equals the highest stream ID that has been issued. + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 5_000); + + let client = create_contract(&env); + for i in 1u64..=5 { + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &100); + assert_eq!(id, i); + assert_eq!(client.stream_count(), i); + } +} + +// ─── #787 resume_stream must reject cancelled-while-paused streams ──────────── + +#[test] +fn test_resume_after_cancel_while_paused_returns_stream_inactive() { + // Pause a stream, cancel it while paused, then attempt to resume. + // resume_stream must return StreamInactive and must NOT emit stream_resumed. + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + // 1_000 tokens / 1_000 s = 1 token/s + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + // Pause at t=100. + env.ledger().with_mut(|l| l.timestamp += 100); + client.pause_stream(&sender, &id); + + // Cancel while paused at t=200. + env.ledger().with_mut(|l| l.timestamp += 100); + client.cancel_stream(&sender, &id); + + // Verify the stream is correctly marked cancelled. + let s = client.get_stream(&id).unwrap(); + assert!(!s.is_active); + assert_eq!(s.status, StreamStatus::Cancelled); + // cancel_stream must also clear the pause fields. + assert!(!s.paused); + assert!(s.paused_at.is_none()); + + // Attempting to resume must return StreamInactive. + let result = client.try_resume_stream(&sender, &id); + assert_eq!(result, Err(Ok(StreamError::StreamInactive))); + + // No stream_resumed event must have been emitted. + let events = env.events().all(); + let resumed_event = events.iter().find(|e| { + Symbol::try_from_val(&env, &e.1.get(0).unwrap()).unwrap() + == Symbol::new(&env, "stream_resumed") + }); + assert!( + resumed_event.is_none(), + "stream_resumed must not be emitted after cancel" + ); +} + +#[test] +fn test_cancel_while_paused_clears_pause_fields() { + // After cancel_stream on a paused stream, paused and paused_at must be cleared. + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + let recipient = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &recipient, &token, &1_000, &1_000); + + env.ledger().with_mut(|l| l.timestamp += 200); + client.pause_stream(&sender, &id); + + // Verify pause state is set before cancel. + let before = client.get_stream(&id).unwrap(); + assert!(before.paused); + assert!(before.paused_at.is_some()); + + client.cancel_stream(&sender, &id); + + // After cancel, pause state must be cleared. + let after = client.get_stream(&id).unwrap(); + assert!(!after.paused, "paused flag must be false after cancel"); + assert!( + after.paused_at.is_none(), + "paused_at must be None after cancel" + ); + assert_eq!(after.status, StreamStatus::Cancelled); +} + +#[test] +fn test_cancel_normal_stream_also_clears_pause_fields() { + // Cancelling a non-paused stream should set paused=false and paused_at=None + // (they are already in that state, but the assignment must be idempotent). + let env = Env::default(); + env.mock_all_auths(); + let (token, _) = create_token(&env); + let sender = Address::generate(&env); + mint(&env, &token, &sender, 1_000); + + let client = create_contract(&env); + let id = client.create_stream(&sender, &Address::generate(&env), &token, &1_000, &1_000); + + client.cancel_stream(&sender, &id); + + let s = client.get_stream(&id).unwrap(); + assert!(!s.paused); + assert!(s.paused_at.is_none()); + assert_eq!(s.status, StreamStatus::Cancelled); +} diff --git a/frontend/src/lib/soroban.ts b/frontend/src/lib/soroban.ts index 742f4e39..a7a5667e 100644 --- a/frontend/src/lib/soroban.ts +++ b/frontend/src/lib/soroban.ts @@ -360,3 +360,56 @@ export async function resumeStream( nativeToScVal(params.streamId, { type: "u64" }), ]); } + +/** + * Read-only call to the contract's `stream_count` view function. + * + * Returns the total number of streams ever created (monotonically increasing + * stream ID counter). This is NOT the count of currently active streams — + * cancelled and completed streams are still counted. + * + * Returns `0n` on a freshly-deployed contract where no stream has been created. + * Does not require wallet authentication. + */ +export async function fetchStreamCount(): Promise { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const sdk: any = await import("@stellar/stellar-sdk"); + const { Contract, TransactionBuilder, BASE_FEE, scValToNative, Keypair } = sdk; + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const rpc: any = sdk.rpc ?? sdk.SorobanRpc; + + const server = new rpc.Server(SOROBAN_RPC_URL, { allowHttp: false }); + // stream_count is a read-only view — use a throwaway keypair for simulation. + const throwawayKeypair = Keypair.random(); + const account = await server.getAccount(throwawayKeypair.publicKey()).catch(() => { + // If the throwaway account doesn't exist on-chain, build a minimal account object. + return { accountId: () => throwawayKeypair.publicKey(), sequenceNumber: () => "0", incrementSequenceNumber: () => {} }; + }); + + const contract = new Contract(CONTRACT_ID); + const tx = new TransactionBuilder(account, { + fee: BASE_FEE, + networkPassphrase: NETWORK_PASSPHRASE, + }) + .addOperation(contract.call("stream_count")) + .setTimeout(30) + .build(); + + const simResult = await server.simulateTransaction(tx); + if (rpc.Api?.isSimulationError?.(simResult) ?? simResult?.error) { + throw new SorobanCallError(`stream_count simulation failed: ${simResult.error}`, "NetworkError"); + } + + const rawResult = simResult?.result?.retval; + if (!rawResult) { + // Contract not yet initialized (no streams ever created). + return 0n; + } + + const nativeValue = scValToNative(rawResult); + if (typeof nativeValue === "bigint") return nativeValue; + if (typeof nativeValue === "number") return BigInt(Math.trunc(nativeValue)); + if (typeof nativeValue === "string") return BigInt(nativeValue); + + throw new SorobanCallError("stream_count returned an unexpected value type.", "Unknown"); +}