Skip to content

Commit b5b7d6b

Browse files
committed
Fix MaxParallelism to be command parallelism, not target parallelism.
1 parent 6a153dd commit b5b7d6b

9 files changed

Lines changed: 152 additions & 115 deletions

File tree

PSql.Deploy.Engine.Tests/Core/DeploymentSessionTests.cs

Lines changed: 56 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@
22
// SPDX-License-Identifier: MIT
33

44
using System.Collections.ObjectModel;
5-
using Moq.Protected;
5+
using System.Linq.Expressions;
66

77
namespace PSql.Deploy;
88

99
[TestFixture]
1010
public class DeploymentSessionTests : TestHarnessBase
1111
{
12-
private Mock<DeploymentSession>? _session;
12+
private Mock<TestDeploymentSession>? _session;
1313

1414
private readonly DeploymentSessionOptions
1515
_options = new TestDeploymentSessionOptions();
1616

17-
private Mock<DeploymentSession> SessionMock
17+
private Mock<TestDeploymentSession> SessionMock
1818
=> _session ??= CreateSession();
1919

2020
private DeploymentSession Session
@@ -81,7 +81,15 @@ public void BeginApplying_Group_NullTarget()
8181
[Test]
8282
public async Task Apply_Target_Ok()
8383
{
84-
ExpectApplyCore(TargetA, maxParallelism: 4);
84+
static void AssertParallelism(Target t, Parallelism p)
85+
{
86+
p.MaxParallelTargets .ShouldBe(1);
87+
p.MaxParallelCommands .ShouldBe(4);
88+
p.MaxCommandsPerTarget.ShouldBe(4);
89+
}
90+
91+
ExpectGetMaxParallelTargets(g => g.Targets.Single() == TargetA, result: 1);
92+
ExpectApplyCore(TargetA, AssertParallelism);
8593

8694
Session.BeginApplying(TargetA, maxParallelism: 4);
8795

@@ -94,11 +102,19 @@ public async Task Apply_Group_Ok()
94102
var group = new TargetGroup(
95103
[TargetA, TargetB],
96104
maxParallelism: 8,
97-
maxParallelismPerTarget: 2
105+
maxParallelismPerTarget: 1
98106
);
99107

100-
ExpectApplyCore(TargetA, maxParallelism: 2);
101-
ExpectApplyCore(TargetB, maxParallelism: 2);
108+
static void AssertParallelism(Target t, Parallelism p)
109+
{
110+
p.MaxParallelTargets .ShouldBe(6);
111+
p.MaxParallelCommands .ShouldBe(8);
112+
p.MaxCommandsPerTarget.ShouldBe(1);
113+
}
114+
115+
ExpectGetMaxParallelTargets(g => g == group, result: 6);
116+
ExpectApplyCore(TargetA, AssertParallelism);
117+
ExpectApplyCore(TargetB, AssertParallelism);
102118

103119
Session.BeginApplying(group);
104120

@@ -108,8 +124,8 @@ public async Task Apply_Group_Ok()
108124
[Test]
109125
public async Task Apply_Any_Cancellation()
110126
{
111-
ExpectApplyCore(TargetA, maxParallelism: 1, cancel: true); // cancels session
112-
//pectApplyCore(TargetB, maxParallelism: 1); // never happens
127+
ExpectApplyCore(TargetA, (t, p) => Session.Cancel()); // cancels session
128+
//pectApplyCore(TargetB); // never happens
113129

114130
Session.BeginApplying(TargetA, maxParallelism: 1);
115131
await WaitForSessionCancellationAsync(); // because count of errors exceeded max (default 0)
@@ -129,9 +145,9 @@ public async Task Apply_Any_Exception_UpToMax()
129145
_options.MaxErrorCount = 1;
130146
var exception = new Exception("Oops!");
131147

132-
ExpectApplyCore(TargetA, maxParallelism: 1, exception); // error tolerated
133-
ExpectApplyCore(TargetB, maxParallelism: 1); // succeeds
134-
ExpectApplyCore(TargetC, maxParallelism: 1); // succeeds
148+
ExpectApplyCore(TargetA, (t, p) => throw exception); // error tolerated
149+
ExpectApplyCore(TargetB); // succeeds
150+
ExpectApplyCore(TargetC); // succeeds
135151

136152
Session.BeginApplying(TargetA, maxParallelism: 1);
137153
await WaitForSessionToHaveErrorsAsync();
@@ -154,9 +170,9 @@ public async Task Apply_Any_Exception_OverMax()
154170
var exceptionA = new Exception("Bam!");
155171
var exceptionB = new Exception("Pow!");
156172

157-
ExpectApplyCore(TargetA, maxParallelism: 1, exceptionA); // error tolerated
158-
ExpectApplyCore(TargetB, maxParallelism: 1, exceptionB); // error cancels session
159-
//pectApplyCore(TargetC, maxParallelism: 1); // never happens
173+
ExpectApplyCore(TargetA, (t, p) => throw exceptionA); // error tolerated
174+
ExpectApplyCore(TargetB, (t, p) => throw exceptionB); // error cancels session
175+
//pectApplyCore(TargetC); // never happens
160176

161177
Session.BeginApplying(TargetA, maxParallelism: 1);
162178
await WaitForSessionToHaveErrorsAsync();
@@ -187,7 +203,7 @@ public async Task Apply_Any_Exception_ReadOnlyData()
187203

188204
var exception = new ExceptionWithData(data);
189205

190-
ExpectApplyCore(TargetA, maxParallelism: 1, exception);
206+
ExpectApplyCore(TargetA, (t, p) => throw exception);
191207

192208
Session.BeginApplying(TargetA, maxParallelism: 1);
193209

@@ -204,7 +220,7 @@ public async Task Apply_Any_Exception_NullData()
204220
{
205221
var exception = new ExceptionWithData(data: null);
206222

207-
ExpectApplyCore(TargetA, maxParallelism: 1, exception);
223+
ExpectApplyCore(TargetA, (t, p) => throw exception);
208224

209225
Session.BeginApplying(TargetA, maxParallelism: 1);
210226

@@ -216,37 +232,29 @@ public async Task Apply_Any_Exception_NullData()
216232
thrown.ShouldBeSameAs(exception);
217233
}
218234

219-
private void ExpectApplyCore(Target target, int maxParallelism)
235+
private void ExpectGetMaxParallelTargets(
236+
Expression<Func<TargetGroup, bool>> predicate, int result)
220237
{
221238
SessionMock
222-
.Protected()
223-
.Setup<Task>("ApplyCoreAsync", target, maxParallelism)
224-
.Returns(Task.CompletedTask)
239+
.Setup(s => s.PublicGetMaxParallelTargets_Public(It.Is(predicate)))
240+
.Returns(result)
225241
.Verifiable();
226242
}
227243

228-
private void ExpectApplyCore(Target target, int maxParallelism, bool cancel)
244+
private void ExpectApplyCore(Target target, Action<Target, Parallelism>? callback = null)
229245
{
230-
SessionMock
231-
.Protected()
232-
.Setup<Task>("ApplyCoreAsync", target, maxParallelism)
233-
.Callback(Session.Cancel)
234-
.ThrowsAsync(new OperationCanceledException())
235-
.Verifiable();
236-
}
246+
static void Nop(Target t, Parallelism p) { }
237247

238-
private void ExpectApplyCore(Target target, int maxParallelism, Exception exception)
239-
{
240248
SessionMock
241-
.Protected()
242-
.Setup<Task>("ApplyCoreAsync", target, maxParallelism)
243-
.ThrowsAsync(exception)
249+
.Setup(s => s.ApplyCoreAsync_Public(target, It.IsNotNull<Parallelism>()))
250+
.Callback(callback ?? Nop)
251+
.Returns(Task.CompletedTask)
244252
.Verifiable();
245253
}
246254

247-
private Mock<DeploymentSession> CreateSession()
255+
private Mock<TestDeploymentSession> CreateSession()
248256
{
249-
var session = Mocks.Create<DeploymentSession>(MockBehavior.Loose, _options);
257+
var session = Mocks.Create<TestDeploymentSession>(MockBehavior.Loose, _options);
250258

251259
session.CallBase = true;
252260

@@ -280,14 +288,23 @@ protected override void CleanUp(bool managed)
280288
base.CleanUp(managed);
281289
}
282290

283-
private class TestDeploymentSessionOptions : DeploymentSessionOptions { }
291+
internal class TestDeploymentSessionOptions : DeploymentSessionOptions { }
284292

285-
private class TestDeploymentSession : DeploymentSession
293+
internal class TestDeploymentSession : DeploymentSession
286294
{
287295
public TestDeploymentSession(TestDeploymentSessionOptions options)
288296
: base(options) { }
289297

290-
protected override Task ApplyCoreAsync(Target target, int maxParallelism)
298+
protected sealed override int GetMaxParallelTargets(TargetGroup group)
299+
=> PublicGetMaxParallelTargets_Public(group);
300+
301+
protected sealed override Task ApplyCoreAsync(Target target, Parallelism parallelism)
302+
=> ApplyCoreAsync_Public(target, parallelism);
303+
304+
public virtual int PublicGetMaxParallelTargets_Public(TargetGroup group)
305+
=> 4;
306+
307+
public virtual Task ApplyCoreAsync_Public(Target target, Parallelism parallelism)
291308
=> Task.CompletedTask;
292309
}
293310

PSql.Deploy.Engine.Tests/Seeds/SeedApplicatorTests.cs

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ public class SeedApplicatorTests : TestHarnessBase
1414
private readonly Mock<ISeedTargetConnection> _connection;
1515
private readonly MockSequence _sequence;
1616
private readonly Target _target;
17+
private readonly Parallelism _parallelism;
1718
private readonly StringWriter _log;
1819

1920
public SeedApplicatorTests()
2021
{
2122
_target = new("Server=db.example.com;Database=test;User ID=test;Password=test");
23+
_parallelism = new(1, 1, 1);
2224
_session = Mocks.Create<ISeedSessionInternal>();
2325
_console = Mocks.Create<ISeedConsole>();
2426
_connection = Mocks.Create<ISeedTargetConnection>();
@@ -41,14 +43,14 @@ public SeedApplicatorTests()
4143
}
4244

4345
private SeedApplicator Applicator
44-
=> _applicator ??= new(_session.Object, _seed, _target, maxParallelism: 1);
46+
=> _applicator ??= new(_session.Object, _seed, _target, _parallelism);
4547

4648
[Test]
4749
public void Construct_NullSession()
4850
{
4951
Should.Throw<ArgumentNullException>(() =>
5052
{
51-
_ = new SeedApplicator(null!, _seed, _target, maxParallelism: 1);
53+
_ = new SeedApplicator(null!, _seed, _target, _parallelism);
5254
});
5355
}
5456

@@ -57,7 +59,7 @@ public void Construct_NullSeed()
5759
{
5860
Should.Throw<ArgumentNullException>(() =>
5961
{
60-
_ = new SeedApplicator(_session.Object, null!, _target, maxParallelism: 1);
62+
_ = new SeedApplicator(_session.Object, null!, _target, _parallelism);
6163
});
6264
}
6365

@@ -66,16 +68,16 @@ public void Construct_NullTarget()
6668
{
6769
Should.Throw<ArgumentNullException>(() =>
6870
{
69-
_ = new SeedApplicator(_session.Object, _seed, null!, maxParallelism: 1);
71+
_ = new SeedApplicator(_session.Object, _seed, null!, _parallelism);
7072
});
7173
}
7274

7375
[Test]
74-
public void Construct_OutOfRangeMaxParallelism()
76+
public void Construct_NullParallelism()
7577
{
76-
Should.Throw<ArgumentOutOfRangeException>(() =>
78+
Should.Throw<ArgumentNullException>(() =>
7779
{
78-
_ = new SeedApplicator(_session.Object, _seed, _target, maxParallelism: 0);
80+
_ = new SeedApplicator(_session.Object, _seed, _target, null!);
7981
});
8082
}
8183

@@ -110,9 +112,9 @@ public void Target_Get()
110112
}
111113

112114
[Test]
113-
public void MaxParallelism_Get()
115+
public void Parallelism_Get()
114116
{
115-
Applicator.MaxParallelism.ShouldBe(1); // as set by test fixture constructor
117+
Applicator.Parallelism.ShouldBeSameAs(_parallelism);
116118
}
117119

118120
[Test]

PSql.Deploy.Engine/Core/DeploymentSession.cs

Lines changed: 27 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ public void BeginApplying(Target target, int maxParallelism = 0)
6969
if (target is null)
7070
throw new ArgumentNullException(nameof(target));
7171

72-
BeginApplying(new TargetGroup([target], name: null, maxParallelism: 1, maxParallelism));
72+
BeginApplying(new TargetGroup([target], name: null, maxParallelism, maxParallelism));
7373
}
7474

7575
/// <inheritdoc/>
@@ -119,44 +119,29 @@ private void Run(Func<Task> action)
119119

120120
private async Task ApplyAsync(TargetGroup group)
121121
{
122-
using var limiter = new SemaphoreSlim(
123-
initialCount: group.MaxParallelism,
124-
maxCount: group.MaxParallelism
122+
using var parallelism = new Parallelism(
123+
GetMaxParallelTargets(group),
124+
group.MaxParallelism,
125+
group.MaxParallelismPerTarget
125126
);
126127

127-
Task ApplyToTargetAsync(Target context)
128-
=> ApplyAsync(context, limiter, group.MaxParallelismPerTarget);
128+
Task ApplyToTargetAsync(Target target)
129+
=> ApplyAsync(target, parallelism);
129130

130131
await Task.WhenAll(group.Targets.Select(ApplyToTargetAsync));
131132
}
132133

133-
private async Task ApplyAsync(Target target, SemaphoreSlim limiter, int maxParallelism)
134+
private async Task ApplyAsync(Target target, Parallelism parallelism)
134135
{
135-
// Move to another thread so that caller's context iterator continues
136+
// Move to another thread so that caller's target iterator continues
136137
await Task.Yield();
137138

138-
var limited = false;
139-
140139
try
141140
{
142141
// Limit group parallelism
143-
await limiter.WaitAsync(CancellationToken);
144-
limited = true;
145-
146-
await ApplyAsync(target, maxParallelism);
147-
}
148-
finally
149-
{
150-
if (limited)
151-
limiter.Release();
152-
}
153-
}
142+
using var _ = await parallelism.UseTargetScopeAsync(CancellationToken);
154143

155-
private async Task ApplyAsync(Target target, int maxParallelism)
156-
{
157-
try
158-
{
159-
await ApplyCoreAsync(target, maxParallelism);
144+
await ApplyCoreAsync(target, parallelism);
160145
}
161146
catch (OperationCanceledException)
162147
{
@@ -168,20 +153,33 @@ private async Task ApplyAsync(Target target, int maxParallelism)
168153
}
169154
}
170155

156+
/// <summary>
157+
/// Gets the number of target databases against which the deployment
158+
/// operation should run in parallel for the specified target group.
159+
/// </summary>
160+
/// <param name="group">
161+
/// The group of target databases.
162+
/// </param>
163+
/// <returns>
164+
/// The count of target databases in <paramref name="group"/> against
165+
/// which the deployment operation should run in parallel.
166+
/// </returns>
167+
protected abstract int GetMaxParallelTargets(TargetGroup group);
168+
171169
/// <summary>
172170
/// Applies the deployment operation to the specified target database
173171
/// asynchronously.
174172
/// </summary>
175173
/// <param name="target">
176174
/// An object specifying the target database.
177175
/// </param>
178-
/// <param name="maxParallelism">
179-
/// The maximum degree of parallelism to use.
176+
/// <param name="parallelism">
177+
/// A governor to limit the parallelism of the deployment operation.
180178
/// </param>
181179
/// <returns>
182180
/// A <see cref="Task"/> representing the asynchronous operation.
183181
/// </returns>
184-
protected abstract Task ApplyCoreAsync(Target target, int maxParallelism);
182+
protected abstract Task ApplyCoreAsync(Target target, Parallelism parallelism);
185183

186184
private void HandleError(Exception e, Target target)
187185
{

PSql.Deploy.Engine/Core/TargetGroup.cs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,13 @@ public TargetGroup(
7070
public string? Name { get; }
7171

7272
/// <summary>
73-
/// Gets the maximum degree of parallelism across all targets in the set.
73+
/// Gets the maximum degree of parallelism across all target databases in
74+
/// the group.
7475
/// </summary>
7576
public int MaxParallelism { get; }
7677

7778
/// <summary>
78-
/// Gets the maximum degree of parallelism per target.
79+
/// Gets the maximum degree of parallelism per target database.
7980
/// </summary>
8081
public int MaxParallelismPerTarget { get; }
8182

PSql.Deploy.Engine/Migrations/MigrationSession.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,14 @@ private bool AdvanceToNextPhase()
166166
}
167167

168168
/// <inheritdoc/>
169-
protected override Task ApplyCoreAsync(Target target, int maxParallelism)
169+
protected override int GetMaxParallelTargets(TargetGroup group)
170+
{
171+
// Migrations do not use per-target parallelism
172+
return group.MaxParallelism;
173+
}
174+
175+
/// <inheritdoc/>
176+
protected override Task ApplyCoreAsync(Target target, Parallelism parallelism)
170177
{
171178
return new MigrationApplicator(this, target).ApplyAsync();
172179
}

0 commit comments

Comments
 (0)