comicbox.schemas.base

[docs] module comicbox.schemas.base

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
"""Skip keys instead of throwing errors."""

from abc import ABC
from pathlib import Path
from types import MappingProxyType
from typing import Any

from loguru import logger
from marshmallow import EXCLUDE
from marshmallow.decorators import (
    post_dump,
    post_load,
    pre_dump,
    pre_load,
)
from marshmallow.types import RenderModule, StrSequenceOrSet
from typing_extensions import override

from comicbox.empty import is_empty
from comicbox.fields.fields import StringField
from comicbox.schemas.decorators import trap_error
from comicbox.schemas.error_store import ClearingErrorStoreSchema


class BaseRenderModule(RenderModule, ABC):
    """Abstract render module base that adds a shared string cleaner."""

    @staticmethod
    def clean_string(s: str | bytes | bytearray) -> str | None:
        """Deserialize ``s`` through a ``StringField`` built with ``clean_tabs=True``."""
        cleaner = StringField(clean_tabs=True)
        return cleaner.deserialize(s)


class BaseSubSchema(ClearingErrorStoreSchema, ABC):
    """Base schema.

    Provides a single overridable hook per processing phase (``pre_load``,
    ``post_load``, ``pre_dump``, ``post_dump``), filtering of the ``exclude``
    option, and helpers to load from / dump to a file. Unknown keys are
    excluded rather than raising (see ``Meta``).

    The hook methods deliberately share their names with the imported
    marshmallow decorators. Each decorator expression is evaluated before the
    method of the same name is bound in the class namespace, so the order of
    the definitions below matters.
    """

    # Order in which tags are emitted on dump. When empty, dumped keys are
    # sorted alphabetically instead (see ``sort_dump``).
    TAG_ORDER: tuple[str, ...] = ()
    # Maps a requested exclude key to the set of local keys to exclude for this
    # schema. Values are merged into a set with ``|=`` so they must be sets.
    # Currently only the "pages" and "reprints" fields are mapped for each
    # schema (for Codex); this should speed up Codex reads.
    DELETE_KEY_MAP = MappingProxyType({})

    def _create_exclude(self, exclude: StrSequenceOrSet) -> set[str]:
        """Translate requested exclude keys into keys this schema can exclude.

        Dotted key paths are skipped. A key present in ``DELETE_KEY_MAP``
        contributes its mapped local keys; otherwise the key is kept only if
        it is found in ``fields``. Any other key is silently dropped.
        """
        final_exclude: set[str] = set()
        # NOTE(review): this runs before ``super().__init__()``; confirm that
        # ``fields`` is already populated at this point, otherwise the direct
        # field-name branch below can never match.
        fields = getattr(self, "fields", {})
        for key in exclude:
            if "." in key:
                # Deep keypaths not allowed
                continue
            if local_keys := self.DELETE_KEY_MAP.get(key):
                final_exclude |= local_keys
            elif key in fields:
                final_exclude.add(key)
        return final_exclude

    def __init__(
        self, *args: Any, exclude: StrSequenceOrSet = (), **kwargs: Any
    ) -> None:
        """Initialize with exclude keys.

        ``exclude`` is filtered through ``_create_exclude`` before being
        passed on to the parent initializer.
        """
        exclude = self._create_exclude(exclude)
        super().__init__(*args, exclude=exclude, **kwargs)

    @classmethod
    def pre_load_validate(cls, data: dict[str, Any] | None) -> dict[str, Any] | None:
        """Validate schema type first thing to fail as early as possible.

        The base implementation returns ``data`` unchanged.
        """
        # Meant to be overridden in BaseSchema
        return data

    @trap_error(pre_load)
    def pre_load(
        self, data: dict[str, Any] | None, **_kwargs: Any
    ) -> dict[str, Any] | None:
        """Singular pre_load hook; delegates to ``pre_load_validate``."""
        return self.pre_load_validate(data)

    @classmethod
    def clean_empties(cls, data: dict) -> dict:
        """Return a copy of loaded data without the entries ``is_empty`` rejects."""
        return {k: v for k, v in data.items() if not is_empty(v)}

    @trap_error(post_load)
    def post_load(self, data: dict[str, Any], **_kwargs: Any) -> dict[str, Any]:
        """Singular post_load hook; strips empty values from the loaded data."""
        return self.clean_empties(data)

    @pre_dump
    def pre_dump(
        self, data: dict[str, Any] | MappingProxyType[str, Any], **_kwargs: Any
    ) -> dict[str, Any] | MappingProxyType[str, Any]:
        """Singular pre_dump hook; returns ``data`` unchanged."""
        return data

    @classmethod
    def _sort_tag_by_order(cls, data: dict) -> dict:
        """Build a dict of the non-empty values in ``TAG_ORDER`` order.

        Only tags listed in ``TAG_ORDER`` are copied; any other key in
        ``data`` is left out of the result.
        NOTE(review): presumably ``TAG_ORDER`` lists every tag a schema can
        emit, so nothing is lost in practice - confirm against subclasses.
        """
        return {
            tag: value
            for tag in cls.TAG_ORDER
            if not is_empty(value := data.get(tag))
        }

    @classmethod
    def sort_dump(cls, data: dict) -> dict:
        """Order the dumped data and drop empty values.

        Uses ``TAG_ORDER`` when the class defines one, otherwise sorts by key.
        """
        if cls.TAG_ORDER:
            return cls._sort_tag_by_order(data)
        return {k: v for k, v in sorted(data.items()) if not is_empty(v)}

    @post_dump
    def post_dump(self, data: dict, **_kwargs: Any) -> dict:
        """Singular post_dump hook; sorts the dumped data with ``sort_dump``."""
        return self.sort_dump(data)

    def loadf(self, path) -> list | None | dict:
        """Read the string from the designated file and load it with ``loads``.

        The file is decoded as UTF-8 explicitly so the result does not depend
        on the platform's locale encoding.
        """
        with Path(path).open("r", encoding="utf-8") as f:
            str_data = f.read()
        return self.loads(str_data)

    def dumpf(
        self, data: MappingProxyType[str, Any], path: Path, **kwargs: Any
    ) -> None:
        """Write the dumped string, plus a trailing newline, to the designated file.

        Extra keyword arguments are forwarded to ``dumps``. The file is
        encoded as UTF-8 explicitly so the output does not depend on the
        platform's locale encoding.
        """
        str_data = self.dumps(data, **kwargs) + "\n"
        with Path(path).open("w", encoding="utf-8") as f:
            f.write(str_data)

    class Meta(ClearingErrorStoreSchema.Meta):
        """Schema options."""

        # Exclude unknown keys instead of raising on them.
        unknown = EXCLUDE


class BaseSchema(BaseSubSchema, ABC):
    """Top level base schema that checks loaded data for its root tag."""

    # ROOT_TAG and ROOT_DATA_KEY are the keys pre_load_validate looks for in
    # the data being loaded. The remaining attributes are not referenced in
    # this module.
    ROOT_TAG: str = ""
    ROOT_DATA_KEY: str = ""
    ROOT_KEYPATH: str = ""
    LEGACY_NESTED_MD_KEYPATH: str = ""
    HAS_PAGE_COUNT: bool = False
    HAS_PAGES: bool = False

    @override
    @classmethod
    def pre_load_validate(cls, data: dict[str, Any] | None) -> dict:
        """Return ``data`` only if it holds this schema's root tag or data key.

        Falsy data, or data containing neither ``ROOT_TAG`` nor
        ``ROOT_DATA_KEY``, yields an empty dict and a debug log message, so
        this schema is not confused with other JSON.
        """
        if data:
            if cls.ROOT_TAG in data or cls.ROOT_DATA_KEY in data:
                return data
            reason = f"Root tag '{cls.ROOT_TAG}' not found in {tuple(data.keys())}."
        else:
            reason = "No data."
        logger.debug(reason)
        # No exception is raised on purpose: the trapper must not trap it, so
        # the loader can go on to try another schema. An empty dict signals
        # the mismatch instead.
        return {}