import requests from requests.auth import HTTPBasicAuth from typing import Optional, Dict, Any, List from xml.etree import ElementTree as ET from urllib.parse import quote, unquote class NextcloudClient: """Client for interacting with Nextcloud WebDAV and OCS APIs.""" def __init__(self, base_url: str, username: str, password: str): self.base_url = base_url.rstrip('/') self.username = username self.password = password self.auth = HTTPBasicAuth(username, password) self.webdav_root = f"{self.base_url}/remote.php/dav/files/{username}" self.ocs_root = f"{self.base_url}/ocs/v1.php" def _make_request(self, method: str, url: str, **kwargs) -> requests.Response: """Make an authenticated request to Nextcloud.""" kwargs['auth'] = self.auth kwargs.setdefault('timeout', 30) response = requests.request(method, url, **kwargs) response.raise_for_status() return response def verify_credentials(self) -> bool: """Verify that credentials are valid by attempting a PROPFIND on user root.""" try: response = self._make_request('PROPFIND', self.webdav_root, headers={'Depth': '0'}) return response.status_code in [200, 207] except Exception: return False def check_admin(self) -> bool: """Check if the user is in the admin group.""" try: # Get user's groups via OCS API url = f"{self.ocs_root}/cloud/users/{self.username}/groups" headers = {'OCS-APIRequest': 'true'} response = self._make_request('GET', url, headers=headers) # Parse XML response root = ET.fromstring(response.text) # Check if 'admin' group is in the groups list groups = [] for element in root.findall('.//element'): groups.append(element.text) return 'admin' in groups except Exception: return False def propfind(self, path: str, depth: int = 1) -> Dict[str, Any]: """Execute a PROPFIND request on the given path.""" url = f"{self.webdav_root}/{path.lstrip('/')}" headers = {'Depth': str(depth)} try: response = self._make_request('PROPFIND', url, headers=headers) return self._parse_propfind_response(response.text, path) except requests.exceptions.HTTPError as e: if e.response.status_code == 404: return {'items': [], 'path': path, 'error': 'Path not found'} raise def _parse_propfind_response(self, xml_text: str, requested_path: str) -> Dict[str, Any]: """Parse WebDAV PROPFIND XML response into structured data.""" try: # Parse XML root = ET.fromstring(xml_text) # Define namespace ns = { 'd': 'DAV:', 'oc': 'http://owncloud.org/ns', 'nc': 'http://nextcloud.org/ns' } items = [] # Find all response elements for response in root.findall('d:response', ns): # Get href (path) href_elem = response.find('d:href', ns) if href_elem is None: continue href = unquote(href_elem.text) # Extract filename from href # href is like /remote.php/dav/files/username/path/to/file parts = href.split('/files/' + self.username + '/') if len(parts) > 1: relative_path = parts[1].rstrip('/') else: continue # Skip the parent directory in listings (depth=1 returns parent too) if relative_path == requested_path.strip('/'): continue # Get properties propstat = response.find('d:propstat', ns) if propstat is None: continue prop = propstat.find('d:prop', ns) if prop is None: continue # Check if it's a directory resourcetype = prop.find('d:resourcetype', ns) is_dir = resourcetype is not None and resourcetype.find('d:collection', ns) is not None # Get file size size_elem = prop.find('d:getcontentlength', ns) size = int(size_elem.text) if size_elem is not None and size_elem.text else 0 # Get last modified modified_elem = prop.find('d:getlastmodified', ns) modified = modified_elem.text if modified_elem is not None else '' # Get content type content_type_elem = prop.find('d:getcontenttype', ns) content_type = content_type_elem.text if content_type_elem is not None else '' # Extract just the filename/dirname name = relative_path.split('/')[-1] if '/' in relative_path else relative_path items.append({ 'name': name, 'path': '/' + relative_path, 'type': 'directory' if is_dir else 'file', 'size': size, 'modified': modified, 'content_type': content_type }) return { 'items': items, 'path': requested_path, 'count': len(items) } except Exception as e: return { 'items': [], 'path': requested_path, 'error': f'Failed to parse response: {str(e)}' } def put_file(self, path: str, data: bytes) -> Dict[str, Any]: """Upload a file to Nextcloud.""" url = f"{self.webdav_root}/{path.lstrip('/')}" headers = { 'Content-Type': 'application/octet-stream' } response = self._make_request('PUT', url, data=data, headers=headers) return { 'success': True, 'url': url, 'status_code': response.status_code } def get_file(self, path: str) -> bytes: """Download a file from Nextcloud.""" url = f"{self.webdav_root}/{path.lstrip('/')}" response = self._make_request('GET', url) return response.content def head(self, path: str) -> bool: """Check if a file exists using HEAD request.""" url = f"{self.webdav_root}/{path.lstrip('/')}" try: response = self._make_request('HEAD', url) return response.status_code == 200 except Exception: return False def mkcol(self, path: str) -> bool: """Create a new directory.""" url = f"{self.webdav_root}/{path.lstrip('/')}" try: response = self._make_request('MKCOL', url) return response.status_code in [201, 405] # 201 Created, 405 if already exists except requests.exceptions.HTTPError as e: if e.response.status_code == 405: # Directory already exists return True raise def move(self, source: str, destination: str) -> bool: """Move or rename a file/directory.""" source_url = f"{self.webdav_root}/{source.lstrip('/')}" dest_url = f"{self.webdav_root}/{destination.lstrip('/')}" headers = { 'Destination': dest_url, 'Overwrite': 'F' # Don't overwrite existing files } response = self._make_request('MOVE', source_url, headers=headers) return response.status_code in [201, 204] # 201 Created, 204 No Content def delete(self, path: str) -> bool: """Delete a file or directory.""" url = f"{self.webdav_root}/{path.lstrip('/')}" try: response = self._make_request('DELETE', url) return response.status_code == 204 except Exception: return False # OCS User Management Methods @property def _ocs_headers(self): return {'OCS-APIRequest': 'true'} def ocs_get_users(self) -> Dict[str, Any]: """Get list of all users via OCS API.""" url = f"{self.ocs_root}/cloud/users" params = {'format': 'json'} try: response = self._make_request('GET', url, params=params, headers=self._ocs_headers) data = response.json() if data.get('ocs', {}).get('meta', {}).get('statuscode') != 100: raise Exception('OCS API error') users = data.get('ocs', {}).get('data', {}).get('users', []) # Get detailed info for each user user_list = [] for username in users: user_info = self.ocs_get_user(username) if user_info: user_list.append(user_info) return {'success': True, 'users': user_list} except Exception as e: return {'success': False, 'error': str(e)} def ocs_get_user(self, username: str) -> Dict[str, Any]: """Get detailed info for a specific user.""" url = f"{self.ocs_root}/cloud/users/{username}" params = {'format': 'json'} try: response = self._make_request('GET', url, params=params, headers=self._ocs_headers) data = response.json() if data.get('ocs', {}).get('meta', {}).get('statuscode') != 100: return None user_data = data.get('ocs', {}).get('data', {}) return { 'id': user_data.get('id', username), 'displayname': user_data.get('displayname', ''), 'email': user_data.get('email', ''), 'enabled': user_data.get('enabled', True), 'groups': user_data.get('groups', []) } except Exception: return None def ocs_create_user(self, username: str, password: str, email: str = None, displayname: str = None, groups: list = None) -> Dict[str, Any]: """Create a new user via OCS API.""" url = f"{self.ocs_root}/cloud/users" params = {'format': 'json'} data = { 'userid': username, 'password': password } if email: data['email'] = email if displayname: data['displayName'] = displayname if groups: data['groups'] = groups try: response = self._make_request('POST', url, params=params, data=data, headers=self._ocs_headers) result = response.json() meta = result.get('ocs', {}).get('meta', {}) if meta.get('statuscode') != 100: error_msg = meta.get('message', 'Failed to create user') return {'success': False, 'error': error_msg} return {'success': True, 'username': username} except Exception as e: return {'success': False, 'error': str(e)} def ocs_enable_user(self, username: str) -> Dict[str, Any]: """Enable a user account.""" url = f"{self.ocs_root}/cloud/users/{username}/enable" params = {'format': 'json'} try: response = self._make_request('PUT', url, params=params, headers=self._ocs_headers) result = response.json() meta = result.get('ocs', {}).get('meta', {}) if meta.get('statuscode') != 100: return {'success': False, 'error': meta.get('message', 'Failed to enable user')} return {'success': True} except Exception as e: return {'success': False, 'error': str(e)} def ocs_disable_user(self, username: str) -> Dict[str, Any]: """Disable a user account.""" url = f"{self.ocs_root}/cloud/users/{username}/disable" params = {'format': 'json'} try: response = self._make_request('PUT', url, params=params, headers=self._ocs_headers) result = response.json() meta = result.get('ocs', {}).get('meta', {}) if meta.get('statuscode') != 100: return {'success': False, 'error': meta.get('message', 'Failed to disable user')} return {'success': True} except Exception as e: return {'success': False, 'error': str(e)} def ocs_delete_user(self, username: str) -> Dict[str, Any]: """Delete a user account.""" url = f"{self.ocs_root}/cloud/users/{username}" params = {'format': 'json'} try: response = self._make_request('DELETE', url, params=params, headers=self._ocs_headers) result = response.json() meta = result.get('ocs', {}).get('meta', {}) if meta.get('statuscode') != 100: return {'success': False, 'error': meta.get('message', 'Failed to delete user')} return {'success': True} except Exception as e: return {'success': False, 'error': str(e)} def ocs_set_password(self, username: str, password: str) -> Dict[str, Any]: """Set a user's password via OCS API.""" url = f"{self.ocs_root}/cloud/users/{username}" params = {"format": "json"} data = {"key": "password", "value": password} try: response = self._make_request("PUT", url, params=params, data=data, headers=self._ocs_headers) result = response.json() meta = result.get("ocs", {}).get("meta", {}) if meta.get("statuscode") != 100: return {"success": False, "error": meta.get("message", "Failed to set password")} return {"success": True} except Exception as e: return {"success": False, "error": str(e)}