The following Python tool has not been fully tested, because conditional access policies may restrict public clients. This setup can only be used if your organization allows public clients. It is recommended to use the authorization code flow instead, as described in: Python command line with OIDC authorization code flow


The following is an example of a Python tool that can log in with OIDC. The tool will ask you to enter, in the browser, a device code that is printed on the console.

To support this setup in Microsoft Entra ID, an app registration of type "Public client/Native (Mobile and Desktop)" has to be created without a redirect URL. In the Authentication settings, "Allow public client flows" has to be enabled. The client ID of this app registration and the tenant ID have to be updated in the script.

The app registration has to be given permissions on the backend app registration where the Delft-FEWS WebServices is registered. The scope of the backend app registration api has to be updated in the script.






# This script demonstrates the OAuth 2.0 Device Code flow to obtain an access token for the Delft-FEWS WebServices API.
# It uses the `keyring` library to securely store the refresh token and cache the access token, allowing for seamless token refresh without user interaction.
# The script asks the user to complete authentication on the Microsoft verification page and then polls the token endpoint until authorization completes.
# Subsequent runs will reuse valid access tokens or refresh them as needed, minimizing the need for repeated logins.

import hashlib
import time
import webbrowser
import requests
import keyring
from keyring.errors import KeyringError

# Base URL of the Delft-FEWS WebServices. Keep the trailing slash: request
# paths are appended to it directly.
WEBSERVICES_URL = "https://my-server.com/FewsWebServices/"
# Tenant ID of the Entra ID tenant; it is part of both endpoint URLs below.
TENANT = "my-tenant-id"
# Client ID of the public-client App Registration in Entra ID.
CLIENT_ID = "my-client-id"
_AUTHORITY = "https://login.microsoftonline.com/" + TENANT + "/oauth2/v2.0"
DEVICE_CODE_ENDPOINT = _AUTHORITY + "/devicecode"
TOKEN_ENDPOINT = _AUTHORITY + "/token"
# The last scope is the API exposed by the backend App Registration (the
# WebServices API in Entra ID); "offline_access" is needed to receive a
# refresh token.
SCOPES = [
    "openid",
    "profile",
    "offline_access",
    "api://backend-client-id/BackendApis",
]

# Passed as `timeout=` to every HTTP request made by this script.
REQUEST_TIMEOUT = 30

# Keyring service name. It embeds the first 16 hex characters of the SHA-256
# of WEBSERVICES_URL, so tokens for different WebServices URLs are stored
# under different service names.
_URL_DIGEST = hashlib.sha256(WEBSERVICES_URL.encode("utf-8")).hexdigest()
SERVICE_NAME = "delft_fews_webservices_python_oidc_tool_" + _URL_DIGEST[:16]
# Set to True to print keyring diagnostics.
DEBUG_KEYRING = False

# WinVault stores passwords as UTF-16LE; its 2560-byte blob limit allows at
# most 1280 characters. 1200-character chunks stay safely under that limit.
CHUNK_SIZE = 1200


def _debug_keyring(message):
    """Print *message* with a ``[keyring-debug]`` prefix when DEBUG_KEYRING is set."""
    if not DEBUG_KEYRING:
        return
    print(f"[keyring-debug] {message}")


def _store_chunked(key_prefix, value):
    """Store a potentially large value in keyring by splitting into CHUNK_SIZE chunks.

    The chunks are stored under ``<key_prefix>_<index>`` and the number of
    chunks under ``<key_prefix>_count``.

    The count entry is removed before any chunk is overwritten and written
    again only after every chunk has been stored. A write that fails midway
    therefore leaves no count entry, instead of a count that points at a mix
    of old and new chunks.

    Args:
        key_prefix: Prefix of the keyring entry names.
        value: String to store.

    Returns:
        True when the value was stored, False when the keyring raised (a
        warning is printed in that case).
    """
    chunks = [value[i: i + CHUNK_SIZE] for i in range(0, len(value), CHUNK_SIZE)]
    count_key = f"{key_prefix}_count"
    _debug_keyring(f"store '{key_prefix}': {len(value)} chars → {len(chunks)} chunk(s)")
    try:
        old_count_str = keyring.get_password(SERVICE_NAME, count_key)
        try:
            old_count = int(old_count_str) if old_count_str else 0
        except ValueError:
            # A corrupt count must not block storing a fresh value; the stale
            # chunk indices are unknown in that case, so none are cleaned up.
            _debug_keyring(f"ignoring invalid stored count for '{key_prefix}': {old_count_str!r}")
            old_count = 0

        if old_count_str is not None:
            # Invalidate the previous value before touching its chunks.
            try:
                keyring.delete_password(SERVICE_NAME, count_key)
            except Exception as ex:
                _debug_keyring(f"could not remove old count for '{key_prefix}': {ex}")

        for i, chunk in enumerate(chunks):
            keyring.set_password(SERVICE_NAME, f"{key_prefix}_{i}", chunk)
        # Publish the value only now that all chunks are in place.
        keyring.set_password(SERVICE_NAME, count_key, str(len(chunks)))

        # Best-effort removal of chunks left over from a longer previous value.
        for i in range(len(chunks), old_count):
            try:
                keyring.delete_password(SERVICE_NAME, f"{key_prefix}_{i}")
                _debug_keyring(f"deleted stale chunk {key_prefix}_{i}")
            except Exception as ex:
                _debug_keyring(f"could not delete stale chunk {key_prefix}_{i}: {ex}")

        _debug_keyring(f"store '{key_prefix}' success")
        return True
    except Exception as ex:
        _debug_keyring(f"store '{key_prefix}' failed: {ex}")
        print(f"Warning: could not persist '{key_prefix}' in keyring ({ex}).")
        return False


def _load_chunked(key_prefix):
    """Read back a value stored in chunks under *key_prefix*.

    Returns the concatenated chunks, or None when the count entry or any
    chunk is missing, or when reading from the keyring raises (a warning is
    printed in that case).
    """
    try:
        stored_count = keyring.get_password(SERVICE_NAME, f"{key_prefix}_count")
        if stored_count is None:
            _debug_keyring(f"load '{key_prefix}': not found")
            return None

        total = int(stored_count)
        parts = []
        for index in range(total):
            part = keyring.get_password(SERVICE_NAME, f"{key_prefix}_{index}")
            if part is None:
                # An incomplete value is treated the same as a missing one.
                _debug_keyring(f"load '{key_prefix}': chunk {index} missing")
                return None
            parts.append(part)

        joined = "".join(parts)
        _debug_keyring(f"load '{key_prefix}' success: {total} chunk(s), {len(joined)} chars")
        return joined
    except (KeyringError, Exception) as ex:
        _debug_keyring(f"load '{key_prefix}' failed: {ex}")
        print(f"Warning: could not read '{key_prefix}' from keyring ({ex}).")
        return None


def _store_refresh_token(refresh_token_value):
    """Persist the refresh token in keyring under the ``refresh_token`` prefix.

    Returns the result of `_store_chunked`.
    """
    stored = _store_chunked("refresh_token", refresh_token_value)
    return stored


def _load_refresh_token():
    """Return the refresh token stored under the ``refresh_token`` prefix.

    Returns the result of `_load_chunked`.
    """
    stored_value = _load_chunked("refresh_token")
    return stored_value


def _store_access_token(access_token_value, expires_in):
    """Cache access token with expiry timestamp.

    The token is stored before its expiry. If the token cannot be stored, the
    cached expiry is removed, so a fresh expiry is never paired with an older
    or partially written token.

    Args:
        access_token_value: Access token string to cache.
        expires_in: Token lifetime in seconds; converted with ``int()``.

    Returns:
        True when both the token and its expiry were stored, else False.
    """
    expires_at = time.time() + int(expires_in)

    if not _store_chunked("access_token", access_token_value):
        # Without a stored expiry, _load_valid_access_token reports no cached token.
        try:
            keyring.delete_password(SERVICE_NAME, "access_token_expires_at")
        except Exception as ex:
            _debug_keyring(f"delete access_token_expires_at failed: {ex}")
        return False

    try:
        keyring.set_password(SERVICE_NAME, "access_token_expires_at", str(expires_at))
    except Exception as ex:
        # Any expiry left over from an earlier token is older than the new
        # one, so the cache can only be considered expired too early.
        _debug_keyring(f"store access_token_expires_at failed: {ex}")
        return False
    return True


def _load_valid_access_token():
    """Return the cached access token when it has at least 60 seconds left.

    Returns None when no expiry is stored, when the token expires within 60
    seconds, when the token cannot be loaded, or when reading the keyring
    raises.
    """
    try:
        raw_expiry = keyring.get_password(SERVICE_NAME, "access_token_expires_at")
        if raw_expiry is None:
            _debug_keyring("no cached access token expiry found")
            return None

        seconds_left = int(float(raw_expiry) - time.time())
        if seconds_left < 60:
            _debug_keyring(f"cached access token expired or expiring in {seconds_left}s, will refresh")
            return None

        cached = _load_chunked("access_token")
        if cached:
            _debug_keyring(f"reusing cached access token ({seconds_left}s remaining)")
        return cached
    except (KeyringError, Exception) as ex:
        _debug_keyring(f"load access token failed: {ex}")
        return None


def request_device_code():
    """Start the device code flow and return the parsed JSON response.

    Posts the client ID and the space-separated scopes to
    DEVICE_CODE_ENDPOINT. Raises ``requests.HTTPError`` on an HTTP error
    status.
    """
    response = requests.post(
        DEVICE_CODE_ENDPOINT,
        data={"client_id": CLIENT_ID, "scope": " ".join(SCOPES)},
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def poll_device_code(device_code, interval, expires_in):
    """Poll the token endpoint until device authorization completes.

    Args:
        device_code: The ``device_code`` value from the device code response.
        interval: Seconds to wait between polls; falls back to 5 when falsy
            and is never lower than 1.
        expires_in: Seconds until polling is given up; falls back to 900 when
            falsy.

    Returns:
        The parsed JSON body of the first HTTP 200 token response.

    Raises:
        TimeoutError: The device code expired or the polling deadline passed.
        RuntimeError: The user denied authorization, or the endpoint returned
            an unexpected error or an unexpected response body.
        requests.HTTPError: An error status was returned with a non-JSON body.
    """
    poll_interval = max(1, int(interval or 5))
    # The deadline is an elapsed-time limit, so use the monotonic clock; it is
    # not affected by adjustments of the system clock.
    deadline = time.monotonic() + int(expires_in or 900)
    data = {
        "client_id": CLIENT_ID,
        "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
        "device_code": device_code,
    }

    while time.monotonic() < deadline:
        time.sleep(poll_interval)

        try:
            r = requests.post(TOKEN_ENDPOINT, data=data, timeout=REQUEST_TIMEOUT)
        except requests.Timeout:
            # RFC 8628 section 3.5: on a connection timeout the client must
            # poll less often; doubling the interval is the recommended backoff.
            poll_interval *= 2
            continue

        if r.status_code == 200:
            return r.json()

        try:
            body = r.json()
        except ValueError:
            # Not JSON: surface HTTP error statuses as requests.HTTPError.
            r.raise_for_status()
            body = None

        if not isinstance(body, dict):
            raise RuntimeError(
                f"Device code authorization failed: unexpected HTTP {r.status_code} response"
            )

        error = body.get("error")
        description = body.get("error_description", "")

        if error == "authorization_pending":
            continue
        if error == "slow_down":
            poll_interval += 5
            continue
        if error == "expired_token":
            raise TimeoutError("Device code expired before authorization was completed")
        if error == "access_denied":
            raise RuntimeError("User denied device authorization")

        raise RuntimeError(f"Device code authorization failed: {error} {description}".strip())

    raise TimeoutError("Timed out while polling for device authorization")


def refresh_token(refresh_token):
    """Exchange *refresh_token* for new tokens at TOKEN_ENDPOINT.

    Returns the parsed JSON token response. Raises ``requests.HTTPError`` on
    an HTTP error status.
    """
    response = requests.post(
        TOKEN_ENDPOINT,
        data={
            "client_id": CLIENT_ID,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        },
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    return response.json()


def _clear_refresh_token():
    """Best-effort removal of the stored refresh token from keyring.

    Deletes every ``refresh_token_<index>`` chunk and then the
    ``refresh_token_count`` entry. The count entry is removed even when its
    value cannot be parsed, so a corrupt entry does not linger. Never raises;
    failures are only reported through `_debug_keyring`.
    """
    try:
        count_str = keyring.get_password(SERVICE_NAME, "refresh_token_count")
    except Exception as ex:
        _debug_keyring(f"clear refresh token: could not read count: {ex}")
        return

    if count_str is None:
        # Nothing stored.
        return

    try:
        count = int(count_str)
    except ValueError:
        # Chunk indices are unknown; still remove the count entry below.
        _debug_keyring(f"clear refresh token: invalid count {count_str!r}")
        count = 0

    for i in range(count):
        try:
            keyring.delete_password(SERVICE_NAME, f"refresh_token_{i}")
        except Exception as ex:
            _debug_keyring(f"clear refresh token: could not delete chunk {i}: {ex}")

    try:
        keyring.delete_password(SERVICE_NAME, "refresh_token_count")
    except Exception as ex:
        _debug_keyring(f"clear refresh token: could not delete count: {ex}")


def login():
    """Run the interactive device code login and return the token response.

    Prints the sign-in instructions, opens the verification page in the
    browser when a URI is available, polls until authorization completes,
    and then stores the refresh token (if any) and the access token in
    keyring.
    """
    device = request_device_code()

    complete_uri = device.get("verification_uri_complete")
    if complete_uri:
        # This URI needs no separately entered code.
        print("Complete sign-in in your browser:")
        print(complete_uri)
        webbrowser.open(complete_uri)
    else:
        plain_uri = device.get("verification_uri")
        code = device.get("user_code")
        print("Complete sign-in using the code below:")
        print(f"1) Open: {plain_uri}")
        print(f"2) Enter code: {code}")
        if plain_uri:
            webbrowser.open(plain_uri)

    token = poll_device_code(
        device_code=device["device_code"],
        interval=device.get("interval", 5),
        expires_in=device.get("expires_in", 900),
    )

    if "refresh_token" in token:
        _store_refresh_token(token["refresh_token"])
    lifetime = token.get("expires_in", 3600)
    _store_access_token(token["access_token"], lifetime)

    return token


def get_token():
    """Return a dict with a usable ``access_token``.

    Order of preference: the cached access token (no network call), a refresh
    using the stored refresh token, and finally an interactive `login`. When
    the refresh request fails with an HTTP error, the stored refresh token is
    cleared for status 400 or 401, and `login` is used in every case.
    """
    cached = _load_valid_access_token()
    if cached:
        return {"access_token": cached}

    stored_refresh = _load_refresh_token()
    if stored_refresh is None:
        return login()

    try:
        token = refresh_token(stored_refresh)
    except requests.HTTPError as ex:
        status = None if ex.response is None else ex.response.status_code
        if status in (400, 401):
            _clear_refresh_token()
        return login()

    # The response may carry a new refresh token; store it when present.
    if "refresh_token" in token:
        _store_refresh_token(token["refresh_token"])
    lifetime = token.get("expires_in", 3600)
    _store_access_token(token["access_token"], lifetime)

    return token


if __name__ == "__main__":
    # Example run: obtain a token, call the WebServices API, print the result.
    try:
        token = get_token()

        access_token = token["access_token"]

        # Get all filters in JSON format. This endpoint requires authentication, so we include the access token in the Authorization header.
        r = requests.get(
            f"{WEBSERVICES_URL}rest/fewspiservice/v1/filters?documentFormat=PI_JSON",
            headers={"Authorization": f"Bearer {access_token}"},
            timeout=REQUEST_TIMEOUT,
        )
        r.raise_for_status()

        print(r.json())
    except Exception as ex:
        print(f"Authentication or API request failed: {ex}")
        # Exit with a non-zero status so a calling shell or scheduler can
        # detect that the run failed.
        raise SystemExit(1) from None




  • No labels