# _*_ coding:utf-8 _*_ # Tiny WebDav Server for Pythonista - IOS. (Base on pandav WebDav server ) # # (C)2013/11 By: Lai ChuJiang # # this code can run not just Pythonista on IOS ,also run on OSX python. # Support Client: Windows / OSX / other Webdav client for IOS,etc : FE File explorer / goodreader / iWorks for ios # # 2024/6/1 Change Log: # Python3 support # add do_PROPPATCH function, now Windows Explorer full support. (!! windows explorer limit filesize < 50MBytes, you can using windows regedit edit it !!) # support show webdav request/response info. ( sys_debug = True ) # bug fix # # 20170907 Change Log: # 1. Fix Coda WebDav access problem.(because Coda WebDav all xml item add xmlns="DAV:".) # # 2013/11 Change Log: # 1. Combind all files to one file,so can using for Pythonista easy. # 2. Add MKCOL(Create dir); MOVE(rename file); DELETE(delete file or dir); COPY (Copy file) # 3. Change some decode, Now it's can support Chinese. # 4. Pythonista(For IOS) not dircache module ,so change code, don't using this module. # 5. change DAV version from 1 to 2, so the OSX finder can write. # 6. for DAV version 2 support , add LOCK / UNLOCK fake support. (not real lock file) # *** !!!! So Don't using > 1 client sametime write or delete same file. maybe lost files. # 7. Change the do_PROPFIND module, now it's simply & right for OSX # 8. Change the do_GET module, now support RANGE # 9. Change the do_PUT module, add Content-Length=0 support (create empty file) ,so the OSX Finder support. # * if not add empty file, the Finder copy files and then delete all this. #10. Add WebDav Basic Auth function,now you can set user & passwd # ** using wdusers.conf file (just user:passwd), if not this file ,the Auth disable. #11. Fix the broken pipe error message # # WebDav RFC: http://www.ics.uci.edu/~ejw/authoring/protocol/rfc2518.html # http://restpatterns.org/HTTP_Methods # # Base : pandav v0.2 # Copyright (c) 2005.-2006. Ivan Voras # Released under the Artistic License # from http.server import HTTPServer, BaseHTTPRequestHandler from socketserver import ThreadingMixIn import urllib.request, urllib.parse, urllib.error, urllib.parse from time import timezone, strftime, localtime, gmtime import os, sys, re, shutil, uuid, hashlib, mimetypes, base64, socket # Debug message ( True / False ) sys_debug = False # get localhost IP address def get_localip(): try: gAdd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) gAdd.connect(('www.bing.com', 80)) return gAdd.getsockname()[0] except: return '127.0.0.1' class Member: M_MEMBER = 1 M_COLLECTION = 2 def getProperties(self): return {} class Collection(Member): def __init__(self, name): self.name = name def getMembers(self): return [] class FileMember(Member): def __init__(self, name, parent): self.name = name self.parent = parent self.name = name self.fsname = parent.fsname + name # e.g. '/var/www/mysite/some.txt' self.virname = parent.virname + name # e.g. '/mysite/some.txt' self.type = Member.M_MEMBER def __str__(self): return "%s -> %s" % (self.virname, self.fsname) def getProperties(self): """Return dictionary with WebDAV properties. Values shold be formatted according to the WebDAV specs.""" st = os.stat(self.fsname) p = {} p['creationdate'] = unixdate2iso8601(st.st_ctime) p['getlastmodified'] = unixdate2httpdate(st.st_mtime) p['displayname'] = self.name # p['getetag'] = md5.new(self.fsname).hexdigest() m = hashlib.md5() m.update(self.fsname.encode('utf-8')) p['getetag'] = m.hexdigest() if self.type == Member.M_MEMBER: p['getcontentlength'] = st.st_size p['getcontenttype'], z = mimetypes.guess_type(self.name) p['getcontentlanguage'] = None else: # Member.M_COLLECTION p['resourcetype'] = '' if self.name[0] == ".": p['ishidden'] = 1 if not os.access(self.fsname, os.W_OK): p['isreadonly'] = 1 if self.name == '/': p['isroot'] = 1 return p def sendData(self, wfile,bpoint=0,epoint=0): """Send the file to the client. Literally.""" st = os.stat(self.fsname) f = open(self.fsname, 'rb') writ = 0 # for send Range xxx-xxx if bpoint>0 and bpointbpoint: if epoint<=st.st_size: rsize = epoint - bpoint + 1 else: rsize = st.st_size - bpoint else: rsize = st.st_size while writ < rsize: if (rsize - writ) < 65536: buf = f.read(rsize) else: buf = f.read(65536) if not buf: break writ += len(buf) wfile.write(buf) f.close() def findMember(self, name): ''' 2024/6/1 add ''' return None class DirCollection(FileMember, Collection): COLLECTION_MIME_TYPE = 'httpd/unix-directory' # application/x-collection ? def __init__(self, fsdir, virdir, parent=None): if not os.path.exists(fsdir): raise "Local directory (fsdir) not found: " + fsdir self.fsname = fsdir self.name = virdir if self.fsname[-1] != os.sep: if self.fsname[-1] == '/': # fixup win/dos/mac separators self.fsname = self.fsname[:-1] + os.sep else: self.fsname += os.sep self.virname = virdir if self.virname[-1] != '/': self.virname += '/' self.parent = parent self.type = Member.M_COLLECTION def getProperties(self): p = FileMember.getProperties(self) # inherit file properties p['iscollection'] = 1 p['getcontenttype'] = DirCollection.COLLECTION_MIME_TYPE return p def getMembers(self): """Get immediate members of this collection.""" l = os.listdir(self.fsname) # obtain a copy of dirlist tcount=0 for tmpi in l: if os.path.isfile(self.fsname+tmpi) == False: l[tcount]=l[tcount]+'/' tcount += 1 r = [] for f in l: if f[-1] != '/': m = FileMember(f, self) # Member is a file else: m = DirCollection(self.fsname + f, self.virname + f, self) # Member is a collection r.append(m) return r # Return WebDav Root Dir info def rootdir(self): return self.fsname def findMember(self, name): """Search for a particular member.""" l = os.listdir(self.fsname) # obtain a copy of dirlist tcount=0 for tmpi in l: if os.path.isfile(self.fsname+tmpi) == False: l[tcount]=l[tcount]+'/' tcount += 1 if name in l: if name[-1] != '/': return FileMember(name, self) else: return DirCollection(self.fsname + name, self.virname + name, self) elif name[-1] != '/': name += '/' if name in l: return DirCollection(self.fsname + name, self.virname + name, self) def sendData(self, wfile): """Send "file" to the client. Since this is a directory, build some arbitrary HTML.""" memb = self.getMembers() def esc(s): return (str(s) .replace('&', '&') .replace('<', '<') .replace('>', '>') .replace('"', '"')) data = '%s' % esc(self.virname) data += '' for m in memb: p = m.getProperties() if 'getcontentlength' in p: p['size'] = int(p['getcontentlength']) p['timestamp'] = p['getlastmodified'] else: p['size'] = 0 p['timestamp'] = '-DIR-' data += '' % (esc(p['displayname']), p['size'], esc(p['timestamp'])) data += '
NameSizeTimestamp
%s%d%s
' wfile.write(data.encode('utf-8')) def recvMember(self, rfile, name, size, req): """Receive (save) a member file""" fname = os.path.join(self.fsname, urllib.parse.unquote(name)) f = open(fname, 'wb') # if size=-1 it's Transfer-Encoding: Chunked mode, like OSX finder using this mode put data # so the file size need get here. if size == -2: l = int(rfile.readline(), 16) ltotal = 0 while l > 0: buf = rfile.read(l) f.write(buf) #yield buf rfile.readline() ltotal += l l = int(rfile.readline(), 16) elif size > 0: # if size=0 ,just save a empty file. writ = 0 bs = 65536 while True: if size != -1 and (bs > size-writ): bs = size-writ buf = rfile.read(bs) if len(buf) == 0: break f.write(buf) writ += len(buf) if size != -1 and writ >= size: break f.close() def unixdate2iso8601(d): tz = timezone / 3600 # can it be fractional? tz = '%+03d' % tz return strftime('%Y-%m-%dT%H:%M:%S', localtime(d)) + tz + ':00' def unixdate2httpdate(d): return strftime('%a, %d %b %Y %H:%M:%S GMT', gmtime(d)) class Tag: def __init__(self, name, attrs, data='', parser=None): self.d = {} self.name = name self.attrs = attrs if type(self.attrs) == type(''): self.attrs = splitattrs(self.attrs) for a in self.attrs: if a.startswith('xmlns'): # `xmlns:D="DAV:"` should register prefix "D" → "DAV:". # `xmlns="DAV:"` (default ns) registers prefix "" → "DAV:". nsname = a[5:] # drop "xmlns"; leaves "" (default) or ":D" (prefixed) if nsname.startswith(':'): nsname = nsname[1:] parser.namespaces[nsname] = self.attrs[a] self.rawname = self.name p = name.find(':') if p > 0: nsname = name[0:p] if nsname in parser.namespaces: self.ns = parser.namespaces[nsname] self.name = self.rawname[p+1:] else: self.ns = '' self.data = data # Emulate dictionary d def __len__(self): return len(self.d) def __getitem__(self, key): return self.d[key] def __setitem__(self, key, value): self.d[key] = value def __delitem__(self, key): del self.d[key] def __iter__(self): return iter(self.d.keys()) def __contains__(self, key): return key in self.d def __str__(self): """Returns unicode semi human-readable representation of the structure""" if self.attrs: s = '<%s %s> %s ' % (self.name, self.attrs, self.data) else: s = '<%s> %s ' % (self.name, self.data) for k in self.d: if type(self.d[k]) == type(self): s += '|%s: %s|' % (k, str(self.d[k])) else: s += '|' + ','.join([str(x) for x in self.d[k]]) + '|' return s def addChild(self, tag): """Adds a child to self. tag must be instance of Tag""" if tag.name in self.d: if type(self.d[tag.name]) == type(self): # If there are multiple sibiling tags with same name, form a list :) self.d[tag.name] = [self.d[tag.name]] self.d[tag.name].append(tag) else: self.d[tag.name] = tag return tag class XMLDict_Parser: def __init__(self, xml): self.xml = xml self.p = 0 self.encoding = sys.getdefaultencoding() self.namespaces = {} def getnexttag(self): ptag = self.xml.find('<', self.p) if ptag < 0: return None, None, self.xml[self.p:].strip() data = self.xml[self.p:ptag].strip() self.p = ptag self.tagbegin = ptag p2 = self.xml.find('>', self.p+1) if p2 < 0: raise "Malformed XML - unclosed tag?" tag = self.xml[ptag+1:p2] self.p = p2+1 self.tagend = p2+1 ps = tag.find(' ') ### Change By LCJ @ 2017/9/7 from [ if ps > 0: ] ### ### for IOS Coda Webdav support ### if ps > 0 and tag[-1] != '/': tag, attrs = tag.split(' ', 1) else: attrs = '' return tag, attrs, data def builddict(self): """Builds a nested-dictionary-like structure from the xml. This method picks up tags on the main level and calls processTag() for nested tags.""" d = Tag('', '') while True: tag, attrs, data = self.getnexttag() if data != '': # data is actually that between the last tag and this one sys.stderr.write("Warning: inline data between tags?!\n") if not tag: break if tag[-1] == '/': # an 'empty' tag (e.g. ) d.addChild(Tag(tag[:-1], attrs, parser=self)) continue elif tag[0] == '?': # special tag t = d.addChild(Tag(tag, attrs, parser=self)) if tag == '?xml' and 'encoding' in t.attrs: self.encoding = t.attrs['encoding'] else: try: self.processTag(d.addChild(Tag(tag, attrs, parser=self))) except: sys.stderr.write("Error processing tag %s\n" % tag) d.encoding = self.encoding return d def processTag(self, dtag): """Process single tag's data""" until = '/'+dtag.rawname while True: tag, attrs, data = self.getnexttag() if data: dtag.data += data if tag == None: sys.stderr.write("Unterminated tag '"+dtag.rawname+"'?\n") break if tag == until: break if tag[-1] == '/': dtag.addChild(Tag(tag[:-1], attrs, parser=self)) continue self.processTag(dtag.addChild(Tag(tag, attrs, parser=self))) def splitattrs(att): """Extracts name="value" pairs from string; returns them as dictionary""" d = {} for m in re.findall('([a-zA-Z_][a-zA-Z_:0-9]*?)="(.+?)"', att): d[m[0]] = m[1] return d def builddict(xml): """Wrapper function for straightforward parsing""" if sys_debug: print('-> XML:',xml) p = XMLDict_Parser(xml.decode('utf-8')) return p.builddict() class DAVRequestHandler(BaseHTTPRequestHandler): server_version = "tiny_WebDAV" all_props = ['name', 'parentname', 'href', 'ishidden', 'isreadonly', 'getcontenttype', 'contentclass', 'getcontentlanguage', 'creationdate', 'lastaccessed', 'getlastmodified', 'getcontentlength', 'iscollection', 'isstructureddocument', 'defaultdocument', 'displayname', 'isroot', 'resourcetype'] basic_props = ['name', 'getcontenttype', 'getcontentlength', 'creationdate', 'getlastmodified', 'iscollection'] auth_file = False auth_enable = False Auserlist = [] # for debug out def send_response(self, code, message=None): if sys_debug: print(f"<- status code: {code}") super().send_response(code, message) # Emit CORS headers on every response so the browser doesn't block # cross-origin requests (e.g. http://127.0.0.1:5500 -> http://localhost/). # Echo the request Origin if present, otherwise allow any origin. origin = self.headers.get('Origin') if hasattr(self, 'headers') else None self.send_header('Access-Control-Allow-Origin', origin or '*') self.send_header('Vary', 'Origin') self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY') self.send_header('Access-Control-Allow-Headers', 'Authorization, Content-Type, Depth, Destination, If, If-Match, If-Modified-Since, If-None-Match, If-Range, Lock-Token, Overwrite, Timeout, X-Requested-With') self.send_header('Access-Control-Max-Age', '86400') def send_header(self, keyword, value): if sys_debug: print(f"<- header: {keyword}: {value}") super().send_header(keyword, value) def end_headers(self): if sys_debug: print("<- End of headers") super().end_headers() def handle_one_request(self): super().handle_one_request() if sys_debug: print(f'>>>>>> {self.command} <<<<<<') # User Auth # if success ,return False; # Get WebDav User/Pass file : wdusers.conf # file formate: user:pass\n user:pass\n def WebAuth(self): if sys_debug: import inspect print(f'## {inspect.stack()[1].function}, PATH: {urllib.parse.unquote(self.path)}') print(self.headers) if self.server.auth_enable: if 'Authorization' in self.headers: try: AuthInfo = self.headers['Authorization'][6:].encode('utf-8') except: AuthInfo = '' if AuthInfo in self.server.userpwd: return False # Auth success self.send_response(401,'Authorization Required') self.send_header('WWW-Authenticate', 'Basic realm="WebDav Auth"') self.send_header('Content-type', 'text/html') self.end_headers() return True else: return False def do_OPTIONS(self): if self.WebAuth(): return self.send_response(200, DAVRequestHandler.server_version) self.send_header('Allow', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY') self.send_header('Content-length', '0') self.send_header('X-Server-Copyright', DAVRequestHandler.server_version) self.send_header('DAV', '1, 2') #OSX Finder need Ver 2, if Ver 1 -- read only self.send_header('MS-Author-Via', 'DAV') self.end_headers() def do_DELETE(self): if self.WebAuth(): return path = urllib.parse.unquote(self.path) if path == '': self.send_error(404, 'Object not found') self.send_header('Content-length', '0') self.end_headers() return path = self.server.root.rootdir() + path if os.path.isfile(path): os.remove(path) #delete file elif os.path.isdir(path): shutil.rmtree(path) #delete dir else: self.send_response(404,'Not Found') self.send_header('Content-length', '0') self.end_headers() return self.send_response(204, 'No Content') self.send_header('Content-length', '0') self.end_headers() def do_MKCOL(self): if self.WebAuth(): return path = urllib.parse.unquote(self.path) if path != '': path = self.server.root.rootdir() + path if os.path.isdir(path) == False: os.mkdir(path) self.send_response(201, "Created") self.send_header('Content-length', '0') self.end_headers() return self.send_response(403, "OK") self.send_header('Content-length', '0') self.end_headers() def do_MOVE(self): if self.WebAuth(): return oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path) newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path if (os.path.isfile(oldfile)==True and os.path.isfile(newfile)==False): shutil.move(oldfile,newfile) if (os.path.isdir(oldfile)==True and os.path.isdir(newfile)==False): os.rename(oldfile,newfile) self.send_response(201, "Created") self.send_header('Content-length', '0') self.end_headers() def do_COPY(self): if self.WebAuth(): return oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path) newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path if (os.path.isfile(oldfile)==True): # and os.path.isfile(newfile)==False): copy can rewrite. shutil.copyfile(oldfile,newfile) self.send_response(201, "Created") self.send_header('Content-length', '0') self.end_headers() def do_LOCK(self): if self.WebAuth(): return if 'Content-length' in self.headers: req = self.rfile.read(int(self.headers['Content-length'])) else: req = self.rfile.read() d = builddict(req) try: clientid = str(d['lockinfo']['owner']['href'])[7:] # temp: need Change other method!!! except: clientid = '' lockid = str(uuid.uuid1()) retstr = '\n\n\n\n\n\nInfinity\n\n'+clientid+'\n\nInfinite\nopaquelocktoken:'+lockid+'\n\n\n\n' self.send_response(201,'Created') self.send_header("Content-type",'text/xml') self.send_header("charset",'"utf-8"') self.send_header("Lock-Token",'') self.send_header('Content-Length',len(retstr)) w = BufWriter(self.wfile, sys_debug) w.write(retstr) self.send_header('Content-Length', str(w.getSize())) self.end_headers() w.flush() def do_UNLOCK(self): if self.WebAuth(): return # frome self.headers get Lock-Token: self.send_response(204, 'No Content') # unlock using 204 for sucess. self.send_header('Content-length', '0') self.end_headers() def do_PROPFIND(self): if self.WebAuth(): return depth = 'infinity' if 'Depth' in self.headers: depth = self.headers['Depth'].lower() if 'Content-length' in self.headers: req = self.rfile.read(int(self.headers['Content-length'])) else: req = self.rfile.read() d = builddict(req) # change all http.request to dict stru wished_all = False if len(d) == 0: wished_props = DAVRequestHandler.basic_props else: if 'allprop' in d['propfind']: wished_props = DAVRequestHandler.all_props wished_all = True else: wished_props = [] for prop in d['propfind']['prop']: ### 2017/9/7 Edit By LCJ , Old is [ wished_props.append(prop) ] ### for IOS Coda Webdav support ### wished_props.append(prop.split(' ')[0]) path, elem = self.path_elem() if not elem: if len(path) >= 1: # it's a non-existing file self.send_response(404, 'Not Found') self.send_header('Content-length', '0') self.end_headers() return else: elem = self.server.root # fixup root lookups? if depth != '0' and not elem: #or elem.type != Member.M_COLLECTION: self.send_response(406, 'This is not allowed') self.send_header('Content-length', '0') self.end_headers() return self.send_response(207, 'Multi-Status') #Multi-Status self.send_header('Content-Type', 'text/xml') self.send_header("charset",'"utf-8"') w = BufWriter(self.wfile, sys_debug) w.write('\n') w.write('\n') def write_props_member(w, m): w.write('\n%s\n\n\n' % urllib.parse.quote(m.virname)) #add urllib.quote for chinese props = m.getProperties() # get the file or dir props # For OSX Finder : getlastmodified, getcontentlength, resourceType if ('quota-available-bytes' in wished_props) or ('quota-used-bytes'in wished_props) or ('quota' in wished_props) or ('quotaused'in wished_props): # windows don't support os.statvfs try: sDisk = os.statvfs('/') props['quota-used-bytes'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize props['quotaused'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize props['quota-available-bytes'] = sDisk.f_bavail * sDisk.f_frsize props['quota'] = sDisk.f_bavail * sDisk.f_frsize except: pass for wp in wished_props: if (wp in props) == False: w.write(' \n' % wp) else: w.write(' %s\n' % (wp, str(props[wp]), wp)) w.write('\nHTTP/1.1 200 OK\n\n\n') write_props_member(w, elem) if depth == '1': for m in elem.getMembers(): write_props_member(w,m) w.write('') self.send_header('Content-Length', str(w.getSize())) self.end_headers() w.flush() def do_PROPPATCH(self): if self.WebAuth(): return if 'Content-length' in self.headers: req = self.rfile.read(int(self.headers['Content-length'])) else: req = self.rfile.read() d = builddict(req) # change all http.request to dict stru self.send_response(207, 'Multi-Status') #Multi-Status self.send_header('Content-Type', 'text/xml') self.send_header("charset",'"utf-8"') retstr = f'{urllib.parse.unquote(self.path)}HTTP/1.1 200 OK' w = BufWriter(self.wfile, sys_debug) w.write(retstr) self.send_header('Content-Length', str(w.getSize())) self.end_headers() w.flush() def do_GET(self, onlyhead=False): if self.WebAuth(): return path, elem = self.path_elem() if not elem: self.send_error(404, 'Object not found') return try: props = elem.getProperties() except: self.send_response(500, "Error retrieving properties") self.end_headers() return # Collections don't carry a content length — render a directory # listing instead of treating them like a file. if elem.type == Member.M_COLLECTION: ctype = props.get('getcontenttype') or DirCollection.COLLECTION_MIME_TYPE self.send_response(200, 'OK') self.send_header("Content-type", ctype) if 'getlastmodified' in props: self.send_header("Last-modified", props['getlastmodified']) self.end_headers() if not onlyhead: elem.sendData(self.wfile) return # when the client had Range: bytes=3156-3681 bpoint = 0 epoint = 0 fullen = props.get('getcontentlength', 0) if 'Range' in self.headers: stmp = self.headers['Range'][6:] stmp = stmp.split('-') try: bpoint = int(stmp[0]) except: bpoint = 0 try: epoint = int(stmp[1]) except: epoint = fullen - 1 if (epoint<=bpoint): bpoint = 0 epoint = fullen - 1 fullen = epoint - bpoint + 1 if epoint>0: self.send_response(206, 'Partial Content') self.send_header("Content-Range", " Bytes %s-%s/%s" % (bpoint, epoint, fullen)) else: self.send_response(200, 'OK') self.send_header("Content-type", props.get('getcontenttype', 'application/octet-stream')) if 'getlastmodified' in props: self.send_header("Last-modified", props['getlastmodified']) self.send_header("Content-length", fullen) self.end_headers() if not onlyhead: if fullen >0 : # all 0 size file don't need this elem.sendData(self.wfile,bpoint,epoint) def do_HEAD(self): self.do_GET(True) # HEAD should behave like GET, only without contents def do_PUT(self): if self.WebAuth(): return try: if 'Content-length' in self.headers: size = int(self.headers['Content-length']) elif 'Transfer-Encoding' in self.headers: if self.headers['Transfer-Encoding'].lower()=='chunked': size = -2 else: size = -1 path, elem = self.path_elem_prev() ename = path[-1] except: self.send_response(400, 'Cannot parse request') self.send_header('Content-length', '0') self.end_headers() return # for OSX finder, it's first send a 0 byte file,and you need response a 201 code,and then osx send real file. # OSX finder don't using content-length. if ename == '.DS_Store': self.send_response(403, 'Forbidden') self.send_header('Content-length', '0') self.end_headers() else: try: elem.recvMember(self.rfile, ename, size, self) except: self.send_response(500, 'Cannot save file') self.send_header('Content-length', '0') self.end_headers() return # sucess return self.send_response(201, 'Created') self.send_header('Content-length', '0') self.end_headers() def split_path(self, path): """Splits path string in form '/dir1/dir2/file' into parts""" p = path.split('/')[1:] while p and p[-1] in ('','/'): p = p[:-1] if len(p) > 0: p[-1] += '/' return p def path_elem(self): """Returns split path (see split_path()) and Member object of the last element""" path = self.split_path(urllib.parse.unquote(self.path)) elem = self.server.root for e in path: elem = elem.findMember(e) if elem == None: break return (path, elem) def path_elem_prev(self): """Returns split path (see split_path()) and Member object of the next-to-last element""" path = self.split_path(urllib.parse.unquote(self.path)) elem = self.server.root for e in path[:-1]: elem = elem.findMember(e) if elem == None: break return (path, elem) # disable log info output to screen def log_message(self,format,*args): pass class BufWriter: def __init__(self, w, debug=False): self.w = w self.buf = '' self.debug = debug def write(self, s): self.buf += s def flush(self): if self.debug: print('<- XML:', self.buf) # sys.stderr.write(self.buf) self.w.write(self.buf.encode('utf-8')) self.w.flush() def getSize(self): return len(self.buf.encode('utf-8')) class DAVServer(ThreadingMixIn, HTTPServer): def __init__(self, addr, handler, root, userpwd): HTTPServer.__init__(self, addr, handler) self.root = root self.userpwd = userpwd # WebDav Auth user:passwd if len(userpwd)>0: self.auth_enable = True else: self.auth_enable = False # disable the broken pipe error message def finish_request(self,request,client_address): try: HTTPServer.finish_request(self, request, client_address) except socket.error as e: pass if __name__ == '__main__': # WebDav TCP Port srvport = 80 # Get local IP address myaddr = get_localip() print(f'# WebDav Server run at http://{myaddr}:{srvport}') server_address = ('', srvport) # WebDav Auth User/Password file # if not this file ,the auth function disable. # file format: user:passwd\n user:passwd\n # or you can change your auth mode and file save format userpwd = [] try: f = open('wdusers.conf', 'r') for uinfo in f.readlines(): uinfo = uinfo.strip() if len(uinfo) > 4: userpwd.append(base64.b64encode(uinfo.encode('utf-8'))) except: pass # first is Server root dir, Second is virtual dir # **** Change First ./ to your dir , etc :/mnt/flash/public, d:/share_file/ root = DirCollection('files/', '/') httpd = DAVServer(server_address, DAVRequestHandler, root, userpwd) try: httpd.serve_forever() # todo: add some control over starting and stopping the server except: print('# WebDav Server Stop.')