Skip to content

Commit f1b6fb6

Browse files
committed
add oauth.py for getting jwt for test/dev/whatever
1 parent 309e291 commit f1b6fb6

1 file changed

Lines changed: 80 additions & 0 deletions

File tree

src/oauth.py

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
"""Quick oauth2 authorization code flow demo for whatever.
2+
3+
Create a user, login, go to /o/applications/ and create a new application:
4+
- type: public
5+
- grant type: authorization code
6+
- redirect uri https://example.com/redirect/ (url can be whatever)
7+
8+
Then call this script with <hostname> <username> <password> <client_id>
9+
./oauth.py http://127.0.0.1:8080 username password some_client_id
10+
11+
Only works with local users, not social users.
12+
"""
13+
14+
import base64
15+
import hashlib
16+
import secrets
17+
import string
18+
import sys
19+
from urllib.parse import parse_qs
20+
from urllib.parse import urlparse
21+
22+
import requests
23+
24+
host = sys.argv[1]
25+
username = sys.argv[2]
26+
password = sys.argv[3]
27+
client_id = sys.argv[4]
28+
29+
s = requests.Session()
30+
csrf = s.get(host + "/api/csrf/")
31+
csrf = s.post(
32+
# set next to /api/csrf/ so getting the next csrftoken is easy
33+
host + "/login/?next=/api/csrf/",
34+
data={
35+
"csrfmiddlewaretoken": csrf.text.strip(),
36+
"login": username,
37+
"password": password,
38+
},
39+
headers={"Referer": host + "/login/"},
40+
)
41+
alphabet = string.ascii_uppercase + string.digits
42+
code_verifier = "".join(secrets.choice(alphabet) for i in range(43 + secrets.randbelow(86)))
43+
code_verifier_base64 = base64.urlsafe_b64encode(code_verifier.encode("utf-8"))
44+
code_challenge = hashlib.sha256(code_verifier_base64).digest()
45+
code_challenge_base64 = base64.urlsafe_b64encode(code_challenge).decode("utf-8").replace("=", "")
46+
state = "".join(secrets.choice(alphabet) for i in range(15))
47+
48+
data = {
49+
"csrfmiddlewaretoken": csrf.text.strip(),
50+
"client_id": client_id,
51+
"state": state,
52+
"redirect_uri": f"{host}/api/csrf/",
53+
"response_type": "code",
54+
"code_challenge": code_challenge_base64,
55+
"code_challenge_method": "S256",
56+
"nonce": "",
57+
"claims": "",
58+
"scope": "openid profile",
59+
"allow": "Authorize",
60+
}
61+
auth = s.post(host + "/o/authorize/", allow_redirects=False, data=data, headers={"Referer": host + "/o/authorize/"})
62+
if auth.status_code != 302: # noqa: PLR2004
63+
print(f"/o/authorize/ returned status code {auth.status_code} - no token today") # noqa: T201
64+
sys.exit(1)
65+
url = auth.headers["Location"]
66+
result = urlparse(url)
67+
qs = parse_qs(result.query)
68+
assert state == qs["state"][0] # noqa: S101
69+
authcode = qs["code"][0]
70+
token = s.post(
71+
host + "/o/token/",
72+
data={
73+
"grant_type": "authorization_code",
74+
"code": authcode,
75+
"redirect_uri": f"{host}/api/csrf/",
76+
"client_id": client_id,
77+
"code_verifier": code_verifier_base64,
78+
},
79+
)
80+
print(token.json()) # noqa: T201

0 commit comments

Comments
 (0)