Skip to content

Core Library: Common

Utilities shared across services.

Context

context

Secure context derivation for encrypting and decrypting provider secrets.

DerivedContext

DerivedContext(
    aesgcm: AESGCM, nonce: bytes, ciphertext: bytes
)

Decryption helper holding derived AES context and encrypted payload.

Store encryption artifacts for later retrieval.

Parameters:

Name Type Description Default
aesgcm AESGCM

Derived cipher context.

required
nonce bytes

Nonce used during encryption.

required
ciphertext bytes

Encrypted payload.

required
Source code in quasar/lib/common/context.py
72
73
74
75
76
77
78
79
80
81
82
def __init__(self, aesgcm: AESGCM, nonce: bytes, ciphertext: bytes):
    """Keep the derived cipher together with its encrypted payload.

    Args:
        aesgcm (AESGCM): Derived cipher context.
        nonce (bytes): Nonce used during encryption.
        ciphertext (bytes): Encrypted payload.
    """
    # All three artifacts are needed together at decryption time.
    self.aesgcm, self.nonce, self.ciphertext = aesgcm, nonce, ciphertext

get

get(key: str) -> str

Return a secret field from the encrypted JSON payload.

Parameters:

Name Type Description Default
key str

Key inside the decrypted JSON blob.

required

Returns:

Name Type Description
str str

Retrieved value for the requested key.

Raises:

Type Description
KeyError

If the key is missing.

ValueError

If the payload cannot be parsed.

Source code in quasar/lib/common/context.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
def get(self, key: str) -> str:
    """Return a secret field from the encrypted JSON payload.

    Args:
        key (str): Key inside the decrypted JSON blob.

    Returns:
        str: Retrieved value for the requested key.

    Raises:
        KeyError: If the key is missing.
        ValueError: If the payload cannot be parsed.
    """
    try:
        plaintext = self.aesgcm.decrypt(self.nonce, self.ciphertext, None).decode('utf-8')
        payload = json.loads(plaintext)
        if key not in payload:
            raise KeyError(f"Key {key} not found in derived context.")
        # Membership verified above, so direct indexing is safe and avoids
        # the redundant second lookup that ``payload.get(key)`` would do.
        return payload[key]
    except Exception as e:
        # Boundary logging; the original exception still propagates.
        logging.error(f"Error accessing derived context: {e}")
        raise

SystemContext

Singleton that loads the system context and derives per-provider keys.

__new__

__new__(*args, **kwargs)

Create or return the singleton instance.

Source code in quasar/lib/common/context.py
18
19
20
21
22
23
24
25
26
27
def __new__(cls, *args, **kwargs):
    """Create or return the singleton instance.

    Raises:
        FileNotFoundError: If ``QUASAR_SYSTEM_CONTEXT`` does not point to
            an existing file.
    """
    if cls._instance is None:
        instance = super(SystemContext, cls).__new__(cls)
        instance._system_context_path = Path(os.getenv("QUASAR_SYSTEM_CONTEXT", ""))
        if not instance._system_context_path.is_file():
            # Typo fixed ("CRITIAL" -> "CRITICAL").
            logging.error(f"CRITICAL: System context path {instance._system_context_path} does not exist.")
            raise FileNotFoundError(f"System context path {instance._system_context_path} does not exist.")
        # Publish the singleton only after validation succeeds. Previously
        # ``cls._instance`` was assigned before the file check, so a failed
        # first call cached a broken instance that later calls returned
        # silently instead of raising again.
        cls._instance = instance
    # Return existing instance if already created (proper singleton behavior)
    return cls._instance

create_context_data

create_context_data(
    hash: bytes, data: bytes
) -> tuple[bytes, bytes]

Encrypt data with a derived AES context.

Parameters:

Name Type Description Default
hash bytes

Salt/input used to derive a unique key.

required
data bytes

Plaintext payload to encrypt.

required

Returns:

Type Description
tuple[bytes, bytes]

tuple[bytes, bytes]: Tuple of (nonce, ciphertext).

Source code in quasar/lib/common/context.py
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def create_context_data(self, hash: bytes, data: bytes) -> tuple[bytes, bytes]:
    """Encrypt ``data`` under a key derived from ``hash``.

    Args:
        hash (bytes): Salt/input used to derive a unique key.
        data (bytes): Plaintext payload to encrypt.

    Returns:
        tuple[bytes, bytes]: Tuple of (nonce, ciphertext).
    """
    cipher = self.get_derived_context(hash)
    fresh_nonce = os.urandom(12)  # 96-bit nonce, the AESGCM standard size
    return fresh_nonce, cipher.encrypt(fresh_nonce, data, None)

get_derived_context

get_derived_context(hash: bytes) -> AESGCM | None

Derive an AESGCM key using the system context and provided hash.

Source code in quasar/lib/common/context.py
42
43
44
45
46
47
48
49
50
51
def get_derived_context(self, hash: bytes) -> AESGCM:
    """Derive an AESGCM key using the system context and provided hash.

    Args:
        hash (bytes): Context-specific value fed to HKDF's ``info`` field.

    Returns:
        AESGCM: Cipher built from the HKDF-derived 32-byte key.
    """
    # Return annotation tightened from ``AESGCM | None``: every path
    # returns a cipher, so ``None`` was never a possible result.
    hkdf = HKDF(
        algorithm=hashes.SHA256(),
        length=32,
        salt=None,
        info=hash,
        backend=default_backend()
    )
    return AESGCM(hkdf.derive(self._read_system_context()))

Database handler

database_handler

Shared database handler with pooled connection lifecycle helpers.

DatabaseHandler

DatabaseHandler(
    dsn: str | None = None, pool: Optional[Pool] = None
)

Bases: ABC

Manage asyncpg pools and expose a unified interface.

Configure the handler with either a DSN or an existing pool.

Parameters:

Name Type Description Default
dsn str | None

Database connection string used to create a pool.

None
pool Pool | None

Pre-existing pool to reuse.

None

Raises:

Type Description
ValueError

If neither dsn nor pool is provided.

Source code in quasar/lib/common/database_handler.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def __init__(
        self,
        dsn: str | None = None,
        pool: Optional[asyncpg.Pool] = None) -> None:
    """Configure the handler with either a DSN or an existing pool.

    Args:
        dsn (str | None): Database connection string used to create a pool.
        pool (asyncpg.Pool | None): Pre-existing pool to reuse.

    Raises:
        ValueError: If neither ``dsn`` nor ``pool`` is provided.
    """

    # Pool Setup
    if not dsn and not pool:
        # Typo fixed in the log message ("intialized" -> "initialized").
        logger.error(f"{self.name} was initialized without DSN or Pool for database connection.")
        raise ValueError("Provide either dsn or pool")
    self._dsn = dsn
    self._pool: Optional[asyncpg.Pool] = pool

name abstractmethod property

name: str

Human-friendly identifier used for logging.

pool property

pool: Pool

Return the active pool or raise if it is not initialized.

close_pool async

close_pool() -> None

Close the owned pool.

Source code in quasar/lib/common/database_handler.py
54
55
56
57
async def close_pool(self) -> None:
    """Close the owned pool."""
    active = self._pool
    if active is None:
        return
    if active._closed:   # type: ignore  # private flag; no public closed-state accessor used here
        return
    await active.close()

init_pool async

init_pool() -> None

Create the asyncpg pool if this handler owns it.

Source code in quasar/lib/common/database_handler.py
49
50
51
52
async def init_pool(self) -> None:
    """Create the asyncpg pool if this handler owns it."""
    if self._pool is not None:
        # Pool was injected or already created — nothing to do.
        return
    self._pool = await asyncpg.create_pool(self._dsn)

API handler

api_handler

Shared FastAPI handler base with lifecycle helpers.

APIHandler

APIHandler(api_host: str = '0.0.0.0', api_port: int = 8080)

Bases: ABC

Serve a FastAPI application and manage its lifecycle.

Configure and create the FastAPI application.

Parameters:

Name Type Description Default
api_host str

Host interface to bind to.

'0.0.0.0'
api_port int

Port number to expose the API on.

8080
Source code in quasar/lib/common/api_handler.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def __init__(
        self,
        api_host: str = '0.0.0.0',
        api_port: int = 8080) -> None:
    """Configure and create the FastAPI application.

    Args:
        api_host (str): Host interface to bind to.
        api_port (int): Port number to expose the API on.
    """

    # API server state
    self._api_host = api_host
    self._api_port = api_port
    self._api_app = FastAPI(title=f"{self.name} API")
    self._server: Optional[uvicorn.Server] = None
    self._server_task: Optional[asyncio.Task] = None

    # CORS origins come from the CORS_ORIGINS env var as a comma-separated
    # list, e.g. "http://localhost:3000,http://192.168.1.100:3000".
    raw_origins = os.getenv("CORS_ORIGINS", "http://localhost:3000")
    allowed_origins = [entry.strip() for entry in raw_origins.split(",")]
    logger.debug(f"{self.name} CORS allowed origins: {allowed_origins}")

    self._api_app.add_middleware(
        CORSMiddleware,
        allow_origins=allowed_origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
        expose_headers=["*"],
    )

    # Register the routes supplied by the concrete subclass.
    self._setup_routes()

name abstractmethod property

name: str

Human-friendly service name used for logging and titles.

start_api_server async

start_api_server() -> None

Start the internal API server.

Source code in quasar/lib/common/api_handler.py
65
66
67
68
69
70
71
72
73
74
75
76
77
78
async def start_api_server(self) -> None:
    """Start the internal API server."""
    server_config = uvicorn.Config(
        self._api_app,
        host=self._api_host,
        port=self._api_port,
        log_level="info",
        access_log=False,  # We handle logging ourselves
    )
    self._server = uvicorn.Server(server_config)

    # Serve in a background task so the caller is not blocked.
    self._server_task = asyncio.create_task(self._server.serve())
    logger.info(f"{self.name} Internal API server started on http://{self._api_host}:{self._api_port}")

stop_api_server async

stop_api_server() -> None

Stop the internal API server and await shutdown.

Source code in quasar/lib/common/api_handler.py
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def stop_api_server(self) -> None:
    """Stop the internal API server and await shutdown."""
    if not self._server:
        return
    self._server.should_exit = True
    # Give the serve() task a bounded window to wind down.
    if self._server_task:
        try:
            await asyncio.wait_for(self._server_task, timeout=5.0)
        except asyncio.TimeoutError:
            logger.warning(f"{self.name} API server shutdown timeout")
        except Exception as e:
            logger.error(f"{self.name} Error stopping API server: {e}")
        finally:
            self._server_task = None
    logger.info(f"{self.name} Internal API server stopped.")

Secret store

secret_store

Secret store that can read from local files or AWS SSM.

SecretStore

SecretStore(
    mode: str = "auto", aws_region: str | None = None
)

Load provider secrets from disk or AWS Parameter Store.

Create a secret store.

Parameters:

Name Type Description Default
mode str

One of auto, local, or aws.

'auto'
aws_region str | None

AWS region to use when mode='aws'.

None
Source code in quasar/lib/common/secret_store.py
21
22
23
24
25
26
27
28
29
30
31
def __init__(self, mode: str = "auto", aws_region: str | None = None):
    """Create a secret store.

    Args:
        mode (str): One of ``auto``, ``local``, or ``aws``.
        aws_region (str | None): AWS region to use when ``mode='aws'``.
    """
    self.mode = mode
    # provider -> secrets payload; avoids re-reading files or re-hitting SSM.
    self._cache: dict[str, dict] = {}
    # An SSM client is created only in aws mode; other modes never touch AWS.
    if mode == "aws":
        self._ssm = boto3.client("ssm", region_name=aws_region)

get async

get(provider: str) -> dict

Return provider secrets using local files or AWS depending on mode.

Parameters:

Name Type Description Default
provider str

Provider key to load secrets for.

required

Returns:

Name Type Description
dict dict

Provider secrets payload.

Raises:

Type Description
SecretsFileNotFoundError

When auto mode cannot locate a secrets file.

Source code in quasar/lib/common/secret_store.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def get(self, provider: str) -> dict:
    """Return provider secrets using local files or AWS depending on mode.

    Args:
        provider (str): Provider key to load secrets for.

    Returns:
        dict: Provider secrets payload.

    Raises:
        SecretsFileNotFoundError: When auto mode cannot locate a secrets file.
        RuntimeError: When AWS mode is selected but no SSM client exists.
    """
    if provider in self._cache:
        return self._cache[provider]

    if self.mode == 'local':
        cfg = self.load_cfg_from_file(provider, _DEFAULT_PATHS[-1])
    elif self.mode == 'auto':
        cfg = None
        for candidate in _DEFAULT_PATHS:
            try:
                cfg = self.load_cfg_from_file(provider, candidate)
                break
            except Exception:
                # Was a bare ``except:`` — narrowed so KeyboardInterrupt and
                # SystemExit are no longer swallowed while probing candidates.
                continue
        if cfg is None:
            message = f"Local secrets file not found, or provider not in secrets file. Must be in {_DEFAULT_PATHS[1]}, {_DEFAULT_PATHS[2]}, a filepath defined by environment variable QUASAR_SECRET_FILE."
            raise SecretsFileNotFoundError(message)
    else:
        # AWS mode. Previously, if ``_ssm`` was never created, ``cfg`` stayed
        # unbound and the cache assignment below crashed with
        # UnboundLocalError; fail with a clear error instead.
        if not hasattr(self, '_ssm'):
            raise RuntimeError("SecretStore is in AWS mode but has no SSM client configured.")
        param = self._ssm.get_parameter(
            Name=f"/quasar/{provider}", WithDecryption=True
        )["Parameter"]["Value"]
        cfg = json.loads(param)

    self._cache[provider] = cfg
    return cfg

load_cfg_from_file

load_cfg_from_file(provider: str, file: Path) -> dict

Read provider config from a JSON secrets file.

Parameters:

Name Type Description Default
provider str

Provider key inside the secrets file.

required
file Path

Path to the JSON file.

required

Returns:

Name Type Description
dict dict

Provider configuration.

Raises:

Type Description
FileNotFoundError

If the file is missing.

KeyError

If the provider key is not present.

Source code in quasar/lib/common/secret_store.py
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def load_cfg_from_file(self, provider: str, file: Path) -> dict:
    """Read provider config from a JSON secrets file.

    Args:
        provider (str): Provider key inside the secrets file.
        file (Path): Path to the JSON file.

    Returns:
        dict: Provider configuration.

    Raises:
        FileNotFoundError: If the file is missing.
        KeyError: If the provider key is not present.
    """
    if not file.is_file():
        raise FileNotFoundError(f"Secret file: {file} not found.")

    data = json.loads(file.read_text())
    # Idiom fix: ``provider not in data`` instead of ``not provider in data``.
    if provider not in data:
        raise KeyError(f"Provider: {provider} not found in secret file {file}.")

    return data[provider]

SecretsFileNotFoundError

Bases: Exception

Raised when no secrets file can be located in auto mode.

Offset cron

offset_cron

Cron trigger variant that applies a positive or negative offset.

OffsetCronTrigger

OffsetCronTrigger(offset_seconds: int = 0, **kwargs: Any)

Bases: CronTrigger

CronTrigger that fires at a specified offset from the base schedule.

Initialize the trigger with an offset.

Parameters:

Name Type Description Default
offset_seconds int

Seconds to shift the scheduled time. Negative values schedule before the base trigger.

0
**kwargs Any

Standard CronTrigger keyword arguments.

{}
Source code in quasar/lib/common/offset_cron.py
11
12
13
14
15
16
17
18
19
20
21
def __init__(self, offset_seconds: int = 0, **kwargs: Any):
    """Initialize the trigger with an offset.

    Args:
        offset_seconds (int): Seconds to shift the scheduled time. Negative
            values schedule before the base trigger.
        **kwargs: Standard ``CronTrigger`` keyword arguments.
    """
    # Store direction and magnitude separately; get_next_fire_time
    # reconstructs the signed shift from these two fields.
    self._sign = -1 if offset_seconds < 0 else 1
    self.offset_seconds = abs(offset_seconds)
    super().__init__(**kwargs)

from_crontab classmethod

from_crontab(
    expr: str,
    offset_seconds: int = 0,
    timezone: tzinfo | str | None = None,
) -> OffsetCronTrigger

Create an OffsetCronTrigger from a crontab expression.

Parameters:

Name Type Description Default
expr str

Standard crontab fields minute hour day month day_of_week.

required
offset_seconds int

Seconds to offset the trigger time; can be negative.

0
timezone tzinfo | str | None

Time zone for calculations; defaults to scheduler timezone.

None

Returns:

Name Type Description
OffsetCronTrigger OffsetCronTrigger

Configured trigger instance.

Raises:

Type Description
ValueError

If the crontab expression does not contain 5 fields.

Source code in quasar/lib/common/offset_cron.py
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
@classmethod
def from_crontab(cls, expr: str, offset_seconds: int = 0, timezone: tzinfo | str | None = None) -> "OffsetCronTrigger":
    """Create an ``OffsetCronTrigger`` from a crontab expression.

    Args:
        expr (str): Standard crontab fields ``minute hour day month day_of_week``.
        offset_seconds (int): Seconds to offset the trigger time; can be negative.
        timezone: Time zone for calculations; defaults to scheduler timezone.

    Returns:
        OffsetCronTrigger: Configured trigger instance.

    Raises:
        ValueError: If the crontab expression does not contain 5 fields.
    """
    fields = expr.split()
    if len(fields) != 5:
        raise ValueError(f"Wrong number of fields; got {len(fields)}, expected 5")

    minute, hour, day, month, day_of_week = fields
    return cls(
        offset_seconds=offset_seconds,
        minute=minute,
        hour=hour,
        day=day,
        month=month,
        day_of_week=day_of_week,
        timezone=timezone,
    )

get_next_fire_time

get_next_fire_time(previous_fire_time, now)

Return the next fire time adjusted by the configured offset.

Source code in quasar/lib/common/offset_cron.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
def get_next_fire_time(self, previous_fire_time, now):
    """Return the next fire time adjusted by the configured offset.

    ``self.offset_seconds`` holds the offset magnitude and ``self._sign``
    its direction (set in ``__init__``).
    """
    # CronTrigger only looks forward from ``now``, so a negative offset is
    # implemented by shifting the inputs FORWARD by |offset| before asking
    # the base trigger, then subtracting |offset| from its answer below.
    if self._sign < 0:
        if previous_fire_time:
            previous_fire_time = previous_fire_time + timedelta(seconds=self.offset_seconds)
        now = now + timedelta(seconds=self.offset_seconds)

    # Calculate the Original Fire Time (the unshifted cron schedule)
    og_fire_time = super().get_next_fire_time(previous_fire_time, now)

    # Offset the Original Fire Time by the specified seconds in the
    # direction recorded by ``self._sign``.
    if og_fire_time:
        if self._sign < 0:
            next_fire_time = og_fire_time - timedelta(seconds=self.offset_seconds)
        else:
            next_fire_time = og_fire_time + timedelta(seconds=self.offset_seconds)
        return next_fire_time

    # Base trigger produced no further fire time (schedule exhausted).
    return None

Trading calendar

calendar

Trading calendar utility wrapping exchange_calendars with custom Forex support.

This module provides a unified interface for checking market status across various exchanges and asset classes. Assets without an exchange (e.g., cryptocurrencies) default to 'always open' behavior.

ForexCalendar

Bases: ExchangeCalendar

24/5 Trading Calendar for Forex (Standard Sunday 5pm ET to Friday 5pm ET).

close_times property

close_times

Return the daily close times.

Returns:

Name Type Description
tuple

Tuple containing the daily close time.

name property

name: str

Return the calendar name.

Returns:

Name Type Description
str str

The calendar name ("XFX").

open_times property

open_times

Return the daily open times.

Returns:

Name Type Description
tuple

Tuple containing the daily open time.

regular_holidays property

regular_holidays

Return the list of regular holidays.

Returns:

Name Type Description
None

Forex markets handled via weekmask for now.

tz property

tz

Return the timezone for the calendar.

Returns:

Type Description

datetime.tzinfo: The America/New_York timezone.

weekmask property

weekmask: str

Return the weekmask defining active days (Mon-Fri + Sun).

Returns:

Name Type Description
str str

The weekmask "1111101".

TradingCalendar

Wrapper for exchange_calendars providing a unified interface for Quasar.

has_sessions_in_range classmethod

has_sessions_in_range(
    mic: str, start: date, end: date
) -> bool

Check if there were any trading sessions between two dates.

Used primarily by Historical Data providers to determine if a "gap" in data contains any actual sessions worth pulling.

Parameters:

Name Type Description Default
mic str

The Market Identifier Code (ISO 10383).

required
start date

Start of the range (inclusive).

required
end date

End of the range (inclusive).

required

Returns:

Name Type Description
bool bool

True if at least one session occurred, False otherwise. Defaults to True if MIC is unknown.

Source code in quasar/lib/common/calendar.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
@classmethod
def has_sessions_in_range(cls, mic: str, start: date, end: date) -> bool:
    """Check if there were any trading sessions between two dates.

    Used primarily by Historical Data providers to determine if a "gap"
    in data contains any actual sessions worth pulling.

    Args:
        mic (str): The Market Identifier Code (ISO 10383).
        start (date): Start of the range (inclusive).
        end (date): End of the range (inclusive).

    Returns:
        bool: True if at least one session occurred, False otherwise.
            Defaults to True if MIC is unknown.
    """
    calendar = cls._get_calendar(mic)
    if calendar is None:
        # Unknown MIC: assume tradable rather than silently skipping pulls.
        return True

    # exchange_calendars expects pandas Timestamps for range queries.
    sessions = calendar.sessions_in_range(pd.Timestamp(start), pd.Timestamp(end))
    return len(sessions) > 0

is_open_now classmethod

is_open_now(mic: str) -> bool

Check if the market for the given MIC is currently open.

Used primarily by Live Data providers to determine if a connection should be established.

Parameters:

Name Type Description Default
mic str

The Market Identifier Code (ISO 10383).

required

Returns:

Name Type Description
bool bool

True if the market is open, False otherwise. Defaults to True if MIC is unknown.

Source code in quasar/lib/common/calendar.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@classmethod
def is_open_now(cls, mic: str) -> bool:
    """Check if the market for the given MIC is currently open.

    Used primarily by Live Data providers to determine if a connection
    should be established.

    Args:
        mic (str): The Market Identifier Code (ISO 10383).

    Returns:
        bool: True if the market is open, False otherwise. Defaults to True if MIC is unknown.
    """
    calendar = cls._get_calendar(mic)
    if calendar is None:
        # Unknown MIC: treat the market as always open.
        return True

    # is_open_on_minute answers "was the exchange open at this exact minute?"
    current_minute = pd.Timestamp.now(tz=timezone.utc)
    return calendar.is_open_on_minute(current_minute)

is_session classmethod

is_session(mic: str, day: date) -> bool

Check if the given date was a valid trading session for the MIC.

Used primarily by Historical Data providers to determine if a data pull should be executed for a specific date.

Parameters:

Name Type Description Default
mic str

The Market Identifier Code (ISO 10383).

required
day date

The date to check.

required

Returns:

Name Type Description
bool bool

True if the date was a session, False otherwise. Defaults to True if MIC is unknown.

Source code in quasar/lib/common/calendar.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
@classmethod
def is_session(cls, mic: str, day: date) -> bool:
    """Check if the given date was a valid trading session for the MIC.

    Used primarily by Historical Data providers to determine if a data
    pull should be executed for a specific date.

    Args:
        mic (str): The Market Identifier Code (ISO 10383).
        day (date): The date to check.

    Returns:
        bool: True if the date was a session, False otherwise. Defaults to True if MIC is unknown.
    """
    calendar = cls._get_calendar(mic)
    # Unknown MIC defaults to "yes, it was a session".
    if calendar is None:
        return True

    # exchange_calendars expects a pandas Timestamp, not a date.
    return calendar.is_session(pd.Timestamp(day))

Enum guard

enum_guard

Optional runtime guard to ensure enums match database lookup tables.

validate_enums async

validate_enums(pool: Pool, strict: bool = False) -> None

Compare generated enums with DB lookup tables.

Parameters:

Name Type Description Default
pool Pool

The asyncpg connection pool to use for DB checks.

required
strict bool

If True, raise a RuntimeError if mismatches are found. If False, only log warnings. Defaults to False.

False

Raises:

Type Description
RuntimeError

If strict is True and enums do not match the database.

Source code in quasar/lib/common/enum_guard.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
async def validate_enums(pool: Pool, strict: bool = False) -> None:
    """Compare generated enums with DB lookup tables.

    Args:
        pool (Pool): The asyncpg connection pool to use for DB checks.
        strict (bool): If True, raise a RuntimeError if mismatches are found.
            If False, only log warnings. Defaults to False.

    Raises:
        RuntimeError: If ``strict`` is True and enums do not match the database.
    """
    db_assets = await _fetch_codes(pool, "asset_class", "code")
    db_intervals = await _fetch_codes(pool, "accepted_intervals", "interval")

    issues: list[str] = []

    def _compare(label: str, expected: set, actual) -> None:
        # Skip comparison entirely when no codes came back (original behavior).
        if actual is None:
            return
        missing = expected - actual
        extra = actual - expected
        if missing:
            issues.append(f"{label} missing {sorted(missing)}")
        if extra:
            issues.append(f"{label} has extras {sorted(extra)}")

    _compare("asset_class", set(ASSET_CLASSES), db_assets)
    _compare("accepted_intervals", set(INTERVALS), db_intervals)

    if not issues:
        logger.info("Enum guard: DB lookup tables match generated enums")
        return

    msg = "; ".join(issues)
    if strict:
        raise RuntimeError(f"Enum guard failed: {msg}")
    logger.warning("Enum guard warning: %s", msg)