Skip to content

Commit a79ed4a

Browse files
initBastiEmantor
authored andcommitted
driver/power: Add backend driver for EG PMS2 LAN/WLAN
This driver adds support for the Energenie PMS2 LAN & WLAN programmable power sockets. They are accessed over a web interface and require a login before accessing the status of the socket or switching them on or off. A password is required for the login in POST request and this driver deliberately uses the default password, as the current state of labgrid doesn't support password encryption, meaning the password would be stored in plain text. Signed-off-by: Sebastian Fricke <sebastian.fricke@posteo.net> [r.czerwinski@pengutronix.de: add additional raise_for_status on responses] Signed-off-by: Rouven Czerwinski <r.czerwinski@pengutronix.de>
1 parent 95bd0fe commit a79ed4a

4 files changed

Lines changed: 86 additions & 0 deletions

File tree

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ New Features in 0.5.0
99
considering only the closest marker.
1010
- The labgrid client SSH command is now able to instantiate the SSHDriver when
1111
there are multiple NetworkService resources available.
12+
- eg_pms2_network power port driver supports controlling the Energenie power
13+
management series with devices like the EG_PMS2_LAN & EG_PMS2_WLAN
1214

1315
Bug fixes in 0.5.0
1416
~~~~~~~~~~~~~~~~~~

doc/configuration.rst

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,12 @@ Currently available are:
208208
Controls TP-Link power strips via `python-kasa
209209
<https://github.com/python-kasa/python-kasa>`_.
210210

211+
``eg_pms2_network``
212+
Controls the EG_PMS2_LAN & EG_PMS2_WLAN devices, through simple HTTP POST and
213+
GET requests. The device requires a password for logging into the control
214+
interface, this module deliberately uses the standard password '1' and is
215+
not compatible with a different password.
216+
211217
Used by:
212218
- `NetworkPowerDriver`_
213219

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
import re
2+
import requests
3+
4+
from ..exception import ExecutionError
5+
6+
# This driver implementes a power port for the EG_PMS2_LAN & EG_PMS2_WLAN
7+
# devices, it works through a simple HTTP interface, that requires a login.
8+
# Driver has been tested with:
9+
# * EG_PMS2_LAN
10+
11+
# The default HTTP port for usage via the SSH proxy.
12+
PORT = 80
13+
14+
# The login was successful when we can locate a Log out button
15+
LOGIN_SUCCESS_REGEX = r'<div class="boxmenuitem"><a href="login\.html">Log Out</a></div>'
16+
# Search for a string similar to: "var sockstates = [1,1,1,1];" and create
17+
# a match group out of the numbers within the square brackets.
18+
SOCKSTATES_REGEX = r"var\s+sockstates\s+=\s+\[([01]),([01]),([01]),([01])\];"
19+
20+
21+
def login(base_url: str) -> None:
22+
"""
23+
Use the default password 1, because labgrid doesn't support password
24+
encryption, modifying the password doesn't secure the device as the
25+
password would be stored as plain-text.
26+
"""
27+
login_url = f"{base_url}/login.html"
28+
try:
29+
response = requests.post(login_url, data={'pw': 1})
30+
except requests.exceptions.ConnectionError as err:
31+
raise ExecutionError(
32+
f"Device not found at {base_url} or the network interface of the "
33+
"device is not active (press the reset button on the device)"
34+
) from err
35+
if response.status_code != 200 or not re.search(LOGIN_SUCCESS_REGEX,
36+
response.text):
37+
raise ExecutionError("Login to Energenie web interface failed")
38+
39+
40+
def logout(base_url: str) -> None:
41+
"""
42+
After a successful login, the session is reserved for the IP address.
43+
Logout explicitly to allow accessing the device from different hosts.
44+
"""
45+
response = requests.get(f"{base_url}/login.html")
46+
if response.status_code != 200:
47+
raise ExecutionError("Logout from Energenie web interface failed")
48+
49+
50+
def power_set(host, port, index, value):
51+
base_url = f"http://{host}:{port}"
52+
index = int(index)
53+
assert 1 <= index <= 4
54+
55+
value = 1 if value else 0
56+
login(base_url=base_url)
57+
response = requests.post(base_url, data={f'cte{index}': value})
58+
response.raise_for_status()
59+
logout(base_url=base_url)
60+
if not response.status_code == 200:
61+
raise ExecutionError(f"Switching socket at index {index} "
62+
f"{'on' if value else 'off'} failed")
63+
64+
65+
def power_get(host, port, index):
66+
base_url = f"http://{host}:{port}"
67+
index = int(index)
68+
assert 1 <= index <= 4
69+
70+
# Fetch status
71+
login(base_url=base_url)
72+
response = requests.get(f"{base_url}/energenie.html")
73+
response.raise_for_status()
74+
logout(base_url=base_url)
75+
match_group = re.search(SOCKSTATES_REGEX, response.text)
76+
sockstates = [int(match_group.group(x)) for x in range(1, 5)]
77+
return sockstates[index - 1] == 1

tests/test_powerdriver.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ def test_import_backends(self):
171171
import labgrid.driver.power.netio_kshell
172172
import labgrid.driver.power.rest
173173
import labgrid.driver.power.sentry
174+
import labgrid.driver.power.eg_pms2_network
174175

175176
def test_import_backend_siglent(self):
176177
pytest.importorskip("vxi11")

0 commit comments

Comments
 (0)