Files
personaltwin-app/backend/api/views.py
2026-03-20 15:09:41 -04:00

281 lines
11 KiB
Python

import json
from datetime import timedelta
from django.conf import settings
from django.contrib.auth import authenticate, get_user_model
from django.utils import timezone
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated
from rest_framework import status
from oauth2_provider.contrib.rest_framework import TokenHasReadWriteScope
import webauthn
from webauthn.helpers.structs import (
AuthenticatorSelectionCriteria,
UserVerificationRequirement,
PublicKeyCredentialDescriptor,
RegistrationCredential,
AuthenticatorAttestationResponse,
AuthenticationCredential,
AuthenticatorAssertionResponse,
)
from webauthn.helpers import bytes_to_base64url, base64url_to_bytes
from webauthn.helpers.exceptions import InvalidCBORData, InvalidAuthenticatorDataStructure
from .models import WebAuthnCredential, WebAuthnChallenge
User = get_user_model()
RP_NAME = "OAuth2 Demo"
CHALLENGE_TTL = timedelta(minutes=5)
def _rp_id():
return settings.WEBAUTHN_RP_ID
def _origin():
return settings.WEBAUTHN_ORIGIN
class MeView(APIView):
"""Return the authenticated user's profile."""
permission_classes = [IsAuthenticated, TokenHasReadWriteScope]
required_scopes = ['read']
def get(self, request):
user = request.user
return Response({
'id': user.id,
'username': user.username,
'email': user.email,
'first_name': user.first_name,
'last_name': user.last_name,
'date_joined': user.date_joined.isoformat(),
'has_touch_id': user.webauthn_credentials.exists(),
})
class LoginView(APIView):
"""Validate username/password credentials (no session or token issued)."""
permission_classes = []
authentication_classes = []
def post(self, request):
username = request.data.get('username', '').strip()
password = request.data.get('password', '')
if not username or not password:
return Response({'detail': 'Username and password required.'}, status=status.HTTP_400_BAD_REQUEST)
user = authenticate(request, username=username, password=password)
if user is None:
return Response({'detail': 'Invalid credentials.'}, status=status.HTTP_401_UNAUTHORIZED)
return Response({'username': user.username}, status=status.HTTP_200_OK)
class PingView(APIView):
"""Health check — no auth required."""
permission_classes = []
authentication_classes = []
def get(self, request):
return Response({'status': 'ok'})
# ── WebAuthn Registration ───────────────────────────────────────────────────
class WebAuthnRegisterBeginView(APIView):
"""Begin WebAuthn credential registration (requires existing OAuth token)."""
permission_classes = [IsAuthenticated, TokenHasReadWriteScope]
required_scopes = ['read']
def post(self, request):
user = request.user
# Remove stale challenges for this user
WebAuthnChallenge.objects.filter(user=user).delete()
existing_creds = WebAuthnCredential.objects.filter(user=user)
options = webauthn.generate_registration_options(
rp_id=_rp_id(),
rp_name=RP_NAME,
user_id=str(user.pk).encode(),
user_name=user.username,
user_display_name=user.get_full_name() or user.username,
authenticator_selection=AuthenticatorSelectionCriteria(
user_verification=UserVerificationRequirement.REQUIRED,
),
exclude_credentials=[
PublicKeyCredentialDescriptor(id=base64url_to_bytes(c.credential_id))
for c in existing_creds
],
)
WebAuthnChallenge.objects.create(
user=user,
challenge=bytes_to_base64url(options.challenge),
)
return Response(json.loads(webauthn.options_to_json(options)))
class WebAuthnRegisterCompleteView(APIView):
"""Complete WebAuthn credential registration."""
permission_classes = [IsAuthenticated, TokenHasReadWriteScope]
required_scopes = ['read']
def post(self, request):
user = request.user
try:
challenge_obj = WebAuthnChallenge.objects.filter(user=user).latest('created_at')
except WebAuthnChallenge.DoesNotExist:
return Response({'detail': 'No pending challenge. Begin registration first.'}, status=status.HTTP_400_BAD_REQUEST)
if timezone.now() - challenge_obj.created_at > CHALLENGE_TTL:
challenge_obj.delete()
return Response({'detail': 'Challenge expired. Please try again.'}, status=status.HTTP_400_BAD_REQUEST)
data = request.data
try:
credential = RegistrationCredential(
id=data['id'],
raw_id=base64url_to_bytes(data['rawId']),
response=AuthenticatorAttestationResponse(
client_data_json=base64url_to_bytes(data['response']['clientDataJSON']),
attestation_object=base64url_to_bytes(data['response']['attestationObject']),
),
type=data['type'],
)
verified = webauthn.verify_registration_response(
credential=credential,
expected_challenge=base64url_to_bytes(challenge_obj.challenge),
expected_rp_id=_rp_id(),
expected_origin=_origin(),
require_user_verification=True,
)
except (KeyError, ValueError, InvalidCBORData, InvalidAuthenticatorDataStructure, Exception) as exc:
return Response({'detail': f'Verification failed: {exc}'}, status=status.HTTP_400_BAD_REQUEST)
finally:
challenge_obj.delete()
WebAuthnCredential.objects.update_or_create(
credential_id=bytes_to_base64url(verified.credential_id),
defaults={
'user': user,
'public_key': verified.credential_public_key,
'sign_count': verified.sign_count,
},
)
return Response({'detail': 'Touch ID registered successfully.'})
# ── WebAuthn Authentication ─────────────────────────────────────────────────
class WebAuthnAuthBeginView(APIView):
"""Begin WebAuthn authentication (no existing token required)."""
permission_classes = []
authentication_classes = []
def post(self, request):
username = request.data.get('username', '').strip()
if not username:
return Response({'detail': 'Username required.'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
return Response({'detail': 'No Touch ID registered for this account.'}, status=status.HTTP_404_NOT_FOUND)
credentials = WebAuthnCredential.objects.filter(user=user)
if not credentials.exists():
return Response({'detail': 'No Touch ID registered for this account.'}, status=status.HTTP_404_NOT_FOUND)
# Remove stale challenges
WebAuthnChallenge.objects.filter(user=user).delete()
options = webauthn.generate_authentication_options(
rp_id=_rp_id(),
allow_credentials=[
PublicKeyCredentialDescriptor(id=base64url_to_bytes(c.credential_id))
for c in credentials
],
user_verification=UserVerificationRequirement.REQUIRED,
)
WebAuthnChallenge.objects.create(
user=user,
challenge=bytes_to_base64url(options.challenge),
)
return Response(json.loads(webauthn.options_to_json(options)))
class WebAuthnAuthCompleteView(APIView):
"""Complete WebAuthn authentication — returns username on success (same as /api/login/)."""
permission_classes = []
authentication_classes = []
def post(self, request):
username = request.data.get('username', '').strip()
if not username:
return Response({'detail': 'Username required.'}, status=status.HTTP_400_BAD_REQUEST)
try:
user = User.objects.get(username=username)
except User.DoesNotExist:
return Response({'detail': 'User not found.'}, status=status.HTTP_404_NOT_FOUND)
try:
challenge_obj = WebAuthnChallenge.objects.filter(user=user).latest('created_at')
except WebAuthnChallenge.DoesNotExist:
return Response({'detail': 'No pending challenge. Begin authentication first.'}, status=status.HTTP_400_BAD_REQUEST)
if timezone.now() - challenge_obj.created_at > CHALLENGE_TTL:
challenge_obj.delete()
return Response({'detail': 'Challenge expired. Please try again.'}, status=status.HTTP_400_BAD_REQUEST)
data = request.data
credential_id = data.get('id')
try:
stored_cred = WebAuthnCredential.objects.get(credential_id=credential_id, user=user)
except WebAuthnCredential.DoesNotExist:
return Response({'detail': 'Credential not found.'}, status=status.HTTP_400_BAD_REQUEST)
try:
credential = AuthenticationCredential(
id=data['id'],
raw_id=base64url_to_bytes(data['rawId']),
response=AuthenticatorAssertionResponse(
client_data_json=base64url_to_bytes(data['response']['clientDataJSON']),
authenticator_data=base64url_to_bytes(data['response']['authenticatorData']),
signature=base64url_to_bytes(data['response']['signature']),
user_handle=base64url_to_bytes(data['response']['userHandle']) if data['response'].get('userHandle') else None,
),
type=data['type'],
)
verified = webauthn.verify_authentication_response(
credential=credential,
expected_challenge=base64url_to_bytes(challenge_obj.challenge),
expected_rp_id=_rp_id(),
expected_origin=_origin(),
credential_public_key=bytes(stored_cred.public_key),
credential_current_sign_count=stored_cred.sign_count,
require_user_verification=True,
)
except Exception as exc:
return Response({'detail': f'Verification failed: {exc}'}, status=status.HTTP_401_UNAUTHORIZED)
finally:
challenge_obj.delete()
# Update replay-attack counter
stored_cred.sign_count = verified.new_sign_count
stored_cred.save(update_fields=['sign_count'])
return Response({'username': user.username}, status=status.HTTP_200_OK)