Services: Registry

app

Entrypoint script to start the Registry service.

main async

main()

Bootstrap the Registry service and block until shutdown.

Source code in quasar/services/registry/app.py
async def main():
    """Bootstrap the Registry service and block until shutdown."""
    # Load Environment
    dsn   = os.environ["DSN"]                      # provided by compose

    # Create Registry
    reg = Registry(dsn=dsn)
    await reg.start()

    logging.info("Registry started → DSN=%s", dsn)
    try:
        await asyncio.Event().wait()              # keep running
    finally:
        await reg.stop()
        logging.info("Registry stopped")
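The start, wait, and always-stop shape of main() can be tried in isolation. The following is a minimal sketch, not the service's own code: DummyService, run_until, and demo are hypothetical names, and an explicit asyncio.Event is set by the demo to simulate a shutdown request.

```python
import asyncio


class DummyService:
    """Hypothetical stand-in for Registry; it only records lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def start(self) -> None:
        self.events.append("started")

    async def stop(self) -> None:
        self.events.append("stopped")


async def run_until(service: DummyService, shutdown: asyncio.Event) -> None:
    """Start the service, block until shutdown is set, and always stop it."""
    await service.start()
    try:
        await shutdown.wait()
    finally:
        # Runs on normal exit and on task cancellation alike.
        await service.stop()


async def demo() -> list[str]:
    service = DummyService()
    shutdown = asyncio.Event()
    task = asyncio.create_task(run_until(service, shutdown))
    await asyncio.sleep(0)  # give the task a chance to start waiting
    shutdown.set()          # simulate a shutdown request
    await task
    return service.events


events = asyncio.run(demo())
```

Because the cleanup sits in a finally block, stop() still runs if the waiting task is cancelled instead of being released through the event.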

core

Registry service core: code uploads, asset catalog, and mappings.

This module defines the Registry class which coordinates all registry operations through handler mixins. The actual handler implementations are in the handlers/ subpackage, organized by domain:

  • handlers/assets.py: Asset management handlers
  • handlers/code.py: Code upload and management handlers
  • handlers/config.py: Provider configuration handlers
  • handlers/indices.py: Index management handlers
  • handlers/mappings.py: Asset mapping handlers

Utility modules:

  • utils/pagination.py: Cursor encoding/decoding
  • utils/query_builder.py: Dynamic SQL filter building
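The contents of utils/query_builder.py are not shown on this page. As an illustration of what dynamic SQL filter building can look like with asyncpg-style $n placeholders, here is a minimal sketch; build_where, the allowed set, and the column names are all hypothetical.

```python
from typing import Any


def build_where(filters: dict[str, Any], allowed: set[str]) -> tuple[str, list[Any]]:
    """Return a WHERE clause with $n placeholders and the matching parameters."""
    clauses: list[str] = []
    params: list[Any] = []
    for column, value in filters.items():
        if value is None:
            continue  # filter not supplied by the caller
        if column not in allowed:
            # Column names are interpolated into SQL, so only accept known ones.
            raise ValueError(f"unsupported filter column: {column}")
        params.append(value)
        clauses.append(f"{column} = ${len(params)}")
    where = " WHERE " + " AND ".join(clauses) if clauses else ""
    return where, params


where, params = build_where(
    {"class_name": "ExampleProvider", "asset_class": None, "is_active": True},
    allowed={"class_name", "asset_class", "is_active"},
)
```

Values always travel as bound parameters; only allowlisted column names are placed into the SQL text.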

MembershipSyncResult dataclass

MembershipSyncResult(
    added: int,
    removed: int,
    unchanged: int,
    weights_updated: int,
)

Result of index membership synchronization.

Attributes:

  • added (int): Number of new memberships created.
  • removed (int): Number of memberships closed.
  • unchanged (int): Number of memberships with no changes.
  • weights_updated (int): Number of memberships with weight changes.
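To make the four counters concrete, the sketch below derives them from two symbol-to-weight mappings. It is an illustration under assumptions: diff_memberships is a hypothetical helper, the dataclass is redeclared locally so the example is self-contained, and "unchanged" is assumed to exclude members whose weight changed.

```python
from dataclasses import dataclass


@dataclass
class MembershipSyncResult:
    added: int
    removed: int
    unchanged: int
    weights_updated: int


def diff_memberships(
    current: dict[str, float], incoming: dict[str, float]
) -> MembershipSyncResult:
    """Count how the incoming constituents differ from the current ones."""
    added = len(incoming.keys() - current.keys())
    removed = len(current.keys() - incoming.keys())
    shared = current.keys() & incoming.keys()
    # Exact float comparison keeps the sketch short; real data may need a tolerance.
    weights_updated = sum(1 for symbol in shared if current[symbol] != incoming[symbol])
    unchanged = len(shared) - weights_updated
    return MembershipSyncResult(added, removed, unchanged, weights_updated)


result = diff_memberships(
    current={"AAA": 0.4, "BBB": 0.3, "CCC": 0.2, "DDD": 0.1},
    incoming={"AAA": 0.4, "BBB": 0.3, "CCC": 0.15, "EEE": 0.1, "FFF": 0.05},
)
```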

Registry

Registry(
    dsn: str | None = None,
    pool: Optional[Pool] = None,
    refresh_seconds: int = 30,
    api_host: str = "0.0.0.0",
    api_port: int = 8080,
)

Bases: AssetHandlersMixin, CodeHandlersMixin, ConfigHandlersMixin, IndexHandlersMixin, MappingHandlersMixin, DatabaseHandler, APIHandler

Manage uploaded provider/broker code and asset mappings.

This class combines multiple handler mixins to provide a complete registry service. Each mixin handles a specific domain area while sharing common resources (pool, matcher, mapper) through the base class.

Handler Domains
  • AssetHandlersMixin: Asset CRUD, updates, identity matching
  • CodeHandlersMixin: File upload, validation, code registration
  • ConfigHandlersMixin: Provider preferences, quote currencies
  • IndexHandlersMixin: Index management, membership sync
  • MappingHandlersMixin: Asset mapping suggestions and CRUD
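The mixin arrangement can be illustrated with a reduced example. This is a sketch only: DatabaseBase, AssetMixin, MappingMixin, and MiniRegistry are hypothetical classes, and a plain dict stands in for the shared connection pool.

```python
class DatabaseBase:
    """Hypothetical base class that owns the shared resource."""

    def __init__(self, pool: dict) -> None:
        self.pool = pool


class AssetMixin:
    """Domain mixin; relies on self.pool being provided by the base class."""

    def count_assets(self) -> int:
        return len(self.pool["assets"])


class MappingMixin:
    """Second domain mixin sharing the same self.pool."""

    def count_mappings(self) -> int:
        return len(self.pool["mappings"])


class MiniRegistry(AssetMixin, MappingMixin, DatabaseBase):
    """Combines the domain mixins with the resource-owning base."""


registry = MiniRegistry(pool={"assets": ["A", "B"], "mappings": ["A->X"]})
mro_names = [cls.__name__ for cls in MiniRegistry.__mro__]
```

Listing the mixins before the base class means their methods are resolved first, while the base's __init__ is the one that sets up the shared state.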

Create a Registry instance.

Parameters:

  • dsn (str | None): Database DSN for internal pool creation. Default: None.
  • pool (Pool | None): Existing pool to reuse. Default: None.
  • refresh_seconds (int): Reserved for future use. Default: 30.
  • api_host (str): Host interface for the internal API. Default: '0.0.0.0'.
  • api_port (int): Port number for the internal API. Default: 8080.

Source code in quasar/services/registry/core.py
def __init__(
    self,
    dsn: str | None = None,
    pool: Optional[asyncpg.Pool] = None,
    refresh_seconds: int = 30,
    api_host: str = '0.0.0.0',
    api_port: int = 8080,
) -> None:
    """Create a Registry instance.

    Args:
        dsn (str | None): Database DSN for internal pool creation.
        pool (asyncpg.Pool | None): Existing pool to reuse.
        refresh_seconds (int): Reserved for future use.
        api_host (str): Host interface for the internal API.
        api_port (int): Port number for the internal API.
    """
    # Initialize Supers
    DatabaseHandler.__init__(self, dsn=dsn, pool=pool)
    APIHandler.__init__(self, api_host=api_host, api_port=api_port)

    # Initialize Matcher
    self.matcher = IdentityMatcher(dsn=dsn, pool=pool)

    # Initialize AutomatedMapper
    self.mapper = AutomatedMapper(dsn=dsn, pool=pool)

start async

start() -> None

Start the Registry.

Source code in quasar/services/registry/core.py
async def start(self) -> None:
    """Start the Registry."""
    # Start API
    await self.start_api_server()

    # Start Database
    await self.init_pool()
    self.matcher._pool = self.pool  # Share the initialized pool with the matcher
    self.mapper._pool = self.pool  # Share the initialized pool with the mapper
    await self._run_enum_guard()
    await self._seed_identity_manifests()

stop async

stop() -> None

Stop the Registry.

Source code in quasar/services/registry/core.py
async def stop(self) -> None:
    """Stop the Registry."""
    # Stop Database
    await self.close_pool()

    # Stop API
    await self.stop_api_server()

schemas

Registry-specific Pydantic schemas for API request/response models.

AssetItem

Bases: BaseModel

Single asset item.

AssetMappingCreate

Bases: BaseModel

Request model for creating asset mapping.

AssetMappingPaginatedResponse

Bases: BaseModel

Response model for GET /api/registry/asset-mappings endpoint (paginated).

AssetMappingQueryParams

Bases: BaseModel

Query parameters for GET /api/registry/asset-mappings endpoint.

AssetMappingRemapPreview

Bases: BaseModel

Response model for GET /api/registry/asset-mappings/re-map/preview endpoint.

Preview of the impact of a re-map operation.

AssetMappingRemapRequest

Bases: BaseModel

Request model for POST /api/registry/asset-mappings/re-map endpoint.

Filter parameters for the re-map operation. At least one filter should be specified to avoid re-mapping all mappings.
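The fields of this model are not listed on this page. The sketch below shows one way a caller could detect an unfiltered re-map before sending it; it uses a stdlib dataclass as a stand-in for the Pydantic model, and RemapFilters and its field names are hypothetical.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class RemapFilters:
    """Hypothetical stand-in; the field names are illustrative only."""

    class_name: Optional[str] = None
    class_type: Optional[str] = None
    asset_class: Optional[str] = None

    def active(self) -> dict[str, str]:
        """Return only the filters that were actually set."""
        return {
            f.name: getattr(self, f.name)
            for f in fields(self)
            if getattr(self, f.name) is not None
        }

    def is_unfiltered(self) -> bool:
        """True when no filter is set, i.e. the request would touch every mapping."""
        return not self.active()


everything = RemapFilters()
narrowed = RemapFilters(class_name="ExampleProvider")
```

A client could check is_unfiltered() and ask for confirmation before issuing a request that re-maps everything.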

AssetMappingRemapResponse

Bases: BaseModel

Response model for POST /api/registry/asset-mappings/re-map endpoint.

Result of the re-map operation.

AssetMappingResponse

Bases: BaseModel

Response model for asset mapping endpoints.

AssetMappingUpdate

Bases: BaseModel

Request model for updating asset mapping (partial update).

AssetQueryParams

Bases: BaseModel

Query parameters for GET /internal/assets endpoint.

AssetResponse

Bases: BaseModel

Response model for GET /internal/assets endpoint.

AvailableQuoteCurrenciesResponse

Bases: BaseModel

Response model for available quote currencies endpoint.

ClassSummaryItem

Bases: BaseModel

Single class summary item.

CommonSymbolItem

Bases: BaseModel

Single common symbol item with provider count.

CommonSymbolQueryParams

Bases: BaseModel

Query parameters for GET /api/registry/common-symbols endpoint.

CommonSymbolRenameRequest

Bases: BaseModel

Request model for renaming a common symbol.

CommonSymbolRenameResponse

Bases: BaseModel

Response model for common symbol rename endpoint.

CommonSymbolResponse

Bases: BaseModel

Response model for GET /api/registry/common-symbols endpoint.

ConfigSchemaResponse

Bases: BaseModel

Response model for GET /api/registry/config/schema endpoint.

CryptoPreferences

Bases: BaseModel

Crypto-specific trading preferences.

DataPreferences

Bases: BaseModel

Data preferences for historical providers.

DataPreferencesField

Bases: BaseModel

Data preferences for historical providers.

DeleteClassResponse

Bases: BaseModel

Response model for delete class endpoint.

FileUploadResponse

Bases: BaseModel

Response model for file upload endpoint.

HistoricalSchedulingPreferences

Bases: BaseModel

Scheduling preferences for historical data providers.

IndexConstituentSync

Bases: BaseModel

Single constituent from IndexProvider for sync endpoint.

IndexDetailResponse

Bases: BaseModel

Response model for GET /api/registry/indices/{name} endpoint.

IndexHistoryChange

Bases: BaseModel

Group of events that occurred on a single date.

IndexHistoryEvent

Bases: BaseModel

Single change event in index history.

IndexHistoryResponse

Bases: BaseModel

Response model for GET /api/registry/indices/{name}/history endpoint.

IndexItem

Bases: BaseModel

Single index item for list response.

IndexListResponse

Bases: BaseModel

Response model for GET /api/registry/indices endpoint (paginated).

IndexMemberItem

Bases: BaseModel

Single index member item.

IndexMemberQueryParams

Bases: BaseModel

Query parameters for GET /api/registry/indices/{name}/members endpoint.

IndexMembersResponse

Bases: BaseModel

Response model for GET /api/registry/indices/{name}/members endpoint (paginated).

IndexQueryParams

Bases: BaseModel

Query parameters for GET /api/registry/indices endpoint.

IndexSyncRequest

Bases: BaseModel

Request body for sync endpoint (from DataHub).

IndexSyncResponse

Bases: BaseModel

Response model for POST /api/registry/indices/{name}/sync endpoint.

LiveSchedulingPreferences

Bases: BaseModel

Scheduling preferences for live data providers.

ProviderPreferences

Bases: BaseModel

Provider configuration preferences.

ProviderPreferencesResponse

Bases: BaseModel

Response model for provider preferences endpoint.

ProviderPreferencesUpdate

Bases: BaseModel

Request model for updating provider preferences (partial update).

SchedulingPreferences

Bases: BaseModel

Unified scheduling preferences model for all provider types.

  • For historical providers: use delay_hours
  • For live providers: use pre_close_seconds and post_close_seconds
  • For index providers: use sync_frequency
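The per-provider field usage can be expressed as a small lookup. The following is a sketch with a stdlib dataclass in place of the Pydantic model; the four field names come from the description above, while SchedulingSketch, RELEVANT_FIELDS, for_provider, and the field types are assumptions.

```python
from dataclasses import dataclass
from typing import Optional

# Which scheduling fields apply to which provider type, per the description above.
RELEVANT_FIELDS = {
    "historical": ("delay_hours",),
    "live": ("pre_close_seconds", "post_close_seconds"),
    "index": ("sync_frequency",),
}


@dataclass
class SchedulingSketch:
    """Hypothetical stand-in for SchedulingPreferences; types are assumed."""

    delay_hours: Optional[int] = None
    pre_close_seconds: Optional[int] = None
    post_close_seconds: Optional[int] = None
    sync_frequency: Optional[str] = None

    def for_provider(self, provider_type: str) -> dict:
        """Return only the fields that apply to the given provider type."""
        return {name: getattr(self, name) for name in RELEVANT_FIELDS[provider_type]}


prefs = SchedulingSketch(
    delay_hours=6, pre_close_seconds=30, post_close_seconds=5, sync_frequency="1w"
)
live_view = prefs.for_provider("live")
```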

SecretKeysResponse

Bases: BaseModel

Response model for GET /api/registry/config/secret-keys endpoint.

SecretsUpdateRequest

Bases: BaseModel

Request model for PATCH /api/registry/config/secrets endpoint.

SecretsUpdateResponse

Bases: BaseModel

Response model for PATCH /api/registry/config/secrets endpoint.

SuggestionItem

Bases: BaseModel

Single suggested mapping candidate.

SuggestionsResponse

Bases: BaseModel

Response payload for suggestions endpoint with cursor-based pagination.

Cursor pagination provides consistent, efficient paging through large result sets. Use next_cursor for subsequent requests instead of incrementing offset.
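A client loop that follows next_cursor might look like the sketch below. It runs against an in-memory list instead of the real endpoint; fetch_page, the "items" key, and the base64-encoded JSON cursor format are assumptions made for illustration, and only next_cursor is taken from the description above.

```python
import base64
import json
from typing import Optional


def encode_cursor(last_id: int) -> str:
    """Pack the last seen id into an opaque, URL-safe token."""
    return base64.urlsafe_b64encode(json.dumps({"last_id": last_id}).encode()).decode()


def decode_cursor(cursor: str) -> int:
    return json.loads(base64.urlsafe_b64decode(cursor.encode()))["last_id"]


def fetch_page(rows: list[dict], limit: int, cursor: Optional[str] = None) -> dict:
    """Return up to `limit` rows after the cursor, plus a cursor for the next page."""
    last_id = decode_cursor(cursor) if cursor else 0  # assumes positive ids
    # Fetch one extra row to learn whether another page exists.
    matching = [row for row in rows if row["id"] > last_id][: limit + 1]
    items = matching[:limit]
    has_more = len(matching) > limit
    next_cursor = encode_cursor(items[-1]["id"]) if has_more else None
    return {"items": items, "next_cursor": next_cursor}


rows = [{"id": i} for i in range(1, 8)]  # seven rows, already sorted by id
pages = []
cursor = None
while True:
    page = fetch_page(rows, limit=3, cursor=cursor)
    pages.append([row["id"] for row in page["items"]])
    cursor = page["next_cursor"]
    if cursor is None:
        break
```

Because each page is selected by "id greater than the last one seen" and not by position, rows inserted or deleted between requests do not shift later pages the way an offset would.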

UpdateAssetsResponse

Bases: BaseModel

Response model for asset update endpoints.

UserIndexCreate

Bases: BaseModel

Request model for creating a UserIndex.

UserIndexMemberCreate

Bases: BaseModel

Single member for UserIndex.

UserIndexMembersUpdate

Bases: BaseModel

Request model for updating UserIndex members (full replacement).