Skip to content

Commit 9a21dd6

Browse files
committed
refactoring into library
1 parent 23c9299 commit 9a21dd6

11 files changed

Lines changed: 443 additions & 202 deletions

File tree

app.py

Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from recorder.settings import DEFAULT_CONFIG_PATH, DEFAULT_OUTPUT_FOLDER
2+
from recorder.device import Device
3+
from recorder.io import yaml_dump, yaml_load
4+
5+
import typer
6+
7+
from pathlib import Path
8+
from typing import Optional
9+
from PyInquirer import prompt
10+
11+
# Command line application
12+
app = typer.Typer(
13+
add_completion=False,
14+
help=__doc__,
15+
)
16+
17+
DEFAULT_OUTPUT_FOLDER_OPTION = typer.Option(
18+
DEFAULT_OUTPUT_FOLDER,
19+
help="""
20+
Path to a folder where the recorded data and the recording configuration
21+
will be stored."""
22+
)
23+
DEFAULT_CONFIG_PATH_OPTION = typer.Option(
24+
DEFAULT_CONFIG_PATH,
25+
help="Path to a `yaml` config file. Run `config` command to generate one."
26+
)
27+
28+
DEVICE_CLASSES = {cls.__name__.lower(): cls for cls in Device.__subclasses__()}
29+
DEVICE_CLASS_ARGUMENT = typer.Argument(
30+
None,
31+
metavar='DEVICE_CLASS',
32+
help=(
33+
"Case independent name of the device class to use. Available options: "
34+
f"{[d for d in DEVICE_CLASSES.keys()]}."
35+
)
36+
)
37+
DEVICE_ID_ARGUMENT = typer.Argument(
38+
None,
39+
metavar='DEVICE_ID',
40+
help=(
41+
"Numerical id of the device to use. Use the show command to see the available devices for each device class."
42+
)
43+
)
44+
VERBOSE_OPTION = typer.Option(False, "--verbose", "-v", help="Verbose output.")
45+
46+
47+
def typer_warn(message: str):
48+
return typer.secho(message, bg='black', fg='yellow')
49+
50+
51+
def choose(message: str, choices: list):
52+
# Utility function that asks the user to configure the devices
53+
return prompt({
54+
'type': 'list',
55+
'name': 'choice',
56+
'message': message,
57+
'choices': choices,
58+
})['choice']
59+
60+
61+
def wait(seconds: int = 4, label: str = 'Recording...', **kwargs):
62+
with typer.progressbar(range(seconds * 10), label=label, **kwargs) as p:
63+
for _ in p:
64+
sd.sleep(100)
65+
66+
67+
@app.command(help='Display the available devices')
68+
def show(
69+
device_class: Optional[str] = DEVICE_CLASS_ARGUMENT,
70+
verbose: bool = VERBOSE_OPTION
71+
):
72+
if device_class:
73+
try:
74+
device_classes = [DEVICE_CLASSES[device_class.lower()]]
75+
except KeyError:
76+
raise RuntimeError(
77+
f'Invalid device class `{device_class}`. Available options: '
78+
f'{list(DEVICE_CLASSES.keys())}'
79+
)
80+
else:
81+
device_classes = DEVICE_CLASSES.values()
82+
for cls in device_classes:
83+
devices = cls.find()
84+
if devices:
85+
if not verbose:
86+
devices = {i: d['name'] for i, d in devices.items()}
87+
typer.echo(f'{cls}:\n' + yaml_dump(devices))
88+
else:
89+
typer_warn(f"Could not find {cls} devices")
90+
91+
92+
def config_device_class(device: Device) -> dict:
93+
choices = []
94+
devices = device.find()
95+
if not devices:
96+
typer_warn(f"Could not find {device} devices")
97+
return choices
98+
while typer.confirm(
99+
f"Add {'another' if choices else 'a'} {device} device?"
100+
):
101+
_id = choose(
102+
message=f"Select the {device} to add to the configuration:",
103+
choices=[{
104+
'name': f"{_id} {d['name']}",
105+
'value': _id
106+
} for _id, d in devices.items()]
107+
)
108+
choices.append(devices[_id])
109+
return {i: c for i, c in enumerate(choices)}
110+
111+
112+
@app.command(help='Create a configuration `yaml` file.')
113+
def config(output: Path = DEFAULT_CONFIG_PATH_OPTION) -> dict:
114+
config = {}
115+
for name, device_class in DEVICE_CLASSES.items():
116+
config[name] = config_device_class(device_class)
117+
yaml_dump(config, to_file=output)
118+
typer.echo(
119+
f"Configuration file written to {output}. Remember that it can be "
120+
"edited manually. Check the repository for an explained example file "
121+
"https://github.com/AcousticOdometry/VAO-recorder/blob/main/example-config.yaml"
122+
)
123+
return config
124+
125+
126+
def get_config(path: Path = DEFAULT_CONFIG_PATH) -> dict:
127+
_config = None
128+
if path.exists():
129+
_config = yaml_load(path)
130+
if not _config:
131+
typer_warn(f"No configuration found in {path}. Generate one.")
132+
_config = config(path)
133+
return _config
134+
135+
136+
def record():
137+
pass
138+
139+
140+
@app.command(help='Test the available devices of a certain [device_class]')
141+
def test(
142+
device_class: Optional[str] = DEVICE_CLASS_ARGUMENT,
143+
device_id: Optional[int] = DEVICE_ID_ARGUMENT
144+
):
145+
if not device_id:
146+
config = get_config()
147+
if not device_class:
148+
pass
149+
else:
150+
# Both device_class and device_id are given
151+
pass
152+
153+
154+
if __name__ == '__main__':
155+
# Launch the command line application
156+
try:
157+
app()
158+
except RuntimeError as e:
159+
typer.secho(str(e), fg='red')

recorder/__init__.py

Whitespace-only changes.

recorder/device/__init__.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
from .device import Device
2+
from .microphone import Microphone
3+
from .realsense import RealSense

recorder/device/camera.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# def find_cameras(max_index: int = 10) -> dict:
2+
# return {
3+
# i: {
4+
# 'cap':
5+
# cap,
6+
# 'name':
7+
# cv2.videoio_registry.getBackendName(
8+
# int(cap.get(cv2.CAP_PROP_BACKEND))
9+
# ),
10+
# 'width':
11+
# int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)),
12+
# 'height':
13+
# int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)),
14+
# }
15+
# for i in range(max_index) if (cap := cv2.VideoCapture(i)).isOpened()
16+
# }
17+
18+
# Camera = Device(find_cameras, 'camera')
19+
20+
# @app.command(help='Display the available cameras')
21+
# def show_cameras():
22+
# # Remove the `cap` attribute from the camera devices
23+
# devices = {
24+
# i: {k: v
25+
# for k, v in camera.items() if k != 'cap'}
26+
# for i, camera in Camera.find().items()
27+
# }
28+
# typer.echo('Real Sense devices:\n' + yaml_dump(devices))
29+

recorder/device/device.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from typing import Dict
2+
from pathlib import Path
3+
from abc import ABC, ABCMeta, abstractmethod
4+
5+
6+
class MetaDevice(ABCMeta):
7+
8+
def __str__(cls):
9+
return cls.__name__
10+
11+
12+
class Device(ABC, metaclass=MetaDevice):
13+
14+
def __init__(self, _index: int, output_folder: Path, *args, **kwargs):
15+
self.name = kwargs.get('name', 'unknown')
16+
self.output_file = output_folder / f'{self.__name__.lower()}{_index}'
17+
18+
def __str__(self) -> str:
19+
return self.name
20+
21+
@classmethod
22+
@abstractmethod
23+
def find(cls) -> Dict[int, dict]:
24+
"""Finds devices connected to the current system.
25+
26+
Returns:
27+
dict: A dictionary of devices with a numeric `id` as key and a
28+
the device properties as value.
29+
"""
30+
pass
31+
32+
@abstractmethod
33+
def start(self) -> None:
34+
"""Starts recording.
35+
36+
Returns:
37+
None
38+
"""
39+
pass
40+
41+
@abstractmethod
42+
def stop(self) -> None:
43+
"""Stops recording.
44+
45+
Returns:
46+
None
47+
"""
48+
pass

recorder/device/microphone.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
from recorder.device import Device
2+
3+
import sounddevice as sd
4+
5+
from pathlib import Path
6+
7+
8+
class Microphone(Device):
9+
10+
def __init__(
11+
self, *args, index: int, samplerate: int, channels: int, **kwargs
12+
):
13+
super().__init__(*args, **kwargs)
14+
self.index = index
15+
self.samplerate = samplerate
16+
self.channels = channels
17+
self.stream = self._get_stream(
18+
device=index,
19+
output_path=self.output_file.with_suffix('.wav'),
20+
)
21+
22+
@classmethod
23+
def find(cls) -> dict:
24+
devices = {}
25+
for idx, d in enumerate(sd.query_devices()):
26+
if d['max_input_channels'] > 0:
27+
devices[idx] = {
28+
'samplerate': int(d['default_samplerate']),
29+
'channels': int(d['max_input_channels']),
30+
'name': d['name'],
31+
'index': idx,
32+
}
33+
return devices
34+
35+
@staticmethod
36+
def _get_stream(
37+
device: int, # Device identifier
38+
output_path: Path, # Path to where the audio file should be written
39+
samplewidth: int = 2, # Sample width in bytes
40+
samplerate: int = None, # Sample rate
41+
channels: int = None, # Number of channels
42+
**stream_kwargs, # Additional keyword arguments for sd.RawInputStream
43+
# https://python-sounddevice.readthedocs.io/en/0.4.4/api/streams.html#sounddevice.InputStream
44+
) -> sd.InputStream:
45+
f = wave.open(str(output_path), 'wb')
46+
f.setnchannels(int(channels))
47+
f.setframerate(int(samplerate))
48+
f.setsampwidth(samplewidth)
49+
return sd.RawInputStream(
50+
device=device,
51+
dtype=f'int{samplewidth*8}',
52+
samplerate=samplerate,
53+
channels=channels,
54+
callback=lambda data, N, t, status: f.writeframesraw(data),
55+
finished_callback=f.close,
56+
**stream_kwargs
57+
)
58+
59+
def start(self):
60+
self.stream.start()
61+
62+
def stop(self):
63+
self.stream.stop()
64+
self.stream.close()
65+
# Write start timestamp
66+
# with open(self.output_folder / 'audio_start.txt', 'w') as f:
67+
# f.write(str(datetime.now().timestamp()))

recorder/device/realsense.py

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
from recorder.device import Device
2+
3+
import pyrealsense2 as rs
4+
5+
6+
class RealSense(Device):
7+
8+
def __init__(self, *args, streams: dict, serial_number: str, **kwargs):
9+
super().__init__(*args, **kwargs)
10+
self.streams = streams
11+
self.serial_number = serial_number
12+
13+
@classmethod
14+
def find(cls) -> dict:
15+
devices = {}
16+
for d in rs.context().query_devices():
17+
sn = d.get_info(rs.camera_info.serial_number)
18+
config = rs.config()
19+
config.enable_device(sn)
20+
config.enable_all_streams()
21+
pipeline_profile = config.resolve(
22+
rs.pipeline_wrapper(rs.pipeline())
23+
)
24+
streams = {}
25+
for s in pipeline_profile.get_streams():
26+
name = s.stream_name()
27+
streams[name] = {
28+
'format': str(s.format())[7:], # remove 'format.'
29+
'framerate': s.fps(),
30+
'type': str(s.stream_type())[7:], # remove 'stream.'
31+
}
32+
if s.is_motion_stream_profile():
33+
streams[name]['intrinsics'] = \
34+
s.as_motion_stream_profile().get_motion_intrinsics().data
35+
elif s.is_video_stream_profile():
36+
intrinsics = s.as_video_stream_profile().get_intrinsics()
37+
streams[name].update({
38+
'coeffs': intrinsics.coeffs,
39+
'model': str(intrinsics.model)
40+
[11:], # remove 'distortion.'
41+
'fx': intrinsics.fx,
42+
'fy': intrinsics.fy,
43+
'width': intrinsics.width,
44+
'height': intrinsics.height,
45+
'ppx': intrinsics.ppx,
46+
'ppy': intrinsics.ppy,
47+
})
48+
devices[sn] = {
49+
'name': d.get_info(rs.camera_info.name),
50+
'streams': streams,
51+
'serial_number': sn,
52+
}
53+
return devices
54+
55+
56+
# RealSense = Device(find_realsense, 'RealSense')
57+
# RealSense.config_map = {
58+
# 'streams': 'streams',
59+
# **RealSense.config_map,
60+
# }

0 commit comments

Comments
 (0)