diff --git a/automation/jobs.yaml b/automation/jobs.yaml index aafaff12..892b6b1b 100644 --- a/automation/jobs.yaml +++ b/automation/jobs.yaml @@ -37,6 +37,7 @@ profiles: - { name: "usdai", script: protocols/usdai/main.py } - { name: "usdai-large-mints", script: protocols/usdai/large_mints.py } - { name: "stables-dune-large-transfers", script: protocols/stables/dune_large_transfers.py } + - { name: "stables-oracles", script: protocols/stables/oracles.py } - { name: "yearn-alert-large-flows", script: protocols/yearn/alert_large_flows.py } # Cache: tks-trigger-cache.json under $CACHE_DIR (check_stuck_triggers.DEFAULT_CACHE_FILE). - { name: "yearn-check-stuck-triggers", script: protocols/yearn/check_stuck_triggers.py, enabled: false } diff --git a/protocols/ustb/abi/ChainlinkAggregator.json b/common-abi/ChainlinkAggregator.json similarity index 100% rename from protocols/ustb/abi/ChainlinkAggregator.json rename to common-abi/ChainlinkAggregator.json diff --git a/protocols/lrt-pegs/README.md b/protocols/lrt-pegs/README.md index 65449453..f7748b49 100644 --- a/protocols/lrt-pegs/README.md +++ b/protocols/lrt-pegs/README.md @@ -10,8 +10,8 @@ Curve pools that are checked are: - ETH+/WETH - weETH-WETH -- frxETH-WETH - OETH/ETH +- stETH/ETH (Lido) β€” canonical wstETH-vs-ETH depeg gauge; wstETH deterministically wraps stETH ### Uniswap V3 pools - DISABLED ⚠️ diff --git a/protocols/lrt-pegs/curve/main.py b/protocols/lrt-pegs/curve/main.py index 0f5d5070..1a817c85 100644 --- a/protocols/lrt-pegs/curve/main.py +++ b/protocols/lrt-pegs/curve/main.py @@ -18,28 +18,34 @@ ("OETH/ETH Curve Pool", "0xcc7d5785AD5755B6164e21495E07aDb0Ff11C2A8", 0, 1, THRESHOLD_RATIO, "origin"), # NOTE: bool is unbalanced, whole liquidity is moved to univ3: https://app.uniswap.org/explore/pools/ethereum/0x202a6012894ae5c288ea824cbc8a9bfb26a49b93 ("weETH-WETH Curve Pool", "0xDB74dfDD3BB46bE8Ce6C33dC9D82777BCFc3dEd5", 1, 0, THRESHOLD_RATIO, "weeth"), + # Lido stETH/ETH β€” deepest stETH<>ETH venue and the canonical wstETH depeg + # gauge (wstETH deterministically wraps stETH). Legacy pool: exposes + # balances(i) but not get_balances(). idx 0 = ETH, idx 1 = stETH. + ("stETH/ETH Curve Pool", "0xDC24316b9AE028F1497c275EB9192a3Ea0f67022", 1, 0, THRESHOLD_RATIO, "wsteth"), ] def process_pools(chain: Chain = Chain.MAINNET): client = ChainManager.get_client(chain) - contracts = [] - # Prepare batch calls + # Read each pool's two relevant coin balances. Using ``balances(i)`` (instead + # of ``get_balances()``) keeps a single code path for both modern pools and + # legacy pools like Lido stETH/ETH that don't expose ``get_balances()``. with client.batch_requests() as batch: - for _, pool_address, _, _, _, _ in POOL_CONFIGS: + for _, pool_address, idx_lrt, idx_other_token, _, _ in POOL_CONFIGS: pool = client.eth.contract(address=pool_address, abi=ABI_CURVE_POOL) - contracts.append(pool) - - batch.add(pool.functions.get_balances()) + batch.add(pool.functions.balances(idx_lrt)) + batch.add(pool.functions.balances(idx_other_token)) responses = client.execute_batch(batch) - if len(responses) != len(POOL_CONFIGS): - raise ValueError(f"Expected {len(POOL_CONFIGS)} responses from batch, got: {len(responses)}") + if len(responses) != len(POOL_CONFIGS) * 2: + raise ValueError(f"Expected {len(POOL_CONFIGS) * 2} responses from batch, got: {len(responses)}") # Process results - for (pool_name, _, idx_lrt, idx_other_token, peg_threshold, protocol), balances in zip(POOL_CONFIGS, responses): - percentage = (balances[idx_lrt] / (balances[idx_lrt] + balances[idx_other_token])) * 100 + for i, (pool_name, _, _, _, peg_threshold, protocol) in enumerate(POOL_CONFIGS): + lrt_balance = responses[i * 2] + other_balance = responses[i * 2 + 1] + percentage = (lrt_balance / (lrt_balance + other_balance)) * 100 logger.info("%s ratio is %s%%", pool_name, f"{percentage:.2f}") if percentage > peg_threshold: message = f"🚨 Curve Alert! {pool_name} ratio is {percentage:.2f}%" diff --git a/protocols/stables/oracles.py b/protocols/stables/oracles.py new file mode 100644 index 00000000..f58dea11 --- /dev/null +++ b/protocols/stables/oracles.py @@ -0,0 +1,380 @@ +"""Layer 2 peg monitoring β€” on-chain oracle health for pegged assets (hourly). + +Where ``protocols/stables/main.py`` watches *market* price (DeFiLlama), this watches +the **on-chain oracles our lending markets actually liquidate on**. Driven by the +shared :data:`PeggedAsset` registry, for every Chainlink-backed asset it checks: + +* **staleness** β€” ``now βˆ’ updatedAt > heartbeat + buffer``; +* **round sanity / monotonicity** β€” positive answer, completed round, + ``answeredInRound β‰₯ roundId``, and a non-decreasing ``roundId`` vs the cached run; +* **deviation from peg** β€” oracle price vs the asset's :class:`PegTarget`; +* **oracle ↔ market divergence** β€” Chainlink vs DeFiLlama, the actual + liquidation-risk signal (markets liquidate on the oracle, not market price). + +Staleness and round-sanity run only for feeds that report reliable round metadata +(``ChainlinkFeed.reports_round_metadata``); feeds that return constant or zero +``roundId`` / ``updatedAt`` are skipped for those two checks to avoid false positives. + +For **rate / fundamental oracles** (vault-rate, capped Redstone feeds) it checks +monotonicity + delta-vs-cached (the ``protocols/apyusd/main.py`` approach); any +fundamental-oracle depeg is ``CRITICAL`` (per #196). + +Runs hourly via ``automation/jobs.yaml``. +""" + +from dataclasses import dataclass +from decimal import Decimal + +from utils.alert import Alert, AlertSeverity, send_alert +from utils.cache import cache_filename, get_last_value_for_key_from_file, write_last_value_to_file +from utils.chainlink import FeedReading, RoundData, read_feeds +from utils.chains import Chain +from utils.config import Config +from utils.defillama import fetch_prices +from utils.logger import get_logger +from utils.pegged_assets import PEGGED_ASSETS, PeggedAsset, PegTarget, price_deviation, resolve_peg_prices +from utils.web3_wrapper import ChainManager, Web3Client + +PROTOCOL = "pegs" +logger = get_logger("stables-oracles") + +# Tunables (env-overridable). +STALENESS_BUFFER = Config.get_env_int("PEG_ORACLE_STALENESS_BUFFER", 600) # 10 min grace on heartbeat +DIVERGENCE_THRESHOLD = Decimal(str(Config.get_env_float("PEG_ORACLE_DIVERGENCE_THRESHOLD", 0.01))) # 1% +RATE_DELTA_THRESHOLD = Decimal(str(Config.get_env_float("PEG_ORACLE_RATE_DELTA_THRESHOLD", 0.05))) # 5% + +CACHE_FILE = cache_filename + + +def _round_cache_key(address: str) -> str: + return f"peg_oracle_round_{address.lower()}" + + +def _rate_cache_key(address: str) -> str: + return f"peg_oracle_rate_{address.lower()}" + + +# --------------------------------------------------------------------------- +# Observation + pure per-check helpers (unit tested without a chain) +# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class OracleObservation: + """Everything needed to evaluate one Chainlink-backed asset for one run.""" + + asset: PeggedAsset + reading: FeedReading + peg_price_usd: Decimal # USD price of the asset's peg target + quote_price_usd: Decimal # USD price of the feed's quote unit + now: int + market_price_usd: Decimal | None = None # DeFiLlama; None when unavailable + prev_round_id: int | None = None # cached roundId from the previous run + + @property + def oracle_price_usd(self) -> Decimal: + """Chainlink answer expressed in USD (answer Γ— quote price).""" + return self.reading.price * self.quote_price_usd + + +def _round_issues(round_data: RoundData) -> list[str]: + """Collect Chainlink round sanity-check failures. + + Checks the invariants a consumer is expected to enforce: a positive answer, an + initialised round, and an ``answeredInRound`` not behind ``roundId`` (a stale + answer carried over from an earlier round). Only meaningful for feeds that + report real round metadata (see ``ChainlinkFeed.reports_round_metadata``). + + Returns: + Human-readable problem descriptions; empty when healthy. + """ + issues: list[str] = [] + if round_data.answer <= 0: + issues.append(f"non-positive answer ({round_data.answer})") + if round_data.updated_at <= 0: + issues.append("round not complete (updatedAt is 0)") + if round_data.answered_in_round < round_data.round_id: + issues.append(f"stale round (answeredInRound {round_data.answered_in_round} < roundId {round_data.round_id})") + return issues + + +def _is_round_healthy(round_data: RoundData) -> bool: + """Return ``True`` if the round passes all checks in :func:`_round_issues`.""" + return not _round_issues(round_data) + + +def check_staleness(obs: OracleObservation, buffer: int = STALENESS_BUFFER) -> Alert | None: + """Alert (HIGH) if the feed has not updated within heartbeat + buffer.""" + feed = obs.asset.chainlink_feed + assert feed is not None + updated_at = obs.reading.round_data.updated_at + age = obs.now - updated_at + if updated_at <= 0 or age > feed.heartbeat + buffer: + return Alert( + AlertSeverity.HIGH, + f"*{obs.asset.name} oracle stale* ({feed.description})\n" + f"Age: {age}s β€” heartbeat {feed.heartbeat}s + buffer {buffer}s\n" + f"updatedAt: {updated_at}\n" + f"Feed: `{feed.address}`", + obs.asset.protocol, + channel=obs.asset.channel, + ) + return None + + +def check_round_health(obs: OracleObservation) -> Alert | None: + """Alert if round sanity checks fail or ``roundId`` moved backwards. + + A non-positive answer, an incomplete round, or a backwards ``roundId`` is a + feed malfunction (``CRITICAL``); a stale ``answeredInRound`` is ``HIGH``. + """ + feed = obs.asset.chainlink_feed + assert feed is not None + round_data = obs.reading.round_data + issues = _round_issues(round_data) + + if obs.prev_round_id is not None and round_data.round_id < obs.prev_round_id: + issues.append(f"roundId went backwards ({obs.prev_round_id} -> {round_data.round_id})") + + if not issues: + return None + + # Anything beyond a lagging answeredInRound means the feed is broken. + critical = any("answeredInRound" not in issue for issue in issues) + severity = AlertSeverity.CRITICAL if critical else AlertSeverity.HIGH + return Alert( + severity, + f"*{obs.asset.name} oracle round unhealthy* ({feed.description})\n" + + "\n".join(f"- {issue}" for issue in issues) + + f"\nFeed: `{feed.address}`", + obs.asset.protocol, + channel=obs.asset.channel, + ) + + +def check_peg_deviation(obs: OracleObservation) -> Alert | None: + """Alert (HIGH) if the oracle price deviates from the peg beyond ``depeg_pct``.""" + if obs.peg_price_usd <= 0: + return None + if not obs.asset.is_depegged(obs.oracle_price_usd, obs.peg_price_usd): + return None + dev = obs.asset.deviation(obs.oracle_price_usd, obs.peg_price_usd) + return Alert( + AlertSeverity.HIGH, + f"*{obs.asset.name} oracle off peg* ({obs.asset.peg.value})\n" + f"Oracle: ${obs.oracle_price_usd:.6f}\n" + f"Peg: ${obs.peg_price_usd:.6f}\n" + f"Deviation: {dev:+.2%} (tolerance {obs.asset.depeg_pct:.2%})", + obs.asset.protocol, + channel=obs.asset.channel, + ) + + +def check_market_divergence(obs: OracleObservation, threshold: Decimal = DIVERGENCE_THRESHOLD) -> Alert | None: + """Alert (HIGH) if the oracle and DeFiLlama market price diverge beyond ``threshold``.""" + if obs.market_price_usd is None or obs.market_price_usd <= 0: + return None + dev = price_deviation(obs.oracle_price_usd, obs.market_price_usd) + if abs(dev) < threshold: + return None + return Alert( + AlertSeverity.HIGH, + f"*{obs.asset.name} oracle ↔ market divergence*\n" + f"Oracle: ${obs.oracle_price_usd:.6f}\n" + f"Market (DeFiLlama): ${obs.market_price_usd:.6f}\n" + f"Divergence: {dev:+.2%} (threshold {threshold:.2%})", + obs.asset.protocol, + channel=obs.asset.channel, + ) + + +def evaluate_chainlink_asset( + obs: OracleObservation, + *, + buffer: int = STALENESS_BUFFER, + divergence_threshold: Decimal = DIVERGENCE_THRESHOLD, +) -> list[Alert]: + """Run all Chainlink-asset checks, returning the alerts that fired. + + Staleness and round-health rely on the feed reporting real ``roundId`` / + ``updatedAt``; feeds flagged ``reports_round_metadata=False`` (constant or zero + round/time values) skip those two checks to avoid false positives. Peg-deviation + and oracle↔market divergence always run. + """ + feed = obs.asset.chainlink_feed + assert feed is not None + candidates: list[Alert | None] = [] + if feed.reports_round_metadata: + candidates.append(check_staleness(obs, buffer)) + candidates.append(check_round_health(obs)) + candidates.append(check_peg_deviation(obs)) + candidates.append(check_market_divergence(obs, divergence_threshold)) + return [alert for alert in candidates if alert is not None] + + +def next_cached_round(prev_round_id: int | None, round_data: RoundData) -> int: + """High-water-mark ``roundId`` to persist so a regression never poisons the cache. + + The cached ``roundId`` is the baseline the next run compares against for + monotonicity. Writing a lower or malfunctioning round would make the + regression the new baseline, so it is flagged only once and a feed stuck at + (or crawling below) the regressed round looks "monotonic" forever. Only a + healthy, non-decreasing round advances the mark. + """ + if not _is_round_healthy(round_data): + return prev_round_id or 0 + if prev_round_id is None: + return round_data.round_id + return max(prev_round_id, round_data.round_id) + + +def check_rate_oracle( + asset: PeggedAsset, + current_rate: int, + prev_rate: int | None, + threshold: Decimal = RATE_DELTA_THRESHOLD, +) -> list[Alert]: + """Monotonicity + delta-vs-cached checks for a fundamental / rate oracle. + + A decrease in a monotonic (capped) oracle is a fundamental depeg + (``CRITICAL``); any delta beyond ``threshold`` is ``HIGH``. + """ + oracle = asset.rate_oracle + assert oracle is not None + if prev_rate is None or prev_rate <= 0: + return [] + + alerts: list[Alert] = [] + delta = Decimal(current_rate - prev_rate) / Decimal(prev_rate) + + if oracle.monotonic and current_rate < prev_rate: + alerts.append( + Alert( + AlertSeverity.CRITICAL, + f"*{asset.name} fundamental oracle DECREASED* (monotonic/capped)\n" + f"Previous: {prev_rate}\nCurrent: {current_rate}\nDelta: {delta:+.4%}\n" + f"Oracle: `{oracle.address}`", + asset.protocol, + channel=asset.channel, + ) + ) + elif abs(delta) >= threshold: + alerts.append( + Alert( + AlertSeverity.HIGH, + f"*{asset.name} fundamental oracle delta* {delta:+.4%} (threshold {threshold:.2%})\n" + f"Previous: {prev_rate}\nCurrent: {current_rate}\nOracle: `{oracle.address}`", + asset.protocol, + channel=asset.channel, + ) + ) + return alerts + + +# --------------------------------------------------------------------------- +# Orchestration +# --------------------------------------------------------------------------- + + +def _build_rate_oracle_abi(function: str) -> list[dict]: + return [ + { + "inputs": [], + "name": function, + "outputs": [{"internalType": "uint256", "name": "", "type": "uint256"}], + "stateMutability": "view", + "type": "function", + } + ] + + +def _monitor_chainlink_assets(client: Web3Client) -> None: + """Check every registry asset that has a Chainlink feed.""" + assets = [a for a in PEGGED_ASSETS if a.chainlink_feed is not None] + if not assets: + return + + needed_targets: set[PegTarget] = set() + for asset in assets: + assert asset.chainlink_feed is not None + needed_targets.add(asset.peg) + needed_targets.add(asset.chainlink_feed.quote) + peg_prices = resolve_peg_prices(needed_targets) + + market_prices = fetch_prices([a.defillama_key for a in assets]) + readings = read_feeds(client, [a.chainlink_feed.address for a in assets]) # type: ignore[union-attr] + now = int(client.eth.get_block("latest")["timestamp"]) + + for asset in assets: + feed = asset.chainlink_feed + assert feed is not None + reading = readings[feed.address] + + prev_round_raw = get_last_value_for_key_from_file(CACHE_FILE, _round_cache_key(feed.address)) + try: + prev_round_id: int | None = int(str(prev_round_raw)) if str(prev_round_raw) != "0" else None + except ValueError: + prev_round_id = None + + obs = OracleObservation( + asset=asset, + reading=reading, + peg_price_usd=peg_prices[asset.peg], + quote_price_usd=peg_prices[feed.quote], + now=now, + market_price_usd=market_prices.get(asset.defillama_key), + prev_round_id=prev_round_id, + ) + + alerts = evaluate_chainlink_asset(obs) + logger.info( + "%s oracle: $%.6f (peg $%.6f, market %s) β€” %d alert(s)", + asset.name, + obs.oracle_price_usd, + obs.peg_price_usd, + f"${obs.market_price_usd:.6f}" if obs.market_price_usd is not None else "n/a", + len(alerts), + ) + for alert in alerts: + send_alert(alert) + + write_last_value_to_file( + CACHE_FILE, _round_cache_key(feed.address), next_cached_round(prev_round_id, reading.round_data) + ) + + +def _monitor_rate_oracles(client: Web3Client) -> None: + """Check every registry asset that has a fundamental / rate oracle.""" + assets = [a for a in PEGGED_ASSETS if a.rate_oracle is not None] + for asset in assets: + oracle = asset.rate_oracle + assert oracle is not None + contract = client.get_contract(oracle.address, _build_rate_oracle_abi(oracle.function)) + current_rate = int(contract.functions[oracle.function]().call()) + + prev_raw = get_last_value_for_key_from_file(CACHE_FILE, _rate_cache_key(oracle.address)) + try: + prev_rate: int | None = int(str(prev_raw)) if str(prev_raw) != "0" else None + except ValueError: + prev_rate = None + + alerts = check_rate_oracle(asset, current_rate, prev_rate) + logger.info("%s rate oracle %s: %d (%d alert(s))", asset.name, oracle.address, current_rate, len(alerts)) + for alert in alerts: + send_alert(alert) + + write_last_value_to_file(CACHE_FILE, _rate_cache_key(oracle.address), current_rate) + + +def main() -> None: + """Run all L2 oracle-health checks driven by the pegged-asset registry.""" + client = ChainManager.get_client(Chain.MAINNET) + _monitor_chainlink_assets(client) + _monitor_rate_oracles(client) + logger.info("L2 oracle health check complete") + + +if __name__ == "__main__": + from utils.runner import run_with_alert + + run_with_alert(main, PROTOCOL) diff --git a/protocols/ustb/main.py b/protocols/ustb/main.py index 40bb0bf7..e068027d 100644 --- a/protocols/ustb/main.py +++ b/protocols/ustb/main.py @@ -48,7 +48,7 @@ # ABIs # --------------------------------------------------------------------------- ABI_ORACLE = load_abi("protocols/ustb/abi/SuperstateOracle.json") -ABI_CHAINLINK = load_abi("protocols/ustb/abi/ChainlinkAggregator.json") +ABI_CHAINLINK = load_abi("common-abi/ChainlinkAggregator.json") ABI_ERC20 = load_abi("common-abi/ERC20.json") USTB_DECIMALS = 6 diff --git a/tests/test_chainlink.py b/tests/test_chainlink.py new file mode 100644 index 00000000..348b7836 --- /dev/null +++ b/tests/test_chainlink.py @@ -0,0 +1,55 @@ +import unittest +from decimal import Decimal + +from utils.chainlink import ( + FeedReading, + RoundData, + scale_price, +) + + +def _round( + round_id: int = 10, + answer: int = 100_000_000, + started_at: int = 1_000, + updated_at: int = 1_000, + answered_in_round: int = 10, +) -> RoundData: + return RoundData(round_id, answer, started_at, updated_at, answered_in_round) + + +class TestScalePrice(unittest.TestCase): + def test_scales_by_decimals(self): + self.assertEqual(scale_price(100_000_000, 8), Decimal("1")) + + def test_scales_fractional(self): + self.assertEqual(scale_price(99_960_043, 8), Decimal("0.99960043")) + + def test_zero_decimals_raises(self): + with self.assertRaises(ValueError): + scale_price(1, 0) + + def test_negative_decimals_raises(self): + with self.assertRaises(ValueError): + scale_price(1, -1) + + +class TestRoundData(unittest.TestCase): + def test_from_tuple_decodes_fields(self): + rd = RoundData.from_tuple((10, 99_851_375, 900, 1_000, 10)) + self.assertEqual(rd.round_id, 10) + self.assertEqual(rd.answer, 99_851_375) + self.assertEqual(rd.updated_at, 1_000) + self.assertEqual(rd.answered_in_round, 10) + + def test_from_tuple_wrong_length_raises(self): + with self.assertRaises(ValueError): + RoundData.from_tuple((1, 2, 3)) + + def test_feed_reading_price_uses_decimals(self): + reading = FeedReading(address="0xabc", round_data=_round(answer=605_044_986_7456), decimals=8) + self.assertEqual(reading.price, scale_price(605_044_986_7456, 8)) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_pegged_assets.py b/tests/test_pegged_assets.py new file mode 100644 index 00000000..18eb7fc8 --- /dev/null +++ b/tests/test_pegged_assets.py @@ -0,0 +1,128 @@ +import unittest +from decimal import Decimal +from unittest.mock import patch + +from utils.pegged_assets import ( + BTC_USD_DEFILLAMA_KEY, + PEGGED_ASSETS, + PEGGED_ASSETS_BY_NAME, + PegTarget, + get_asset, + price_deviation, + resolve_peg_prices, +) + +# Asset set the registry must cover per the issue acceptance criteria. +REQUIRED_ASSETS = {"WBTC", "cbBTC", "LBTC", "iUSD", "cUSD", "USDe", "USDC", "USDT", "USDS"} + + +class TestPriceDeviation(unittest.TestCase): + def test_no_deviation(self): + self.assertEqual(price_deviation(Decimal("1"), Decimal("1")), Decimal("0")) + + def test_positive_deviation(self): + self.assertEqual(price_deviation(Decimal("1.05"), Decimal("1")), Decimal("0.05")) + + def test_negative_deviation(self): + self.assertEqual(price_deviation(Decimal("0.97"), Decimal("1")), Decimal("-0.03")) + + def test_btc_denominated_deviation(self): + # asset at 60,300 vs 60,000 BTC peg -> +0.5% + self.assertEqual(price_deviation(Decimal("60300"), Decimal("60000")), Decimal("0.005")) + + def test_zero_peg_raises(self): + with self.assertRaises(ValueError): + price_deviation(Decimal("1"), Decimal("0")) + + +class TestIsDepegged(unittest.TestCase): + def test_within_tolerance_is_not_depegged(self): + usdc = get_asset("USDC") # depeg_pct = 0.02 + self.assertFalse(usdc.is_depegged(Decimal("0.99"), Decimal("1"))) + + def test_beyond_tolerance_is_depegged(self): + usdc = get_asset("USDC") + self.assertTrue(usdc.is_depegged(Decimal("0.97"), Decimal("1"))) + + def test_at_threshold_is_depegged(self): + usdc = get_asset("USDC") + self.assertTrue(usdc.is_depegged(Decimal("1.02"), Decimal("1"))) + + def test_btc_asset_uses_peg_price(self): + cbbtc = get_asset("cbBTC") # depeg_pct = 0.02, peg = BTC + self.assertFalse(cbbtc.is_depegged(Decimal("60500"), Decimal("60000"))) + self.assertTrue(cbbtc.is_depegged(Decimal("58000"), Decimal("60000"))) + + def test_downside_only_ignores_upside(self): + lbtc = get_asset("LBTC") # depeg_pct = 0.03, downside_only + # +5% above peg (LBTC legitimately trades above 1 BTC) -> not a depeg. + self.assertFalse(lbtc.is_depegged(Decimal("63000"), Decimal("60000"))) + # -5% below peg -> depeg. + self.assertTrue(lbtc.is_depegged(Decimal("57000"), Decimal("60000"))) + # exactly at the downside threshold -> depeg. + self.assertTrue(lbtc.is_depegged(Decimal("58200"), Decimal("60000"))) + + def test_symmetric_asset_flags_upside(self): + usdc = get_asset("USDC") # depeg_pct = 0.02, symmetric + self.assertTrue(usdc.is_depegged(Decimal("1.05"), Decimal("1"))) + + +class TestResolvePegPrices(unittest.TestCase): + def test_usd_only_does_not_hit_network(self): + with patch("utils.pegged_assets.fetch_prices") as mock_fetch: + prices = resolve_peg_prices({PegTarget.USD}) + mock_fetch.assert_not_called() + self.assertEqual(prices, {PegTarget.USD: Decimal(1)}) + + def test_btc_fetches_live_price(self): + with patch("utils.pegged_assets.fetch_prices", return_value={BTC_USD_DEFILLAMA_KEY: Decimal("60000")}): + prices = resolve_peg_prices({PegTarget.USD, PegTarget.BTC}) + self.assertEqual(prices[PegTarget.USD], Decimal(1)) + self.assertEqual(prices[PegTarget.BTC], Decimal("60000")) + + def test_missing_btc_price_raises(self): + with patch("utils.pegged_assets.fetch_prices", return_value={}): + with self.assertRaises(ValueError): + resolve_peg_prices({PegTarget.BTC}) + + +class TestRegistry(unittest.TestCase): + def test_covers_required_assets(self): + self.assertTrue(REQUIRED_ASSETS.issubset(set(PEGGED_ASSETS_BY_NAME))) + + def test_names_are_unique(self): + names = [a.name for a in PEGGED_ASSETS] + self.assertEqual(len(names), len(set(names))) + + def test_address_parsed_from_defillama_key(self): + self.assertEqual(get_asset("USDC").address, "0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48") + + def test_btc_pegged_assets_target_btc(self): + self.assertEqual(get_asset("cbBTC").peg, PegTarget.BTC) + self.assertEqual(get_asset("LBTC").peg, PegTarget.BTC) + + def test_get_asset_unknown_raises(self): + with self.assertRaises(KeyError): + get_asset("NOPE") + + def test_every_asset_has_positive_depeg_tolerance(self): + for asset in PEGGED_ASSETS: + self.assertGreater(asset.depeg_pct, Decimal("0"), asset.name) + + def test_btc_wrappers_are_downside_only(self): + for name in ("WBTC", "cbBTC", "LBTC"): + self.assertTrue(get_asset(name).downside_only, name) + + def test_usd_stables_are_symmetric(self): + self.assertFalse(get_asset("USDC").downside_only) + + def test_chainlink_feed_quote_denomination(self): + # BTC-denominated feeds (answer ~1.0) vs USD-denominated feeds (absolute price). + self.assertEqual(get_asset("WBTC").chainlink_feed.quote, PegTarget.BTC) + self.assertEqual(get_asset("LBTC").chainlink_feed.quote, PegTarget.BTC) + self.assertEqual(get_asset("cbBTC").chainlink_feed.quote, PegTarget.USD) + self.assertEqual(get_asset("USDC").chainlink_feed.quote, PegTarget.USD) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_stables_oracles.py b/tests/test_stables_oracles.py new file mode 100644 index 00000000..0a8ca15b --- /dev/null +++ b/tests/test_stables_oracles.py @@ -0,0 +1,269 @@ +import unittest +from decimal import Decimal + +from protocols.stables.oracles import ( + OracleObservation, + check_market_divergence, + check_peg_deviation, + check_rate_oracle, + check_round_health, + check_staleness, + evaluate_chainlink_asset, + next_cached_round, +) +from utils.alert import AlertSeverity +from utils.chainlink import FeedReading, RoundData +from utils.dispatch import DISPATCHABLE_PROTOCOLS +from utils.pegged_assets import ChainlinkFeed, PeggedAsset, PegTarget, RateOracle, get_asset + +NOW = 2_000_000 +HEARTBEAT = 86_400 # matches registry _STABLE_HEARTBEAT + + +def _reading( + address: str, + answer: int, + *, + decimals: int = 8, + round_id: int = 100, + updated_at: int = NOW - 100, + answered_in_round: int = 100, +) -> FeedReading: + rd = RoundData( + round_id=round_id, + answer=answer, + started_at=updated_at, + updated_at=updated_at, + answered_in_round=answered_in_round, + ) + return FeedReading(address=address, round_data=rd, decimals=decimals) + + +def _cbbtc_obs(**overrides) -> OracleObservation: + """A healthy cbBTC (USD-quoted feed, BTC peg) observation; override per test.""" + asset = get_asset("cbBTC") + defaults = dict( + asset=asset, + reading=_reading(asset.chainlink_feed.address, 60_100 * 10**8), # $60,100 + peg_price_usd=Decimal("60000"), + quote_price_usd=Decimal("1"), # USD-quoted feed + now=NOW, + market_price_usd=Decimal("60100"), + prev_round_id=99, + ) + defaults.update(overrides) + return OracleObservation(**defaults) + + +class TestHealthyAsset(unittest.TestCase): + def test_no_alerts_when_healthy(self): + self.assertEqual(evaluate_chainlink_asset(_cbbtc_obs()), []) + + +class TestStaleness(unittest.TestCase): + def test_fresh_feed_ok(self): + self.assertIsNone(check_staleness(_cbbtc_obs(), buffer=600)) + + def test_forced_stale_fires(self): + stale = _cbbtc_obs( + reading=_reading( + get_asset("cbBTC").chainlink_feed.address, 60_100 * 10**8, updated_at=NOW - (HEARTBEAT + 1000) + ) + ) + alert = check_staleness(stale, buffer=600) + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, AlertSeverity.HIGH) + # cbBTC has no dispatchable owner: protocol is "pegs", channel override empty. + self.assertEqual(alert.channel, "pegs") + self.assertEqual(alert.protocol, "coinbase") + + def test_zero_updated_at_is_stale(self): + obs = _cbbtc_obs(reading=_reading(get_asset("cbBTC").chainlink_feed.address, 60_100 * 10**8, updated_at=0)) + self.assertIsNotNone(check_staleness(obs)) + + +class TestRoundHealth(unittest.TestCase): + def test_healthy_round(self): + self.assertIsNone(check_round_health(_cbbtc_obs())) + + def test_non_positive_answer_is_critical(self): + obs = _cbbtc_obs(reading=_reading(get_asset("cbBTC").chainlink_feed.address, 0)) + alert = check_round_health(obs) + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, AlertSeverity.CRITICAL) + + def test_lagging_answered_in_round_is_high(self): + addr = get_asset("cbBTC").chainlink_feed.address + obs = _cbbtc_obs(reading=_reading(addr, 60_100 * 10**8, round_id=100, answered_in_round=99)) + alert = check_round_health(obs) + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, AlertSeverity.HIGH) + + def test_roundid_backwards_is_critical(self): + obs = _cbbtc_obs(prev_round_id=200) # current round_id is 100 + alert = check_round_health(obs) + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, AlertSeverity.CRITICAL) + + +class TestPegDeviation(unittest.TestCase): + def test_within_tolerance_ok(self): + self.assertIsNone(check_peg_deviation(_cbbtc_obs())) + + def test_off_peg_fires(self): + # cbBTC is downside_only; oracle $58,200 vs $60,000 peg = -3% < -2% tolerance + obs = _cbbtc_obs(reading=_reading(get_asset("cbBTC").chainlink_feed.address, 58_200 * 10**8)) + alert = check_peg_deviation(obs) + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, AlertSeverity.HIGH) + + def test_upside_does_not_fire_for_downside_only(self): + # cbBTC can legitimately trade above BTC; +5% upside must NOT alert. + obs = _cbbtc_obs(reading=_reading(get_asset("cbBTC").chainlink_feed.address, 63_000 * 10**8)) + self.assertIsNone(check_peg_deviation(obs)) + + +class TestMarketDivergence(unittest.TestCase): + def test_aligned_ok(self): + self.assertIsNone(check_market_divergence(_cbbtc_obs(), threshold=Decimal("0.01"))) + + def test_forced_divergence_fires(self): + # oracle $60,100 vs market $50,000 ~ +20% + obs = _cbbtc_obs(market_price_usd=Decimal("50000")) + alert = check_market_divergence(obs, threshold=Decimal("0.01")) + self.assertIsNotNone(alert) + self.assertEqual(alert.severity, AlertSeverity.HIGH) + + def test_missing_market_price_skips(self): + self.assertIsNone(check_market_divergence(_cbbtc_obs(market_price_usd=None))) + + +class TestQuoteConversion(unittest.TestCase): + def test_btc_quoted_feed_scales_to_usd(self): + # LBTC/BTC feed answer 1.004 BTC, BTC at $60,000 -> oracle $60,240 + lbtc = get_asset("LBTC") + obs = OracleObservation( + asset=lbtc, + reading=_reading(lbtc.chainlink_feed.address, 100_400_000), # 1.004 * 1e8 + peg_price_usd=Decimal("60000"), + quote_price_usd=Decimal("60000"), # feed quotes in BTC + now=NOW, + market_price_usd=Decimal("60240"), + ) + self.assertEqual(obs.oracle_price_usd, Decimal("60240.000")) + self.assertEqual(evaluate_chainlink_asset(obs), []) + + +class TestRateOracle(unittest.TestCase): + def _asset(self, monotonic: bool = True) -> PeggedAsset: + return PeggedAsset( + name="fakeRate", + defillama_key="ethereum:0x0000000000000000000000000000000000000001", + protocol="pegs", + peg=PegTarget.USD, + depeg_pct=Decimal("0.02"), + rate_oracle=RateOracle(address="0xRate", monotonic=monotonic), + ) + + def test_no_previous_rate_no_alert(self): + self.assertEqual(check_rate_oracle(self._asset(), current_rate=10**18, prev_rate=None), []) + + def test_monotonic_decrease_is_critical(self): + alerts = check_rate_oracle(self._asset(monotonic=True), current_rate=9 * 10**17, prev_rate=10**18) + self.assertEqual(len(alerts), 1) + self.assertEqual(alerts[0].severity, AlertSeverity.CRITICAL) + + def test_large_increase_is_high(self): + alerts = check_rate_oracle(self._asset(), current_rate=12 * 10**17, prev_rate=10**18, threshold=Decimal("0.05")) + self.assertEqual(len(alerts), 1) + self.assertEqual(alerts[0].severity, AlertSeverity.HIGH) + + def test_small_change_no_alert(self): + self.assertEqual( + check_rate_oracle(self._asset(), current_rate=101 * 10**16, prev_rate=10**18, threshold=Decimal("0.05")), + [], + ) + + +class TestDispatchRouting(unittest.TestCase): + """Alerts must carry the asset's logical protocol so emergency dispatch can fire.""" + + def test_owned_asset_uses_dispatchable_protocol(self): + usde = get_asset("USDe") # owner "ethena", peg USD, 3% tolerance + obs = OracleObservation( + asset=usde, + reading=_reading(usde.chainlink_feed.address, 90 * 10**6), # $0.90 -> off peg + peg_price_usd=Decimal("1"), + quote_price_usd=Decimal("1"), + now=NOW, + market_price_usd=Decimal("0.90"), + ) + alert = check_peg_deviation(obs) + self.assertIsNotNone(alert) + # protocol (not channel) carries the owner; dispatch keys off alert.protocol. + self.assertEqual(alert.protocol, "ethena") + self.assertIn(alert.protocol, DISPATCHABLE_PROTOCOLS) + + +class TestRoundMetadataGate(unittest.TestCase): + """Feeds that don't report reliable round metadata skip staleness + round checks.""" + + def _flat_obs(self) -> OracleObservation: + # Feed flagged reports_round_metadata=False, with a reading that would + # otherwise trip both staleness (updatedAt=0) and round-health (round_id=0). + asset = PeggedAsset( + name="fakeFlat", + defillama_key="ethereum:0x0000000000000000000000000000000000000002", + protocol="pegs", + peg=PegTarget.USD, + depeg_pct=Decimal("0.02"), + chainlink_feed=ChainlinkFeed("0xFeed", HEARTBEAT, "FLAT/USD", reports_round_metadata=False), + ) + return OracleObservation( + asset=asset, + reading=_reading("0xFeed", 10**8, round_id=0, updated_at=0, answered_in_round=0), + peg_price_usd=Decimal("1"), + quote_price_usd=Decimal("1"), + now=NOW, + market_price_usd=Decimal("1"), + ) + + def test_unreliable_feed_skips_staleness_and_round(self): + # On-peg, aligned price -> only peg/divergence run, and both pass -> no alerts. + self.assertEqual(evaluate_chainlink_asset(self._flat_obs()), []) + + def test_unreliable_feed_still_flags_peg_deviation(self): + # Off-peg must still fire even when round metadata is untrusted. + obs = OracleObservation( + asset=self._flat_obs().asset, + reading=_reading("0xFeed", 90 * 10**6, round_id=0, updated_at=0), # $0.90 + peg_price_usd=Decimal("1"), + quote_price_usd=Decimal("1"), + now=NOW, + market_price_usd=Decimal("0.90"), + ) + alerts = evaluate_chainlink_asset(obs) + self.assertTrue(any("off peg" in a.message for a in alerts)) + + +class TestNextCachedRound(unittest.TestCase): + def _rd(self, round_id: int, answer: int = 60_100 * 10**8, updated_at: int = NOW - 100) -> RoundData: + return RoundData(round_id, answer, updated_at, updated_at, round_id) + + def test_first_run_caches_current(self): + self.assertEqual(next_cached_round(None, self._rd(100)), 100) + + def test_advances_on_increase(self): + self.assertEqual(next_cached_round(100, self._rd(101)), 101) + + def test_keeps_high_water_mark_on_regression(self): + # Backwards round must NOT lower the cached baseline (no poisoning). + self.assertEqual(next_cached_round(100, self._rd(99)), 100) + + def test_broken_round_does_not_poison_even_if_higher(self): + # answer == 0 -> unhealthy; keep last-good rather than caching a broken round. + self.assertEqual(next_cached_round(100, self._rd(200, answer=0)), 100) + + +if __name__ == "__main__": + unittest.main() diff --git a/utils/chainlink.py b/utils/chainlink.py new file mode 100644 index 00000000..93d3adb9 --- /dev/null +++ b/utils/chainlink.py @@ -0,0 +1,122 @@ +"""Chainlink aggregator helpers shared across peg / oracle monitors. + +Generalises the inline ``latestRoundData`` handling from ``protocols/ustb/main.py``: +a batched feed reader plus a pure price-scaling helper that takes primitive +values so it is trivially unit testable without a chain connection. +""" + +from dataclasses import dataclass +from decimal import Decimal + +from utils.abi import load_abi +from utils.logger import get_logger +from utils.web3_wrapper import Web3Client + +logger = get_logger("chainlink") + +# Shared Chainlink AggregatorV3Interface ABI (latestRoundData + decimals). +CHAINLINK_ABI = load_abi("common-abi/ChainlinkAggregator.json") + + +@dataclass(frozen=True) +class RoundData: + """Decoded Chainlink ``latestRoundData`` tuple.""" + + round_id: int + answer: int + started_at: int + updated_at: int + answered_in_round: int + + @classmethod + def from_tuple(cls, data: tuple | list) -> "RoundData": + """Build a ``RoundData`` from a raw ``latestRoundData`` response. + + Args: + data: The 5-element tuple/list returned by ``latestRoundData``. + + Raises: + ValueError: If ``data`` does not have exactly five elements. + """ + if len(data) != 5: + raise ValueError(f"latestRoundData expects 5 fields, got {len(data)}: {data!r}") + return cls( + round_id=int(data[0]), + answer=int(data[1]), + started_at=int(data[2]), + updated_at=int(data[3]), + answered_in_round=int(data[4]), + ) + + +@dataclass(frozen=True) +class FeedReading: + """A single feed's decoded round data, decimals and scaled price.""" + + address: str + round_data: RoundData + decimals: int + + @property + def price(self) -> Decimal: + """Answer scaled to a human-readable value by the feed's decimals.""" + return scale_price(self.round_data.answer, self.decimals) + + +# --------------------------------------------------------------------------- +# Pure helpers (no chain connection required β€” unit tested directly) +# --------------------------------------------------------------------------- + + +def scale_price(answer: int, decimals: int) -> Decimal: + """Scale a raw integer answer to a decimal price using the feed decimals. + + Args: + answer: Raw integer answer from the aggregator. + decimals: Number of decimals reported by the feed. + + Returns: + The answer divided by ``10 ** decimals`` as a ``Decimal``. + + Raises: + ValueError: If ``decimals`` is negative. + """ + if decimals < 1: + raise ValueError(f"decimals must be positive, got {decimals}") + return Decimal(answer) / (Decimal(10) ** decimals) + + +# --------------------------------------------------------------------------- +# Batched on-chain reader +# --------------------------------------------------------------------------- + + +def read_feeds(client: Web3Client, feed_addresses: list[str]) -> dict[str, FeedReading]: + """Read ``latestRoundData`` and ``decimals`` for several feeds in one batch. + + Args: + client: Connected ``Web3Client`` for the target chain. + feed_addresses: Chainlink aggregator addresses to read. + + Returns: + Mapping of feed address to its :class:`FeedReading`, preserving input order. + """ + if not feed_addresses: + return {} + + contracts = [client.get_contract(address, CHAINLINK_ABI) for address in feed_addresses] + + with client.batch_requests() as batch: + for contract in contracts: + batch.add(contract.functions.latestRoundData()) + batch.add(contract.functions.decimals()) + responses = client.execute_batch(batch) + + readings: dict[str, FeedReading] = {} + for index, address in enumerate(feed_addresses): + round_data = RoundData.from_tuple(responses[2 * index]) + decimals = int(responses[2 * index + 1]) + readings[address] = FeedReading(address=address, round_data=round_data, decimals=decimals) + logger.info("Chainlink feed %s: price=%s decimals=%d", address, readings[address].price, decimals) + + return readings diff --git a/utils/dispatch.py b/utils/dispatch.py index dbb7c066..72930f7b 100644 --- a/utils/dispatch.py +++ b/utils/dispatch.py @@ -29,7 +29,22 @@ # Protocols that have emergency withdrawal config in liquidity-monitoring. # Only these protocols will trigger a dispatch. -DISPATCHABLE_PROTOCOLS = {"infinifi", "cap", "ethena", "ethplus", "usdai", "origin", "maple", "3jane"} +DISPATCHABLE_PROTOCOLS = { + "infinifi", + "cap", + "ethena", + "ethplus", + "usdai", + "origin", + "maple", + "3jane", + "wbtc", + "coinbase", + "lombard", + "tether", + "circle", + "maker", +} def _is_on_cooldown(protocol: str, cooldown_seconds: int = DEFAULT_COOLDOWN_SECONDS) -> bool: diff --git a/utils/pegged_assets.py b/utils/pegged_assets.py new file mode 100644 index 00000000..509ee72c --- /dev/null +++ b/utils/pegged_assets.py @@ -0,0 +1,279 @@ +"""Single source of truth for pegged-asset peg monitoring. + +This registry is consumed by every layer of peg/oracle monitoring: + +* L1 β€” market depeg (DeFiLlama price vs ``peg`` target, deviation > ``depeg_pct``) +* L2 β€” oracle health (``chainlink_feed`` price cross-check, ``rate_oracle`` drift) +* L3 β€” event consumers + +Peg deviation is expressed relative to a :class:`PegTarget` (``USD`` is the +constant ``1``; ``BTC`` is the live BTC/USD price from DeFiLlama), so a single +entry covers both dollar- and bitcoin-denominated assets. ``depeg_pct`` is a +*deviation* tolerance (fractional distance from the peg), not an absolute floor. + +Addresses and Chainlink feeds were verified on Ethereum mainnet. +""" + +from dataclasses import dataclass +from decimal import Decimal +from enum import Enum + +from utils.defillama import fetch_prices + +# DeFiLlama key for the live BTC/USD reference price (BTC peg target). +BTC_USD_DEFILLAMA_KEY = "coingecko:bitcoin" + + +class PegTarget(Enum): + """What an asset is pegged to. + + ``USD`` resolves to the constant ``1``; ``BTC`` resolves to the live BTC/USD + price fetched from DeFiLlama. + """ + + USD = "USD" + BTC = "BTC" + + +@dataclass(frozen=True) +class ChainlinkFeed: + """A Chainlink aggregator backing an asset's price (consumed by L2). + + Args: + address: Aggregator contract address. + heartbeat: Max expected seconds between updates (Chainlink mainnet default). + description: Human-readable feed pair, e.g. ``"WBTC/BTC"``. + quote: Denomination of the feed's ``answer``. A ``USD`` feed reports an + absolute price; a ``BTC`` feed reports the asset-to-BTC ratio (~1.0). + Lets consumers scale correctly β€” BTC-quoted feeds compare straight to + ``1.0`` with no BTC/USD lookup, USD-quoted feeds need live BTC/USD. + L2 multiplies a USD-quoted answer by the live quote price to compare + oracle and market on a common USD basis. + reports_round_metadata: Whether the feed reports reliable ``roundId`` / + ``updatedAt``. Set ``False`` for feeds that return constant or zero + round/timestamp values (some non-standard aggregators do); L2 then + skips the staleness and round-health checks for that feed to avoid + false positives. Verify on-chain before trusting these for a new feed. + """ + + address: str + heartbeat: int + description: str = "" + quote: PegTarget = PegTarget.USD + reports_round_metadata: bool = True + + +@dataclass(frozen=True) +class RateOracle: + """A continuous / exchange-rate oracle backing a yield-bearing asset (L2/L3). + + Args: + address: Oracle contract address. + monotonic: Whether the rate is expected to be non-decreasing; a decrease + is a loss signal worth alerting on. + function: View function returning the rate. Defaults to ``"rate"``. + precision: Fixed-point precision of the returned rate. Defaults to ``1e18``. + """ + + address: str + monotonic: bool = True + function: str = "rate" + precision: int = 10**18 + description: str = "" + + +@dataclass(frozen=True) +class PeggedAsset: + """A monitored pegged asset and everything the peg layers need to check it.""" + + name: str + defillama_key: str # "chain:address" + protocol: str # logical owner β€” used as Alert.protocol so emergency dispatch can key off it + peg: PegTarget + depeg_pct: Decimal # deviation tolerance from the peg (e.g. Decimal("0.02") = 2%) + chainlink_feed: ChainlinkFeed | None = None + rate_oracle: RateOracle | None = None + channel: str = "" # Telegram channel override; empty falls back to ``protocol`` routing + # When True, only a drop *below* the peg counts as a depeg; upside is ignored. + # Use for assets that can legitimately trade above peg (e.g. BTC wrappers). + downside_only: bool = False + + @property + def address(self) -> str: + """Token address parsed from ``defillama_key`` ("chain:address").""" + return self.defillama_key.split(":", 1)[1] + + def deviation(self, price: Decimal, peg_price: Decimal) -> Decimal: + """Signed fractional deviation of ``price`` from ``peg_price``.""" + return price_deviation(price, peg_price) + + def is_depegged(self, price: Decimal, peg_price: Decimal) -> bool: + """Return ``True`` if ``price`` has depegged from ``peg_price`` beyond ``depeg_pct``. + + For ``downside_only`` assets only a drop below the peg triggers (deviation + ``<= -depeg_pct``); for all others the check is symmetric (``abs`` deviation + ``>= depeg_pct``), so an upside move flags too. + """ + deviation = self.deviation(price, peg_price) + if self.downside_only: + return deviation <= -self.depeg_pct + return abs(deviation) >= self.depeg_pct + + +# --------------------------------------------------------------------------- +# Pure helpers +# --------------------------------------------------------------------------- + + +def price_deviation(price: Decimal, peg_price: Decimal) -> Decimal: + """Signed fractional deviation of ``price`` from ``peg_price``. + + Args: + price: Observed asset price. + peg_price: Reference peg price. + + Returns: + ``(price - peg_price) / peg_price``. + + Raises: + ValueError: If ``peg_price`` is zero (deviation is undefined). + """ + if peg_price == 0: + raise ValueError("peg_price must be non-zero to compute deviation") + return (price - peg_price) / peg_price + + +def resolve_peg_prices(pegs: set[PegTarget]) -> dict[PegTarget, Decimal]: + """Resolve current prices for a set of peg targets. + + ``USD`` is the constant ``1`` and never hits the network; ``BTC`` is fetched + once from DeFiLlama only when present in ``pegs``. + + Args: + pegs: The distinct peg targets to resolve. + + Returns: + Mapping of each requested :class:`PegTarget` to its current price. + + Raises: + ValueError: If the BTC/USD price is requested but unavailable. + """ + prices: dict[PegTarget, Decimal] = {} + if PegTarget.USD in pegs: + prices[PegTarget.USD] = Decimal(1) + if PegTarget.BTC in pegs: + fetched = fetch_prices([BTC_USD_DEFILLAMA_KEY]) + btc_price = fetched.get(BTC_USD_DEFILLAMA_KEY) + if btc_price is None: + raise ValueError(f"BTC/USD price unavailable from DeFiLlama key {BTC_USD_DEFILLAMA_KEY}") + prices[PegTarget.BTC] = btc_price + return prices + + +def get_asset(name: str) -> PeggedAsset: + """Look up a registered asset by name. + + Raises: + KeyError: If no asset with ``name`` is registered. + """ + return PEGGED_ASSETS_BY_NAME[name] + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +# Chainlink mainnet stable feeds report 8 decimals with a 24h heartbeat unless +# noted; confirm per feed before tightening staleness thresholds in L2. +_STABLE_HEARTBEAT = 86_400 # 24h + +PEGGED_ASSETS: list[PeggedAsset] = [ + # --- USD-pegged blue chips ------------------------------------------------ + PeggedAsset( + name="USDC", + defillama_key="ethereum:0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48", + protocol="circle", + channel="pegs", + peg=PegTarget.USD, + depeg_pct=Decimal("0.02"), + chainlink_feed=ChainlinkFeed("0x8fFfFfd4AfB6115b954Bd326cbe7B4BA576818f6", _STABLE_HEARTBEAT, "USDC/USD"), + ), + PeggedAsset( + name="USDT", + defillama_key="ethereum:0xdAC17F958D2ee523a2206206994597C13D831ec7", + protocol="tether", + channel="pegs", + peg=PegTarget.USD, + depeg_pct=Decimal("0.02"), + chainlink_feed=ChainlinkFeed("0x3E7d1eAB13ad0104d2750B8863b489D65364e32D", _STABLE_HEARTBEAT, "USDT/USD"), + ), + PeggedAsset( + name="USDS", + defillama_key="ethereum:0xdC035D45d973E3EC169d2276DDab16f1e407384F", + protocol="maker", + peg=PegTarget.USD, + depeg_pct=Decimal("0.02"), + chainlink_feed=ChainlinkFeed("0xfF30586cD0F29eD462364C7e81375FC0C71219b1", _STABLE_HEARTBEAT, "USDS/USD"), + ), + # --- USD-pegged protocol stables ------------------------------------------ + PeggedAsset( + name="USDe", + defillama_key="ethereum:0x4c9EDD5852cd905f086C759E8383e09bff1E68B3", + protocol="ethena", + peg=PegTarget.USD, + depeg_pct=Decimal("0.03"), + chainlink_feed=ChainlinkFeed("0xa569d910839Ae8865Da8F8e70FfFb0cBA869F961", _STABLE_HEARTBEAT, "USDe/USD"), + ), + PeggedAsset( + name="cUSD", + defillama_key="ethereum:0xcccc62962d17b8914c62d74ffb843d73b2a3cccc", + protocol="cap", + peg=PegTarget.USD, + depeg_pct=Decimal("0.05"), # cap cUSD price is more volatile + ), + PeggedAsset( + name="iUSD", + defillama_key="ethereum:0x48f9e38f3070AD8945DFEae3FA70987722E3D89c", + protocol="infinifi", + peg=PegTarget.USD, + depeg_pct=Decimal("0.03"), + ), + # --- BTC-pegged ----------------------------------------------------------- + PeggedAsset( + name="WBTC", + defillama_key="ethereum:0x2260FAC5E5542a773Aa44fBCfeDf7C193bc2C599", + protocol="wbtc", + channel="pegs", + peg=PegTarget.BTC, + depeg_pct=Decimal("0.02"), + chainlink_feed=ChainlinkFeed( + "0xfdFD9C85aD200c506Cf9e21F1FD8dD01932FBB23", _STABLE_HEARTBEAT, "WBTC/BTC", quote=PegTarget.BTC + ), + downside_only=True, # only a drop below BTC is a risk + ), + PeggedAsset( + name="cbBTC", + defillama_key="ethereum:0xcbB7C0000aB88B473b1f5aFd9ef808440eed33Bf", + protocol="coinbase", + channel="pegs", + peg=PegTarget.BTC, + depeg_pct=Decimal("0.02"), + chainlink_feed=ChainlinkFeed("0x2665701293fCbEB223D11A08D826563EDcCE423A", _STABLE_HEARTBEAT, "cbBTC/USD"), + downside_only=True, # only a drop below BTC is a risk + ), + PeggedAsset( + name="LBTC", + defillama_key="ethereum:0x8236a87084f8B84306f72007F36F2618A5634494", + protocol="lombard", + channel="pegs", + peg=PegTarget.BTC, + depeg_pct=Decimal("0.03"), + # LBTC/BTC market-rate feed (8 decimals); can sit slightly above 1 BTC. + chainlink_feed=ChainlinkFeed( + "0x5c29868C58b6e15e2b962943278969Ab6a7D3212", _STABLE_HEARTBEAT, "LBTC/BTC", quote=PegTarget.BTC + ), + downside_only=True, # LBTC can trade above peg; only a drop below BTC matters + ), +] + +PEGGED_ASSETS_BY_NAME: dict[str, PeggedAsset] = {asset.name: asset for asset in PEGGED_ASSETS}