AeroMesher: Complete Architecture & Implementation A. Architecture Diagram mermaidDownloadCopy codegraph TB subgraph "Android UI Layer (Java)" A[MainActivity
Activity] --> B[ChatFragment
Fragment] B --> C[ChatViewModel
MVVM] C --> D[MessageRepository] D --> E[ChaquopyBridge
Interface] style A fill:#3B5998,stroke:#1a2d4d,color:#fff style B fill:#4a6da7,stroke:#1a2d4d,color:#fff style C fill:#5a7db7,stroke:#1a2d4d,color:#fff style D fill:#6a8dc7,stroke:#1a2d4d,color:#fff style E fill:#7a9dd7,stroke:#1a2d4d,color:#fff end subgraph "Hybrid Bridge Layer" E <--> F[Chaquopy
Python Interpreter] F <--> G[IPC Message Queue
JSON/Protobuf] style F fill:#E7EEF8,stroke:#3B5998,color:#1a2d4d style G fill:#E7EEF8,stroke:#3B5998,color:#1a2d4d end subgraph "Python Core Service" G <--> H[NetworkManager
Main Controller] H --> I[KademliaDHT
Peer Discovery] H --> J[CryptoEngine
Double Ratchet] H --> K[TransportLayer
UDP Hole Punch] I --> L[RoutingTable
K-Buckets] style H fill:#2d4a7c,stroke:#1a2d4d,color:#fff style I fill:#3d5a8c,stroke:#1a2d4d,color:#fff style J fill:#4d6a9c,stroke:#1a2d4d,color:#fff style K fill:#5d7aac,stroke:#1a2d4d,color:#fff style L fill:#6d8abc,stroke:#1a2d4d,color:#fff end subgraph "Network Layer" K <--> M[UDP Socket
Port 51820] M <--> N[STUN Server
NAT Discovery] M <--> O[Peer Nodes
P2P Mesh] style M fill:#1a3d6d,stroke:#0a1d3d,color:#fff style N fill:#2a4d7d,stroke:#0a1d3d,color:#fff style O fill:#3a5d8d,stroke:#0a1d3d,color:#fff end subgraph "Data Flow Annotations" direction LR P["① UI Event (Send Message)"] -.-> Q["② Serialize + Encrypt"] Q -.-> R["③ DHT Lookup Peer"] R -.-> S["④ UDP Transmit"] S -.-> T["⑤ ACK + Callback"] end mermaidDownloadCopy codesequenceDiagram participant UI as Java UI Thread participant VM as ChatViewModel participant Bridge as ChaquopyBridge participant Python as Python NetworkManager participant DHT as Kademlia DHT participant Net as UDP Socket UI->>VM: sendMessage(peerId, content) VM->>Bridge: async dispatchToPython() Bridge->>Python: call("send_message", payload) activate Python Python->>Python: encrypt(DoubleRatchet) Python->>DHT: lookup_peer(peerId) DHT-->>Python: PeerEndpoint(ip, port) Python->>Net: udp_send(encrypted_packet) Net-->>Python: ACK / Timeout Python-->>Bridge: Result(success, msgId) deactivate Python Bridge-->>VM: LiveData.postValue() VM-->>UI: Observer notified UI->>UI: updateMessageStatus() B. The "Glass" UI Component (Frosted Aero Effect) GlassCardView.java javaDownloadCopy codepackage com.aeromesher.ui.components; import android.content.Context; import android.content.res.TypedArray; import android.graphics.*; import android.renderscript.*; import android.util.AttributeSet; import android.view.View; import android.view.ViewTreeObserver; import android.widget.FrameLayout; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import com.aeromesher.R; /** * GlassCardView - A Frutiger Aero "Frosted Glass" container component. * * Implements real-time Gaussian blur of underlying content with: * - Translucent acrylic overlay * - Glossy specular border highlights * - Orkut-inspired rounded corners * * Performance: Uses RenderScript for GPU-accelerated blur operations. * Battery: Blur updates are throttled and cached when static. */ public class GlassCardView extends FrameLayout { // Aero Design Tokens private static final int AERO_BLUE_PRIMARY = 0xFF3B5998; private static final int AERO_BLUE_LIGHT = 0xFFE7EEF8; private static final int AERO_WHITE_TRANSLUCENT = 0x99FFFFFF; private static final int AERO_BORDER_GLOSS_TOP = 0xCCFFFFFF; private static final int AERO_BORDER_GLOSS_BOTTOM = 0x66B8D4F0; private static final float DEFAULT_BLUR_RADIUS = 25f; private static final float DEFAULT_CORNER_RADIUS = 16f; private static final float DEFAULT_BORDER_WIDTH = 2.5f; private static final long BLUR_THROTTLE_MS = 64; // ~15fps for blur updates // Rendering Components private RenderScript renderScript; private ScriptIntrinsicBlur blurScript; private Allocation blurInputAllocation; private Allocation blurOutputAllocation; // Cached Bitmaps private Bitmap backgroundCapture; private Bitmap blurredBackground; private Canvas blurCanvas; // Paint Objects (reused for performance) private final Paint blurPaint = new Paint(Paint.ANTI_ALIAS_FLAG); private final Paint overlayPaint = new Paint(Paint.ANTI_ALIAS_FLAG); private final Paint borderPaint = new Paint(Paint.ANTI_ALIAS_FLAG); private final Paint glossPaint = new Paint(Paint.ANTI_ALIAS_FLAG); private final Paint innerShadowPaint = new Paint(Paint.ANTI_ALIAS_FLAG); // Geometry private final RectF boundsRect = new RectF(); private final RectF innerBoundsRect = new RectF(); private final Path clipPath = new Path(); private final Path borderPath = new Path(); // Configuration private float blurRadius = DEFAULT_BLUR_RADIUS; private float cornerRadius = DEFAULT_CORNER_RADIUS; private float borderWidth = DEFAULT_BORDER_WIDTH; private int overlayColor = AERO_WHITE_TRANSLUCENT; private boolean isBlurEnabled = true; private boolean requiresBlurUpdate = true; // Throttling private long lastBlurUpdateTime = 0; private final Runnable blurUpdateRunnable = this::performBlurUpdate; public GlassCardView(@NonNull Context context) { super(context); init(context, null); } public GlassCardView(@NonNull Context context, @Nullable AttributeSet attrs) { super(context, attrs); init(context, attrs); } public GlassCardView(@NonNull Context context, @Nullable AttributeSet attrs, int defStyleAttr) { super(context, attrs, defStyleAttr); init(context, attrs); } private void init(Context context, @Nullable AttributeSet attrs) { setWillNotDraw(false); setLayerType(LAYER_TYPE_HARDWARE, null); // Parse custom attributes if (attrs != null) { TypedArray ta = context.obtainStyledAttributes(attrs, R.styleable.GlassCardView); try { blurRadius = ta.getFloat(R.styleable.GlassCardView_glassBlurRadius, DEFAULT_BLUR_RADIUS); cornerRadius = ta.getDimension(R.styleable.GlassCardView_glassCornerRadius, dpToPx(DEFAULT_CORNER_RADIUS)); borderWidth = ta.getDimension(R.styleable.GlassCardView_glassBorderWidth, dpToPx(DEFAULT_BORDER_WIDTH)); overlayColor = ta.getColor(R.styleable.GlassCardView_glassOverlayColor, AERO_WHITE_TRANSLUCENT); isBlurEnabled = ta.getBoolean(R.styleable.GlassCardView_glassBlurEnabled, true); } finally { ta.recycle(); } } initRenderScript(context); initPaints(); setupBlurUpdateListener(); } private void initRenderScript(Context context) { try { renderScript = RenderScript.create(context); blurScript = ScriptIntrinsicBlur.create(renderScript, Element.U8_4(renderScript)); blurScript.setRadius(Math.min(blurRadius, 25f)); // RenderScript max is 25 } catch (RSRuntimeException e) { // Fallback: disable blur on unsupported devices isBlurEnabled = false; android.util.Log.w("GlassCardView", "RenderScript unavailable, blur disabled", e); } } private void initPaints() { // Blur layer paint blurPaint.setFilterBitmap(true); blurPaint.setDither(true); // Translucent overlay (Aero acrylic effect) overlayPaint.setColor(overlayColor); overlayPaint.setStyle(Paint.Style.FILL); // Glossy border with gradient borderPaint.setStyle(Paint.Style.STROKE); borderPaint.setStrokeWidth(borderWidth); borderPaint.setStrokeCap(Paint.Cap.ROUND); borderPaint.setStrokeJoin(Paint.Join.ROUND); // Top gloss highlight (specular) glossPaint.setStyle(Paint.Style.STROKE); glossPaint.setStrokeWidth(borderWidth * 0.6f); glossPaint.setColor(AERO_BORDER_GLOSS_TOP); glossPaint.setMaskFilter(new BlurMaskFilter(dpToPx(1), BlurMaskFilter.Blur.NORMAL)); // Inner shadow for depth innerShadowPaint.setColor(0x22000000); innerShadowPaint.setMaskFilter(new BlurMaskFilter(dpToPx(4), BlurMaskFilter.Blur.NORMAL)); innerShadowPaint.setStyle(Paint.Style.STROKE); innerShadowPaint.setStrokeWidth(dpToPx(6)); } private void setupBlurUpdateListener() { getViewTreeObserver().addOnPreDrawListener(new ViewTreeObserver.OnPreDrawListener() { @Override public boolean onPreDraw() { if (isBlurEnabled && requiresBlurUpdate) { long now = System.currentTimeMillis(); if (now - lastBlurUpdateTime > BLUR_THROTTLE_MS) { scheduleBlurUpdate(); lastBlurUpdateTime = now; } } return true; } }); } @Override protected void onSizeChanged(int w, int h, int oldw, int oldh) { super.onSizeChanged(w, h, oldw, oldh); if (w <= 0 || h <= 0) return; // Update geometry boundsRect.set(0, 0, w, h); float inset = borderWidth / 2f; innerBoundsRect.set(inset, inset, w - inset, h - inset); // Rebuild clip path clipPath.reset(); clipPath.addRoundRect(boundsRect, cornerRadius, cornerRadius, Path.Direction.CW); // Rebuild border path borderPath.reset(); borderPath.addRoundRect(innerBoundsRect, cornerRadius - inset, cornerRadius - inset, Path.Direction.CW); // Update border gradient (Aero glossy effect) LinearGradient borderGradient = new LinearGradient( 0, 0, 0, h, new int[]{AERO_BORDER_GLOSS_TOP, AERO_BLUE_LIGHT, AERO_BORDER_GLOSS_BOTTOM}, new float[]{0f, 0.5f, 1f}, Shader.TileMode.CLAMP ); borderPaint.setShader(borderGradient); // Reallocate blur buffers allocateBlurBuffers(w, h); requiresBlurUpdate = true; } private void allocateBlurBuffers(int width, int height) { // Downscale for performance (1/4 resolution blur) int scaledWidth = Math.max(1, width / 4); int scaledHeight = Math.max(1, height / 4); // Recycle old bitmaps if (backgroundCapture != null && !backgroundCapture.isRecycled()) { backgroundCapture.recycle(); } if (blurredBackground != null && !blurredBackground.isRecycled()) { blurredBackground.recycle(); } backgroundCapture = Bitmap.createBitmap(scaledWidth, scaledHeight, Bitmap.Config.ARGB_8888); blurredBackground = Bitmap.createBitmap(scaledWidth, scaledHeight, Bitmap.Config.ARGB_8888); blurCanvas = new Canvas(backgroundCapture); // Update RenderScript allocations if (renderScript != null) { if (blurInputAllocation != null) blurInputAllocation.destroy(); if (blurOutputAllocation != null) blurOutputAllocation.destroy(); blurInputAllocation = Allocation.createFromBitmap(renderScript, backgroundCapture); blurOutputAllocation = Allocation.createFromBitmap(renderScript, blurredBackground); } } private void scheduleBlurUpdate() { removeCallbacks(blurUpdateRunnable); post(blurUpdateRunnable); } private void performBlurUpdate() { if (!isBlurEnabled || backgroundCapture == null || getWidth() <= 0) return; View rootView = getRootView(); if (rootView == null) return; try { // Capture background content int[] location = new int[2]; getLocationOnScreen(location); blurCanvas.save(); float scale = (float) backgroundCapture.getWidth() / getWidth(); blurCanvas.scale(scale, scale); blurCanvas.translate(-location[0], -location[1]); // Temporarily hide this view to capture what's behind setVisibility(INVISIBLE); rootView.draw(blurCanvas); setVisibility(VISIBLE); blurCanvas.restore(); // Apply Gaussian blur via RenderScript if (blurScript != null && blurInputAllocation != null && blurOutputAllocation != null) { blurInputAllocation.copyFrom(backgroundCapture); blurScript.setInput(blurInputAllocation); blurScript.forEach(blurOutputAllocation); blurOutputAllocation.copyTo(blurredBackground); } requiresBlurUpdate = false; invalidate(); } catch (Exception e) { android.util.Log.e("GlassCardView", "Blur update failed", e); } } @Override protected void onDraw(Canvas canvas) { // Clip to rounded rectangle canvas.save(); canvas.clipPath(clipPath); // Layer 1: Blurred background if (isBlurEnabled && blurredBackground != null && !blurredBackground.isRecycled()) { Matrix matrix = new Matrix(); matrix.setScale( (float) getWidth() / blurredBackground.getWidth(), (float) getHeight() / blurredBackground.getHeight() ); canvas.drawBitmap(blurredBackground, matrix, blurPaint); } // Layer 2: Translucent overlay (Aero acrylic) canvas.drawRoundRect(boundsRect, cornerRadius, cornerRadius, overlayPaint); // Layer 3: Noise texture for authenticity (optional, subtle) // drawNoiseTexture(canvas); canvas.restore(); // Layer 4: Inner shadow for depth canvas.drawPath(borderPath, innerShadowPaint); // Layer 5: Glossy border canvas.drawPath(borderPath, borderPaint); // Layer 6: Top highlight streak (specular) drawSpecularHighlight(canvas); super.onDraw(canvas); } private void drawSpecularHighlight(Canvas canvas) { // Aero-style top edge gloss float highlightY = borderWidth + dpToPx(2); float highlightInset = cornerRadius + dpToPx(8); Path highlightPath = new Path(); highlightPath.moveTo(highlightInset, highlightY); highlightPath.lineTo(getWidth() - highlightInset, highlightY); Paint highlightPaint = new Paint(Paint.ANTI_ALIAS_FLAG); highlightPaint.setStyle(Paint.Style.STROKE); highlightPaint.setStrokeWidth(dpToPx(1.5f)); highlightPaint.setStrokeCap(Paint.Cap.ROUND); LinearGradient highlightGradient = new LinearGradient( highlightInset, 0, getWidth() - highlightInset, 0, new int[]{0x00FFFFFF, 0xAAFFFFFF, 0xAAFFFFFF, 0x00FFFFFF}, new float[]{0f, 0.3f, 0.7f, 1f}, Shader.TileMode.CLAMP ); highlightPaint.setShader(highlightGradient); canvas.drawPath(highlightPath, highlightPaint); } /** * Call when underlying content changes to refresh blur. */ public void invalidateBlur() { requiresBlurUpdate = true; invalidate(); } /** * Enable/disable blur for battery savings. */ public void setBlurEnabled(boolean enabled) { if (this.isBlurEnabled != enabled) { this.isBlurEnabled = enabled; requiresBlurUpdate = enabled; invalidate(); } } private float dpToPx(float dp) { return dp * getResources().getDisplayMetrics().density; } @Override protected void onDetachedFromWindow() { super.onDetachedFromWindow(); // Clean up RenderScript resources if (blurInputAllocation != null) { blurInputAllocation.destroy(); blurInputAllocation = null; } if (blurOutputAllocation != null) { blurOutputAllocation.destroy(); blurOutputAllocation = null; } if (blurScript != null) { blurScript.destroy(); blurScript = null; } if (renderScript != null) { renderScript.destroy(); renderScript = null; } if (backgroundCapture != null) { backgroundCapture.recycle(); backgroundCapture = null; } if (blurredBackground != null) { blurredBackground.recycle(); blurredBackground = null; } } } res/values/attrs.xml xmlDownloadCopy code res/drawable/aero_button_glossy.xml xmlDownloadCopy code Example Layout Usage xmlDownloadCopy code C. Python Core (NetworkManager.py) pythonDownloadCopy code""" AeroMesher Network Core - P2P Messaging Engine This module implements the decentralized networking layer including: - Kademlia-based DHT for serverless peer discovery - UDP hole punching for NAT traversal - Double Ratchet E2EE (Signal Protocol style) - Battery-efficient wake/sleep scheduling Architecture: Java UI <-> Chaquopy Bridge <-> NetworkManager <-> UDP Sockets Author: AeroMesher Team License: MIT """ from __future__ import annotations import asyncio import hashlib import hmac import json import logging import os import random import socket import struct import threading import time from abc import ABC, abstractmethod from collections import OrderedDict from dataclasses import dataclass, field, asdict from enum import IntEnum, auto from typing import ( Any, Callable, Dict, List, Optional, Set, Tuple, TypeVar, Union ) from concurrent.futures import ThreadPoolExecutor import base64 # Cryptography imports (use cryptography library) try: from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import x25519, ed25519 from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.kdf.hkdf import HKDF from cryptography.hazmat.backends import default_backend CRYPTO_AVAILABLE = True except ImportError: CRYPTO_AVAILABLE = False logging.warning("cryptography library not available, using mock encryption") # ============================================================================= # Configuration Constants # ============================================================================= K_BUCKET_SIZE = 20 # Kademlia k-value (peers per bucket) ALPHA_CONCURRENCY = 3 # Parallel lookups ID_BITS = 160 # SHA-1 based node IDs BOOTSTRAP_NODES = [ ("bootstrap1.aeromesher.local", 51820), ("bootstrap2.aeromesher.local", 51820), ] UDP_PORT = 51820 STUN_SERVERS = [ ("stun.l.google.com", 19302), ("stun1.l.google.com", 19302), ] MESSAGE_TTL_SECONDS = 86400 * 7 # 7 days KEEPALIVE_INTERVAL = 30 # seconds SLEEP_INTERVAL = 300 # 5 minutes when idle (battery saving) MAX_MESSAGE_SIZE = 65507 # UDP max payload logging.basicConfig( level=logging.INFO, format='%(asctime)s [%(levelname)s] %(name)s: %(message)s' ) logger = logging.getLogger("AeroMesher.Network") # ============================================================================= # Message Types & Serialization # ============================================================================= class MessageType(IntEnum): """Protocol message types for P2P communication.""" PING = 1 PONG = 2 FIND_NODE = 3 FIND_NODE_REPLY = 4 STORE = 5 FIND_VALUE = 6 FIND_VALUE_REPLY = 7 CHAT_MESSAGE = 10 CHAT_ACK = 11 NUDGE = 12 KEY_EXCHANGE = 20 KEY_EXCHANGE_REPLY = 21 @dataclass class NodeInfo: """Represents a peer node in the network.""" node_id: bytes address: Tuple[str, int] last_seen: float = field(default_factory=time.time) public_key: Optional[bytes] = None def to_dict(self) -> Dict[str, Any]: return { "node_id": base64.b64encode(self.node_id).decode(), "address": list(self.address), "last_seen": self.last_seen, "public_key": base64.b64encode(self.public_key).decode() if self.public_key else None } @classmethod def from_dict(cls, data: Dict[str, Any]) -> "NodeInfo": return cls( node_id=base64.b64decode(data["node_id"]), address=tuple(data["address"]), last_seen=data.get("last_seen", time.time()), public_key=base64.b64decode(data["public_key"]) if data.get("public_key") else None ) @dataclass class ChatMessage: """Encrypted chat message structure.""" message_id: str sender_id: bytes recipient_id: bytes ciphertext: bytes timestamp: float nonce: bytes ratchet_key: Optional[bytes] = None # For Double Ratchet def serialize(self) -> bytes: """Serialize to bytes for network transmission.""" payload = { "mid": self.message_id, "sid": base64.b64encode(self.sender_id).decode(), "rid": base64.b64encode(self.recipient_id).decode(), "ct": base64.b64encode(self.ciphertext).decode(), "ts": self.timestamp, "nc": base64.b64encode(self.nonce).decode(), } if self.ratchet_key: payload["rk"] = base64.b64encode(self.ratchet_key).decode() return json.dumps(payload, separators=(',', ':')).encode('utf-8') @classmethod def deserialize(cls, data: bytes) -> "ChatMessage": """Deserialize from network bytes.""" payload = json.loads(data.decode('utf-8')) return cls( message_id=payload["mid"], sender_id=base64.b64decode(payload["sid"]), recipient_id=base64.b64decode(payload["rid"]), ciphertext=base64.b64decode(payload["ct"]), timestamp=payload["ts"], nonce=base64.b64decode(payload["nc"]), ratchet_key=base64.b64decode(payload["rk"]) if "rk" in payload else None ) class ProtocolMessage: """Low-level protocol message wrapper.""" HEADER_FORMAT = "!BHI" # type(1), length(2), sequence(4) HEADER_SIZE = struct.calcsize(HEADER_FORMAT) def __init__( self, msg_type: MessageType, payload: bytes, sequence: int = 0 ): self.msg_type = msg_type self.payload = payload self.sequence = sequence or random.randint(0, 0xFFFFFFFF) def pack(self) -> bytes: """Pack message for UDP transmission.""" header = struct.pack( self.HEADER_FORMAT, self.msg_type, len(self.payload), self.sequence ) return header + self.payload @classmethod def unpack(cls, data: bytes) -> "ProtocolMessage": """Unpack message from UDP datagram.""" if len(data) < cls.HEADER_SIZE: raise ValueError("Message too short") msg_type, length, sequence = struct.unpack( cls.HEADER_FORMAT, data[:cls.HEADER_SIZE] ) payload = data[cls.HEADER_SIZE:cls.HEADER_SIZE + length] if len(payload) != length: raise ValueError(f"Payload length mismatch: expected {length}, got {len(payload)}") return cls(MessageType(msg_type), payload, sequence) # ============================================================================= # Kademlia DHT Implementation # ============================================================================= def xor_distance(id1: bytes, id2: bytes) -> int: """Calculate XOR distance between two node IDs.""" return int.from_bytes( bytes(a ^ b for a, b in zip(id1, id2)), byteorder='big' ) def generate_node_id() -> bytes: """Generate a random 160-bit node ID.""" return hashlib.sha1(os.urandom(32)).digest() class KBucket: """ K-Bucket for Kademlia routing table. Maintains up to K_BUCKET_SIZE nodes, sorted by last-seen time. Implements LRU eviction with liveness checks. """ def __init__(self, range_min: int, range_max: int): self.range_min = range_min self.range_max = range_max self.nodes: OrderedDict[bytes, NodeInfo] = OrderedDict() self.replacement_cache: OrderedDict[bytes, NodeInfo] = OrderedDict() self._lock = threading.RLock() def covers(self, node_id: bytes) -> bool: """Check if node_id falls within this bucket's range.""" id_int = int.from_bytes(node_id, byteorder='big') return self.range_min <= id_int < self.range_max def add_node(self, node: NodeInfo) -> bool: """ Add or update a node in the bucket. Returns True if node was added, False if bucket is full. """ with self._lock: if node.node_id in self.nodes: # Move to end (most recently seen) self.nodes.move_to_end(node.node_id) self.nodes[node.node_id] = node return True if len(self.nodes) < K_BUCKET_SIZE: self.nodes[node.node_id] = node return True # Bucket full - add to replacement cache self.replacement_cache[node.node_id] = node if len(self.replacement_cache) > K_BUCKET_SIZE: self.replacement_cache.popitem(last=False) return False def remove_node(self, node_id: bytes) -> Optional[NodeInfo]: """Remove a node and potentially replace from cache.""" with self._lock: removed = self.nodes.pop(node_id, None) if removed and self.replacement_cache: # Promote from replacement cache replacement_id, replacement = self.replacement_cache.popitem(last=True) self.nodes[replacement_id] = replacement return removed def get_nodes(self, count: int = K_BUCKET_SIZE) -> List[NodeInfo]: """Get up to `count` nodes, sorted by last seen (most recent first).""" with self._lock: return list(self.nodes.values())[-count:] def __len__(self) -> int: return len(self.nodes) class RoutingTable: """ Kademlia routing table with 160 k-buckets. Each bucket covers a specific XOR distance range from our node ID. """ def __init__(self, local_node_id: bytes): self.local_node_id = local_node_id self.buckets: List[KBucket] = [] self._init_buckets() self._lock = threading.RLock() def _init_buckets(self): """Initialize the 160 k-buckets.""" for i in range(ID_BITS): range_min = 2 ** i if i > 0 else 0 range_max = 2 ** (i + 1) self.buckets.append(KBucket(range_min, range_max)) def _bucket_index(self, node_id: bytes) -> int: """Find the appropriate bucket index for a node ID.""" distance = xor_distance(self.local_node_id, node_id) if distance == 0: return 0 return distance.bit_length() - 1 def add_node(self, node: NodeInfo) -> bool: """Add a node to the routing table.""" if node.node_id == self.local_node_id: return False with self._lock: index = self._bucket_index(node.node_id) return self.buckets[index].add_node(node) def remove_node(self, node_id: bytes) -> Optional[NodeInfo]: """Remove a node from the routing table.""" with self._lock: index = self._bucket_index(node_id) return self.buckets[index].remove_node(node_id) def find_closest(self, target_id: bytes, count: int = K_BUCKET_SIZE) -> List[NodeInfo]: """Find the `count` closest nodes to target_id.""" with self._lock: all_nodes: List[Tuple[int, NodeInfo]] = [] for bucket in self.buckets: for node in bucket.get_nodes(): distance = xor_distance(target_id, node.node_id) all_nodes.append((distance, node)) all_nodes.sort(key=lambda x: x[0]) return [node for _, node in all_nodes[:count]] def get_all_nodes(self) -> List[NodeInfo]: """Get all nodes in the routing table.""" with self._lock: nodes = [] for bucket in self.buckets: nodes.extend(bucket.get_nodes()) return nodes # ============================================================================= # Cryptography Engine (Double Ratchet Simplified) # ============================================================================= class CryptoEngine: """ Handles E2EE using a simplified Double Ratchet implementation. For production, consider using the `python-doubleratchet` library or implementing full X3DH key agreement. """ def __init__(self): if not CRYPTO_AVAILABLE: logger.warning("Running with mock encryption - NOT SECURE") self.identity_key = None self.prekey = None return # Generate long-term identity key (Ed25519 for signing) self.identity_private = ed25519.Ed25519PrivateKey.generate() self.identity_key = self.identity_private.public_key() # Generate ephemeral key for DH (X25519) self.ephemeral_private = x25519.X25519PrivateKey.generate() self.prekey = self.ephemeral_private.public_key() # Ratchet state per peer self._ratchet_states: Dict[bytes, Dict[str, Any]] = {} self._lock = threading.RLock() def get_public_keys(self) -> Tuple[bytes, bytes]: """Get serialized public keys for key exchange.""" if not CRYPTO_AVAILABLE: return (b"mock_identity", b"mock_prekey") identity_bytes = self.identity_key.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) prekey_bytes = self.prekey.public_bytes( encoding=serialization.Encoding.Raw, format=serialization.PublicFormat.Raw ) return (identity_bytes, prekey_bytes) def perform_key_exchange( self, peer_id: bytes, peer_identity: bytes, peer_prekey: bytes ) -> bytes: """ Perform X25519 key exchange and initialize ratchet state. Returns the shared root key. """ if not CRYPTO_AVAILABLE: return hashlib.sha256(peer_id).digest() # Load peer's X25519 public key peer_public = x25519.X25519PublicKey.from_public_bytes(peer_prekey) # Perform DH shared_secret = self.ephemeral_private.exchange(peer_public) # Derive root key using HKDF hkdf = HKDF( algorithm=hashes.SHA256(), length=32, salt=b"AeroMesher-v1", info=b"root-key", backend=default_backend() ) root_key = hkdf.derive(shared_secret) # Initialize ratchet state with self._lock: self._ratchet_states[peer_id] = { "root_key": root_key, "send_chain_key": self._derive_chain_key(root_key, b"send"), "recv_chain_key": self._derive_chain_key(root_key, b"recv"), "send_counter": 0, "recv_counter": 0, } return root_key def _derive_chain_key(self, root_key: bytes, label: bytes) -> bytes: """Derive a chain key from root key.""" return hmac.new(root_key, label, hashlib.sha256).digest() def _ratchet_chain(self, chain_key: bytes) -> Tuple[bytes, bytes]: """Advance the chain: returns (new_chain_key, message_key).""" message_key = hmac.new(chain_key, b"\x01", hashlib.sha256).digest() new_chain_key = hmac.new(chain_key, b"\x02", hashlib.sha256).digest() return new_chain_key, message_key def encrypt_message( self, peer_id: bytes, plaintext: bytes ) -> Tuple[bytes, bytes, Optional[bytes]]: """ Encrypt a message for a peer. Returns: (ciphertext, nonce, ratchet_key) """ if not CRYPTO_AVAILABLE: # Mock encryption for testing nonce = os.urandom(12) return (plaintext, nonce, None) with self._lock: state = self._ratchet_states.get(peer_id) if not state: raise ValueError(f"No session established with peer {peer_id.hex()}") # Ratchet the send chain state["send_chain_key"], message_key = self._ratchet_chain( state["send_chain_key"] ) state["send_counter"] += 1 # Encrypt with AES-256-GCM nonce = os.urandom(12) aesgcm = AESGCM(message_key) ciphertext = aesgcm.encrypt(nonce, plaintext, None) return (ciphertext, nonce, None) def decrypt_message( self, peer_id: bytes, ciphertext: bytes, nonce: bytes, ratchet_key: Optional[bytes] = None ) -> bytes: """Decrypt a message from a peer.""" if not CRYPTO_AVAILABLE: return ciphertext # Mock decryption with self._lock: state = self._ratchet_states.get(peer_id) if not state: raise ValueError(f"No session established with peer {peer_id.hex()}") # Ratchet the receive chain state["recv_chain_key"], message_key = self._ratchet_chain( state["recv_chain_key"] ) state["recv_counter"] += 1 # Decrypt with AES-256-GCM aesgcm = AESGCM(message_key) plaintext = aesgcm.decrypt(nonce, ciphertext, None) return plaintext # ============================================================================= # Transport Layer (UDP with NAT Traversal) # ============================================================================= class UDPTransport: """ UDP transport layer with STUN-based NAT traversal. Implements: - STUN discovery for public IP/port - UDP hole punching coordination - Reliable message delivery with retries """ def __init__( self, local_port: int = UDP_PORT, on_message: Optional[Callable[[bytes, Tuple[str, int]], None]] = None ): self.local_port = local_port self.on_message = on_message self.socket: Optional[socket.socket] = None self.public_endpoint: Optional[Tuple[str, int]] = None self._running = False self._recv_thread: Optional[threading.Thread] = None self._pending_acks: Dict[int, asyncio.Future] = {} self._lock = threading.RLock() def start(self) -> Tuple[str, int]: """ Start the UDP transport. Returns the public (STUN-discovered) endpoint. """ # Create UDP socket self.socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.bind(("0.0.0.0", self.local_port)) self.socket.settimeout(1.0) # Discover public endpoint via STUN self.public_endpoint = self._stun_discover() logger.info(f"Public endpoint: {self.public_endpoint}") # Start receive thread self._running = True self._recv_thread = threading.Thread(target=self._recv_loop, daemon=True) self._recv_thread.start() return self.public_endpoint def stop(self): """Stop the UDP transport.""" self._running = False if self._recv_thread: self._recv_thread.join(timeout=2.0) if self.socket: self.socket.close() self.socket = None def _stun_discover(self) -> Tuple[str, int]: """ Discover public IP:port using STUN protocol. Simplified STUN Binding Request implementation. """ for stun_host, stun_port in STUN_SERVERS: try: # STUN Binding Request (RFC 5389) # Magic cookie + transaction ID transaction_id = os.urandom(12) stun_request = struct.pack( "!HHI12s", 0x0001, # Binding Request 0, # Message length 0x2112A442, # Magic cookie transaction_id ) stun_addr = (socket.gethostbyname(stun_host), stun_port) self.socket.sendto(stun_request, stun_addr) # Wait for response self.socket.settimeout(2.0) response, _ = self.socket.recvfrom(1024) # Parse STUN response (simplified) if len(response) >= 20: # Look for XOR-MAPPED-ADDRESS attribute offset = 20 while offset < len(response) - 4: attr_type, attr_len = struct.unpack("!HH", response[offset:offset+4]) offset += 4 if attr_type == 0x0020: # XOR-MAPPED-ADDRESS family = response[offset + 1] if family == 0x01: # IPv4 xor_port = struct.unpack("!H", response[offset+2:offset+4])[0] xor_ip = struct.unpack("!I", response[offset+4:offset+8])[0] port = xor_port ^ 0x2112 ip_int = xor_ip ^ 0x2112A442 ip = socket.inet_ntoa(struct.pack("!I", ip_int)) return (ip, port) offset += attr_len + (4 - attr_len % 4) % 4 # Padding except Exception as e: logger.debug(f"STUN server {stun_host} failed: {e}") continue # Fallback to local address logger.warning("STUN discovery failed, using local address") hostname = socket.gethostname() local_ip = socket.gethostbyname(hostname) return (local_ip, self.local_port) def _recv_loop(self): """Background thread for receiving UDP datagrams.""" while self._running: try: data, addr = self.socket.recvfrom(MAX_MESSAGE_SIZE) if self.on_message: self.on_message(data, addr) except socket.timeout: continue except OSError: if self._running: logger.error("Socket error in recv loop") break except Exception as e: logger.exception(f"Error in recv loop: {e}") def send(self, data: bytes, address: Tuple[str, int]) -> bool: """Send a UDP datagram.""" if not self.socket: return False try: self.socket.sendto(data, address) return True except Exception as e: logger.error(f"Send failed to {address}: {e}") return False def punch_hole(self, peer_address: Tuple[str, int]) -> bool: """ Initiate UDP hole punching to a peer. Sends empty packets to open NAT mapping. """ for _ in range(3): self.send(b"\x00", peer_address) time.sleep(0.1) return True # ============================================================================= # Main Network Manager (Bridge to Java) # ============================================================================= class NetworkManager: """ Main orchestrator for AeroMesher P2P networking. This class is exposed to Java via Chaquopy and provides: - Peer discovery and connection management - Encrypted message sending/receiving - Battery-efficient background operation Usage from Java (Chaquopy): Python py = Python.getInstance(); PyObject networkModule = py.getModule("network_manager"); PyObject manager = networkModule.callAttr("NetworkManager"); manager.callAttr("start"); manager.callAttr("send_message", peerId, "Hello!"); """ def __init__(self, data_dir: str = "/data/data/com.aeromesher/files"): """ Initialize the NetworkManager. Args: data_dir: Directory for persistent storage (keys, routing table) """ self.data_dir = data_dir # Generate or load node identity self.node_id = self._load_or_generate_node_id() logger.info(f"Node ID: {self.node_id.hex()[:16]}...") # Initialize components self.routing_table = RoutingTable(self.node_id) self.crypto = CryptoEngine() self.transport: Optional[UDPTransport] = None # State management self._running = False self._idle = False self._last_activity = time.time() # Message handling self._message_handlers: Dict[MessageType, Callable] = {} self._pending_requests: Dict[int, asyncio.Future] = {} self._received_messages: Set[str] = set() # Deduplication # Java callbacks (set via bridge) self._on_message_callback: Optional[Callable] = None self._on_peer_discovered_callback: Optional[Callable] = None self._on_connection_state_callback: Optional[Callable] = None # Background executor self._executor = ThreadPoolExecutor(max_workers=4, thread_name_prefix="AeroNet") self._maintenance_thread: Optional[threading.Thread] = None self._setup_handlers() def _load_or_generate_node_id(self) -> bytes: """Load existing node ID or generate a new one.""" id_path = os.path.join(self.data_dir, "node_id") try: if os.path.exists(id_path): with open(id_path, "rb") as f: return f.read(20) except Exception as e: logger.warning(f"Failed to load node ID: {e}") # Generate new ID node_id = generate_node_id() try: os.makedirs(self.data_dir, exist_ok=True) with open(id_path, "wb") as f: f.write(node_id) except Exception as e: logger.warning(f"Failed to save node ID: {e}") return node_id def _setup_handlers(self): """Register protocol message handlers.""" self._message_handlers = { MessageType.PING: self._handle_ping, MessageType.PONG: self._handle_pong, MessageType.FIND_NODE: self._handle_find_node, MessageType.FIND_NODE_REPLY: self._handle_find_node_reply, MessageType.CHAT_MESSAGE: self._handle_chat_message, MessageType.CHAT_ACK: self._handle_chat_ack, MessageType.NUDGE: self._handle_nudge, MessageType.KEY_EXCHANGE: self._handle_key_exchange, MessageType.KEY_EXCHANGE_REPLY: self._handle_key_exchange_reply, } # ========================================================================= # Java Bridge Methods (Exposed to Chaquopy) # ========================================================================= def start(self) -> Dict[str, Any]: """ Start the P2P network service. Returns: Status dict with public endpoint and node ID. """ if self._running: return {"status": "already_running", "node_id": self.node_id.hex()} try: # Start transport self.transport = UDPTransport( local_port=UDP_PORT, on_message=self._on_transport_message ) public_endpoint = self.transport.start() self._running = True # Start maintenance thread (keepalives, routing table refresh) self._maintenance_thread = threading.Thread( target=self._maintenance_loop, daemon=True, name="AeroMaint" ) self._maintenance_thread.start() # Bootstrap into network self._executor.submit(self._bootstrap) logger.info("NetworkManager started successfully") return { "status": "started", "node_id": self.node_id.hex(), "public_ip": public_endpoint[0], "public_port": public_endpoint[1], } except Exception as e: logger.exception("Failed to start NetworkManager") return {"status": "error", "message": str(e)} def stop(self) -> Dict[str, Any]: """Stop the P2P network service.""" self._running = False if self.transport: self.transport.stop() self.transport = None self._executor.shutdown(wait=False) logger.info("NetworkManager stopped") return {"status": "stopped"} def send_message( self, peer_id_hex: str, content: str, message_type: str = "text" ) -> Dict[str, Any]: """ Send an encrypted message to a peer. Args: peer_id_hex: Hex-encoded peer node ID content: Message content (plaintext) message_type: Type of message ("text", "image", etc.) Returns: Status dict with message ID and delivery status. """ try: peer_id = bytes.fromhex(peer_id_hex) # Generate message ID message_id = hashlib.sha256( f"{self.node_id.hex()}{peer_id_hex}{time.time()}{os.urandom(8).hex()}".encode() ).hexdigest()[:32] # Encrypt message plaintext = json.dumps({ "type": message_type, "content": content, "timestamp": time.time() }).encode('utf-8') ciphertext, nonce, ratchet_key = self.crypto.encrypt_message(peer_id, plaintext) # Create chat message chat_msg = ChatMessage( message_id=message_id, sender_id=self.node_id, recipient_id=peer_id, ciphertext=ciphertext, timestamp=time.time(), nonce=nonce, ratchet_key=ratchet_key ) # Look up peer address peer_node = self._find_peer_in_routing_table(peer_id) if not peer_node: # Trigger DHT lookup self._executor.submit(self._lookup_and_send, peer_id, chat_msg) return { "status": "pending", "message_id": message_id, "info": "Peer lookup in progress" } # Send directly self._send_chat_message(chat_msg, peer_node.address) self._last_activity = time.time() return { "status": "sent", "message_id": message_id, "peer_address": list(peer_node.address) } except Exception as e: logger.exception(f"Failed to send message to {peer_id_hex}") return {"status": "error", "message": str(e)} def send_nudge(self, peer_id_hex: str) -> Dict[str, Any]: """ Send a "nudge" to a peer (MSN-style attention grabber). Args: peer_id_hex: Hex-encoded peer node ID Returns: Status dict. """ try: peer_id = bytes.fromhex(peer_id_hex) peer_node = self._find_peer_in_routing_table(peer_id) if not peer_node: return {"status": "error", "message": "Peer not found"} # Create nudge message payload = json.dumps({ "sender": self.node_id.hex(), "timestamp": time.time() }).encode('utf-8') msg = ProtocolMessage(MessageType.NUDGE, payload) self.transport.send(msg.pack(), peer_node.address) return {"status": "sent"} except Exception as e: return {"status": "error", "message": str(e)} def get_peers(self) -> List[Dict[str, Any]]: """ Get list of known peers. Returns: List of peer info dicts. """ peers = [] for node in self.routing_table.get_all_nodes(): peers.append({ "node_id": node.node_id.hex(), "address": list(node.address), "last_seen": node.last_seen, "online": (time.time() - node.last_seen) < 300 }) return peers def establish_session(self, peer_id_hex: str) -> Dict[str, Any]: """ Establish an encrypted session with a peer (key exchange). Args: peer_id_hex: Hex-encoded peer node ID Returns: Status dict. """ try: peer_id = bytes.fromhex(peer_id_hex) peer_node = self._find_peer_in_routing_table(peer_id) if not peer_node: return {"status": "error", "message": "Peer not found in routing table"} # Send key exchange request identity_key, prekey = self.crypto.get_public_keys() payload = json.dumps({ "sender": self.node_id.hex(), "identity_key": base64.b64encode(identity_key).decode(), "prekey": base64.b64encode(prekey).decode(), }).encode('utf-8') msg = ProtocolMessage(MessageType.KEY_EXCHANGE, payload) self.transport.send(msg.pack(), peer_node.address) return {"status": "initiated", "peer_id": peer_id_hex} except Exception as e: logger.exception("Key exchange failed") return {"status": "error", "message": str(e)} def set_message_callback(self, callback: Callable[[Dict], None]): """ Set callback for incoming messages. The callback receives a dict with message details. """ self._on_message_callback = callback def set_peer_callback(self, callback: Callable[[Dict], None]): """Set callback for peer discovery events.""" self._on_peer_discovered_callback = callback def set_connection_callback(self, callback: Callable[[str, bool], None]): """Set callback for connection state changes.""" self._on_connection_state_callback = callback def get_battery_state(self) -> Dict[str, Any]: """ Get current battery optimization state. Returns: Dict with idle status and intervals. """ return { "idle": self._idle, "last_activity": self._last_activity, "idle_duration": time.time() - self._last_activity if self._idle else 0, "keepalive_interval": SLEEP_INTERVAL if self._idle else KEEPALIVE_INTERVAL } # ========================================================================= # Internal Network Operations # ========================================================================= def _bootstrap(self): """Bootstrap into the P2P network using known nodes.""" logger.info("Bootstrapping into network...") for host, port in BOOTSTRAP_NODES: try: # Resolve hostname ip = socket.gethostbyname(host) address = (ip, port) # Send FIND_NODE for our own ID (populates routing table) self._send_find_node(self.node_id, address) except Exception as e: logger.warning(f"Bootstrap node {host}:{port} failed: {e}") # Also lookup a few random IDs to populate diverse buckets for _ in range(3): random_id = generate_node_id() closest = self.routing_table.find_closest(random_id, ALPHA_CONCURRENCY) for node in closest: self._send_find_node(random_id, node.address) def _maintenance_loop(self): """ Background maintenance loop. Handles: - Periodic keepalives to maintain NAT mappings - Routing table refresh - Idle detection for battery savings """ while self._running: try: current_time = time.time() idle_threshold = 120 # 2 minutes # Check if we should enter idle mode if current_time - self._last_activity > idle_threshold: if not self._idle: logger.info("Entering idle mode for battery savings") self._idle = True else: if self._idle: logger.info("Exiting idle mode") self._idle = False # Determine sleep interval based on idle state interval = SLEEP_INTERVAL if self._idle else KEEPALIVE_INTERVAL # Send keepalives to a subset of peers nodes = self.routing_table.find_closest(self.node_id, 3) for node in nodes: self._send_ping(node.address) # Refresh stale buckets (every 15 minutes) if int(current_time) % 900 < interval: self._refresh_routing_table() time.sleep(interval) except Exception as e: logger.exception(f"Maintenance loop error: {e}") time.sleep(5) def _refresh_routing_table(self): """Refresh routing table by looking up random IDs in each bucket.""" logger.debug("Refreshing routing table...") for i in range(0, ID_BITS, 20): # Sample every 20th bucket # Generate ID that would fall in this bucket target = bytearray(self.node_id) byte_idx = i // 8 bit_idx = i % 8 if byte_idx < len(target): target[byte_idx] ^= (1 << (7 - bit_idx)) closest = self.routing_table.find_closest(bytes(target), ALPHA_CONCURRENCY) for node in closest[:1]: # Limit queries self._send_find_node(bytes(target), node.address) def _on_transport_message(self, data: bytes, address: Tuple[str, int]): """Handle incoming UDP message.""" try: msg = ProtocolMessage.unpack(data) handler = self._message_handlers.get(msg.msg_type) if handler: self._executor.submit(handler, msg, address) else: logger.warning(f"Unknown message type: {msg.msg_type}") except Exception as e: logger.debug(f"Failed to parse message from {address}: {e}") def _send_ping(self, address: Tuple[str, int]): """Send PING to an address.""" payload = json.dumps({"sender": self.node_id.hex()}).encode() msg = ProtocolMessage(MessageType.PING, payload) self.transport.send(msg.pack(), address) def _send_find_node(self, target_id: bytes, address: Tuple[str, int]): """Send FIND_NODE request.""" payload = json.dumps({ "sender": self.node_id.hex(), "target": target_id.hex() }).encode() msg = ProtocolMessage(MessageType.FIND_NODE, payload) self.transport.send(msg.pack(), address) def _send_chat_message(self, chat_msg: ChatMessage, address: Tuple[str, int]): """Send encrypted chat message.""" msg = ProtocolMessage(MessageType.CHAT_MESSAGE, chat_msg.serialize()) self.transport.send(msg.pack(), address) def _find_peer_in_routing_table(self, peer_id: bytes) -> Optional[NodeInfo]: """Find a peer in the local routing table.""" closest = self.routing_table.find_closest(peer_id, 1) if closest and closest[0].node_id == peer_id: return closest[0] return None def _lookup_and_send(self, peer_id: bytes, chat_msg: ChatMessage): """Perform iterative lookup and send message when peer is found.""" # Simplified iterative lookup queried: Set[bytes] = set() for _ in range(10): # Max iterations closest = self.routing_table.find_closest(peer_id, ALPHA_CONCURRENCY) for node in closest: if node.node_id in queried: continue queried.add(node.node_id) if node.node_id == peer_id: # Found the peer! self._send_chat_message(chat_msg, node.address) return # Query this node self._send_find_node(peer_id, node.address) time.sleep(0.5) logger.warning(f"Peer lookup failed: {peer_id.hex()[:16]}") # ========================================================================= # Message Handlers # ========================================================================= def _handle_ping(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle PING message.""" try: data = json.loads(msg.payload) sender_id = bytes.fromhex(data["sender"]) # Update routing table self.routing_table.add_node(NodeInfo( node_id=sender_id, address=address )) # Send PONG pong_payload = json.dumps({"sender": self.node_id.hex()}).encode() pong = ProtocolMessage(MessageType.PONG, pong_payload, msg.sequence) self.transport.send(pong.pack(), address) except Exception as e: logger.debug(f"Error handling PING: {e}") def _handle_pong(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle PONG message.""" try: data = json.loads(msg.payload) sender_id = bytes.fromhex(data["sender"]) self.routing_table.add_node(NodeInfo( node_id=sender_id, address=address )) except Exception as e: logger.debug(f"Error handling PONG: {e}") def _handle_find_node(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle FIND_NODE request.""" try: data = json.loads(msg.payload) sender_id = bytes.fromhex(data["sender"]) target_id = bytes.fromhex(data["target"]) # Update routing table self.routing_table.add_node(NodeInfo( node_id=sender_id, address=address )) # Find closest nodes to target closest = self.routing_table.find_closest(target_id, K_BUCKET_SIZE) nodes_data = [node.to_dict() for node in closest] # Send reply reply_payload = json.dumps({ "sender": self.node_id.hex(), "nodes": nodes_data }).encode() reply = ProtocolMessage(MessageType.FIND_NODE_REPLY, reply_payload, msg.sequence) self.transport.send(reply.pack(), address) except Exception as e: logger.debug(f"Error handling FIND_NODE: {e}") def _handle_find_node_reply(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle FIND_NODE_REPLY message.""" try: data = json.loads(msg.payload) # Add returned nodes to routing table for node_data in data.get("nodes", []): node = NodeInfo.from_dict(node_data) self.routing_table.add_node(node) # Notify callback of discovered peer if self._on_peer_discovered_callback: self._on_peer_discovered_callback({ "node_id": node.node_id.hex(), "address": list(node.address) }) except Exception as e: logger.debug(f"Error handling FIND_NODE_REPLY: {e}") def _handle_chat_message(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle incoming chat message.""" try: chat_msg = ChatMessage.deserialize(msg.payload) # Deduplicate if chat_msg.message_id in self._received_messages: return self._received_messages.add(chat_msg.message_id) # Decrypt plaintext = self.crypto.decrypt_message( chat_msg.sender_id, chat_msg.ciphertext, chat_msg.nonce, chat_msg.ratchet_key ) message_data = json.loads(plaintext) # Send ACK ack_payload = json.dumps({ "message_id": chat_msg.message_id, "sender": self.node_id.hex() }).encode() ack = ProtocolMessage(MessageType.CHAT_ACK, ack_payload) self.transport.send(ack.pack(), address) # Notify callback if self._on_message_callback: self._on_message_callback({ "message_id": chat_msg.message_id, "sender_id": chat_msg.sender_id.hex(), "type": message_data.get("type", "text"), "content": message_data.get("content", ""), "timestamp": chat_msg.timestamp }) self._last_activity = time.time() except Exception as e: logger.exception(f"Error handling chat message: {e}") def _handle_chat_ack(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle chat message acknowledgment.""" try: data = json.loads(msg.payload) message_id = data.get("message_id") logger.debug(f"Message {message_id} acknowledged") # Could update UI via callback here except Exception as e: logger.debug(f"Error handling CHAT_ACK: {e}") def _handle_nudge(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle incoming nudge.""" try: data = json.loads(msg.payload) sender_id = data.get("sender") if self._on_message_callback: self._on_message_callback({ "type": "nudge", "sender_id": sender_id, "timestamp": data.get("timestamp", time.time()) }) self._last_activity = time.time() except Exception as e: logger.debug(f"Error handling NUDGE: {e}") def _handle_key_exchange(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle key exchange request.""" try: data = json.loads(msg.payload) sender_id = bytes.fromhex(data["sender"]) peer_identity = base64.b64decode(data["identity_key"]) peer_prekey = base64.b64decode(data["prekey"]) # Perform key exchange self.crypto.perform_key_exchange(sender_id, peer_identity, peer_prekey) # Send reply with our keys identity_key, prekey = self.crypto.get_public_keys() reply_payload = json.dumps({ "sender": self.node_id.hex(), "identity_key": base64.b64encode(identity_key).decode(), "prekey": base64.b64encode(prekey).decode(), }).encode() reply = ProtocolMessage(MessageType.KEY_EXCHANGE_REPLY, reply_payload, msg.sequence) self.transport.send(reply.pack(), address) logger.info(f"Key exchange completed with {sender_id.hex()[:16]}") except Exception as e: logger.exception(f"Error handling KEY_EXCHANGE: {e}") def _handle_key_exchange_reply(self, msg: ProtocolMessage, address: Tuple[str, int]): """Handle key exchange reply.""" try: data = json.loads(msg.payload) sender_id = bytes.fromhex(data["sender"]) peer_identity = base64.b64decode(data["identity_key"]) peer_prekey = base64.b64decode(data["prekey"]) self.crypto.perform_key_exchange(sender_id, peer_identity, peer_prekey) logger.info(f"Session established with {sender_id.hex()[:16]}") except Exception as e: logger.exception(f"Error handling KEY_EXCHANGE_REPLY: {e}") # ============================================================================= # Module-level factory for Chaquopy # ============================================================================= _instance: Optional[NetworkManager] = None def get_instance(data_dir: str = "/data/data/com.aeromesher/files") -> NetworkManager: """Get or create the singleton NetworkManager instance.""" global _instance if _instance is None: _instance = NetworkManager(data_dir) return _instance def create_manager(data_dir: str) -> NetworkManager: """Factory function for Chaquopy bridge.""" return NetworkManager(data_dir) D. UX Micro-Interaction: "Window Shake" (Nudge Effect) NudgeAnimator.java javaDownloadCopy codepackage com.aeromesher.ui.animations; import android.animation.AnimatorSet; import android.animation.ObjectAnimator; import android.animation.ValueAnimator; import android.content.Context; import android.os.Build; import android.os.VibrationEffect; import android.os.Vibrator; import android.os.VibratorManager; import android.view.View; import android.view.animation.CycleInterpolator; import android.view.animation.DecelerateInterpolator; import androidx.annotation.NonNull; import androidx.annotation.Nullable; /** * NudgeAnimator - MSN Messenger-style "Nudge" window shake effect. * * Provides a nostalgic Y2K-era attention-grabbing animation with: * - Chaotic horizontal/vertical shake * - Synchronized haptic feedback * - Screen flash effect * - Decaying intensity * * Usage: * NudgeAnimator.nudge(chatContainerView, getContext()); */ public final class NudgeAnimator { private static final long NUDGE_DURATION_MS = 800; private static final float SHAKE_INTENSITY_DP = 12f; private static final int SHAKE_CYCLES = 6; // Vibration pattern: [delay, vibrate, pause, vibrate, ...] // Mimics the erratic MSN nudge feel private static final long[] VIBRATION_PATTERN = { 0, 50, 30, 60, 25, 70, 20, 80, 15, 50, 30, 40 }; private NudgeAnimator() { // Static utility class } /** * Perform the nudge animation on a view. * * @param targetView The view to shake (typically the chat container) * @param context Context for vibration service */ public static void nudge(@NonNull View targetView, @NonNull Context context) { nudge(targetView, context, null); } /** * Perform the nudge animation with completion callback. * * @param targetView The view to shake * @param context Context for vibration service * @param onComplete Callback when animation finishes */ public static void nudge( @NonNull View targetView, @NonNull Context context, @Nullable Runnable onComplete ) { // Calculate pixel intensity from DP float density = context.getResources().getDisplayMetrics().density; float shakePixels = SHAKE_INTENSITY_DP * density; // Create shake animators AnimatorSet shakeSet = createShakeAnimatorSet(targetView, shakePixels); // Add flash overlay effect View flashOverlay = createFlashOverlay(targetView, context); // Trigger haptic feedback triggerHapticFeedback(context); // Start animations if (flashOverlay != null) { animateFlash(flashOverlay); } shakeSet.start(); if (onComplete != null) { shakeSet.addListener(new android.animation.AnimatorListenerAdapter() { @Override public void onAnimationEnd(android.animation.Animator animation) { onComplete.run(); // Clean up flash overlay if (flashOverlay != null && flashOverlay.getParent() != null) { ((android.view.ViewGroup) flashOverlay.getParent()).removeView(flashOverlay); } } }); } } /** * Creates the compound shake animation set. * * The animation combines: * - Horizontal translation (primary shake) * - Vertical translation (secondary shake, slightly offset) * - Rotation (subtle tilt for organic feel) * - Scale pulse (impact emphasis) */ private static AnimatorSet createShakeAnimatorSet(View target, float intensity) { // Horizontal shake - primary motion ObjectAnimator shakeX = ObjectAnimator.ofFloat( target, View.TRANSLATION_X, 0f, intensity, -intensity * 0.9f, intensity * 0.8f, -intensity * 0.6f, intensity * 0.4f, -intensity * 0.2f, 0f ); shakeX.setDuration(NUDGE_DURATION_MS); shakeX.setInterpolator(new DecelerateInterpolator(1.5f)); // Vertical shake - secondary motion (offset timing for chaos) ObjectAnimator shakeY = ObjectAnimator.ofFloat( target, View.TRANSLATION_Y, 0f, -intensity * 0.5f, intensity * 0.4f, -intensity * 0.3f, intensity * 0.2f, -intensity * 0.1f, 0f ); shakeY.setDuration(NUDGE_DURATION_MS); shakeY.setStartDelay(30); // Slight offset shakeY.setInterpolator(new DecelerateInterpolator(1.2f)); // Rotation wobble - adds organic feel ObjectAnimator rotate = ObjectAnimator.ofFloat( target, View.ROTATION, 0f, 2f, -1.8f, 1.5f, -1f, 0.5f, 0f ); rotate.setDuration(NUDGE_DURATION_MS); rotate.setInterpolator(new DecelerateInterpolator(1.8f)); // Scale pulse - impact emphasis ObjectAnimator scaleX = ObjectAnimator.ofFloat( target, View.SCALE_X, 1f, 1.02f, 0.99f, 1.01f, 1f ); scaleX.setDuration(NUDGE_DURATION_MS / 2); ObjectAnimator scaleY = ObjectAnimator.ofFloat( target, View.SCALE_Y, 1f, 1.02f, 0.99f, 1.01f, 1f ); scaleY.setDuration(NUDGE_DURATION_MS / 2); // Combine all animations AnimatorSet animatorSet = new AnimatorSet(); animatorSet.playTogether(shakeX, shakeY, rotate, scaleX, scaleY); return animatorSet; } /** * Creates a flash overlay view for the attention effect. * Returns null if the target isn't in a suitable parent. */ @Nullable private static View createFlashOverlay(View target, Context context) { if (!(target.getParent() instanceof android.view.ViewGroup)) { return null; } android.view.ViewGroup parent = (android.view.ViewGroup) target.getParent(); View flashView = new View(context); flashView.setLayoutParams(new android.view.ViewGroup.LayoutParams( android.view.ViewGroup.LayoutParams.MATCH_PARENT, android.view.ViewGroup.LayoutParams.MATCH_PARENT )); flashView.setBackgroundColor(0xFFFFFF00); // Yellow flash (MSN style) flashView.setAlpha(0f); flashView.setElevation(1000f); // Above everything parent.addView(flashView); return flashView; } /** * Animates the flash overlay with quick fade in/out pulses. */ private static void animateFlash(View flashView) { ObjectAnimator flashAnim = ObjectAnimator.ofFloat( flashView, View.ALPHA, 0f, 0.3f, 0f, 0.2f, 0f, 0.1f, 0f ); flashAnim.setDuration(NUDGE_DURATION_MS); flashAnim.start(); } /** * Triggers haptic feedback synchronized with the shake. */ private static void triggerHapticFeedback(Context context) { Vibrator vibrator; if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.S) { VibratorManager vibratorManager = (VibratorManager) context.getSystemService(Context.VIBRATOR_MANAGER_SERVICE); if (vibratorManager != null) { vibrator = vibratorManager.getDefaultVibrator(); } else { return; } } else { vibrator = (Vibrator) context.getSystemService(Context.VIBRATOR_SERVICE); } if (vibrator == null || !vibrator.hasVibrator()) { return; } try { if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) { // Use waveform for complex pattern VibrationEffect effect = VibrationEffect.createWaveform( VIBRATION_PATTERN, -1 // Don't repeat ); vibrator.vibrate(effect); } else { // Legacy vibration vibrator.vibrate(VIBRATION_PATTERN, -1); } } catch (SecurityException e) { // Vibration permission not granted - fail silently android.util.Log.w("NudgeAnimator", "Vibration permission denied"); } } /** * Alternative: Simpler shake using CycleInterpolator. * Use this for lower-end devices or when battery is a concern. */ public static void nudgeSimple(@NonNull View targetView, @NonNull Context context) { float density = context.getResources().getDisplayMetrics().density; float shakePixels = SHAKE_INTENSITY_DP * density * 0.7f; ObjectAnimator animator = ObjectAnimator.ofFloat( targetView, View.TRANSLATION_X, 0f, shakePixels ); animator.setDuration(NUDGE_DURATION_MS); animator.setInterpolator(new CycleInterpolator(SHAKE_CYCLES)); animator.start(); triggerHapticFeedback(context); } /** * Chain multiple nudges with delay (for persistent attention). */ public static void nudgeRepeat( @NonNull View targetView, @NonNull Context context, int repeatCount, long delayBetweenMs ) { nudgeRepeatInternal(targetView, context, repeatCount, delayBetweenMs, 0); } private static void nudgeRepeatInternal( View targetView, Context context, int remaining, long delay, int current ) { if (remaining <= 0) return; nudge(targetView, context, () -> { targetView.postDelayed(() -> { nudgeRepeatInternal(targetView, context, remaining - 1, delay, current + 1); }, delay); }); } } Integration Example (ChatFragment.java) javaDownloadCopy codepackage com.aeromesher.ui.chat; import android.os.Bundle; import android.view.LayoutInflater; import android.view.View; import android.view.ViewGroup; import androidx.annotation.NonNull; import androidx.annotation.Nullable; import androidx.fragment.app.Fragment; import androidx.lifecycle.ViewModelProvider; import com.aeromesher.R; import com.aeromesher.databinding.FragmentChatBinding; import com.aeromesher.ui.animations.NudgeAnimator; import com.aeromesher.ui.components.GlassCardView; public class ChatFragment extends Fragment { private FragmentChatBinding binding; private ChatViewModel viewModel; @Nullable @Override public View onCreateView( @NonNull LayoutInflater inflater, @Nullable ViewGroup container, @Nullable Bundle savedInstanceState ) { binding = FragmentChatBinding.inflate(inflater, container, false); return binding.getRoot(); } @Override public void onViewCreated(@NonNull View view, @Nullable Bundle savedInstanceState) { super.onViewCreated(view, savedInstanceState); viewModel = new ViewModelProvider(this).get(ChatViewModel.class); // Observe incoming nudges from Python backend viewModel.getNudgeEvents().observe(getViewLifecycleOwner(), nudgeEvent -> { if (nudgeEvent != null) { // Trigger the Aero-style nudge animation NudgeAnimator.nudge( binding.chatContainer, // GlassCardView requireContext() ); // Show nudge notification toast showNudgeToast(nudgeEvent.getSenderName()); } }); // Nudge button click (send nudge to peer) binding.nudgeButton.setOnClickListener(v -> { // Animate our own view as feedback NudgeAnimator.nudgeSimple(binding.chatContainer, requireContext()); // Send nudge via Python backend viewModel.sendNudge(); }); } private void showNudgeToast(String senderName) { // Custom Aero-styled toast implementation // ... } @Override public void onDestroyView() { super.onDestroyView(); binding = null; } } Battery Efficiency Strategy The Python service implements intelligent power management through several mechanisms: Idle Detection monitors _last_activity timestamp to transition between active and idle states. After 2 minutes of no user interaction, the service enters idle mode where the keepalive interval extends from 30 seconds to 5 minutes. This reduces UDP traffic by approximately 90% during idle periods. Throttled Background Operations schedule routing table refreshes at 15-minute intervals rather than continuous maintenance. The DHT only queries bootstrap nodes during initial connection; subsequent peer discovery happens opportunistically when messages are sent. RenderScript Optimization in the GlassCardView throttles blur updates to 15fps maximum and downscales the blur buffer to 1/4 resolution. When the app backgrounds, setBlurEnabled(false) should be called to eliminate all blur processing. Wake Lock Strategy (not shown but recommended): Use PowerManager.PARTIAL_WAKE_LOCK only during active message transmission, releasing immediately upon ACK receipt. The UDP socket's 1-second timeout ensures the receive thread doesn't block indefinitely.