Add NextSnap PWA with photo gallery viewer and continuous capture

Offline-first photo capture app for Nextcloud with:
- Camera capture with continuous mode (auto-reopens after each photo)
- File browser with fullscreen image gallery, swipe navigation, and rename
- Upload queue with background sync engine
- Admin panel for Nextcloud user management
- Service worker for offline-first caching (v13)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-07 04:53:13 -06:00
commit cad4118f72
55 changed files with 9038 additions and 0 deletions

1
app/services/__init__.py Normal file
View File

@@ -0,0 +1 @@
# Services package initialization

39
app/services/auth.py Normal file
View File

@@ -0,0 +1,39 @@
from flask import session
from functools import wraps
from flask import jsonify
import base64
def _decrypt_password(encrypted: str) -> str:
"""Decode base64 encoded password from session."""
return base64.b64decode(encrypted.encode()).decode()
def get_current_user():
"""Get current authenticated user info from session."""
if 'username' not in session:
return None
return {
'username': session['username'],
'password': _decrypt_password(session['password']),
'is_admin': session.get('is_admin', False)
}
def require_auth(f):
"""Decorator to require authentication for a route."""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'username' not in session:
return jsonify({'error': 'Authentication required'}), 401
return f(*args, **kwargs)
return decorated_function
def require_admin(f):
"""Decorator to require admin privileges for a route."""
@wraps(f)
def decorated_function(*args, **kwargs):
if 'username' not in session:
return jsonify({'error': 'Authentication required'}), 401
if not session.get('is_admin', False):
return jsonify({'error': 'Admin privileges required'}), 403
return f(*args, **kwargs)
return decorated_function

350
app/services/nextcloud.py Normal file
View File

@@ -0,0 +1,350 @@
import requests
from requests.auth import HTTPBasicAuth
from typing import Optional, Dict, Any, List
from xml.etree import ElementTree as ET
from urllib.parse import quote, unquote
class NextcloudClient:
"""Client for interacting with Nextcloud WebDAV and OCS APIs."""
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip('/')
self.username = username
self.password = password
self.auth = HTTPBasicAuth(username, password)
self.webdav_root = f"{self.base_url}/remote.php/dav/files/{username}"
self.ocs_root = f"{self.base_url}/ocs/v1.php"
def _make_request(self, method: str, url: str, **kwargs) -> requests.Response:
"""Make an authenticated request to Nextcloud."""
kwargs['auth'] = self.auth
kwargs.setdefault('timeout', 30)
response = requests.request(method, url, **kwargs)
response.raise_for_status()
return response
def verify_credentials(self) -> bool:
"""Verify that credentials are valid by attempting a PROPFIND on user root."""
try:
response = self._make_request('PROPFIND', self.webdav_root, headers={'Depth': '0'})
return response.status_code in [200, 207]
except Exception:
return False
def check_admin(self) -> bool:
"""Check if the user is in the admin group."""
try:
# Get user's groups via OCS API
url = f"{self.ocs_root}/cloud/users/{self.username}/groups"
headers = {'OCS-APIRequest': 'true'}
response = self._make_request('GET', url, headers=headers)
# Parse XML response
root = ET.fromstring(response.text)
# Check if 'admin' group is in the groups list
groups = []
for element in root.findall('.//element'):
groups.append(element.text)
return 'admin' in groups
except Exception:
return False
def propfind(self, path: str, depth: int = 1) -> Dict[str, Any]:
"""Execute a PROPFIND request on the given path."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
headers = {'Depth': str(depth)}
try:
response = self._make_request('PROPFIND', url, headers=headers)
return self._parse_propfind_response(response.text, path)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return {'items': [], 'path': path, 'error': 'Path not found'}
raise
def _parse_propfind_response(self, xml_text: str, requested_path: str) -> Dict[str, Any]:
"""Parse WebDAV PROPFIND XML response into structured data."""
try:
# Parse XML
root = ET.fromstring(xml_text)
# Define namespace
ns = {
'd': 'DAV:',
'oc': 'http://owncloud.org/ns',
'nc': 'http://nextcloud.org/ns'
}
items = []
# Find all response elements
for response in root.findall('d:response', ns):
# Get href (path)
href_elem = response.find('d:href', ns)
if href_elem is None:
continue
href = unquote(href_elem.text)
# Extract filename from href
# href is like /remote.php/dav/files/username/path/to/file
parts = href.split('/files/' + self.username + '/')
if len(parts) > 1:
relative_path = parts[1].rstrip('/')
else:
continue
# Skip the parent directory in listings (depth=1 returns parent too)
if relative_path == requested_path.strip('/'):
continue
# Get properties
propstat = response.find('d:propstat', ns)
if propstat is None:
continue
prop = propstat.find('d:prop', ns)
if prop is None:
continue
# Check if it's a directory
resourcetype = prop.find('d:resourcetype', ns)
is_dir = resourcetype is not None and resourcetype.find('d:collection', ns) is not None
# Get file size
size_elem = prop.find('d:getcontentlength', ns)
size = int(size_elem.text) if size_elem is not None and size_elem.text else 0
# Get last modified
modified_elem = prop.find('d:getlastmodified', ns)
modified = modified_elem.text if modified_elem is not None else ''
# Get content type
content_type_elem = prop.find('d:getcontenttype', ns)
content_type = content_type_elem.text if content_type_elem is not None else ''
# Extract just the filename/dirname
name = relative_path.split('/')[-1] if '/' in relative_path else relative_path
items.append({
'name': name,
'path': '/' + relative_path,
'type': 'directory' if is_dir else 'file',
'size': size,
'modified': modified,
'content_type': content_type
})
return {
'items': items,
'path': requested_path,
'count': len(items)
}
except Exception as e:
return {
'items': [],
'path': requested_path,
'error': f'Failed to parse response: {str(e)}'
}
def put_file(self, path: str, data: bytes) -> Dict[str, Any]:
"""Upload a file to Nextcloud."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
headers = {
'Content-Type': 'application/octet-stream'
}
response = self._make_request('PUT', url, data=data, headers=headers)
return {
'success': True,
'url': url,
'status_code': response.status_code
}
def get_file(self, path: str) -> bytes:
"""Download a file from Nextcloud."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
response = self._make_request('GET', url)
return response.content
def head(self, path: str) -> bool:
"""Check if a file exists using HEAD request."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
try:
response = self._make_request('HEAD', url)
return response.status_code == 200
except Exception:
return False
def mkcol(self, path: str) -> bool:
"""Create a new directory."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
try:
response = self._make_request('MKCOL', url)
return response.status_code in [201, 405] # 201 Created, 405 if already exists
except requests.exceptions.HTTPError as e:
if e.response.status_code == 405:
# Directory already exists
return True
raise
def move(self, source: str, destination: str) -> bool:
"""Move or rename a file/directory."""
source_url = f"{self.webdav_root}/{source.lstrip('/')}"
dest_url = f"{self.webdav_root}/{destination.lstrip('/')}"
headers = {
'Destination': dest_url,
'Overwrite': 'F' # Don't overwrite existing files
}
response = self._make_request('MOVE', source_url, headers=headers)
return response.status_code in [201, 204] # 201 Created, 204 No Content
def delete(self, path: str) -> bool:
"""Delete a file or directory."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
try:
response = self._make_request('DELETE', url)
return response.status_code == 204
except Exception:
return False
# OCS User Management Methods
def ocs_get_users(self) -> Dict[str, Any]:
"""Get list of all users via OCS API."""
url = f"{self.ocs_root}/cloud/users"
params = {'format': 'json'}
try:
response = self._make_request('GET', url, params=params)
data = response.json()
if data.get('ocs', {}).get('meta', {}).get('statuscode') != 100:
raise Exception('OCS API error')
users = data.get('ocs', {}).get('data', {}).get('users', [])
# Get detailed info for each user
user_list = []
for username in users:
user_info = self.ocs_get_user(username)
if user_info:
user_list.append(user_info)
return {'success': True, 'users': user_list}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_get_user(self, username: str) -> Dict[str, Any]:
"""Get detailed info for a specific user."""
url = f"{self.ocs_root}/cloud/users/{username}"
params = {'format': 'json'}
try:
response = self._make_request('GET', url, params=params)
data = response.json()
if data.get('ocs', {}).get('meta', {}).get('statuscode') != 100:
return None
user_data = data.get('ocs', {}).get('data', {})
return {
'id': user_data.get('id', username),
'displayname': user_data.get('displayname', ''),
'email': user_data.get('email', ''),
'enabled': user_data.get('enabled', True),
'groups': user_data.get('groups', [])
}
except Exception:
return None
def ocs_create_user(self, username: str, password: str, email: str = None,
displayname: str = None, groups: list = None) -> Dict[str, Any]:
"""Create a new user via OCS API."""
url = f"{self.ocs_root}/cloud/users"
params = {'format': 'json'}
data = {
'userid': username,
'password': password
}
if email:
data['email'] = email
if displayname:
data['displayName'] = displayname
if groups:
data['groups'] = groups
try:
response = self._make_request('POST', url, params=params, data=data)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
error_msg = meta.get('message', 'Failed to create user')
return {'success': False, 'error': error_msg}
return {'success': True, 'username': username}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_enable_user(self, username: str) -> Dict[str, Any]:
"""Enable a user account."""
url = f"{self.ocs_root}/cloud/users/{username}/enable"
params = {'format': 'json'}
try:
response = self._make_request('PUT', url, params=params)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
return {'success': False, 'error': meta.get('message', 'Failed to enable user')}
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_disable_user(self, username: str) -> Dict[str, Any]:
"""Disable a user account."""
url = f"{self.ocs_root}/cloud/users/{username}/disable"
params = {'format': 'json'}
try:
response = self._make_request('PUT', url, params=params)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
return {'success': False, 'error': meta.get('message', 'Failed to disable user')}
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_delete_user(self, username: str) -> Dict[str, Any]:
"""Delete a user account."""
url = f"{self.ocs_root}/cloud/users/{username}"
params = {'format': 'json'}
try:
response = self._make_request('DELETE', url, params=params)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
return {'success': False, 'error': meta.get('message', 'Failed to delete user')}
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}