|
| 1 | +import re |
| 2 | +import requests |
| 3 | + |
| 4 | +from ..exception import ExecutionError |
| 5 | + |
| 6 | +# This driver implementes a power port for the EG_PMS2_LAN & EG_PMS2_WLAN |
| 7 | +# devices, it works through a simple HTTP interface, that requires a login. |
| 8 | +# Driver has been tested with: |
| 9 | +# * EG_PMS2_LAN |
| 10 | + |
| 11 | +# The default HTTP port for usage via the SSH proxy. |
| 12 | +PORT = 80 |
| 13 | + |
| 14 | +# The login was successful when we can locate a Log out button |
| 15 | +LOGIN_SUCCESS_REGEX = r'<div class="boxmenuitem"><a href="login\.html">Log Out</a></div>' |
| 16 | +# Search for a string similar to: "var sockstates = [1,1,1,1];" and create |
| 17 | +# a match group out of the numbers within the square brackets. |
| 18 | +SOCKSTATES_REGEX = r"var\s+sockstates\s+=\s+\[([01]),([01]),([01]),([01])\];" |
| 19 | + |
| 20 | + |
| 21 | +def login(base_url: str) -> None: |
| 22 | + """ |
| 23 | + Use the default password 1, because labgrid doesn't support password |
| 24 | + encryption, modifying the password doesn't secure the device as the |
| 25 | + password would be stored as plain-text. |
| 26 | + """ |
| 27 | + login_url = f"{base_url}/login.html" |
| 28 | + try: |
| 29 | + response = requests.post(login_url, data={'pw': 1}) |
| 30 | + except requests.exceptions.ConnectionError as err: |
| 31 | + raise ExecutionError( |
| 32 | + f"Device not found at {base_url} or the network interface of the " |
| 33 | + "device is not active (press the reset button on the device)" |
| 34 | + ) from err |
| 35 | + if response.status_code != 200 or not re.search(LOGIN_SUCCESS_REGEX, |
| 36 | + response.text): |
| 37 | + raise ExecutionError("Login to Energenie web interface failed") |
| 38 | + |
| 39 | + |
| 40 | +def logout(base_url: str) -> None: |
| 41 | + """ |
| 42 | + After a successful login, the session is reserved for the IP address. |
| 43 | + Logout explicitly to allow accessing the device from different hosts. |
| 44 | + """ |
| 45 | + response = requests.get(f"{base_url}/login.html") |
| 46 | + if response.status_code != 200: |
| 47 | + raise ExecutionError("Logout from Energenie web interface failed") |
| 48 | + |
| 49 | + |
| 50 | +def power_set(host, port, index, value): |
| 51 | + base_url = f"http://{host}:{port}" |
| 52 | + index = int(index) |
| 53 | + assert 1 <= index <= 4 |
| 54 | + |
| 55 | + value = 1 if value else 0 |
| 56 | + login(base_url=base_url) |
| 57 | + response = requests.post(base_url, data={f'cte{index}': value}) |
| 58 | + response.raise_for_status() |
| 59 | + logout(base_url=base_url) |
| 60 | + if not response.status_code == 200: |
| 61 | + raise ExecutionError(f"Switching socket at index {index} " |
| 62 | + f"{'on' if value else 'off'} failed") |
| 63 | + |
| 64 | + |
| 65 | +def power_get(host, port, index): |
| 66 | + base_url = f"http://{host}:{port}" |
| 67 | + index = int(index) |
| 68 | + assert 1 <= index <= 4 |
| 69 | + |
| 70 | + # Fetch status |
| 71 | + login(base_url=base_url) |
| 72 | + response = requests.get(f"{base_url}/energenie.html") |
| 73 | + response.raise_for_status() |
| 74 | + logout(base_url=base_url) |
| 75 | + match_group = re.search(SOCKSTATES_REGEX, response.text) |
| 76 | + sockstates = [int(match_group.group(x)) for x in range(1, 5)] |
| 77 | + return sockstates[index - 1] == 1 |
0 commit comments