|
| 1 | +#OPENBCI HEX TO CSV CONVERSION REFERENCE: https://github.com/roflecopter/openbci-psg |
| 2 | +# GIVES output same as OpenBCI GUI 4.2 |
| 3 | + |
| 4 | +import numpy as np |
| 5 | +import pandas as pd |
| 6 | +import os |
| 7 | +import mne |
| 8 | +from datetime import datetime |
| 9 | + |
| 10 | +def accel_scale(signal): |
| 11 | + """ |
| 12 | + Scale accelerometer data by a factor of 0.002/(2^4). |
| 13 | +
|
| 14 | + Parameters: |
| 15 | + - signal (numpy.ndarray): Accelerometer signal values. |
| 16 | +
|
| 17 | + Returns: |
| 18 | + - numpy.ndarray: Scaled accelerometer signal values. |
| 19 | + """ |
| 20 | + # Define the scaling factor for the accelerometer data |
| 21 | + accel_scale = 0.002 / (2 ** 4) |
| 22 | + |
| 23 | + # Multiply each element of the input signal by the scaling factor |
| 24 | + scaled_signal = signal * accel_scale |
| 25 | + |
| 26 | + # Return the scaled accelerometer signal |
| 27 | + return scaled_signal |
| 28 | + |
| 29 | + |
| 30 | +def parseInt24Hex(hex_value): |
| 31 | + """ |
| 32 | + Parse a 24-bit hexadecimal value into a signed integer. |
| 33 | +
|
| 34 | + Parameters: |
| 35 | + - hex_value (str): Hexadecimal value to be parsed. |
| 36 | +
|
| 37 | + Returns: |
| 38 | + - int: Parsed signed integer value. |
| 39 | + """ |
| 40 | + # Check if the hexadecimal value is less than 16 characters long |
| 41 | + if len(hex_value) < 16: |
| 42 | + # Convert the hex value to a decimal integer |
| 43 | + # If the first character is not 'F', it's a positive value; otherwise, subtract 2^32 for a negative value |
| 44 | + value_dec = int(hex_value, 16) if hex_value[0] != 'F' else int(hex_value, 16) - 2**32 |
| 45 | + return value_dec |
| 46 | + # Return 0 if the hex value is not 24 bits long |
| 47 | + return 0 |
| 48 | + |
| 49 | + |
| 50 | + |
| 51 | +def processLine(split_line): |
| 52 | + """ |
| 53 | + Process a line of hex data and convert it to a list of parsed values. |
| 54 | +
|
| 55 | + Parameters: |
| 56 | + - split_line (list): List of hex values in a line. |
| 57 | +
|
| 58 | + Returns: |
| 59 | + - list: Parsed values from the hex data. |
| 60 | + """ |
| 61 | + # Initialize an empty list to store parsed values |
| 62 | + values_array = [] |
| 63 | + |
| 64 | + # Iterate through each hex value in the split line |
| 65 | + for i in range(1, len(split_line)): |
| 66 | + value = split_line[i] |
| 67 | + |
| 68 | + # Check if the hex value corresponds to an EEG channel (i <= 16) |
| 69 | + if i <= 16: |
| 70 | + # Adjust the hex value to ensure it represents a 24-bit value and then parse it |
| 71 | + channel_value = 'FF' + value if value[0] > '7' else '00' + value |
| 72 | + value = parseInt24Hex(channel_value) |
| 73 | + else: |
| 74 | + # Adjust the hex value for non-EEG channels and then parse it |
| 75 | + aux_value = 'FFFF' + value if value[0] > '7' else '0000' + value |
| 76 | + value = parseInt24Hex(aux_value) |
| 77 | + |
| 78 | + # Append the parsed value to the list |
| 79 | + values_array.append(value) |
| 80 | + |
| 81 | + # Return the list of parsed values |
| 82 | + return values_array |
| 83 | + |
| 84 | + |
| 85 | + |
| 86 | + |
| 87 | +def process_file(file_path:str, |
| 88 | + n_ch:int = 16, |
| 89 | + n_acc:int = 3, |
| 90 | + save_path:str = None): |
| 91 | + """ |
| 92 | + Process the OpenBCI hex file and convert it to a CSV file. |
| 93 | +
|
| 94 | + Parameters: |
| 95 | + - file_path (str): Path to the input hex file. |
| 96 | + - n_ch (int): Number of EEG channels. |
| 97 | + - n_acc (int): Number of accelerometer channels. |
| 98 | + - save_path (str): Path to save the converted CSV file. |
| 99 | +
|
| 100 | + Returns: |
| 101 | + - None |
| 102 | + """ |
| 103 | + # Get the current time for creating a timestamp in the output CSV file |
| 104 | + current_time = datetime.now() |
| 105 | + formatted_time = current_time.strftime("%Y-%m-%d_%H-%M-%S") |
| 106 | + |
| 107 | + # Open the hex file for reading |
| 108 | + with open(file_path, 'r') as file: |
| 109 | + result = [] # List to store parsed values |
| 110 | + i = 0 # Counter for line index |
| 111 | + stops_n = 0 # Counter for lines starting with '%' |
| 112 | + stops = [] # List to store indices of lines starting with '%' |
| 113 | + |
| 114 | + while True: |
| 115 | + line = file.readline() |
| 116 | + if not line: |
| 117 | + break # End of file |
| 118 | + |
| 119 | + split_line = line.strip().split(',') |
| 120 | + |
| 121 | + # Check if the line starts with '%' indicating additional information |
| 122 | + if len(split_line) == 1 and split_line[0].startswith('%'): |
| 123 | + stops_n += 1 |
| 124 | + stops.append(i) |
| 125 | + # Check if the line contains valid data (number of elements between 3 and n_ch + n_acc + 1) |
| 126 | + elif (len(split_line) > 3) and (len(split_line) <= n_ch + n_acc + 1): |
| 127 | + # Process the line and obtain parsed values |
| 128 | + values = processLine(split_line) |
| 129 | + |
| 130 | + # Determine the number of values to add based on the line structure |
| 131 | + if len(values) == (n_ch + n_acc): |
| 132 | + to_add = values |
| 133 | + elif len(values) == (n_ch): |
| 134 | + to_add = values + [0, 0, 0] |
| 135 | + else: |
| 136 | + to_add = [0] * (n_ch + n_acc) |
| 137 | + |
| 138 | + # Append the processed values to the result list |
| 139 | + result.append(to_add) |
| 140 | + |
| 141 | + i += 1 |
| 142 | + if i % 1000000 == 0: |
| 143 | + print(f"Processing... {i}") |
| 144 | + |
| 145 | + # Convert the result list to a NumPy array |
| 146 | + bci_signals = np.array(result) |
| 147 | + |
| 148 | + # Apply OpenBCI scaling to EEG signals and accelerometer scaling to accelerometer signals |
| 149 | + signals_V = np.vectorize(adc_v_bci)(bci_signals[:, :16]) |
| 150 | + accel_data = np.vectorize(accel_scale)(bci_signals[:, 16:]) |
| 151 | + |
| 152 | + # Concatenate EEG and accelerometer data along the columns |
| 153 | + data = np.concatenate((signals_V, accel_data), axis=1) |
| 154 | + |
| 155 | + # Generate an index for the output CSV file |
| 156 | + index = [str(i) for i in range(1, len(data) + 1)] |
| 157 | + |
| 158 | + # Additional information to be added at the beginning of the CSV file |
| 159 | + additional_info = [ |
| 160 | + f"%OBCI SD Convert - {formatted_time}", |
| 161 | + "%", |
| 162 | + "%Sample Rate = 250.0 Hz", |
| 163 | + "%First Column = SampleIndex", |
| 164 | + "%Last Column = Timestamp", |
| 165 | + "%Other Columns = EEG data in microvolts followed by Accel Data (in G) interleaved with Aux Data", |
| 166 | + ] |
| 167 | + |
| 168 | + # Create a Pandas DataFrame from the data and save it to a CSV file |
| 169 | + make_csv = pd.DataFrame(data, index=index) |
| 170 | + file_name_with_extension = os.path.basename(file_path) |
| 171 | + file_name, file_extension = os.path.splitext(file_name_with_extension) |
| 172 | + |
| 173 | + if save_path: |
| 174 | + file_path = os.path.join(save_path, file_name) |
| 175 | + make_csv.to_csv(f"{file_path}_converted.csv") |
| 176 | + |
| 177 | + # Write additional information to the CSV file |
| 178 | + with open(f"{file_path}_converted.csv", 'w') as file: |
| 179 | + for line in additional_info: |
| 180 | + file.write(line + '\n') |
| 181 | + |
| 182 | + # Append the index to the CSV file |
| 183 | + make_csv.to_csv(f"{file_path}_converted.csv", mode='a', index=index) |
| 184 | + else: |
| 185 | + make_csv.to_csv(f"./{file_name}_converted.csv") |
| 186 | + |
| 187 | + # Write additional information to the CSV file |
| 188 | + with open(f"{file_name}_converted.csv", 'w') as file: |
| 189 | + for line in additional_info: |
| 190 | + file.write(line + '\n') |
| 191 | + |
| 192 | + # Append the index to the CSV file |
| 193 | + make_csv.to_csv(f"{file_path}_converted.csv", mode='a', index=index) |
| 194 | + |
| 195 | + return None |
| 196 | + |
| 197 | + |
| 198 | +def adc_v_bci(signal, |
| 199 | + ADS1299_VREF:float = 4.5, |
| 200 | + gain:int = 24, |
| 201 | + ADS1299_BITS:float = (2**23-1), |
| 202 | + V_Factor:int = 1000000): |
| 203 | + """ |
| 204 | + Convert ADC values to microvolts using OpenBCI scaling factors. |
| 205 | +
|
| 206 | + Parameters: |
| 207 | + - signal (numpy.ndarray): ADC signal values. |
| 208 | + - ADS1299_VREF (float): Reference voltage for the ADS1299. |
| 209 | + - gain (int): Gain setting for the ADS1299. |
| 210 | + - ADS1299_BITS (float): Number of bits for ADC resolution. |
| 211 | + - V_Factor (int): Voltage conversion factor. |
| 212 | +
|
| 213 | + Returns: |
| 214 | + - numpy.ndarray: Signal values in microvolts. |
| 215 | + """ |
| 216 | + # Gain setting for the ADS1299 |
| 217 | + ADS1299_GAIN = gain |
| 218 | + |
| 219 | + # Calculate the conversion factor 'k' using OpenBCI scaling factors |
| 220 | + k = ADS1299_VREF / ADS1299_BITS / ADS1299_GAIN * V_Factor |
| 221 | + |
| 222 | + # Multiply each element of the input signal by the conversion factor |
| 223 | + microvolts_signal = signal * k |
| 224 | + |
| 225 | + # Return the signal values in microvolts |
| 226 | + return microvolts_signal |
| 227 | + |
| 228 | + |
| 229 | + |
| 230 | +def start_converting(sd_dir:str = "./", |
| 231 | + gain:int = 24, |
| 232 | + n_ch:int = 16, |
| 233 | + n_acc:int = 3, |
| 234 | + save_path:str = ""): |
| 235 | + """ |
| 236 | + Batch process OpenBCI hex files in a directory and convert them to CSV. |
| 237 | +
|
| 238 | + Parameters: |
| 239 | + - sd_dir (str): Directory containing OpenBCI hex files. |
| 240 | + - gain (int): Gain setting for the ADS1299. |
| 241 | + - n_ch (int): Number of EEG channels. |
| 242 | + - n_acc (int): Number of accelerometer channels. |
| 243 | + - save_path (str): Directory to save the converted CSV files. |
| 244 | +
|
| 245 | + Returns: |
| 246 | + - None |
| 247 | + """ |
| 248 | + # Assign parameters to local variables |
| 249 | + gain = gain |
| 250 | + n_ch = n_ch |
| 251 | + n_acc = n_acc |
| 252 | + save_path = save_path |
| 253 | + |
| 254 | + # Get a list of files in the specified directory with a '.txt' extension |
| 255 | + files = [file for file in os.listdir(sd_dir) if file.endswith('.txt')] |
| 256 | + |
| 257 | + # Sort the files in reverse order (latest files first) |
| 258 | + if files: |
| 259 | + files.sort(reverse=True) |
| 260 | + |
| 261 | + # Iterate through each file in the sorted list |
| 262 | + for file_name in files: |
| 263 | + # Construct the full path to the file |
| 264 | + file_path = os.path.join(sd_dir, file_name) |
| 265 | + |
| 266 | + # Print a message indicating the file being processed |
| 267 | + print(f'converting: {file_name}') |
| 268 | + |
| 269 | + # Call the process_file function to convert the hex file to CSV |
| 270 | + process_file(file_path=file_path, save_path=save_path) |
| 271 | + |
| 272 | + return None |
| 273 | + |
| 274 | + |
| 275 | + |
| 276 | +def file_type_conversion(csv_path:str = "./", |
| 277 | + save_path:str = "./", |
| 278 | + num_channels:int = 16, |
| 279 | + ch_names:[str] = None, |
| 280 | + sfreq:int = 250, |
| 281 | + file_type:str = "brainvision"): |
| 282 | + """ |
| 283 | + Convert CSV files to a specified file type using MNE library. |
| 284 | +
|
| 285 | + Parameters: |
| 286 | + - csv_path (str): Directory containing CSV files. |
| 287 | + - save_path (str): Directory to save the converted files. |
| 288 | + - num_channels (int): Number of EEG channels. |
| 289 | + - ch_names (list): List of EEG channel names. |
| 290 | + - sfreq (int): Sampling frequency. |
| 291 | + - file_type (str): Output file type (e.g., "brainvision"). |
| 292 | +
|
| 293 | + Returns: |
| 294 | + - None |
| 295 | + """ |
| 296 | + |
| 297 | + # Get a list of files in the specified directory with a '.csv' extension |
| 298 | + files = [file for file in os.listdir(csv_path) if file.endswith('.csv')] |
| 299 | + |
| 300 | + # Sort the files in reverse order (latest files first) |
| 301 | + if files: |
| 302 | + files.sort(reverse=True) |
| 303 | + |
| 304 | + # Iterate through each file in the sorted list |
| 305 | + for file_name in files: |
| 306 | + # Construct the full path to the CSV file |
| 307 | + file_path = os.path.join(csv_path, file_name) |
| 308 | + |
| 309 | + # Read the CSV file, skipping the header rows |
| 310 | + csv_file = pd.read_csv(file_path ,header=None,skiprows=[0,1,2,3,4,5,6],index_col=0,sep=',',engine='python') |
| 311 | + |
| 312 | + # Remove the last 3 columns from the CSV file |
| 313 | + csv_file = csv_file.iloc[:,:-3] |
| 314 | + |
| 315 | + # Transpose the DataFrame |
| 316 | + csv_file = pd.DataFrame.transpose(csv_file) |
| 317 | + |
| 318 | + # Convert values to volts as MNE supports volts |
| 319 | + csv_file = csv_file / 1e6 |
| 320 | + |
| 321 | + # Define EEG channel types |
| 322 | + ch_types = (['eeg'] * num_channels) |
| 323 | + |
| 324 | + # If channel names are not provided, generate default names |
| 325 | + if ch_names is None: |
| 326 | + ch_names = [str(i) for i in range(1, num_channels + 1)] |
| 327 | + |
| 328 | + # Create MNE Info object |
| 329 | + info = mne.create_info(ch_names=ch_names, sfreq=sfreq, ch_types=ch_types) |
| 330 | + |
| 331 | + # Create MNE RawArray |
| 332 | + raw = mne.io.RawArray(csv_file, info) |
| 333 | + |
| 334 | + # Export the MNE RawArray to the specified file type |
| 335 | + mne.export.export_raw(f"{save_path}/{file_name}.eeg", raw, fmt=file_type, overwrite=True) |
| 336 | + |
| 337 | + return None |
| 338 | + |
| 339 | +if __name__ == "__main__": |
| 340 | + start_converting(sd_dir="./hex_data/", |
| 341 | + save_path="raw_data") |
| 342 | + |
| 343 | + file_type_conversion(csv_path = "./raw_data/", |
| 344 | + save_path = "./raw_data/", |
| 345 | + num_channels = 16, |
| 346 | + sfreq = 250, |
| 347 | + file_type = "brainvision") |
0 commit comments