Skip to content

Commit 0bf765b

Browse files
author
Oren (electricessence)
committed
Improved Dataflow behavior to include faulting and cancellation.
1 parent ba040dd commit 0bf765b

4 files changed

Lines changed: 80 additions & 62 deletions

File tree

Documentation.xml

Lines changed: 5 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ExpressiveDbCommandBase.cs

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
using System.Data.Common;
55
using System.Diagnostics.Contracts;
66
using System.Linq;
7+
using System.Threading;
78
using System.Threading.Tasks;
89
using System.Threading.Tasks.Dataflow;
910
// ReSharper disable MemberCanBePrivate.Global
@@ -429,8 +430,8 @@ public IReceivableSourceBlock<T> AsSourceBlockAsync<T>(
429430
{
430431
if (t.IsFaulted) ((ITargetBlock<T>)source).Fault(t.Exception);
431432
else source.Complete();
432-
}, CancellationToken)
433-
.ConfigureAwait(false);
433+
}, CancellationToken);
434+
434435
return source;
435436
}
436437

@@ -501,14 +502,14 @@ public async Task<List<T>> ToListAsync<T>(Func<IDataRecord, T> transform, Comman
501502
public Task<IEnumerable<T>> ResultsAsync<T>(params (string Field, string Column)[] fieldMappingOverrides) where T : new()
502503
=> ResultsAsync<T>(fieldMappingOverrides as IEnumerable<(string Field, string Column)>);
503504

504-
/// <summary>
505-
/// Returns a source block as the source of records.
506-
/// </summary>
507-
/// <typeparam name="T">The model type to map the values to (using reflection).</typeparam>
508-
/// <param name="fieldMappingOverrides">An override map of field names to column names where the keys are the property names, and values are the column names.</param>
509-
/// <param name="options"></param>
510-
/// <returns>A transform block that is receiving the results.</returns>
511-
public IReceivableSourceBlock<T> AsSourceBlockAsync<T>(
505+
/// <summary>
506+
/// Returns a source block as the source of records.
507+
/// </summary>
508+
/// <typeparam name="T">The model type to map the values to (using reflection).</typeparam>
509+
/// <param name="fieldMappingOverrides">An override map of field names to column names where the keys are the property names, and values are the column names.</param>
510+
/// <param name="options">The optional ExecutionDataflowBlockOptions to use with the source.</param>
511+
/// <returns>A transform block that is receiving the results.</returns>
512+
public IReceivableSourceBlock<T> AsSourceBlockAsync<T>(
512513
IEnumerable<(string Field, string Column)> fieldMappingOverrides,
513514
ExecutionDataflowBlockOptions options = null)
514515
where T : new()
@@ -517,22 +518,27 @@ public IReceivableSourceBlock<T> AsSourceBlockAsync<T>(
517518
var cn = x.ColumnNames;
518519
var block = x.ResultsBlock(out var initColumnNames, options);
519520

520-
ExecuteReaderAsync(async reader =>
521-
{
522-
// Ignores fields that don't match.
523-
var columns = reader.GetMatchingOrdinals(cn, true);
524-
525-
var ordinalValues = columns.Select(c => c.Ordinal).ToArray();
526-
initColumnNames(columns.Select(c => c.Name).ToArray());
527-
528-
await reader.ToTargetBlockAsync(block,
529-
r => r.GetValuesFromOrdinals(ordinalValues),
530-
UseAsyncRead);
531-
532-
block.Complete();
533-
});
534-
535-
return block;
521+
ExecuteReaderAsync(
522+
reader =>
523+
{
524+
// Ignores fields that don't match.
525+
var columns = reader.GetMatchingOrdinals(cn, true);
526+
527+
var ordinalValues = columns.Select(c => c.Ordinal).ToArray();
528+
initColumnNames(columns.Select(c => c.Name).ToArray());
529+
530+
return reader.ToTargetBlockAsync(block,
531+
r => r.GetValuesFromOrdinals(ordinalValues),
532+
UseAsyncRead,
533+
CancellationToken);
534+
})
535+
.ContinueWith(t =>
536+
{
537+
if (t.IsFaulted) ((ITargetBlock<object[]>)block).Fault(t.Exception);
538+
else block.Complete();
539+
}, CancellationToken);
540+
541+
return block;
536542
}
537543

538544
/// <summary>

Extensions.Dataflow.cs

Lines changed: 39 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Data;
33
using System.Data.Common;
44
using System.Diagnostics.Contracts;
5+
using System.Threading;
56
using System.Threading.Tasks;
67
using System.Threading.Tasks.Dataflow;
78
// ReSharper disable MemberCanBePrivate.Global
@@ -34,18 +35,20 @@ public static void ToTargetBlock<T>(this IDataReader reader,
3435
while (target.IsStillAlive() && reader.Read() && target.Post(transform(reader))) { }
3536
}
3637

37-
/// <summary>
38-
/// Asynchronously iterates an IDataReader and through the transform function and posts each record it to the target block.
39-
/// </summary>
40-
/// <typeparam name="T">The return type of the transform function.</typeparam>
41-
/// <param name="reader">The SqlDataReader to read from.</param>
42-
/// <param name="target">The target block to receive the results.</param>
43-
/// <param name="transform">The transform function to process each IDataRecord.</param>
44-
/// <param name="useReadAsync">If true (default) will iterate the results using .ReadAsync() otherwise will only Execute the reader asynchronously and then use .Read() to iterate the results but still allowing cancellation.</param>
45-
public static async Task ToTargetBlockAsync<T>(this DbDataReader reader,
38+
/// <summary>
39+
/// Asynchronously iterates an IDataReader and through the transform function and posts each record it to the target block.
40+
/// </summary>
41+
/// <typeparam name="T">The return type of the transform function.</typeparam>
42+
/// <param name="reader">The SqlDataReader to read from.</param>
43+
/// <param name="target">The target block to receive the results.</param>
44+
/// <param name="transform">The transform function to process each IDataRecord.</param>
45+
/// <param name="useReadAsync">If true (default) will iterate the results using .ReadAsync() otherwise will only Execute the reader asynchronously and then use .Read() to iterate the results but still allowing cancellation.</param>
46+
/// <param name="cancellationToken">Optional cancellation token.</param>
47+
public static async Task ToTargetBlockAsync<T>(this DbDataReader reader,
4648
ITargetBlock<T> target,
4749
Func<IDataRecord, T> transform,
48-
bool useReadAsync = true)
50+
bool useReadAsync = true,
51+
CancellationToken cancellationToken = default)
4952
{
5053
if (target == null) throw new ArgumentNullException(nameof(target));
5154
if (transform == null) throw new ArgumentNullException(nameof(transform));
@@ -54,53 +57,61 @@ public static async Task ToTargetBlockAsync<T>(this DbDataReader reader,
5457
if (useReadAsync)
5558
{
5659
Task<bool> lastSend = null;
57-
while (target.IsStillAlive()
58-
&& await reader.ReadAsync().ConfigureAwait(false)
60+
while (
61+
target.IsStillAlive() && !cancellationToken.IsCancellationRequested
62+
&& await reader.ReadAsync(cancellationToken).ConfigureAwait(false) // Premtively grab next while waiting for previous transform.
5963
&& (lastSend == null || await lastSend.ConfigureAwait(false)))
6064
{
6165
var values = transform(reader);
6266
lastSend = target.Post(values) ? null : target.SendAsync(values);
6367
}
68+
// Makes sure we hook up to the last one if the while loop is done to cover any edge cases.
69+
if (lastSend != null)
70+
await lastSend.ConfigureAwait(false);
6471
}
6572
else
6673
{
6774
var ok = true;
68-
while (ok && target.IsStillAlive() && reader.Read())
75+
while (ok
76+
&& target.IsStillAlive() && !cancellationToken.IsCancellationRequested
77+
&& reader.Read())
6978
{
7079
var values = transform(reader);
7180
ok = target.Post(values) || await target.SendAsync(values);
7281
}
7382
}
7483
}
7584

76-
/// <summary>
77-
/// Asynchronously iterates an IDataReader and through the transform function and posts each record it to the target block.
78-
/// If a connection is desired to remain open after completion, you must open the connection before calling this method.
79-
/// </summary>
80-
/// <typeparam name="T">The return type of the transform function.</typeparam>
81-
/// <param name="command">The DbCommand to generate a reader from.</param>
82-
/// <param name="target">The target block to receive the results.</param>
83-
/// <param name="transform">The transform function for each IDataRecord.</param>
84-
/// <param name="behavior">The behavior to use with the data reader.</param>
85-
/// <param name="useReadAsync">If true (default) will iterate the results using .ReadAsync() otherwise will only Execute the reader asynchronously and then use .Read() to iterate the results but still allowing cancellation.</param>
86-
public static async Task ToTargetBlockAsync<T>(this DbCommand command,
85+
/// <summary>
86+
/// Asynchronously iterates an IDataReader and through the transform function and posts each record it to the target block.
87+
/// If a connection is desired to remain open after completion, you must open the connection before calling this method.
88+
/// </summary>
89+
/// <typeparam name="T">The return type of the transform function.</typeparam>
90+
/// <param name="command">The DbCommand to generate a reader from.</param>
91+
/// <param name="target">The target block to receive the results.</param>
92+
/// <param name="transform">The transform function for each IDataRecord.</param>
93+
/// <param name="behavior">The behavior to use with the data reader.</param>
94+
/// <param name="useReadAsync">If true (default) will iterate the results using .ReadAsync() otherwise will only Execute the reader asynchronously and then use .Read() to iterate the results but still allowing cancellation.</param>
95+
/// <param name="cancellationToken"></param>
96+
public static async Task ToTargetBlockAsync<T>(this DbCommand command,
8797
ITargetBlock<T> target,
8898
Func<IDataRecord, T> transform,
8999
CommandBehavior behavior = CommandBehavior.Default,
90-
bool useReadAsync = true)
100+
bool useReadAsync = true,
101+
CancellationToken cancellationToken = default)
91102
{
92103
if (target == null) throw new ArgumentNullException(nameof(target));
93104
if (transform == null) throw new ArgumentNullException(nameof(transform));
94105
Contract.EndContractBlock();
95106

96107
if (target.IsStillAlive())
97108
{
98-
var state = await command.Connection.EnsureOpenAsync();
109+
var state = await command.Connection.EnsureOpenAsync(cancellationToken);
99110
if (state == ConnectionState.Closed) behavior = behavior | CommandBehavior.CloseConnection;
100-
using (var reader = await command.ExecuteReaderAsync(behavior))
111+
using (var reader = await command.ExecuteReaderAsync(behavior, cancellationToken))
101112
{
102113
if (target.IsStillAlive())
103-
await reader.ToTargetBlockAsync(target, transform, useReadAsync);
114+
await reader.ToTargetBlockAsync(target, transform, useReadAsync, cancellationToken);
104115
}
105116
}
106117
}

Open.Database.Extensions.csproj

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,10 @@
1111
<Description>Useful set of utilities and abstractions for simplifying modern data-access operations and ensuring DI compatibility.</Description>
1212
<RepositoryUrl>https://github.com/electricessence/Open.Database.Extensions</RepositoryUrl>
1313
<RepositoryType>git</RepositoryType>
14-
<Version>5.13.2</Version>
15-
<AssemblyVersion>5.13.0.0</AssemblyVersion>
16-
<FileVersion>5.13.0.0</FileVersion>
17-
<PackageReleaseNotes>Cleanup and inspection fixes with added Dataflow options.
18-
Improved Expressive Command DB-Open behavior to occur before command is created to ensure compatibility with other DbCommands like Snowflake.</PackageReleaseNotes>
14+
<Version>5.13.3</Version>
15+
<AssemblyVersion>5.13.3.0</AssemblyVersion>
16+
<FileVersion>5.13.3.0</FileVersion>
17+
<PackageReleaseNotes>Improved Dataflow behavior to include faulting and cancellation.</PackageReleaseNotes>
1918
<LangVersion>latest</LangVersion>
2019
</PropertyGroup>
2120

0 commit comments

Comments
 (0)