Skip to content

Commit f9448e4

Browse files
author
PSSEA\Oren.Ferrari
committed
Inspection fixes and moved auto-connection opening to before command is created to ensure robust compatibility with other DbCommand types like Snowflake.
1 parent 5d9e81b commit f9448e4

22 files changed

Lines changed: 2002 additions & 1814 deletions

Constants.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
namespace Open.Database.Extensions
1+
// ReSharper disable UnusedMember.Global
2+
namespace Open.Database.Extensions
23
{
34
internal static class ConnectionTimeout
45
{

Documentation.xml

Lines changed: 27 additions & 27 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ExpressiveCommandBase.Param.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@ public abstract partial class ExpressiveCommandBase<TConnection, TCommand, TDbTy
1313
#pragma warning disable CS0659 // Type overrides Object.Equals(object o) but does not override Object.GetHashCode()
1414
#pragma warning disable CS0661 // Type defines operator == or operator != but does not override Object.GetHashCode()
1515
/// <summary>
16-
/// A struct that represents the param to be created when the command is exectued.
17-
/// TDbType facillitates the difference between DbType and SqlDbType.
16+
/// A struct that represents the param to be created when the command is executed.
17+
/// TDbType facilitates the difference between DbType and SqlDbType.
1818
/// </summary>
1919
public struct Param
2020
#pragma warning restore CS0661 // Type defines operator == or operator != but does not override Object.GetHashCode()
@@ -40,7 +40,9 @@ public struct Param
4040
/// </summary>
4141
/// <param name="obj">Param to compare against.</param>
4242
/// <returns>True if properties are equal.</returns>
43+
#pragma warning disable 659
4344
public override bool Equals(object obj) => obj is Param o
45+
#pragma warning restore 659
4446
&& Name == o.Name
4547
&& EqualityComparer<object>.Default.Equals(Value, o.Value)
4648
&& EqualityComparer<TDbType?>.Default.Equals(Type, o.Type);

ExpressiveCommandBase.cs

Lines changed: 95 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
using System.Linq;
66
using System.Threading.Tasks;
77
using System.Threading.Tasks.Dataflow;
8+
// ReSharper disable MemberCanBeProtected.Global
9+
// ReSharper disable MemberCanBePrivate.Global
10+
// ReSharper disable UnusedMember.Global
11+
// ReSharper disable AutoPropertyCanBeMadeGetOnly.Global
812

913
namespace Open.Database.Extensions
1014
{
@@ -138,11 +142,11 @@ public TThis AddParam(string name, object value)
138142
throw new ArgumentException("Parameter names cannot be empty or white space.", nameof(name));
139143
Contract.EndContractBlock();
140144

141-
var p = new Param { Name = name };
142-
if (value != null) p.Value = value;
143-
else p.Value = DBNull.Value;
144-
145-
Params.Add(p);
145+
Params.Add(new Param
146+
{
147+
Name = name,
148+
Value = value ?? DBNull.Value
149+
});
146150
return (TThis)this;
147151
}
148152

@@ -380,23 +384,23 @@ public void Execute(Action<TCommand> action)
380384

381385
UsingConnection((con, t) =>
382386
{
383-
using (var cmd = con.CreateCommand(Type, Command, Timeout))
387+
var state = con.EnsureOpen(); // MUST occur before command creation as some DbCommands require it.
388+
try
384389
{
385-
if (!(cmd is TCommand c))
386-
throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
387-
if (t != null)
388-
c.Transaction = t;
389-
390-
AddParams(c);
391-
var state = con.EnsureOpen();
392-
try
390+
using (var cmd = con.CreateCommand(Type, Command, Timeout))
393391
{
392+
if (!(cmd is TCommand c))
393+
throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
394+
if (t != null)
395+
c.Transaction = t;
396+
397+
AddParams(c);
394398
action(c);
395399
}
396-
finally
397-
{
398-
if (state == ConnectionState.Closed) con.Close();
399-
}
400+
}
401+
finally
402+
{
403+
if (state == ConnectionState.Closed) con.Close();
400404
}
401405
});
402406
}
@@ -415,23 +419,25 @@ public T Execute<T>(Func<TCommand, T> transform)
415419

416420
return UsingConnection((con, t) =>
417421
{
418-
using (var cmd = con.CreateCommand(Type, Command, Timeout))
422+
var state = con.EnsureOpen(); // MUST occur before command creation as some DbCommands require it.
423+
try
419424
{
420-
if (!(cmd is TCommand c))
421-
throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
422-
if (t != null)
423-
c.Transaction = t;
424-
AddParams(c);
425-
var state = con.EnsureOpen();
426-
try
425+
using (var cmd = con.CreateCommand(Type, Command, Timeout))
427426
{
427+
if (!(cmd is TCommand c))
428+
throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
429+
if (t != null)
430+
c.Transaction = t;
431+
AddParams(c);
432+
428433
return transform(c);
429-
}
430-
finally
431-
{
432-
if (state == ConnectionState.Closed) con.Close();
434+
433435
}
434436
}
437+
finally
438+
{
439+
if (state == ConnectionState.Closed) con.Close();
440+
}
435441
});
436442

437443
}
@@ -443,25 +449,26 @@ public T Execute<T>(Func<TCommand, T> transform)
443449
public object ExecuteReturn()
444450
=> UsingConnection((con, t) =>
445451
{
446-
using (var cmd = con.CreateCommand(Type, Command, Timeout))
452+
var state = con.EnsureOpen(); // MUST occur before command creation as some DbCommands require it.
453+
try
447454
{
448-
if (!(cmd is TCommand c))
449-
throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
450-
if (t != null)
451-
c.Transaction = t;
452-
453-
AddParams(c);
454-
var returnParameter = c.AddReturnParameter();
455-
var state = con.EnsureOpen();
456-
try
455+
using (var cmd = con.CreateCommand(Type, Command, Timeout))
457456
{
457+
if (!(cmd is TCommand c))
458+
throw new InvalidCastException($"Actual command type ({cmd.GetType()}) is not compatible with expected command type ({typeof(TCommand)}).");
459+
if (t != null)
460+
c.Transaction = t;
461+
462+
AddParams(c);
463+
var returnParameter = c.AddReturnParameter();
464+
458465
c.ExecuteNonQuery();
459466
return returnParameter.Value;
460467
}
461-
finally
462-
{
463-
if (state == ConnectionState.Closed) con.Close();
464-
}
468+
}
469+
finally
470+
{
471+
if (state == ConnectionState.Closed) con.Close();
465472
}
466473
});
467474

@@ -524,7 +531,7 @@ public TResult IterateReader<TEntity, TResult>(
524531
=> ExecuteReader(reader => selector(reader.Iterate(transform)), CommandBehavior.SingleResult);
525532

526533
/// <summary>
527-
/// Iterates an IDataReader and returns the first result through a transform funciton. Throws if none.
534+
/// Iterates an IDataReader and returns the first result through a transform function. Throws if none.
528535
/// </summary>
529536
/// <typeparam name="T">The return type of the transform function.</typeparam>
530537
/// <param name="transform">The transform function to process each IDataRecord.</param>
@@ -533,7 +540,7 @@ public T First<T>(Func<IDataRecord, T> transform)
533540
=> ExecuteReader(reader => reader.Iterate(transform).First(), CommandBehavior.SingleRow | CommandBehavior.SingleResult);
534541

535542
/// <summary>
536-
/// Iterates an IDataReader and returns the first result through a transform funciton. Returns default(T) if none.
543+
/// Iterates an IDataReader and returns the first result through a transform function. Returns default(T) if none.
537544
/// </summary>
538545
/// <typeparam name="T">The return type of the transform function.</typeparam>
539546
/// <param name="transform">The transform function to process each IDataRecord.</param>
@@ -542,7 +549,7 @@ public T FirstOrDefault<T>(Func<IDataRecord, T> transform)
542549
=> ExecuteReader(reader => reader.Iterate(transform).FirstOrDefault(), CommandBehavior.SingleRow | CommandBehavior.SingleResult);
543550

544551
/// <summary>
545-
/// Iterates a IDataReader and returns the first result through a transform funciton. Throws if none or more than one entry.
552+
/// Iterates a IDataReader and returns the first result through a transform function. Throws if none or more than one entry.
546553
/// </summary>
547554
/// <typeparam name="T">The return type of the transform function.</typeparam>
548555
/// <param name="transform">The transform function to process each IDataRecord.</param>
@@ -551,7 +558,7 @@ public T Single<T>(Func<IDataRecord, T> transform)
551558
=> ExecuteReader(reader => reader.Iterate(transform).Single(), CommandBehavior.SingleResult);
552559

553560
/// <summary>
554-
/// Iterates an IDataReader and returns the first result through a transform funciton. Returns default(T) if none. Throws if more than one entry.
561+
/// Iterates an IDataReader and returns the first result through a transform function. Returns default(T) if none. Throws if more than one entry.
555562
/// </summary>
556563
/// <typeparam name="T">The return type of the transform function.</typeparam>
557564
/// <param name="transform">The transform function to process each IDataRecord.</param>
@@ -600,30 +607,30 @@ public int ExecuteNonQuery()
600607
/// <summary>
601608
/// Calls ExecuteScalar on the underlying command.
602609
/// </summary>
603-
/// <returns>The varlue returned from the method.</returns>
610+
/// <returns>The value returned from the method.</returns>
604611
public object ExecuteScalar()
605612
=> Execute(command => command.ExecuteScalar());
606613

607614
/// <summary>
608615
/// Calls ExecuteScalar on the underlying command.
609616
/// </summary>
610617
/// <typeparam name="T">The type expected.</typeparam>
611-
/// <returns>The varlue returned from the method.</returns>
618+
/// <returns>The value returned from the method.</returns>
612619
public T ExecuteScalar<T>()
613620
=> (T)ExecuteScalar();
614621

615622
/// <summary>
616623
/// Calls ExecuteScalar on the underlying command.
617624
/// </summary>
618625
/// <typeparam name="T">The type expected.</typeparam>
619-
/// <returns>The varlue returned from the method.</returns>
626+
/// <returns>The value returned from the method.</returns>
620627
public T ExecuteScalar<T>(Func<object, T> transform)
621628
=> transform(ExecuteScalar());
622629

623630
/// <summary>
624631
/// Imports all data using an IDataReader into a DataTable.
625632
/// </summary>
626-
/// <returns>The resultant DataTabel.</returns>
633+
/// <returns>The resultant DataTable.</returns>
627634
public DataTable LoadTable()
628635
=> ExecuteReader(reader => reader.ToDataTable(), CommandBehavior.SequentialAccess | CommandBehavior.SingleResult);
629636

@@ -752,6 +759,7 @@ public IEnumerable<T> Results<T>(params (string Field, string Column)[] fieldMap
752759

753760
/// <summary>
754761
/// Posts all records to a target block using the transform function.
762+
/// Stops if the target block rejects.
755763
/// </summary>
756764
/// <typeparam name="T">The expected type.</typeparam>
757765
/// <param name="transform">The transform function.</param>
@@ -765,22 +773,25 @@ public void ToTargetBlock<T>(ITargetBlock<T> target, Func<IDataRecord, T> transf
765773
/// <typeparam name="T">The expected type.</typeparam>
766774
/// <param name="transform">The transform function.</param>
767775
/// <param name="synchronousExecution">By default the command is deferred.
768-
/// If set to true, the command runs synchronusly and all data is acquired before the method returns.
769-
/// If set to false (default) the data is recieved asynchronously (deferred: data will be subsequently posted) and the source block (transform) can be completed early.</param>
776+
/// If set to true, the command runs synchronously and all data is acquired before the method returns.
777+
/// If set to false (default) the data is received asynchronously (deferred: data will be subsequently posted) and the source block (transform) can be completed early.</param>
770778
/// <returns>The buffer block that will contain the results.</returns>
771-
public ISourceBlock<T> AsSourceBlock<T>(Func<IDataRecord, T> transform, bool synchronousExecution = false)
779+
public ISourceBlock<T> AsSourceBlock<T>(
780+
Func<IDataRecord, T> transform,
781+
bool synchronousExecution = false)
772782
{
773783
if (transform == null) throw new ArgumentNullException(nameof(transform));
774784
Contract.EndContractBlock();
775785

776786
var source = new BufferBlock<T>();
777-
void i()
787+
void I()
778788
{
779789
ToTargetBlock(source, transform);
780790
source.Complete();
781-
};
782-
if (synchronousExecution) i();
783-
else Task.Run(() => i());
791+
}
792+
793+
if (synchronousExecution) I();
794+
else Task.Run(I);
784795
return source;
785796
}
786797

@@ -790,20 +801,25 @@ void i()
790801
/// <typeparam name="T">The model type to map the values to (using reflection).</typeparam>
791802
/// <param name="fieldMappingOverrides">An override map of field names to column names where the keys are the property names, and values are the column names.</param>
792803
/// <param name="synchronousExecution">By default the command is deferred.
793-
/// If set to true, the command runs synchronusly and all data is acquired before the method returns.
794-
/// If set to false (default) the data is recieved asynchronously (data will be subsequently posted) and the source block (transform) can be completed early.</param>
795-
/// <returns>A transform block that is recieving the results.</returns>
796-
public ISourceBlock<T> AsSourceBlock<T>(IEnumerable<(string Field, string Column)> fieldMappingOverrides, bool synchronousExecution = false)
804+
/// If set to true, the command runs synchronously and all data is acquired before the method returns.
805+
/// If set to false (default) the data is received asynchronously (data will be subsequently posted) and the source block (transform) can be completed early.</param>
806+
/// <param name="options">The optional ExecutionDataflowBlockOptions to use with the source.</param>
807+
/// <returns>A transform block that is receiving the results.</returns>
808+
public ISourceBlock<T> AsSourceBlock<T>(
809+
IEnumerable<(string Field, string Column)> fieldMappingOverrides,
810+
bool synchronousExecution = false,
811+
ExecutionDataflowBlockOptions options = null)
797812
where T : new()
798813
{
799814
var x = new Transformer<T>(fieldMappingOverrides);
800815
var cn = x.ColumnNames;
801-
var q = x.Results(out Action<QueryResult<IEnumerable<object[]>>> deferred);
816+
var q = x.Results(out var deferred, options);
802817

803-
void i() => ExecuteReader(reader =>
818+
void I() => ExecuteReader(reader =>
804819
{
805-
// Ignores fields that don't match.
806-
var columns = reader.GetMatchingOrdinals(cn, true);
820+
// Ignores fields that don't match.
821+
// ReSharper disable once PossibleMultipleEnumeration
822+
var columns = reader.GetMatchingOrdinals(cn, true);
807823

808824
var ordinalValues = columns.Select(c => c.Ordinal).ToArray();
809825
deferred(new QueryResult<IEnumerable<object[]>>(
@@ -813,8 +829,8 @@ void i() => ExecuteReader(reader =>
813829

814830
});
815831

816-
if (synchronousExecution) i();
817-
else Task.Run(() => i());
832+
if (synchronousExecution) I();
833+
else Task.Run(I);
818834
return q;
819835
}
820836

@@ -824,19 +840,23 @@ void i() => ExecuteReader(reader =>
824840
/// <typeparam name="T">The model type to map the values to (using reflection).</typeparam>
825841
/// <param name="fieldMappingOverrides">An override map of field names to column names where the keys are the property names, and values are the column names.</param>
826842
/// <param name="synchronousExecution">By default the command is deferred.
827-
/// If set to true, the command runs synchronusly and all data is acquired before the method returns.
828-
/// If set to false (default) the data is recieved asynchronously (data will be subsequently posted) and the source block (transform) can be completed early.</param>
829-
/// <returns>A transform block that is recieving the results.</returns>
830-
public ISourceBlock<T> AsSourceBlock<T>(IEnumerable<KeyValuePair<string, string>> fieldMappingOverrides, bool synchronousExecution = false)
843+
/// If set to true, the command runs synchronously and all data is acquired before the method returns.
844+
/// If set to false (default) the data is received asynchronously (data will be subsequently posted) and the source block (transform) can be completed early.</param>
845+
/// <param name="options">The optional ExecutionDataflowBlockOptions to use with the source.</param>
846+
/// <returns>A transform block that is receiving the results.</returns>
847+
public ISourceBlock<T> AsSourceBlock<T>(
848+
IEnumerable<KeyValuePair<string, string>> fieldMappingOverrides,
849+
bool synchronousExecution = false,
850+
ExecutionDataflowBlockOptions options = null)
831851
where T : new()
832-
=> AsSourceBlock<T>(fieldMappingOverrides?.Select(kvp => (kvp.Key, kvp.Value)), synchronousExecution);
852+
=> AsSourceBlock<T>(fieldMappingOverrides?.Select(kvp => (kvp.Key, kvp.Value)), synchronousExecution, options);
833853

834854
/// <summary>
835855
/// Provides a transform block as the source of records.
836856
/// </summary>
837857
/// <typeparam name="T">The model type to map the values to (using reflection).</typeparam>
838858
/// <param name="fieldMappingOverrides">An override map of field names to column names where the keys are the property names, and values are the column names.</param>
839-
/// <returns>A transform block that is recieving the results.</returns>
859+
/// <returns>A transform block that is receiving the results.</returns>
840860
public ISourceBlock<T> AsSourceBlock<T>(params (string Field, string Column)[] fieldMappingOverrides)
841861
where T : new()
842862
=> AsSourceBlock<T>(fieldMappingOverrides as IEnumerable<(string Field, string Column)>);

0 commit comments

Comments
 (0)