Files
nextsnap/app/services/nextcloud.py
kamaji cad4118f72 Add NextSnap PWA with photo gallery viewer and continuous capture
Offline-first photo capture app for Nextcloud with:
- Camera capture with continuous mode (auto-reopens after each photo)
- File browser with fullscreen image gallery, swipe navigation, and rename
- Upload queue with background sync engine
- Admin panel for Nextcloud user management
- Service worker for offline-first caching (v13)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-07 04:53:13 -06:00

351 lines
13 KiB
Python

import requests
from requests.auth import HTTPBasicAuth
from typing import Optional, Dict, Any, List
from xml.etree import ElementTree as ET
from urllib.parse import quote, unquote
class NextcloudClient:
"""Client for interacting with Nextcloud WebDAV and OCS APIs."""
def __init__(self, base_url: str, username: str, password: str):
self.base_url = base_url.rstrip('/')
self.username = username
self.password = password
self.auth = HTTPBasicAuth(username, password)
self.webdav_root = f"{self.base_url}/remote.php/dav/files/{username}"
self.ocs_root = f"{self.base_url}/ocs/v1.php"
def _make_request(self, method: str, url: str, **kwargs) -> requests.Response:
"""Make an authenticated request to Nextcloud."""
kwargs['auth'] = self.auth
kwargs.setdefault('timeout', 30)
response = requests.request(method, url, **kwargs)
response.raise_for_status()
return response
def verify_credentials(self) -> bool:
"""Verify that credentials are valid by attempting a PROPFIND on user root."""
try:
response = self._make_request('PROPFIND', self.webdav_root, headers={'Depth': '0'})
return response.status_code in [200, 207]
except Exception:
return False
def check_admin(self) -> bool:
"""Check if the user is in the admin group."""
try:
# Get user's groups via OCS API
url = f"{self.ocs_root}/cloud/users/{self.username}/groups"
headers = {'OCS-APIRequest': 'true'}
response = self._make_request('GET', url, headers=headers)
# Parse XML response
root = ET.fromstring(response.text)
# Check if 'admin' group is in the groups list
groups = []
for element in root.findall('.//element'):
groups.append(element.text)
return 'admin' in groups
except Exception:
return False
def propfind(self, path: str, depth: int = 1) -> Dict[str, Any]:
"""Execute a PROPFIND request on the given path."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
headers = {'Depth': str(depth)}
try:
response = self._make_request('PROPFIND', url, headers=headers)
return self._parse_propfind_response(response.text, path)
except requests.exceptions.HTTPError as e:
if e.response.status_code == 404:
return {'items': [], 'path': path, 'error': 'Path not found'}
raise
def _parse_propfind_response(self, xml_text: str, requested_path: str) -> Dict[str, Any]:
"""Parse WebDAV PROPFIND XML response into structured data."""
try:
# Parse XML
root = ET.fromstring(xml_text)
# Define namespace
ns = {
'd': 'DAV:',
'oc': 'http://owncloud.org/ns',
'nc': 'http://nextcloud.org/ns'
}
items = []
# Find all response elements
for response in root.findall('d:response', ns):
# Get href (path)
href_elem = response.find('d:href', ns)
if href_elem is None:
continue
href = unquote(href_elem.text)
# Extract filename from href
# href is like /remote.php/dav/files/username/path/to/file
parts = href.split('/files/' + self.username + '/')
if len(parts) > 1:
relative_path = parts[1].rstrip('/')
else:
continue
# Skip the parent directory in listings (depth=1 returns parent too)
if relative_path == requested_path.strip('/'):
continue
# Get properties
propstat = response.find('d:propstat', ns)
if propstat is None:
continue
prop = propstat.find('d:prop', ns)
if prop is None:
continue
# Check if it's a directory
resourcetype = prop.find('d:resourcetype', ns)
is_dir = resourcetype is not None and resourcetype.find('d:collection', ns) is not None
# Get file size
size_elem = prop.find('d:getcontentlength', ns)
size = int(size_elem.text) if size_elem is not None and size_elem.text else 0
# Get last modified
modified_elem = prop.find('d:getlastmodified', ns)
modified = modified_elem.text if modified_elem is not None else ''
# Get content type
content_type_elem = prop.find('d:getcontenttype', ns)
content_type = content_type_elem.text if content_type_elem is not None else ''
# Extract just the filename/dirname
name = relative_path.split('/')[-1] if '/' in relative_path else relative_path
items.append({
'name': name,
'path': '/' + relative_path,
'type': 'directory' if is_dir else 'file',
'size': size,
'modified': modified,
'content_type': content_type
})
return {
'items': items,
'path': requested_path,
'count': len(items)
}
except Exception as e:
return {
'items': [],
'path': requested_path,
'error': f'Failed to parse response: {str(e)}'
}
def put_file(self, path: str, data: bytes) -> Dict[str, Any]:
"""Upload a file to Nextcloud."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
headers = {
'Content-Type': 'application/octet-stream'
}
response = self._make_request('PUT', url, data=data, headers=headers)
return {
'success': True,
'url': url,
'status_code': response.status_code
}
def get_file(self, path: str) -> bytes:
"""Download a file from Nextcloud."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
response = self._make_request('GET', url)
return response.content
def head(self, path: str) -> bool:
"""Check if a file exists using HEAD request."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
try:
response = self._make_request('HEAD', url)
return response.status_code == 200
except Exception:
return False
def mkcol(self, path: str) -> bool:
"""Create a new directory."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
try:
response = self._make_request('MKCOL', url)
return response.status_code in [201, 405] # 201 Created, 405 if already exists
except requests.exceptions.HTTPError as e:
if e.response.status_code == 405:
# Directory already exists
return True
raise
def move(self, source: str, destination: str) -> bool:
"""Move or rename a file/directory."""
source_url = f"{self.webdav_root}/{source.lstrip('/')}"
dest_url = f"{self.webdav_root}/{destination.lstrip('/')}"
headers = {
'Destination': dest_url,
'Overwrite': 'F' # Don't overwrite existing files
}
response = self._make_request('MOVE', source_url, headers=headers)
return response.status_code in [201, 204] # 201 Created, 204 No Content
def delete(self, path: str) -> bool:
"""Delete a file or directory."""
url = f"{self.webdav_root}/{path.lstrip('/')}"
try:
response = self._make_request('DELETE', url)
return response.status_code == 204
except Exception:
return False
# OCS User Management Methods
def ocs_get_users(self) -> Dict[str, Any]:
"""Get list of all users via OCS API."""
url = f"{self.ocs_root}/cloud/users"
params = {'format': 'json'}
try:
response = self._make_request('GET', url, params=params)
data = response.json()
if data.get('ocs', {}).get('meta', {}).get('statuscode') != 100:
raise Exception('OCS API error')
users = data.get('ocs', {}).get('data', {}).get('users', [])
# Get detailed info for each user
user_list = []
for username in users:
user_info = self.ocs_get_user(username)
if user_info:
user_list.append(user_info)
return {'success': True, 'users': user_list}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_get_user(self, username: str) -> Dict[str, Any]:
"""Get detailed info for a specific user."""
url = f"{self.ocs_root}/cloud/users/{username}"
params = {'format': 'json'}
try:
response = self._make_request('GET', url, params=params)
data = response.json()
if data.get('ocs', {}).get('meta', {}).get('statuscode') != 100:
return None
user_data = data.get('ocs', {}).get('data', {})
return {
'id': user_data.get('id', username),
'displayname': user_data.get('displayname', ''),
'email': user_data.get('email', ''),
'enabled': user_data.get('enabled', True),
'groups': user_data.get('groups', [])
}
except Exception:
return None
def ocs_create_user(self, username: str, password: str, email: str = None,
displayname: str = None, groups: list = None) -> Dict[str, Any]:
"""Create a new user via OCS API."""
url = f"{self.ocs_root}/cloud/users"
params = {'format': 'json'}
data = {
'userid': username,
'password': password
}
if email:
data['email'] = email
if displayname:
data['displayName'] = displayname
if groups:
data['groups'] = groups
try:
response = self._make_request('POST', url, params=params, data=data)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
error_msg = meta.get('message', 'Failed to create user')
return {'success': False, 'error': error_msg}
return {'success': True, 'username': username}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_enable_user(self, username: str) -> Dict[str, Any]:
"""Enable a user account."""
url = f"{self.ocs_root}/cloud/users/{username}/enable"
params = {'format': 'json'}
try:
response = self._make_request('PUT', url, params=params)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
return {'success': False, 'error': meta.get('message', 'Failed to enable user')}
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_disable_user(self, username: str) -> Dict[str, Any]:
"""Disable a user account."""
url = f"{self.ocs_root}/cloud/users/{username}/disable"
params = {'format': 'json'}
try:
response = self._make_request('PUT', url, params=params)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
return {'success': False, 'error': meta.get('message', 'Failed to disable user')}
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
def ocs_delete_user(self, username: str) -> Dict[str, Any]:
"""Delete a user account."""
url = f"{self.ocs_root}/cloud/users/{username}"
params = {'format': 'json'}
try:
response = self._make_request('DELETE', url, params=params)
result = response.json()
meta = result.get('ocs', {}).get('meta', {})
if meta.get('statuscode') != 100:
return {'success': False, 'error': meta.get('message', 'Failed to delete user')}
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}