The following is an example of a python tool that can login with OIDC. The tool will open a web browser and require the user to login. On success credentials will be stored securely in the keyring.
To support this setup in Microsoft Entra ID, an app registration of type "Mobile and Desktop" has to be created and as redirect URL http://localhost:8888/callback has to be registered. The client id of this app registration and the tenant id has to be updated in the script.
The app registration has to be given permissions on the backend app registration where the Delft-FEWS WebServices is registered. The scope of the backend app registration api has to be updated in the script.
# This script demonstrates the OAuth 2.0 Authorization Code flow with PKCE to obtain an access token for the Delft-FEWS WebServices API.
# It uses the `keyring` library to securely store the refresh token and cache the access token, allowing for seamless token refresh without user interaction.
# The script opens the user's browser for login and handles the redirect to capture the authorization code, which is then exchanged for tokens.
# Subsequent runs will reuse valid access tokens or refresh them as needed, minimizing the need for repeated logins.
import base64
import hashlib
import os
import time
import urllib.parse
import webbrowser
import requests
import keyring
from keyring.errors import KeyringError
from http.server import HTTPServer, BaseHTTPRequestHandler
WEBSERVICES_URL = "https://my-server.com/FewsWebServices/"
TENANT = "entraid-tenant-id" # Tenant ID of the Entra ID where the App Registrations are created
# Client ID op the App Registration in Entra ID of type: Mobile and desktop applications with redirect URL http://localhost:8888/callback
CLIENT_ID = "client-id-of-the-app-registration"
REDIRECT_URI = "http://localhost:8888/callback"
AUTH_ENDPOINT = f"https://login.microsoftonline.com/{TENANT}/oauth2/v2.0/authorize"
TOKEN_ENDPOINT = f"https://login.microsoftonline.com/{TENANT}/oauth2/v2.0/token"
# Scope needs the exposed API of the backend APP Registration (The WebServices API in Entra ID) and also Offline_Access to get a refresh token
SCOPES = [
"openid",
"profile",
"Offline_Access",
"api://backend-api-client-id/FewsWebServices",
]
# unique service name for keyring to store the refresh token
SERVICE_NAME = f"delft_fews_webservices_python_oidc_tool_{hashlib.sha256(WEBSERVICES_URL.encode('utf-8')).hexdigest()[:16]}"
DEBUG_KEYRING = False
# WinVault stores passwords as UTF-16LE; 2560-byte blob limit = max 1280 chars.
# Use 1200-char chunks to stay safely under that limit.
CHUNK_SIZE = 1200
def _debug_keyring(message):
if DEBUG_KEYRING:
print(f"[keyring-debug] {message}")
def _store_chunked(key_prefix, value):
"""Store a potentially large value in keyring by splitting into CHUNK_SIZE chunks."""
chunks = [value[i: i + CHUNK_SIZE] for i in range(0, len(value), CHUNK_SIZE)]
_debug_keyring(f"store '{key_prefix}': {len(value)} chars → {len(chunks)} chunk(s)")
try:
old_count_str = keyring.get_password(SERVICE_NAME, f"{key_prefix}_count")
if old_count_str:
for i in range(len(chunks), int(old_count_str)):
try:
keyring.delete_password(SERVICE_NAME, f"{key_prefix}_{i}")
_debug_keyring(f"deleted stale chunk {key_prefix}_{i}")
except Exception:
pass
keyring.set_password(SERVICE_NAME, f"{key_prefix}_count", str(len(chunks)))
for i, chunk in enumerate(chunks):
keyring.set_password(SERVICE_NAME, f"{key_prefix}_{i}", chunk)
_debug_keyring(f"store '{key_prefix}' success")
return True
except (KeyringError, Exception) as ex:
_debug_keyring(f"store '{key_prefix}' failed: {ex}")
print(f"Warning: could not persist '{key_prefix}' in keyring ({ex}).")
return False
def _load_chunked(key_prefix):
"""Reassemble a chunked value from keyring. Returns None if missing or incomplete."""
try:
count_str = keyring.get_password(SERVICE_NAME, f"{key_prefix}_count")
if count_str is None:
_debug_keyring(f"load '{key_prefix}': not found")
return None
count = int(count_str)
chunks = []
for i in range(count):
chunk = keyring.get_password(SERVICE_NAME, f"{key_prefix}_{i}")
if chunk is None:
_debug_keyring(f"load '{key_prefix}': chunk {i} missing")
return None
chunks.append(chunk)
value = "".join(chunks)
_debug_keyring(f"load '{key_prefix}' success: {count} chunk(s), {len(value)} chars")
return value
except (KeyringError, Exception) as ex:
_debug_keyring(f"load '{key_prefix}' failed: {ex}")
print(f"Warning: could not read '{key_prefix}' from keyring ({ex}).")
return None
def _store_refresh_token(refresh_token_value):
return _store_chunked("refresh_token", refresh_token_value)
def _load_refresh_token():
return _load_chunked("refresh_token")
def _store_access_token(access_token_value, expires_in):
"""Cache access token with expiry timestamp."""
expires_at = time.time() + int(expires_in)
try:
keyring.set_password(SERVICE_NAME, "access_token_expires_at", str(expires_at))
except (KeyringError, Exception) as ex:
_debug_keyring(f"store access_token_expires_at failed: {ex}")
return False
return _store_chunked("access_token", access_token_value)
def _load_valid_access_token():
"""Return cached access token if still valid (with 60s buffer), else None."""
try:
expires_at_str = keyring.get_password(SERVICE_NAME, "access_token_expires_at")
if expires_at_str is None:
_debug_keyring("no cached access token expiry found")
return None
expires_at = float(expires_at_str)
remaining = int(expires_at - time.time())
if remaining < 60:
_debug_keyring(f"cached access token expired or expiring in {remaining}s, will refresh")
return None
value = _load_chunked("access_token")
if value:
_debug_keyring(f"reusing cached access token ({remaining}s remaining)")
return value
except (KeyringError, Exception) as ex:
_debug_keyring(f"load access token failed: {ex}")
return None
auth_code = None
def generate_pkce():
verifier = base64.urlsafe_b64encode(os.urandom(40)).rstrip(b"=").decode()
challenge = base64.urlsafe_b64encode(
hashlib.sha256(verifier.encode()).digest()
).rstrip(b"=").decode()
return verifier, challenge
class CallbackHandler(BaseHTTPRequestHandler):
def do_GET(self):
global auth_code
parsed = urllib.parse.urlparse(self.path)
params = urllib.parse.parse_qs(parsed.query)
if "code" in params:
auth_code = params["code"][0]
self.send_response(200)
self.end_headers()
self.wfile.write(b"Login successful. You may close this window.")
def get_authorization_code(verifier, challenge):
params = {
"client_id": CLIENT_ID,
"response_type": "code",
"redirect_uri": REDIRECT_URI,
"response_mode": "query",
"scope": " ".join(SCOPES),
"code_challenge": challenge,
"code_challenge_method": "S256",
}
url = AUTH_ENDPOINT + "?" + urllib.parse.urlencode(params)
webbrowser.open(url)
server = HTTPServer(("localhost", 8888), CallbackHandler)
server.handle_request()
return auth_code
def exchange_code(code, verifier):
data = {
"client_id": CLIENT_ID,
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI,
"code_verifier": verifier,
}
r = requests.post(TOKEN_ENDPOINT, data=data)
r.raise_for_status()
return r.json()
def refresh_token(refresh_token):
data = {
"client_id": CLIENT_ID,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}
r = requests.post(TOKEN_ENDPOINT, data=data)
r.raise_for_status()
return r.json()
def login():
verifier, challenge = generate_pkce()
code = get_authorization_code(verifier, challenge)
token = exchange_code(code, verifier)
_store_refresh_token(token["refresh_token"])
_store_access_token(token["access_token"], token.get("expires_in", 3600))
return token
def get_token():
# Reuse valid cached access token — no network call needed
access_token = _load_valid_access_token()
if access_token:
return {"access_token": access_token}
refresh = _load_refresh_token()
if refresh is None:
return login()
try:
token = refresh_token(refresh)
if "refresh_token" in token:
_store_refresh_token(token["refresh_token"])
_store_access_token(token["access_token"], token.get("expires_in", 3600))
return token
except requests.HTTPError:
return login()
if __name__ == "__main__":
token = get_token()
access_token = token["access_token"]
# Get all filters in JSON format. This endpoint requires authentication, so we include the access token in the Authorization header.
r = requests.get(
f"{WEBSERVICES_URL}rest/fewspiservice/v1/filters?documentFormat=PI_JSON",
headers={"Authorization": f"Bearer {access_token}"},
)
print(r.json())
