|
| 1 | +from scipy.optimize import curve_fit |
| 2 | +from scipy.signal import savgol_filter |
| 3 | +from dtw import dtw |
| 4 | + |
| 5 | +import rosbag |
| 6 | +import numpy as np |
| 7 | + |
| 8 | +import os.path |
| 9 | +import sys |
| 10 | +import re |
| 11 | + |
| 12 | + |
| 13 | +def read_bagfile(session): |
| 14 | + if not os.path.isdir(session): |
| 15 | + print("-s must specify a directory") |
| 16 | + sys.exit() |
| 17 | + |
| 18 | + os.chdir(session) |
| 19 | + map_file = [x for x in os.listdir() if ".txt" in x][0] |
| 20 | + recordings = [x for x in os.listdir() if ".bag" in x] |
| 21 | + |
| 22 | + if len(recordings) == 0: |
| 23 | + print("There are no rosbag files in the specified directory.") |
| 24 | + sys.exit() |
| 25 | + |
| 26 | + if len(map_file) == 0: |
| 27 | + print("There are no mapping files in the specified directory.") |
| 28 | + sys.exit() |
| 29 | + |
| 30 | + full_data = {} |
| 31 | + |
| 32 | + for r in recordings: |
| 33 | + bag = rosbag.Bag(r) |
| 34 | + data_name = r.split('.')[0] |
| 35 | + full_data[data_name] = { |
| 36 | + 'response': [], |
| 37 | + 'time': [] |
| 38 | + } |
| 39 | + for topic, msg, t in bag.read_messages(): |
| 40 | + full_data[data_name]["response"].append(msg.data[2]) |
| 41 | + full_data[data_name]["time"].append(t.to_time()) |
| 42 | + |
| 43 | + full_data["map"] = {} |
| 44 | + with open(map_file, 'r') as f: |
| 45 | + unit_no = -1 |
| 46 | + for _ in range(42): |
| 47 | + line = f.readline() |
| 48 | + if 'PD' in line: |
| 49 | + result = re.match(r"= PD ([0-9]+) =", line) |
| 50 | + if result is None: |
| 51 | + continue |
| 52 | + unit_no = int(result.group(1)) - 1 |
| 53 | + full_data["map"][unit_no] = { |
| 54 | + "angle": [], |
| 55 | + "min_response": [], |
| 56 | + "max_response": [] |
| 57 | + } |
| 58 | + elif unit_no >= 0: |
| 59 | + result = re.match(r"([0-9]+), \[(.*), (.*)\]", line) |
| 60 | + if result is None: |
| 61 | + continue |
| 62 | + full_data["map"][unit_no]["angle"].append(float(result.group(1))) |
| 63 | + full_data["map"][unit_no]["min_response"].append(float(result.group(2))) |
| 64 | + full_data["map"][unit_no]["max_response"].append(float(result.group(3))) |
| 65 | + |
| 66 | + return full_data |
| 67 | + |
| 68 | + |
| 69 | +if __name__ == "__main__": |
| 70 | + import argparse |
| 71 | + import matplotlib.pyplot as plt |
| 72 | + |
| 73 | + calling_directory = os.getcwd() |
| 74 | + parser = argparse.ArgumentParser( |
| 75 | + description="Produce a recording-session plot for the pol units." |
| 76 | + ) |
| 77 | + |
| 78 | + parser.add_argument("-s", dest="session", type=str, required=True, |
| 79 | + help="The recording session directory." |
| 80 | + ) |
| 81 | + |
| 82 | + args = parser.parse_args() |
| 83 | + |
| 84 | + session = os.path.abspath(args.session) |
| 85 | + |
| 86 | + data = read_bagfile(session) |
| 87 | + |
| 88 | + def objective(x, a, b, c): |
| 89 | + return a * np.minimum(np.exp(-0.5 * np.square((x - b) / abs(c))), .8) |
| 90 | + |
| 91 | + angles_3 = np.array(data["map"][2]["angle"]) - 90 |
| 92 | + pd_3_min = abs(np.array(data["map"][2]["min_response"])) |
| 93 | + pd_3_max = abs(np.array(data["map"][2]["max_response"])) |
| 94 | + pd_3 = np.mean([pd_3_min, pd_3_max], axis=0) |
| 95 | + pd_3_sigma = np.std([pd_3_min, pd_3_max], axis=0) |
| 96 | + angles_4 = np.array(data["map"][3]["angle"]) - 90 |
| 97 | + pd_4_min = abs(np.array(data["map"][3]["min_response"])) |
| 98 | + pd_4_max = abs(np.array(data["map"][3]["max_response"])) |
| 99 | + pd_4 = np.mean([pd_4_min, pd_4_max], axis=0) |
| 100 | + pd_4_sigma = np.std([pd_4_min, pd_4_max], axis=0) |
| 101 | + |
| 102 | + run_3_1 = abs(np.array(data["1"]["response"])) |
| 103 | + run_3_2 = abs(np.array(data["2"]["response"])) |
| 104 | + time_3_1 = abs(np.array(data["1"]["time"])) |
| 105 | + time_3_2 = abs(np.array(data["2"]["time"])) |
| 106 | + |
| 107 | + alignment_3_1 = dtw(run_3_1, pd_3, keep_internals=True) |
| 108 | + angles_3_1 = angles_3[alignment_3_1.index2] |
| 109 | + |
| 110 | + alignment_3_2 = dtw(run_3_2, pd_3, keep_internals=True) |
| 111 | + angles_3_2 = angles_3[alignment_3_2.index2] |
| 112 | + |
| 113 | + popt_3, _ = curve_fit(objective, angles_3, pd_3, sigma=pd_3_sigma) |
| 114 | + print(f"PD 3: mean = {popt_3[1]:.2f}, SD = {popt_3[2]:.2f}") |
| 115 | + |
| 116 | + popt_4, _ = curve_fit(objective, angles_4, pd_4, sigma=pd_4_sigma) |
| 117 | + print(f"PD 4: mean = {popt_4[1]:.2f}, SD = {popt_4[2]:.2f}") |
| 118 | + |
| 119 | + angles_3_1 = savgol_filter(angles_3_1, 51, 3) |
| 120 | + plt.plot(angles_3_1, run_3_1, c='C0', marker='x', ls='', lw=1) |
| 121 | + |
| 122 | + angles_3_2 = savgol_filter(angles_3_2, 51, 3) |
| 123 | + plt.plot(angles_3_2, run_3_2, c='C0', marker='+', ls='', lw=1) |
| 124 | + |
| 125 | + # plt.fill_between(angles_3, np.zeros_like(angles_3), objective(angles_3, *popt_3), color='C0', alpha=0.2) |
| 126 | + # plt.fill_between(angles_4, np.zeros_like(angles_4), objective(angles_4, *popt_4), color='C1', alpha=0.2) |
| 127 | + |
| 128 | + plt.fill_between(angles_3, np.zeros_like(angles_3), pd_3, color='C0', alpha=0.2, label="unit 3") |
| 129 | + plt.fill_between(angles_4, np.zeros_like(angles_4), pd_4, color='C1', alpha=0.2, label="unit 4") |
| 130 | + |
| 131 | + # plt.plot(angles_3, pd_3, c='C0', lw=2, label='unit 3') |
| 132 | + # plt.plot(angles_4, pd_4, c='C1', lw=2, label='unit 4') |
| 133 | + |
| 134 | + plt.plot(angles_3, np.mean([abs(pd_3), abs(pd_4)], axis=0), c='k', lw=3, alpha=0.7) |
| 135 | + |
| 136 | + plt.legend() |
| 137 | + |
| 138 | + plt.xlim(-90, 90) |
| 139 | + plt.ylim(-1, 12000) |
| 140 | + |
| 141 | + plt.ylabel("photodiode response") |
| 142 | + plt.xlabel("direction of light with respect to photodiode orientation\n(minus = left, plus = right)") |
| 143 | + |
| 144 | + plt.tight_layout() |
| 145 | + plt.show() |
0 commit comments