# Copyright 2019, 2021-2024 The Debusine Developers
# See the AUTHORS file at the top-level directory of this distribution
#
# This file is part of Debusine. It is subject to the license terms
# in the LICENSE file found in the top-level directory of this
# distribution. No part of Debusine, including this file, may be copied,
# modified, propagated, or distributed except according to the terms
# contained in the LICENSE file.
"""Data models for db users and authentication."""
import hashlib
import secrets
from typing import Optional, TYPE_CHECKING
from django.conf import settings
from django.contrib.auth.models import AbstractUser
from django.core.validators import (
MaxLengthValidator,
MinLengthValidator,
)
from django.db import models
from django.db.models import (
CheckConstraint,
Q,
QuerySet,
UniqueConstraint,
)
from debusine.server import notifications
# TypedModelMeta is used below as the base class of a model ``Meta`` class.
# The real class from django_stubs_ext is only imported while type checking;
# at runtime this module substitutes plain ``object`` instead, so
# django_stubs_ext is never imported here outside of static analysis.
if TYPE_CHECKING:
    from django_stubs_ext.db.models import TypedModelMeta
else:
    TypedModelMeta = object
class TokenManager(models.Manager["Token"]):
    """Custom manager offering token lookups by owner and by key."""

    def get_tokens(
        self, username: str | None = None, key: str | None = None
    ) -> QuerySet["Token"]:
        """
        Return the tokens matching the given owner and/or token key.

        A falsy ``username`` or ``key`` (``None`` or an empty string)
        switches the corresponding filter off; with both switched off every
        token is returned.

        :param username: username of the user owning the tokens.
        :param key: plain-text token key; it is matched through its SHA-256
          hex digest against the stored ``hash``, never compared directly.
        """
        queryset = self.get_queryset()

        if username:
            queryset = queryset.filter(user__username=username)

        if not key:
            return queryset

        digest = hashlib.sha256(key.encode())
        return queryset.filter(hash=digest.hexdigest())

    def get_token_or_none(self, token_key: str) -> Optional["Token"]:
        """
        Look up a single token by its plain-text key.

        :param token_key: plain-text key as presented by the client.
        :return: the Token whose stored hash equals the SHA-256 hex digest
          of ``token_key`` (fetched with ``select_related('worker')``), or
          None when no token has that hash.
        """
        assert isinstance(token_key, str)

        digest = hashlib.sha256(token_key.encode()).hexdigest()
        candidates = self.select_related('worker')

        try:
            found = candidates.get(hash=digest)
        except Token.DoesNotExist:
            found = None

        return found
class Token(models.Model):
    """
    Database model of a token.

    A token contains a key and other related data. It's used as a shared
    key between debusine server and clients (workers).

    This token model is very similar to rest_framework.authtoken.models.Token.
    The bigger difference is that debusine's token's owner is an optional
    ForeignKey to a user (a token may have no user), while the
    rest_framework owner is a OneToOne foreign key to a user.

    Database-wise we don't store the token itself, but a hash of the token.
    :py:func:`TokenManager.get_token_or_none` can be used to check a provided
    token key against the database.
    """

    # SHA-256 hex digest of the key: always exactly 64 characters, which is
    # what the two length validators enforce.
    hash = models.CharField(
        max_length=64,
        unique=True,
        verbose_name='Hexadecimal hash, length is 64 chars',
        validators=[MaxLengthValidator(64), MinLengthValidator(64)],
    )
    # Optional owner. PROTECT prevents deleting a user that still has tokens.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        null=True,
        blank=True,
        on_delete=models.PROTECT,
    )
    comment = models.CharField(
        max_length=100,
        default='',
        verbose_name='Reason that this token was created',
        blank=True,
    )
    created_at = models.DateTimeField(auto_now_add=True)
    # Tokens start disabled; see enable() / disable().
    enabled = models.BooleanField(default=False)
    last_seen_at = models.DateTimeField(
        blank=True,
        null=True,
        help_text="Last time that the token was used",
    )

    # Plain-text key. This is only an annotation, not a model field: the key
    # is never stored in the database and is set on the instance only by
    # save() at the moment it is generated.
    key: str

    def save(self, *args, **kwargs) -> None:
        """
        Save the token, generating its key first if it has no hash yet.

        When ``self.hash`` is empty a new random key is generated, exposed
        as ``self.key`` and its hash stored in ``self.hash``. All arguments
        are forwarded unchanged to ``models.Model.save``.
        """
        if not self.hash:
            self.key = self._generate_key()
            self.hash = self._generate_hash(self.key)

        super().save(*args, **kwargs)

    def enable(self) -> None:
        """Enable the token and save it."""
        self.enabled = True
        self.save()

    def disable(self) -> None:
        """
        Disable the token, save it and send the "token disabled" notification.

        The notification is sent only after the change has been saved.
        """
        self.enabled = False
        self.save()

        notifications.notify_worker_token_disabled(self)

    def __str__(self) -> str:
        """Return the hash of the Token."""
        return self.hash

    @classmethod
    def _generate_key(cls) -> str:
        """Create and return a random key (64 hexadecimal characters)."""
        return secrets.token_hex(32)

    @classmethod
    def _generate_hash(cls, secret: str) -> str:
        """Return the SHA-256 hex digest of the given secret."""
        return hashlib.sha256(secret.encode()).hexdigest()

    objects = TokenManager()
# Username of the user returned by system_user().
SYSTEM_USER_NAME = "_system"


def system_user() -> "User":
    """
    Return the `_system` user.

    The user is looked up by username (``SYSTEM_USER_NAME``) on every call.

    :raises User.DoesNotExist: if no user with that username exists.
    """
    return User.objects.get(username=SYSTEM_USER_NAME)
class User(AbstractUser):
    """
    Debusine user.

    Extends Django's ``AbstractUser`` with an ``is_system`` flag and
    database constraints tying that flag to the email address.
    """

    # False by default, so users are non-system unless flagged explicitly.
    is_system = models.BooleanField(default=False)

    class Meta(AbstractUser.Meta):
        constraints = [
            # System users must not have an email address.
            CheckConstraint(
                check=Q(is_system=False) | Q(email=""),
                name="%(app_label)s_%(class)s_non_system_email",
            ),
            # Email addresses of non-system users must be unique.
            # The condition restricts the uniqueness check to non-system
            # users, so the empty email of system users never conflicts.
            UniqueConstraint(
                fields=["email"],
                condition=Q(is_system=False),
                name="%(app_label)s_%(class)s_unique_email",
            ),
        ]
class Identity(models.Model):
    """
    Identity for a user in a remote user database.

    An Identity is bound if it's associated with a Django user, or unbound if
    no Django user is known for it.

    An identity is identified by its (issuer, subject) pair, which is
    unique across the table.
    """

    class Meta(TypedModelMeta):
        constraints = [
            UniqueConstraint(
                fields=["issuer", "subject"],
                name="%(app_label)s_%(class)s_unique_issuer_subject",
            ),
        ]

    # Bound Django user, or NULL when unbound. SET_NULL means deleting the
    # user turns its identities back into unbound ones instead of removing
    # them.
    user = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        related_name="identities",
        null=True,
        on_delete=models.SET_NULL,
    )
    # NOTE(review): "auhoritative" below is a typo for "authoritative". It is
    # left as-is because help_text is part of the field definition; fixing it
    # presumably needs an accompanying migration - confirm before changing.
    issuer = models.CharField(
        max_length=512,
        help_text="identifier of auhoritative system for this identity",
    )
    subject = models.CharField(
        max_length=512,
        help_text="identifier of the user in the issuer system",
    )
    # auto_now: refreshed every time the identity is saved.
    last_used = models.DateTimeField(
        auto_now=True, help_text="last time this identity has been used"
    )
    # Defaults to a new empty dict for each instance (callable default).
    claims = models.JSONField(default=dict)

    def __str__(self) -> str:
        """Return the identity formatted as ``issuer:subject``."""
        return f"{self.issuer}:{self.subject}"