1+ import logging
2+ import sys
13from typing import Dict , Any , List
24import pandas as pd
35import json
46import duckdb
57import random
68import string
9+ from datetime import datetime
710
811from azure .kusto .data import KustoClient , KustoConnectionStringBuilder
912from azure .kusto .data .helpers import dataframe_from_result_table
1013
1114from data_formulator .data_loader .external_data_loader import ExternalDataLoader , sanitize_table_name
1215
# Configure root logger for general application logging
# NOTE(review): basicConfig runs at import time and configures the *root*
# logger for the whole process; if this module is imported as a library, the
# host application's logging setup may be affected — confirm this is intended.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]  # log to stdout, not stderr
)

# Get logger for this module (a child of the root logger configured above)
logger = logging.getLogger(__name__)
1325
1426class KustoDataLoader (ExternalDataLoader ):
1527
@@ -67,23 +79,93 @@ def __init__(self, params: Dict[str, Any], duck_db_conn: duckdb.DuckDBPyConnecti
6779 self .kusto_cluster , self .client_id , self .client_secret , self .tenant_id ))
6880 else :
6981 # This function provides an interface to Kusto. It uses Azure CLI auth, but you can also use other auth types.
70- self .client = KustoClient (KustoConnectionStringBuilder .with_az_cli_authentication (self .kusto_cluster ))
82+ cluster_url = KustoConnectionStringBuilder .with_az_cli_authentication (self .kusto_cluster )
83+ logger .info (f"Connecting to Kusto cluster: { self .kusto_cluster } " )
84+ self .client = KustoClient (cluster_url )
85+ logger .info ("Using Azure CLI authentication for Kusto client. Ensure you have run `az login` in your terminal." )
7186 except Exception as e :
72- raise Exception (f"Error creating Kusto client: { e } , please authenticate with Azure CLI when starting the app. " )
73-
87+ logger . error (f"Error creating Kusto client: { e } " )
88+ raise Exception ( f"Error creating Kusto client: { e } , please authenticate with Azure CLI when starting the app." )
7489 self .duck_db_conn = duck_db_conn
7590
91+ def _convert_kusto_datetime_columns (self , df : pd .DataFrame ) -> pd .DataFrame :
92+ """Convert Kusto datetime columns to proper pandas datetime format"""
93+ logger .info (f"Processing DataFrame with columns: { list (df .columns )} " )
94+ logger .info (f"Column dtypes before conversion: { dict (df .dtypes )} " )
95+
96+ for col in df .columns :
97+ original_dtype = df [col ].dtype
98+
99+ if df [col ].dtype == 'object' :
100+ # Try to identify datetime columns by checking sample values
101+ sample_values = df [col ].dropna ().head (3 )
102+ if len (sample_values ) > 0 :
103+ # Check if values look like datetime strings or timestamp numbers
104+ first_val = sample_values .iloc [0 ]
105+
106+ # Handle Kusto datetime format (ISO 8601 strings)
107+ if isinstance (first_val , str ) and ('T' in first_val or '-' in first_val ):
108+ try :
109+ # Try to parse as datetime
110+ pd .to_datetime (sample_values .iloc [0 ])
111+ logger .info (f"Converting column '{ col } ' from string to datetime" )
112+ df [col ] = pd .to_datetime (df [col ], errors = 'coerce' , utc = True ).dt .tz_localize (None )
113+ except Exception as e :
114+ logger .debug (f"Failed to convert column '{ col } ' as string datetime: { e } " )
115+
116+ # Handle numeric timestamps (Unix timestamps in various formats)
117+ elif isinstance (first_val , (int , float )) and first_val > 1000000000 :
118+ try :
119+ # Try different timestamp formats
120+ if first_val > 1e15 : # Likely microseconds since epoch
121+ logger .info (f"Converting column '{ col } ' from microseconds timestamp to datetime" )
122+ df [col ] = pd .to_datetime (df [col ], unit = 'us' , errors = 'coerce' , utc = True ).dt .tz_localize (None )
123+ elif first_val > 1e12 : # Likely milliseconds since epoch
124+ logger .info (f"Converting column '{ col } ' from milliseconds timestamp to datetime" )
125+ df [col ] = pd .to_datetime (df [col ], unit = 'ms' , errors = 'coerce' , utc = True ).dt .tz_localize (None )
126+ else : # Likely seconds since epoch
127+ logger .info (f"Converting column '{ col } ' from seconds timestamp to datetime" )
128+ df [col ] = pd .to_datetime (df [col ], unit = 's' , errors = 'coerce' , utc = True ).dt .tz_localize (None )
129+ except Exception as e :
130+ logger .debug (f"Failed to convert column '{ col } ' as numeric timestamp: { e } " )
131+
132+ # Handle datetime64 columns that might have timezone info
133+ elif pd .api .types .is_datetime64_any_dtype (df [col ]):
134+ # Ensure timezone-aware datetimes are properly handled
135+ if hasattr (df [col ].dt , 'tz' ) and df [col ].dt .tz is not None :
136+ logger .info (f"Converting timezone-aware datetime column '{ col } ' to UTC" )
137+ df [col ] = df [col ].dt .tz_convert ('UTC' ).dt .tz_localize (None )
138+
139+ # Log if conversion happened
140+ if original_dtype != df [col ].dtype :
141+ logger .info (f"Column '{ col } ' converted from { original_dtype } to { df [col ].dtype } " )
142+
143+ logger .info (f"Column dtypes after conversion: { dict (df .dtypes )} " )
144+ return df
145+
76146 def query (self , kql : str ) -> pd .DataFrame :
147+ logger .info (f"Executing KQL query: { kql } on database { self .kusto_database } " )
77148 result = self .client .execute (self .kusto_database , kql )
78- return dataframe_from_result_table (result .primary_results [0 ])
149+ logger .info (f"Query executed successfully, returning results." )
150+ df = dataframe_from_result_table (result .primary_results [0 ])
151+
152+ # Convert datetime columns properly
153+ df = self ._convert_kusto_datetime_columns (df )
154+
155+ return df
79156
80- def list_tables (self ) -> List [Dict [str , Any ]]:
157+ def list_tables (self , table_filter : str = None ) -> List [Dict [str , Any ]]:
81158 query = ".show tables"
82159 tables_df = self .query (query )
83160
84161 tables = []
85162 for table in tables_df .to_dict (orient = "records" ):
86163 table_name = table ['TableName' ]
164+
165+ # Apply table filter if provided
166+ if table_filter and table_filter .lower () not in table_name .lower ():
167+ continue
168+
87169 schema_result = self .query (f".show table ['{ table_name } '] schema as json" ).to_dict (orient = "records" )
88170 columns = [{
89171 'name' : r ["Name" ],
@@ -94,7 +176,10 @@ def list_tables(self) -> List[Dict[str, Any]]:
94176 row_count = row_count_result [0 ]["TotalRowCount" ]
95177
96178 sample_query = f"['{ table_name } '] | take { 5 } "
97- sample_result = json .loads (self .query (sample_query ).to_json (orient = "records" ))
179+ sample_df = self .query (sample_query )
180+
181+ # Convert sample data to JSON with proper datetime handling
182+ sample_result = json .loads (sample_df .to_json (orient = "records" , date_format = 'iso' ))
98183
99184 table_metadata = {
100185 "row_count" : row_count ,
@@ -159,7 +244,8 @@ def ingest_data(self, table_name: str, name_as: str = None, size: int = 5000000)
159244 total_rows_ingested += len (chunk_df )
160245
161246 def view_query_sample (self , query : str ) -> str :
162- return json .loads (self .query (query ).head (10 ).to_json (orient = "records" ))
247+ df = self .query (query ).head (10 )
248+ return json .loads (df .to_json (orient = "records" , date_format = 'iso' ))
163249
164250 def ingest_data_from_query (self , query : str , name_as : str ) -> pd .DataFrame :
165251 # Sanitize the table name for SQL compatibility
0 commit comments