Skip to content

Commit 868e15f

Browse files
committed
优化 线程池 队列处理。 加入线程池为空时超时等待。
1 parent f71ddc5 commit 868e15f

6 files changed

Lines changed: 149 additions & 100 deletions

File tree

launchSettings.json

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
{
2+
"profiles": {
3+
"Docker Compose": {
4+
"commandName": "DockerCompose",
5+
"commandVersion": "1.0",
6+
"composeLaunchAction": "None",
7+
"composeLaunchServiceName": "apache.iotdb.samples",
8+
"serviceActions": {
9+
"apache.iotdb.samples": "StartDebugging",
10+
"iotdb": "StartWithoutDebugging"
11+
}
12+
}
13+
}
14+
}

samples/Apache.IoTDB.Samples/SessionPoolTest.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,6 +276,7 @@ public async Task TestNonSqlBy_ADO()
276276
{
277277
var cnts = new IoTDB.Data.IoTDBConnectionStringBuilder();
278278
cnts.DataSource = host;
279+
cnts.TimeOut =(int) TimeSpan.FromSeconds(20).TotalMilliseconds;
279280
var cnt = new IoTDB.Data.IoTDBConnection(cnts.ConnectionString);
280281
await cnt.OpenAsync();
281282
var session_pool = cnt.SessionPool;

src/Apache.IoTDB.Data/DataReaderExtensions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ public static class DataReaderExtensions
1414
{
1515
public static SessionPool CreateSession(this IoTDBConnectionStringBuilder db)
1616
{
17-
return new SessionPool(db.DataSource, db.Port, db.Username, db.Password, db.FetchSize, db.ZoneId, db.PoolSize,db.Compression);
17+
return new SessionPool(db.DataSource, db.Port, db.Username, db.Password, db.FetchSize, db.ZoneId, db.PoolSize,db.Compression,db.TimeOut);
1818
}
1919

2020
public static List<T> ToObject<T>(this IDataReader dataReader)

src/Apache.IoTDB.Data/IoTDBConnectionStringBuilder.cs

Lines changed: 30 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Diagnostics;
77
using System.Globalization;
88
using System.Reflection;
9+
using System.Threading;
910

1011
namespace Apache.IoTDB.Data
1112
{
@@ -23,7 +24,8 @@ public class IoTDBConnectionStringBuilder : DbConnectionStringBuilder
2324
private const string CompressionKeyword = "Compression";
2425
private const string PoolSizeKeyword = "PoolSize";
2526
private const string ZoneIdKeyword = "ZoneId";
26-
27+
private const string TimeOutKeyword = "TimeOut";
28+
2729
private enum Keywords
2830
{
2931
DataSource,
@@ -33,7 +35,8 @@ private enum Keywords
3335
FetchSize,
3436
Compression,
3537
PoolSize,
36-
ZoneId
38+
ZoneId,
39+
TimeOut
3740
}
3841

3942
private static readonly IReadOnlyList<string> _validKeywords;
@@ -47,9 +50,11 @@ private enum Keywords
4750
private string _zoneId = "UTC+08:00";
4851
private int _port = 6667;
4952
private int _poolSize =8;
53+
private int _timeOut=10000;
54+
5055
static IoTDBConnectionStringBuilder()
5156
{
52-
var validKeywords = new string[8];
57+
var validKeywords = new string[9];
5358
validKeywords[(int)Keywords.DataSource] = DataSourceKeyword;
5459
validKeywords[(int)Keywords.Username] = UserNameKeyword;
5560
validKeywords[(int)Keywords.Password] = PasswordKeyword;
@@ -58,9 +63,10 @@ static IoTDBConnectionStringBuilder()
5863
validKeywords[(int)Keywords.Compression] = CompressionKeyword;
5964
validKeywords[(int)Keywords.PoolSize] = PoolSizeKeyword;
6065
validKeywords[(int)Keywords.ZoneId] = ZoneIdKeyword;
66+
validKeywords[(int)Keywords.TimeOut] =TimeOutKeyword;
6167
_validKeywords = validKeywords;
6268

63-
_keywords = new Dictionary<string, Keywords>(6, StringComparer.OrdinalIgnoreCase)
69+
_keywords = new Dictionary<string, Keywords>(9, StringComparer.OrdinalIgnoreCase)
6470
{
6571
[DataSourceKeyword] = Keywords.DataSource,
6672
[UserNameKeyword] = Keywords.Username,
@@ -69,7 +75,8 @@ static IoTDBConnectionStringBuilder()
6975
[FetchSizeKeyword] = Keywords.FetchSize,
7076
[CompressionKeyword] = Keywords.Compression,
7177
[PoolSizeKeyword] = Keywords.PoolSize,
72-
[ZoneIdKeyword] = Keywords.ZoneId
78+
[ZoneIdKeyword] = Keywords.ZoneId,
79+
[TimeOutKeyword] = Keywords.TimeOut
7380
};
7481
}
7582

@@ -135,8 +142,12 @@ public virtual string ZoneId
135142
get => _zoneId;
136143
set => base[ZoneIdKeyword] = _zoneId = value;
137144
}
138-
139-
145+
146+
public virtual int TimeOut
147+
{
148+
get => _timeOut;
149+
set => base[PoolSizeKeyword] = _timeOut = value;
150+
}
140151

141152
/// <summary>
142153
/// Gets a collection containing the keys used by the connection string.
@@ -162,8 +173,10 @@ public override ICollection Values
162173
return new ReadOnlyCollection<object>(values);
163174
}
164175
}
165-
166-
176+
177+
178+
179+
167180

168181

169182

@@ -211,6 +224,9 @@ public override object this[string keyword]
211224
case Keywords.ZoneId:
212225
ZoneId = Convert.ToString(value, CultureInfo.InvariantCulture);
213226
return;
227+
case Keywords.TimeOut:
228+
TimeOut = Convert.ToInt32(value, CultureInfo.InvariantCulture);
229+
return;
214230
default:
215231
Debug.WriteLine(false, "Unexpected keyword: " + keyword);
216232
return;
@@ -339,6 +355,8 @@ private object GetAt(Keywords index)
339355
return PoolSize;
340356
case Keywords.ZoneId:
341357
return ZoneId;
358+
case Keywords.TimeOut:
359+
return TimeOut;
342360
default:
343361
Debug.Assert(false, "Unexpected keyword: " + index);
344362
return null;
@@ -378,6 +396,9 @@ private void Reset(Keywords index)
378396
case Keywords.ZoneId:
379397
_zoneId = "UTC+08:00";
380398
return;
399+
case Keywords.TimeOut:
400+
_timeOut = 10000;//10sec.
401+
return;
381402
default:
382403
Debug.Assert(false, "Unexpected keyword: " + index);
383404
return;
Lines changed: 78 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,86 @@
1+
using System;
12
using System.Collections.Concurrent;
23
using System.Collections.Generic;
4+
using System.Diagnostics;
35
using System.Threading;
46

57
namespace Apache.IoTDB
68
{
7-
public class ConcurrentClientQueue
8-
{
9-
public ConcurrentQueue<Client> ClientQueue { get; }
10-
11-
public ConcurrentClientQueue(List<Client> clients)
12-
{
13-
ClientQueue = new ConcurrentQueue<Client>(clients);
14-
}
15-
16-
public ConcurrentClientQueue()
17-
{
18-
ClientQueue = new ConcurrentQueue<Client>();
19-
}
20-
21-
public void Add(Client client)
22-
{
23-
Monitor.Enter(ClientQueue);
24-
ClientQueue.Enqueue(client);
25-
Monitor.Pulse(ClientQueue);
26-
Monitor.Exit(ClientQueue);
27-
}
28-
29-
public Client Take()
30-
{
31-
Monitor.Enter(ClientQueue);
32-
33-
if (ClientQueue.IsEmpty)
34-
{
35-
Monitor.Wait(ClientQueue);
36-
}
37-
38-
ClientQueue.TryDequeue(out var client);
39-
Monitor.Exit(ClientQueue);
40-
return client;
41-
}
9+
public class ConcurrentClientQueue
10+
{
11+
public ConcurrentQueue<Client> ClientQueue { get; }
12+
13+
public ConcurrentClientQueue(List<Client> clients)
14+
{
15+
ClientQueue = new ConcurrentQueue<Client>(clients);
16+
}
17+
public ConcurrentClientQueue()
18+
{
19+
ClientQueue = new ConcurrentQueue<Client>();
20+
}
21+
public void Add(Client client) => Return(client);
22+
23+
public void Return(Client client)
24+
{
25+
Monitor.Enter(ClientQueue);
26+
ClientQueue.Enqueue(client);
27+
#if DEBUG
28+
Console.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 归还 {client}");
29+
#endif
30+
Monitor.Pulse(ClientQueue);
31+
Monitor.Exit(ClientQueue);
32+
Thread.Sleep(0);
33+
}
34+
int _ref = 0;
35+
public void AddRef()
36+
{
37+
lock (this)
38+
{
39+
_ref++;
40+
}
41+
}
42+
public int GetRef()
43+
{
44+
return _ref;
45+
}
46+
public void RemoveRef()
47+
{
48+
lock (this)
49+
{
50+
_ref--;
51+
}
52+
}
53+
public int Timeout { get; set; } = 10;
54+
public Client Take()
55+
{
56+
Client client = null;
57+
Monitor.Enter(ClientQueue);
58+
if (ClientQueue.IsEmpty)
59+
{
60+
#if DEBUG
61+
Console.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 连接池已空,请等待 超时时长:{Timeout}");
62+
#endif
63+
Monitor.Wait(ClientQueue, TimeSpan.FromSeconds(Timeout));
64+
}
65+
if (!ClientQueue.TryDequeue(out client))
66+
{
67+
#if DEBUG
68+
Console.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 从连接池获取连接失败,等待并重试");
69+
#endif
70+
}
71+
else
72+
{
73+
74+
#if DEBUG
75+
Console.WriteLine($"线程{Thread.CurrentThread.ManagedThreadId} 拿走 {client}");
76+
#endif
77+
}
78+
Monitor.Exit(ClientQueue);
79+
if (client == null)
80+
{
81+
throw new TimeoutException($"Connection pool is empty and wait time out({Timeout}s)!");
82+
}
83+
return client;
84+
}
4285
}
4386
}

0 commit comments

Comments
 (0)