The following Python tool has not been fully tested, because conditional access policies can restrict the use of public clients. This setup can only be used if your organization allows public clients. It is recommended to use the authorization code flow as described in: Python command line with OIDC authorization code flow
The following is an example of a Python tool that can log in with OIDC. The tool prints a device code to the console and asks the user to enter it in the browser.
To support this setup in Microsoft Entra ID, an app registration of type "Public client/Native (Mobile and Desktop)" has to be created without a redirect URL. In the Authentication settings, "Allow public client flows" has to be enabled. The client ID of this app registration and the tenant ID have to be updated in the script (`CLIENT_ID` and `TENANT`).
The app registration has to be given permissions on the backend app registration where the Delft-FEWS WebServices is registered. The scope of the backend app registration API has to be updated in the script (the `api://...` entry in `SCOPES`).
# This script demonstrates the OAuth 2.0 Device Code flow to obtain an access token for the Delft-FEWS WebServices API.
# It uses the `keyring` library to securely store the refresh token and cache the access token, allowing for seamless token refresh without user interaction.
# The script asks the user to complete authentication on the Microsoft verification page and then polls the token endpoint until authorization completes.
# Subsequent runs will reuse valid access tokens or refresh them as needed, minimizing the need for repeated logins.
import hashlib
import time
import webbrowser
import requests
import keyring
from keyring.errors import KeyringError
# --- Deployment-specific settings: adjust these to your environment ---

# Base URL of the Delft-FEWS WebServices; request paths are appended directly to it.
WEBSERVICES_URL = "https://my-server.com/FewsWebServices/"
# Tenant ID used to build the Microsoft login endpoints below.
TENANT = "my-tenant-id"
# Client ID of the App Registration in Entra ID configured as a public client.
CLIENT_ID = "my-client-id"

# Both OAuth 2.0 endpoints share the same tenant-specific base URL.
_OAUTH_BASE_URL = f"https://login.microsoftonline.com/{TENANT}/oauth2/v2.0"
DEVICE_CODE_ENDPOINT = f"{_OAUTH_BASE_URL}/devicecode"
TOKEN_ENDPOINT = f"{_OAUTH_BASE_URL}/token"

# The scope list must contain the exposed API of the backend App Registration
# (the WebServices API in Entra ID) and "offline_access" to obtain a refresh token.
SCOPES = [
    "openid",
    "profile",
    "offline_access",
    "api://backend-client-id/BackendApis",
]

# Timeout passed to every HTTP request made by this script.
REQUEST_TIMEOUT = 30

# Unique keyring service name: a fixed prefix plus the first 16 hex characters of
# the SHA-256 of the WebServices URL, so each server gets its own set of entries.
_WEBSERVICES_URL_DIGEST = hashlib.sha256(WEBSERVICES_URL.encode("utf-8")).hexdigest()
SERVICE_NAME = "delft_fews_webservices_python_oidc_tool_" + _WEBSERVICES_URL_DIGEST[:16]

# Set to True to print diagnostic messages about keyring reads and writes.
DEBUG_KEYRING = False

# WinVault stores passwords as UTF-16LE; 2560-byte blob limit = max 1280 chars.
# Use 1200-char chunks to stay safely under that limit.
CHUNK_SIZE = 1200
def _debug_keyring(message):
    """Print *message* with a ``[keyring-debug]`` prefix, but only when DEBUG_KEYRING is set."""
    if not DEBUG_KEYRING:
        return
    print(f"[keyring-debug] {message}")
def _store_chunked(key_prefix, value):
    """Store a potentially large value in keyring by splitting into CHUNK_SIZE chunks.

    The value is written as entries ``<key_prefix>_0`` .. ``<key_prefix>_<n-1>``
    plus a ``<key_prefix>_count`` entry holding ``n``.

    The old count entry is removed before any chunk is touched and the new count
    is written last, so a failure halfway through leaves the value "not found"
    instead of a count that points at a mix of old and new chunks.

    Args:
        key_prefix: Name prefix shared by all keyring entries of this value.
        value: The string to store.

    Returns:
        True if every chunk and the count were written, False otherwise (a
        warning is printed; no exception is raised).
    """
    count_key = f"{key_prefix}_count"
    chunks = [value[i: i + CHUNK_SIZE] for i in range(0, len(value), CHUNK_SIZE)]
    _debug_keyring(f"store '{key_prefix}': {len(value)} chars → {len(chunks)} chunk(s)")
    try:
        old_count_str = keyring.get_password(SERVICE_NAME, count_key)
        try:
            old_count = int(old_count_str) if old_count_str else 0
        except ValueError:
            # A corrupt count must not block storing a fresh value forever.
            _debug_keyring(f"ignoring non-numeric old count for '{key_prefix}'")
            old_count = 0

        if old_count_str is not None:
            # Invalidate the previous value before overwriting its chunks.
            # Best-effort: if this fails we simply continue with the overwrite.
            try:
                keyring.delete_password(SERVICE_NAME, count_key)
            except Exception as ex:
                _debug_keyring(f"could not invalidate old '{count_key}': {ex}")

        for i, chunk in enumerate(chunks):
            keyring.set_password(SERVICE_NAME, f"{key_prefix}_{i}", chunk)

        # Remove chunks left over from a previously stored, longer value.
        # Best-effort: a leftover chunk is never read because it is beyond the count.
        for i in range(len(chunks), old_count):
            try:
                keyring.delete_password(SERVICE_NAME, f"{key_prefix}_{i}")
                _debug_keyring(f"deleted stale chunk {key_prefix}_{i}")
            except Exception as ex:
                _debug_keyring(f"could not delete stale chunk {key_prefix}_{i}: {ex}")

        # Written last: the value only becomes readable once it is complete.
        keyring.set_password(SERVICE_NAME, count_key, str(len(chunks)))
        _debug_keyring(f"store '{key_prefix}' success")
        return True
    except Exception as ex:  # KeyringError is a subclass of Exception
        _debug_keyring(f"store '{key_prefix}' failed: {ex}")
        print(f"Warning: could not persist '{key_prefix}' in keyring ({ex}).")
        return False
def _load_chunked(key_prefix):
    """Read back a value that was stored in chunks and return it as one string.

    Returns None when the ``<key_prefix>_count`` entry does not exist, when any
    of the expected chunks is missing, or when reading from keyring fails (in
    the last case a warning is printed as well).
    """
    try:
        stored_count = keyring.get_password(SERVICE_NAME, f"{key_prefix}_count")
        if stored_count is None:
            _debug_keyring(f"load '{key_prefix}': not found")
            return None

        total = int(stored_count)
        pieces = []
        index = 0
        # Fetch chunks one by one and stop at the first gap.
        while index < total:
            piece = keyring.get_password(SERVICE_NAME, f"{key_prefix}_{index}")
            if piece is None:
                _debug_keyring(f"load '{key_prefix}': chunk {index} missing")
                return None
            pieces.append(piece)
            index += 1

        assembled = "".join(pieces)
        _debug_keyring(f"load '{key_prefix}' success: {total} chunk(s), {len(assembled)} chars")
        return assembled
    except Exception as err:  # also covers KeyringError, which derives from Exception
        _debug_keyring(f"load '{key_prefix}' failed: {err}")
        print(f"Warning: could not read '{key_prefix}' from keyring ({err}).")
        return None
def _store_refresh_token(refresh_token_value):
    """Persist the refresh token in keyring under the "refresh_token" prefix.

    Returns the success flag reported by ``_store_chunked``.
    """
    stored = _store_chunked(key_prefix="refresh_token", value=refresh_token_value)
    return stored
def _load_refresh_token():
    """Return the refresh token kept under the "refresh_token" prefix, or None if unavailable."""
    stored_value = _load_chunked(key_prefix="refresh_token")
    return stored_value
def _store_access_token(access_token_value, expires_in):
    """Cache access token with expiry timestamp.

    The token is written first and the expiry marker last. If the token cannot
    be stored, the expiry marker is removed, so the cache never pairs a fresh
    expiry with an old or partially written token.

    Args:
        access_token_value: The access token string to cache.
        expires_in: Token lifetime in seconds, counted from now.

    Returns:
        True if both the token and its expiry were stored, False otherwise.
    """
    # Compute the expiry up front so slow keyring writes do not extend the lifetime.
    expires_at = time.time() + int(expires_in)

    if not _store_chunked("access_token", access_token_value):
        # Without the expiry marker the cache reads as empty on the next run.
        try:
            keyring.delete_password(SERVICE_NAME, "access_token_expires_at")
        except Exception as ex:
            _debug_keyring(f"could not remove access_token_expires_at: {ex}")
        return False

    try:
        keyring.set_password(SERVICE_NAME, "access_token_expires_at", str(expires_at))
    except Exception as ex:  # KeyringError is a subclass of Exception
        _debug_keyring(f"store access_token_expires_at failed: {ex}")
        return False
    return True
def _load_valid_access_token():
    """Return the cached access token while it has at least 60 seconds left.

    Returns None when no expiry is stored, when the token is expired or about
    to expire, or when reading the cache fails. Otherwise the result of loading
    the "access_token" entries is returned.
    """
    try:
        raw_expiry = keyring.get_password(SERVICE_NAME, "access_token_expires_at")
        if raw_expiry is None:
            _debug_keyring("no cached access token expiry found")
            return None

        seconds_left = int(float(raw_expiry) - time.time())
        if seconds_left >= 60:
            cached = _load_chunked("access_token")
            if cached:
                _debug_keyring(f"reusing cached access token ({seconds_left}s remaining)")
            return cached

        _debug_keyring(f"cached access token expired or expiring in {seconds_left}s, will refresh")
        return None
    except Exception as err:  # also covers KeyringError, which derives from Exception
        _debug_keyring(f"load access token failed: {err}")
        return None
def request_device_code():
    """Start the device code flow and return the endpoint's JSON response.

    Posts the client ID and the space-separated scopes to DEVICE_CODE_ENDPOINT.
    Raises requests.HTTPError when the endpoint answers with an error status.
    """
    response = requests.post(
        DEVICE_CODE_ENDPOINT,
        data={"client_id": CLIENT_ID, "scope": " ".join(SCOPES)},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
def poll_device_code(device_code, interval, expires_in):
    """Poll the token endpoint until the user has completed device authorization.

    Args:
        device_code: The ``device_code`` value returned by the device code endpoint.
        interval: Seconds to wait between polls; falls back to 5, minimum 1.
        expires_in: Seconds until the device code expires; falls back to 900.

    Returns:
        The parsed JSON token response once the endpoint answers with HTTP 200.

    Raises:
        TimeoutError: The device code expired or the polling deadline passed.
        RuntimeError: The user denied the request or the endpoint reported
            another error.
        requests.HTTPError: The endpoint returned an error status with a body
            that is not JSON.
    """
    poll_interval = max(1, int(interval or 5))
    # Monotonic clock: the deadline must not move when the system clock is adjusted.
    deadline = time.monotonic() + int(expires_in or 900)
    data = {
        "client_id": CLIENT_ID,
        "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
        "device_code": device_code,
    }
    while time.monotonic() < deadline:
        # Wait before every poll, including the first: the user needs time to sign in.
        time.sleep(poll_interval)
        r = requests.post(TOKEN_ENDPOINT, data=data, timeout=REQUEST_TIMEOUT)
        if r.status_code == 200:
            return r.json()

        try:
            body = r.json()
        except ValueError:
            # No JSON error document: surface the HTTP error if there is one.
            r.raise_for_status()
            body = None
        if not isinstance(body, dict):
            # Valid JSON but not an object; treat it as an error without details.
            body = {}
        error = body.get("error")
        description = body.get("error_description", "")

        if error == "authorization_pending":
            continue
        if error == "slow_down":
            # The server asks for a lower polling frequency.
            poll_interval += 5
            continue
        if error == "expired_token":
            raise TimeoutError("Device code expired before authorization was completed")
        if error == "access_denied":
            raise RuntimeError("User denied device authorization")
        # Fall back to the HTTP status so the message never reads "failed: None".
        reason = error or f"HTTP {r.status_code}"
        raise RuntimeError(f"Device code authorization failed: {reason} {description}".strip())
    raise TimeoutError("Timed out while polling for device authorization")
def refresh_token(refresh_token):
    """Exchange a refresh token for a new token response at TOKEN_ENDPOINT.

    Returns the parsed JSON response. Raises requests.HTTPError when the
    endpoint answers with an error status.
    """
    # Note: the parameter shadows this function's own name inside the body.
    response = requests.post(
        TOKEN_ENDPOINT,
        data={
            "client_id": CLIENT_ID,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        },
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()
def _clear_refresh_token():
    """Remove the stored refresh token from keyring (best-effort, never raises).

    The ``refresh_token_count`` entry is deleted even when its content is not a
    valid number, so a corrupt count cannot keep the stale state alive. It is
    deleted before the chunks, because without it the token already reads as
    "not found".
    """
    try:
        count_str = keyring.get_password(SERVICE_NAME, "refresh_token_count")
    except Exception as ex:
        _debug_keyring(f"clear refresh_token: could not read count: {ex}")
        return
    if count_str is None:
        # Nothing stored, nothing to clear.
        return

    try:
        count = int(count_str) if count_str else 0
    except ValueError:
        # Unknown number of chunks; still remove the count entry below.
        _debug_keyring("clear refresh_token: non-numeric count, removing count entry only")
        count = 0

    try:
        keyring.delete_password(SERVICE_NAME, "refresh_token_count")
    except Exception as ex:
        _debug_keyring(f"clear refresh_token: could not delete count: {ex}")

    for i in range(count):
        try:
            keyring.delete_password(SERVICE_NAME, f"refresh_token_{i}")
        except Exception as ex:
            # Keep going: one failing chunk should not stop the cleanup.
            _debug_keyring(f"clear refresh_token: could not delete chunk {i}: {ex}")
def login():
    """Run the interactive device code sign-in and return the token response.

    Requests a device code, shows the user where to sign in (and opens the
    browser), polls until authorization completes, and then stores the refresh
    token (when one is returned) and the access token in keyring.
    """
    device = request_device_code()
    complete_uri = device.get("verification_uri_complete")

    if complete_uri:
        # The complete URI can be opened directly.
        print("Complete sign-in in your browser:")
        print(complete_uri)
        webbrowser.open(complete_uri)
    else:
        # No complete URI: the user has to type the code on the verification page.
        plain_uri = device.get("verification_uri")
        code = device.get("user_code")
        print("Complete sign-in using the code below:")
        print(f"1) Open: {plain_uri}")
        print(f"2) Enter code: {code}")
        if plain_uri:
            webbrowser.open(plain_uri)

    token = poll_device_code(
        device_code=device["device_code"],
        interval=device.get("interval", 5),
        expires_in=device.get("expires_in", 900),
    )

    if "refresh_token" in token:
        _store_refresh_token(token["refresh_token"])
    lifetime = token.get("expires_in", 3600)
    _store_access_token(token["access_token"], lifetime)
    return token
def get_token():
    """Return a token dict containing a usable ``access_token``.

    Order of preference:
      1. A cached access token that is still valid (no network call).
      2. A new token obtained with the stored refresh token.
      3. An interactive device code login.

    Returns:
        A dict with at least the key ``"access_token"``. When the token comes
        from the cache it contains only that key.

    Raises:
        requests.HTTPError: Refreshing failed with a status other than 400/401.
            Such errors are re-raised instead of being swallowed, so the caller
            sees the real cause rather than receiving None.
    """
    # Reuse valid cached access token — no network call needed
    access_token = _load_valid_access_token()
    if access_token:
        return {"access_token": access_token}

    refresh = _load_refresh_token()
    if not refresh:
        # Nothing (or only an empty value) stored: interactive login is the only option.
        return login()

    # Keep the try body minimal: only the refresh call is expected to raise HTTPError.
    try:
        token = refresh_token(refresh)
    except requests.HTTPError as ex:
        status = ex.response.status_code if ex.response is not None else None
        if status in (400, 401):
            # The refresh token was rejected: forget it and sign in again.
            _clear_refresh_token()
            return login()
        raise

    if "refresh_token" in token:
        # Persist the refresh token returned with the new access token.
        _store_refresh_token(token["refresh_token"])
    _store_access_token(token["access_token"], token.get("expires_in", 3600))
    return token
if __name__ == "__main__":
    # Example usage: obtain a token and call an authenticated WebServices endpoint.
    try:
        token = get_token()
        access_token = token["access_token"]
        # Get all filters in JSON format. This endpoint requires authentication, so we include the access token in the Authorization header.
        r = requests.get(
            f"{WEBSERVICES_URL}rest/fewspiservice/v1/filters?documentFormat=PI_JSON",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=REQUEST_TIMEOUT,
        )
        r.raise_for_status()
        print(r.json())
    except Exception as ex:
        # SystemExit with a string prints it to stderr and exits with status 1,
        # so calling scripts can detect the failure.
        raise SystemExit(f"Authentication or API request failed: {ex}")

