Skip to content

Core Library: Providers

Base classes and provider implementations.

Provider base classes

core

Provider base classes and helpers for historical and live data.

DataProvider

DataProvider(
    context: DerivedContext, preferences: dict | None = None
)

Bases: ABC

Base class for all data provider types.

Initialize provider with rate limiting.

Parameters:

Name Type Description Default
context DerivedContext

Derived context containing provider secrets.

required
preferences dict | None

Optional configuration preferences from database.

None
Note

The HTTP session is created lazily in aenter to ensure it is initialized within an async context, avoiding aiohttp event loop issues.

Source code in quasar/lib/providers/core.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def __init__(self, context: DerivedContext, preferences: dict | None = None):
    """Initialize provider with rate limiting.

    Args:
        context (DerivedContext): Derived context containing provider secrets.
        preferences (dict | None): Optional configuration preferences from database.

    Note:
        The HTTP session is created lazily in __aenter__ to ensure it is
        initialized within an async context, avoiding aiohttp event loop issues.
    """
    calls, seconds = self.RATE_LIMIT or (float("inf"), 1)
    self._limiter = AsyncLimiter(calls, seconds)
    self._session: aiohttp.ClientSession | None = None
    self.context = context
    self.preferences = preferences or {}
    self._usage = asyncio.Semaphore(1000)  # Track concurrent usage

in_use property

in_use: bool

Check if provider is currently in use by any operation.

__aenter__ async

__aenter__()

Initialize session and return self for async context manager use.

Creates the aiohttp ClientSession within the async context to ensure proper event loop attachment.

Source code in quasar/lib/providers/core.py
240
241
242
243
244
245
246
247
248
249
250
async def __aenter__(self):   
    """Initialize session and return self for async context manager use.

    Creates the aiohttp ClientSession within the async context to ensure
    proper event loop attachment.
    """
    if self._session is None:
        self._session = aiohttp.ClientSession(
            connector=aiohttp.TCPConnector(limit=self.CONCURRENCY)
        )
    return self

__aexit__ async

__aexit__(*exc)

Close the HTTP session on context manager exit.

Source code in quasar/lib/providers/core.py
252
253
254
async def __aexit__(self, *exc):
    """Close the HTTP session on context manager exit."""
    await self.aclose()

aclose async

aclose()

Close the underlying HTTP session if it exists.

Source code in quasar/lib/providers/core.py
234
235
236
237
238
async def aclose(self):
    """Close the underlying HTTP session if it exists."""
    if self._session is not None:
        await self._session.close()
        self._session = None

fetch_available_symbols abstractmethod async

fetch_available_symbols() -> list[SymbolInfo]

Return all symbols available for trading on this provider.

Source code in quasar/lib/providers/core.py
203
204
205
@abstractmethod
async def fetch_available_symbols(self) -> list[SymbolInfo]:
    """Return all symbols available for trading on this provider."""

get_available_symbols async

get_available_symbols() -> list[SymbolInfo]

Return all symbols available for trading on this provider.

Source code in quasar/lib/providers/core.py
207
208
209
210
async def get_available_symbols(self) -> list[SymbolInfo]:
    """Return all symbols available for trading on this provider."""
    async with self._usage:
        return await self.fetch_available_symbols()

get_data async

get_data(reqs: Iterable[Req]) -> AsyncIterator[Bar]
get_data(
    interval: Interval, symbols: list[str]
) -> list[Bar]
get_data(*args: Any, **kwargs: Any) -> AsyncIterator[Bar]

Route to historical or live data based on provider type.

Parameters:

Name Type Description Default
*args Any

Positional arguments for the concrete provider.

()
**kwargs Any

Keyword arguments for the concrete provider.

{}

Yields:

Name Type Description
Bar AsyncIterator[Bar]

Bars produced by the provider implementation.

Source code in quasar/lib/providers/core.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
async def get_data(self, *args: Any, **kwargs: Any) -> AsyncIterator[Bar]:
    """Route to historical or live data based on provider type.

    Args:
        *args: Positional arguments for the concrete provider.
        **kwargs: Keyword arguments for the concrete provider.

    Yields:
        Bar: Bars produced by the provider implementation.
    """
    async with self._usage:  # Add wrapper
        if self.provider_type == ProviderType.HISTORICAL:
            async for bar in self.get_history_many(*args, **kwargs):
                yield bar
        elif self.provider_type == ProviderType.REALTIME:
            bars = await self.get_live(*args, **kwargs)
            for bar in bars:
                yield bar
        else:
            raise ValueError(f"Unsupported provider type: {self.provider_type}")

HistoricalDataProvider

HistoricalDataProvider(
    context: DerivedContext, preferences: dict | None = None
)

Bases: DataProvider

Base class for all historical data provider types.

Source code in quasar/lib/providers/core.py
283
284
def __init__(self, context: DerivedContext, preferences: dict | None = None):
    super().__init__(context, preferences)

get_history abstractmethod async

get_history(
    sym: str, start: date, end: date, interval: Interval
) -> AsyncIterator[Bar]

Return inclusive [start, end] bars ordered oldest→newest.

Source code in quasar/lib/providers/core.py
305
306
307
308
309
310
311
312
313
@abstractmethod
async def get_history(
    self,
    sym: str,
    start: date,
    end: date,
    interval: Interval,
) -> AsyncIterator[Bar]:
    """Return inclusive [start, end] bars ordered oldest→newest."""

get_history_many async

get_history_many(reqs: Iterable[Req]) -> AsyncIterator[Bar]

Yield bars for multiple requests using get_history by default.

Parameters:

Name Type Description Default
reqs Iterable[Req]

Requests to process.

required

Yields:

Name Type Description
Bar AsyncIterator[Bar]

Bars returned from get_history.

Source code in quasar/lib/providers/core.py
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
async def get_history_many(          # OPTIONAL override
    self,
    reqs: Iterable[Req],
) -> AsyncIterator[Bar]:
    """Yield bars for multiple requests using ``get_history`` by default.

    Args:
        reqs (Iterable[Req]): Requests to process.

    Yields:
        Bar: Bars returned from ``get_history``.
    """
    # default: loop over single‑symbol method
    for r in reqs:
        async for bar in self.get_history(
            r.sym, r.start, r.end, r.interval
        ):
            yield bar

IndexConstituent

Bases: TypedDict

Constituent of an index with optional weight and asset metadata.

Required fields

symbol: Provider's symbol for this constituent

Optional fields

weight: Weight in the index (None = equal weight) name: Human-readable name (e.g., "Bitcoin") asset_class: Asset classification (e.g., "crypto", "equity") matcher_symbol: Symbol used for matching (defaults to symbol if not provided) base_currency: Base currency for pairs (e.g., "BTC") quote_currency: Quote currency for pairs (e.g., "USD") exchange: Exchange or market identifier

IndexProvider

IndexProvider(
    context: DerivedContext, preferences: dict | None = None
)

Bases: DataProvider

Base class for index providers that return constituent lists.

Initialize index provider.

Parameters:

Name Type Description Default
context DerivedContext

Derived context containing provider secrets.

required
preferences dict | None

Optional configuration preferences from database.

None
Source code in quasar/lib/providers/core.py
465
466
467
468
469
470
471
472
def __init__(self, context: DerivedContext, preferences: dict | None = None):
    """Initialize index provider.

    Args:
        context (DerivedContext): Derived context containing provider secrets.
        preferences (dict | None): Optional configuration preferences from database.
    """
    super().__init__(context, preferences)

name abstractmethod property

name: str

Unique provider identifier (e.g., 'SP500Provider').

fetch_available_symbols async

fetch_available_symbols() -> list[SymbolInfo]

Not applicable for index providers.

Raises:

Type Description
NotImplementedError

IndexProvider does not support this method.

Source code in quasar/lib/providers/core.py
512
513
514
515
516
517
518
519
520
521
async def fetch_available_symbols(self) -> list[SymbolInfo]:
    """Not applicable for index providers.

    Raises:
        NotImplementedError: IndexProvider does not support this method.
    """
    raise NotImplementedError(
        "IndexProvider does not support fetch_available_symbols. "
        "Use get_constituents() instead."
    )

fetch_constituents abstractmethod async

fetch_constituents(
    as_of_date: date | None = None,
) -> list[IndexConstituent]

Return index constituents for the given date.

Parameters:

Name Type Description Default
as_of_date date | None

Date for which to fetch constituents. None means current/latest.

None

Returns:

Type Description
list[IndexConstituent]

List of constituents with provider symbols and optional weights.

Source code in quasar/lib/providers/core.py
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
@abstractmethod
async def fetch_constituents(
    self,
    as_of_date: date | None = None
) -> list[IndexConstituent]:
    """Return index constituents for the given date.

    Args:
        as_of_date: Date for which to fetch constituents.
                    None means current/latest.

    Returns:
        List of constituents with provider symbols and optional weights.
    """
    ...

get_constituents async

get_constituents(
    as_of_date: date | None = None,
) -> list[IndexConstituent]

Public method to get constituents with usage tracking.

Parameters:

Name Type Description Default
as_of_date date | None

Date for which to fetch constituents. None means current/latest.

None

Returns:

Type Description
list[IndexConstituent]

List of constituents with provider symbols and optional weights.

Source code in quasar/lib/providers/core.py
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
async def get_constituents(
    self,
    as_of_date: date | None = None
) -> list[IndexConstituent]:
    """Public method to get constituents with usage tracking.

    Args:
        as_of_date: Date for which to fetch constituents.
                    None means current/latest.

    Returns:
        List of constituents with provider symbols and optional weights.
    """
    async with self._usage:
        return await self.fetch_constituents(as_of_date)

LiveDataProvider

LiveDataProvider(
    context: DerivedContext, preferences: dict | None = None
)

Bases: DataProvider

Base class for all live data provider types.

Source code in quasar/lib/providers/core.py
341
342
def __init__(self, context: DerivedContext, preferences: dict | None = None):
    super().__init__(context, preferences)

close_buffer_seconds abstractmethod property

close_buffer_seconds: int

Number of seconds to keep listening for messages after bar close.

get_live async

get_live(
    interval: Interval,
    symbols: list[str],
    timeout: int = None,
) -> list[Bar]

Get live bars for the given symbols via websocket stream.

Source code in quasar/lib/providers/core.py
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
@async_timeout()
async def get_live(self, interval: Interval, symbols: list[str], timeout: int = None) -> list[Bar]:
    """Get live bars for the given symbols via websocket stream."""
    # Open Asynchronous WebSocket connection
    connection = await self._connect()
    async with connection as conn:
        # Subscribe to the Feed
        await self.subscribe(conn, interval, symbols)

        # Determine Cutoff Time
        bar_end = get_next_interval_timestamp(interval)
        cutoff = bar_end + timedelta(seconds=self.close_buffer_seconds)

        # Listen for messages
        symbol_bars: dict[str, Bar] = {}
        async for message in conn:
            # Check current time against cutoff
            now = datetime.now(timezone.utc)
            if now >= cutoff:
                break

            # Parse the message
            ebars = await self._parse_message(message)
            if (ebars is None) or ebars.__len__() == 0:
                continue

            for bar in ebars:
                if bar['ts'] > bar_end:
                    # If bar is older than cutoff, skip it
                    continue
                symbol_bars[bar['sym']] = bar


        # Unsubscribe from the feed
        await self.unsubscribe(conn, symbols)

    # Check if we got a bar for each symbol
    if set(symbol_bars.keys()) != set(symbols):
        missing_symbols = set(symbols) - set(symbol_bars.keys())
        logger.warning(f"Did not recieve bars for {missing_symbols.__len__()}: {missing_symbols}")

    # Return raw bars
    bars = list(symbol_bars.values())
    return bars

subscribe async

subscribe(
    connection: WebSocketClientConnection,
    interval: Interval,
    symbols: list[str],
)

Subscribe to the given symbols for the specified interval.

Parameters:

Name Type Description Default
connection WebSocketClientConnection

Active websocket connection.

required
interval Interval

Interval requested.

required
symbols list[str]

Symbols to subscribe.

required
Source code in quasar/lib/providers/core.py
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
async def subscribe(
        self,
        connection: websockets.asyncio.client.WebSocketClientConnection,
        interval: Interval,
        symbols: list[str]):
    """Subscribe to the given symbols for the specified interval.

    Args:
        connection (WebSocketClientConnection): Active websocket connection.
        interval (Interval): Interval requested.
        symbols (list[str]): Symbols to subscribe.
    """
    logger.info(f"Subscribing to {symbols.__len__()} on {self.name} WebSocket API")
    try:
        sus_msg = await self._subscribe(interval, symbols)
        await connection.send(json.dumps(sus_msg))
    except Exception as e:
        logger.error(f"Error subscribing to {symbols}: {e}")
        raise e 

unsubscribe async

unsubscribe(
    conn: WebSocketClientConnection, symbols: list[str]
)

Unsubscribe from the given symbols.

Source code in quasar/lib/providers/core.py
390
391
392
393
394
395
396
397
398
399
400
async def unsubscribe(
        self,
        conn: websockets.asyncio.client.WebSocketClientConnection,
        symbols: list[str]):
    """Unsubscribe from the given symbols."""
    logger.info(f"Unsubscribing from {symbols.__len__()} on {self.name} WebSocket API")
    try:
        unsub_msg = await self._unsubscribe(symbols)
        await conn.send(json.dumps(unsub_msg))
    except Exception as e:
        logger.error(f"Error unsubscribing from {symbols}: {e}")

async_timeout

async_timeout(
    seconds: int = 60,
) -> Callable[
    [Callable[..., Awaitable[T]]],
    Callable[..., Awaitable[T]],
]

Return an async decorator that enforces a timeout.

Parameters:

Name Type Description Default
seconds int

Default timeout in seconds.

60

Returns:

Name Type Description
Callable Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]

Decorator that wraps a coroutine with asyncio.wait_for.

Raises:

Type Description
Exception

Propagates any exception from the wrapped coroutine.

Source code in quasar/lib/providers/core.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def async_timeout(seconds: int = 60) -> Callable[[Callable[..., Awaitable[T]]], Callable[..., Awaitable[T]]]:
    """Return an async decorator that enforces a timeout.

    Args:
        seconds (int): Default timeout in seconds.

    Returns:
        Callable: Decorator that wraps a coroutine with ``asyncio.wait_for``.

    Raises:
        Exception: Propagates any exception from the wrapped coroutine.
    """
    def decorator(func: Callable[..., Awaitable[T]]) -> Callable[..., Awaitable[T]]:
        nonlocal seconds
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> T:
            # Extract Timeout, if Provided
            nonlocal seconds
            seconds = kwargs.pop('timeout', seconds)
            try:
                return await asyncio.wait_for(func(*args, **kwargs), timeout=seconds)
            except Exception as e:
                class_name = args[0].__class__.__name__ if args else ""
                func_name = f"{class_name}.{func.__name__}" if class_name else "Unknown Function"
                logger.error(f"{func_name} timed out after {seconds} seconds. This may be due to a hung/orphaned APScheduler job.")
                raise e
        return wrapper
    return decorator

get_next_interval_timestamp

get_next_interval_timestamp(interval: Interval) -> datetime

Calculate the next even interval timestamp from current time

Source code in quasar/lib/providers/core.py
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
def get_next_interval_timestamp(interval: Interval) -> datetime:
    """Calculate the next even interval timestamp from current time"""
    now = datetime.now(timezone.utc)

    if interval == '1min':
        # Next minute
        return datetime(now.year, now.month, now.day, now.hour, 
                      now.minute + (0 if now.second == 0 else 1), 
                      0, 0, tzinfo=timezone.utc)

    elif interval == '5min':
        # Next 5 minute mark (00, 05, 10, ...)
        current_minute = now.minute
        next_5min = ((current_minute // 5) + 1) * 5
        if next_5min == 60:
            return (datetime(now.year, now.month, now.day, now.hour + 1, 
                           0, 0, 0, tzinfo=timezone.utc))
        return datetime(now.year, now.month, now.day, now.hour, 
                      next_5min, 0, 0, tzinfo=timezone.utc)

    elif interval == '15min':
        # Next 15 minute mark (00, 15, 30, 45)
        current_minute = now.minute
        next_15min = ((current_minute // 15) + 1) * 15
        if next_15min == 60:
            return (datetime(now.year, now.month, now.day, now.hour + 1, 
                           0, 0, 0, tzinfo=timezone.utc))
        return datetime(now.year, now.month, now.day, now.hour, 
                      next_15min, 0, 0, tzinfo=timezone.utc)

    elif interval == '30min':
        # Next 30 minute mark (00, 30)
        next_30min = 0 if now.minute >= 30 else 30
        hour_adjust = 1 if now.minute >= 30 else 0
        return datetime(now.year, now.month, now.day, now.hour + hour_adjust, 
                      next_30min, 0, 0, tzinfo=timezone.utc)

    elif interval == '1h':
        # Next hour
        return datetime(now.year, now.month, now.day, now.hour + 1, 
                      0, 0, 0, tzinfo=timezone.utc)

    elif interval == '4h':
        # Next 4 hour mark (00, 04, 08, 12, 16, 20)
        current_hour = now.hour
        next_4h = ((current_hour // 4) + 1) * 4
        day_adjust = 0
        if next_4h == 24:
            next_4h = 0
            day_adjust = 1
        return datetime(now.year, now.month, now.day + day_adjust, 
                      next_4h, 0, 0, 0, tzinfo=timezone.utc)

    elif interval == '1d':
        # Next day, midnight
        return datetime(now.year, now.month, now.day + 1, 
                      0, 0, 0, 0, tzinfo=timezone.utc)

    elif interval == '1w':
        # Next Monday
        days_to_monday = (7 - now.weekday()) % 7
        if days_to_monday == 0:
            days_to_monday = 7
        return datetime(now.year, now.month, now.day + days_to_monday, 
                      0, 0, 0, 0, tzinfo=timezone.utc)

    elif interval == '1M':
        # First day of next month
        if now.month == 12:
            return datetime(now.year + 1, 1, 1, 
                          0, 0, 0, 0, tzinfo=timezone.utc)
        else:
            return datetime(now.year, now.month + 1, 1, 
                          0, 0, 0, 0, tzinfo=timezone.utc)

    raise ValueError(f"Unsupported interval: {interval}")

EODHD provider

eodhd

Built-in historical data provider for EODHD.

EODHDProvider

EODHDProvider(
    context: DerivedContext, preferences: dict | None = None
)

Bases: HistoricalDataProvider

Source code in quasar/lib/providers/core.py
283
284
def __init__(self, context: DerivedContext, preferences: dict | None = None):
    super().__init__(context, preferences)

fetch_available_symbols async

fetch_available_symbols() -> list[SymbolInfo]

Return available symbols from EODHD filtered to supported exchanges.

Source code in quasar/lib/providers/examples/eodhd.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
async def fetch_available_symbols(self) -> list[SymbolInfo]:
    """Return available symbols from EODHD filtered to supported exchanges."""
    symbols = []

    # Pull Data from Exchanges of Interest
    exchanges = ['NASDAQ', 'NYSE', 'CC', 'FOREX']
    for exchange in exchanges:
        url = f"{BASE}/exchange-symbol-list/{exchange}?" \
            f"api_token={self.context.get('api_token')}&fmt=json"
        data = await self._api_get(url)
        symbols.extend(data)


    class_map = {
        'common stock': AssetClass.EQUITY.value,
        'fund': AssetClass.FUND.value,
        'etf': AssetClass.ETF.value,
        'bond': AssetClass.BOND.value,
        'currency': AssetClass.CURRENCY.value,
    }

    symbol_info = []
    for e in symbols:
        # Normalize Exchange Name to MIC (with graceful fallback)
        eodhd_exchange = e.get('Exchange', '')
        exchange = self.EXCHANGE_MAP.get(eodhd_exchange, eodhd_exchange)

        # Asset Class / API Suffix Information
        if eodhd_exchange.lower() == 'cc':
            asset_class = AssetClass.CRYPTO.value
            # EODHD API uses .CC for crypto symbols
            api_symbol_suffix = 'CC'
        elif eodhd_exchange.upper() == 'FOREX':
            asset_class = AssetClass.CURRENCY.value
            # EODHD API uses .FOREX for forex symbols
            api_symbol_suffix = 'FOREX'
        else:
            asset_class = class_map.get(e['Type'].lower())
            # EODHD API uses .US for all U.S. exchanges (NASDAQ, NYSE, etc.)
            if eodhd_exchange in ['NASDAQ', 'NYSE']:
                api_symbol_suffix = 'US'
            else:
                # For other exchanges, use the exchange name as-is
                api_symbol_suffix = eodhd_exchange

        if asset_class is None:
            continue

        # Currency Information
        matcher_symbol = e['Code'].split('.')[0]
        base_currency = 'USD'
        quote_currency = None
        currs = None
        if asset_class == AssetClass.CRYPTO.value:
            currs = e['Code'].split('-')
            matcher_symbol = currs[0]
        elif asset_class == AssetClass.CURRENCY.value:
            try:
                currs = [e['Code'][:3], e['Code'][3:]]
                assert len(currs[0]) == 3
                assert len(currs[1]) == 3
            except:
                continue
        if currs:
            if len(currs) == 2:
                base_currency = currs[0]
                quote_currency = currs[1]
                if quote_currency != 'USD':
                    # Skip non-USD pairs
                    continue
            else:
                continue

        syminfo = SymbolInfo(
            provider=self.name,
            provider_id=None,
            primary_id=None,  # Provider does not supply FIGI
            # Use API-compatible symbol format (e.g., AAPL.US for U.S. stocks)
            symbol=f"{e['Code']}.{api_symbol_suffix}",
            matcher_symbol=matcher_symbol,
            name=e['Name'],
            exchange=exchange,
            asset_class=asset_class,
            base_currency=base_currency,
            quote_currency=quote_currency,
            country=e['Country']
        )
        symbol_info.append(syminfo)

    return symbol_info

get_history async

get_history(
    sym: str, start: date, end: date, interval: Interval
)

Yield historical bars from EODHD for the given symbol and range.

Source code in quasar/lib/providers/examples/eodhd.py
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
async def get_history(
    self,
    sym: str,
    start: date,
    end: date,
    interval: Interval,
):
    """Yield historical bars from EODHD for the given symbol and range."""
    # Map Interval to EODHD API (supported subset: 1d,1w,1M)
    eodhd_interval_map = {
        Interval.I_1D: '1d',
        Interval.I_1W: '1w',
        Interval.I_1M: '1M'
    }
    eodhd_interval = eodhd_interval_map.get(interval)
    if eodhd_interval is None:
        raise ValueError(f"Unsupported interval: {interval}")

    # Create Request
    url = (
        f"{BASE}/eod/{sym}"
        f"?from={str(start)}"
        f"&to={str(end)}"
        f"&period={eodhd_interval}"
        f"&api_token={self.context.get('api_token')}&fmt=json"
    )

    # Make Request (Uses Rate Builtin Rate Limiter)
    data = await self._api_get(url)

    for e in data:
        # Format Date to UTC
        ts_date = datetime.strptime(e["date"], "%Y-%m-%d")
        ts = datetime(ts_date.year, ts_date.month, ts_date.day, tzinfo=timezone.utc)
        yield Bar(
            ts=ts,
            sym=sym,
            o=e["open"],
            h=e["high"],
            l=e["low"],
            c=e["close"],
            v=e["volume"],
        )

Kraken provider

kraken

Built-in live data provider for Kraken WebSocket OHLC data.

KrakenProvider

KrakenProvider(
    context: DerivedContext, preferences: dict | None = None
)

Bases: LiveDataProvider

Source code in quasar/lib/providers/examples/kraken.py
23
24
25
def __init__(self, context: DerivedContext, preferences: dict | None = None):
    super().__init__(context, preferences)
    self._url = "wss://ws.kraken.com/v2"

fetch_available_symbols async

fetch_available_symbols() -> list[SymbolInfo]

Return supported Kraken trading pairs denominated in USD or USDC.

Source code in quasar/lib/providers/examples/kraken.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
async def fetch_available_symbols(self) -> list[SymbolInfo]:
    """Return supported Kraken trading pairs denominated in USD or USDC."""
    base_url = f"https://api.kraken.com/0/public/AssetPairs"
    params = {
        'country_code': 'US:TX'
    }
    query_string = urllib.parse.urlencode(params)
    url = f"{base_url}?{query_string}"

    data = await self._api_get(url)
    result = data.get('result')
    if not result:
        raise ValueError("Error fetching asset pairs from Kraken")

    # These Values are the same for all symbols from Kraken
    asset_class = AssetClass.CRYPTO.value
    country = None
    exchange = None
    symbols = []
    for sym, e in result.items():
        if e['quote'] not in ['ZUSD', 'USDC']:
            continue
        quote_currency = 'USD' if e['quote'] == 'ZUSD' else 'USDC'
        base_currency = e['base']
        if not e.get('wsname') or not e.get('altname'):
            continue

        # print(f"{e['wsname']} - {e['altname']} -> {e['wsname'].split('/')[0]}")
        symbol = SymbolInfo(
            provider=self.name,
            provider_id=e['altname'],
            primary_id=None,  # Provider does not supply FIGI
            symbol=e['wsname'],
            matcher_symbol=e['wsname'].split('/')[0],
            name=sym,
            exchange=exchange,
            asset_class=asset_class,
            base_currency=base_currency,
            quote_currency=quote_currency,
            country=country
        )
        symbols.append(symbol)

    return symbols

Developer harness

Use quasar.lib.providers.devtools to exercise historical and live provider subclasses without bringing up the full stack.

  • Python API: run_historical(config) / run_live(config) / run_symbols(config) always validate outputs (strict by default).
  • CLI (bars): python -m quasar.lib.providers.devtools bars --config ./config.json --limit 100
  • CLI (symbols): python -m quasar.lib.providers.devtools symbols --config ./config.json
  • Config must include provider_type (historical or live) and provider.
  • Strict mode is on by default; disable with --no-strict (CLI) or strict=False (API).
  • Example configs and stub providers live under quasar/lib/providers/devtools/examples/ and quasar.lib.providers.devtools.stubs.

Minimal examples:

python -m quasar.lib.providers.devtools bars \
  --config quasar/lib/providers/devtools/examples/historical_stub.json

Fetch symbols:

python -m quasar.lib.providers.devtools symbols \
  --config quasar/lib/providers/devtools/examples/historical_stub.json
from quasar.lib.providers.devtools import run_live

config = {
    "provider_type": "live",
    "provider": "quasar.lib.providers.devtools.stubs:LiveStub",
    "interval": "1min",
    "symbols": ["AAA", "BBB"],
    "secrets": {},
}

bars = run_live(config)