#!/usr/bin/env python3
"""Receipt Manager backend — proxies receipt data and photos to Nextcloud via WebDAV."""

import base64
import getpass
import hashlib
import io
import json
import os
import re
import secrets
import sys
import zipfile
from http.cookies import CookieError, SimpleCookie
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, quote as urlquote, urlparse

import fitz  # PyMuPDF
import openpyxl
import pillow_heif
import requests
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from PIL import Image

pillow_heif.register_heif_opener()

# --- Nextcloud configuration (filled at startup) ---
NC_BASE = "https://nextcloud.sdanywhere.com"
# SECURITY(review): these credentials are committed in source. Rotate the app
# password and supply NC_USERNAME / NC_PASSWORD through the environment; the
# literals remain only as fallbacks so existing deployments keep starting.
# Setting either variable to an empty string makes main() prompt for it.
NC_USERNAME = os.environ.get("NC_USERNAME", "kamaji")
NC_PASSWORD = os.environ.get("NC_PASSWORD", "DvB0U2Uj3tOJaD")
NC_DAV_ROOT = ""  # set after login, e.g. /remote.php/dav/files/<user>/business-manager/Expenses/Receipts/
NC_AUTH = ()

RECEIPTS_FILE = "receipts.json"
SETTINGS_FILE = "settings.json"
AUTH_FILE = "auth.json"
PHOTOS_DIR = "photos"

# Server-side session store: {token: True}
SESSIONS = {}

ANTHROPIC_API_KEY = ""

# Ids accepted in URL paths; also enforced before a stored id is used to build
# a WebDAV path, so a crafted id cannot escape the photos folder.
_SAFE_ID_RE = re.compile(r"[A-Za-z0-9_-]+")


def _as_text(value):
    """Return *value* as a string, mapping None to the empty string."""
    return "" if value is None else str(value)


def _as_number(value):
    """Return *value* unchanged if it is an int/float, else float(value), else 0."""
    if isinstance(value, (int, float)):
        return value
    try:
        return float(value)
    except (TypeError, ValueError):
        return 0


def _sanitize_name(text):
    """Reduce *text* to word characters, hyphens and underscores for file names."""
    cleaned = re.sub(r"[^\w\s-]", "", _as_text(text)).strip()
    # Replace every whitespace character (not only spaces) so tabs/newlines
    # can never reach a file name or an HTTP header.
    return re.sub(r"\s", "_", cleaned)


def _secure_equals(left, right):
    """Compare two strings in constant time; non-strings never compare equal."""
    if not isinstance(left, str) or not isinstance(right, str):
        return False
    return secrets.compare_digest(
        left.encode("utf-8", "surrogatepass"),
        right.encode("utf-8", "surrogatepass"),
    )


def _upsert_by_id(items, record):
    """Replace the entry of *items* whose "id" equals record["id"], or append."""
    for index, item in enumerate(items):
        if "id" in item and item["id"] == record["id"]:
            items[index] = record
            return
    items.append(record)


def hash_password(password):
    """Hash a password with SHA-256 and return the hex digest.

    NOTE(review): unsalted SHA-256 is a weak password hash. Switching to a
    salted KDF needs a migration of the stored auth.json, so it is not done here.
    """
    return hashlib.sha256(password.encode()).hexdigest()


def load_auth():
    """Read auth.json from Nextcloud. If missing, create default credentials.

    NOTE(review): the default account is admin/admin; change it after first login.
    """
    r = nc_get(AUTH_FILE)
    if r.status_code == 404:
        default = {"username": "admin", "password_hash": hash_password("admin")}
        save_auth(default)
        return default
    r.raise_for_status()
    return r.json()


def save_auth(auth_data):
    """Write auth.json to Nextcloud. Raises requests.HTTPError on failure."""
    data = json.dumps(auth_data, indent=2).encode()
    r = nc_put(AUTH_FILE, data, "application/json")
    r.raise_for_status()


def nc_url(path=""):
    """Build full Nextcloud WebDAV URL for a sub-path under the receipts folder."""
    return NC_DAV_ROOT + path


def nc_get(path=""):
    """GET a resource from Nextcloud. Returns requests.Response."""
    return requests.get(nc_url(path), auth=NC_AUTH, timeout=30)


def nc_put(path, data, content_type="application/octet-stream"):
    """PUT (upload/overwrite) a resource to Nextcloud. Returns requests.Response."""
    return requests.put(
        nc_url(path),
        data=data,
        auth=NC_AUTH,
        headers={"Content-Type": content_type},
        timeout=60,
    )


def nc_delete(path):
    """DELETE a resource from Nextcloud. Returns requests.Response."""
    return requests.delete(nc_url(path), auth=NC_AUTH, timeout=30)


def nc_mkcol(path):
    """Create a collection (folder) on Nextcloud. Ignores 405 (already exists)."""
    r = requests.request("MKCOL", nc_url(path), auth=NC_AUTH, timeout=15)
    if r.status_code not in (201, 405):
        r.raise_for_status()


def nc_propfind(url):
    """PROPFIND on an absolute URL. Returns the response."""
    return requests.request(
        "PROPFIND", url, auth=NC_AUTH, headers={"Depth": "0"}, timeout=15
    )


# --- Helpers for receipts.json ------------------------------------------------

def load_receipts():
    """Read receipts.json from Nextcloud. Returns a list (empty if the file is missing)."""
    r = nc_get(RECEIPTS_FILE)
    if r.status_code == 404:
        return []
    r.raise_for_status()
    return r.json()


def save_receipts(receipts):
    """Write receipts.json to Nextcloud. Raises requests.HTTPError on failure."""
    data = json.dumps(receipts, indent=2).encode()
    r = nc_put(RECEIPTS_FILE, data, "application/json")
    r.raise_for_status()


def load_settings():
    """Read settings.json from Nextcloud. Returns dict with customers and projects."""
    r = nc_get(SETTINGS_FILE)
    if r.status_code == 404:
        return {"customers": [], "projects": []}
    r.raise_for_status()
    return r.json()


def save_settings(data):
    """Write settings.json to Nextcloud. Raises requests.HTTPError on failure."""
    payload = json.dumps(data, indent=2).encode()
    r = nc_put(SETTINGS_FILE, payload, "application/json")
    r.raise_for_status()


def build_excel(receipts, settings=None):
    """Build an .xlsx file from the receipts list. Returns bytes.

    receipts: list of receipt dicts (keys read: date, amount, category,
        projectId, note).
    settings: optional already-loaded settings dict; fetched from Nextcloud
        when omitted, so existing one-argument callers are unaffected.
    """
    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Receipts"

    # Load settings for project/customer name lookups
    if settings is None:
        settings = load_settings()
    project_map = {p["id"]: p for p in settings.get("projects", [])}
    customer_map = {c["id"]: c for c in settings.get("customers", [])}

    # Styles
    header_font = Font(bold=True, color="FFFFFF", size=11)
    header_fill = PatternFill("solid", fgColor="1A1A2E")
    header_align = Alignment(horizontal="center")
    thin_border = Border(
        bottom=Side(style="thin", color="DDDDDD"),
    )
    money_fmt = '#,##0.00'

    # Headers
    headers = ["Date", "Amount ($)", "Category", "Customer", "Project", "Note"]
    col_widths = [14, 14, 14, 20, 20, 40]
    for col_idx, (label, width) in enumerate(zip(headers, col_widths), 1):
        cell = ws.cell(row=1, column=col_idx, value=label)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_align
        ws.column_dimensions[cell.column_letter].width = width

    # Data rows
    for row_idx, r in enumerate(receipts, 2):
        # date/category may be stored as null (e.g. failed extraction), so
        # coerce before slicing / capitalizing.
        date_str = _as_text(r.get("date"))[:10]  # YYYY-MM-DD
        ws.cell(row=row_idx, column=1, value=date_str).border = thin_border

        amt_cell = ws.cell(row=row_idx, column=2, value=r.get("amount", 0))
        amt_cell.number_format = money_fmt
        amt_cell.border = thin_border

        category = _as_text(r.get("category")).capitalize()
        ws.cell(row=row_idx, column=3, value=category).border = thin_border

        # Customer and Project columns
        project_name = ""
        customer_name = ""
        pid = r.get("projectId", "")
        if pid and pid in project_map:
            proj = project_map[pid]
            project_name = proj.get("name", "")
            cid = proj.get("customerId", "")
            if cid and cid in customer_map:
                customer_name = customer_map[cid].get("name", "")
        ws.cell(row=row_idx, column=4, value=customer_name).border = thin_border
        ws.cell(row=row_idx, column=5, value=project_name).border = thin_border
        ws.cell(row=row_idx, column=6, value=r.get("note", "")).border = thin_border

    # Total row
    if receipts:
        total_row = len(receipts) + 2
        total_label = ws.cell(row=total_row, column=1, value="TOTAL")
        total_label.font = Font(bold=True)
        # Non-numeric amounts count as 0 instead of aborting the export.
        total_cell = ws.cell(
            row=total_row,
            column=2,
            value=sum(_as_number(r.get("amount", 0)) for r in receipts),
        )
        total_cell.font = Font(bold=True)
        total_cell.number_format = money_fmt

    buf = io.BytesIO()
    wb.save(buf)
    return buf.getvalue()


# --- Receipt extraction via Claude Haiku ---------------------------------------

def extract_receipt_info(jpeg_bytes):
    """Use Claude Haiku vision to extract total amount and date from a receipt image.

    Returns {"amount": ..., "date": ...}; both values are None when no API key
    is loaded, the request fails, or no JSON object is found in the reply.
    Never raises.
    """
    if not ANTHROPIC_API_KEY:
        # Extraction is disabled at startup when the key file is missing;
        # skip the network call that could only fail.
        return {"amount": None, "date": None}
    try:
        img_b64 = base64.standard_b64encode(jpeg_bytes).decode("ascii")
        resp = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": ANTHROPIC_API_KEY,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json",
            },
            json={
                "model": "claude-3-haiku-20240307",
                "max_tokens": 200,
                "messages": [{
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/jpeg",
                                "data": img_b64,
                            },
                        },
                        {
                            "type": "text",
                            "text": (
                                'Extract the total amount charged and the transaction date '
                                'from this receipt. For hotel receipts, the total is the '
                                'amount in parentheses (the charge to the guest), NOT the '
                                'balance due (which is typically 0 meaning it has been paid). '
                                'Reply with JSON only: {"amount": number_or_null, '
                                '"date": "YYYY-MM-DD"_or_null}'
                            ),
                        },
                    ],
                }],
            },
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        text = data["content"][0]["text"].strip()
        # Extract JSON from response (may be wrapped in markdown code block)
        m = re.search(r"\{.*\}", text, re.DOTALL)
        if m:
            parsed = json.loads(m.group())
            return {
                "amount": parsed.get("amount"),
                "date": parsed.get("date"),
            }
    except Exception as e:
        print(f"[extract_receipt_info] Error: {e}")
    return {"amount": None, "date": None}


MAX_UPLOAD_SIZE = 20 * 1024 * 1024  # 20 MB


def convert_to_jpeg(raw_bytes, content_type="", max_dim=2048, quality=80):
    """Convert PDF, HEIC, PNG, WebP, etc. to JPEG bytes.

    raw_bytes: the uploaded file content.
    content_type: request Content-Type; parameters after ';' are ignored.
        PDFs are also detected by their "%PDF-" magic bytes.
    max_dim: longest side of the output in pixels (default 2048).
    quality: JPEG quality (default 80).

    For PDFs only the first page is rendered. Raises whatever PyMuPDF/Pillow
    raise for unreadable input.
    """
    ct = (content_type or "").split(";", 1)[0].strip().lower()
    if ct == "application/pdf" or raw_bytes[:5] == b"%PDF-":
        # Render first page at 2x DPI
        doc = fitz.open(stream=raw_bytes, filetype="pdf")
        try:
            page = doc[0]
            pix = page.get_pixmap(matrix=fitz.Matrix(2.0, 2.0))
            img = Image.frombytes("RGB", (pix.width, pix.height), pix.samples)
        finally:
            # Close even if the page fails to render.
            doc.close()
    else:
        img = Image.open(io.BytesIO(raw_bytes))

    # JPEG has no alpha/palette support.
    if img.mode != "RGB":
        img = img.convert("RGB")

    # Resize to max_dim on longest side
    w, h = img.size
    if max(w, h) > max_dim:
        scale = max_dim / max(w, h)
        # Clamp to 1px so extreme aspect ratios never yield a zero-sized image.
        new_size = (max(1, int(w * scale)), max(1, int(h * scale)))
        img = img.resize(new_size, Image.LANCZOS)

    buf = io.BytesIO()
    img.save(buf, format="JPEG", quality=quality)
    return buf.getvalue()


# --- HTTP handler -------------------------------------------------------------

class ReuseTCPServer(HTTPServer):
    """HTTPServer that sets allow_reuse_address."""

    allow_reuse_address = True


class Handler(BaseHTTPRequestHandler):
    """Request handler for the receipts UI and its JSON/photo API."""

    def _send_json(self, obj, status=200, extra_headers=None):
        """Send *obj* as a JSON response with optional extra headers."""
        body = json.dumps(obj).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        if extra_headers:
            for k, v in extra_headers.items():
                self.send_header(k, v)
        self.end_headers()
        self.wfile.write(body)

    def _get_session_token(self):
        """Extract session token from cookies, or None if absent/unparseable."""
        cookie = SimpleCookie()
        try:
            cookie.load(self.headers.get("Cookie", ""))
        except CookieError:
            # A malformed Cookie header is simply "no session", not a crash.
            return None
        if "session" in cookie:
            return cookie["session"].value
        return None

    def _check_session(self):
        """Return True if the request has a valid session. Otherwise send 401."""
        token = self._get_session_token()
        if token and token in SESSIONS:
            return True
        self._send_json({"error": "Unauthorized"}, 401)
        return False

    def _read_body(self):
        """Read Content-Length bytes of request body.

        Raises ValueError if the Content-Length header is not an integer.
        """
        length = int(self.headers.get("Content-Length", 0))
        # A negative length would make read() block until the client hangs up.
        return self.rfile.read(max(length, 0))

    def _send_error(self, status, message):
        """Send a JSON error object with the given HTTP status."""
        self._send_json({"error": message}, status)

    def _read_record(self):
        """Parse the body as a JSON object carrying "id".

        Sends 400 and returns None when the body is not such an object.
        """
        try:
            data = json.loads(self._read_body())
        except ValueError:
            self._send_error(400, "Invalid request body")
            return None
        if not isinstance(data, dict) or "id" not in data:
            self._send_error(400, 'Request body must be a JSON object with an "id"')
            return None
        return data

    # --- routing helpers ---

    def _match(self, method, pattern):
        """Check method and return regex match against self.path, or None."""
        if self.command != method:
            return None
        return re.fullmatch(pattern, self.path)

    # --- GET -----------------------------------------------------------------

    def do_GET(self):
        """Dispatch GET requests."""
        # Serve receipts.html
        if self.path == "/" or self.path == "/receipts.html":
            self._get_index()
            return

        # GET /api/receipts
        if self.path == "/api/receipts":
            self._get_json_resource(load_receipts)
            return

        # GET /api/export — zip with Excel + photos (optionally filtered by project)
        if urlparse(self.path).path.rstrip("/") == "/api/export":
            self._get_export()
            return

        # GET /api/settings
        if self.path == "/api/settings":
            self._get_json_resource(load_settings)
            return

        # GET /api/photos/<id>
        m = re.fullmatch(r"/api/photos/([A-Za-z0-9_-]+)", self.path)
        if m:
            self._get_photo(m.group(1))
            return

        self._send_error(404, "Not found")

    def _get_index(self):
        """Serve receipts.html from the working directory."""
        try:
            with open("receipts.html", "rb") as f:
                body = f.read()
        except FileNotFoundError:
            self._send_error(404, "receipts.html not found")
            return
        self.send_response(200)
        self.send_header("Content-Type", "text/html; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _get_json_resource(self, loader):
        """Send loader()'s result as JSON for an authenticated request (502 on failure)."""
        if not self._check_session():
            return
        try:
            self._send_json(loader())
        except Exception as e:
            self._send_error(502, str(e))

    def _get_export(self):
        """Send a zip holding the Excel sheet and the receipt photos.

        ?project=<id>  → receipts of that project
        ?project=none  → receipts with no project
        (no param)     → all receipts
        """
        if not self._check_session():
            return
        try:
            qs = parse_qs(urlparse(self.path).query)
            project_filter = qs.get("project", [None])[0]

            receipts = load_receipts()
            if project_filter == "none":
                receipts = [r for r in receipts if not r.get("projectId")]
            elif project_filter:
                receipts = [r for r in receipts if r.get("projectId") == project_filter]

            # Load settings for name lookups
            settings = load_settings()
            project_map = {p["id"]: p for p in settings.get("projects", [])}
            customer_map = {c["id"]: c for c in settings.get("customers", [])}

            # Determine filename prefix
            filename = "receipts"
            if project_filter and project_filter != "none":
                proj = project_map.get(project_filter)
                if proj:
                    cust = customer_map.get(proj.get("customerId", ""))
                    parts = []
                    if cust:
                        parts.append(_sanitize_name(cust.get("name")))
                    parts.append(_sanitize_name(proj.get("name")))
                    # Names that sanitize to nothing must not leave "" or "-".
                    filename = "-".join(p for p in parts if p) or "receipts"

            excel_bytes = build_excel(receipts, settings)

            # Build zip with Excel + photos
            zip_buf = io.BytesIO()
            used_names = set()
            with zipfile.ZipFile(zip_buf, "w", zipfile.ZIP_DEFLATED) as zf:
                zf.writestr(f"{filename}.xlsx", excel_bytes)
                for r in receipts:
                    if not r.get("photo"):
                        continue
                    receipt_id = _as_text(r.get("id"))
                    try:
                        if not _SAFE_ID_RE.fullmatch(receipt_id):
                            raise ValueError("receipt id is not a safe path segment")
                        photo_resp = nc_get(f"{PHOTOS_DIR}/{receipt_id}.jpg")
                        if photo_resp.status_code != 200:
                            continue
                        # Name: project_date_amount_category
                        proj = project_map.get(r.get("projectId", ""))
                        proj_part = (
                            _sanitize_name(proj.get("name")) if proj else ""
                        ) or "no_project"
                        date_part = _sanitize_name(_as_text(r.get("date"))[:10])
                        amount_part = f"{_as_number(r.get('amount', 0)):.2f}"
                        cat_part = _sanitize_name(r.get("category")) or "other"
                        stem = f"{proj_part}_{date_part}_{amount_part}_{cat_part}"
                        # Two receipts can share every name component; suffix
                        # later ones so no photo is overwritten on extraction.
                        photo_name = stem
                        suffix = 2
                        while photo_name in used_names:
                            photo_name = f"{stem}_{suffix}"
                            suffix += 1
                        used_names.add(photo_name)
                        zf.writestr(f"photos/{photo_name}.jpg", photo_resp.content)
                    except Exception as e:
                        # Best effort: a photo that cannot be fetched or named
                        # is left out of the archive, but not silently.
                        print(f"[export] Skipping photo for receipt {receipt_id!r}: {e}")
            body = zip_buf.getvalue()
        except Exception as e:
            self._send_error(500, str(e))
            return

        # Header values must be latin-1 encodable; send an ASCII fallback and,
        # when it differs, the real name via RFC 6266 filename*.
        ascii_name = filename.encode("ascii", "ignore").decode("ascii") or "receipts"
        disposition = f'attachment; filename="{ascii_name}.zip"'
        if ascii_name != filename:
            disposition += f"; filename*=UTF-8''{urlquote(filename + '.zip', safe='')}"

        self.send_response(200)
        self.send_header("Content-Type", "application/zip")
        self.send_header("Content-Disposition", disposition)
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def _get_photo(self, photo_id):
        """Proxy photos/<photo_id>.jpg from Nextcloud."""
        if not self._check_session():
            return
        try:
            r = nc_get(f"{PHOTOS_DIR}/{photo_id}.jpg")
            if r.status_code == 404:
                self._send_error(404, "Photo not found")
                return
            r.raise_for_status()
            self.send_response(200)
            self.send_header("Content-Type", r.headers.get("Content-Type", "image/jpeg"))
            self.send_header("Content-Length", str(len(r.content)))
            self.send_header("Cache-Control", "max-age=86400")
            self.end_headers()
            self.wfile.write(r.content)
        except Exception as e:
            self._send_error(502, str(e))

    # --- POST ----------------------------------------------------------------

    def do_POST(self):
        """Dispatch POST requests."""
        routes = {
            "/api/login": self._post_login,
            "/api/logout": self._post_logout,
            "/api/change-password": self._post_change_password,
            # upsert a customer / project
            "/api/customers": lambda: self._post_setting("customers"),
            "/api/projects": lambda: self._post_setting("projects"),
            # upsert a receipt
            "/api/receipts": self._post_receipt,
            "/api/extract-receipt": self._post_extract_receipt,
            "/api/convert-image": self._post_convert_image,
        }
        handler = routes.get(self.path)
        if handler is not None:
            handler()
            return

        # POST /api/photos/<id> — upload photo
        m = re.fullmatch(r"/api/photos/([A-Za-z0-9_-]+)", self.path)
        if m:
            self._post_photo(m.group(1))
            return

        self._send_error(404, "Not found")

    def _post_login(self):
        """Check credentials against auth.json and start a session."""
        try:
            data = json.loads(self._read_body())
            if not isinstance(data, dict):
                data = {}
            auth = load_auth()
            password = data.get("password", "")
            # Evaluate both comparisons so timing does not reveal which failed.
            user_ok = _secure_equals(data.get("username"), auth["username"])
            pass_ok = isinstance(password, str) and _secure_equals(
                hash_password(password), auth["password_hash"]
            )
            if user_ok and pass_ok:
                token = secrets.token_hex(32)
                SESSIONS[token] = True
                self._send_json({"ok": True}, 200, extra_headers={
                    "Set-Cookie": f"session={token}; Path=/; HttpOnly; SameSite=Strict"
                })
            else:
                self._send_json({"error": "Invalid credentials"}, 401)
        except Exception as e:
            self._send_error(500, str(e))

    def _post_logout(self):
        """Drop the session (if any) and expire the cookie."""
        token = self._get_session_token()
        if token:
            SESSIONS.pop(token, None)
        self._send_json({"ok": True}, 200, extra_headers={
            "Set-Cookie": "session=; Path=/; HttpOnly; SameSite=Strict; Max-Age=0"
        })

    def _post_change_password(self):
        """Replace the stored password hash after verifying the current password."""
        if not self._check_session():
            return
        try:
            data = json.loads(self._read_body())
            if not isinstance(data, dict):
                data = {}
            auth = load_auth()
            current = data.get("current", "")
            if not isinstance(current, str) or not _secure_equals(
                hash_password(current), auth["password_hash"]
            ):
                self._send_json({"error": "Current password is incorrect"}, 403)
                return
            new_pw = data.get("new", "")
            if not isinstance(new_pw, str) or not new_pw:
                self._send_json({"error": "New password cannot be empty"}, 400)
                return
            auth["password_hash"] = hash_password(new_pw)
            save_auth(auth)
            self._send_json({"ok": True})
        except Exception as e:
            self._send_error(500, str(e))

    def _post_setting(self, collection):
        """Upsert the posted record into settings[collection] ("customers"/"projects")."""
        if not self._check_session():
            return
        data = self._read_record()
        if data is None:
            return
        try:
            settings = load_settings()
            items = settings.get(collection, [])
            _upsert_by_id(items, data)
            settings[collection] = items
            save_settings(settings)
            self._send_json(data, 200)
        except Exception as e:
            self._send_error(500, str(e))

    def _post_receipt(self):
        """Upsert the posted receipt into receipts.json."""
        if not self._check_session():
            return
        data = self._read_record()
        if data is None:
            return
        try:
            receipts = load_receipts()
            _upsert_by_id(receipts, data)
            save_receipts(receipts)
            self._send_json(data, 200)
        except Exception as e:
            self._send_error(500, str(e))

    def _post_extract_receipt(self):
        """Extract total + date from the posted receipt image."""
        if not self._check_session():
            return
        try:
            result = extract_receipt_info(self._read_body())
        except Exception as e:
            # The client treats nulls as "nothing extracted"; keep that contract.
            print(f"[extract-receipt] Error: {e}")
            result = {"amount": None, "date": None}
        self._send_json(result)

    def _post_convert_image(self):
        """Convert the posted PDF/HEIC/PNG/WebP to JPEG and return it."""
        if not self._check_session():
            return
        try:
            length = int(self.headers.get("Content-Length", 0))
            if length > MAX_UPLOAD_SIZE:
                self._send_error(413, "File too large (max 20 MB)")
                return
            body = self._read_body()
            content_type = self.headers.get("Content-Type", "application/octet-stream")
            jpeg_bytes = convert_to_jpeg(body, content_type)
            self.send_response(200)
            self.send_header("Content-Type", "image/jpeg")
            self.send_header("Content-Length", str(len(jpeg_bytes)))
            self.end_headers()
            self.wfile.write(jpeg_bytes)
        except Exception as e:
            print(f"[convert-image] Error: {e}")
            self._send_error(500, f"Conversion failed: {e}")

    def _post_photo(self, photo_id):
        """Store the posted bytes as photos/<photo_id>.jpg on Nextcloud."""
        if not self._check_session():
            return
        try:
            body = self._read_body()
            r = nc_put(f"{PHOTOS_DIR}/{photo_id}.jpg", body, "image/jpeg")
            r.raise_for_status()
            self._send_json({"url": f"/api/photos/{photo_id}"})
        except Exception as e:
            self._send_error(502, str(e))

    # --- DELETE --------------------------------------------------------------

    def do_DELETE(self):
        """Dispatch DELETE requests."""
        routes = (
            (r"/api/customers/([A-Za-z0-9_-]+)", self._delete_customer),
            (r"/api/projects/([A-Za-z0-9_-]+)", self._delete_project),
            (r"/api/receipts/([A-Za-z0-9_-]+)", self._delete_receipt),
        )
        for pattern, handler in routes:
            m = re.fullmatch(pattern, self.path)
            if m:
                handler(m.group(1))
                return
        self._send_error(404, "Not found")

    def _delete_customer(self, cid):
        """Remove a customer together with the projects that reference it."""
        if not self._check_session():
            return
        try:
            settings = load_settings()
            settings["customers"] = [
                c for c in settings.get("customers", []) if c.get("id") != cid
            ]
            settings["projects"] = [
                p for p in settings.get("projects", []) if p.get("customerId") != cid
            ]
            save_settings(settings)
            self._send_json({"ok": True})
        except Exception as e:
            self._send_error(500, str(e))

    def _delete_project(self, pid):
        """Remove a project from settings.json."""
        if not self._check_session():
            return
        try:
            settings = load_settings()
            settings["projects"] = [
                p for p in settings.get("projects", []) if p.get("id") != pid
            ]
            save_settings(settings)
            self._send_json({"ok": True})
        except Exception as e:
            self._send_error(500, str(e))

    def _delete_receipt(self, receipt_id):
        """Remove a receipt and, best effort, its photo."""
        if not self._check_session():
            return
        try:
            receipts = [r for r in load_receipts() if r.get("id") != receipt_id]
            save_receipts(receipts)
            # Also delete photo; a missing photo (404) is not an error, and a
            # network failure here must not undo the receipt deletion.
            try:
                nc_delete(f"{PHOTOS_DIR}/{receipt_id}.jpg")
            except requests.RequestException as e:
                print(f"[delete-receipt] Could not delete photo {receipt_id}: {e}")
            self._send_json({"ok": True})
        except Exception as e:
            self._send_error(500, str(e))

    # Suppress default logging noise
    def log_message(self, fmt, *args):
        """Print a one-line log entry instead of the default access-log format."""
        # command/path are not set yet when the request line fails to parse,
        # and this method is still called to log that 400/414.
        command = getattr(self, "command", None)
        path = getattr(self, "path", "-")
        if len(args) > 1:
            detail = args[1]
        elif args:
            detail = args[0]
        else:
            detail = fmt
        print(f"[{command}] {path} — {detail}")


# --- Startup ------------------------------------------------------------------

def ensure_folder_path():
    """Create the full folder hierarchy on Nextcloud if it doesn't exist.

    Exits the process with status 1 if a folder cannot be created.
    """
    parts = ["business-manager", "Expenses", "Receipts"]
    base = f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"

    current = base
    for part in parts:
        current += urlquote(part) + "/"
        r = nc_propfind(current)
        if r.status_code == 404:
            print(f" Creating folder: {part}/")
            mr = requests.request("MKCOL", current, auth=NC_AUTH, timeout=15)
            if mr.status_code not in (201, 405):
                print(f" ERROR creating {current}: {mr.status_code} {mr.text}")
                sys.exit(1)

    # Also ensure photos/ sub-folder
    photos_url = current + urlquote(PHOTOS_DIR) + "/"
    r = nc_propfind(photos_url)
    if r.status_code == 404:
        print(f" Creating folder: {PHOTOS_DIR}/")
        mr = requests.request("MKCOL", photos_url, auth=NC_AUTH, timeout=15)
        if mr.status_code not in (201, 405):
            print(f" ERROR creating photos folder: {mr.status_code}")
            sys.exit(1)


def main():
    """Load configuration, verify Nextcloud access, and serve on port 8080."""
    global NC_USERNAME, NC_PASSWORD, NC_DAV_ROOT, NC_AUTH, ANTHROPIC_API_KEY

    print("=== Receipt Manager — Nextcloud Backend ===\n")

    # Load Anthropic API key
    try:
        with open("api-key", "r", encoding="utf-8") as f:
            ANTHROPIC_API_KEY = f.read().strip()
        print("Anthropic API key loaded.")
    except FileNotFoundError:
        print("WARNING: api-key file not found — receipt extraction disabled.")

    if not NC_USERNAME:
        NC_USERNAME = input("Nextcloud username: ").strip()
    if not NC_PASSWORD:
        NC_PASSWORD = getpass.getpass("App password: ").strip()

    NC_AUTH = (NC_USERNAME, NC_PASSWORD)
    NC_DAV_ROOT = (
        f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}"
        f"/business-manager/Expenses/Receipts/"
    )

    print("\nVerifying Nextcloud connectivity...")
    try:
        r = nc_propfind(
            f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"
        )
        if r.status_code == 401:
            print("ERROR: Authentication failed. Check username/app-password.")
            sys.exit(1)
        r.raise_for_status()
        print(" Connected OK.")
    except requests.ConnectionError:
        print("ERROR: Cannot reach Nextcloud server.")
        sys.exit(1)
    except requests.RequestException as e:
        # Timeouts and non-401 HTTP errors: report them instead of a traceback.
        print(f"ERROR: Nextcloud check failed: {e}")
        sys.exit(1)

    print("Ensuring folder structure...")
    ensure_folder_path()
    print(" Folders OK.\n")

    port = 8080
    server = ReuseTCPServer(("", port), Handler)
    print(f"Serving on http://localhost:{port}")
    print("Press Ctrl+C to stop.\n")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.")
    finally:
        server.server_close()


if __name__ == "__main__":
    main()