skills/4444j99/a-i--skills/oauth-flow-architect

oauth-flow-architect

SKILL.md

OAuth Flow Architect

This skill provides guidance for implementing OAuth 2.0 and OpenID Connect (OIDC) authentication flows securely and correctly.

Core Competencies

  • OAuth 2.0 Flows: Authorization Code, PKCE, Client Credentials
  • OpenID Connect: ID tokens, UserInfo, discovery
  • Token Management: Refresh, revocation, storage
  • Security: CSRF, token theft, redirect URI validation

OAuth 2.0 Fundamentals

The Problem OAuth Solves

Without OAuth:                   With OAuth:
┌──────┐  credentials  ┌──────┐  ┌──────┐            ┌──────┐
│ User │──────────────▶│ App  │  │ User │            │ App  │
└──────┘               └──┬───┘  └──┬───┘            └──┬───┘
                          │         │ Login at          │
                          │         │ provider          │
                          ▼         ▼                   │
                       ┌──────┐  ┌──────┐  token     ┌──────┐
                       │Google│  │Google│───────────▶│Google│
                       └──────┘  └──────┘            └──────┘

App has your password        App never sees password

OAuth Roles

Role Description Example
Resource Owner User who owns data End user
Client Application requesting access Your app
Authorization Server Issues tokens Google, Auth0
Resource Server Hosts protected resources Google API

Grant Types Overview

Grant Type Use Case Security Level
Authorization Code + PKCE Web apps, mobile, SPAs Highest
Authorization Code Traditional server apps High
Client Credentials Machine-to-machine High
Refresh Token Token renewal High
Implicit (deprecated) Legacy SPAs Low
Password (deprecated) Legacy migrations Low

Authorization Code Flow with PKCE

The recommended flow for all user-facing applications.

Flow Diagram

┌──────┐                              ┌─────────────┐                    ┌──────────┐
│ User │                              │   Client    │                    │  Auth    │
│      │                              │   (App)     │                    │  Server  │
└──┬───┘                              └──────┬──────┘                    └────┬─────┘
   │  1. Click "Login"                       │                               │
   │────────────────────────────────────────▶│                               │
   │                                         │  2. Generate code_verifier    │
   │                                         │     code_challenge = SHA256() │
   │                                         │                               │
   │  3. Redirect to authorization endpoint  │                               │
   │◀────────────────────────────────────────│                               │
   │                                         │                               │
   │  4. Redirect (login at auth server)     │                               │
   │────────────────────────────────────────────────────────────────────────▶│
   │                                         │                               │
   │  5. User authenticates & consents       │                               │
   │◀────────────────────────────────────────────────────────────────────────│
   │                                         │                               │
   │  6. Redirect with authorization code    │                               │
   │────────────────────────────────────────▶│                               │
   │                                         │                               │
   │                                         │  7. Exchange code + verifier  │
   │                                         │     for tokens                │
   │                                         │──────────────────────────────▶│
   │                                         │                               │
   │                                         │  8. Access token + ID token   │
   │                                         │◀──────────────────────────────│
   │                                         │                               │
   │  9. User is logged in                   │                               │
   │◀────────────────────────────────────────│                               │

Implementation

import secrets
import hashlib
import base64
from urllib.parse import urlencode

class OAuthClient:
    """OAuth 2.0 client with PKCE"""

    def __init__(self, config):
        self.client_id = config['client_id']
        self.client_secret = config.get('client_secret')  # Optional with PKCE
        self.redirect_uri = config['redirect_uri']
        self.authorization_endpoint = config['authorization_endpoint']
        self.token_endpoint = config['token_endpoint']
        self.scopes = config.get('scopes', ['openid', 'profile', 'email'])

    def generate_pkce(self):
        """Generate PKCE code verifier and challenge"""
        # Code verifier: 43-128 chars, URL-safe
        code_verifier = secrets.token_urlsafe(32)

        # Code challenge: SHA256 hash of verifier
        digest = hashlib.sha256(code_verifier.encode()).digest()
        code_challenge = base64.urlsafe_b64encode(digest).rstrip(b'=').decode()

        return code_verifier, code_challenge

    def get_authorization_url(self, state=None):
        """Build authorization URL for redirect"""
        code_verifier, code_challenge = self.generate_pkce()

        # State for CSRF protection
        state = state or secrets.token_urlsafe(16)

        params = {
            'response_type': 'code',
            'client_id': self.client_id,
            'redirect_uri': self.redirect_uri,
            'scope': ' '.join(self.scopes),
            'state': state,
            'code_challenge': code_challenge,
            'code_challenge_method': 'S256'
        }

        url = f"{self.authorization_endpoint}?{urlencode(params)}"

        return {
            'url': url,
            'state': state,
            'code_verifier': code_verifier  # Store server-side
        }

    async def exchange_code(self, code, code_verifier):
        """Exchange authorization code for tokens"""
        data = {
            'grant_type': 'authorization_code',
            'client_id': self.client_id,
            'code': code,
            'redirect_uri': self.redirect_uri,
            'code_verifier': code_verifier
        }

        # Include client_secret if confidential client
        if self.client_secret:
            data['client_secret'] = self.client_secret

        response = await self.http.post(
            self.token_endpoint,
            data=data,
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )

        if response.status_code != 200:
            raise OAuthError(response.json())

        return response.json()  # {access_token, refresh_token, id_token, ...}

Callback Handler

from flask import request, session, redirect

@app.route('/callback')
async def oauth_callback():
    # Verify state to prevent CSRF
    state = request.args.get('state')
    stored_state = session.get('oauth_state')

    if not state or state != stored_state:
        return 'Invalid state parameter', 400

    # Check for errors
    error = request.args.get('error')
    if error:
        error_desc = request.args.get('error_description', 'Unknown error')
        return f'OAuth error: {error_desc}', 400

    # Exchange code for tokens
    code = request.args.get('code')
    code_verifier = session.get('oauth_code_verifier')

    try:
        tokens = await oauth_client.exchange_code(code, code_verifier)
    except OAuthError as e:
        return f'Token exchange failed: {e}', 400

    # Validate ID token if using OIDC
    if 'id_token' in tokens:
        user_info = validate_id_token(tokens['id_token'])
    else:
        user_info = await fetch_userinfo(tokens['access_token'])

    # Create session
    session['user'] = user_info
    session['tokens'] = tokens

    # Clean up OAuth state
    session.pop('oauth_state', None)
    session.pop('oauth_code_verifier', None)

    return redirect('/dashboard')

OpenID Connect

OIDC adds identity layer on top of OAuth 2.0.

ID Token Structure

# ID token is a JWT with claims
{
    # Standard claims
    "iss": "https://accounts.google.com",  # Issuer
    "sub": "110169484474386276334",         # Subject (user ID)
    "aud": "your-client-id",                # Audience
    "exp": 1706616000,                      # Expiration
    "iat": 1706612400,                      # Issued at
    "nonce": "abc123",                      # Replay protection

    # Profile claims
    "name": "Alice Smith",
    "email": "alice@example.com",
    "email_verified": true,
    "picture": "https://..."
}

ID Token Validation

import jwt
from jwt import PyJWKClient

class IDTokenValidator:
    """Validate OIDC ID tokens"""

    def __init__(self, issuer, client_id, jwks_uri):
        self.issuer = issuer
        self.client_id = client_id
        self.jwks_client = PyJWKClient(jwks_uri)

    def validate(self, id_token, nonce=None):
        """Validate and decode ID token"""
        try:
            # Get signing key
            signing_key = self.jwks_client.get_signing_key_from_jwt(id_token)

            # Decode and validate
            claims = jwt.decode(
                id_token,
                signing_key.key,
                algorithms=['RS256'],
                audience=self.client_id,
                issuer=self.issuer
            )

            # Verify nonce if provided
            if nonce and claims.get('nonce') != nonce:
                raise ValueError('Invalid nonce')

            return claims

        except jwt.ExpiredSignatureError:
            raise AuthenticationError('ID token expired')
        except jwt.InvalidAudienceError:
            raise AuthenticationError('Invalid audience')
        except jwt.InvalidIssuerError:
            raise AuthenticationError('Invalid issuer')
        except Exception as e:
            raise AuthenticationError(f'Token validation failed: {e}')

OIDC Discovery

async def discover_oidc_config(issuer):
    """Fetch OIDC provider configuration"""
    discovery_url = f"{issuer.rstrip('/')}/.well-known/openid-configuration"

    response = await http.get(discovery_url)
    config = response.json()

    return {
        'authorization_endpoint': config['authorization_endpoint'],
        'token_endpoint': config['token_endpoint'],
        'userinfo_endpoint': config['userinfo_endpoint'],
        'jwks_uri': config['jwks_uri'],
        'scopes_supported': config['scopes_supported'],
        'response_types_supported': config['response_types_supported']
    }

# Example: Google
# https://accounts.google.com/.well-known/openid-configuration

Client Credentials Flow

For machine-to-machine authentication (no user involved).

class ClientCredentialsAuth:
    """OAuth client credentials flow"""

    def __init__(self, client_id, client_secret, token_endpoint):
        self.client_id = client_id
        self.client_secret = client_secret
        self.token_endpoint = token_endpoint
        self._token = None
        self._token_expiry = None

    async def get_token(self, scopes=None):
        """Get access token, refreshing if needed"""
        if self._token and self._token_expiry > time.time():
            return self._token

        data = {
            'grant_type': 'client_credentials',
            'client_id': self.client_id,
            'client_secret': self.client_secret
        }

        if scopes:
            data['scope'] = ' '.join(scopes)

        response = await http.post(
            self.token_endpoint,
            data=data,
            headers={'Content-Type': 'application/x-www-form-urlencoded'}
        )

        tokens = response.json()
        self._token = tokens['access_token']
        self._token_expiry = time.time() + tokens.get('expires_in', 3600) - 60

        return self._token

Token Management

Refresh Token Flow

class TokenManager:
    """Manage access and refresh tokens"""

    def __init__(self, oauth_client, token_storage):
        self.oauth = oauth_client
        self.storage = token_storage

    async def get_valid_access_token(self, user_id):
        """Get valid access token, refreshing if needed"""
        tokens = await self.storage.get_tokens(user_id)

        if not tokens:
            raise AuthenticationError('No tokens found')

        # Check if access token is still valid (with buffer)
        if tokens.get('expires_at', 0) > time.time() + 60:
            return tokens['access_token']

        # Refresh the token
        if 'refresh_token' not in tokens:
            raise AuthenticationError('No refresh token, re-auth required')

        new_tokens = await self._refresh(tokens['refresh_token'])
        await self.storage.save_tokens(user_id, new_tokens)

        return new_tokens['access_token']

    async def _refresh(self, refresh_token):
        """Exchange refresh token for new access token"""
        data = {
            'grant_type': 'refresh_token',
            'client_id': self.oauth.client_id,
            'refresh_token': refresh_token
        }

        if self.oauth.client_secret:
            data['client_secret'] = self.oauth.client_secret

        response = await http.post(
            self.oauth.token_endpoint,
            data=data
        )

        if response.status_code != 200:
            raise TokenRefreshError(response.json())

        tokens = response.json()
        tokens['expires_at'] = time.time() + tokens.get('expires_in', 3600)

        return tokens

Token Storage Security

class SecureTokenStorage:
    """Store tokens securely"""

    def __init__(self, encryption_key, backend):
        self.cipher = Fernet(encryption_key)
        self.backend = backend  # Redis, database, etc.

    async def save_tokens(self, user_id, tokens):
        """Encrypt and store tokens"""
        # Encrypt sensitive fields
        encrypted = {
            'access_token': self._encrypt(tokens['access_token']),
            'expires_at': tokens['expires_at']
        }

        if 'refresh_token' in tokens:
            encrypted['refresh_token'] = self._encrypt(tokens['refresh_token'])

        if 'id_token' in tokens:
            # ID token doesn't need encryption (it's signed, not secret)
            encrypted['id_token'] = tokens['id_token']

        await self.backend.set(f"tokens:{user_id}", json.dumps(encrypted))

    async def get_tokens(self, user_id):
        """Retrieve and decrypt tokens"""
        data = await self.backend.get(f"tokens:{user_id}")
        if not data:
            return None

        encrypted = json.loads(data)

        return {
            'access_token': self._decrypt(encrypted['access_token']),
            'refresh_token': self._decrypt(encrypted.get('refresh_token', '')),
            'expires_at': encrypted['expires_at'],
            'id_token': encrypted.get('id_token')
        }

    def _encrypt(self, value):
        if not value:
            return ''
        return self.cipher.encrypt(value.encode()).decode()

    def _decrypt(self, value):
        if not value:
            return ''
        return self.cipher.decrypt(value.encode()).decode()

Security Considerations

Redirect URI Validation

def validate_redirect_uri(redirect_uri, registered_uris):
    """Strict redirect URI validation"""
    # Exact match required (no wildcards in production)
    if redirect_uri not in registered_uris:
        raise SecurityError('Invalid redirect_uri')

    # Additional checks
    parsed = urlparse(redirect_uri)

    # Must be HTTPS (except localhost for development)
    if parsed.scheme != 'https':
        if parsed.hostname not in ('localhost', '127.0.0.1'):
            raise SecurityError('Redirect URI must use HTTPS')

    # No fragments
    if parsed.fragment:
        raise SecurityError('Redirect URI cannot have fragment')

    return True

CSRF Protection

# State parameter prevents CSRF attacks

# 1. Generate state before redirect
state = secrets.token_urlsafe(32)
session['oauth_state'] = state

# 2. Include in authorization URL
auth_url = f"{authorization_endpoint}?state={state}&..."

# 3. Verify on callback
if request.args.get('state') != session.get('oauth_state'):
    abort(400, 'CSRF detected')

Common Vulnerabilities

Vulnerability Prevention
CSRF State parameter, SameSite cookies
Token theft HTTPS only, secure storage
Open redirect Strict redirect URI validation
Code injection PKCE, short-lived codes
Replay Nonce in ID tokens

Provider-Specific Setup

Google

GOOGLE_CONFIG = {
    'client_id': 'xxx.apps.googleusercontent.com',
    'client_secret': 'xxx',
    'authorization_endpoint': 'https://accounts.google.com/o/oauth2/v2/auth',
    'token_endpoint': 'https://oauth2.googleapis.com/token',
    'userinfo_endpoint': 'https://openidconnect.googleapis.com/v1/userinfo',
    'scopes': ['openid', 'email', 'profile']
}

GitHub (OAuth 2.0, not OIDC)

GITHUB_CONFIG = {
    'client_id': 'xxx',
    'client_secret': 'xxx',
    'authorization_endpoint': 'https://github.com/login/oauth/authorize',
    'token_endpoint': 'https://github.com/login/oauth/access_token',
    'userinfo_endpoint': 'https://api.github.com/user',
    'scopes': ['read:user', 'user:email']
}

References

  • references/oauth-security.md - Security best practices and threats
  • references/provider-configs.md - Configuration for common providers
  • references/token-patterns.md - Token storage and refresh patterns
Weekly Installs
2
GitHub Stars
3
First Seen
5 days ago
Installed on
amp2
cline2
openclaw2
opencode2
cursor2
kimi-cli2