Add detailed inline comments to app.py, Dockerfile, and docker-compose.yml
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
109
app.py
109
app.py
@@ -105,22 +105,27 @@ def load_sites() -> dict[str, dict]:
|
||||
if not SITES_FILE.exists():
|
||||
log.warning("sites.conf not found — no sites can log in")
|
||||
return sites
|
||||
# Read the file line by line and parse each non-empty, non-comment line
|
||||
for line in SITES_FILE.read_text().splitlines():
|
||||
line = line.strip()
|
||||
# Skip blank lines and comments (lines starting with #)
|
||||
if not line or line.startswith("#"):
|
||||
continue
|
||||
# Skip malformed lines that don't contain a colon separator
|
||||
if ":" not in line:
|
||||
continue
|
||||
# Split on colons: "5001:1234:alice:pass" -> ["5001","1234","alice","pass"]
|
||||
parts = line.split(":")
|
||||
site_num = parts[0].strip()
|
||||
pin = parts[1].strip() if len(parts) > 1 else ""
|
||||
nc_user = parts[2].strip() if len(parts) > 2 else None
|
||||
nc_pass = parts[3].strip() if len(parts) > 3 else None
|
||||
site_num = parts[0].strip() # always present
|
||||
pin = parts[1].strip() if len(parts) > 1 else "" # required
|
||||
nc_user = parts[2].strip() if len(parts) > 2 else None # optional
|
||||
nc_pass = parts[3].strip() if len(parts) > 3 else None # optional
|
||||
# Only register the site if both site number and PIN are non-empty
|
||||
if site_num and pin:
|
||||
sites[site_num] = {
|
||||
"pin": pin,
|
||||
"nc_user": nc_user or None,
|
||||
"nc_pass": nc_pass or None,
|
||||
"nc_user": nc_user or None, # convert empty string to None
|
||||
"nc_pass": nc_pass or None, # convert empty string to None
|
||||
}
|
||||
return sites
|
||||
|
||||
@@ -160,12 +165,17 @@ def save_sites(sites: dict):
|
||||
"\n"
|
||||
)
|
||||
lines = []
|
||||
# Sort sites numerically (e.g. 1001, 1002, 5001) for readability.
|
||||
# The lambda converts the key to int for numeric sorting; non-numeric
|
||||
# keys (shouldn't happen) sort to the front with key=0.
|
||||
for site_num in sorted(sites.keys(), key=lambda s: int(s) if s.isdigit() else 0):
|
||||
cfg = sites[site_num]
|
||||
# Build the line: "5001:1234" (basic) or "5001:1234:alice:pass" (with NC creds)
|
||||
line = f"{site_num}:{cfg['pin']}"
|
||||
if cfg.get("nc_user") and cfg.get("nc_pass"):
|
||||
line += f":{cfg['nc_user']}:{cfg['nc_pass']}"
|
||||
lines.append(line)
|
||||
# Write the header comment block followed by all site lines
|
||||
SITES_FILE.write_text(header + "\n".join(lines) + "\n")
|
||||
|
||||
|
||||
@@ -577,12 +587,22 @@ def get_or_create_workbook(excel_path: Path):
|
||||
|
||||
def login_required(f):
|
||||
"""Decorator that redirects to /login if the user is not authenticated
|
||||
or has not selected a site."""
|
||||
or has not selected a site.
|
||||
|
||||
Usage: place @login_required below @app.route() on any view function
|
||||
that should only be accessible to logged-in technicians.
|
||||
"""
|
||||
from functools import wraps
|
||||
# @wraps preserves the original function's name and docstring,
|
||||
# which Flask needs to map URL rules to the correct endpoint.
|
||||
@wraps(f)
|
||||
def decorated(*args, **kwargs):
|
||||
# Both flags must be set: "authenticated" (PIN was correct) and
|
||||
# "site" (which site number they logged into). If either is
|
||||
# missing, bounce them back to the login page.
|
||||
if not session.get("authenticated") or not session.get("site"):
|
||||
return redirect(url_for("login"))
|
||||
# Credentials OK -- call the original view function
|
||||
return f(*args, **kwargs)
|
||||
return decorated
|
||||
|
||||
@@ -590,8 +610,10 @@ def login_required(f):
|
||||
def admin_required(f):
|
||||
"""Decorator that redirects to /admin/login if admin is not authenticated.
|
||||
|
||||
Independent from the technician session — both can coexist.
|
||||
Checks session["admin_authenticated"].
|
||||
Independent from the technician session — both can coexist in the
|
||||
same browser. A user can be logged in as both a technician (site PIN)
|
||||
and an admin (NC credentials) simultaneously because they use
|
||||
different session keys: "authenticated" vs "admin_authenticated".
|
||||
"""
|
||||
from functools import wraps
|
||||
@wraps(f)
|
||||
@@ -673,19 +695,27 @@ def entries():
|
||||
|
||||
rows = []
|
||||
if excel_path.exists():
|
||||
# read_only=True is faster and uses less memory -- we only need
|
||||
# to read values, not modify the workbook.
|
||||
wb = load_workbook(excel_path, read_only=True)
|
||||
ws = wb.active
|
||||
# iter_rows(values_only=True) yields tuples of cell values
|
||||
# instead of Cell objects -- simpler to work with.
|
||||
for i, row in enumerate(ws.iter_rows(values_only=True)):
|
||||
if i == 0:
|
||||
continue # skip header
|
||||
continue # skip the header row ("AP Number", "AP Location", ...)
|
||||
ap_num = row[0] or ""
|
||||
# Check which photos exist for this AP
|
||||
# For each AP entry, check which of the 6 photo types exist
|
||||
# on disk. This info is used by the template to show/hide
|
||||
# photo thumbnails and enable the "replace" button.
|
||||
photos = []
|
||||
if re.fullmatch(r"\d{3}", str(ap_num)):
|
||||
# Only check photos if ap_num is a valid 3-digit number
|
||||
# (protects against corrupted spreadsheet data)
|
||||
for suffix_name, file_suffix in SUFFIX_MAP.items():
|
||||
photos.append({
|
||||
"suffix": suffix_name,
|
||||
"label": SUFFIX_LABELS[suffix_name],
|
||||
"suffix": suffix_name, # URL-friendly name (e.g. "close")
|
||||
"label": SUFFIX_LABELS[suffix_name], # human label (e.g. "Close-up")
|
||||
"exists": find_photo(site, ap_num, file_suffix) is not None,
|
||||
})
|
||||
rows.append({
|
||||
@@ -694,7 +724,7 @@ def entries():
|
||||
"serial_number": row[2] or "",
|
||||
"mac_address": row[3] or "",
|
||||
"cable_length": row[4] or "",
|
||||
"photos": photos,
|
||||
"photos": photos, # list of photo status dicts for the template
|
||||
})
|
||||
wb.close()
|
||||
|
||||
@@ -746,10 +776,17 @@ def submit():
|
||||
if not re.fullmatch(r"([0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}", mac_address):
|
||||
return jsonify({"success": False, "error": "MAC Address must be valid (e.g. AA:BB:CC:DD:EE:FF)."}), 400
|
||||
|
||||
# Validate and save each of the 6 required photos
|
||||
saved_files = [] # filenames for the JSON response
|
||||
saved_paths = [] # full Paths for Nextcloud sync
|
||||
# Validate and save each of the 6 required photos.
|
||||
# PHOTO_FIELDS maps form field names to filename suffixes:
|
||||
# ("photo_ap", "") -> AP010.jpg (the "" means no suffix)
|
||||
# ("photo_far", "F") -> AP010F.jpg
|
||||
# ("photo_length", "_length") -> AP010_length.jpg
|
||||
# etc.
|
||||
saved_files = [] # filenames for the JSON response (e.g. ["AP010.jpg", ...])
|
||||
saved_paths = [] # full Path objects for Nextcloud sync
|
||||
for field_name, suffix in PHOTO_FIELDS:
|
||||
# request.files is a MultiDict of uploaded files keyed by form field name.
|
||||
# Each photo input in the HTML form has a name matching field_name.
|
||||
photo = request.files.get(field_name)
|
||||
if not photo or photo.filename == "":
|
||||
return jsonify({
|
||||
@@ -757,11 +794,14 @@ def submit():
|
||||
"error": f"Missing photo: {field_name}",
|
||||
}), 400
|
||||
|
||||
# Preserve the original file extension (e.g. .jpg, .png, .heic)
|
||||
# Preserve the original file extension so we don't lose format info.
|
||||
# If for some reason there's no extension, default to .jpg.
|
||||
ext = os.path.splitext(photo.filename)[1].lower() or ".jpg"
|
||||
# Build the structured filename: AP{number}{suffix}.{ext}
|
||||
# e.g. AP010.jpg, AP010F.jpg, AP010_length.jpg
|
||||
filename = f"AP{ap_number}{suffix}{ext}"
|
||||
save_path = site_upload_dir / filename
|
||||
# Flask's FileStorage.save() writes the uploaded file to disk
|
||||
photo.save(str(save_path))
|
||||
saved_files.append(filename)
|
||||
saved_paths.append(save_path)
|
||||
@@ -847,18 +887,22 @@ def replace_photo(ap_number, suffix):
|
||||
file_suffix = SUFFIX_MAP[suffix]
|
||||
site_dir, site_excel = get_site_paths(site)
|
||||
|
||||
# Delete old file (may have different extension)
|
||||
# Delete old file first. We must do this because the new photo might
|
||||
# have a different extension (e.g. replacing AP010.jpg with AP010.png).
|
||||
# If we didn't delete the old one, both files would exist and
|
||||
# find_photo() (which uses glob) might return the wrong one.
|
||||
old_path = find_photo(site, ap_number, file_suffix)
|
||||
if old_path and old_path.exists():
|
||||
old_path.unlink()
|
||||
old_path.unlink() # delete the old photo file
|
||||
|
||||
# Save new file
|
||||
# Save the new photo with the correct structured filename
|
||||
ext = os.path.splitext(photo.filename)[1].lower() or ".jpg"
|
||||
filename = f"AP{ap_number}{file_suffix}{ext}"
|
||||
save_path = site_dir / filename
|
||||
photo.save(str(save_path))
|
||||
|
||||
# Sync to Nextcloud
|
||||
# Sync the replacement photo to Nextcloud. We also pass the Excel
|
||||
# file even though it hasn't changed -- nc_sync uploads both.
|
||||
nc_sync(site, [save_path], site_excel)
|
||||
|
||||
return jsonify({"success": True, "message": f"Photo replaced: {filename}"})
|
||||
@@ -911,6 +955,8 @@ def admin_dashboard():
|
||||
"""
|
||||
sites = load_sites()
|
||||
site_list = []
|
||||
# Build a list of site info dicts for the admin template, sorted
|
||||
# numerically so they display in a consistent order.
|
||||
for site_num in sorted(sites.keys(), key=lambda s: int(s) if s.isdigit() else 0):
|
||||
cfg = sites[site_num]
|
||||
entry = {
|
||||
@@ -919,11 +965,14 @@ def admin_dashboard():
|
||||
"nc_user": cfg.get("nc_user"),
|
||||
"nc_pass": cfg.get("nc_pass"),
|
||||
}
|
||||
# Check NC user status if per-site credentials are set
|
||||
# For sites with per-site NC credentials, check whether the
|
||||
# Nextcloud user account actually exists. This lets the admin
|
||||
# see at a glance which users need provisioning.
|
||||
# For sites using global credentials, nc_exists is None (N/A).
|
||||
if cfg.get("nc_user"):
|
||||
entry["nc_exists"] = nc_user_exists(cfg["nc_user"])
|
||||
else:
|
||||
entry["nc_exists"] = None # uses global — no status to check
|
||||
entry["nc_exists"] = None # uses global credentials -- no per-site user to check
|
||||
site_list.append(entry)
|
||||
return render_template("admin.html", sites=site_list)
|
||||
|
||||
@@ -936,22 +985,29 @@ def admin_add_site():
|
||||
JSON body: {site, pin, nc_user?, nc_pass?}
|
||||
Site must be exactly 4 digits and must not already exist.
|
||||
"""
|
||||
# force=True makes get_json() work even if Content-Type isn't
|
||||
# application/json (the admin JS sends JSON via fetch).
|
||||
data = request.get_json(force=True)
|
||||
site = data.get("site", "").strip()
|
||||
pin = data.get("pin", "").strip()
|
||||
# NC credentials are optional -- empty strings become None
|
||||
nc_user = data.get("nc_user", "").strip() or None
|
||||
nc_pass = data.get("nc_pass", "").strip() or None
|
||||
|
||||
# Validate: site must be exactly 4 digits (e.g. "5001", not "50" or "ABCD")
|
||||
if not re.fullmatch(r"\d{4}", site):
|
||||
return jsonify({"ok": False, "error": "Site must be exactly 4 digits"}), 400
|
||||
|
||||
if not pin:
|
||||
return jsonify({"ok": False, "error": "PIN is required"}), 400
|
||||
|
||||
# Re-read sites.conf to get current state (avoids race conditions
|
||||
# if two admins are editing simultaneously)
|
||||
sites = load_sites()
|
||||
if site in sites:
|
||||
return jsonify({"ok": False, "error": f"Site {site} already exists"}), 409
|
||||
|
||||
# Add the new site and write back to sites.conf
|
||||
sites[site] = {"pin": pin, "nc_user": nc_user, "nc_pass": nc_pass}
|
||||
save_sites(sites)
|
||||
return jsonify({"ok": True})
|
||||
@@ -1032,6 +1088,11 @@ def admin_provision_site(site_id):
|
||||
# ---------------------------------------------------------------------------
|
||||
# Startup provisioning — runs on import (works with both gunicorn and dev server)
|
||||
# ---------------------------------------------------------------------------
|
||||
# This block runs at import time (i.e. when gunicorn loads the module or
|
||||
# when you run "python3 app.py" directly). It checks whether Nextcloud
|
||||
# credentials are configured and, if so, ensures all per-site NC users
|
||||
# from sites.conf exist. This is a one-time startup task -- to pick up
|
||||
# sites added later, restart the container.
|
||||
if nc_enabled(NC_USER, NC_PASS):
|
||||
print(f"Nextcloud sync enabled → {NC_URL} (default user: {NC_USER}, folder: {NC_FOLDER})")
|
||||
nc_provision_site_users()
|
||||
|
||||
Reference in New Issue
Block a user