Add password generator, duplicate AP rejection, and input sanitization

- Add Gen button to admin panel that generates 13-char alphanumeric
  passwords for Nextcloud credentials (Add Site form and Edit modal)
- Reject duplicate AP numbers within the same site (409 response)
- Fix newline injection vulnerability in admin API that allowed
  creating backdoor site entries via sites.conf corruption
- Fix colon-in-PIN bug by rejecting colons in PIN and NC User fields
- Use maxsplit=3 in sites.conf parser so NC Pass can contain colons
- Add nc_change_password() to sync password edits to Nextcloud
- Clean up corrupted sites.conf entries from prior injection

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
kamaji
2026-01-26 08:47:30 -06:00
parent 30fc55567c
commit a719e528ed
3 changed files with 132 additions and 94 deletions

204
app.py
View File

@@ -105,27 +105,22 @@ def load_sites() -> dict[str, dict]:
if not SITES_FILE.exists(): if not SITES_FILE.exists():
log.warning("sites.conf not found — no sites can log in") log.warning("sites.conf not found — no sites can log in")
return sites return sites
# Read the file line by line and parse each non-empty, non-comment line
for line in SITES_FILE.read_text().splitlines(): for line in SITES_FILE.read_text().splitlines():
line = line.strip() line = line.strip()
# Skip blank lines and comments (lines starting with #)
if not line or line.startswith("#"): if not line or line.startswith("#"):
continue continue
# Skip malformed lines that don't contain a colon separator
if ":" not in line: if ":" not in line:
continue continue
# Split on colons: "5001:1234:alice:pass" -> ["5001","1234","alice","pass"] parts = line.split(":", maxsplit=3)
parts = line.split(":") site_num = parts[0].strip()
site_num = parts[0].strip() # always present pin = parts[1].strip() if len(parts) > 1 else ""
pin = parts[1].strip() if len(parts) > 1 else "" # required nc_user = parts[2].strip() if len(parts) > 2 else None
nc_user = parts[2].strip() if len(parts) > 2 else None # optional nc_pass = parts[3].strip() if len(parts) > 3 else None
nc_pass = parts[3].strip() if len(parts) > 3 else None # optional
# Only register the site if both site number and PIN are non-empty
if site_num and pin: if site_num and pin:
sites[site_num] = { sites[site_num] = {
"pin": pin, "pin": pin,
"nc_user": nc_user or None, # convert empty string to None "nc_user": nc_user or None,
"nc_pass": nc_pass or None, # convert empty string to None "nc_pass": nc_pass or None,
} }
return sites return sites
@@ -165,17 +160,12 @@ def save_sites(sites: dict):
"\n" "\n"
) )
lines = [] lines = []
# Sort sites numerically (e.g. 1001, 1002, 5001) for readability.
# The lambda converts the key to int for numeric sorting; non-numeric
# keys (shouldn't happen) sort to the front with key=0.
for site_num in sorted(sites.keys(), key=lambda s: int(s) if s.isdigit() else 0): for site_num in sorted(sites.keys(), key=lambda s: int(s) if s.isdigit() else 0):
cfg = sites[site_num] cfg = sites[site_num]
# Build the line: "5001:1234" (basic) or "5001:1234:alice:pass" (with NC creds)
line = f"{site_num}:{cfg['pin']}" line = f"{site_num}:{cfg['pin']}"
if cfg.get("nc_user") and cfg.get("nc_pass"): if cfg.get("nc_user") and cfg.get("nc_pass"):
line += f":{cfg['nc_user']}:{cfg['nc_pass']}" line += f":{cfg['nc_user']}:{cfg['nc_pass']}"
lines.append(line) lines.append(line)
# Write the header comment block followed by all site lines
SITES_FILE.write_text(header + "\n".join(lines) + "\n") SITES_FILE.write_text(header + "\n".join(lines) + "\n")
@@ -333,6 +323,42 @@ def nc_create_user(username: str, password: str) -> bool:
return False return False
def nc_change_password(username: str, new_password: str) -> bool:
"""Change the password for an existing Nextcloud user.
Uses the OCS Provisioning API. Authenticated with the global admin
credentials (NC_USER / NC_PASS must be a Nextcloud admin account).
Args:
username: The Nextcloud user ID whose password to change
new_password: The new password to set
Returns:
True on success (statuscode 100), False on failure.
"""
url = f"{NC_URL.rstrip('/')}/ocs/v1.php/cloud/users/{username}?format=json"
try:
resp = requests.put(
url,
auth=(NC_USER, NC_PASS),
headers={"OCS-APIRequest": "true"},
data={"key": "password", "value": new_password},
timeout=30,
)
if resp.status_code == 200:
data = resp.json()
status = data.get("ocs", {}).get("meta", {}).get("statuscode")
if status == 100:
return True
msg = data.get("ocs", {}).get("meta", {}).get("message", "unknown error")
log.error("Failed to change NC password for %s: %s (status %s)", username, msg, status)
else:
log.error("Failed to change NC password for %s: HTTP %s", username, resp.status_code)
except Exception:
log.exception("Failed to change NC password for %s", username)
return False
def nc_provision_site_users(): def nc_provision_site_users():
"""Ensure all per-site Nextcloud users from sites.conf exist. """Ensure all per-site Nextcloud users from sites.conf exist.
@@ -587,22 +613,12 @@ def get_or_create_workbook(excel_path: Path):
def login_required(f): def login_required(f):
"""Decorator that redirects to /login if the user is not authenticated """Decorator that redirects to /login if the user is not authenticated
or has not selected a site. or has not selected a site."""
Usage: place @login_required below @app.route() on any view function
that should only be accessible to logged-in technicians.
"""
from functools import wraps from functools import wraps
# @wraps preserves the original function's name and docstring,
# which Flask needs to map URL rules to the correct endpoint.
@wraps(f) @wraps(f)
def decorated(*args, **kwargs): def decorated(*args, **kwargs):
# Both flags must be set: "authenticated" (PIN was correct) and
# "site" (which site number they logged into). If either is
# missing, bounce them back to the login page.
if not session.get("authenticated") or not session.get("site"): if not session.get("authenticated") or not session.get("site"):
return redirect(url_for("login")) return redirect(url_for("login"))
# Credentials OK -- call the original view function
return f(*args, **kwargs) return f(*args, **kwargs)
return decorated return decorated
@@ -610,10 +626,8 @@ def login_required(f):
def admin_required(f): def admin_required(f):
"""Decorator that redirects to /admin/login if admin is not authenticated. """Decorator that redirects to /admin/login if admin is not authenticated.
Independent from the technician session — both can coexist in the Independent from the technician session — both can coexist.
same browser. A user can be logged in as both a technician (site PIN) Checks session["admin_authenticated"].
and an admin (NC credentials) simultaneously because they use
different session keys: "authenticated" vs "admin_authenticated".
""" """
from functools import wraps from functools import wraps
@wraps(f) @wraps(f)
@@ -695,27 +709,19 @@ def entries():
rows = [] rows = []
if excel_path.exists(): if excel_path.exists():
# read_only=True is faster and uses less memory -- we only need
# to read values, not modify the workbook.
wb = load_workbook(excel_path, read_only=True) wb = load_workbook(excel_path, read_only=True)
ws = wb.active ws = wb.active
# iter_rows(values_only=True) yields tuples of cell values
# instead of Cell objects -- simpler to work with.
for i, row in enumerate(ws.iter_rows(values_only=True)): for i, row in enumerate(ws.iter_rows(values_only=True)):
if i == 0: if i == 0:
continue # skip the header row ("AP Number", "AP Location", ...) continue # skip header
ap_num = row[0] or "" ap_num = row[0] or ""
# For each AP entry, check which of the 6 photo types exist # Check which photos exist for this AP
# on disk. This info is used by the template to show/hide
# photo thumbnails and enable the "replace" button.
photos = [] photos = []
if re.fullmatch(r"\d{3}", str(ap_num)): if re.fullmatch(r"\d{3}", str(ap_num)):
# Only check photos if ap_num is a valid 3-digit number
# (protects against corrupted spreadsheet data)
for suffix_name, file_suffix in SUFFIX_MAP.items(): for suffix_name, file_suffix in SUFFIX_MAP.items():
photos.append({ photos.append({
"suffix": suffix_name, # URL-friendly name (e.g. "close") "suffix": suffix_name,
"label": SUFFIX_LABELS[suffix_name], # human label (e.g. "Close-up") "label": SUFFIX_LABELS[suffix_name],
"exists": find_photo(site, ap_num, file_suffix) is not None, "exists": find_photo(site, ap_num, file_suffix) is not None,
}) })
rows.append({ rows.append({
@@ -724,7 +730,7 @@ def entries():
"serial_number": row[2] or "", "serial_number": row[2] or "",
"mac_address": row[3] or "", "mac_address": row[3] or "",
"cable_length": row[4] or "", "cable_length": row[4] or "",
"photos": photos, # list of photo status dicts for the template "photos": photos,
}) })
wb.close() wb.close()
@@ -776,17 +782,22 @@ def submit():
if not re.fullmatch(r"([0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}", mac_address): if not re.fullmatch(r"([0-9A-Fa-f]{2}[:\-]){5}[0-9A-Fa-f]{2}", mac_address):
return jsonify({"success": False, "error": "MAC Address must be valid (e.g. AA:BB:CC:DD:EE:FF)."}), 400 return jsonify({"success": False, "error": "MAC Address must be valid (e.g. AA:BB:CC:DD:EE:FF)."}), 400
# Validate and save each of the 6 required photos. # Reject duplicate AP numbers within the same site
# PHOTO_FIELDS maps form field names to filename suffixes: if site_excel.exists():
# ("photo_ap", "") -> AP010.jpg (the "" means no suffix) wb_check = load_workbook(site_excel, read_only=True)
# ("photo_far", "F") -> AP010F.jpg ws_check = wb_check.active
# ("photo_length", "_length") -> AP010_length.jpg for i, row in enumerate(ws_check.iter_rows(values_only=True)):
# etc. if i == 0:
saved_files = [] # filenames for the JSON response (e.g. ["AP010.jpg", ...]) continue
saved_paths = [] # full Path objects for Nextcloud sync if str(row[0]) == ap_number:
wb_check.close()
return jsonify({"success": False, "error": f"AP {ap_number} already exists for this site."}), 409
wb_check.close()
# Validate and save each of the 6 required photos
saved_files = [] # filenames for the JSON response
saved_paths = [] # full Paths for Nextcloud sync
for field_name, suffix in PHOTO_FIELDS: for field_name, suffix in PHOTO_FIELDS:
# request.files is a MultiDict of uploaded files keyed by form field name.
# Each photo input in the HTML form has a name matching field_name.
photo = request.files.get(field_name) photo = request.files.get(field_name)
if not photo or photo.filename == "": if not photo or photo.filename == "":
return jsonify({ return jsonify({
@@ -794,14 +805,11 @@ def submit():
"error": f"Missing photo: {field_name}", "error": f"Missing photo: {field_name}",
}), 400 }), 400
# Preserve the original file extension so we don't lose format info. # Preserve the original file extension (e.g. .jpg, .png, .heic)
# If for some reason there's no extension, default to .jpg.
ext = os.path.splitext(photo.filename)[1].lower() or ".jpg" ext = os.path.splitext(photo.filename)[1].lower() or ".jpg"
# Build the structured filename: AP{number}{suffix}.{ext} # Build the structured filename: AP{number}{suffix}.{ext}
# e.g. AP010.jpg, AP010F.jpg, AP010_length.jpg
filename = f"AP{ap_number}{suffix}{ext}" filename = f"AP{ap_number}{suffix}{ext}"
save_path = site_upload_dir / filename save_path = site_upload_dir / filename
# Flask's FileStorage.save() writes the uploaded file to disk
photo.save(str(save_path)) photo.save(str(save_path))
saved_files.append(filename) saved_files.append(filename)
saved_paths.append(save_path) saved_paths.append(save_path)
@@ -887,22 +895,18 @@ def replace_photo(ap_number, suffix):
file_suffix = SUFFIX_MAP[suffix] file_suffix = SUFFIX_MAP[suffix]
site_dir, site_excel = get_site_paths(site) site_dir, site_excel = get_site_paths(site)
# Delete old file first. We must do this because the new photo might # Delete old file (may have different extension)
# have a different extension (e.g. replacing AP010.jpg with AP010.png).
# If we didn't delete the old one, both files would exist and
# find_photo() (which uses glob) might return the wrong one.
old_path = find_photo(site, ap_number, file_suffix) old_path = find_photo(site, ap_number, file_suffix)
if old_path and old_path.exists(): if old_path and old_path.exists():
old_path.unlink() # delete the old photo file old_path.unlink()
# Save the new photo with the correct structured filename # Save new file
ext = os.path.splitext(photo.filename)[1].lower() or ".jpg" ext = os.path.splitext(photo.filename)[1].lower() or ".jpg"
filename = f"AP{ap_number}{file_suffix}{ext}" filename = f"AP{ap_number}{file_suffix}{ext}"
save_path = site_dir / filename save_path = site_dir / filename
photo.save(str(save_path)) photo.save(str(save_path))
# Sync the replacement photo to Nextcloud. We also pass the Excel # Sync to Nextcloud
# file even though it hasn't changed -- nc_sync uploads both.
nc_sync(site, [save_path], site_excel) nc_sync(site, [save_path], site_excel)
return jsonify({"success": True, "message": f"Photo replaced: {filename}"}) return jsonify({"success": True, "message": f"Photo replaced: {filename}"})
@@ -955,8 +959,6 @@ def admin_dashboard():
""" """
sites = load_sites() sites = load_sites()
site_list = [] site_list = []
# Build a list of site info dicts for the admin template, sorted
# numerically so they display in a consistent order.
for site_num in sorted(sites.keys(), key=lambda s: int(s) if s.isdigit() else 0): for site_num in sorted(sites.keys(), key=lambda s: int(s) if s.isdigit() else 0):
cfg = sites[site_num] cfg = sites[site_num]
entry = { entry = {
@@ -965,14 +967,11 @@ def admin_dashboard():
"nc_user": cfg.get("nc_user"), "nc_user": cfg.get("nc_user"),
"nc_pass": cfg.get("nc_pass"), "nc_pass": cfg.get("nc_pass"),
} }
# For sites with per-site NC credentials, check whether the # Check NC user status if per-site credentials are set
# Nextcloud user account actually exists. This lets the admin
# see at a glance which users need provisioning.
# For sites using global credentials, nc_exists is None (N/A).
if cfg.get("nc_user"): if cfg.get("nc_user"):
entry["nc_exists"] = nc_user_exists(cfg["nc_user"]) entry["nc_exists"] = nc_user_exists(cfg["nc_user"])
else: else:
entry["nc_exists"] = None # uses global credentials -- no per-site user to check entry["nc_exists"] = None # uses global — no status to check
site_list.append(entry) site_list.append(entry)
return render_template("admin.html", sites=site_list) return render_template("admin.html", sites=site_list)
@@ -985,29 +984,31 @@ def admin_add_site():
JSON body: {site, pin, nc_user?, nc_pass?} JSON body: {site, pin, nc_user?, nc_pass?}
Site must be exactly 4 digits and must not already exist. Site must be exactly 4 digits and must not already exist.
""" """
# force=True makes get_json() work even if Content-Type isn't
# application/json (the admin JS sends JSON via fetch).
data = request.get_json(force=True) data = request.get_json(force=True)
site = data.get("site", "").strip() site = data.get("site", "").strip()
pin = data.get("pin", "").strip() pin = data.get("pin", "").strip()
# NC credentials are optional -- empty strings become None
nc_user = data.get("nc_user", "").strip() or None nc_user = data.get("nc_user", "").strip() or None
nc_pass = data.get("nc_pass", "").strip() or None nc_pass = data.get("nc_pass", "").strip() or None
# Validate: site must be exactly 4 digits (e.g. "5001", not "50" or "ABCD")
if not re.fullmatch(r"\d{4}", site): if not re.fullmatch(r"\d{4}", site):
return jsonify({"ok": False, "error": "Site must be exactly 4 digits"}), 400 return jsonify({"ok": False, "error": "Site must be exactly 4 digits"}), 400
if not pin: if not pin:
return jsonify({"ok": False, "error": "PIN is required"}), 400 return jsonify({"ok": False, "error": "PIN is required"}), 400
# Re-read sites.conf to get current state (avoids race conditions # Reject characters that would corrupt sites.conf (colon is the
# if two admins are editing simultaneously) # field delimiter, newlines would inject extra lines)
for field_name, value in [("PIN", pin), ("NC User", nc_user), ("NC Pass", nc_pass)]:
if value and ("\n" in value or "\r" in value):
return jsonify({"ok": False, "error": f"{field_name} must not contain newlines"}), 400
for field_name, value in [("PIN", pin), ("NC User", nc_user)]:
if value and ":" in value:
return jsonify({"ok": False, "error": f"{field_name} must not contain colons"}), 400
sites = load_sites() sites = load_sites()
if site in sites: if site in sites:
return jsonify({"ok": False, "error": f"Site {site} already exists"}), 409 return jsonify({"ok": False, "error": f"Site {site} already exists"}), 409
# Add the new site and write back to sites.conf
sites[site] = {"pin": pin, "nc_user": nc_user, "nc_pass": nc_pass} sites[site] = {"pin": pin, "nc_user": nc_user, "nc_pass": nc_pass}
save_sites(sites) save_sites(sites)
return jsonify({"ok": True}) return jsonify({"ok": True})
@@ -1026,14 +1027,38 @@ def admin_edit_site(site_id):
return jsonify({"ok": False, "error": f"Site {site_id} not found"}), 404 return jsonify({"ok": False, "error": f"Site {site_id} not found"}), 404
data = request.get_json(force=True) data = request.get_json(force=True)
if "pin" in data and data["pin"].strip(): pin = data.get("pin", "").strip() if "pin" in data else None
sites[site_id]["pin"] = data["pin"].strip() nc_user = data.get("nc_user", "").strip() if "nc_user" in data else None
if "nc_user" in data: nc_pass = data.get("nc_pass", "").strip() if "nc_pass" in data else None
sites[site_id]["nc_user"] = data["nc_user"].strip() or None
if "nc_pass" in data: # Reject characters that would corrupt sites.conf
sites[site_id]["nc_pass"] = data["nc_pass"].strip() or None for field_name, value in [("PIN", pin), ("NC User", nc_user), ("NC Pass", nc_pass)]:
if value and ("\n" in value or "\r" in value):
return jsonify({"ok": False, "error": f"{field_name} must not contain newlines"}), 400
for field_name, value in [("PIN", pin), ("NC User", nc_user)]:
if value and ":" in value:
return jsonify({"ok": False, "error": f"{field_name} must not contain colons"}), 400
if pin:
sites[site_id]["pin"] = pin
if nc_user is not None:
sites[site_id]["nc_user"] = nc_user or None
if nc_pass is not None:
sites[site_id]["nc_pass"] = nc_pass or None
save_sites(sites) save_sites(sites)
# Sync password change to Nextcloud if applicable
warning = None
new_nc_pass = data.get("nc_pass", "").strip() if "nc_pass" in data else None
nc_user = sites[site_id].get("nc_user")
if new_nc_pass and nc_user and all([NC_URL, NC_USER, NC_PASS]):
if nc_user_exists(nc_user):
if not nc_change_password(nc_user, new_nc_pass):
warning = f"Site saved, but failed to update password on Nextcloud for '{nc_user}'"
if warning:
return jsonify({"ok": True, "warning": warning})
return jsonify({"ok": True}) return jsonify({"ok": True})
@@ -1088,11 +1113,6 @@ def admin_provision_site(site_id):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Startup provisioning — runs on import (works with both gunicorn and dev server) # Startup provisioning — runs on import (works with both gunicorn and dev server)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# This block runs at import time (i.e. when gunicorn loads the module or
# when you run "python3 app.py" directly). It checks whether Nextcloud
# credentials are configured and, if so, ensures all per-site NC users
# from sites.conf exist. This is a one-time startup task -- to pick up
# sites added later, restart the container.
if nc_enabled(NC_USER, NC_PASS): if nc_enabled(NC_USER, NC_PASS):
print(f"Nextcloud sync enabled → {NC_URL} (default user: {NC_USER}, folder: {NC_FOLDER})") print(f"Nextcloud sync enabled → {NC_URL} (default user: {NC_USER}, folder: {NC_FOLDER})")
nc_provision_site_users() nc_provision_site_users()

View File

@@ -21,5 +21,6 @@
# The app reloads this file on every login attempt, so changes # The app reloads this file on every login attempt, so changes
# take effect immediately — no restart needed. # take effect immediately — no restart needed.
1102:1102:1102:FocusWrite2000!
1234:1234:1234:railFocus11 1234:1234:1234:railFocus11
2725:2725:2725:makeBiscuits 2725:2725:2725:makeBiscuits112

View File

@@ -177,6 +177,12 @@
cursor: not-allowed; cursor: not-allowed;
} }
.btn-gen {
padding: 8px 10px;
font-size: 0.8rem;
white-space: nowrap;
}
.card-actions { .card-actions {
margin-top: 10px; margin-top: 10px;
border-top: 1px solid #2a2a4a; border-top: 1px solid #2a2a4a;
@@ -298,6 +304,7 @@
<div class="form-row"> <div class="form-row">
<input type="text" id="add-nc-user" placeholder="NC User (optional)"> <input type="text" id="add-nc-user" placeholder="NC User (optional)">
<input type="text" id="add-nc-pass" placeholder="NC Pass (optional)"> <input type="text" id="add-nc-pass" placeholder="NC Pass (optional)">
<button type="button" class="btn btn-outline btn-gen" onclick="document.getElementById('add-nc-pass').value = generatePassword()">Gen</button>
</div> </div>
<button class="btn btn-blue" onclick="addSite()">Add Site</button> <button class="btn btn-blue" onclick="addSite()">Add Site</button>
</div> </div>
@@ -350,7 +357,10 @@
<label for="edit-nc-user">NC User</label> <label for="edit-nc-user">NC User</label>
<input type="text" id="edit-nc-user"> <input type="text" id="edit-nc-user">
<label for="edit-nc-pass">NC Pass</label> <label for="edit-nc-pass">NC Pass</label>
<input type="text" id="edit-nc-pass"> <div style="display:flex;gap:8px;">
<input type="text" id="edit-nc-pass" style="flex:1;">
<button type="button" class="btn btn-outline btn-gen" onclick="document.getElementById('edit-nc-pass').value = generatePassword()">Gen</button>
</div>
<div class="modal-actions"> <div class="modal-actions">
<button class="btn btn-outline" onclick="closeEdit()">Cancel</button> <button class="btn btn-outline" onclick="closeEdit()">Cancel</button>
<button class="btn btn-blue" onclick="saveEdit()">Save</button> <button class="btn btn-blue" onclick="saveEdit()">Save</button>
@@ -359,6 +369,13 @@
</div> </div>
<script> <script>
function generatePassword() {
const chars = 'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789';
const arr = new Uint8Array(13);
crypto.getRandomValues(arr);
return Array.from(arr, b => chars[b % chars.length]).join('');
}
const banner = document.getElementById('status-banner'); const banner = document.getElementById('status-banner');
function showBanner(msg, type) { function showBanner(msg, type) {