Skip to content

Services: DataHub

app

Entrypoint script to start the DataHub service.

main async

main()

Bootstrap the DataHub service and block until shutdown.

Source code in quasar/services/datahub/app.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def _redact_dsn(dsn: str) -> str:
    """Return *dsn* with any embedded password replaced by ``***``.

    Best-effort masking for log output only; the value handed to the
    database layer is never modified. Handles the URI form
    (``scheme://user:password@host/...``) and the ``password=...`` keyword
    form (which also covers a ``?password=...`` query parameter).
    """
    import re  # local import so the helper does not rely on file-level imports

    # URI userinfo: keep the user name, mask everything up to the "@".
    masked = re.sub(r"(://[^:/?#@\s]*):[^@/?#\s]*@", r"\1:***@", dsn)
    # Keyword / query-parameter form, with an optionally single-quoted value.
    return re.sub(
        r"(?i)(password\s*=\s*)('(?:[^'\\]|\\.)*'|[^&\s]+)",
        r"\1***",
        masked,
    )


async def main():
    """Bootstrap the DataHub service and block until shutdown.

    Reads ``DSN`` (required) and ``QUASAR_SECRET_MODE`` (defaults to
    ``"auto"``) from the environment, builds the secret store and the
    DataHub, starts the hub, and then waits indefinitely. The hub is
    stopped in a ``finally`` clause when the wait is interrupted.

    Raises:
        KeyError: If the ``DSN`` environment variable is not set.
    """
    # Load Environment
    dsn = os.environ["DSN"]                        # provided by compose
    mode = os.getenv("QUASAR_SECRET_MODE", "auto")

    # Create Secret Store
    store = SecretStore(mode=mode)

    # Create DataHub
    hub = DataHub(secret_store=store, dsn=dsn)
    await hub.start()

    # The DSN may carry database credentials, so never log it verbatim.
    logging.info("DataHub started → DSN=%s", _redact_dsn(dsn))
    try:
        # An Event that is never set: parks this coroutine until it is
        # cancelled or otherwise interrupted.
        await asyncio.Event().wait()
    finally:
        await hub.stop()
        logging.info("DataHub stopped")

core

DataHub service core: scheduler, provider loading, and API handlers.

DataHub

DataHub(
    *,
    secret_store: SecretStore,
    dsn: str | None = None,
    pool: Optional[Pool] = None,
    refresh_seconds: int = 30,
    api_host: str = "0.0.0.0",
    api_port: int = 8080
)

Bases: ProviderHandlersMixin, CollectionHandlersMixin, DataExplorerHandlersMixin, DatabaseHandler, APIHandler

Schedule data provider jobs, storage, and the DataHub API.

Create a DataHub instance.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `secret_store` | `SecretStore` | Provider secret loader. | required |
| `dsn` | `str \| None` | Database DSN when creating the pool internally. | `None` |
| `pool` | `Pool \| None` | Reusable pool if managed externally. | `None` |
| `refresh_seconds` | `int` | Interval to refresh provider subscriptions. | `30` |
| `api_host` | `str` | Host interface for the internal API. | `'0.0.0.0'` |
| `api_port` | `int` | Port number for the internal API. | `8080` |

Source code in quasar/services/datahub/core.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def __init__(
    self,
    *,
    secret_store: SecretStore,
    dsn: str | None = None,
    pool: Optional[asyncpg.Pool] = None,
    refresh_seconds: int = 30,
    api_host: str = "0.0.0.0",
    api_port: int = 8080,
):
    """Build a DataHub and prepare (but do not start) its scheduler.

    All arguments are keyword-only.

    Args:
        secret_store (SecretStore): Loader used to obtain provider secrets.
        dsn (str | None): Database DSN, forwarded to ``DatabaseHandler``.
        pool (asyncpg.Pool | None): Existing pool, forwarded to
            ``DatabaseHandler``.
        refresh_seconds (int): Seconds between provider subscription
            refreshes.
        api_host (str): Interface the internal API binds to.
        api_port (int): Port the internal API listens on.
    """
    # The two handler bases are initialised explicitly, each with only the
    # arguments it owns.
    DatabaseHandler.__init__(self, dsn=dsn, pool=pool)
    APIHandler.__init__(self, api_host=api_host, api_port=api_port)

    # Source of provider credentials.
    self.secret_store = secret_store

    # Provider objects and their preferences, both keyed by string
    # (presumably the provider name — confirm against the loaders).
    self._providers: dict[str, HistoricalDataProvider] = {}
    self._provider_preferences: dict[str, dict | None] = {}

    # Keys of the jobs currently tracked by this instance.
    self.job_keys: set[str] = set()
    self.index_sync_job_keys: set[str] = set()

    # Scheduler is created here; it is started later by ``start()``.
    logger.debug("Creating async DataHub scheduler.")
    self._sched = AsyncIOScheduler(timezone="UTC")
    # NOTE(review): invoked on a scheduler that was just created —
    # presumably a defensive reset; confirm what _stop_scheduler does.
    self._stop_scheduler()
    self._refresh_seconds = refresh_seconds

start async

start()

Start database pool, refresh subscriptions, and run the API server.

Source code in quasar/services/datahub/core.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
async def start(self):
    """Bring the service up: database pool, scheduled jobs, then the API server."""

    # Database comes first; the enum guard runs once the pool exists.
    await self.init_pool()
    await self._run_enum_guard()

    # Make sure no scheduler from an earlier run is still active.
    self._stop_scheduler()

    # Do one subscription refresh right away, then register the same
    # coroutine as a recurring job under a fixed id so that a repeated
    # start() replaces the job instead of adding a duplicate.
    await self.refresh_subscriptions()
    refresh_trigger = IntervalTrigger(seconds=self._refresh_seconds)
    self._sched.add_job(
        self.refresh_subscriptions,
        trigger=refresh_trigger,
        id="subscription_refresh",
        replace_existing=True,
    )

    # Set up the IndexProvider sync jobs before the scheduler starts.
    await self.refresh_index_sync_jobs()

    self._sched.start()
    logger.info("DataHub started, subscription refresh interval: %ss", self._refresh_seconds)

    # The internal API server is started last.
    await self.start_api_server()

stop async

stop()

Stop API server, scheduler, and close database pool.

Source code in quasar/services/datahub/core.py
174
175
176
177
178
179
180
181
182
183
184
185
186
async def stop(self):
    """Stop API server, scheduler, and close database pool.

    The three teardown steps run in that order. Each later step is still
    attempted when an earlier one raises, so a failure while stopping the
    API server cannot leave the scheduler running or the pool open. Any
    exception raised by a step still propagates to the caller once the
    remaining steps have been attempted.
    """
    logger.info("DataHub shutting down.")

    try:
        # Stop Internal API Server
        await self.stop_api_server()
    finally:
        try:
            # Stop Scheduler — only when it is actually running; the state
            # check avoids calling shutdown() on a scheduler that was never
            # started. wait=False: do not block on in-flight jobs.
            if self._sched and self._sched.state == STATE_RUNNING:
                self._sched.shutdown(wait=False)
        finally:
            # Close Database Pool — always last.
            await self.close_pool()

schemas

DataHub-specific Pydantic schemas for API request/response models.

AssetInfo

Bases: BaseModel

Asset metadata information.

AvailableSymbolsResponse

Bases: BaseModel

Response wrapper for available symbols endpoint.

ConstituentsResponse

Bases: BaseModel

Response wrapper for index constituents endpoint.

DataTypeInfo

Bases: BaseModel

Information about data availability for a specific data type.

IndexSyncRefreshResponse

Bases: BaseModel

Response model for index sync refresh endpoint.

OHLCBar

Bases: BaseModel

Single OHLC bar data.

OHLCDataResponse

Bases: BaseModel

Response model for OHLC data retrieval endpoint.

OtherProvider

Bases: BaseModel

Information about another provider for the same common symbol.

ProviderUnloadResponse

Bases: BaseModel

Response model for provider unload endpoint.

ProviderValidateRequest

Bases: BaseModel

Request model for provider validation endpoint.

ProviderValidateResponse

Bases: BaseModel

Response model for provider validation endpoint.

SymbolMetadataResponse

Bases: BaseModel

Response model for symbol metadata endpoint.

SymbolSearchItem

Bases: BaseModel

Single symbol search result.

SymbolSearchResponse

Bases: BaseModel

Response model for symbol search endpoint.