|
7 | 7 | import json |
8 | 8 | import os |
9 | 9 | import subprocess |
| 10 | +import tempfile |
10 | 11 | from tempfile import TemporaryDirectory |
11 | 12 | from typing import Dict, List, Optional |
12 | 13 | from unittest import TestCase, mock |
|
17 | 18 | from intelmq_api import dependencies |
18 | 19 | from intelmq_api.api import runner |
19 | 20 | from intelmq_api.config import Config |
| 21 | +from intelmq_api.dependencies import session_store |
20 | 22 | from intelmq_api.main import app |
21 | 23 | from intelmq_api.runctl import RunIntelMQCtl |
| 24 | +from intelmq_api.session import SessionStore |
22 | 25 | from intelmq_api.version import __version__ |
23 | 26 |
|
24 | 27 |
|
@@ -165,3 +168,43 @@ def test_post_positions(self): |
165 | 168 | with open(f"{self.conf_dir.name}/manager/positions.conf", "r") as f: |
166 | 169 | saved = json.load(f) |
167 | 170 | self.assertEqual(saved, data) |
| 171 | + |
| 172 | + |
| 173 | +class TestAPILogin(TestCase): |
| 174 | + def setUp(self) -> None: |
| 175 | + self.client = TestClient(app=app) |
| 176 | + dependencies.startup(DummyConfig()) |
| 177 | + self.temp_dir = tempfile.TemporaryDirectory() |
| 178 | + self.addCleanup(self.temp_dir.cleanup) |
| 179 | + |
| 180 | + self.session = SessionStore(os.path.join(self.temp_dir.name, 'sessionsb'), 1000000) |
| 181 | + self.session.add_user('test', 'pass') |
| 182 | + |
| 183 | + app.dependency_overrides[session_store] = lambda: self.session |
| 184 | + app.dependency_overrides[runner] = get_dummy_reader() |
| 185 | + |
| 186 | + def tearDown(self) -> None: |
| 187 | + app.dependency_overrides = {} |
| 188 | + |
| 189 | + def test_login(self): |
| 190 | + response = self.client.post("/v1/api/login", data={"username": "test", "password": "pass"}) |
| 191 | + self.assertEqual(response.status_code, 200) |
| 192 | + self.assertIsNotNone(response.json().get("login_token")) |
| 193 | + |
| 194 | + def test_login_and_call(self): |
| 195 | + response = self.client.post("/v1/api/login", data={"username": "test", "password": "pass"}) |
| 196 | + self.assertEqual(response.status_code, 200) |
| 197 | + |
| 198 | + token = response.json().get("login_token") |
| 199 | + authorized_response = self.client.get("/v1/api/version", headers={"authorization": token}) |
| 200 | + self.assertEqual(authorized_response.status_code, 200) |
| 201 | + self.assertEqual(authorized_response.json()["intelmq-api"], __version__) |
| 202 | + |
| 203 | + def test_unauthorized_call(self): |
| 204 | + response = self.client.get("/v1/api/version") |
| 205 | + self.assertEqual(response.status_code, 401) |
| 206 | + |
| 207 | + def test_bad_token(self): |
| 208 | + response = self.client.get( |
| 209 | + "/v1/api/version", headers={"authorization": "not-a-valid-token"}) |
| 210 | + self.assertEqual(response.status_code, 401) |
0 commit comments