Skip to content

Commit f4dce9f

Browse files
committed
add string record insertion test
1 parent b0483bb commit f4dce9f

2 files changed

Lines changed: 414 additions & 0 deletions

File tree

samples/Apache.IoTDB.Samples/SessionPoolTest.AlignedRecord.cs

Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,52 @@ public async Task TestInsertAlignedRecord()
4444
await session_pool.Close();
4545
Console.WriteLine("TestInsertAlignedRecordAsync Passed");
4646
}
47+
public async Task TestInsertAlignedStringRecord()
48+
{
49+
var session_pool = new SessionPool(host, port, pool_size);
50+
var status = 0;
51+
await session_pool.Open(false);
52+
if (debug) session_pool.OpenDebugMode();
53+
54+
System.Diagnostics.Debug.Assert(session_pool.IsOpen());
55+
await session_pool.DeleteStorageGroupAsync(test_group_name);
56+
57+
status = await session_pool.CreateAlignedTimeseriesAsync(
58+
string.Format("{0}.{1}", test_group_name, test_device),
59+
new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] },
60+
new List<TSDataType>() { TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT },
61+
new List<TSEncoding>() { TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN },
62+
new List<Compressor>() { Compressor.UNCOMPRESSED, Compressor.UNCOMPRESSED, Compressor.UNCOMPRESSED });
63+
64+
System.Diagnostics.Debug.Assert(status == 0);
65+
var measurements = new List<string>
66+
{test_measurements[0], test_measurements[1], test_measurements[2]};
67+
var values = new List<string> { "test_text1", "test_text2", "test_text3" };
68+
var tasks = new List<Task<int>>();
69+
var start_ms = DateTime.Now.Ticks / 10000;
70+
for (var timestamp = 1; timestamp <= fetch_size * processed_size; timestamp++)
71+
{
72+
var task = session_pool.InsertAlignedStringRecordAsync(
73+
string.Format("{0}.{1}", test_group_name, test_device), measurements, values, timestamp);
74+
tasks.Add(task);
75+
}
76+
77+
Task.WaitAll(tasks.ToArray());
78+
var end_ms = DateTime.Now.Ticks / 10000;
79+
Console.WriteLine(string.Format("total insert aligned string record time is {0}", end_ms - start_ms));
80+
var res = await session_pool.ExecuteQueryStatementAsync("select * from " + string.Format("{0}.{1}", test_group_name, test_device));
81+
var res_cnt = 0;
82+
while (res.HasNext())
83+
{
84+
res.Next();
85+
res_cnt++;
86+
}
87+
Console.WriteLine(res_cnt + " " + fetch_size * processed_size);
88+
System.Diagnostics.Debug.Assert(res_cnt == fetch_size * processed_size);
89+
await session_pool.DeleteStorageGroupAsync(test_group_name);
90+
await session_pool.Close();
91+
Console.WriteLine("TestInsertAlignedStringRecordAsync Passed");
92+
}
4793
public async Task TestInsertAlignedRecords()
4894
{
4995
var session_pool = new SessionPool(host, port, pool_size);
@@ -165,6 +211,87 @@ public async Task TestInsertAlignedRecords()
165211
await session_pool.Close();
166212
Console.WriteLine("TestInsertAlignedRecords Passed!");
167213
}
214+
public async Task TestInsertAlignedStringRecords()
215+
{
216+
var session_pool = new SessionPool(host, port, pool_size);
217+
await session_pool.Open(false);
218+
if (debug) session_pool.OpenDebugMode();
219+
220+
System.Diagnostics.Debug.Assert(session_pool.IsOpen());
221+
var status = 0;
222+
await session_pool.DeleteStorageGroupAsync(test_group_name);
223+
224+
string prefixPath = string.Format("{0}.{1}", test_group_name, test_device);
225+
var measurement_lst = new List<string>() { test_measurements[1], test_measurements[2] };
226+
var data_type_lst = new List<TSDataType>() { TSDataType.TEXT, TSDataType.TEXT };
227+
var encoding_lst = new List<TSEncoding>() { TSEncoding.PLAIN, TSEncoding.PLAIN };
228+
var compressor_lst = new List<Compressor>() { Compressor.SNAPPY, Compressor.SNAPPY };
229+
status = await session_pool.CreateAlignedTimeseriesAsync(prefixPath, measurement_lst, data_type_lst, encoding_lst,
230+
compressor_lst);
231+
System.Diagnostics.Debug.Assert(status == 0);
232+
233+
var device_id = new List<string>() { };
234+
for (var i = 0; i < 3; i++) device_id.Add(string.Format("{0}.{1}", test_group_name, test_device));
235+
var measurements_lst = new List<List<string>>() { };
236+
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
237+
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
238+
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
239+
var values_lst = new List<List<string>>() { };
240+
values_lst.Add(new List<string>() { "test1", "test2" });
241+
values_lst.Add(new List<string>() { "test3", "test4" });
242+
values_lst.Add(new List<string>() { "test5", "test6" });
243+
List<long> timestamp_lst = new List<long>() { 1, 2, 3 };
244+
245+
status = await session_pool.InsertAlignedStringRecordsAsync(device_id, measurements_lst, values_lst, timestamp_lst);
246+
System.Diagnostics.Debug.Assert(status == 0);
247+
var res = await session_pool.ExecuteQueryStatementAsync(
248+
"select * from " + string.Format("{0}.{1}", test_group_name, test_device) + " where time<10");
249+
res.ShowTableNames();
250+
while (res.HasNext()) Console.WriteLine(res.Next());
251+
252+
await res.Close();
253+
254+
// large data test
255+
device_id = new List<string>() { };
256+
measurements_lst = new List<List<string>>() { };
257+
values_lst = new List<List<string>>() { };
258+
timestamp_lst = new List<long>() { };
259+
List<Task<int>> tasks = new List<Task<int>>();
260+
for (var timestamp = 4; timestamp <= fetch_size * processed_size; timestamp++)
261+
{
262+
device_id.Add(string.Format("{0}.{1}", test_group_name, test_device));
263+
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
264+
values_lst.Add(new List<string>() { "test1", "test2" });
265+
timestamp_lst.Add(timestamp);
266+
if (timestamp % fetch_size == 0)
267+
{
268+
tasks.Add(session_pool.InsertAlignedStringRecordsAsync(device_id, measurements_lst, values_lst, timestamp_lst));
269+
device_id = new List<string>() { };
270+
measurements_lst = new List<List<string>>() { };
271+
values_lst = new List<List<string>>() { };
272+
timestamp_lst = new List<long>() { };
273+
}
274+
}
275+
276+
Task.WaitAll(tasks.ToArray());
277+
res = await session_pool.ExecuteQueryStatementAsync(
278+
"select * from " + string.Format("{0}.{1}", test_group_name, test_device));
279+
res.ShowTableNames();
280+
var res_count = 0;
281+
while (res.HasNext())
282+
{
283+
res.Next();
284+
res_count += 1;
285+
}
286+
287+
await res.Close();
288+
Console.WriteLine(res_count + " " + fetch_size * processed_size);
289+
System.Diagnostics.Debug.Assert(res_count == fetch_size * processed_size);
290+
status = await session_pool.DeleteStorageGroupAsync(test_group_name);
291+
System.Diagnostics.Debug.Assert(status == 0);
292+
await session_pool.Close();
293+
Console.WriteLine("TestInsertAlignedStringRecords Passed!");
294+
}
168295
public async Task TestInsertAlignedRecordsOfOneDevice()
169296
{
170297
var session_pool = new SessionPool(host, port, pool_size);
@@ -275,5 +402,78 @@ public async Task TestInsertAlignedRecordsOfOneDevice()
275402
await session_pool.Close();
276403
Console.WriteLine("TestInsertAlignedRecordsOfOneDevice Passed!");
277404
}
405+
public async Task TestInsertAlignedStringRecordsOfOneDevice(){
406+
var session_pool = new SessionPool(host, port, pool_size);
407+
await session_pool.Open(false);
408+
if(debug) session_pool.OpenDebugMode();
409+
410+
System.Diagnostics.Debug.Assert(session_pool.IsOpen());
411+
var status = 0;
412+
await session_pool.DeleteStorageGroupAsync(test_group_name);
413+
var device_id = string.Format("{0}.{1}", test_group_name, test_device);
414+
var measurements = new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] };
415+
var data_type_lst = new List<TSDataType>() { TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT };
416+
var encoding_lst = new List<TSEncoding>() { TSEncoding.PLAIN, TSEncoding.PLAIN, TSEncoding.PLAIN };
417+
var compressor_lst = new List<Compressor>() { Compressor.SNAPPY, Compressor.SNAPPY, Compressor.SNAPPY };
418+
status = await session_pool.CreateAlignedTimeseriesAsync(device_id, measurements, data_type_lst, encoding_lst, compressor_lst);
419+
System.Diagnostics.Debug.Assert(status == 0);
420+
421+
var measurements_lst = new List<List<string>>() { };
422+
measurements_lst.Add(new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] });
423+
measurements_lst.Add(new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] });
424+
measurements_lst.Add(new List<string>() { test_measurements[0], test_measurements[1], test_measurements[2] });
425+
426+
var values_lst = new List<List<string>>() { };
427+
values_lst.Add(new List<string>() { "test1", "test2", "test3" });
428+
values_lst.Add(new List<string>() { "test4", "test5", "test6" });
429+
values_lst.Add(new List<string>() { "test7", "test8", "test9" });
430+
431+
var timestamp_lst = new List<long>() { 1, 2, 3 };
432+
433+
status = await session_pool.InsertAlignedStringRecordsOfOneDeviceAsync(device_id, timestamp_lst, measurements_lst, values_lst);
434+
System.Diagnostics.Debug.Assert(status == 0);
435+
var res = await session_pool.ExecuteQueryStatementAsync(
436+
"select * from " + string.Format("{0}.{1}", test_group_name, test_device) + " where time<10");
437+
res.ShowTableNames();
438+
while (res.HasNext()) Console.WriteLine(res.Next());
439+
440+
await res.Close();
441+
// large data test
442+
values_lst = new List<List<string>>() { };
443+
var tasks = new List<Task<int>>();
444+
measurements_lst = new List<List<string>>() { };
445+
timestamp_lst = new List<long>() { };
446+
for (var timestamp = 4; timestamp <= fetch_size * processed_size; timestamp++)
447+
{
448+
values_lst.Add(new List<string>() { "test1", "test2" });
449+
measurements_lst.Add(new List<string>() { test_measurements[1], test_measurements[2] });
450+
timestamp_lst.Add(timestamp);
451+
if (timestamp % fetch_size == 0)
452+
{
453+
tasks.Add(session_pool.InsertAlignedStringRecordsOfOneDeviceAsync(device_id, timestamp_lst, measurements_lst, values_lst));
454+
values_lst = new List<List<string>>() { };
455+
measurements_lst = new List<List<string>>() { };
456+
timestamp_lst = new List<long>() { };
457+
}
458+
}
459+
460+
Task.WaitAll(tasks.ToArray());
461+
res = await session_pool.ExecuteQueryStatementAsync(
462+
"select * from " + string.Format("{0}.{1}", test_group_name, test_device));
463+
var res_count = 0;
464+
while (res.HasNext())
465+
{
466+
res.Next();
467+
res_count += 1;
468+
}
469+
470+
await res.Close();
471+
Console.WriteLine(res_count + " " + fetch_size * processed_size);
472+
System.Diagnostics.Debug.Assert(res_count == fetch_size * processed_size);
473+
status = await session_pool.DeleteStorageGroupAsync(test_group_name);
474+
System.Diagnostics.Debug.Assert(status == 0);
475+
await session_pool.Close();
476+
Console.WriteLine("TestInsertAlignedStringRecordsOfOneDevice Passed!");
477+
}
278478
}
279479
}

0 commit comments

Comments
 (0)