Add authentication and date picker to Receipt Manager
Add username/password login with session cookies, change-password modal, and logout. All API endpoints require a valid session. Default credentials (admin/admin) are auto-created on first run. Also add a date input field to the receipt modal, defaulting to today but allowing the user to pick any date. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
466
server.py
Normal file
466
server.py
Normal file
@@ -0,0 +1,466 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Receipt Manager backend — proxies receipt data and photos to Nextcloud via WebDAV."""
|
||||
|
||||
import getpass
|
||||
import hashlib
|
||||
import io
|
||||
import json
|
||||
import re
|
||||
import secrets
|
||||
import sys
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from http.cookies import SimpleCookie
|
||||
from urllib.parse import quote as urlquote
|
||||
|
||||
import openpyxl
|
||||
from openpyxl.styles import Font, PatternFill, Alignment, Border, Side
|
||||
import requests
|
||||
|
||||
# --- Nextcloud configuration (filled at startup) ---
|
||||
NC_BASE = "https://nextcloud.sdanywhere.com"
|
||||
NC_USERNAME = "kamaji"
|
||||
NC_PASSWORD = "DvB0U2Uj3tOJaD"
|
||||
NC_DAV_ROOT = "" # set after login, e.g. /remote.php/dav/files/<user>/business-manager/Expenses/Receipts/
|
||||
NC_AUTH = ()
|
||||
|
||||
RECEIPTS_FILE = "receipts.json"
|
||||
AUTH_FILE = "auth.json"
|
||||
PHOTOS_DIR = "photos"
|
||||
|
||||
# Server-side session store: {token: True}
|
||||
SESSIONS = {}
|
||||
|
||||
|
||||
def hash_password(password):
|
||||
"""Hash a password with SHA-256."""
|
||||
return hashlib.sha256(password.encode()).hexdigest()
|
||||
|
||||
|
||||
def load_auth():
|
||||
"""Read auth.json from Nextcloud. If missing, create default credentials."""
|
||||
r = nc_get(AUTH_FILE)
|
||||
if r.status_code == 404:
|
||||
default = {"username": "admin", "password_hash": hash_password("admin")}
|
||||
save_auth(default)
|
||||
return default
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def save_auth(auth_data):
|
||||
"""Write auth.json to Nextcloud."""
|
||||
data = json.dumps(auth_data, indent=2).encode()
|
||||
r = nc_put(AUTH_FILE, data, "application/json")
|
||||
r.raise_for_status()
|
||||
|
||||
|
||||
def nc_url(path=""):
|
||||
"""Build full Nextcloud WebDAV URL for a sub-path under the receipts folder."""
|
||||
return NC_DAV_ROOT + path
|
||||
|
||||
|
||||
def nc_get(path=""):
|
||||
"""GET a resource from Nextcloud. Returns requests.Response."""
|
||||
return requests.get(nc_url(path), auth=NC_AUTH, timeout=30)
|
||||
|
||||
|
||||
def nc_put(path, data, content_type="application/octet-stream"):
|
||||
"""PUT (upload/overwrite) a resource to Nextcloud."""
|
||||
return requests.put(nc_url(path), data=data, auth=NC_AUTH,
|
||||
headers={"Content-Type": content_type}, timeout=60)
|
||||
|
||||
|
||||
def nc_delete(path):
|
||||
"""DELETE a resource from Nextcloud."""
|
||||
return requests.delete(nc_url(path), auth=NC_AUTH, timeout=30)
|
||||
|
||||
|
||||
def nc_mkcol(path):
|
||||
"""Create a collection (folder) on Nextcloud. Ignores 405 (already exists)."""
|
||||
r = requests.request("MKCOL", nc_url(path), auth=NC_AUTH, timeout=15)
|
||||
if r.status_code not in (201, 405):
|
||||
r.raise_for_status()
|
||||
|
||||
|
||||
def nc_propfind(url):
|
||||
"""PROPFIND on an absolute URL. Returns the response."""
|
||||
return requests.request("PROPFIND", url, auth=NC_AUTH,
|
||||
headers={"Depth": "0"}, timeout=15)
|
||||
|
||||
|
||||
# --- Helpers for receipts.json ------------------------------------------------
|
||||
|
||||
def load_receipts():
|
||||
"""Read receipts.json from Nextcloud. Returns a list."""
|
||||
r = nc_get(RECEIPTS_FILE)
|
||||
if r.status_code == 404:
|
||||
return []
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
|
||||
|
||||
def save_receipts(receipts):
|
||||
"""Write receipts.json to Nextcloud."""
|
||||
data = json.dumps(receipts, indent=2).encode()
|
||||
r = nc_put(RECEIPTS_FILE, data, "application/json")
|
||||
r.raise_for_status()
|
||||
|
||||
|
||||
def build_excel(receipts):
|
||||
"""Build an .xlsx file from the receipts list. Returns bytes."""
|
||||
wb = openpyxl.Workbook()
|
||||
ws = wb.active
|
||||
ws.title = "Receipts"
|
||||
|
||||
# Styles
|
||||
header_font = Font(bold=True, color="FFFFFF", size=11)
|
||||
header_fill = PatternFill("solid", fgColor="1A1A2E")
|
||||
header_align = Alignment(horizontal="center")
|
||||
thin_border = Border(
|
||||
bottom=Side(style="thin", color="DDDDDD"),
|
||||
)
|
||||
money_fmt = '#,##0.00'
|
||||
|
||||
# Headers
|
||||
headers = ["Date", "Amount ($)", "Category", "Note"]
|
||||
col_widths = [14, 14, 14, 40]
|
||||
for col_idx, (label, width) in enumerate(zip(headers, col_widths), 1):
|
||||
cell = ws.cell(row=1, column=col_idx, value=label)
|
||||
cell.font = header_font
|
||||
cell.fill = header_fill
|
||||
cell.alignment = header_align
|
||||
ws.column_dimensions[cell.column_letter].width = width
|
||||
|
||||
# Data rows
|
||||
for row_idx, r in enumerate(receipts, 2):
|
||||
date_str = r.get("date", "")[:10] # YYYY-MM-DD
|
||||
ws.cell(row=row_idx, column=1, value=date_str).border = thin_border
|
||||
amt_cell = ws.cell(row=row_idx, column=2, value=r.get("amount", 0))
|
||||
amt_cell.number_format = money_fmt
|
||||
amt_cell.border = thin_border
|
||||
ws.cell(row=row_idx, column=3,
|
||||
value=r.get("category", "").capitalize()).border = thin_border
|
||||
ws.cell(row=row_idx, column=4,
|
||||
value=r.get("note", "")).border = thin_border
|
||||
|
||||
# Total row
|
||||
if receipts:
|
||||
total_row = len(receipts) + 2
|
||||
total_label = ws.cell(row=total_row, column=1, value="TOTAL")
|
||||
total_label.font = Font(bold=True)
|
||||
total_cell = ws.cell(
|
||||
row=total_row, column=2,
|
||||
value=sum(r.get("amount", 0) for r in receipts))
|
||||
total_cell.font = Font(bold=True)
|
||||
total_cell.number_format = money_fmt
|
||||
|
||||
buf = io.BytesIO()
|
||||
wb.save(buf)
|
||||
return buf.getvalue()
|
||||
|
||||
|
||||
# --- HTTP handler -------------------------------------------------------------
|
||||
|
||||
class ReuseTCPServer(HTTPServer):
|
||||
allow_reuse_address = True
|
||||
|
||||
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
def _send_json(self, obj, status=200, extra_headers=None):
|
||||
body = json.dumps(obj).encode()
|
||||
self.send_response(status)
|
||||
self.send_header("Content-Type", "application/json")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
if extra_headers:
|
||||
for k, v in extra_headers.items():
|
||||
self.send_header(k, v)
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
|
||||
def _get_session_token(self):
|
||||
"""Extract session token from cookies."""
|
||||
cookie_header = self.headers.get("Cookie", "")
|
||||
cookie = SimpleCookie()
|
||||
cookie.load(cookie_header)
|
||||
if "session" in cookie:
|
||||
return cookie["session"].value
|
||||
return None
|
||||
|
||||
def _check_session(self):
|
||||
"""Return True if the request has a valid session. Otherwise send 401."""
|
||||
token = self._get_session_token()
|
||||
if token and token in SESSIONS:
|
||||
return True
|
||||
self._send_json({"error": "Unauthorized"}, 401)
|
||||
return False
|
||||
|
||||
def _read_body(self):
|
||||
length = int(self.headers.get("Content-Length", 0))
|
||||
return self.rfile.read(length)
|
||||
|
||||
def _send_error(self, status, message):
|
||||
self._send_json({"error": message}, status)
|
||||
|
||||
# --- routing helpers ---
|
||||
def _match(self, method, pattern):
|
||||
"""Check method and return regex match against self.path, or None."""
|
||||
if self.command != method:
|
||||
return None
|
||||
return re.fullmatch(pattern, self.path)
|
||||
|
||||
# --- GET -----------------------------------------------------------------
|
||||
def do_GET(self):
|
||||
# Serve receipts.html
|
||||
if self.path == "/" or self.path == "/receipts.html":
|
||||
try:
|
||||
with open("receipts.html", "rb") as f:
|
||||
body = f.read()
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "text/html; charset=utf-8")
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
except FileNotFoundError:
|
||||
self._send_error(404, "receipts.html not found")
|
||||
return
|
||||
|
||||
# GET /api/receipts
|
||||
if self.path == "/api/receipts":
|
||||
if not self._check_session():
|
||||
return
|
||||
try:
|
||||
receipts = load_receipts()
|
||||
self._send_json(receipts)
|
||||
except Exception as e:
|
||||
self._send_error(502, str(e))
|
||||
return
|
||||
|
||||
# GET /api/export — export receipts as Excel
|
||||
if self.path == "/api/export":
|
||||
if not self._check_session():
|
||||
return
|
||||
try:
|
||||
receipts = load_receipts()
|
||||
body = build_excel(receipts)
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type",
|
||||
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
|
||||
self.send_header("Content-Disposition",
|
||||
'attachment; filename="receipts.xlsx"')
|
||||
self.send_header("Content-Length", str(len(body)))
|
||||
self.end_headers()
|
||||
self.wfile.write(body)
|
||||
except Exception as e:
|
||||
self._send_error(500, str(e))
|
||||
return
|
||||
|
||||
# GET /api/photos/<id>
|
||||
m = re.fullmatch(r"/api/photos/([A-Za-z0-9_-]+)", self.path)
|
||||
if m:
|
||||
if not self._check_session():
|
||||
return
|
||||
photo_id = m.group(1)
|
||||
try:
|
||||
r = nc_get(f"{PHOTOS_DIR}/{photo_id}.jpg")
|
||||
if r.status_code == 404:
|
||||
self._send_error(404, "Photo not found")
|
||||
return
|
||||
r.raise_for_status()
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", r.headers.get("Content-Type", "image/jpeg"))
|
||||
self.send_header("Content-Length", str(len(r.content)))
|
||||
self.send_header("Cache-Control", "max-age=86400")
|
||||
self.end_headers()
|
||||
self.wfile.write(r.content)
|
||||
except Exception as e:
|
||||
self._send_error(502, str(e))
|
||||
return
|
||||
|
||||
self._send_error(404, "Not found")
|
||||
|
||||
# --- POST ----------------------------------------------------------------
|
||||
def do_POST(self):
|
||||
# POST /api/login
|
||||
if self.path == "/api/login":
|
||||
try:
|
||||
data = json.loads(self._read_body())
|
||||
auth = load_auth()
|
||||
if (data.get("username") == auth["username"]
|
||||
and hash_password(data.get("password", "")) == auth["password_hash"]):
|
||||
token = secrets.token_hex(32)
|
||||
SESSIONS[token] = True
|
||||
self._send_json({"ok": True}, 200, extra_headers={
|
||||
"Set-Cookie": f"session={token}; Path=/; HttpOnly; SameSite=Strict"
|
||||
})
|
||||
else:
|
||||
self._send_json({"error": "Invalid credentials"}, 401)
|
||||
except Exception as e:
|
||||
self._send_error(500, str(e))
|
||||
return
|
||||
|
||||
# POST /api/logout
|
||||
if self.path == "/api/logout":
|
||||
token = self._get_session_token()
|
||||
if token:
|
||||
SESSIONS.pop(token, None)
|
||||
self._send_json({"ok": True}, 200, extra_headers={
|
||||
"Set-Cookie": "session=; Path=/; HttpOnly; SameSite=Strict; Max-Age=0"
|
||||
})
|
||||
return
|
||||
|
||||
# POST /api/change-password
|
||||
if self.path == "/api/change-password":
|
||||
if not self._check_session():
|
||||
return
|
||||
try:
|
||||
data = json.loads(self._read_body())
|
||||
auth = load_auth()
|
||||
if hash_password(data.get("current", "")) != auth["password_hash"]:
|
||||
self._send_json({"error": "Current password is incorrect"}, 403)
|
||||
return
|
||||
new_pw = data.get("new", "")
|
||||
if len(new_pw) < 1:
|
||||
self._send_json({"error": "New password cannot be empty"}, 400)
|
||||
return
|
||||
auth["password_hash"] = hash_password(new_pw)
|
||||
save_auth(auth)
|
||||
self._send_json({"ok": True})
|
||||
except Exception as e:
|
||||
self._send_error(500, str(e))
|
||||
return
|
||||
|
||||
# POST /api/receipts — upsert a receipt
|
||||
if self.path == "/api/receipts":
|
||||
if not self._check_session():
|
||||
return
|
||||
try:
|
||||
data = json.loads(self._read_body())
|
||||
receipts = load_receipts()
|
||||
idx = next((i for i, r in enumerate(receipts) if r["id"] == data["id"]), None)
|
||||
if idx is not None:
|
||||
receipts[idx] = data
|
||||
else:
|
||||
receipts.append(data)
|
||||
save_receipts(receipts)
|
||||
self._send_json(data, 200)
|
||||
except Exception as e:
|
||||
self._send_error(500, str(e))
|
||||
return
|
||||
|
||||
# POST /api/photos/<id> — upload photo
|
||||
m = re.fullmatch(r"/api/photos/([A-Za-z0-9_-]+)", self.path)
|
||||
if m:
|
||||
if not self._check_session():
|
||||
return
|
||||
photo_id = m.group(1)
|
||||
try:
|
||||
body = self._read_body()
|
||||
r = nc_put(f"{PHOTOS_DIR}/{photo_id}.jpg", body, "image/jpeg")
|
||||
r.raise_for_status()
|
||||
self._send_json({"url": f"/api/photos/{photo_id}"})
|
||||
except Exception as e:
|
||||
self._send_error(502, str(e))
|
||||
return
|
||||
|
||||
self._send_error(404, "Not found")
|
||||
|
||||
# --- DELETE --------------------------------------------------------------
|
||||
def do_DELETE(self):
|
||||
m = re.fullmatch(r"/api/receipts/([A-Za-z0-9_-]+)", self.path)
|
||||
if m:
|
||||
if not self._check_session():
|
||||
return
|
||||
receipt_id = m.group(1)
|
||||
try:
|
||||
receipts = load_receipts()
|
||||
receipts = [r for r in receipts if r["id"] != receipt_id]
|
||||
save_receipts(receipts)
|
||||
# Also delete photo (ignore 404)
|
||||
try:
|
||||
nc_delete(f"{PHOTOS_DIR}/{receipt_id}.jpg")
|
||||
except Exception:
|
||||
pass
|
||||
self._send_json({"ok": True})
|
||||
except Exception as e:
|
||||
self._send_error(500, str(e))
|
||||
return
|
||||
|
||||
self._send_error(404, "Not found")
|
||||
|
||||
# Suppress default logging noise
|
||||
def log_message(self, fmt, *args):
|
||||
print(f"[{self.command}] {self.path} — {args[1] if len(args) > 1 else args[0]}")
|
||||
|
||||
|
||||
# --- Startup ------------------------------------------------------------------
|
||||
|
||||
def ensure_folder_path():
|
||||
"""Create the full folder hierarchy on Nextcloud if it doesn't exist."""
|
||||
parts = ["business-manager", "Expenses", "Receipts"]
|
||||
base = f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"
|
||||
current = base
|
||||
for part in parts:
|
||||
current += urlquote(part) + "/"
|
||||
r = nc_propfind(current)
|
||||
if r.status_code == 404:
|
||||
print(f" Creating folder: {part}/")
|
||||
mr = requests.request("MKCOL", current, auth=NC_AUTH, timeout=15)
|
||||
if mr.status_code not in (201, 405):
|
||||
print(f" ERROR creating {current}: {mr.status_code} {mr.text}")
|
||||
sys.exit(1)
|
||||
# Also ensure photos/ sub-folder
|
||||
photos_url = current + urlquote(PHOTOS_DIR) + "/"
|
||||
r = nc_propfind(photos_url)
|
||||
if r.status_code == 404:
|
||||
print(f" Creating folder: {PHOTOS_DIR}/")
|
||||
mr = requests.request("MKCOL", photos_url, auth=NC_AUTH, timeout=15)
|
||||
if mr.status_code not in (201, 405):
|
||||
print(f" ERROR creating photos folder: {mr.status_code}")
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
def main():
|
||||
global NC_USERNAME, NC_PASSWORD, NC_DAV_ROOT, NC_AUTH
|
||||
|
||||
print("=== Receipt Manager — Nextcloud Backend ===\n")
|
||||
if not NC_USERNAME:
|
||||
NC_USERNAME = input("Nextcloud username: ").strip()
|
||||
if not NC_PASSWORD:
|
||||
NC_PASSWORD = getpass.getpass("App password: ").strip()
|
||||
NC_AUTH = (NC_USERNAME, NC_PASSWORD)
|
||||
NC_DAV_ROOT = (
|
||||
f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}"
|
||||
f"/business-manager/Expenses/Receipts/"
|
||||
)
|
||||
|
||||
print("\nVerifying Nextcloud connectivity...")
|
||||
try:
|
||||
r = nc_propfind(
|
||||
f"{NC_BASE}/remote.php/dav/files/{urlquote(NC_USERNAME)}/"
|
||||
)
|
||||
if r.status_code == 401:
|
||||
print("ERROR: Authentication failed. Check username/app-password.")
|
||||
sys.exit(1)
|
||||
r.raise_for_status()
|
||||
print(" Connected OK.")
|
||||
except requests.ConnectionError:
|
||||
print("ERROR: Cannot reach Nextcloud server.")
|
||||
sys.exit(1)
|
||||
|
||||
print("Ensuring folder structure...")
|
||||
ensure_folder_path()
|
||||
print(" Folders OK.\n")
|
||||
|
||||
port = 8080
|
||||
server = ReuseTCPServer(("", port), Handler)
|
||||
print(f"Serving on http://localhost:{port}")
|
||||
print("Press Ctrl+C to stop.\n")
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("\nShutting down.")
|
||||
server.server_close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user