Skip to content

Commit a33bb9d

Browse files
committed
Add DualSecretJWTStrategy for unified secret support
Introduce a JWTStrategy subclass that accepts tokens signed with either the primary or a unified secret. Tokens are always written with the primary secret; on read, the primary is tried first and the unified secret is used as fallback. Signed-off-by: Denys Fedoryshchenko <denys.f@collabora.com>
1 parent 64a510f commit a33bb9d

2 files changed

Lines changed: 365 additions & 2 deletions

File tree

api/auth.py

Lines changed: 81 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,94 @@
66

77
"""User authentication utilities"""
88

9+
from typing import Optional
10+
11+
import jwt as pyjwt
12+
13+
from fastapi_users import exceptions, models
914
from fastapi_users.authentication import (
1015
AuthenticationBackend,
1116
BearerTransport,
1217
JWTStrategy,
1318
)
19+
from fastapi_users.jwt import SecretType, decode_jwt
20+
from fastapi_users.manager import BaseUserManager
1421
from passlib.context import CryptContext
1522

1623
from .config import AuthSettings
1724

1825

26+
class DualSecretJWTStrategy(JWTStrategy):
27+
"""JWTStrategy that accepts tokens signed with either of two secrets.
28+
29+
Tokens are always *written* with the primary secret. On *read*, the
30+
primary secret is tried first; if verification fails **and** a unified
31+
secret is configured, the token is retried with the unified secret.
32+
"""
33+
34+
def __init__(
35+
self,
36+
secret: SecretType,
37+
lifetime_seconds: Optional[int],
38+
algorithm: str = "HS256",
39+
unified_secret: str = "",
40+
):
41+
super().__init__(
42+
secret=secret,
43+
lifetime_seconds=lifetime_seconds,
44+
algorithm=algorithm,
45+
)
46+
self.unified_secret = unified_secret
47+
48+
async def read_token(
49+
self,
50+
token: Optional[str],
51+
user_manager: BaseUserManager[models.UP, models.ID],
52+
) -> Optional[models.UP]:
53+
if token is None:
54+
return None
55+
56+
# Try primary secret first
57+
user = await self._decode_and_lookup(
58+
token, self.decode_key, user_manager
59+
)
60+
if user is not None:
61+
return user
62+
63+
# Fallback to unified secret
64+
if self.unified_secret:
65+
return await self._decode_and_lookup(
66+
token, self.unified_secret, user_manager
67+
)
68+
69+
return None
70+
71+
async def _decode_and_lookup(
72+
self,
73+
token: str,
74+
secret: SecretType,
75+
user_manager: BaseUserManager[models.UP, models.ID],
76+
) -> Optional[models.UP]:
77+
try:
78+
data = decode_jwt(
79+
token,
80+
secret,
81+
self.token_audience,
82+
algorithms=[self.algorithm],
83+
)
84+
user_id = data.get("sub")
85+
if user_id is None:
86+
return None
87+
except pyjwt.PyJWTError:
88+
return None
89+
90+
try:
91+
parsed_id = user_manager.parse_id(user_id)
92+
return await user_manager.get(parsed_id)
93+
except (exceptions.UserNotExists, exceptions.InvalidID):
94+
return None
95+
96+
1997
class Authentication:
2098
"""Authentication utility class"""
2199

@@ -30,12 +108,13 @@ def get_password_hash(cls, password):
30108
"""Get a password hash for a given clear text password string"""
31109
return cls.CRYPT_CTX.hash(password)
32110

33-
def get_jwt_strategy(self) -> JWTStrategy:
111+
def get_jwt_strategy(self) -> DualSecretJWTStrategy:
34112
"""Get JWT strategy for authentication backend"""
35-
return JWTStrategy(
113+
return DualSecretJWTStrategy(
36114
secret=self._settings.secret_key,
37115
algorithm=self._settings.algorithm,
38116
lifetime_seconds=self._settings.access_token_expire_seconds,
117+
unified_secret=self._settings.unified_secret,
39118
)
40119

41120
def get_user_authentication_backend(self):
Lines changed: 284 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,284 @@
1+
# SPDX-License-Identifier: LGPL-2.1-or-later
2+
#
3+
# Copyright (C) 2025 Collabora Limited
4+
# Author: Denys Fedoryshchenko <denys.f@collabora.com>
5+
6+
"""Unit tests for DualSecretJWTStrategy"""
7+
8+
import time
9+
from unittest.mock import AsyncMock, MagicMock
10+
11+
import jwt as pyjwt
12+
import pytest
13+
from fastapi_users.jwt import generate_jwt
14+
15+
from api.auth import DualSecretJWTStrategy
16+
17+
PRIMARY_SECRET = "primary-secret-key"
18+
UNIFIED_SECRET = "unified-secret-key"
19+
WRONG_SECRET = "wrong-secret-key"
20+
USER_ID = "65265305c74695807499037f"
21+
AUDIENCE = ["fastapi-users:auth"]
22+
23+
24+
def _make_user_manager(user=None):
25+
"""Create a mock user manager that returns the given user."""
26+
manager = AsyncMock()
27+
manager.parse_id = MagicMock(return_value=USER_ID)
28+
manager.get = AsyncMock(return_value=user)
29+
return manager
30+
31+
32+
def _make_user():
33+
"""Create a minimal mock user."""
34+
user = MagicMock()
35+
user.id = USER_ID
36+
return user
37+
38+
39+
def _generate_token(secret, user_id=USER_ID):
40+
"""Generate a JWT token with the given secret."""
41+
data = {
42+
"sub": user_id,
43+
"aud": AUDIENCE,
44+
"email": "test@kernelci.org",
45+
"origin": "kernelci-pipeline",
46+
}
47+
return generate_jwt(data, secret, lifetime_seconds=3600)
48+
49+
50+
@pytest.mark.asyncio
51+
async def test_read_token_primary_secret():
52+
"""Token signed with primary secret should authenticate."""
53+
strategy = DualSecretJWTStrategy(
54+
secret=PRIMARY_SECRET,
55+
lifetime_seconds=3600,
56+
unified_secret=UNIFIED_SECRET,
57+
)
58+
user = _make_user()
59+
manager = _make_user_manager(user)
60+
token = _generate_token(PRIMARY_SECRET)
61+
62+
result = await strategy.read_token(token, manager)
63+
assert result is user
64+
65+
66+
@pytest.mark.asyncio
67+
async def test_read_token_unified_secret():
68+
"""Token signed with unified secret should authenticate via fallback."""
69+
strategy = DualSecretJWTStrategy(
70+
secret=PRIMARY_SECRET,
71+
lifetime_seconds=3600,
72+
unified_secret=UNIFIED_SECRET,
73+
)
74+
user = _make_user()
75+
manager = _make_user_manager(user)
76+
token = _generate_token(UNIFIED_SECRET)
77+
78+
result = await strategy.read_token(token, manager)
79+
assert result is user
80+
81+
82+
@pytest.mark.asyncio
83+
async def test_read_token_wrong_secret():
84+
"""Token signed with unknown secret should fail."""
85+
strategy = DualSecretJWTStrategy(
86+
secret=PRIMARY_SECRET,
87+
lifetime_seconds=3600,
88+
unified_secret=UNIFIED_SECRET,
89+
)
90+
manager = _make_user_manager(_make_user())
91+
token = _generate_token(WRONG_SECRET)
92+
93+
result = await strategy.read_token(token, manager)
94+
assert result is None
95+
96+
97+
@pytest.mark.asyncio
98+
async def test_read_token_no_unified_secret():
99+
"""Without unified secret, only primary secret works."""
100+
strategy = DualSecretJWTStrategy(
101+
secret=PRIMARY_SECRET,
102+
lifetime_seconds=3600,
103+
unified_secret="",
104+
)
105+
user = _make_user()
106+
manager = _make_user_manager(user)
107+
108+
# Primary should work
109+
token_ok = _generate_token(PRIMARY_SECRET)
110+
result = await strategy.read_token(token_ok, manager)
111+
assert result is user
112+
113+
# Unified-signed token should fail
114+
token_fail = _generate_token(UNIFIED_SECRET)
115+
result = await strategy.read_token(token_fail, manager)
116+
assert result is None
117+
118+
119+
@pytest.mark.asyncio
120+
async def test_read_token_none():
121+
"""None token should return None."""
122+
strategy = DualSecretJWTStrategy(
123+
secret=PRIMARY_SECRET,
124+
lifetime_seconds=3600,
125+
unified_secret=UNIFIED_SECRET,
126+
)
127+
manager = _make_user_manager()
128+
129+
result = await strategy.read_token(None, manager)
130+
assert result is None
131+
132+
133+
@pytest.mark.asyncio
134+
async def test_read_token_user_not_found():
135+
"""Valid token but user not in DB should return None."""
136+
strategy = DualSecretJWTStrategy(
137+
secret=PRIMARY_SECRET,
138+
lifetime_seconds=3600,
139+
unified_secret=UNIFIED_SECRET,
140+
)
141+
manager = _make_user_manager(user=None)
142+
token = _generate_token(PRIMARY_SECRET)
143+
144+
result = await strategy.read_token(token, manager)
145+
assert result is None
146+
147+
148+
@pytest.mark.asyncio
149+
async def test_unified_token_primary_secret():
150+
"""Unified token (all fields) signed with primary secret should work."""
151+
strategy = DualSecretJWTStrategy(
152+
secret=PRIMARY_SECRET,
153+
lifetime_seconds=3600,
154+
unified_secret=UNIFIED_SECRET,
155+
)
156+
user = _make_user()
157+
manager = _make_user_manager(user)
158+
data = {
159+
"sub": USER_ID,
160+
"email": "test@kernelci.org",
161+
"origin": "kernelci-pipeline",
162+
"permissions": ["checkout", "testretry", "patchset"],
163+
"aud": AUDIENCE,
164+
}
165+
token = generate_jwt(data, PRIMARY_SECRET, lifetime_seconds=3600)
166+
167+
result = await strategy.read_token(token, manager)
168+
assert result is user
169+
170+
171+
@pytest.mark.asyncio
172+
async def test_unified_token_unified_secret():
173+
"""Unified token (all fields) signed with unified secret should work."""
174+
strategy = DualSecretJWTStrategy(
175+
secret=PRIMARY_SECRET,
176+
lifetime_seconds=3600,
177+
unified_secret=UNIFIED_SECRET,
178+
)
179+
user = _make_user()
180+
manager = _make_user_manager(user)
181+
data = {
182+
"sub": USER_ID,
183+
"email": "test@kernelci.org",
184+
"origin": "kernelci-pipeline",
185+
"permissions": ["checkout", "testretry", "patchset"],
186+
"aud": AUDIENCE,
187+
}
188+
token = generate_jwt(data, UNIFIED_SECRET, lifetime_seconds=3600)
189+
190+
result = await strategy.read_token(token, manager)
191+
assert result is user
192+
193+
194+
@pytest.mark.asyncio
195+
async def test_unified_token_expired():
196+
"""Expired unified token should be rejected."""
197+
strategy = DualSecretJWTStrategy(
198+
secret=PRIMARY_SECRET,
199+
lifetime_seconds=3600,
200+
unified_secret=UNIFIED_SECRET,
201+
)
202+
manager = _make_user_manager(_make_user())
203+
data = {
204+
"sub": USER_ID,
205+
"email": "test@kernelci.org",
206+
"origin": "kernelci-pipeline",
207+
"permissions": ["checkout", "testretry", "patchset"],
208+
"aud": AUDIENCE,
209+
"exp": int(time.time()) - 3600, # expired 1 hour ago
210+
}
211+
token = pyjwt.encode(data, UNIFIED_SECRET, algorithm="HS256")
212+
213+
result = await strategy.read_token(token, manager)
214+
assert result is None
215+
216+
217+
@pytest.mark.asyncio
218+
async def test_unified_token_wrong_audience():
219+
"""Unified token with wrong audience should be rejected."""
220+
strategy = DualSecretJWTStrategy(
221+
secret=PRIMARY_SECRET,
222+
lifetime_seconds=3600,
223+
unified_secret=UNIFIED_SECRET,
224+
)
225+
manager = _make_user_manager(_make_user())
226+
data = {
227+
"sub": USER_ID,
228+
"email": "test@kernelci.org",
229+
"origin": "kernelci-pipeline",
230+
"permissions": ["checkout", "testretry", "patchset"],
231+
"aud": ["wrong-audience"],
232+
}
233+
token = generate_jwt(data, UNIFIED_SECRET, lifetime_seconds=3600)
234+
235+
result = await strategy.read_token(token, manager)
236+
assert result is None
237+
238+
239+
@pytest.mark.asyncio
240+
async def test_unified_token_missing_sub():
241+
"""Unified token without sub claim should be rejected."""
242+
strategy = DualSecretJWTStrategy(
243+
secret=PRIMARY_SECRET,
244+
lifetime_seconds=3600,
245+
unified_secret=UNIFIED_SECRET,
246+
)
247+
manager = _make_user_manager(_make_user())
248+
data = {
249+
"email": "test@kernelci.org",
250+
"origin": "kernelci-pipeline",
251+
"permissions": ["checkout", "testretry", "patchset"],
252+
"aud": AUDIENCE,
253+
}
254+
token = generate_jwt(data, UNIFIED_SECRET, lifetime_seconds=3600)
255+
256+
result = await strategy.read_token(token, manager)
257+
assert result is None
258+
259+
260+
@pytest.mark.asyncio
261+
async def test_write_token_uses_primary_secret():
262+
"""write_token should always use the primary secret."""
263+
strategy = DualSecretJWTStrategy(
264+
secret=PRIMARY_SECRET,
265+
lifetime_seconds=3600,
266+
unified_secret=UNIFIED_SECRET,
267+
)
268+
user = _make_user()
269+
token = await strategy.write_token(user)
270+
271+
# Should be verifiable with primary secret
272+
manager = _make_user_manager(user)
273+
result = await strategy.read_token(token, manager)
274+
assert result is user
275+
276+
# Verify it was NOT signed with unified secret by creating
277+
# a strategy that only knows the unified secret
278+
strategy_unified_only = DualSecretJWTStrategy(
279+
secret=UNIFIED_SECRET,
280+
lifetime_seconds=3600,
281+
unified_secret="",
282+
)
283+
result = await strategy_unified_only.read_token(token, manager)
284+
assert result is None

0 commit comments

Comments
 (0)