Skip to content

Commit 6a153dd

Browse files
committed
Add Parallelism class to encapsulate parallelism limits.
1 parent 051f75a commit 6a153dd

2 files changed

Lines changed: 331 additions & 0 deletions

File tree

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,170 @@
1+
// Copyright Subatomix Research Inc.
2+
// SPDX-License-Identifier: MIT
3+
4+
namespace PSql.Deploy;
5+
6+
[TestFixture]
7+
public class ParallelismTests
8+
{
9+
[Test]
10+
public void Construct()
11+
{
12+
var parallelism = new Parallelism(
13+
maxParallelTargets: 4,
14+
maxParallelCommands: 8,
15+
maxCommandsPerTarget: 2
16+
);
17+
18+
parallelism.MaxParallelTargets .ShouldBe(4);
19+
parallelism.MaxParallelCommands .ShouldBe(8);
20+
parallelism.MaxCommandsPerTarget.ShouldBe(2);
21+
}
22+
23+
[Test]
24+
public void Construct_InvalidMaxParallelTargets()
25+
{
26+
Should.Throw<ArgumentOutOfRangeException>(() =>
27+
{
28+
_ = new Parallelism(0, 8, 2);
29+
});
30+
}
31+
32+
[Test]
33+
public void Construct_InvalidMaxParallelCommands()
34+
{
35+
Should.Throw<ArgumentOutOfRangeException>(() =>
36+
{
37+
_ = new Parallelism(4, 0, 2);
38+
});
39+
}
40+
41+
[Test]
42+
public void Construct_InvalidMaxCommandsPerTarget()
43+
{
44+
Should.Throw<ArgumentOutOfRangeException>(() =>
45+
{
46+
_ = new Parallelism(4, 8, 0);
47+
});
48+
}
49+
50+
[Test]
51+
public async Task UseTargetScopeAsync_ReturnsDisposableObject()
52+
{
53+
// Arrange
54+
var parallelism = new Parallelism(4, 8, 2);
55+
56+
// Act
57+
var scope = await parallelism.UseTargetScopeAsync();
58+
59+
// Assert
60+
scope.ShouldNotBeNull();
61+
scope.ShouldBeAssignableTo<IDisposable>();
62+
}
63+
64+
[Test]
65+
public async Task UseCommandScopeAsync_ReturnsDisposableObject()
66+
{
67+
// Arrange
68+
var parallelism = new Parallelism(4, 8, 2);
69+
70+
// Act
71+
var scope = await parallelism.UseCommandScopeAsync();
72+
73+
// Assert
74+
scope.ShouldNotBeNull();
75+
scope.ShouldBeAssignableTo<IDisposable>();
76+
}
77+
78+
[Test]
79+
public async Task UseTargetScopeAsync()
80+
{
81+
using var parallelism = new Parallelism(maxParallelTargets: 2, 8, 2);
82+
83+
using var scope0 = await parallelism.UseTargetScopeAsync();
84+
using var scope1 = await parallelism.UseTargetScopeAsync();
85+
86+
var scopeTask = parallelism.UseTargetScopeAsync();
87+
88+
await ShouldBeWaitingAsync(scopeTask);
89+
90+
scope0.Dispose();
91+
92+
using var scope2 = await scopeTask;
93+
94+
// scope0 will be disposed again here, testing multiple disposal of scope
95+
}
96+
97+
[Test]
98+
public async Task UseCommandScopeAsync()
99+
{
100+
using var parallelism = new Parallelism(4, maxParallelCommands: 2, 2);
101+
102+
using var scope0 = await parallelism.UseCommandScopeAsync();
103+
using var scope1 = await parallelism.UseCommandScopeAsync();
104+
105+
var scopeTask = parallelism.UseCommandScopeAsync();
106+
107+
await ShouldBeWaitingAsync(scopeTask);
108+
109+
scope0.Dispose();
110+
111+
using var scope2 = await scopeTask;
112+
113+
// scope0 will be disposed again here, testing multiple disposal of scope
114+
}
115+
116+
[Test]
117+
public async Task UseTargetScopeAsync_Cancellation()
118+
{
119+
using var parallelism = new Parallelism(maxParallelTargets: 1, 8, 2);
120+
121+
using var scope = await parallelism.UseTargetScopeAsync();
122+
123+
using var cancellation = new CancellationTokenSource();
124+
125+
var task = parallelism.UseTargetScopeAsync(cancellation.Token); // will wait
126+
127+
cancellation.Cancel();
128+
129+
await Should.ThrowAsync<OperationCanceledException>(() => task);
130+
}
131+
132+
[Test]
133+
public async Task UseCommandScopeAsync_HonorsCancellation()
134+
{
135+
using var parallelism = new Parallelism(4, maxParallelCommands: 1, 1);
136+
137+
using var scope = await parallelism.UseCommandScopeAsync();
138+
139+
using var cancellation = new CancellationTokenSource();
140+
141+
var task = parallelism.UseCommandScopeAsync(cancellation.Token); // will wait
142+
143+
cancellation.Cancel();
144+
145+
await Should.ThrowAsync<OperationCanceledException>(() => task);
146+
}
147+
148+
[Test]
149+
public void Dispose_Multiple()
150+
{
151+
using var parallelism = new Parallelism(4, 8, 2);
152+
153+
parallelism.Dispose();
154+
155+
Should.ThrowAsync<ObjectDisposedException>(() => parallelism.UseTargetScopeAsync());
156+
Should.ThrowAsync<ObjectDisposedException>(() => parallelism.UseCommandScopeAsync());
157+
158+
// Not affected by disposal
159+
parallelism.MaxParallelTargets .ShouldBe(4);
160+
parallelism.MaxParallelCommands .ShouldBe(8);
161+
parallelism.MaxCommandsPerTarget.ShouldBe(2);
162+
163+
// will be disposed again here, testing multiple disposal
164+
}
165+
166+
private static async Task ShouldBeWaitingAsync(Task task)
167+
{
168+
(await Task.WhenAny(task, Task.Delay(10))).ShouldNotBeSameAs(task);
169+
}
170+
}
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
// Copyright Subatomix Research Inc.
2+
// SPDX-License-Identifier: MIT
3+
4+
namespace PSql.Deploy;
5+
6+
/// <summary>
7+
/// A governor to limit parallelism within a database deployment operation.
8+
/// </summary>
9+
public class Parallelism : IDisposable
10+
{
11+
private readonly SemaphoreSlim _targetLimiter;
12+
private readonly SemaphoreSlim _commandLimiter;
13+
14+
private bool _isDisposed;
15+
16+
/// <summary>
17+
/// Initializes a new <see cref="Parallelism"/> instance with the
18+
/// specified limits.
19+
/// </summary>
20+
/// <param name="maxParallelTargets">
21+
/// The maximum number of target databases against which a deployment
22+
/// operation should run in parallel. Must be positive.
23+
/// </param>
24+
/// <param name="maxParallelCommands">
25+
/// The maximum number of commands that a deployment operation should
26+
/// execute in parallel across all target databases. Must be positive.
27+
/// </param>
28+
/// <param name="maxCommandsPerTarget">
29+
/// The maximum number of commands that a deployment operation should
30+
/// execute in parallel against any one target database. Must be
31+
/// positive.
32+
/// </param>
33+
/// <exception cref="ArgumentOutOfRangeException">
34+
/// <paramref name="maxParallelTargets"/>,
35+
/// <paramref name="maxParallelCommands"/>, and/or
36+
/// <paramref name="maxCommandsPerTarget"/> is zero or negative.
37+
/// </exception>
38+
public Parallelism(int maxParallelTargets, int maxParallelCommands, int maxCommandsPerTarget)
39+
{
40+
if (maxParallelTargets <= 0)
41+
throw new ArgumentOutOfRangeException(nameof(maxParallelTargets));
42+
if (maxParallelCommands <= 0)
43+
throw new ArgumentOutOfRangeException(nameof(maxParallelCommands));
44+
if (maxCommandsPerTarget <= 0)
45+
throw new ArgumentOutOfRangeException(nameof(maxCommandsPerTarget));
46+
47+
MaxParallelTargets = maxParallelTargets;
48+
MaxParallelCommands = maxParallelCommands;
49+
MaxCommandsPerTarget = maxCommandsPerTarget;
50+
51+
_targetLimiter = CreateSemaphore(maxParallelTargets);
52+
_commandLimiter = CreateSemaphore(maxParallelCommands);
53+
}
54+
55+
/// <summary>
56+
/// Gets the maximum number of target databases against which a
57+
/// deployment operation should run in parallel.
58+
/// </summary>
59+
public int MaxParallelTargets { get; }
60+
61+
/// <summary>
62+
/// Gets the maximum number of commands that a deployment operation
63+
/// should execute in parallel across all target databases.
64+
/// </summary>
65+
public int MaxParallelCommands { get; }
66+
67+
/// <summary>
68+
/// Gets the maximum number of commands that a deployment operation
69+
/// should execute in parallel against any one target database.
70+
/// </summary>
71+
public int MaxCommandsPerTarget { get; }
72+
73+
/// <summary>
74+
/// Establishes a scope to perform a database deployment operation
75+
/// against one target database, occupying one unit of target
76+
/// parallelism.
77+
/// </summary>
78+
/// <param name="cancellation">
79+
/// The token to monitor for cancellation requests.
80+
/// </param>
81+
/// <returns>
82+
/// An object that releases the unit of target parallelism on disposal.
83+
/// </returns>
84+
/// <exception cref="ObjectDisposedException">
85+
/// The object has been disposed.
86+
/// </exception>
87+
public async Task<IDisposable> UseTargetScopeAsync(CancellationToken cancellation = default)
88+
{
89+
await _targetLimiter.WaitAsync(cancellation).ConfigureAwait(false);
90+
return new Releaser(_targetLimiter);
91+
}
92+
93+
/// <summary>
94+
/// Establishes a scope to perform one linear sequence of commands
95+
/// against a target database, occupying one unit of command parallelism.
96+
/// </summary>
97+
/// <param name="cancellation">
98+
/// The token to monitor for cancellation requests.
99+
/// </param>
100+
/// <returns>
101+
/// An object that releases the unit of command parallelism on disposal.
102+
/// </returns>
103+
/// <exception cref="ObjectDisposedException">
104+
/// The object has been disposed.
105+
/// </exception>
106+
public async Task<IDisposable> UseCommandScopeAsync(CancellationToken cancellation = default)
107+
{
108+
await _commandLimiter.WaitAsync(cancellation).ConfigureAwait(false);
109+
return new Releaser(_commandLimiter);
110+
}
111+
112+
private static SemaphoreSlim CreateSemaphore(int limit)
113+
{
114+
return new SemaphoreSlim(initialCount: limit, maxCount: limit);
115+
}
116+
117+
/// <summary>
118+
/// Releases the resources used by the object.
119+
/// </summary>
120+
public void Dispose()
121+
{
122+
Dispose(managed: true);
123+
GC.SuppressFinalize(this);
124+
}
125+
126+
/// <summary>
127+
/// Releases the resources used by the object.
128+
/// </summary>
129+
/// <param name="managed">
130+
/// <see langword="true"/> to dispose managed and unmanaged resources;
131+
/// <see langword="false"/> to dispose unmanaged resources only.
132+
/// </param>
133+
protected virtual void Dispose(bool managed)
134+
{
135+
if (_isDisposed)
136+
return;
137+
138+
if (managed)
139+
{
140+
_targetLimiter .Dispose();
141+
_commandLimiter.Dispose();
142+
}
143+
144+
_isDisposed = true;
145+
}
146+
147+
private sealed class Releaser : IDisposable
148+
{
149+
private SemaphoreSlim? _semaphore;
150+
151+
public Releaser(SemaphoreSlim semaphore)
152+
{
153+
_semaphore = semaphore;
154+
}
155+
156+
public void Dispose()
157+
{
158+
Interlocked.Exchange(ref _semaphore, null)?.Release();
159+
}
160+
}
161+
}

0 commit comments

Comments
 (0)