Files
receipt-manager/server.py
kamaji fde6fcb724 Add project filter and zip export with photos
- Filter dropdown to display receipts by project
- Total banner updates to reflect filtered view
- Export produces a zip with Excel + receipt photos
- Photo filenames use project_date_amount_category format

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 00:11:58 -06:00

717 lines
26 KiB
Python

#!/usr/bin/env python3
"""Receipt Manager backend — proxies receipt data and photos to Nextcloud via WebDAV."""
import base64
import getpass
import hashlib
import io
import json
import os
import re
import secrets
import sys
import zipfile
from http.cookies import CookieError, SimpleCookie
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import parse_qs, quote as urlquote, urlparse

import openpyxl
import requests
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
# --- Nextcloud configuration (filled at startup) ---
NC_BASE = "https://nextcloud.sdanywhere.com"
# Credentials must not live in source control. They are read from the
# environment; any value left empty is prompted for interactively in main().
NC_USERNAME = os.environ.get("NC_USERNAME", "kamaji")
NC_PASSWORD = os.environ.get("NC_PASSWORD", "")
NC_DAV_ROOT = ""  # set after login, e.g. /remote.php/dav/files/<user>/business-manager/Expenses/Receipts/
NC_AUTH = ()  # (username, password) tuple, set in main()

# File and folder names relative to NC_DAV_ROOT.
RECEIPTS_FILE = "receipts.json"
SETTINGS_FILE = "settings.json"
AUTH_FILE = "auth.json"
PHOTOS_DIR = "photos"

# Server-side session store: {token: True}
SESSIONS = {}

# Loaded from the "api-key" file in main(); empty means no key is available.
ANTHROPIC_API_KEY = ""
def hash_password(password):
    """Return the hex-encoded SHA-256 digest of ``password``.

    NOTE(review): a single unsalted SHA-256 round is weak for password
    storage. Switching schemes would invalidate the hash already stored in
    auth.json, so that needs a migration rather than an in-place change.
    """
    hasher = hashlib.sha256()
    hasher.update(password.encode())
    return hasher.hexdigest()
def load_auth():
    """Fetch the login record stored in auth.json on Nextcloud.

    When the file does not exist yet (HTTP 404), a default record with
    username "admin" and the hash of "admin" is written and returned.
    Any other HTTP error status raises via ``raise_for_status()``.
    """
    response = nc_get(AUTH_FILE)
    if response.status_code != 404:
        response.raise_for_status()
        return response.json()

    # First run: bootstrap the default credentials.
    bootstrap = {"username": "admin", "password_hash": hash_password("admin")}
    save_auth(bootstrap)
    return bootstrap
def save_auth(auth_data):
    """Serialize ``auth_data`` as indented JSON and upload it as auth.json.

    Raises via ``raise_for_status()`` when Nextcloud rejects the upload.
    """
    payload = json.dumps(auth_data, indent=2).encode()
    nc_put(AUTH_FILE, payload, "application/json").raise_for_status()
def nc_url(path=""):
    """Return the WebDAV URL for ``path``, appended to NC_DAV_ROOT."""
    return "".join((NC_DAV_ROOT, path))
def nc_get(path=""):
    """Issue an authenticated GET for ``path`` and return the requests.Response."""
    target = nc_url(path)
    response = requests.get(target, auth=NC_AUTH, timeout=30)
    return response
def nc_put(path, data, content_type="application/octet-stream"):
    """Upload ``data`` to ``path`` with an authenticated PUT, overwriting it.

    Returns the requests.Response; the caller decides how to treat errors.
    """
    target = nc_url(path)
    headers = {"Content-Type": content_type}
    return requests.put(target, data=data, auth=NC_AUTH, headers=headers,
                        timeout=60)
def nc_delete(path):
    """Issue an authenticated DELETE for ``path`` and return the response."""
    target = nc_url(path)
    response = requests.delete(target, auth=NC_AUTH, timeout=30)
    return response
def nc_mkcol(path):
    """Send MKCOL for ``path`` to create a folder on Nextcloud.

    Status 201 (created) and 405 (treated as "already exists") are accepted;
    for any other status ``raise_for_status()`` is invoked.
    """
    response = requests.request("MKCOL", nc_url(path), auth=NC_AUTH, timeout=15)
    if response.status_code in (201, 405):
        return
    response.raise_for_status()
def nc_propfind(url):
    """Send a Depth-0 PROPFIND to the absolute ``url`` and return the response."""
    depth_zero = {"Depth": "0"}
    return requests.request("PROPFIND", url, auth=NC_AUTH, headers=depth_zero,
                            timeout=15)
# --- Helpers for receipts.json ------------------------------------------------
def load_receipts():
    """Fetch and decode receipts.json from Nextcloud.

    Returns an empty list when the file does not exist (HTTP 404); any other
    HTTP error status raises via ``raise_for_status()``.
    """
    response = nc_get(RECEIPTS_FILE)
    if response.status_code != 404:
        response.raise_for_status()
        return response.json()
    return []
def save_receipts(receipts):
    """Serialize ``receipts`` as indented JSON and upload it as receipts.json.

    Raises via ``raise_for_status()`` when Nextcloud rejects the upload.
    """
    payload = json.dumps(receipts, indent=2).encode()
    nc_put(RECEIPTS_FILE, payload, "application/json").raise_for_status()
def load_settings():
    """Fetch and decode settings.json from Nextcloud.

    Returns ``{"customers": [], "projects": []}`` when the file does not
    exist (HTTP 404); any other HTTP error status raises via
    ``raise_for_status()``.
    """
    response = nc_get(SETTINGS_FILE)
    if response.status_code != 404:
        response.raise_for_status()
        return response.json()
    return {"customers": [], "projects": []}
def save_settings(data):
    """Serialize ``data`` as indented JSON and upload it as settings.json.

    Raises via ``raise_for_status()`` when Nextcloud rejects the upload.
    """
    encoded = json.dumps(data, indent=2).encode()
    nc_put(SETTINGS_FILE, encoded, "application/json").raise_for_status()
def build_excel(receipts):
    """Build an .xlsx workbook from the receipts list and return its bytes.

    The sheet has one header row, one row per receipt (date, amount,
    category, customer, project, note) and, when there is at least one
    receipt, a bold TOTAL row. Customer and project names are resolved
    through settings.json, which is loaded here.

    Receipt fields that are missing or null are written as empty text
    (or 0 for the amount) instead of aborting the export.
    """
    def text_of(value):
        # JSON null arrives as None; treat it like a missing field.
        return "" if value is None else str(value)

    def number_of(value):
        # Keep real numbers untouched; coerce numeric strings; fall back to 0
        # so a single bad amount cannot break the TOTAL sum.
        if isinstance(value, (int, float)):
            return value
        try:
            return float(value)
        except (TypeError, ValueError):
            return 0

    wb = openpyxl.Workbook()
    ws = wb.active
    ws.title = "Receipts"

    # Load settings for project/customer name lookups
    settings = load_settings()
    project_map = {p["id"]: p for p in settings.get("projects", [])}
    customer_map = {c["id"]: c for c in settings.get("customers", [])}

    # Styles
    header_font = Font(bold=True, color="FFFFFF", size=11)
    header_fill = PatternFill("solid", fgColor="1A1A2E")
    header_align = Alignment(horizontal="center")
    thin_border = Border(
        bottom=Side(style="thin", color="DDDDDD"),
    )
    money_fmt = '#,##0.00'

    # Headers
    headers = ["Date", "Amount ($)", "Category", "Customer", "Project", "Note"]
    col_widths = [14, 14, 14, 20, 20, 40]
    for col_idx, (label, width) in enumerate(zip(headers, col_widths), 1):
        cell = ws.cell(row=1, column=col_idx, value=label)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_align
        ws.column_dimensions[cell.column_letter].width = width

    # Data rows
    amounts = []
    for row_idx, r in enumerate(receipts, 2):
        date_str = text_of(r.get("date"))[:10]  # YYYY-MM-DD
        ws.cell(row=row_idx, column=1, value=date_str).border = thin_border

        amount = number_of(r.get("amount", 0))
        amounts.append(amount)
        amt_cell = ws.cell(row=row_idx, column=2, value=amount)
        amt_cell.number_format = money_fmt
        amt_cell.border = thin_border

        category = text_of(r.get("category")).capitalize()
        ws.cell(row=row_idx, column=3, value=category).border = thin_border

        # Customer and Project columns: the customer is reached through the
        # receipt's project, so a receipt without a known project has neither.
        project_name = ""
        customer_name = ""
        pid = r.get("projectId", "")
        if pid and pid in project_map:
            proj = project_map[pid]
            project_name = proj.get("name", "")
            cid = proj.get("customerId", "")
            if cid and cid in customer_map:
                customer_name = customer_map[cid].get("name", "")
        ws.cell(row=row_idx, column=4, value=customer_name).border = thin_border
        ws.cell(row=row_idx, column=5, value=project_name).border = thin_border
        ws.cell(row=row_idx, column=6,
                value=text_of(r.get("note"))).border = thin_border

    # Total row
    if receipts:
        total_row = len(receipts) + 2
        total_label = ws.cell(row=total_row, column=1, value="TOTAL")
        total_label.font = Font(bold=True)
        total_cell = ws.cell(row=total_row, column=2, value=sum(amounts))
        total_cell.font = Font(bold=True)
        total_cell.number_format = money_fmt

    buf = io.BytesIO()
    wb.save(buf)
    return buf.getvalue()
# --- Receipt extraction via Claude Haiku ---------------------------------------
def extract_receipt_info(jpeg_bytes):
    """Ask Claude Haiku (vision) for the total amount and date on a receipt.

    ``jpeg_bytes`` is the raw image, sent to the API as base64 ``image/jpeg``.
    Returns ``{"amount": ..., "date": ...}`` with whatever the model's JSON
    reply contained under those keys. Returns both as None when no API key
    is configured, the image is empty, the request fails, or the reply
    contains no JSON object. This function never raises.
    """
    empty = {"amount": None, "date": None}

    # Without a key the call can only fail; skip it rather than uploading the
    # image for nothing. Same for an empty upload.
    if not ANTHROPIC_API_KEY or not jpeg_bytes:
        return empty

    try:
        img_b64 = base64.standard_b64encode(jpeg_bytes).decode("ascii")
        resp = requests.post(
            "https://api.anthropic.com/v1/messages",
            headers={
                "x-api-key": ANTHROPIC_API_KEY,
                "anthropic-version": "2023-06-01",
                "Content-Type": "application/json",
            },
            json={
                "model": "claude-3-haiku-20240307",
                "max_tokens": 200,
                "messages": [{
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": "image/jpeg",
                                "data": img_b64,
                            },
                        },
                        {
                            "type": "text",
                            "text": 'Extract the total amount and transaction date from this receipt. Reply with JSON only: {"amount": number_or_null, "date": "YYYY-MM-DD"_or_null}',
                        },
                    ],
                }],
            },
            timeout=30,
        )
        resp.raise_for_status()
        data = resp.json()
        text = data["content"][0]["text"].strip()
        # Extract JSON from response (may be wrapped in markdown code block)
        m = re.search(r"\{.*\}", text, re.DOTALL)
        if m:
            parsed = json.loads(m.group())
            # The model is asked for an object, but only trust it if it is one.
            if isinstance(parsed, dict):
                return {
                    "amount": parsed.get("amount"),
                    "date": parsed.get("date"),
                }
    except Exception as e:
        # Deliberately broad: extraction is a best-effort convenience and
        # must never fail the request that triggered it.
        print(f"[extract_receipt_info] Error: {e}")
    return empty
# --- HTTP handler -------------------------------------------------------------
class ReuseTCPServer(HTTPServer):
    """HTTPServer variant with ``allow_reuse_address`` explicitly enabled."""

    allow_reuse_address = True
class Handler(BaseHTTPRequestHandler):
    """HTTP request handler for the Receipt Manager page and JSON API.

    Serves ``receipts.html`` and the ``/api/*`` routes. Every API route
    except ``/api/login`` and ``/api/logout`` requires a ``session`` cookie
    whose token is present in the module-level ``SESSIONS`` store. Receipt,
    settings, auth and photo data are read from and written to Nextcloud
    through the module-level ``nc_*`` / ``load_*`` / ``save_*`` helpers.
    """

    # Identifier syntax accepted by the /api/<kind>/<id> routes.
    _ID_PATTERN = r"[A-Za-z0-9_-]+"

    # --- response / request helpers ---

    def _send_json(self, obj, status=200, extra_headers=None):
        """Serialize ``obj`` as JSON and send it with the given status.

        ``extra_headers`` is an optional mapping of additional response
        headers (used for ``Set-Cookie``).
        """
        body = json.dumps(obj).encode()
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        for name, value in (extra_headers or {}).items():
            self.send_header(name, value)
        self.end_headers()
        self.wfile.write(body)

    def _get_session_token(self):
        """Return the value of the ``session`` cookie, or None if absent."""
        header = self.headers.get("Cookie", "")
        jar = SimpleCookie()
        try:
            jar.load(header)
        except CookieError:
            # One unparseable cookie (e.g. an illegal key set by another app
            # on the same host) must not abort the request or hide a valid
            # session cookie: re-parse pair by pair and skip the bad ones.
            jar = SimpleCookie()
            for fragment in header.split(";"):
                try:
                    jar.load(fragment.strip())
                except CookieError:
                    continue
        morsel = jar.get("session")
        return morsel.value if morsel is not None else None

    def _check_session(self):
        """Return True if the request has a valid session. Otherwise send 401."""
        token = self._get_session_token()
        if token and token in SESSIONS:
            return True
        self._send_json({"error": "Unauthorized"}, 401)
        return False

    def _read_body(self):
        """Read the request body as bytes, sized by Content-Length.

        A missing, non-numeric or non-positive Content-Length yields ``b""``;
        a negative length must never reach ``rfile.read`` because that would
        block until the client closes the connection.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except (TypeError, ValueError):
            length = 0
        if length <= 0:
            return b""
        return self.rfile.read(length)

    def _read_json_object(self, require_id=False):
        """Parse the request body as a JSON object.

        Returns the dict. When the body is not valid JSON, is not an object,
        or (with ``require_id``) has no ``"id"`` key, a 400 response is sent
        and None is returned.
        """
        try:
            data = json.loads(self._read_body())
        except ValueError:
            # Covers JSONDecodeError and undecodable bytes.
            self._send_error(400, "Request body must be valid JSON")
            return None
        if not isinstance(data, dict):
            self._send_error(400, "Request body must be a JSON object")
            return None
        if require_id and "id" not in data:
            self._send_error(400, "Missing 'id'")
            return None
        return data

    def _send_error(self, status, message):
        """Send ``{"error": message}`` as JSON with the given status."""
        self._send_json({"error": message}, status)

    @staticmethod
    def _password_matches(candidate, auth):
        """Return True if ``candidate`` hashes to ``auth["password_hash"]``.

        The comparison is constant-time; a non-string candidate never matches.
        """
        if not isinstance(candidate, str):
            return False
        expected = str(auth["password_hash"]).encode()
        return secrets.compare_digest(hash_password(candidate).encode(), expected)

    @staticmethod
    def _upsert_item(items, item):
        """Replace the entry of ``items`` with the same ``id``, else append."""
        for index, existing in enumerate(items):
            if existing.get("id") == item["id"]:
                items[index] = item
                return
        items.append(item)

    # --- export helpers ---

    @staticmethod
    def _sanitize(text):
        """Reduce ``text`` to word characters, hyphens and underscores.

        Every whitespace character becomes an underscore, so the result is
        safe in a file name and in a response header (no CR/LF).
        """
        cleaned = re.sub(r'[^\w\s-]', '', "" if text is None else str(text))
        return re.sub(r"\s", "_", cleaned.strip())

    @classmethod
    def _photo_entry_name(cls, receipt, project, used_names):
        """Return a unique ``project_date_amount_category.jpg`` zip entry name.

        ``project`` is the receipt's project dict or None. ``used_names`` is
        the set of names already handed out for this archive; the returned
        name is added to it, and a numeric suffix (``_2``, ``_3``, ...) is
        appended when the plain name is already taken.
        """
        proj_part = cls._sanitize(project["name"]) if project else "no_project"
        date_part = cls._sanitize(str(receipt.get("date") or "")[:10])
        try:
            amount_part = f"{float(receipt.get('amount') or 0):.2f}"
        except (TypeError, ValueError):
            amount_part = "0.00"
        cat_part = cls._sanitize(receipt.get("category") or "other") or "other"

        stem = f"{proj_part}_{date_part}_{amount_part}_{cat_part}"
        name = f"{stem}.jpg"
        counter = 2
        while name in used_names:
            name = f"{stem}_{counter}.jpg"
            counter += 1
        used_names.add(name)
        return name

    @staticmethod
    def _content_disposition(filename):
        """Build the attachment Content-Disposition value for ``<filename>.zip``."""
        ascii_name = filename.encode("ascii", "ignore").decode("ascii") or "receipts"
        value = f'attachment; filename="{ascii_name}.zip"'
        if ascii_name != filename:
            # Non-ASCII names cannot go into the quoted form safely; add the
            # percent-encoded RFC 5987 parameter for clients that support it.
            value += f"; filename*=UTF-8''{urlquote(filename, safe='')}.zip"
        return value

    def _serve_index(self):
        """Send receipts.html from the working directory, or a 404."""
        try:
            with open("receipts.html", "rb") as f:
                body = f.read()
            self.send_response(200)
            self.send_header("Content-Type", "text/html; charset=utf-8")
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except FileNotFoundError:
            self._send_error(404, "receipts.html not found")

    def _handle_export(self, query):
        """Send a zip holding the Excel sheet plus receipt photos.

        ``query`` is the raw query string:
          project=<id>  -> only receipts of that project
          project=none  -> only receipts without a project
          (no param)    -> all receipts
        """
        try:
            project_filter = parse_qs(query).get("project", [None])[0]
            receipts = load_receipts()
            if project_filter == "none":
                receipts = [r for r in receipts if not r.get("projectId")]
            elif project_filter:
                receipts = [r for r in receipts if r.get("projectId") == project_filter]

            # Load settings for name lookups
            settings = load_settings()
            project_map = {p["id"]: p for p in settings.get("projects", [])}
            customer_map = {c["id"]: c for c in settings.get("customers", [])}

            # Determine filename prefix: "<customer>-<project>" when filtering
            # by a known project, otherwise "receipts".
            filename = "receipts"
            if project_filter and project_filter != "none":
                proj = project_map.get(project_filter)
                if proj:
                    cust = customer_map.get(proj.get("customerId", ""))
                    parts = []
                    if cust:
                        parts.append(self._sanitize(cust["name"]))
                    parts.append(self._sanitize(proj["name"]))
                    filename = "-".join(p for p in parts if p) or "receipts"

            excel_bytes = build_excel(receipts)

            # Build zip with Excel + photos
            zip_buf = io.BytesIO()
            used_names = set()
            with zipfile.ZipFile(zip_buf, "w", zipfile.ZIP_DEFLATED) as zf:
                zf.writestr(f"{filename}.xlsx", excel_bytes)
                for r in receipts:
                    photo_id = str(r.get("id", ""))
                    # Photos can only be uploaded under ids matching the route
                    # pattern, so anything else has no photo and must not be
                    # spliced into a WebDAV path.
                    if not r.get("photo") or not re.fullmatch(self._ID_PATTERN, photo_id):
                        continue
                    try:
                        photo_resp = nc_get(f"{PHOTOS_DIR}/{photo_id}.jpg")
                        if photo_resp.status_code != 200:
                            continue
                        proj = project_map.get(r.get("projectId", ""))
                        photo_name = self._photo_entry_name(r, proj, used_names)
                        zf.writestr(f"photos/{photo_name}", photo_resp.content)
                    except Exception as e:
                        # Best effort: one failed photo must not fail the
                        # export, but leave a trace of what was skipped.
                        print(f"[export] Skipping photo for receipt {photo_id}: {e}")

            body = zip_buf.getvalue()
            # Built before any header is sent so a bad name cannot leave a
            # half-written response behind.
            disposition = self._content_disposition(filename)
            self.send_response(200)
            self.send_header("Content-Type", "application/zip")
            self.send_header("Content-Disposition", disposition)
            self.send_header("Content-Length", str(len(body)))
            self.end_headers()
            self.wfile.write(body)
        except Exception as e:
            self._send_error(500, str(e))

    # --- routing helpers ---

    def _match(self, method, pattern):
        """Check method and return regex match against self.path, or None."""
        if self.command != method:
            return None
        return re.fullmatch(pattern, self.path)

    # --- GET -----------------------------------------------------------------

    def do_GET(self):
        """Route GET requests: page, receipts, export, settings, photos."""
        # Serve receipts.html
        if self.path in ("/", "/receipts.html"):
            self._serve_index()
            return

        # GET /api/receipts
        if self.path == "/api/receipts":
            if not self._check_session():
                return
            try:
                self._send_json(load_receipts())
            except Exception as e:
                self._send_error(502, str(e))
            return

        # GET /api/export[?project=<id>|none]
        parsed = urlparse(self.path)
        if parsed.path == "/api/export":
            if not self._check_session():
                return
            self._handle_export(parsed.query)
            return

        # GET /api/settings
        if self.path == "/api/settings":
            if not self._check_session():
                return
            try:
                self._send_json(load_settings())
            except Exception as e:
                self._send_error(502, str(e))
            return

        # GET /api/photos/<id>
        m = re.fullmatch(r"/api/photos/([A-Za-z0-9_-]+)", self.path)
        if m:
            if not self._check_session():
                return
            photo_id = m.group(1)
            try:
                r = nc_get(f"{PHOTOS_DIR}/{photo_id}.jpg")
                if r.status_code == 404:
                    self._send_error(404, "Photo not found")
                    return
                r.raise_for_status()
                self.send_response(200)
                self.send_header("Content-Type", r.headers.get("Content-Type", "image/jpeg"))
                self.send_header("Content-Length", str(len(r.content)))
                self.send_header("Cache-Control", "max-age=86400")
                self.end_headers()
                self.wfile.write(r.content)
            except Exception as e:
                self._send_error(502, str(e))
            return

        self._send_error(404, "Not found")

    # --- POST ----------------------------------------------------------------

    def _upsert_setting(self, collection):
        """Insert or replace the posted object in ``settings[collection]``."""
        try:
            data = self._read_json_object(require_id=True)
            if data is None:
                return
            settings = load_settings()
            items = settings.get(collection, [])
            self._upsert_item(items, data)
            settings[collection] = items
            save_settings(settings)
            self._send_json(data, 200)
        except Exception as e:
            self._send_error(500, str(e))

    def do_POST(self):
        """Route POST requests: auth, upserts, extraction, photo upload."""
        # POST /api/login
        if self.path == "/api/login":
            try:
                data = self._read_json_object()
                if data is None:
                    return
                auth = load_auth()
                if (data.get("username") == auth["username"]
                        and self._password_matches(data.get("password", ""), auth)):
                    token = secrets.token_hex(32)
                    SESSIONS[token] = True
                    self._send_json({"ok": True}, 200, extra_headers={
                        "Set-Cookie": f"session={token}; Path=/; HttpOnly; SameSite=Strict"
                    })
                else:
                    self._send_json({"error": "Invalid credentials"}, 401)
            except Exception as e:
                self._send_error(500, str(e))
            return

        # POST /api/logout
        if self.path == "/api/logout":
            token = self._get_session_token()
            if token:
                SESSIONS.pop(token, None)
            self._send_json({"ok": True}, 200, extra_headers={
                "Set-Cookie": "session=; Path=/; HttpOnly; SameSite=Strict; Max-Age=0"
            })
            return

        # POST /api/change-password
        if self.path == "/api/change-password":
            if not self._check_session():
                return
            try:
                data = self._read_json_object()
                if data is None:
                    return
                auth = load_auth()
                if not self._password_matches(data.get("current", ""), auth):
                    self._send_json({"error": "Current password is incorrect"}, 403)
                    return
                new_pw = data.get("new", "")
                if not isinstance(new_pw, str):
                    self._send_json({"error": "New password must be a string"}, 400)
                    return
                if not new_pw:
                    self._send_json({"error": "New password cannot be empty"}, 400)
                    return
                auth["password_hash"] = hash_password(new_pw)
                save_auth(auth)
                self._send_json({"ok": True})
            except Exception as e:
                self._send_error(500, str(e))
            return

        # POST /api/customers — upsert a customer
        if self.path == "/api/customers":
            if not self._check_session():
                return
            self._upsert_setting("customers")
            return

        # POST /api/projects — upsert a project
        if self.path == "/api/projects":
            if not self._check_session():
                return
            self._upsert_setting("projects")
            return

        # POST /api/receipts — upsert a receipt
        if self.path == "/api/receipts":
            if not self._check_session():
                return
            try:
                data = self._read_json_object(require_id=True)
                if data is None:
                    return
                receipts = load_receipts()
                self._upsert_item(receipts, data)
                save_receipts(receipts)
                self._send_json(data, 200)
            except Exception as e:
                self._send_error(500, str(e))
            return

        # POST /api/extract-receipt — extract total + date from receipt image
        if self.path == "/api/extract-receipt":
            if not self._check_session():
                return
            try:
                self._send_json(extract_receipt_info(self._read_body()))
            except Exception:
                # Extraction is optional; answer with "nothing found".
                self._send_json({"amount": None, "date": None})
            return

        # POST /api/photos/<id> — upload photo
        m = re.fullmatch(r"/api/photos/([A-Za-z0-9_-]+)", self.path)
        if m:
            if not self._check_session():
                return
            photo_id = m.group(1)
            try:
                body = self._read_body()
                r = nc_put(f"{PHOTOS_DIR}/{photo_id}.jpg", body, "image/jpeg")
                r.raise_for_status()
                self._send_json({"url": f"/api/photos/{photo_id}"})
            except Exception as e:
                self._send_error(502, str(e))
            return

        self._send_error(404, "Not found")

    # --- DELETE --------------------------------------------------------------

    def do_DELETE(self):
        """Route DELETE requests for customers, projects and receipts."""
        # DELETE /api/customers/<id> — also removes that customer's projects
        m = re.fullmatch(r"/api/customers/([A-Za-z0-9_-]+)", self.path)
        if m:
            if not self._check_session():
                return
            cid = m.group(1)
            try:
                settings = load_settings()
                settings["customers"] = [c for c in settings.get("customers", []) if c.get("id") != cid]
                settings["projects"] = [p for p in settings.get("projects", []) if p.get("customerId") != cid]
                save_settings(settings)
                self._send_json({"ok": True})
            except Exception as e:
                self._send_error(500, str(e))
            return

        # DELETE /api/projects/<id>
        m = re.fullmatch(r"/api/projects/([A-Za-z0-9_-]+)", self.path)
        if m:
            if not self._check_session():
                return
            pid = m.group(1)
            try:
                settings = load_settings()
                settings["projects"] = [p for p in settings.get("projects", []) if p.get("id") != pid]
                save_settings(settings)
                self._send_json({"ok": True})
            except Exception as e:
                self._send_error(500, str(e))
            return

        # DELETE /api/receipts/<id>
        m = re.fullmatch(r"/api/receipts/([A-Za-z0-9_-]+)", self.path)
        if m:
            if not self._check_session():
                return
            receipt_id = m.group(1)
            try:
                receipts = load_receipts()
                receipts = [r for r in receipts if r.get("id") != receipt_id]
                save_receipts(receipts)
                # Also delete the photo. The receipt is already gone, so a
                # failure here is logged rather than reported to the client.
                try:
                    nc_delete(f"{PHOTOS_DIR}/{receipt_id}.jpg")
                except Exception as e:
                    print(f"[delete] Could not delete photo for receipt {receipt_id}: {e}")
                self._send_json({"ok": True})
            except Exception as e:
                self._send_error(500, str(e))
            return

        self._send_error(404, "Not found")

    # Suppress default logging noise
    def log_message(self, fmt, *args):
        """Print ``[METHOD] path detail`` instead of the default log line.

        For request logs ``args`` is (requestline, code, size) and the status
        code is shown; with a single argument that argument is shown; with
        none, ``fmt`` itself.
        """
        if len(args) > 1:
            detail = args[1]
        elif args:
            detail = args[0]
        else:
            detail = fmt
        # command/path are not set when the request line could not be parsed
        # (bad request, timeout), and errors are logged in exactly that case.
        command = getattr(self, "command", None)
        path = getattr(self, "path", "-")
        print(f"[{command}] {path} {detail}")
# --- Startup ------------------------------------------------------------------
def ensure_folder_path():
    """Make sure the receipts folder chain and its photos folder exist.

    Walks business-manager/Expenses/Receipts under the user's WebDAV root,
    then the photos sub-folder. Each level is probed with PROPFIND and, on a
    404, created with MKCOL. A MKCOL status other than 201 or 405 prints an
    error and exits the process with status 1.
    """
    def create_if_missing(url, label, describe_failure):
        # Only a 404 from PROPFIND triggers creation.
        if nc_propfind(url).status_code != 404:
            return
        print(f" Creating folder: {label}/")
        created = requests.request("MKCOL", url, auth=NC_AUTH, timeout=15)
        if created.status_code not in (201, 405):
            print(describe_failure(created))
            sys.exit(1)

    folder_url = f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"
    for segment in ("business-manager", "Expenses", "Receipts"):
        folder_url += urlquote(segment) + "/"
        create_if_missing(
            folder_url, segment,
            lambda resp, url=folder_url:
                f" ERROR creating {url}: {resp.status_code} {resp.text}",
        )

    # Also ensure photos/ sub-folder
    create_if_missing(
        folder_url + urlquote(PHOTOS_DIR) + "/", PHOTOS_DIR,
        lambda resp: f" ERROR creating photos folder: {resp.status_code}",
    )
def main():
    """Configure the Nextcloud connection, verify it, and run the server.

    Steps: load the Anthropic API key from the ``api-key`` file (optional),
    prompt for any Nextcloud credential that is still empty, check that the
    server accepts them, create the folder structure, then serve on port
    8080 until interrupted. Any startup failure prints an error and exits
    with status 1.
    """
    global NC_USERNAME, NC_PASSWORD, NC_DAV_ROOT, NC_AUTH, ANTHROPIC_API_KEY
    print("=== Receipt Manager — Nextcloud Backend ===\n")

    # Load Anthropic API key
    try:
        with open("api-key", "r") as f:
            ANTHROPIC_API_KEY = f.read().strip()
        print("Anthropic API key loaded.")
    except FileNotFoundError:
        print("WARNING: api-key file not found — receipt extraction disabled.")

    if not NC_USERNAME:
        NC_USERNAME = input("Nextcloud username: ").strip()
    if not NC_PASSWORD:
        NC_PASSWORD = getpass.getpass("App password: ").strip()
    NC_AUTH = (NC_USERNAME, NC_PASSWORD)
    NC_DAV_ROOT = (
        f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}"
        f"/business-manager/Expenses/Receipts/"
    )

    print("\nVerifying Nextcloud connectivity...")
    try:
        r = nc_propfind(
            f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"
        )
        if r.status_code == 401:
            print("ERROR: Authentication failed. Check username/app-password.")
            sys.exit(1)
        r.raise_for_status()
        print(" Connected OK.")
    except requests.ConnectionError:
        print("ERROR: Cannot reach Nextcloud server.")
        sys.exit(1)
    except requests.RequestException as e:
        # Timeouts and non-401 HTTP errors: fail with a message, not a traceback.
        print(f"ERROR: Nextcloud connectivity check failed: {e}")
        sys.exit(1)

    print("Ensuring folder structure...")
    try:
        ensure_folder_path()
    except requests.RequestException as e:
        print(f"ERROR: Could not verify folder structure: {e}")
        sys.exit(1)
    print(" Folders OK.\n")

    port = 8080
    server = ReuseTCPServer(("", port), Handler)
    print(f"Serving on http://localhost:{port}")
    print("Press Ctrl+C to stop.\n")
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        print("\nShutting down.")
    finally:
        # Release the listening socket however the serve loop ended.
        server.server_close()


if __name__ == "__main__":
    main()