Energyclient -
import requests import sqlite3 import time from dataclasses import dataclass from datetime import datetime, timezone @dataclass class EnergyClient: api_root: str meter_id: str token: str
def fetch_latest(self): resp = self.session.get(f"self.api_root/meters/self.meter_id/reading") resp.raise_for_status() data = resp.json() # Cache it self.conn.execute( "INSERT OR REPLACE INTO readings (ts, power_w, cumulative_wh) VALUES (?, ?, ?)", (datetime.now(timezone.utc).isoformat(), data["power_w"], data["cumulative_wh"]) ) self.conn.commit() return data energyclient
def close(self): self.conn.close() | Problem | Strategy | |---------|----------| | Network failure | Retry with backoff (3–5 attempts) | | Rate limiting | Parse Retry-After header, queue commands | | Invalid token | Re‑authenticate automatically once | | Data gaps | Interpolate or flag missing samples | | Meter offline | Cache commands; apply when reconnected | import requests import sqlite3 import time from dataclasses