44
55import logging
66from abc import ABC
7- from collections .abc import Callable
7+ from collections .abc import Callable , Generator
88from dataclasses import dataclass
99from pathlib import Path
1010
@@ -87,7 +87,7 @@ def _pivot_multiselect(
8787 # response_index=pl.col("item_option").struct.field("value"),
8888 # response_name=pl.col("item_option").struct.field("name"),
8989 )
90- .drop ("response_value" , "response_raw_score" )
90+ .drop ("response_value" )
9191 # Generate pivot column.
9292 .with_columns (
9393 item_option_pivot = pl .concat_str (
@@ -107,6 +107,16 @@ def _map_response_column_names(cname: str) -> str:
107107 parts = cname .split ("__" , 1 )
108108 return "_" .join ([parts [1 ], parts [0 ].removeprefix ("response" )])
109109
@staticmethod
def _fill_item_response(*response_columns: str) -> Generator[pl.Expr, None, None]:
    """Yield one fallback expression per response column.

    For each name in ``response_columns`` the yielded expression keeps the
    column's own value where it is non-null and otherwise takes the value of
    the sibling ``<column>__name`` column. The result is aliased back to the
    original column name, so it overwrites that column in ``with_columns``.
    """
    for response_col in response_columns:
        current = pl.col(response_col)
        fallback = pl.col(f"{response_col}__name")
        yield (
            pl.when(current.is_not_null())
            .then(current)
            .otherwise(fallback)
            .alias(response_col)
        )
119+
110120 @staticmethod
111121 def _pivot_singleselect (
112122 df : pl .DataFrame , option_scores : pl .DataFrame
@@ -118,27 +128,25 @@ def _pivot_singleselect(
118128 pl .col ("item_option_name" ).alias ("response_name" ),
119129 ).drop ("item_option_score" , "item_option_value" , "item_option_name" )
120130
121- return (
122- (
123- df .with_columns (
124- response_index = pl .col ("response_value" ).struct .field ("single_value" )
125- )
126- .drop ("response_value" )
127- .join (
128- response_options ,
129- on = [
130- "applet_version" ,
131- "activity_flow" ,
132- "activity" ,
133- "item" ,
134- "response_index" ,
135- ],
136- how = "left" ,
137- validate = "m:1" ,
138- )
139- .with_columns (item_name = pl .col ("item" ).struct .field ("name" ))
140- .drop ("item" )
131+ df = (
132+ df .with_columns (
133+ response_index = pl .col ("response_value" ).struct .field ("single_value" )
141134 )
135+ .drop ("response_value" )
136+ .join (
137+ response_options ,
138+ on = [
139+ "applet_version" ,
140+ "activity_flow" ,
141+ "activity" ,
142+ "item" ,
143+ "response_index" ,
144+ ],
145+ how = "left" ,
146+ validate = "m:1" ,
147+ )
148+ .with_columns (item_name = pl .col ("item" ).struct .field ("name" ))
149+ .drop ("item" )
142150 .pivot (on = "item_name" , values = cs .starts_with ("response" ), separator = "__" )
143151 .with_columns (
144152 cs .starts_with ("response" ).name .map (
@@ -148,6 +156,18 @@ def _pivot_singleselect(
148156 .drop (cs .starts_with ("response" ))
149157 )
150158
# Map every "<QUESTION>__score" column to its bare "<QUESTION>" name.
# Strip only the trailing suffix: an unbounded rsplit("__")[0] returns the
# text before the FIRST "__", so an item name that itself contains "__"
# (e.g. "a__b__score") would collapse to "a" and the subsequent
# "<QUESTION>__name" lookup would reference a column that does not exist.
response_columns = {
    s: s.removesuffix("__score")
    for s in cs.expand_selector(df, cs.ends_with("__score"))
}
163+ return (
164+ df .rename (response_columns ) # Rename <QUESTION>__score to <QUESTION>.
165+ .with_columns (
166+ WideFormat ._fill_item_response (* response_columns .values ())
167+ ) # Use value of __name if __score is null.
168+ .drop (cs .ends_with ("__index" , "__name" ))
169+ )
170+
151171 @staticmethod
152172 def _pivot_text (df : pl .DataFrame , option_scores : pl .DataFrame ) -> pl .DataFrame :
153173 del option_scores
@@ -156,7 +176,7 @@ def _pivot_text(df: pl.DataFrame, option_scores: pl.DataFrame) -> pl.DataFrame:
156176 response_value = pl .col ("response_value" ).struct .field ("text" ),
157177 item_name = pl .col ("item" ).struct .field ("name" ),
158178 )
159- .drop ("response_raw_score" , " item" )
179+ .drop ("item" )
160180 .pivot (on = "item_name" , values = "response_value" )
161181 )
162182
@@ -168,7 +188,7 @@ def _pivot_subscale(df: pl.DataFrame, option_scores: pl.DataFrame) -> pl.DataFra
168188 response_value = pl .col ("response_value" ).struct .field ("subscale" ),
169189 item_name = pl .col ("item" ).struct .field ("name" ),
170190 )
171- .drop ("response_raw_score" , " item" )
191+ .drop ("item" )
172192 .pivot (on = "item_name" , values = "response_value" )
173193 )
174194
@@ -188,10 +208,7 @@ def _typed_pivot(
188208 self , df : pl .DataFrame , option_scores : pl .DataFrame
189209 ) -> pl .DataFrame :
190210 df = (
191- df .with_columns (
192- response_value = pl .col ("response" ).struct .field ("value" ),
193- response_raw_score = pl .col ("response" ).struct .field ("raw_score" ),
194- )
211+ df .with_columns (response_value = pl .col ("response" ).struct .field ("value" ))
195212 .drop ("response" )
196213 .with_columns (
197214 response_value = pl .struct (
@@ -219,8 +236,7 @@ def _typed_pivot(
219236 self ._get_pivot_fn (partition_type )(partition_df , option_scores )
220237 for partition_type , partition_df in typed_partitions .items ()
221238 ]
222- metadata_columns = ["legacy_user_id" , "applet_version" ]
223- index_struct_columns = [
239+ struct_idx_columns = [
224240 "target_user" ,
225241 "source_user" ,
226242 "input_user" ,
@@ -231,17 +247,15 @@ def _typed_pivot(
231247 "activity_time" ,
232248 "activity_schedule" ,
233249 ]
234- return (
235- pl .concat (pivoted_dfs , how = "diagonal_relaxed" )
236- .with_columns (util .unnest_structs (* index_struct_columns ))
237- .drop (index_struct_columns )
238- .select (
239- pl .col ("legacy_user_id" ),
240- pl .col ("applet_version" ),
241- cs .starts_with (* index_struct_columns ),
242- ~ cs .starts_with (* (index_struct_columns + metadata_columns )),
243- )
250+
251+ df = (
252+ pl .concat (pivoted_dfs , how = "align" )
253+ .with_columns (util .unnest_structs (* struct_idx_columns ))
254+ .drop (struct_idx_columns )
244255 )
256+ idx_columns = cs .starts_with (* (["applet_version" ] + struct_idx_columns ))
257+ response_columns = cs .by_name (sorted (cs .expand_selector (df , ~ idx_columns )))
258+ return df .select (idx_columns , response_columns )
245259
246260 def _format (self , data : MindloggerData ) -> list [NamedOutput ]:
247261 if (
0 commit comments