Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 83 additions & 0 deletions R3.DynamicData.Tests/Cache/AsyncDisposeManyTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Port of DynamicData to R3.
#pragma warning disable SA1516, SA1515, SA1503, SA1502, SA1107

using System.Runtime.CompilerServices;
using R3.DynamicData.Cache;

namespace R3.DynamicData.Tests.Cache;

public class AsyncDisposeManyTests
{
private sealed class AsyncTracker : IAsyncDisposable
{
public bool IsDisposed { get; private set; }

public ValueTask DisposeAsync()
{
IsDisposed = true;
return ValueTask.CompletedTask;
}
}

[Fact]
public async Task AsyncDisposeMany_DisposesOnRemove()
{
var source = new SourceCache<AsyncTracker, int>(x => RuntimeHelpers.GetHashCode(x));
var t1 = new AsyncTracker();
var t2 = new AsyncTracker();
using var sub = source.Connect().AsyncDisposeMany().Subscribe(_ => { });
source.AddOrUpdate(new[] { t1, t2 });
source.Remove(RuntimeHelpers.GetHashCode(t1));
await Task.Delay(100);
Assert.True(t1.IsDisposed);
Assert.False(t2.IsDisposed);
}

[Fact]
public async Task AsyncDisposeMany_DisposesOnClear()
{
var source = new SourceCache<AsyncTracker, int>(x => RuntimeHelpers.GetHashCode(x));
var t1 = new AsyncTracker();
var t2 = new AsyncTracker();
using var sub = source.Connect().AsyncDisposeMany().Subscribe(_ => { });
source.AddOrUpdate(new[] { t1, t2 });
source.Clear();
await Task.Delay(100);
Assert.True(t1.IsDisposed);
Assert.True(t2.IsDisposed);
}

[Fact]
public async Task AsyncDisposeMany_DisposesOldValueOnUpdate()
{
var source = new SourceCache<AsyncTracker, int>(x => RuntimeHelpers.GetHashCode(x));
var t1 = new AsyncTracker();
var t2 = new AsyncTracker();
using var sub = source.Connect().AsyncDisposeMany().Subscribe(_ => { });
source.AddOrUpdate(t1);
source.AddOrUpdate(t2);
source.Remove(RuntimeHelpers.GetHashCode(t1));
await Task.Delay(100);
Assert.True(t1.IsDisposed);
Assert.False(t2.IsDisposed);
}

[Fact]
public async Task AsyncDisposeMany_DisposesRemainingOnUnsubscribe()
{
var source = new SourceCache<AsyncTracker, int>(x => RuntimeHelpers.GetHashCode(x));
var t1 = new AsyncTracker();
var sub = source.Connect().AsyncDisposeMany().Subscribe(_ => { });
source.AddOrUpdate(t1);
sub.Dispose();
await Task.Delay(100);
Assert.True(t1.IsDisposed);
}

[Fact]
public void AsyncDisposeMany_ThrowsOnNullSource()
{
Observable<IChangeSet<AsyncTracker, int>>? source = null;
Assert.Throws<ArgumentNullException>(() => source!.AsyncDisposeMany());
}
}
115 changes: 115 additions & 0 deletions R3.DynamicData.Tests/Cache/FilterWithStateTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Port of DynamicData to R3.
#pragma warning disable SA1516, SA1515, SA1503, SA1502, SA1107, SA1513, SA1518

using R3;
using R3.DynamicData.Cache;
using R3.DynamicData.Kernel;

namespace R3.DynamicData.Tests.Cache;

public class FilterWithStateTests
{
[Fact]
public void FilterWithState_InitialState_FiltersCorrectly()
{
var cache = new SourceCache<int, int>(x => x);
var stateSubject = new Subject<int>();
var results = new List<IChangeSet<int, int>>();

using var sub = cache.Connect()
.Filter(stateSubject, (item, minValue) => item >= minValue)
.Subscribe(results.Add);

// Emit initial state threshold = 5
stateSubject.OnNext(5);
cache.AddOrUpdate(new[] { 1, 3, 5, 7, 10 });

// Only items >= 5 should be included
int adds = results.Sum(cs => cs.Adds);
Assert.Equal(3, adds); // 5, 7, 10
}

[Fact]
public void FilterWithState_StateChange_ReEvaluatesItems()
{
var cache = new SourceCache<int, int>(x => x);
var stateSubject = new Subject<int>();
var results = new List<IChangeSet<int, int>>();

using var sub = cache.Connect()
.Filter(stateSubject, (item, minValue) => item >= minValue)
.Subscribe(results.Add);

stateSubject.OnNext(5);
cache.AddOrUpdate(new[] { 3, 5, 8 });
// At this point: 5 and 8 included (2 adds)

results.Clear();
stateSubject.OnNext(8); // now only 8 qualifies: should remove 5

Assert.True(results.Count > 0);
int removes = results.Sum(cs => cs.Removes);
Assert.Equal(1, removes); // 5 removed
}

[Fact]
public void FilterWithState_StateChange_AddsItemsThatNowPass()
{
var cache = new SourceCache<int, int>(x => x);
var stateSubject = new Subject<int>();
var results = new List<IChangeSet<int, int>>();

using var sub = cache.Connect()
.Filter(stateSubject, (item, minValue) => item >= minValue)
.Subscribe(results.Add);

stateSubject.OnNext(10);
cache.AddOrUpdate(new[] { 3, 7, 15 });
// Only 15 passes

results.Clear();
stateSubject.OnNext(3); // now 3, 7, 15 all pass

int adds = results.Sum(cs => cs.Adds);
Assert.Equal(2, adds); // 3 and 7 newly added
}

[Fact]
public void FilterWithState_NewItemsAfterStateChange_FilteredByCurrentState()
{
var cache = new SourceCache<int, int>(x => x);
var stateSubject = new Subject<int>();
var results = new List<IChangeSet<int, int>>();

using var sub = cache.Connect()
.Filter(stateSubject, (item, minValue) => item >= minValue)
.Subscribe(results.Add);

stateSubject.OnNext(10);
cache.AddOrUpdate(new[] { 1, 2, 15 });

results.Clear();
cache.AddOrUpdate(new[] { 5, 20 });

int adds = results.Sum(cs => cs.Adds);
Assert.Equal(1, adds); // only 20 passes the threshold of 10
}

[Fact]
public void FilterWithState_NoState_ItemsNotEmitted()
{
var cache = new SourceCache<int, int>(x => x);
var stateSubject = new Subject<int>();
var results = new List<IChangeSet<int, int>>();

using var sub = cache.Connect()
.Filter(stateSubject, (item, minValue) => item >= minValue)
.Subscribe(results.Add);

// Add items before any state is emitted
cache.AddOrUpdate(new[] { 1, 2, 3 });

// Nothing should pass since no state has been set
Assert.Equal(0, results.Sum(cs => cs.Adds));
}
}
147 changes: 147 additions & 0 deletions R3.DynamicData.Tests/Cache/TransformOnObservableTests.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Port of DynamicData to R3.
#pragma warning disable SA1516, SA1515, SA1503, SA1502, SA1107, SA1513, SA1518

using R3;
using R3.DynamicData.Cache;
using R3.DynamicData.Kernel;

namespace R3.DynamicData.Tests.Cache;

public class TransformOnObservableTests
{
[Fact]
public void TransformOnObservable_Add_EmitsAddChangeset()
{
var cache = new SourceCache<string, int>(s => s.Length);
var results = new List<IChangeSet<int, int>>();

// Use Observable.Return so value is emitted synchronously on subscribe
using var sub = cache.Connect()
.TransformOnObservable<string, int, int>((item, key) =>
Observable.Return(item.Length * 2))
.Subscribe(results.Add);

cache.AddOrUpdate("abc"); // length 3 → 6

Assert.Single(results);
Assert.Equal(1, results[0].Adds);
Assert.Equal(6, results[0].First().Current);
}

[Fact]
public void TransformOnObservable_SubsequentEmission_EmitsUpdateChangeset()
{
var cache = new SourceCache<string, int>(s => s.Length);
var results = new List<IChangeSet<int, int>>();
Subject<int>? capturedSubject = null;

using var sub = cache.Connect()
.TransformOnObservable<string, int, int>((item, key) =>
{
capturedSubject = new Subject<int>();
return capturedSubject;
})
.Subscribe(results.Add);

cache.AddOrUpdate("abc");
Assert.NotNull(capturedSubject);

capturedSubject!.OnNext(100); // first emission → Add
Assert.Equal(1, results.Count);
Assert.Equal(1, results[0].Adds);

capturedSubject.OnNext(200); // second emission → Update
Assert.Equal(2, results.Count);
Assert.Equal(1, results[1].Updates);
Assert.Equal(200, results[1].First().Current);
Assert.Equal(100, results[1].First().Previous.Value);
}

[Fact]
public void TransformOnObservable_Remove_EmitsRemoveChangeset()
{
var cache = new SourceCache<string, int>(s => s.Length);
var results = new List<IChangeSet<int, int>>();
Subject<int>? capturedSubject = null;

using var sub = cache.Connect()
.TransformOnObservable<string, int, int>((item, key) =>
{
capturedSubject = new Subject<int>();
return capturedSubject;
})
.Subscribe(results.Add);

cache.AddOrUpdate("abc");
capturedSubject!.OnNext(42); // Add
Assert.Equal(1, results.Count);

cache.Remove("abc".Length); // Remove key 3

Assert.Equal(2, results.Count);
Assert.Equal(1, results[1].Removes);
Assert.Equal(42, results[1].First().Current);
}

[Fact]
public void TransformOnObservable_Remove_NoMoreEmissionsFromDisposedObservable()
{
var cache = new SourceCache<string, int>(s => s.Length);
var results = new List<IChangeSet<int, int>>();
Subject<int>? capturedSubject = null;

using var sub = cache.Connect()
.TransformOnObservable<string, int, int>((item, key) =>
{
capturedSubject = new Subject<int>();
return capturedSubject;
})
.Subscribe(results.Add);

cache.AddOrUpdate("abc");
capturedSubject!.OnNext(10); // first emission → Add
cache.Remove("abc".Length); // Remove

int countAfterRemove = results.Count;
capturedSubject.OnNext(99); // should be ignored

Assert.Equal(countAfterRemove, results.Count);
}

[Fact]
public void TransformOnObservable_Update_DisposesOldSubscription_SubscribesNew()
{
var cache = new SourceCache<string, int>(s => s.Length);
var results = new List<IChangeSet<int, int>>();

var subjects = new List<Subject<int>>();

using var sub = cache.Connect()
.TransformOnObservable<string, int, int>((item, key) =>
{
var s = new Subject<int>();
subjects.Add(s);
return s;
})
.Subscribe(results.Add);

cache.AddOrUpdate("abc"); // key 3, creates subjects[0]
subjects[0].OnNext(10); // Add: 10

// Update same key with different value: "xyz" has same length 3
cache.AddOrUpdate("xyz"); // same key 3, creates subjects[1]
Assert.Equal(2, subjects.Count);

// Old subject should no longer produce output
subjects[0].OnNext(999);
int countBeforeNewEmit = results.Count;
Assert.Equal(countBeforeNewEmit, results.Count);

// New subject should produce output
subjects[1].OnNext(20);
Assert.Equal(countBeforeNewEmit + 1, results.Count);
// New observable's first emission: if output had previous value from old obs → Update
// If not (old obs was disposed before producing output here), then Add
Assert.True(results.Last().Updates == 1 || results.Last().Adds == 1);
}
}
Loading
Loading