11"""Test Duck DB helpers"""
2+
3+ import datetime
24import tempfile
35from pathlib import Path
6+ from typing import Any
47
58import pytest
69import pyspark .sql .types as pst
710from duckdb import DuckDBPyRelation , DuckDBPyConnection
811from pyspark .sql import Row , SparkSession
912
10- from dve .core_engine .backends .implementations .duckdb .duckdb_helpers import _ddb_read_parquet
13+ from dve .core_engine .backends .implementations .duckdb .duckdb_helpers import (
14+ _ddb_read_parquet ,
15+ duckdb_rel_to_dictionaries )
1116
1217
class TempConnection:
    """
    Minimal stand-in for the full DataContract object.

    The real object would be a DataContract; this version is cut down to the
    bare minimum the test requires, i.e. holding a connection.
    """

    def __init__(self, connection: DuckDBPyConnection) -> None:
        """Keep a reference to the supplied DuckDB connection."""
        self._connection = connection
2026
@@ -25,7 +31,7 @@ def __init__(self, connection: DuckDBPyConnection) -> None:
2531 ("movie_ratings" ),
2632 ("movie_ratings/" ),
2733 ("file://movie_ratings/" ),
28- ]
34+ ],
2935)
3036def test__ddb_read_parquet_with_hive_format (
3137 spark : SparkSession , temp_ddb_conn : DuckDBPyConnection , outpath : str
@@ -38,11 +44,13 @@ def test__ddb_read_parquet_with_hive_format(
3844 Row (movie_name = "Hot Fuzz" , avg_user_rating = 7.7 , avg_critic_rating = 6.5 ),
3945 Row (movie_name = "Nemo" , avg_user_rating = 8.8 , avg_critic_rating = 7.6 ),
4046 ],
41- pst .StructType ([
42- pst .StructField ("movie_name" , pst .StringType ()),
43- pst .StructField ("avg_user_rating" , pst .FloatType ()),
44- pst .StructField ("avg_critic_rating" , pst .FloatType ()),
45- ])
47+ pst .StructType (
48+ [
49+ pst .StructField ("movie_name" , pst .StringType ()),
50+ pst .StructField ("avg_user_rating" , pst .FloatType ()),
51+ pst .StructField ("avg_critic_rating" , pst .FloatType ()),
52+ ]
53+ ),
4654 )
4755 out_path = str (Path (temp_dir_path , outpath ))
4856 test_data_df .coalesce (1 ).write .parquet (out_path )
@@ -51,3 +59,39 @@ def test__ddb_read_parquet_with_hive_format(
5159
5260 assert isinstance (ddby_relation , DuckDBPyRelation )
5361 assert ddby_relation .count ("*" ).fetchone ()[0 ] == 2 # type: ignore
62+
63+
@pytest.mark.parametrize(
    "data",
    (
        [
            {
                "str_field": "hi",
                "int_field": 5,
                "array_float_field": [6.5, 7.25],
                "date_field": datetime.date(2021, 5, 3),
                "timestamp_field": datetime.datetime(2022, 6, 7, 1, 2, 3),
            },
            {
                "str_field": "bye",
                "int_field": 3,
                "array_float_field": None,
                "date_field": datetime.date(2021, 8, 11),
                "timestamp_field": datetime.datetime(2022, 4, 3, 1, 2, 3),
            },
        ],
    ),
)
def test_duckdb_rel_to_dictionaries(
    temp_ddb_conn: tuple[Any, DuckDBPyConnection],
    data: list[dict[str, Any]],
) -> None:
    """Round-trip a list of dictionaries through a DuckDB relation.

    The parametrized ``data`` is unnested into a relation, read back through
    ``duckdb_rel_to_dictionaries`` chunk by chunk, and the flattened result is
    expected to equal the input (including the ``None`` array value and the
    date / timestamp fields).
    """
    # The fixture is unpacked as a pair; only the connection is used here.
    _, con = temp_ddb_conn
    test_rel = con.query(
        "select dta.* from (select unnest($data) as dta)",
        params={"data": data},
    )
    # Flatten the yielded chunks into a single list of row dictionaries.
    res = [row for chunk in duckdb_rel_to_dictionaries(test_rel) for row in chunk]

    assert res == data
97+
0 commit comments