The following is an example of a Python tool that can log in with OIDC. The tool opens a web browser and requires the user to log in. On success, the credentials are stored securely in the keyring.

To support this setup in Microsoft Entra ID, an app registration of type "Mobile and Desktop" has to be created, with http://localhost:8888/callback registered as its redirect URL. The client ID of this app registration and the tenant ID have to be updated in the script.

The app registration has to be given permissions on the backend app registration under which the Delft-FEWS WebServices are registered. The scope exposed by the API of the backend app registration has to be updated in the script.




# This script demonstrates the OAuth 2.0 Authorization Code flow with PKCE to obtain an access token for the Delft-FEWS WebServices API.
# It uses the `keyring` library to securely store the refresh token and cache the access token, allowing for seamless token refresh without user interaction.
# The script opens the user's browser for login and handles the redirect to capture the authorization code, which is then exchanged for tokens.
# Subsequent runs will reuse valid access tokens or refresh them as needed, minimizing the need for repeated logins.

import base64
import hashlib
import os
import time
import urllib.parse
import webbrowser
import requests
import keyring
from keyring.errors import KeyringError
from http.server import HTTPServer, BaseHTTPRequestHandler

# Base URL of the Delft-FEWS WebServices; request paths are appended to it
# directly, so it has to end with a slash.
WEBSERVICES_URL = "https://my-server.com/FewsWebServices/"
TENANT = "entraid-tenant-id"  # Tenant ID of the Entra ID where the App Registrations are created
# Client ID of the App Registration in Entra ID of type: Mobile and desktop applications with redirect URL http://localhost:8888/callback
CLIENT_ID = "client-id-of-the-app-registration"
REDIRECT_URI = "http://localhost:8888/callback"
AUTH_ENDPOINT = f"https://login.microsoftonline.com/{TENANT}/oauth2/v2.0/authorize"
TOKEN_ENDPOINT = f"https://login.microsoftonline.com/{TENANT}/oauth2/v2.0/token"
# Scope needs the exposed API of the backend App Registration (the WebServices API in Entra ID)
# and also offline_access to get a refresh token. The standard OIDC scopes are written in
# their documented lower-case spelling.
SCOPES = [
    "openid",
    "profile",
    "offline_access",
    "api://backend-api-client-id/FewsWebServices",
]

# Unique service name for keyring to store the tokens. It embeds a hash of
# WEBSERVICES_URL, so a different server URL yields a different service name.
SERVICE_NAME = f"delft_fews_webservices_python_oidc_tool_{hashlib.sha256(WEBSERVICES_URL.encode('utf-8')).hexdigest()[:16]}"
# Set to True to print keyring diagnostics prefixed with "[keyring-debug]".
DEBUG_KEYRING = False

# WinVault stores passwords as UTF-16LE; 2560-byte blob limit = max 1280 chars.
# Use 1200-char chunks to stay safely under that limit.
CHUNK_SIZE = 1200


def _debug_keyring(message):
    """Print *message* with a ``[keyring-debug]`` prefix when DEBUG_KEYRING is set."""
    if not DEBUG_KEYRING:
        return
    print(f"[keyring-debug] {message}")


def _store_chunked(key_prefix, value):
    """Store a potentially large value in keyring by splitting into CHUNK_SIZE chunks.

    The value is written as entries ``{key_prefix}_0`` .. ``{key_prefix}_{n-1}``
    plus a ``{key_prefix}_count`` entry holding ``n``. The count entry is
    removed (best effort) before the chunks are rewritten and is written back
    last, so an update that fails half-way leaves no count behind instead of
    a count that points at a mix of old and new chunks.

    Args:
        key_prefix: Prefix of the keyring entry names used for this value.
        value: String to store.

    Returns:
        True when every entry was written, False when the keyring raised an
        error (a warning is printed in that case).
    """
    chunks = [value[start:start + CHUNK_SIZE] for start in range(0, len(value), CHUNK_SIZE)]
    count_key = f"{key_prefix}_count"
    _debug_keyring(f"store '{key_prefix}': {len(value)} chars → {len(chunks)} chunk(s)")
    try:
        old_count_str = keyring.get_password(SERVICE_NAME, count_key)
        try:
            old_count = int(old_count_str) if old_count_str else 0
        except ValueError:
            # A corrupt count gives no information about how many old chunks
            # exist, so stale-chunk cleanup is skipped for this write.
            old_count = 0

        if old_count_str is not None:
            # Invalidate the previous value first: without a count entry a
            # partially rewritten value cannot be reassembled.
            try:
                keyring.delete_password(SERVICE_NAME, count_key)
            except Exception as ex:
                _debug_keyring(f"could not invalidate '{count_key}': {ex}")

        for i, chunk in enumerate(chunks):
            keyring.set_password(SERVICE_NAME, f"{key_prefix}_{i}", chunk)
        # The count is written last and thereby marks the value as complete.
        keyring.set_password(SERVICE_NAME, count_key, str(len(chunks)))

        # Remove chunks left over from a previous, longer value. A failure
        # here is harmless because the new count no longer refers to them.
        for i in range(len(chunks), old_count):
            try:
                keyring.delete_password(SERVICE_NAME, f"{key_prefix}_{i}")
                _debug_keyring(f"deleted stale chunk {key_prefix}_{i}")
            except Exception as ex:
                _debug_keyring(f"could not delete stale chunk {key_prefix}_{i}: {ex}")

        _debug_keyring(f"store '{key_prefix}' success")
        return True
    except Exception as ex:
        # Deliberately broad: persisting is best effort and must never abort
        # the login flow, whatever the keyring backend raises.
        _debug_keyring(f"store '{key_prefix}' failed: {ex}")
        print(f"Warning: could not persist '{key_prefix}' in keyring ({ex}).")
        return False


def _load_chunked(key_prefix):
    """Read back a value that was stored in chunks under *key_prefix*.

    Looks up ``{key_prefix}_count`` and then ``{key_prefix}_0`` ..
    ``{key_prefix}_{count-1}``, concatenating the chunks in order.

    Returns:
        The joined string, or None when the count entry or any chunk is
        missing, or when reading from the keyring fails (a warning is
        printed in that last case).
    """
    try:
        stored_count = keyring.get_password(SERVICE_NAME, f"{key_prefix}_count")
        if stored_count is None:
            _debug_keyring(f"load '{key_prefix}': not found")
            return None

        total = int(stored_count)
        pieces = []
        for index in range(total):
            piece = keyring.get_password(SERVICE_NAME, f"{key_prefix}_{index}")
            if piece is not None:
                pieces.append(piece)
                continue
            # One missing chunk makes the whole value unusable.
            _debug_keyring(f"load '{key_prefix}': chunk {index} missing")
            return None

        joined = "".join(pieces)
        _debug_keyring(f"load '{key_prefix}' success: {total} chunk(s), {len(joined)} chars")
        return joined
    except (KeyringError, Exception) as ex:
        _debug_keyring(f"load '{key_prefix}' failed: {ex}")
        print(f"Warning: could not read '{key_prefix}' from keyring ({ex}).")
        return None


def _store_refresh_token(refresh_token_value):
    """Persist the refresh token in keyring under the ``refresh_token`` prefix.

    Returns whatever ``_store_chunked`` reports for the write.
    """
    return _store_chunked(key_prefix="refresh_token", value=refresh_token_value)


def _load_refresh_token():
    """Return the refresh token stored under the ``refresh_token`` prefix.

    The result is passed through from ``_load_chunked`` unchanged.
    """
    return _load_chunked(key_prefix="refresh_token")


def _store_access_token(access_token_value, expires_in):
    """Cache access token with expiry timestamp.

    The token is written before its expiry timestamp. If the token cannot be
    stored, the expiry entry is removed (best effort) so that an older or
    partially written token is never paired with a fresh expiry and handed
    out as valid.

    Args:
        access_token_value: Access token string to cache.
        expires_in: Token lifetime in seconds, counted from now.

    Returns:
        True when both the token and its expiry were stored, otherwise False.
    """
    expires_at = time.time() + int(expires_in)

    if not _store_chunked("access_token", access_token_value):
        try:
            keyring.delete_password(SERVICE_NAME, "access_token_expires_at")
        except Exception as ex:
            # Best effort only; the entry may simply not exist.
            _debug_keyring(f"could not remove access_token_expires_at: {ex}")
        return False

    try:
        keyring.set_password(SERVICE_NAME, "access_token_expires_at", str(expires_at))
    except Exception as ex:
        _debug_keyring(f"store access_token_expires_at failed: {ex}")
        return False
    return True


def _load_valid_access_token():
    """Fetch the cached access token when it has at least 60 seconds left.

    Returns:
        The result of loading the cached token when the stored expiry is 60
        or more seconds in the future. None when no expiry is stored, when
        the token is expired or about to expire, or when reading from the
        keyring fails.
    """
    try:
        raw_expiry = keyring.get_password(SERVICE_NAME, "access_token_expires_at")
        if raw_expiry is None:
            _debug_keyring("no cached access token expiry found")
            return None

        seconds_left = int(float(raw_expiry) - time.time())
        if seconds_left >= 60:
            cached = _load_chunked("access_token")
            if cached:
                _debug_keyring(f"reusing cached access token ({seconds_left}s remaining)")
            return cached

        _debug_keyring(f"cached access token expired or expiring in {seconds_left}s, will refresh")
        return None
    except (KeyringError, Exception) as ex:
        _debug_keyring(f"load access token failed: {ex}")
        return None


# Authorization code taken from the redirect request by CallbackHandler.do_GET;
# get_authorization_code returns this value after handling the callback.
auth_code = None


def generate_pkce():
    """Create a PKCE code verifier and its matching S256 code challenge.

    The verifier is 40 random bytes, URL-safe base64 encoded without padding.
    The challenge is the SHA-256 digest of the verifier, encoded the same way.

    Returns:
        A ``(verifier, challenge)`` tuple of strings.
    """

    def _b64url(raw):
        # URL-safe base64 with the trailing "=" padding stripped.
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    code_verifier = _b64url(os.urandom(40))
    digest = hashlib.sha256(code_verifier.encode()).digest()
    return code_verifier, _b64url(digest)


class CallbackHandler(BaseHTTPRequestHandler):
    """Request handler for the local redirect endpoint of the login flow.

    Stores the ``code`` query parameter of the incoming GET request in the
    module-level ``auth_code`` and tells the user whether the login worked.
    """

    def do_GET(self):
        """Capture the authorization code and answer the browser.

        Responds with 200 and a success message when the request carries a
        ``code`` parameter. Otherwise responds with 400 and a failure message
        that includes the ``error`` / ``error_description`` parameters when
        present; ``auth_code`` is left untouched in that case.
        """
        global auth_code

        query = urllib.parse.urlparse(self.path).query
        params = urllib.parse.parse_qs(query)

        if "code" in params:
            auth_code = params["code"][0]
            status = 200
            body = b"Login successful. You may close this window."
        else:
            error = params.get("error", ["no authorization code received"])[0]
            description = params.get("error_description", [""])[0]
            detail = f"{error}: {description}" if description else error
            status = 400
            body = f"Login failed ({detail}). You may close this window.".encode("utf-8")

        self.send_response(status)
        # Plain text: the failure message echoes query parameters, which must
        # not be interpreted as HTML by the browser.
        self.send_header("Content-Type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Discard request logging.

        The default implementation prints the request line, which here
        contains the authorization code in its query string.
        """


def get_authorization_code(verifier, challenge):
    """Open the browser for login and wait for the redirect with the code.

    The local callback server is bound to the host and port of REDIRECT_URI
    before the browser is opened, so a fast redirect cannot arrive while
    nothing is listening yet. The server handles exactly one request and is
    closed afterwards.

    Args:
        verifier: PKCE code verifier. Not used here; kept so the signature
            stays the same for callers.
        challenge: PKCE code challenge sent with the authorization request.

    Returns:
        The authorization code captured by CallbackHandler, or None when the
        handled request did not carry one.
    """
    global auth_code
    # Forget any code from an earlier login so that a failed attempt cannot
    # return a stale value.
    auth_code = None

    params = {
        "client_id": CLIENT_ID,
        "response_type": "code",
        "redirect_uri": REDIRECT_URI,
        "response_mode": "query",
        "scope": " ".join(SCOPES),
        "code_challenge": challenge,
        "code_challenge_method": "S256",
    }
    url = AUTH_ENDPOINT + "?" + urllib.parse.urlencode(params)

    # Listen exactly where the identity provider will redirect to.
    redirect = urllib.parse.urlparse(REDIRECT_URI)
    address = (redirect.hostname or "localhost", redirect.port or 80)

    # The context manager closes the listening socket even when opening the
    # browser or handling the request raises.
    with HTTPServer(address, CallbackHandler) as server:
        webbrowser.open(url)
        server.handle_request()

    return auth_code


def exchange_code(code, verifier, timeout=30):
    """Exchange an authorization code for tokens at TOKEN_ENDPOINT.

    Args:
        code: Authorization code received on the redirect.
        verifier: PKCE code verifier that belongs to the challenge sent with
            the authorization request.
        timeout: Seconds to wait for the token endpoint before giving up, so
            an unresponsive server cannot block the script forever.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        requests.HTTPError: The token endpoint answered with an error status.
    """
    data = {
        "client_id": CLIENT_ID,
        "grant_type": "authorization_code",
        "code": code,
        "redirect_uri": REDIRECT_URI,
        "code_verifier": verifier,
    }

    r = requests.post(TOKEN_ENDPOINT, data=data, timeout=timeout)
    r.raise_for_status()

    return r.json()


def refresh_token(refresh_token, timeout=30):
    """Request new tokens from TOKEN_ENDPOINT using a refresh token.

    Args:
        refresh_token: Refresh token to send with the ``refresh_token`` grant.
        timeout: Seconds to wait for the token endpoint before giving up, so
            an unresponsive server cannot block the script forever.

    Returns:
        The decoded JSON body of the token response.

    Raises:
        requests.HTTPError: The token endpoint answered with an error status.
    """
    data = {
        "client_id": CLIENT_ID,
        "grant_type": "refresh_token",
        "refresh_token": refresh_token,
    }

    r = requests.post(TOKEN_ENDPOINT, data=data, timeout=timeout)
    r.raise_for_status()

    return r.json()


def login():
    """Run the interactive browser login and cache the resulting tokens.

    Generates a PKCE pair, obtains an authorization code through the browser,
    exchanges it for tokens and stores them in the keyring.

    Returns:
        The token response as returned by ``exchange_code``.
    """
    verifier, challenge = generate_pkce()

    code = get_authorization_code(verifier, challenge)

    token = exchange_code(code, verifier)

    # The refresh token is treated as optional, exactly as get_token does for
    # refresh responses; a response without one must not fail a login that
    # otherwise succeeded.
    new_refresh_token = token.get("refresh_token")
    if new_refresh_token:
        _store_refresh_token(new_refresh_token)
    else:
        print("Warning: token response contained no refresh token.")
    _store_access_token(token["access_token"], token.get("expires_in", 3600))

    return token


def get_token():
    """Return a token dict, preferring cached credentials over a new login.

    Order of attempts:
      1. a cached access token that is still valid (no network call),
      2. a refresh with the stored refresh token,
      3. an interactive login, when no refresh token is stored or the
         refresh request fails with an HTTP error.

    Returns:
        A dict with at least an ``access_token`` key. From the cache it
        holds only that key; otherwise it is the full token response.
    """
    cached_access_token = _load_valid_access_token()
    if cached_access_token:
        return {"access_token": cached_access_token}

    stored_refresh_token = _load_refresh_token()
    if stored_refresh_token is None:
        return login()

    try:
        refreshed = refresh_token(stored_refresh_token)

        # The response may carry a replacement refresh token; keep it if so.
        if "refresh_token" in refreshed:
            _store_refresh_token(refreshed["refresh_token"])
        lifetime = refreshed.get("expires_in", 3600)
        _store_access_token(refreshed["access_token"], lifetime)

        return refreshed
    except requests.HTTPError:
        # The refresh was rejected: fall back to an interactive login.
        return login()


if __name__ == "__main__":
    token = get_token()

    access_token = token["access_token"]

    # Get all filters in JSON format. This endpoint requires authentication, so we include the access token in the Authorization header.
    r = requests.get(
        f"{WEBSERVICES_URL}rest/fewspiservice/v1/filters?documentFormat=PI_JSON",
        headers={"Authorization": f"Bearer {access_token}"},
        # Do not wait forever on an unresponsive server.
        timeout=30,
    )
    # Report a rejected request (e.g. 401/403) as an HTTP error instead of
    # failing later while decoding a non-JSON error body.
    r.raise_for_status()

    print(r.json())



  • No labels