Skip to content

Commit 1bb72d2

Browse files
committed
Query.Store() works in async queries
1 parent e7c82cc commit 1bb72d2

8 files changed

Lines changed: 99 additions & 13 deletions

File tree

Orm/Xtensive.Orm.Tests/Linq/QueryMethodTests.cs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
using System.Collections.Generic;
88
using System.Linq;
9+
using System.Threading.Tasks;
910
using NUnit.Framework;
1011
using Xtensive.Collections;
1112
using Xtensive.Core;
@@ -119,6 +120,29 @@ public void Store1Test()
119120
Assert.AreEqual(0, expected.Except(query).Count());
120121
}
121122

123+
[Test]
124+
public async Task Store1TestAsync()
125+
{
126+
Require.AllFeaturesSupported(ProviderFeatures.TemporaryTables);
127+
128+
var localCustomers = Session.Query.All<Customer>().Take(10).ToList();
129+
var query = (await Session.Query.All<Customer>()
130+
.Join(
131+
Session.Query.Store(localCustomers),
132+
customer => customer,
133+
localCustomer => localCustomer,
134+
(customer, localCustomer) => new { customer, localCustomer }).AsAsync()).ToList();
135+
var expected = Session.Query.All<Customer>().AsEnumerable()
136+
.Join(
137+
Session.Query.Store(localCustomers),
138+
customer => customer,
139+
localCustomer => localCustomer,
140+
(customer, localCustomer) => new { customer, localCustomer });
141+
142+
Assert.That(query, Is.Not.Empty);
143+
Assert.AreEqual(0, expected.Except(query).Count());
144+
}
145+
122146
[Test]
123147
public void Store2Test()
124148
{

Orm/Xtensive.Orm/Orm/Providers/Interfaces/IProviderExecutor.cs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ public interface IProviderExecutor
3838
/// <param name="tuples">The tuples to store.</param>
3939
void Store(IPersistDescriptor descriptor, IEnumerable<Tuple> tuples);
4040

41+
/// <summary>
42+
/// Asynchronously stores the specified tuples in specified table.
43+
/// </summary>
44+
/// <param name="descriptor">Persist descriptor.</param>
45+
/// <param name="tuples">The tuples to store.</param>
46+
Task StoreAsync(EnumerationContext context, IPersistDescriptor descriptor, IEnumerable<Tuple> tuples, CancellationToken token);
47+
4148
/// <summary>
4249
/// Clears the specified table.
4350
/// </summary>

Orm/Xtensive.Orm/Orm/Providers/SqlSessionHandler.IProviderExecutor.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using System.Collections.Generic;
88
using System.Threading;
99
using System.Threading.Tasks;
10+
using Xtensive.Orm.Rse.Providers;
1011
using Tuple = Xtensive.Tuples.Tuple;
1112

1213
namespace Xtensive.Orm.Providers
@@ -45,6 +46,27 @@ void IProviderExecutor.Store(IPersistDescriptor descriptor, IEnumerable<Tuple> t
4546
commandProcessor.ExecuteTasks(context);
4647
}
4748

49+
50+
51+
async Task IProviderExecutor.StoreAsync(EnumerationContext enumerationContext,IPersistDescriptor descriptor, IEnumerable<Tuple> tuples, CancellationToken token)
52+
{
53+
await PrepareAsync(token);
54+
55+
if (tuples is ExecutableRawProvider rawProvider) {
56+
var enumerator = await rawProvider.GetEnumeratorAsync(enumerationContext, token);
57+
while(enumerator.MoveNext()) {
58+
commandProcessor.RegisterTask(new SqlPersistTask(descriptor.StoreRequest, enumerator.Current));
59+
}
60+
}
61+
else {
62+
foreach (var tuple in tuples)
63+
commandProcessor.RegisterTask(new SqlPersistTask(descriptor.StoreRequest, tuple));
64+
}
65+
66+
using (var context = new CommandProcessorContext())
67+
await commandProcessor.ExecuteTasksAsync(context, token);
68+
}
69+
4870
/// <inheritdoc/>
4971
void IProviderExecutor.Clear(IPersistDescriptor descriptor)
5072
{

Orm/Xtensive.Orm/Orm/Providers/SqlSessionHandler.cs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
using System;
88
using System.Collections.Generic;
9+
using System.Data;
910
using System.Linq;
1011
using System.Threading;
1112
using System.Threading.Tasks;
@@ -162,7 +163,8 @@ private void Prepare()
162163
private async Task PrepareAsync(CancellationToken cancellationToken)
163164
{
164165
Session.EnsureNotDisposed();
165-
await driver.OpenConnectionAsync(Session, connection, cancellationToken).ConfigureAwait(false);
166+
if (connection.State != ConnectionState.Open)
167+
await driver.OpenConnectionAsync(Session, connection, cancellationToken).ConfigureAwait(false);
166168

167169
try {
168170
foreach (var initializationSqlScript in initializationSqlScripts)

Orm/Xtensive.Orm/Orm/Providers/SqlStoreProvider.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
// Created by: Dmitri Maximov
55
// Created: 2008.09.05
66

7+
using System.Threading;
8+
using System.Threading.Tasks;
79
using Xtensive.Orm.Rse.Providers;
810

911
namespace Xtensive.Orm.Providers
@@ -30,6 +32,12 @@ protected override void OnBeforeEnumerate(Rse.Providers.EnumerationContext conte
3032
LockAndStore(context, Source);
3133
}
3234

35+
protected override async Task OnBeforeEnumerateAsync(Rse.Providers.EnumerationContext context, CancellationToken token)
36+
{
37+
await base.OnBeforeEnumerateAsync(context, token);
38+
await LockAndStoreAsync(context, Source, token);
39+
}
40+
3341
protected override void OnAfterEnumerate(Rse.Providers.EnumerationContext context)
3442
{
3543
ClearAndUnlock(context);

Orm/Xtensive.Orm/Orm/Providers/SqlTemporaryDataProvider.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
using Xtensive.Tuples;
1111
using Tuple = Xtensive.Tuples.Tuple;
1212
using Xtensive.Orm.Rse.Providers;
13+
using System.Threading.Tasks;
14+
using System.Threading;
1315

1416
namespace Xtensive.Orm.Providers
1517
{
@@ -33,6 +35,17 @@ protected void LockAndStore(Rse.Providers.EnumerationContext context, IEnumerabl
3335
executor.Store(tableDescriptor, data);
3436
}
3537

38+
protected async Task LockAndStoreAsync(Rse.Providers.EnumerationContext context, IEnumerable<Tuple> data, CancellationToken token)
39+
{
40+
var storageContext = (EnumerationContext) context;
41+
var tableLock = DomainHandler.TemporaryTableManager.Acquire(storageContext, tableDescriptor);
42+
if (tableLock == null)
43+
return;
44+
storageContext.SetValue(this, TemporaryTableLockName, tableLock);
45+
var executor = storageContext.Session.Services.Demand<IProviderExecutor>();
46+
await executor.StoreAsync(storageContext, tableDescriptor, data, token);
47+
}
48+
3649
protected bool ClearAndUnlock(Rse.Providers.EnumerationContext context)
3750
{
3851
var tableLock = context.GetValue<IDisposable>(this, TemporaryTableLockName);

Orm/Xtensive.Orm/Orm/Rse/Providers/Executable/ExecutableRawProvider.cs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66

77
using System;
88
using System.Collections.Generic;
9+
using System.Threading;
10+
using System.Threading.Tasks;
911
using Tuple = Xtensive.Tuples.Tuple;
1012

1113
namespace Xtensive.Orm.Rse.Providers
@@ -16,28 +18,26 @@ namespace Xtensive.Orm.Rse.Providers
1618
[Serializable]
1719
public sealed class ExecutableRawProvider : ExecutableProvider<Providers.RawProvider>
1820
{
19-
#region Cached properties
20-
2121
private const string CachedSourceName = "CachedSource";
2222

23-
private IEnumerable<Tuple> CachedSource {
24-
get { return GetValue<IEnumerable<Tuple>>(EnumerationContext.Current, CachedSourceName); }
25-
set { SetValue(EnumerationContext.Current, CachedSourceName, value); }
26-
}
27-
28-
#endregion
29-
3023
/// <inheritdoc/>
3124
protected override void OnBeforeEnumerate(EnumerationContext context)
3225
{
3326
base.OnBeforeEnumerate(context);
34-
CachedSource = Origin.CompiledSource.Invoke();
27+
context.SetValue(this, CachedSourceName, Origin.CompiledSource.Invoke());
28+
}
29+
30+
/// <inheritdoc/>
31+
protected override async Task OnBeforeEnumerateAsync(EnumerationContext context, CancellationToken token)
32+
{
33+
await base.OnBeforeEnumerateAsync(context, token);
34+
context.SetValue(this, CachedSourceName, Origin.CompiledSource.Invoke());
3535
}
3636

3737
/// <inheritdoc/>
3838
protected override IEnumerable<Tuple> OnEnumerate(EnumerationContext context)
3939
{
40-
return CachedSource;
40+
return context.GetValue<IEnumerable<Tuple>>(this, CachedSourceName);
4141
}
4242

4343

Orm/Xtensive.Orm/Orm/Rse/Providers/ExecutableProvider.cs

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,16 @@ protected virtual void OnBeforeEnumerate(EnumerationContext context)
4141
}
4242
}
4343

44+
protected virtual async Task OnBeforeEnumerateAsync(EnumerationContext context, CancellationToken token)
45+
{
46+
token.ThrowIfCancellationRequested();
47+
foreach (var source in Sources) {
48+
var ep = source as ExecutableProvider;
49+
if (ep != null)
50+
await ep.OnBeforeEnumerateAsync(context, token);
51+
}
52+
}
53+
4454
/// <summary>
4555
/// Called when enumeration is finished.
4656
/// </summary>
@@ -117,7 +127,7 @@ public async Task<IEnumerator<Tuple>> GetEnumeratorAsync(EnumerationContext cont
117127
var enumerated = context.GetValue<bool>(this, enumerationMarker);
118128
bool onEnumerationExecuted = false;
119129
if (!enumerated)
120-
OnBeforeEnumerate(context);
130+
await OnBeforeEnumerateAsync(context, token);
121131
try {
122132
context.SetValue(this, enumerationMarker, true);
123133
var enumerator = (await OnEnumerateAsync(context, token).ConfigureAwait(false))

0 commit comments

Comments
 (0)