#!/usr/bin/env python3
"""Receipt Manager backend — proxies receipt data and photos to Nextcloud via WebDAV."""

import base64
import getpass
import hashlib
import io
import json
import os
import re
import secrets
import sys
from http.server import HTTPServer, BaseHTTPRequestHandler
from http.cookies import CookieError, SimpleCookie
from urllib.parse import quote as urlquote

import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
import requests

# --- Nextcloud configuration (filled at startup) ---
NC_BASE = "https://nextcloud.sdanywhere.com"
# SECURITY NOTE(review): credentials are committed in source. They can now be
# overridden with the NC_USERNAME / NC_PASSWORD environment variables; the
# literals are kept only as fallbacks so existing deployments keep starting.
# Rotate this app password and remove the literals once the environment is
# configured (main() prompts interactively when either value is empty).
NC_USERNAME = os.environ.get("NC_USERNAME", "kamaji")
NC_PASSWORD = os.environ.get("NC_PASSWORD", "DvB0U2Uj3tOJaD")
# Set in main(), e.g.
# <NC_BASE>/remote.php/dav/files/<user>/business-manager/Expenses/Receipts/
NC_DAV_ROOT = ""
NC_AUTH = ()

RECEIPTS_FILE = "receipts.json"
AUTH_FILE = "auth.json"
PHOTOS_DIR = "photos"

# Server-side session store: {token: True}. Entries live until logout or
# process exit; there is no expiry.
SESSIONS = {}

ANTHROPIC_API_KEY = ""

# Path pattern shared by the photo GET/POST routes; the capture is the photo id.
_PHOTO_ROUTE = r"/api/photos/([A-Za-z0-9_-]+)"


def hash_password(password):
    """Hash a password with SHA-256 and return the hex digest.

    NOTE(review): unsalted SHA-256 is weak for password storage. It is kept
    because hashes already stored in auth.json were produced with it.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def _password_matches(password, stored_hash):
    """Return True if ``password`` hashes to ``stored_hash``.

    Non-string inputs never match, and the digests are compared in constant
    time so the comparison does not leak how many characters agree.
    """
    if not isinstance(password, str) or not isinstance(stored_hash, str):
        return False
    return secrets.compare_digest(
        hash_password(password).encode(), stored_hash.encode())


def load_auth():
    """Read auth.json from Nextcloud. If missing, create default credentials.

    NOTE(review): the default account is admin/admin; it should be changed
    right after the first login.
    """
    r = nc_get(AUTH_FILE)
    if r.status_code == 404:
        default = {"username": "admin", "password_hash": hash_password("admin")}
        save_auth(default)
        return default
    r.raise_for_status()
    return r.json()


def save_auth(auth_data):
    """Write auth.json to Nextcloud. Raises on a non-success HTTP status."""
    data = json.dumps(auth_data, indent=2).encode()
    r = nc_put(AUTH_FILE, data, "application/json")
    r.raise_for_status()


def nc_url(path=""):
    """Build full Nextcloud WebDAV URL for a sub-path under the receipts folder."""
    return NC_DAV_ROOT + path


def nc_get(path=""):
    """GET a resource from Nextcloud. Returns requests.Response."""
    return requests.get(nc_url(path), auth=NC_AUTH, timeout=30)


def nc_put(path, data, content_type="application/octet-stream"):
    """PUT (upload/overwrite) a resource to Nextcloud."""
    return requests.put(nc_url(path), data=data, auth=NC_AUTH,
                        headers={"Content-Type": content_type}, timeout=60)


def nc_delete(path):
    """DELETE a resource from Nextcloud."""
    return requests.delete(nc_url(path), auth=NC_AUTH, timeout=30)


def nc_mkcol(path):
    """Create a collection (folder) on Nextcloud. Ignores 405 (already exists)."""
    r = requests.request("MKCOL", nc_url(path), auth=NC_AUTH, timeout=15)
    if r.status_code not in (201, 405):
        r.raise_for_status()


def nc_propfind(url):
    """PROPFIND on an absolute URL. Returns the response."""
    return requests.request("PROPFIND", url, auth=NC_AUTH,
                            headers={"Depth": "0"}, timeout=15)


# --- Helpers for receipts.json ------------------------------------------------

def load_receipts():
    """Read receipts.json from Nextcloud. Returns a list ([] when missing)."""
    r = nc_get(RECEIPTS_FILE)
    if r.status_code == 404:
        return []
    r.raise_for_status()
    return r.json()


def save_receipts(receipts):
    """Write receipts.json to Nextcloud. Raises on a non-success HTTP status."""
    data = json.dumps(receipts, indent=2).encode()
    r = nc_put(RECEIPTS_FILE, data, "application/json")
    r.raise_for_status()


def _receipt_amount(receipt):
    """Return the receipt's amount as a number, or 0 when absent/unparseable."""
    value = receipt.get("amount", 0)
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        # None (e.g. extraction found no total) or free text: count as zero
        # rather than failing the whole export.
        return 0


def build_excel(receipts):
    """Build an .xlsx file from the receipts list. Returns bytes.

    Each receipt is a dict; the keys read are "date", "amount", "category"
    and "note". Missing or null values are rendered as empty/zero cells.
    A bold TOTAL row is appended when there is at least one receipt.
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Receipts"

    # Styles
    header_font = Font(bold=True, color="FFFFFF", size=11)
    header_fill = PatternFill("solid", fgColor="1A1A2E")
    header_align = Alignment(horizontal="center")
    thin_border = Border(
        bottom=Side(style="thin", color="DDDDDD"),
    )
    money_fmt = '#,##0.00'

    # Headers
    headers = ["Date", "Amount ($)", "Category", "Note"]
    col_widths = [14, 14, 14, 40]
    for col_idx, (label, width) in enumerate(zip(headers, col_widths), 1):
        cell = ws.cell(row=1, column=col_idx, value=label)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_align
        ws.column_dimensions[cell.column_letter].width = width

    # Data rows (row 1 is the header, so data starts at row 2)
    for row_idx, r in enumerate(receipts, 2):
        # `or ""` guards against explicit nulls, which .get()'s default
        # does not cover.
        date_str = str(r.get("date") or "")[:10]  # YYYY-MM-DD
        ws.cell(row=row_idx, column=1, value=date_str).border = thin_border

        amt_cell = ws.cell(row=row_idx, column=2, value=_receipt_amount(r))
        amt_cell.number_format = money_fmt
        amt_cell.border = thin_border

        category = str(r.get("category") or "").capitalize()
        ws.cell(row=row_idx, column=3, value=category).border = thin_border
        ws.cell(row=row_idx, column=4,
                value=r.get("note") or "").border = thin_border

    # Total row
    if receipts:
        total_row = len(receipts) + 2
        total_label = ws.cell(row=total_row, column=1, value="TOTAL")
        total_label.font = Font(bold=True)
        total_cell = ws.cell(
            row=total_row, column=2,
            value=sum(_receipt_amount(r) for r in receipts))
        total_cell.font = Font(bold=True)
        total_cell.number_format = money_fmt

    buf = io.BytesIO()
    wb.save(buf)
    return buf.getvalue()


# --- Receipt extraction via Claude Haiku ---------------------------------------

def extract_receipt_info(jpeg_bytes):
    """Use Claude Haiku vision to extract total amount and date from a receipt image.

    Returns a dict ``{"amount": ..., "date": ...}``. Both values are None
    when no API key is loaded, the request fails, or the reply contains no
    JSON object. Never raises: failures are printed and reported as Nones.
    """
    if not ANTHROPIC_API_KEY:
        # main() announces extraction as disabled without a key; skip the
        # network round-trip that could only fail authentication.
        return {"amount": None, "date": None}
    try:
        img_b64 = base64.standard_b64encode(jpeg_bytes).decode("ascii")
        resp = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": ANTHROPIC_API_KEY,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json",
            },
            json={
                "model": "claude-3-haiku-20240307",
                "max_tokens": 200,
                "messages": [{
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/jpeg",
                                "data": img_b64,
                            },
                        },
                        {
                            "type": "text",
                            "text": 'Extract the total amount and transaction date from this receipt. Reply with JSON only: {"amount": number_or_null, "date": "YYYY-MM-DD"_or_null}',
                        },
                    ],
                }],
            },
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        text = data["content"][0]["text"].strip()
        # Extract JSON from response (may be wrapped in markdown code block)
        m = re.search(r"\{.*\}", text, re.DOTALL)
        if m:
            parsed = json.loads(m.group())
            return {
                "amount": parsed.get("amount"),
                "date": parsed.get("date"),
            }
    except Exception as e:
        # Deliberately best-effort: extraction is a convenience, so any
        # failure degrades to "nothing extracted" instead of an error.
        print(f"[extract_receipt_info] Error: {e}")
    return {"amount": None, "date": None}


# --- HTTP handler -------------------------------------------------------------

class ReuseTCPServer(HTTPServer):
    """HTTPServer that sets allow_reuse_address on its listening socket."""

    allow_reuse_address = True


class Handler(BaseHTTPRequestHandler):
    """Request handler for the receipt API and the single-page front end."""

    # --- response helpers ---

    def _send_bytes(self, body, content_type, status=200, extra_headers=None):
        """Send ``body`` with Content-Type/Content-Length plus extra headers."""
        self.send_response(status)
        self.send_header("Content-Type", content_type)
        self.send_header("Content-Length", str(len(body)))
        if extra_headers:
            for k, v in extra_headers.items():
                self.send_header(k, v)
        self.end_headers()
        self.wfile.write(body)

    def _send_json(self, obj, status=200, extra_headers=None):
        """Serialize ``obj`` as JSON and send it with the given status."""
        body = json.dumps(obj).encode()
        self._send_bytes(body, "application/json", status, extra_headers)

    def _send_error(self, status, message):
        """Send ``{"error": message}`` as JSON with the given status."""
        self._send_json({"error": message}, status)

    # --- request helpers ---

    def _get_session_token(self):
        """Extract session token from cookies, or None if absent/unparseable."""
        cookie_header = self.headers.get("Cookie", "")
        cookie = SimpleCookie()
        try:
            cookie.load(cookie_header)
        except CookieError:
            # A malformed Cookie header means "no session", not a crash.
            return None
        if "session" in cookie:
            return cookie["session"].value
        return None

    def _check_session(self):
        """Return True if the request has a valid session. Otherwise send 401."""
        token = self._get_session_token()
        if token and token in SESSIONS:
            return True
        self._send_json({"error": "Unauthorized"}, 401)
        return False

    def _read_body(self):
        """Read the request body as announced by Content-Length.

        A missing, non-numeric or non-positive Content-Length yields b"".
        (A negative value passed to read() would block until EOF.)
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            length = 0
        if length <= 0:
            return b""
        return self.rfile.read(length)

    def _read_json_object(self):
        """Parse the body as a JSON object.

        Returns the dict, or None after sending a 400 response when the body
        is not valid JSON or is not an object.
        """
        try:
            data = json.loads(self._read_body())
        except ValueError:  # covers JSONDecodeError and UnicodeDecodeError
            self._send_error(400, "Invalid JSON body")
            return None
        if not isinstance(data, dict):
            self._send_error(400, "JSON object expected")
            return None
        return data

    # --- routing helpers ---

    def _match(self, method, pattern):
        """Check method and return regex match against self.path, or None."""
        if self.command != method:
            return None
        return re.fullmatch(pattern, self.path)

    # --- GET -----------------------------------------------------------------

    def do_GET(self):
        """Route GET requests: front-end page, receipts, export, photos."""
        # Serve receipts.html
        if self.path == "/" or self.path == "/receipts.html":
            self._get_index()
            return

        # GET /api/receipts
        if self.path == "/api/receipts":
            if not self._check_session():
                return
            try:
                receipts = load_receipts()
                self._send_json(receipts)
            except Exception as e:
                self._send_error(502, str(e))
            return

        # GET /api/export — export receipts as Excel
        if self.path == "/api/export":
            if not self._check_session():
                return
            try:
                receipts = load_receipts()
                body = build_excel(receipts)
                self._send_bytes(
                    body,
                    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
                    extra_headers={
                        "Content-Disposition": 'attachment; filename="receipts.xlsx"',
                    })
            except Exception as e:
                self._send_error(500, str(e))
            return

        # GET /api/photos/<id>
        m = re.fullmatch(_PHOTO_ROUTE, self.path)
        if m:
            if not self._check_session():
                return
            self._get_photo(m.group(1))
            return

        self._send_error(404, "Not found")

    def _get_index(self):
        """Serve receipts.html from the working directory (404 if missing)."""
        try:
            with open("receipts.html", "rb") as f:
                body = f.read()
        except FileNotFoundError:
            self._send_error(404, "receipts.html not found")
            return
        self._send_bytes(body, "text/html; charset=utf-8")

    def _get_photo(self, photo_id):
        """Fetch photos/<photo_id>.jpg from Nextcloud and relay it."""
        try:
            r = nc_get(f"{PHOTOS_DIR}/{photo_id}.jpg")
            if r.status_code == 404:
                self._send_error(404, "Photo not found")
                return
            r.raise_for_status()
            self._send_bytes(
                r.content,
                r.headers.get("Content-Type", "image/jpeg"),
                extra_headers={"Cache-Control": "max-age=86400"})
        except Exception as e:
            self._send_error(502, str(e))

    # --- POST ----------------------------------------------------------------

    def do_POST(self):
        """Route POST requests to the matching endpoint handler."""
        routes = {
            "/api/login": self._post_login,
            "/api/logout": self._post_logout,
            "/api/change-password": self._post_change_password,
            "/api/receipts": self._post_receipt,
            "/api/extract-receipt": self._post_extract_receipt,
        }
        route = routes.get(self.path)
        if route is not None:
            route()
            return

        # POST /api/photos/<id> — upload photo
        m = re.fullmatch(_PHOTO_ROUTE, self.path)
        if m:
            self._post_photo(m.group(1))
            return

        self._send_error(404, "Not found")

    def _post_login(self):
        """POST /api/login — verify credentials and issue a session cookie."""
        data = self._read_json_object()
        if data is None:
            return
        try:
            auth = load_auth()
            # Evaluate both checks before combining them so a wrong username
            # does not skip the password hashing work.
            user_ok = data.get("username") == auth["username"]
            pw_ok = _password_matches(data.get("password", ""),
                                      auth["password_hash"])
            if user_ok and pw_ok:
                token = secrets.token_hex(32)
                SESSIONS[token] = True
                self._send_json({"ok": True}, 200, extra_headers={
                    "Set-Cookie": f"session={token}; Path=/; HttpOnly; SameSite=Strict"
                })
            else:
                self._send_json({"error": "Invalid credentials"}, 401)
        except Exception as e:
            self._send_error(500, str(e))

    def _post_logout(self):
        """POST /api/logout — drop the session and expire the cookie."""
        token = self._get_session_token()
        if token:
            SESSIONS.pop(token, None)
        self._send_json({"ok": True}, 200, extra_headers={
            "Set-Cookie": "session=; Path=/; HttpOnly; SameSite=Strict; Max-Age=0"
        })

    def _post_change_password(self):
        """POST /api/change-password — body: {"current": ..., "new": ...}."""
        if not self._check_session():
            return
        data = self._read_json_object()
        if data is None:
            return
        try:
            auth = load_auth()
            if not _password_matches(data.get("current", ""),
                                     auth["password_hash"]):
                self._send_json({"error": "Current password is incorrect"}, 403)
                return
            new_pw = data.get("new", "")
            # Non-strings are rejected here too; they cannot be hashed.
            if not isinstance(new_pw, str) or not new_pw:
                self._send_json({"error": "New password cannot be empty"}, 400)
                return
            auth["password_hash"] = hash_password(new_pw)
            save_auth(auth)
            self._send_json({"ok": True})
        except Exception as e:
            self._send_error(500, str(e))

    def _post_receipt(self):
        """POST /api/receipts — upsert a receipt, matched on its "id" key."""
        if not self._check_session():
            return
        data = self._read_json_object()
        if data is None:
            return
        if "id" not in data:
            self._send_error(400, "Receipt id is required")
            return
        try:
            receipts = load_receipts()
            # .get() so one stored record without an id cannot break upserts.
            idx = next((i for i, r in enumerate(receipts)
                        if r.get("id") == data["id"]), None)
            if idx is not None:
                receipts[idx] = data
            else:
                receipts.append(data)
            save_receipts(receipts)
            self._send_json(data, 200)
        except Exception as e:
            self._send_error(500, str(e))

    def _post_extract_receipt(self):
        """POST /api/extract-receipt — extract total + date from receipt image."""
        if not self._check_session():
            return
        try:
            body = self._read_body()
            result = extract_receipt_info(body)
            self._send_json(result)
        except Exception:
            # Same "nothing extracted" shape the extractor itself returns.
            self._send_json({"amount": None, "date": None})

    def _post_photo(self, photo_id):
        """POST /api/photos/<id> — store the body as photos/<id>.jpg."""
        if not self._check_session():
            return
        try:
            body = self._read_body()
            r = nc_put(f"{PHOTOS_DIR}/{photo_id}.jpg", body, "image/jpeg")
            r.raise_for_status()
            self._send_json({"url": f"/api/photos/{photo_id}"})
        except Exception as e:
            self._send_error(502, str(e))

    # --- DELETE --------------------------------------------------------------

    def do_DELETE(self):
        """DELETE /api/receipts/<id> — remove the receipt and its photo."""
        m = re.fullmatch(r"/api/receipts/([A-Za-z0-9_-]+)", self.path)
        if m:
            if not self._check_session():
                return
            receipt_id = m.group(1)
            try:
                receipts = load_receipts()
                receipts = [r for r in receipts if r.get("id") != receipt_id]
                save_receipts(receipts)
                # Also delete photo. Best-effort: the receipt is already
                # gone, so a failure here is logged, not reported.
                try:
                    nc_delete(f"{PHOTOS_DIR}/{receipt_id}.jpg")
                except Exception as e:
                    print(f"[do_DELETE] Photo delete failed for {receipt_id}: {e}")
                self._send_json({"ok": True})
            except Exception as e:
                self._send_error(500, str(e))
            return

        self._send_error(404, "Not found")

    # Suppress default logging noise
    def log_message(self, fmt, *args):
        """Print one compact line per logged event instead of the default."""
        if len(args) > 1:
            detail = args[1]
        elif args:
            detail = args[0]
        else:
            detail = fmt
        # command/path may be unset if logging happens before parsing.
        command = getattr(self, "command", None)
        path = getattr(self, "path", None)
        print(f"[{command}] {path} — {detail}")


# --- Startup ------------------------------------------------------------------

def _create_collection_if_missing(url, label):
    """MKCOL ``url`` when PROPFIND reports 404.

    Returns None when the collection already exists or was created (201, or
    405 meaning it already exists); otherwise returns the failed MKCOL
    response so the caller can report it.
    """
    if nc_propfind(url).status_code != 404:
        return None
    print(f" Creating folder: {label}/")
    mr = requests.request("MKCOL", url, auth=NC_AUTH, timeout=15)
    if mr.status_code in (201, 405):
        return None
    return mr


def ensure_folder_path():
    """Create the full folder hierarchy on Nextcloud if it doesn't exist.

    Exits the process with status 1 when a folder cannot be created.
    """
    parts = ["business-manager", "Expenses", "Receipts"]
    base = f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"
    current = base
    # WebDAV MKCOL needs the parent to exist, so walk down one level at a time.
    for part in parts:
        current += urlquote(part) + "/"
        failed = _create_collection_if_missing(current, part)
        if failed is not None:
            print(f" ERROR creating {current}: {failed.status_code} {failed.text}")
            sys.exit(1)

    # Also ensure photos/ sub-folder
    photos_url = current + urlquote(PHOTOS_DIR) + "/"
    failed = _create_collection_if_missing(photos_url, PHOTOS_DIR)
    if failed is not None:
        print(f" ERROR creating photos folder: {failed.status_code}")
        sys.exit(1)


def main():
    """Load configuration, verify Nextcloud access, and serve on port 8080."""
    global NC_USERNAME, NC_PASSWORD, NC_DAV_ROOT, NC_AUTH, ANTHROPIC_API_KEY

    print("=== Receipt Manager — Nextcloud Backend ===\n")

    # Load Anthropic API key
    try:
        with open("api-key", "r") as f:
            ANTHROPIC_API_KEY = f.read().strip()
        print("Anthropic API key loaded.")
    except FileNotFoundError:
        print("WARNING: api-key file not found — receipt extraction disabled.")

    if not NC_USERNAME:
        NC_USERNAME = input("Nextcloud username: ").strip()
    if not NC_PASSWORD:
        NC_PASSWORD = getpass.getpass("App password: ").strip()

    NC_AUTH = (NC_USERNAME, NC_PASSWORD)
    NC_DAV_ROOT = (
        f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}"
        f"/business-manager/Expenses/Receipts/"
    )

    print("\nVerifying Nextcloud connectivity...")
    try:
        r = nc_propfind(
            f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"
        )
        if r.status_code == 401:
            print("ERROR: Authentication failed. Check username/app-password.")
            sys.exit(1)
        r.raise_for_status()
        print(" Connected OK.")
    except requests.ConnectionError:
        print("ERROR: Cannot reach Nextcloud server.")
        sys.exit(1)
    except requests.RequestException as e:
        # Timeouts and non-401 HTTP errors: report cleanly, no traceback.
        print(f"ERROR: Nextcloud connectivity check failed: {e}")
        sys.exit(1)

    print("Ensuring folder structure...")
    ensure_folder_path()
    print(" Folders OK.\n")

    port = 8080
    # "" binds every interface, not just localhost.
    server = ReuseTCPServer(("", port), Handler)
    print(f"Serving on http://localhost:{port}")
    print("Press Ctrl+C to stop.\n")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.")
    finally:
        server.server_close()


if __name__ == "__main__":
    main()