Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for WebDAV SystemTags #67

Open
luffah opened this issue Apr 22, 2021 · 0 comments
Open

Support for WebDAV SystemTags #67

luffah opened this issue Apr 22, 2021 · 0 comments

Comments

@luffah
Copy link

luffah commented Apr 22, 2021

Hello,

The tags features is not yet supported by this lib.
Given i need this feature for a project, for now, i just added a plugin.
Below the code.

Shall i fork and PR the mod ? (i don't know if the project still active)

import re
import requests
import json
from nextcloud import NextCloud as _Nextcloud
from nextcloud.api_wrappers import OCS_API_CLASSES
from nextcloud.api_wrappers.webdav import File, WebDAV
from nextcloud.base import WithRequester
from nextcloud.requester import WebDAVRequester, WebDAVResponse
from xml.etree import ElementTree as ET


# to simplify code parts with DAV requests
XML_REQUEST_PROPFIND_TEMPLATE = """<?xml version="1.0"?>
    <d:propfind xmlns:d="DAV:" xmlns:oc="http://owncloud.org/ns"
        xmlns:nc="http://nextcloud.org/ns">
        <d:prop>
           {0}
        </d:prop>
    </d:propfind>
"""

def build_xml_propfind_query(oc_fields=[], d_fields=[]):
    props_xml = ""
    for field in d_fields:
        props_xml += "<d:{} />".format(field)
    for field in oc_fields:
        props_xml += "<oc:{} />".format(field)
    return XML_REQUEST_PROPFIND_TEMPLATE.format(props_xml)



"""
Stick new classes to nextcloud_api lib
"""

WebDAVResponse.METHODS_SUCCESS_CODES['POST'] = WebDAVResponse.METHODS_SUCCESS_CODES['PUT']

class WebDAVExtented(WebDAV):

    def get_file_property(self, uid, path, field, tag='oc'):
        get_file_prop_xpath = '{DAV:}propstat/d:prop/%s:%s' % (tag, field)
        data = build_xml_propfind_query([field])

        additional_url = uid
        additional_url = "{}/{}".format(uid, path)
        resp = self.requester.propfind(
            additional_url=additional_url,
            headers={"Depth": str(0)},
            data=data)

        response_data = resp.data
        resp.data = None

        if not resp.is_ok:
            return resp

        response_xml_data = ET.fromstring(response_data)
        for xml_data in response_xml_data:
            for prop in xml_data.findall(get_file_prop_xpath, File.xml_namespaces_map):
                resp.data = prop.text
            break

        return resp


class NextCloud(_Nextcloud):

    def __init__(self, endpoint, user, password, json_output=True):
        super().__init__(endpoint, user, password, json_output=json_output)
        webdav_requester = WebDAVRequester(endpoint, user, password)
        functionality_classes = [
            WebDAVExtented(webdav_requester, json_output=json_output),
            SystemTags(webdav_requester, json_output=json_output),
            SystemTagsRelation(webdav_requester, json_output=json_output,
                               client=self),  # require to fetch both tag and files
        ]
        self.functionality_classes += functionality_classes
        for functionality_class in functionality_classes:
            for potential_method in dir(functionality_class):
                if(
                    potential_method.startswith('_')
                    or not callable(getattr(functionality_class, potential_method))
                ):
                    continue
                setattr(self, potential_method, getattr(
                    functionality_class, potential_method))


"""
 Define properties models
"""
class Prop():

    def __init__(self, xml_name, json=None, default=None):
        self.attr_name = self._xml_name_to_py_name(xml_name)
        self.json_key = json
        self.xml_key = xml_name
        self.default_val = default

    @staticmethod
    def _xml_name_to_py_name(name):
        return name.replace('-', '_')

    @staticmethod
    def _py_name_to_xml_name(name):
        return name.replace('_', '-')


class DProp(Prop):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.type = 'd'


class OCProp(Prop):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.type = 'oc'


"""
Define generic result class
that could be common to File and Tag
"""
class AttributeCollection():
    SUCCESS_STATUS = 'HTTP/1.1 200 OK'
    COLLECTION_RESOURCE_TYPE = 'collection'
    _attrs = []
    _xml_namespaces_map = File.xml_namespaces_map

    @property
    def _fields(self):
        return [v.attr_name for v in self._attrs]

    @property
    def _properties(self):
        return [v.xml_key for v in self._attrs]

    def __init__(self, xml_data):
        self.href = xml_data.find('d:href', self._xml_namespaces_map).text
        for attr in self._attrs:
            setattr(self, attr.attr_name, None)
        for propstat in xml_data.iter('{DAV:}propstat'):
            if propstat.find('d:status', self._xml_namespaces_map).text != self.SUCCESS_STATUS:
                continue
            for xml_property in propstat.find('d:prop', self._xml_namespaces_map):
                property_name = re.sub("{.*}", "", xml_property.tag)
                if property_name not in self._properties:
                    continue
                value = self._get_property_value(xml_property)
                setattr(self, Prop._xml_name_to_py_name(property_name), value)

    @classmethod
    def _get_property_value(self, xml_property):
        return xml_property.text

    @classmethod
    def default_get(cls, key_format='json', **kwargs):
        vals = {getattr(v, "%s_key" % key_format): v.default_val
                for v in cls._attrs
                if getattr(v, "%s_key" % key_format, False)}
        vals.update(kwargs)
        return vals

    @classmethod
    def build_xml_propfind_query(cls, fields=None):
        if not fields:
            fields = [attr.xml_key for attr in cls._attrs]
        return build_xml_propfind_query(oc_fields=fields)

    @classmethod
    def from_response(cls, resp, json_output=None, filtered=None):
        if not resp.is_ok:
            resp.data = None
            return resp
        response_data = resp.data
        response_xml_data = ET.fromstring(response_data)
        attr_datas = [cls(xml_data) for xml_data in response_xml_data]
        if filtered and callable(filtered):
            attr_datas = [attr_data for attr_data in attr_datas
                          if filtered(attr_data)]
        resp.data = attr_datas if not json_output else [
            attr_data.as_dict() for attr_data in attr_datas]
        return resp

    def as_dict(self):
        attrs = [v.attr_name for v in self._attrs]
        return {key: value
                for key, value in self.__dict__.items()
                if key in attrs}


"""
Define main processings
"""
class Tag(AttributeCollection):

    _attrs = [
        OCProp("id"),
        OCProp("display-name", json="name", default='default_tag_name'),
        OCProp("user-visible", json="userVisible", default=True),
        OCProp("can-assign", json="canAssign", default=True),
        OCProp("user-assignable", json="userAssignable", default=True)
    ]

class SystemTags(WithRequester):
    API_URL = '/remote.php/dav/systemtags'
    CREATED_CODE = 201

    def __init__(self, *args, **kwargs):
        super(SystemTags, self).__init__(*args)
        self.json_output = kwargs.get('json_output')

    def get_sytemtag(self, name, fields=None, json_output=None):
        if not fields:
            fields = Tag._fields
        resp = self.requester.propfind(
            data=Tag.build_xml_propfind_query(
                set(['display-name', *fields])
            )
        )
        if json_output is None:
            json_output = self.json_output
        return Tag.from_response(
            resp,
            json_output=json_output,
            filtered=lambda t: t.display_name == name)

    def get_systemtags(self, name=None):
        """
        Get list of all tags

        Returns: response with <list>Tag in data
        """
        resp = self.requester.propfind(data=Tag.build_xml_propfind_query())
        return Tag.from_response(resp, json_output=self.json_output)

    def create_systemtag(self, name, **kwargs):
        """
        Create a new system tag from name.
        Returns requester response with file id as data
        """
        data = Tag.default_get(name=name, **kwargs)
        url = self.requester.get_full_url()
        # i use requests because headers, can't be set in requester.post
        # resp = self.requester.post(data=data)
        res = requests.post(url, auth=self.requester.auth_pk,
                            data=json.dumps(data), headers={
                                "Content-Type": "application/json"
                            })
        resp = self.requester.rtn(res)
        if res.status_code == self.CREATED_CODE:
            resp.data = int(res.headers['Content-Location'].split('/')[-1])

        return resp

    def delete_systemtag(self, name=None, tag_id=None):
        """
        Delete systemtag
        Args:
          name    (str): tag name
        OR
          tag_id  (int): tag id
        Returns response
        """

        if not tag_id:
            resp = self.get_sytemtag(name, ['id'],
                                     json_output=False)
            if resp.data:
                tag_id = resp.data[0].id

        if tag_id:
            resp = self.requester.delete(url=str(tag_id))
        return resp


class FluidArgumentsMixin():
    """
    a stupid mixin to avoid code repetition
    using dangerous getattr functions
    with major counter side of loosing 
    some transaction infos
    """
    def _arguments_get(self, varnames, vals):
        if 'kwargs' in vals:
            vals.update(vals['kwargs'])
        ret = []
        for varname in varnames:
            val = vals.get(varname, None)
            if val is None:
                getter_func_name = '_default_get_%s' % varname
                if hasattr(self, getter_func_name):
                    val = getattr(self, getter_func_name)(vals)
            ret.append(val)
        return ret


class SystemTagsRelation(WithRequester, FluidArgumentsMixin):
    API_URL = '/remote.php/dav/systemtags-relations/files'


    def __init__(self, *args, **kwargs):
        super(SystemTagsRelation, self).__init__(*args)
        self.json_output = kwargs.get('json_output')
        self.client = kwargs.get('client')

    def _get_fileid_from_path(self, uid, path):
        """ Tricky function to fetch file """
        resp = self.client.get_file_property(uid, path, 'fileid')
        id_ = None
        if resp.data:
            id_ = int(resp.data)
        return id_

    def _get_systemtag_id_from_name(self, name):
        resp = self.client.get_sytemtag(name, ['id'],
                                 json_output=False)
        tag_id = None
        if resp.data:
            tag_id = int(resp.data[0].id)
        return tag_id


    def _default_get_file_id(self, vals):
        uid = vals.get('uid', None)
        path = vals.get('path', None)
        if not (uid and path):
            raise ValueError("Insufficient infos about the file")
        return self._get_fileid_from_path(uid, path)

    def _default_get_tag_id(self, vals):
        tag_name = vals.get('tag_name', None)
        if not tag_name:
            raise ValueError("Insufficient infos about the tag")
        return self._get_systemtag_id_from_name(tag_name)

    def get_systemtags_relation(self, file_id=None, **kwargs):
        """
        Get all tags from a given file/folder

        Args:
          file_id (int): file id found from file object
         OR
             uid  (str): user (to know from where fetch file)
             path (str): path to file/folder

        Returns:
        """

        (file_id,) = self._arguments_get(['file_id'], locals())

        data = Tag.build_xml_propfind_query()

        resp = self.requester.propfind(additional_url=file_id,
                                       data=data)

        return Tag.from_response(resp, json_output=self.json_output)

    def delete_systemtags_relation(self, file_id=None, tag_id=None, **kwargs):
        """
        Delete a tag from a given file/folder

        Args:
          file_id (int): id found in file object
          tag_id  (int): id found in tag object

        Returns:
        """
        (file_id, tag_id) = self._arguments_get(['file_id', 'tag_id'], locals())

        resp = self.requester.delete(
            url="{}/{}".format(file_id, tag_id))
        return resp

    def add_systemtags_relation(self, file_id=None, tag_id=None, **kwargs):
        """
        set a tag from a given file/folder

        Args:
           file_id (int): id found in file object
            tag_id (int): id found in tag object
        (if you didn't provided file_id)
              uid  (str): user (to know from where fetch file)
              path (str): path to file/folder
        (if you didn't provided tag_id)
          tag_name (str): tag_name to search or create

        Returns:
        """
        (file_id, tag_id) = self._arguments_get(['file_id', 'tag_id'], locals())
        if not tag_id and 'tag_name' in kwargs:
            resp = self.client.create_systemtag(kwargs['tag_name'])
            if not resp.is_ok:
                return resp
            tag_id = resp.data
        if not file_id:
            raise ValueError('No file found')

        data = Tag.build_xml_propfind_query()

        resp = self.requester.put(url="{}/{}".format(file_id, tag_id))
        return resp
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant