Offline-first photo capture app for Nextcloud with:

- Camera capture with continuous mode (auto-reopens after each photo)
- File browser with fullscreen image gallery, swipe navigation, and rename
- Upload queue with background sync engine
- Admin panel for Nextcloud user management
- Service worker for offline-first caching (v13)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
351 lines
13 KiB
Python
351 lines
13 KiB
Python
import requests
|
|
from requests.auth import HTTPBasicAuth
|
|
from typing import Optional, Dict, Any, List
|
|
from xml.etree import ElementTree as ET
|
|
from urllib.parse import quote, unquote
|
|
|
|
class NextcloudClient:
    """Client for interacting with Nextcloud WebDAV and OCS APIs.

    WebDAV methods operate on paths relative to the authenticated user's
    file root and expect *decoded* paths (the same form that ``propfind``
    returns); percent-encoding is applied internally when URLs are built.
    OCS methods wrap the user-provisioning endpoints and report their outcome
    as ``{'success': bool, ...}`` dictionaries instead of raising.
    """

    # OCS v1 reports success as status code 100 in the response "meta" block.
    _OCS_STATUS_OK = 100

    # Header sent with every OCS call (see the Nextcloud OCS API documentation).
    _OCS_HEADERS = {'OCS-APIRequest': 'true'}

    # XML namespace prefixes used when querying PROPFIND responses.
    _DAV_NAMESPACES = {
        'd': 'DAV:',
        'oc': 'http://owncloud.org/ns',
        'nc': 'http://nextcloud.org/ns',
    }

    def __init__(self, base_url: str, username: str, password: str):
        """Store the credentials and derive the WebDAV and OCS root URLs.

        Args:
            base_url: Base URL of the Nextcloud instance; trailing slashes
                are removed.
            username: Account used for HTTP basic authentication.
            password: Password (or app password) for that account.
        """
        self.base_url = base_url.rstrip('/')
        self.username = username
        self.password = password
        self.auth = HTTPBasicAuth(username, password)
        self.webdav_root = f"{self.base_url}/remote.php/dav/files/{username}"
        self.ocs_root = f"{self.base_url}/ocs/v1.php"

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _make_request(self, method: str, url: str, **kwargs) -> "requests.Response":
        """Make an authenticated request to Nextcloud.

        Basic-auth credentials are always applied and a 30 second timeout is
        used unless the caller supplies one.

        Raises:
            requests.exceptions.HTTPError: If the server answers with a
                4xx/5xx status.
        """
        kwargs['auth'] = self.auth
        kwargs.setdefault('timeout', 30)
        response = requests.request(method, url, **kwargs)
        response.raise_for_status()
        return response

    def _webdav_url(self, path: str) -> str:
        """Return the absolute WebDAV URL for a decoded, user-relative path.

        The path is percent-encoded ('/' is kept as the separator) so that
        names containing '#', '?', '%' or spaces reach the server intact
        instead of being interpreted as URL syntax.
        """
        return f"{self.webdav_root}/{quote(path.lstrip('/'))}"

    def _ocs_user_url(self, username: str, action: str = '') -> str:
        """Return the OCS URL for *username*, optionally with a sub-resource.

        The username is fully percent-encoded (including '/') so it can only
        ever occupy a single path segment.
        """
        url = f"{self.ocs_root}/cloud/users/{quote(username, safe='')}"
        return f"{url}/{action}" if action else url

    def _ocs_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        """Send an OCS request asking for JSON and return the decoded payload.

        Adds ``format=json`` and the ``OCS-APIRequest`` header on top of any
        ``params``/``headers`` supplied by the caller.
        """
        params = {'format': 'json'}
        params.update(kwargs.pop('params', None) or {})
        headers = dict(self._OCS_HEADERS)
        headers.update(kwargs.pop('headers', None) or {})
        response = self._make_request(method, url, params=params,
                                      headers=headers, **kwargs)
        return response.json()

    @classmethod
    def _ocs_result(cls, payload: Dict[str, Any], default_error: str) -> Dict[str, Any]:
        """Translate an OCS JSON payload into a ``{'success': ...}`` dict.

        On failure the server's ``meta.message`` is used as the error text,
        falling back to *default_error* when the server gave none.
        """
        meta = payload.get('ocs', {}).get('meta', {})
        if meta.get('statuscode') != cls._OCS_STATUS_OK:
            return {'success': False, 'error': meta.get('message', default_error)}
        return {'success': True}

    @staticmethod
    def _create_user_form(username: str, password: str,
                          email: Optional[str] = None,
                          displayname: Optional[str] = None,
                          groups: Optional[List[str]] = None) -> Dict[str, Any]:
        """Build the form body for the OCS "add user" call.

        Optional fields are only included when they have a truthy value.
        """
        form: Dict[str, Any] = {'userid': username, 'password': password}
        if email:
            form['email'] = email
        if displayname:
            form['displayName'] = displayname
        if groups:
            # A list is form-encoded as one field per value; the "[]" suffix
            # is what makes PHP collect repeated fields into an array rather
            # than keeping only the last one.
            form['groups[]'] = groups
        return form

    @staticmethod
    def _element_text(element) -> str:
        """Return the text of an XML element, or '' if it is missing/empty."""
        if element is None or element.text is None:
            return ''
        return element.text

    # ------------------------------------------------------------------
    # Account checks
    # ------------------------------------------------------------------

    def verify_credentials(self) -> bool:
        """Verify that credentials are valid by attempting a PROPFIND on user root.

        Any failure (bad credentials, network error, ...) yields ``False``.
        """
        try:
            response = self._make_request('PROPFIND', self.webdav_root,
                                          headers={'Depth': '0'})
            return response.status_code in (200, 207)
        except Exception:
            # Deliberate best-effort probe: callers only need a yes/no answer.
            return False

    def check_admin(self) -> bool:
        """Check if the user is in the admin group.

        Queries the user's groups through the OCS API (XML response) and
        returns ``False`` on any error.
        """
        try:
            url = self._ocs_user_url(self.username, 'groups')
            response = self._make_request('GET', url, headers=dict(self._OCS_HEADERS))
            root = ET.fromstring(response.text)
            # Each group name is reported as the text of an <element> node.
            return any(node.text == 'admin' for node in root.findall('.//element'))
        except Exception:
            # Deliberate best-effort probe: treat failures as "not admin".
            return False

    # ------------------------------------------------------------------
    # WebDAV operations
    # ------------------------------------------------------------------

    def propfind(self, path: str, depth: int = 1) -> Dict[str, Any]:
        """Execute a PROPFIND request on the given path.

        Returns:
            A dict with ``items``, ``path`` and ``count``; an ``error`` key is
            present when the path does not exist or the response could not
            be parsed.

        Raises:
            requests.exceptions.HTTPError: For HTTP errors other than 404.
        """
        url = self._webdav_url(path)
        headers = {'Depth': str(depth)}

        try:
            response = self._make_request('PROPFIND', url, headers=headers)
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                return {'items': [], 'path': path, 'count': 0,
                        'error': 'Path not found'}
            raise
        return self._parse_propfind_response(response.text, path)

    def _parse_propfind_response(self, xml_text: str, requested_path: str) -> Dict[str, Any]:
        """Parse WebDAV PROPFIND XML response into structured data.

        Entries whose href does not belong to this user's file root are
        ignored, as is the entry for *requested_path* itself (a depth-1
        listing includes the parent). Parse failures are reported through
        an ``error`` key rather than raised.
        """
        try:
            root = ET.fromstring(xml_text)
            ns = self._DAV_NAMESPACES
            # href looks like /remote.php/dav/files/<username>/path/to/file
            marker = '/files/' + self.username + '/'
            parent = requested_path.strip('/')

            items = []
            for response in root.findall('d:response', ns):
                raw_href = self._element_text(response.find('d:href', ns))
                if not raw_href:
                    continue

                # Split on the FIRST marker only, so a user path that itself
                # contains "/files/<username>/" is kept whole.
                _, found, remainder = unquote(raw_href).partition(marker)
                if not found:
                    continue
                relative_path = remainder.rstrip('/')

                # Skip the parent directory in listings (depth=1 returns parent too)
                if relative_path == parent:
                    continue

                propstat = response.find('d:propstat', ns)
                if propstat is None:
                    continue
                prop = propstat.find('d:prop', ns)
                if prop is None:
                    continue

                # A <d:collection/> child of resourcetype marks a directory.
                resourcetype = prop.find('d:resourcetype', ns)
                is_dir = (resourcetype is not None
                          and resourcetype.find('d:collection', ns) is not None)

                size_text = self._element_text(prop.find('d:getcontentlength', ns))

                items.append({
                    'name': relative_path.rsplit('/', 1)[-1],
                    'path': '/' + relative_path,
                    'type': 'directory' if is_dir else 'file',
                    'size': int(size_text) if size_text else 0,
                    'modified': self._element_text(prop.find('d:getlastmodified', ns)),
                    'content_type': self._element_text(prop.find('d:getcontenttype', ns)),
                })

            return {
                'items': items,
                'path': requested_path,
                'count': len(items),
            }
        except Exception as e:
            # Malformed XML or property values: report instead of raising.
            return {
                'items': [],
                'path': requested_path,
                'count': 0,
                'error': f'Failed to parse response: {str(e)}',
            }

    def put_file(self, path: str, data: bytes) -> Dict[str, Any]:
        """Upload a file to Nextcloud.

        Returns:
            A dict with ``success``, the (percent-encoded) ``url`` that was
            written to, and the HTTP ``status_code``.

        Raises:
            requests.exceptions.HTTPError: If the server rejects the upload.
        """
        url = self._webdav_url(path)
        headers = {'Content-Type': 'application/octet-stream'}
        response = self._make_request('PUT', url, data=data, headers=headers)
        return {
            'success': True,
            'url': url,
            'status_code': response.status_code,
        }

    def get_file(self, path: str) -> bytes:
        """Download a file from Nextcloud and return its raw content.

        Raises:
            requests.exceptions.HTTPError: If the server returns an error.
        """
        return self._make_request('GET', self._webdav_url(path)).content

    def head(self, path: str) -> bool:
        """Check if a file exists using HEAD request.

        Any failure (including network errors) is reported as ``False``.
        """
        try:
            response = self._make_request('HEAD', self._webdav_url(path))
            return response.status_code == 200
        except Exception:
            # Deliberate best-effort probe.
            return False

    def mkcol(self, path: str) -> bool:
        """Create a new directory.

        Returns:
            ``True`` if the directory was created (201) or already existed
            (the server answers 405 in that case).

        Raises:
            requests.exceptions.HTTPError: For any other HTTP error.
        """
        try:
            response = self._make_request('MKCOL', self._webdav_url(path))
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 405:
                # Directory already exists
                return True
            raise
        return response.status_code == 201

    def move(self, source: str, destination: str) -> bool:
        """Move or rename a file/directory without overwriting the target.

        Raises:
            requests.exceptions.HTTPError: If the server refuses the move,
                e.g. because the destination already exists.
        """
        headers = {
            # Header values are sent verbatim, so the destination must be a
            # properly percent-encoded URL.
            'Destination': self._webdav_url(destination),
            'Overwrite': 'F',  # Don't overwrite existing files
        }
        response = self._make_request('MOVE', self._webdav_url(source),
                                      headers=headers)
        return response.status_code in (201, 204)  # 201 Created, 204 No Content

    def delete(self, path: str) -> bool:
        """Delete a file or directory.

        Returns ``True`` only on a 204 response; any failure yields ``False``.
        """
        try:
            response = self._make_request('DELETE', self._webdav_url(path))
            return response.status_code == 204
        except Exception:
            # Deliberate best-effort: callers only need a yes/no answer.
            return False

    # ------------------------------------------------------------------
    # OCS User Management Methods
    # ------------------------------------------------------------------

    def ocs_get_users(self) -> Dict[str, Any]:
        """Get list of all users via OCS API.

        Fetches the user-id list and then the details of each user (one
        request per user); users whose details cannot be fetched are omitted.

        Returns:
            ``{'success': True, 'users': [...]}`` or
            ``{'success': False, 'error': message}``.
        """
        try:
            payload = self._ocs_request('GET', f"{self.ocs_root}/cloud/users")
            ocs = payload.get('ocs', {})
            if ocs.get('meta', {}).get('statuscode') != self._OCS_STATUS_OK:
                return {'success': False, 'error': 'OCS API error'}

            user_list = []
            for user_id in ocs.get('data', {}).get('users', []):
                user_info = self.ocs_get_user(user_id)
                if user_info:
                    user_list.append(user_info)

            return {'success': True, 'users': user_list}
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def ocs_get_user(self, username: str) -> Optional[Dict[str, Any]]:
        """Get detailed info for a specific user.

        Returns:
            A dict with ``id``, ``displayname``, ``email``, ``enabled`` and
            ``groups``, or ``None`` if the lookup failed for any reason.
        """
        try:
            payload = self._ocs_request('GET', self._ocs_user_url(username))
            ocs = payload.get('ocs', {})
            if ocs.get('meta', {}).get('statuscode') != self._OCS_STATUS_OK:
                return None

            user_data = ocs.get('data', {})
            return {
                'id': user_data.get('id', username),
                'displayname': user_data.get('displayname', ''),
                'email': user_data.get('email', ''),
                'enabled': user_data.get('enabled', True),
                'groups': user_data.get('groups', []),
            }
        except Exception:
            return None

    def ocs_create_user(self, username: str, password: str,
                        email: Optional[str] = None,
                        displayname: Optional[str] = None,
                        groups: Optional[List[str]] = None) -> Dict[str, Any]:
        """Create a new user via OCS API.

        Returns:
            ``{'success': True, 'username': username}`` or
            ``{'success': False, 'error': message}``.
        """
        form = self._create_user_form(username, password, email, displayname, groups)
        try:
            payload = self._ocs_request('POST', f"{self.ocs_root}/cloud/users",
                                        data=form)
            result = self._ocs_result(payload, 'Failed to create user')
            if result['success']:
                result['username'] = username
            return result
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def ocs_enable_user(self, username: str) -> Dict[str, Any]:
        """Enable a user account."""
        try:
            payload = self._ocs_request('PUT', self._ocs_user_url(username, 'enable'))
            return self._ocs_result(payload, 'Failed to enable user')
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def ocs_disable_user(self, username: str) -> Dict[str, Any]:
        """Disable a user account."""
        try:
            payload = self._ocs_request('PUT', self._ocs_user_url(username, 'disable'))
            return self._ocs_result(payload, 'Failed to disable user')
        except Exception as e:
            return {'success': False, 'error': str(e)}

    def ocs_delete_user(self, username: str) -> Dict[str, Any]:
        """Delete a user account."""
        try:
            payload = self._ocs_request('DELETE', self._ocs_user_url(username))
            return self._ocs_result(payload, 'Failed to delete user')
        except Exception as e:
            return {'success': False, 'error': str(e)}
|