33from odm2api .ODM2 .services import ReadODM2 , UpdateODM2 , DeleteODM2 , CreateODM2
44from odm2api import serviceBase
55from odm2api .ODM2 .models import *
6- from odmtools .odmservices .to_sql_newrows import get_insert , get_delete , get_update
6+ # from odmtools.odmservices.to_sql_newrows import get_insert, get_delete, get_update
77import datetime
88from odmtools .common .logger import LoggerTool
99import pandas as pd
@@ -646,14 +646,14 @@ def _get_df_query(self, values):
646646 def upsert_values (self , values ):
647647 setSchema (self ._session_factory .engine )
648648 query = self ._get_df_query (values )
649- newvals = get_insert (df = values , query = query , dup_cols = ["valuedatetime" , "resultid" ], engine = self ._session_factory .engine )
649+ newvals = get_insert (df = values , query = query , dup_cols = ["valuedatetime" , "resultid" ], engine = self ._session_factory .engine )
650650 if not newvals .empty :
651651 self .insert_values (newvals )
652- delvals = get_delete (df = values , query = query , dup_cols = ["valuedatetime" , "resultid" ], engine = self ._session_factory .engine )
652+ delvals = get_delete (df = values , query = query , dup_cols = ["valuedatetime" , "resultid" ], engine = self ._session_factory .engine )
653653 if not delvals .empty :
654654 self .delete_dvs (delvals ["valuedatetime" ].tolist ())
655655
656- upvals = get_update (df = values , query = query , dup_cols = ["valuedatetime" , "resultid" ], engine = self ._session_factory .engine )
656+ upvals = get_update (df = values , query = query , dup_cols = ["valuedatetime" , "resultid" ], engine = self ._session_factory .engine )
657657 if not upvals .empty :
658658 self .update_values (upvals )
659659
@@ -981,3 +981,46 @@ def get_values_by_series(self, series_id):
981981 q = q .order_by (TimeSeriesResultValues .ValueDateTime )
982982
983983 return q .all ()
984+
985+
def get_delete(df, engine, query, dup_cols=None):
    """Return the rows of *df* whose datetimes match database-only rows.

    *df* is deduplicated on *dup_cols* (keeping the last occurrence) and
    merged against the rows produced by *query*; rows that exist only in
    the database are collected, and the subset of *df* sharing a
    ``valuedatetime`` with any of them is returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Candidate values; must contain the *dup_cols* columns and a
        ``valuedatetime`` column.
    engine : SQLAlchemy engine or DBAPI connection
        Connection handed to ``pd.read_sql``.
    query : str or SQLAlchemy selectable
        Query yielding the existing database rows to compare against.
    dup_cols : list of str, optional
        Columns that identify a duplicate row, e.g.
        ``["valuedatetime", "resultid"]``.

    Returns
    -------
    pandas.DataFrame
        Rows of *df* flagged for deletion. The caller's *df* is NOT
        mutated (the original implementation deduplicated it in place).
    """
    dup_cols = list(dup_cols) if dup_cols is not None else []
    # Deduplicate a copy rather than mutating the caller's frame in place.
    deduped = df.drop_duplicates(dup_cols, keep='last')
    merged = pd.merge(deduped, pd.read_sql(query, engine),
                      how='right', on=dup_cols, indicator=True)
    # 'right_only' == rows that exist in the database but not in df.
    db_only = merged[merged['_merge'] == 'right_only'].drop(['_merge'], axis=1)
    # NOTE(review): matching on valuedatetime alone (ignoring resultid) is
    # preserved from the original implementation — confirm it is intended.
    return deduped[deduped['valuedatetime'].isin(db_only['valuedatetime'])]
993+
def get_update(df, engine, query, dup_cols=None):
    """Return the rows of *df* whose data value differs from the database.

    *df* is deduplicated on *dup_cols* (keeping the last occurrence) and
    inner-merged against the rows produced by *query*; rows present in
    both whose ``datavalue`` columns disagree are selected, and the
    subset of *df* sharing a ``valuedatetime`` with them is returned.

    Parameters
    ----------
    df : pandas.DataFrame
        Candidate values; must contain the *dup_cols* columns plus
        ``valuedatetime`` and ``datavalue``.
    engine : SQLAlchemy engine or DBAPI connection
        Connection handed to ``pd.read_sql``.
    query : str or SQLAlchemy selectable
        Query yielding the existing database rows to compare against.
    dup_cols : list of str, optional
        Columns that identify a duplicate row, e.g.
        ``["valuedatetime", "resultid"]``.

    Returns
    -------
    pandas.DataFrame
        Rows of *df* whose value changed. The caller's *df* is NOT
        mutated (the original implementation deduplicated it in place).
    """
    dup_cols = list(dup_cols) if dup_cols is not None else []
    # Deduplicate a copy rather than mutating the caller's frame in place.
    deduped = df.drop_duplicates(dup_cols, keep='last')
    merged = pd.merge(deduped, pd.read_sql(query, engine),
                      how='inner', on=dup_cols, indicator=True)
    merged = merged.drop(['_merge'], axis=1)
    # The merge suffixes the shared 'datavalue' column: _x is the incoming
    # frame, _y is the database. A mismatch means the row needs an update.
    changed = merged[merged['datavalue_x'] != merged['datavalue_y']]
    return deduped[deduped['valuedatetime'].isin(changed['valuedatetime'])]
1002+
def get_insert(df, engine, query, dup_cols=None):
    """Return the rows of *df* that do not yet exist in the database.

    *df* is deduplicated on *dup_cols* (keeping the last occurrence) and
    left-merged against the rows produced by *query*; rows found only in
    *df* are collected, and the subset of *df* sharing a
    ``valuedatetime`` with them is returned for insertion.

    Parameters
    ----------
    df : pandas.DataFrame
        Candidate values; must contain the *dup_cols* columns and a
        ``valuedatetime`` column.
    engine : SQLAlchemy engine or DBAPI connection
        Connection handed to ``pd.read_sql``.
    query : str or SQLAlchemy selectable
        Query yielding the existing database rows to compare against.
    dup_cols : list of str, optional
        Columns that identify a duplicate row, e.g.
        ``["valuedatetime", "resultid"]``.

    Returns
    -------
    pandas.DataFrame
        Rows of *df* that are new relative to the database. The caller's
        *df* is NOT mutated (the original implementation deduplicated it
        in place).
    """
    dup_cols = list(dup_cols) if dup_cols is not None else []
    # Deduplicate a copy rather than mutating the caller's frame in place.
    deduped = df.drop_duplicates(dup_cols, keep='last')
    merged = pd.merge(deduped, pd.read_sql(query, engine),
                      how='left', on=dup_cols, indicator=True)
    # 'left_only' == rows that exist in df but not in the database.
    new_rows = merged[merged['_merge'] == 'left_only'].drop(['_merge'], axis=1)
    return deduped[deduped['valuedatetime'].isin(new_rows['valuedatetime'])]