#!/usr/bin/env python3 """ sybil_scorer.py =============== Open-source sybil-likelihood scorer for Stacks / AIBTC agent addresses. USAGE ----- python sybil_scorer.py SP329V8K7VBJXV8N2NDB0EA45TGG0EZH0M3ZHWBMK python sybil_scorer.py SP114F8BJ5MJEZP561TYWCSCYYBXDV0X023R0P93G SP3R6FW7AKCPY0H9WR74DBQP180MXN1EW0J4ZTRWH SOURCES ------- All data fetched from public APIs — no private data, no authentication: - Hiro API (https://api.hiro.so): wallet age, tx history, STX flow - AIBTC API (https://aibtc.com/api): agent level, inbox/outbox activity SIGNALS (higher = more sybil-like) ----------------------------------- wallet_age_days — days since first transaction (new wallets = higher risk) tx_count — total on-chain transactions (sparse = higher risk) funding_pattern — was initial funding from another fresh address? (yes = higher risk) stx_flow_ratio — total_sent / total_received (0 = pure receiver = higher risk) aibtc_level — agent level 1 or 2 (L1 = less established = higher risk) inbox_activity — message count in AIBTC inbox (low = higher risk) registration_age — days since AIBTC registration (new = higher risk) SCORE RANGE ----------- 0 = almost certainly legitimate 100 = almost certainly sybil cluster member EXPLAINABILITY -------------- Each score is accompanied by the top 3 signals driving it. LICENSE: MIT """ import json import sys import time import urllib.request import urllib.error from datetime import datetime, timezone # ── API helpers ─────────────────────────────────────────────────────────────── def _get(url: str, retries: int = 2) -> dict: for attempt in range(retries + 1): try: req = urllib.request.Request( url, headers={'User-Agent': 'sybil-scorer/1.0', 'Accept': 'application/json'} ) with urllib.request.urlopen(req, timeout=12) as r: return json.loads(r.read()) except urllib.error.HTTPError as e: if e.code in (404, 403): return {} if attempt == retries: return {} time.sleep(0.5) except Exception: if attempt == retries: return {} time.sleep(0.5) return {} def _now_ts() -> float: return datetime.now(timezone.utc).timestamp() # ── Signal collectors ───────────────────────────────────────────────────────── def _fetch_hiro_stx(addr: str) -> dict: return _get(f'https://api.hiro.so/extended/v1/address/{addr}/stx') def _fetch_hiro_txs(addr: str) -> list: """Fetch all transactions in ascending order (first = oldest).""" data = _get( f'https://api.hiro.so/extended/v1/address/{addr}/transactions' f'?limit=50&offset=0' ) results = data.get('results', []) # Sort by burn_block_time ascending to get oldest first results.sort(key=lambda x: x.get('burn_block_time', 0)) total = data.get('total', len(results)) return results, total def _fetch_aibtc_agent(addr: str) -> dict: """Returns full agent response including trust, activity, level at top level.""" return _get(f'https://aibtc.com/api/agents/{addr}') def _fetch_aibtc_inbox(addr: str) -> int: data = _get(f'https://aibtc.com/api/inbox/{addr}') msgs = data.get('messages', data.get('items', [])) return len(msgs) def _fetch_aibtc_outbox(addr: str) -> int: data = _get(f'https://aibtc.com/api/outbox/{addr}') msgs = data.get('messages', data.get('items', [])) return len(msgs) # ── Scoring engine ──────────────────────────────────────────────────────────── def _score_wallet_age(first_tx_ts: float | None) -> tuple[float, str]: """ Score 0–25. Fresh wallets (< 7 days) get max score. Very old wallets (> 180 days) get 0. """ if first_tx_ts is None: return 12.0, "wallet_age: no transactions on chain (neutral, new or unused)" age_days = (_now_ts() - first_tx_ts) / 86400 if age_days < 1: return 25.0, f"wallet_age: {age_days:.1f}d — created today (very high risk)" elif age_days < 7: return 20.0, f"wallet_age: {age_days:.1f}d — under 1 week old (high risk)" elif age_days < 30: return 15.0, f"wallet_age: {age_days:.1f}d — under 1 month old (medium risk)" elif age_days < 90: return 8.0, f"wallet_age: {age_days:.1f}d — 1–3 months old (low risk)" elif age_days < 180: return 3.0, f"wallet_age: {age_days:.1f}d — 3–6 months old (very low risk)" else: return 0.0, f"wallet_age: {age_days:.1f}d — established wallet (minimal risk)" def _score_tx_count(total_txs: int) -> tuple[float, str]: """ Score 0–20. Very few transactions = new account = higher risk. """ if total_txs == 0: return 15.0, f"tx_count: {total_txs} — no history (high risk)" elif total_txs <= 2: return 18.0, f"tx_count: {total_txs} — only registration tx (very high risk)" elif total_txs <= 5: return 12.0, f"tx_count: {total_txs} — minimal activity (medium risk)" elif total_txs <= 20: return 6.0, f"tx_count: {total_txs} — moderate activity (low risk)" else: return 0.0, f"tx_count: {total_txs} — active wallet (minimal risk)" def _score_funding_pattern(txs: list) -> tuple[float, str]: """ Score 0–20. Check if this wallet was funded by another fresh-looking address. Heuristic: look at first incoming token_transfer — if the sender is also a very-recently-created address, the funding is suspicious. """ if not txs: return 10.0, "funding_pattern: no transactions to analyze (neutral)" # Find first incoming transfer (not contract calls from self) for tx in txs: if tx.get('tx_type') == 'token_transfer': sender = tx.get('sender_address', '') # Check if the sender's first tx timestamp is also very recent sender_data = _get( f'https://api.hiro.so/extended/v1/address/{sender}/transactions' f'?limit=5&offset=0' ) sender_txs = sender_data.get('results', []) if not sender_txs: return 15.0, (f"funding_pattern: funded by {sender[:20]}... who has no " f"tx history themselves (suspicious)") sender_txs.sort(key=lambda x: x.get('burn_block_time', 0)) s_age = (_now_ts() - sender_txs[0].get('burn_block_time', _now_ts())) / 86400 if s_age < 3: return 18.0, (f"funding_pattern: funded by {sender[:20]}... which is " f"also <3d old (cluster funding pattern, very high risk)") elif s_age < 14: return 10.0, (f"funding_pattern: funded by {sender[:20]}... which is " f"{s_age:.1f}d old (moderately suspicious)") else: return 2.0, (f"funding_pattern: funded by {sender[:20]}... " f"({s_age:.1f}d old, appears legitimate)") elif tx.get('tx_type') == 'contract_call': # Self-initiated contract call as first tx: likely sybil registering return 8.0, "funding_pattern: first tx is a contract call (possibly self-funded sybil)" return 5.0, "funding_pattern: could not determine clear funding source" def _score_stx_flow(stx_data: dict, total_txs: int = 0) -> tuple[float, str]: """ Score 0–15. Pure STX receiver with no sends = passive / likely sybil. If an address has many txs but zero STX flow, it's a contract-call-heavy agent — not suspicious in itself. """ try: sent = int(stx_data.get('total_sent', '0')) recv = int(stx_data.get('total_received', '0')) except (ValueError, TypeError): return 7.0, "stx_flow: could not parse balance data (neutral)" if recv == 0 and sent == 0: if total_txs >= 10: return 3.0, f"stx_flow: no direct STX transfers but {total_txs} txs (contract-call agent, low risk)" return 10.0, "stx_flow: no STX activity at all (high risk)" if sent == 0: if total_txs >= 20: return 3.0, f"stx_flow: received STX only, but {total_txs} contract txs (likely legit)" return 10.0, f"stx_flow: received {recv//1e6:.2f} STX, sent 0 (pure receiver, high risk)" ratio = sent / max(recv, 1) if ratio < 0.05: return 6.0, f"stx_flow: sent {ratio*100:.1f}% of received (very low activity ratio)" elif ratio < 0.3: return 3.0, f"stx_flow: sent {ratio*100:.1f}% of received (low activity ratio)" else: return 0.0, f"stx_flow: sent {ratio*100:.1f}% of received (active participant)" def _score_aibtc_level(full_resp: dict) -> tuple[float, str]: """ Score 0–15. Uses trust.level (top-level in API response). Not registered = highest risk. On-chain identity = strong legitimacy signal. """ if not full_resp or not full_resp.get('found'): return 15.0, "aibtc_level: not registered on AIBTC (high risk)" trust = full_resp.get('trust', {}) level = trust.get('level', full_resp.get('level', 0)) lname = trust.get('levelName', full_resp.get('levelName', f'L{level}')) on_chain = trust.get('onChainIdentity', False) rep_score = trust.get('reputationScore') rep_count = trust.get('reputationCount', 0) base = 0.0 note = "" if level >= 3: base = 0.0 note = f"L{level} ({lname}) established" elif level == 2: base = 2.0 note = f"L{level} ({lname})" elif level == 1: base = 8.0 note = f"L{level} ({lname}) newly registered" else: base = 13.0 note = "unverified" if on_chain: base = max(0.0, base - 3.0) note += ", on-chain identity verified" if rep_count > 5: base = max(0.0, base - 2.0) note += f", {rep_count} reputation events" return base, f"aibtc_level: {note} (score {base})" def _score_inbox_activity(full_resp: dict) -> tuple[float, str]: """ Score 0–10. Uses activity.sentCount from agent response. Zero outbox activity = no peer engagement = sybil signal. """ if not full_resp or not full_resp.get('found'): return 5.0, "inbox_activity: agent not found (neutral)" activity = full_resp.get('activity', {}) sent = activity.get('sentCount', 0) has_inbox = activity.get('hasInboxMessages', False) unread = activity.get('unreadInboxCount', 0) total = sent + (1 if has_inbox else 0) if total == 0: return 10.0, f"inbox_activity: 0 messages sent, no inbox (no peer interaction, high risk)" elif sent <= 2: return 6.0, f"inbox_activity: {sent} sent, inbox={has_inbox} (minimal interaction)" elif sent <= 10: return 3.0, f"inbox_activity: {sent} sent (moderate interaction)" else: return 0.0, f"inbox_activity: {sent} sent (active participant, low risk)" def _score_registration_age(full_resp: dict) -> tuple[float, str]: """ Score 0–10. Very recently registered on AIBTC = higher risk. """ if not full_resp or not full_resp.get('found'): return 5.0, "registration_age: agent not found (neutral)" agent = full_resp.get('agent', {}) verified_at = agent.get('verifiedAt', '') if not verified_at: return 5.0, "registration_age: unknown registration date" try: reg_ts = datetime.fromisoformat(verified_at.replace('Z', '+00:00')).timestamp() age_days = (_now_ts() - reg_ts) / 86400 except Exception: return 5.0, "registration_age: could not parse registration date" if age_days < 1: return 10.0, f"registration_age: registered today ({age_days*24:.1f}h ago, very high risk)" elif age_days < 3: return 7.0, f"registration_age: registered {age_days:.1f}d ago (high risk)" elif age_days < 14: return 4.0, f"registration_age: registered {age_days:.1f}d ago (moderate risk)" else: return 0.0, f"registration_age: registered {age_days:.1f}d ago (established)" # ── Main scoring function ───────────────────────────────────────────────────── def score_address(addr: str) -> dict: """ Score one address for sybil likelihood. Returns: { "address": str, "score": int, # 0–100 "verdict": str, # "low" | "medium" | "high" | "very_high" "top_signals": list, # top 3 signal strings "all_signals": dict, # all signal scores "error": str | None } """ print(f' Fetching data for {addr}...', flush=True) try: stx_data = _fetch_hiro_stx(addr) txs, total_txs = _fetch_hiro_txs(addr) agent_resp = _fetch_aibtc_agent(addr) except Exception as exc: return {'address': addr, 'score': -1, 'verdict': 'error', 'top_signals': [], 'all_signals': {}, 'error': str(exc)} # Determine first transaction timestamp first_tx_ts = None if txs: first_tx_ts = txs[0].get('burn_block_time') # Compute individual signal scores s_age, r_age = _score_wallet_age(first_tx_ts) s_tx, r_tx = _score_tx_count(total_txs) s_fund, r_fund = _score_funding_pattern(txs) s_flow, r_flow = _score_stx_flow(stx_data, total_txs) s_lvl, r_lvl = _score_aibtc_level(agent_resp) s_inbox, r_inbox = _score_inbox_activity(agent_resp) s_reg, r_reg = _score_registration_age(agent_resp) all_signals = { 'wallet_age': (s_age, r_age), 'tx_count': (s_tx, r_tx), 'funding_pattern': (s_fund, r_fund), 'stx_flow': (s_flow, r_flow), 'aibtc_level': (s_lvl, r_lvl), 'inbox_activity': (s_inbox, r_inbox), 'registration_age': (s_reg, r_reg), } # Weighted sum (signals have different max values per their design) raw_score = s_age + s_tx + s_fund + s_flow + s_lvl + s_inbox + s_reg # Max possible = 25 + 20 + 20 + 15 + 15 + 10 + 10 = 115; normalize to 100 score = min(100, round(raw_score / 1.15)) # Top 3 signals (highest score = most damning) sorted_signals = sorted(all_signals.items(), key=lambda x: x[1][0], reverse=True) top_signals = [v[1] for _, v in sorted_signals[:3]] if score >= 75: verdict = 'very_high' elif score >= 50: verdict = 'high' elif score >= 25: verdict = 'medium' else: verdict = 'low' return { 'address': addr, 'score': score, 'verdict': verdict, 'top_signals': top_signals, 'all_signals': {k: {'score': v[0], 'reason': v[1]} for k, v in all_signals.items()}, 'error': None, } # ── Cluster analysis ────────────────────────────────────────────────────────── def cluster_analysis(results: list[dict]) -> dict: """ Given a list of scored addresses, identify potential cluster members. Heuristic: addresses with similar registration dates AND similar funding sources are likely in the same sybil cluster. """ high_risk = [r for r in results if r['score'] >= 50] return { 'total_scored': len(results), 'high_risk_count': len(high_risk), 'high_risk_addresses': [r['address'] for r in high_risk], 'cluster_probability': 'high' if len(high_risk) >= 2 else 'low', } # ── CLI ─────────────────────────────────────────────────────────────────────── def print_result(r: dict) -> None: print(f"\n{'='*60}") print(f"Address : {r['address']}") if r.get('error'): print(f"ERROR : {r['error']}") return print(f"Score : {r['score']}/100 [{r['verdict'].upper()} sybil likelihood]") print(f"\nTop 3 risk signals:") for i, sig in enumerate(r['top_signals'], 1): print(f" {i}. {sig}") print(f"\nAll signal scores:") for name, data in r['all_signals'].items(): print(f" {name:20s} {data['score']:5.1f} {data['reason'][:70]}") if __name__ == '__main__': addresses = sys.argv[1:] if len(sys.argv) > 1 else [ # Default demo: score a few known agents 'SP219TWC8G12CSX5AB093127NC82KYQWEH8ADD1AY', # Micro Basilisk (leaderboard #1) 'SP114F8BJ5MJEZP561TYWCSCYYBXDV0X023R0P93G', # Fair Otto (L2) 'SP329V8K7VBJXV8N2NDB0EA45TGG0EZH0M3ZHWBMK', # Huge Kraken (us, L1) ] print(f"Sybil Scorer — scoring {len(addresses)} address(es)...") results = [] for addr in addresses: r = score_address(addr) print_result(r) results.append(r) time.sleep(0.3) if len(results) > 1: print(f"\n{'='*60}") print("CLUSTER ANALYSIS:") cluster = cluster_analysis(results) print(json.dumps(cluster, indent=2))