1+ from argparse import ArgumentParser
2+ import json
13import os
4+ from pathlib import Path
25import sqlite3
36import sys
4-
5- from glob import glob
7+ import typing as t
68import pandas as pd
79
# Output SQLite database file, created in the current working directory.
DATABASE_NAME = "mimic4.db"
# Files smaller than ~50 MB are read in a single pass; larger ones are chunked.
THRESHOLD_SIZE = 5 * 10 ** 7
# Number of CSV rows per chunk when streaming large files.
CHUNKSIZE = 10 ** 6
1113
# ---------------------------------------------------------------------------
# Legacy flat-script loader. NOTE(review): this appears to be superseded by
# main() below (which adds argument parsing, table validation and subject
# limiting) — confirm whether it should be removed entirely.
# ---------------------------------------------------------------------------

# Refuse to clobber an existing database file.
if os.path.exists(DATABASE_NAME):
    msg = "File {} already exists.".format(DATABASE_NAME)
    print(msg)
    sys.exit()

with sqlite3.Connection(DATABASE_NAME) as connection:
    for f in glob("**/*.csv*", recursive=True):
        print("Starting processing {}".format(f))
        folder, filename = os.path.split(f)
        # Table name = file name, lower-cased, with .gz / .csv suffixes removed.
        tablename = filename.lower()
        if tablename.endswith('.gz'):
            tablename = tablename[:-3]
        if tablename.endswith('.csv'):
            tablename = tablename[:-4]
        if os.path.getsize(f) < THRESHOLD_SIZE:
            df = pd.read_csv(f)
            # if_exists="append" keeps a duplicate table name (e.g. the same
            # table present as both .csv and .csv.gz) from raising ValueError,
            # and matches the behaviour of the chunked branch below.
            df.to_sql(tablename, connection, if_exists="append", index=False)
        else:
            # If the file is too large, let's do the work in chunks.
            for chunk in pd.read_csv(f, chunksize=CHUNKSIZE, low_memory=False):
                # index=False: without it pandas writes the RangeIndex as an
                # extra column, making chunked tables inconsistent with the
                # small-file branch above.
                chunk.to_sql(tablename, connection, if_exists="append", index=False)
        print("Finished processing {}".format(f))

print("Should be all done!")
# Canonical MIMIC-IV table names. Used below to verify that every expected
# CSV file is present before any data is loaded into the database.
_MIMIC_TABLES = (
    # hospital EHR derived tables
    'admissions',
    'd_hcpcs',
    'd_icd_diagnoses',
    'd_icd_procedures',
    'd_labitems',
    'diagnoses_icd',
    'drgcodes',
    'emar',
    'emar_detail',
    'hcpcsevents',
    'labevents',
    'microbiologyevents',
    'omr',
    'patients',
    'pharmacy',
    'poe',
    'poe_detail',
    'prescriptions',
    'procedures_icd',
    'provider',
    'services',
    'transfers',
    # ICU derived tables
    'caregiver',
    'chartevents',
    'd_items',
    'datetimeevents',
    'icustays',
    'ingredientevents',
    'inputevents',
    'outputevents',
    'procedureevents',
)
49+
def process_dataframe(df: pd.DataFrame, subjects: t.Optional[t.List[int]] = None) -> pd.DataFrame:
    """Post-process a freshly-read MIMIC-IV dataframe.

    Converts every column whose name ends in ``time`` or ``date`` to pandas
    datetimes (in place on *df*), then optionally restricts the rows to the
    given ``subject_id`` values.

    Parameters
    ----------
    df:
        Raw dataframe as returned by :func:`pandas.read_csv`.
    subjects:
        Optional collection of ``subject_id`` values to keep. When ``None``,
        or when the table has no ``subject_id`` column, all rows are kept.

    Returns
    -------
    pd.DataFrame
        The (possibly filtered) dataframe with temporal columns parsed.
    """
    # MIMIC-IV convention: temporal columns end in "time" or "date".
    temporal_cols = [col for col in df.columns if col.endswith(('time', 'date'))]
    for col in temporal_cols:
        df[col] = pd.to_datetime(df[col], format='ISO8601')

    if subjects is None or 'subject_id' not in df:
        return df
    return df.loc[df['subject_id'].isin(subjects)]
59+
def main():
    """Build a local SQLite database (``mimic4.db``) from MIMIC-IV CSV files.

    Walks ``--data_dir`` recursively for ``*.csv`` / ``*.csv.gz`` files,
    validates that every expected MIMIC-IV table is present, then loads each
    file into its own table. ``--limit N`` restricts the database to the
    first N ``subject_id`` values (useful for a small development database).

    Exits early (via ``sys.exit``) when no CSV files are found, when expected
    tables are missing, or when the target database file already exists.
    """
    argparser = ArgumentParser()
    argparser.add_argument(
        '--limit', type=int, default=0,
        help='Restrict the database to the first N subject_id.'
    )
    argparser.add_argument(
        '--data_dir', type=str, default='.',
        help='Path to the directory containing the MIMIC-IV CSV files.'
    )
    args = argparser.parse_args()

    # Validate that we can find the data files. rglob() is already recursive,
    # so the pattern does not need a '**/' prefix.
    data_dir = Path(args.data_dir).resolve()
    data_files = list(data_dir.rglob('*.csv*'))
    if not data_files:
        print(f"No CSV files found in {data_dir}")
        sys.exit()

    # Derive an index-aligned list of table names from the file names by
    # stripping the .csv / .csv.gz suffixes and lower-casing.
    tablenames = []
    for f in data_files:
        while f.suffix.lower() in {'.csv', '.gz'}:
            f = f.with_suffix('')
        tablenames.append(f.stem.lower())

    # Check that all the expected tables are present before loading anything.
    # (Plain set(tablenames) — the previous comprehension shadowed the
    # module-level `t` alias for typing.)
    found_tables = set(tablenames)
    missing_tables = set(_MIMIC_TABLES) - found_tables
    if missing_tables:
        print(found_tables)
        print(f"Missing tables: {missing_tables}")
        sys.exit()

    # Optionally restrict the database to the first N subjects (by ascending
    # subject_id, taken from the patients table).
    subjects = None
    if args.limit > 0:
        patients_file = next((f for f in data_files if 'patients' in f.name), None)
        if patients_file is None:
            raise FileNotFoundError('Unable to find a patients file in current folder.')
        pt = pd.read_csv(patients_file)
        subjects = set(pt['subject_id'].sort_values().head(args.limit).tolist())
        print(f'Limiting to {len(subjects)} subjects.')

    # Refuse to clobber an existing database file.
    if os.path.exists(DATABASE_NAME):
        msg = "File {} already exists.".format(DATABASE_NAME)
        print(msg)
        sys.exit()

    # For a subset of columns, we specify the data types to ensure
    # pandas loads the data correctly: nullable Int64 for integer columns
    # with missing values, str for code-like columns that would otherwise
    # be mangled into floats.
    mimic_dtypes = {
        "subject_id": pd.Int64Dtype(),
        "hadm_id": pd.Int64Dtype(),
        "stay_id": pd.Int64Dtype(),
        "caregiver_id": pd.Int64Dtype(),
        "provider_id": str,
        "category": str,  # d_hcpcs
        "parent_field_ordinal": str,
        "pharmacy_id": pd.Int64Dtype(),
        "emar_seq": pd.Int64Dtype(),
        "poe_seq": pd.Int64Dtype(),
        "ndc": str,
        "doses_per_24_hrs": pd.Int64Dtype(),
        "drg_code": str,
        "org_itemid": pd.Int64Dtype(),
        "isolate_num": pd.Int64Dtype(),
        "quantity": str,
        "ab_itemid": pd.Int64Dtype(),
        "dilution_text": str,
        "warning": pd.Int64Dtype(),
        "valuenum": float,
    }

    row_counts = {name: 0 for name in found_tables | set(_MIMIC_TABLES)}
    with sqlite3.Connection(DATABASE_NAME) as connection:
        for f, tablename in zip(data_files, tablenames):
            print("Starting processing {}".format(tablename), end='.. ')
            if os.path.getsize(f) < THRESHOLD_SIZE:
                df = pd.read_csv(f, dtype=mimic_dtypes)
                df = process_dataframe(df, subjects=subjects)
                # if_exists="append" for consistency with the chunked branch
                # (and so duplicate table names don't raise ValueError).
                df.to_sql(tablename, connection, if_exists="append", index=False)
                row_counts[tablename] += len(df)
            else:
                # If the file is too large, let's do the work in chunks.
                for chunk in pd.read_csv(f, chunksize=CHUNKSIZE, low_memory=False, dtype=mimic_dtypes):
                    # BUG FIX: subjects was previously not forwarded here, so
                    # --limit was silently ignored for large (chunked) files.
                    chunk = process_dataframe(chunk, subjects=subjects)
                    chunk.to_sql(tablename, connection, if_exists="append", index=False)
                    row_counts[tablename] += len(chunk)
            print("done!")

    print("Should be all done! Row counts of loaded data:\n")

    print(json.dumps(row_counts, indent=2))


if __name__ == '__main__':
    main()