# _*_ coding:utf-8 _*_
# Tiny WebDav Server for Pythonista - IOS. (Base on pandav WebDav server )
#
# (C)2013/11 By: Lai ChuJiang
#
# this code can run not just Pythonista on IOS ,also run on OSX python.
# Support Client: Windows / OSX / other Webdav client for IOS,etc : FE File explorer / goodreader / iWorks for ios
#
# 2024/6/1 Change Log:
#   Python3 support
#   add do_PROPPATCH function, now Windows Explorer full support.
#     (!! windows explorer limit filesize < 50MBytes, you can using windows regedit edit it !!)
#   support show webdav request/response info. ( sys_debug = True )
#   bug fix
#
# 20170907 Change Log:
#  1. Fix Coda WebDav access problem.(because Coda WebDav all xml item add xmlns="DAV:".)
#
# 2013/11 Change Log:
#  1. Combind all files to one file,so can using for Pythonista easy.
#  2. Add MKCOL(Create dir); MOVE(rename file); DELETE(delete file or dir); COPY (Copy file)
#  3. Change some decode, Now it's can support Chinese.
#  4. Pythonista(For IOS) not dircache module ,so change code, don't using this module.
#  5. change DAV version from 1 to 2, so the OSX finder can write.
#  6. for DAV version 2 support , add LOCK / UNLOCK fake support. (not real lock file)
#     *** !!!! So Don't using > 1 client sametime write or delete same file. maybe lost files.
#  7. Change the do_PROPFIND module, now it's simply & right for OSX
#  8. Change the do_GET module, now support RANGE
#  9. Change the do_PUT module, add Content-Length=0 support (create empty file) ,so the OSX Finder support.
#     * if not add empty file, the Finder copy files and then delete all this.
# 10. Add WebDav Basic Auth function,now you can set user & passwd
#     ** using wdusers.conf file (just user:passwd), if not this file ,the Auth disable.
# 11. Fix the broken pipe error message
#
# WebDav RFC: http://www.ics.uci.edu/~ejw/authoring/protocol/rfc2518.html
#             http://restpatterns.org/HTTP_Methods
#
# Base : pandav v0.2
#   Copyright (c) 2005.-2006. Ivan Voras
#   Released under the Artistic License
#

import base64
import hashlib
import html
import json
import mimetypes
import os
import re
import shutil
import socket
import struct
import sys
import threading
import urllib.error
import urllib.parse
import urllib.request
import uuid
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
from time import gmtime, localtime, strftime, timezone

_ws_lock = threading.Lock()
_ws_clients: set = set()    # connected WebSocket sockets

# Debug message ( True / False )
sys_debug = False

# Size of the buffer used when streaming file contents.
_CHUNK = 65536


# ── Logging ────────────────────────────────────────────────────────────────────
def _log(tag: str, msg: str):
    """Print a timestamped, tagged log line and flush immediately."""
    print(f'[{strftime("%H:%M:%S")}] [{tag}] {msg}', flush=True)


# ── WebSocket broadcast (module-level so the REPL can call it) ─────────────────
def _ws_encode_frame(payload: bytes, opcode: int = 0x1) -> bytes:
    """Build one unmasked, final (FIN=1) WebSocket frame around *payload*."""
    n = len(payload)
    first = 0x80 | (opcode & 0x0F)
    if n < 126:
        header = bytes([first, n])
    elif n < 65536:
        header = bytes([first, 126]) + struct.pack('>H', n)
    else:
        header = bytes([first, 127]) + struct.pack('>Q', n)
    return header + payload


def ws_broadcast(text: str) -> int:
    """Send a UTF-8 text frame to every connected WebSocket client.

    Returns the number of clients the frame was delivered to.  Sockets that
    fail are dropped from the client registry.
    """
    frame = _ws_encode_frame(text.encode('utf-8'))
    with _ws_lock:
        clients = list(_ws_clients)     # snapshot: never send while holding the lock
    sent = 0
    for sock in clients:
        try:
            sock.sendall(frame)         # sendall: send() may write only part of the frame
            sent += 1
        except OSError:
            with _ws_lock:
                _ws_clients.discard(sock)
    return sent


# ── REPL ───────────────────────────────────────────────────────────────────────
def _repl_thread():
    """Read lines from stdin and broadcast them to all connected WS clients."""
    _log('REPL', 'ready — type a message and press Enter to broadcast, Ctrl-D to quit')
    while True:
        try:
            sys.stdout.write('> ')
            sys.stdout.flush()
            line = sys.stdin.readline()
            if not line:                # EOF / Ctrl-D
                break
            line = line.rstrip('\r\n')
            if not line:
                continue
            n = ws_broadcast(line)
            _log('REPL', f'sent to {n} client(s): {line}')
        except (EOFError, KeyboardInterrupt):
            break


# get localhost IP address
def get_localip():
    """Return the IP address of the outgoing network interface.

    Connecting a UDP socket sends no packets; it only makes the OS pick the
    route, so this works without DNS and never blocks.  Falls back to
    '127.0.0.1' when no route is available.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect(('8.8.8.8', 80))
            return probe.getsockname()[0]
    except OSError:
        return '127.0.0.1'


class Member:
    """Base class for anything addressable through WebDAV."""
    M_MEMBER = 1
    M_COLLECTION = 2

    def getProperties(self):
        return {}


class Collection(Member):
    """Base class for a WebDAV collection (directory-like resource)."""

    def __init__(self, name):
        self.name = name

    def getMembers(self):
        return []


class FileMember(Member):
    """A regular file inside a DirCollection."""

    def __init__(self, name, parent):
        self.name = name
        self.parent = parent
        self.fsname = parent.fsname + name      # e.g. '/var/www/mysite/some.txt'
        self.virname = parent.virname + name    # e.g. '/mysite/some.txt'
        self.type = Member.M_MEMBER

    def __str__(self):
        return "%s -> %s" % (self.virname, self.fsname)

    def getProperties(self):
        """Return dictionary with WebDAV properties. Values should be
        formatted according to the WebDAV specs."""
        st = os.stat(self.fsname)
        p = {}
        p['creationdate'] = unixdate2iso8601(st.st_ctime)
        p['getlastmodified'] = unixdate2httpdate(st.st_mtime)
        p['displayname'] = self.name
        # The etag is derived from the path only, not from the file contents.
        p['getetag'] = hashlib.md5(self.fsname.encode('utf-8')).hexdigest()
        if self.type == Member.M_MEMBER:
            p['getcontentlength'] = st.st_size
            p['getcontenttype'], _encoding = mimetypes.guess_type(self.name)
            p['getcontentlanguage'] = None
        else:   # Member.M_COLLECTION
            p['resourcetype'] = '<D:collection/>'
        if self.name[0] == ".":
            p['ishidden'] = 1
        if not os.access(self.fsname, os.W_OK):
            p['isreadonly'] = 1
        if self.name == '/':
            p['isroot'] = 1
        return p

    def sendData(self, wfile, bpoint=0, epoint=0, length=None):
        """Send the file to the client. Literally.

        bpoint/epoint are the inclusive first/last byte offsets of a Range
        request; with the defaults (0, 0) the whole file is sent.  `length`,
        when given, is the exact number of bytes to send starting at bpoint
        (needed for single-byte ranges such as 0-0, which the bpoint/epoint
        pair alone cannot express).
        """
        size = os.stat(self.fsname).st_size
        offset = bpoint if 0 < bpoint < size else 0
        if length is not None:
            count = length
        elif epoint > bpoint:
            count = epoint - bpoint + 1
        else:
            count = size
        count = max(0, min(count, size - offset))   # never read past EOF
        with open(self.fsname, 'rb') as f:
            if offset:
                f.seek(offset)
            sent = 0
            while sent < count:
                # Read only what is still owed; reading the full range size
                # here would overshoot on ranges larger than one chunk.
                buf = f.read(min(_CHUNK, count - sent))
                if not buf:
                    break
                sent += len(buf)
                wfile.write(buf)

    def findMember(self, name):
        ''' 2024/6/1 add: a file has no members. '''
        return None


class DirCollection(FileMember, Collection):
    """A filesystem directory exposed as a WebDAV collection."""
    COLLECTION_MIME_TYPE = 'httpd/unix-directory'   # application/x-collection ?

    def __init__(self, fsdir, virdir, parent=None):
        if not os.path.exists(fsdir):
            raise FileNotFoundError("Local directory (fsdir) not found: " + fsdir)
        self.fsname = fsdir
        self.name = virdir
        if self.fsname[-1] != os.sep:
            if self.fsname[-1] == '/':  # fixup win/dos/mac separators
                self.fsname = self.fsname[:-1] + os.sep
            else:
                self.fsname += os.sep
        self.virname = virdir
        if self.virname[-1] != '/':
            self.virname += '/'
        self.parent = parent
        self.type = Member.M_COLLECTION

    def getProperties(self):
        p = FileMember.getProperties(self)  # inherit file properties
        p['iscollection'] = 1
        p['getcontenttype'] = DirCollection.COLLECTION_MIME_TYPE
        # Inherited displayname is `self.name`, which for a DirCollection is the
        # full virtual path (e.g. "/sd/40 tracks/") because __init__ sets
        # self.name = virdir. WebDAV clients expect a basename, so override it
        # with just the leaf name.
        leaf = os.path.basename(self.fsname.rstrip('/\\'))
        if leaf:
            p['displayname'] = leaf
        return p

    def _entries(self):
        """List the directory; sub-directory names get a trailing '/'.

        Entries that are neither a file nor a directory (e.g. dangling
        symlinks) cannot be served and are skipped.
        """
        names = []
        for entry in os.listdir(self.fsname):
            full = self.fsname + entry
            if os.path.isfile(full):
                names.append(entry)
            elif os.path.isdir(full):
                names.append(entry + '/')
        return names

    def _member(self, name):
        """Build the Member object for an entry name produced by _entries()."""
        if name.endswith('/'):
            return DirCollection(self.fsname + name, self.virname + name, self)
        return FileMember(name, self)

    def getMembers(self):
        """Get immediate members of this collection."""
        return [self._member(name) for name in self._entries()]

    # Return WebDav Root Dir info
    def rootdir(self):
        return self.fsname

    def findMember(self, name):
        """Search for a particular member; returns None when it does not exist.

        A directory is found with or without its trailing '/'.
        """
        if not name:                    # empty path component, e.g. '/a//b'
            return None
        entries = self._entries()
        if name in entries:
            return self._member(name)
        if not name.endswith('/') and name + '/' in entries:
            return self._member(name + '/')
        return None

    def sendData(self, wfile, bpoint=0, epoint=0, length=None):
        """Send "file" to the client. Since this is a directory, build some
        arbitrary HTML.  The range arguments exist only for signature parity
        with FileMember.sendData and are ignored."""
        rows = []
        for m in self.getMembers():
            p = m.getProperties()
            if 'getcontentlength' in p:
                size = int(p['getcontentlength'])
                timestamp = p['getlastmodified']
            else:
                size = 0
                timestamp = '-DIR-'
            rows.append('<tr><td>%s</td><td>%d</td><td>%s</td></tr>' % (
                html.escape(str(p['displayname'])), size, html.escape(str(timestamp))))
        data = ''.join([
            '<html><head><title>%s</title></head><body>' % html.escape(str(self.virname)),
            '<table><tr><th>Name</th><th>Size</th><th>Timestamp</th></tr>',
            ''.join(rows),
            '</table></body></html>',
        ])
        wfile.write(data.encode('utf-8'))

    def recvMember(self, rfile, name, size, req):
        """Receive (save) a member file.

        name -- already URL-decoded leaf name of the file to create.
        size -- number of body bytes; 0 creates an empty file; -2 means the
                body uses Transfer-Encoding: chunked (OSX Finder does this);
                -1 means no length information, which also yields an empty file.
        req  -- the request handler (unused, kept for interface compatibility).

        Raises ValueError when `name` is not a plain file name.
        """
        # The caller has already decoded the URL.  Decoding again here would
        # turn '%252e%252e%252f' into '../' and allow writes outside the root.
        bad_sep = '/' in name or os.sep in name or bool(os.altsep and os.altsep in name)
        if not name or name in ('.', '..') or bad_sep:
            raise ValueError('Invalid member name: %r' % (name,))
        fname = os.path.join(self.fsname, name)
        with open(fname, 'wb') as f:
            if size == -2:
                chunk_len = self._chunk_size(rfile)
                while chunk_len > 0:
                    f.write(rfile.read(chunk_len))
                    rfile.readline()    # CRLF that terminates the chunk data
                    chunk_len = self._chunk_size(rfile)
            elif size > 0:              # if size=0 ,just save a empty file.
                remaining = size
                while remaining > 0:
                    buf = rfile.read(min(_CHUNK, remaining))
                    if not buf:
                        break
                    f.write(buf)
                    remaining -= len(buf)

    @staticmethod
    def _chunk_size(rfile):
        """Read one chunk-size line (hex, optional ';extension') from rfile."""
        line = rfile.readline()
        return int(line.split(b';', 1)[0].strip(), 16)


def unixdate2iso8601(d):
    """Format a Unix timestamp as local ISO 8601 time with UTC offset."""
    lt = localtime(d)
    offset = getattr(lt, 'tm_gmtoff', None)
    if offset is None:
        # time.timezone counts seconds WEST of UTC, hence the sign flip.
        offset = -timezone
    sign = '-' if offset < 0 else '+'
    hours, minutes = divmod(abs(offset) // 60, 60)
    return strftime('%Y-%m-%dT%H:%M:%S', lt) + '%s%02d:%02d' % (sign, hours, minutes)


def unixdate2httpdate(d):
    """Format a Unix timestamp as an HTTP date (always GMT)."""
    return strftime('%a, %d %b %Y %H:%M:%S GMT', gmtime(d))


class Tag:
    """One parsed XML element; child elements are reachable like dict items."""

    def __init__(self, name, attrs, data='', parser=None):
        self.d = {}
        self.name = name
        self.attrs = attrs
        if isinstance(self.attrs, str):
            self.attrs = splitattrs(self.attrs)
        if parser is not None:
            for a in self.attrs:
                if a.startswith('xmlns'):
                    # `xmlns:D="DAV:"` should register prefix "D" → "DAV:".
                    # `xmlns="DAV:"` (default ns) registers prefix "" → "DAV:".
                    nsname = a[5:]  # drop "xmlns"; leaves "" (default) or ":D" (prefixed)
                    if nsname.startswith(':'):
                        nsname = nsname[1:]
                    parser.namespaces[nsname] = self.attrs[a]
        self.rawname = self.name
        self.ns = ''
        p = name.find(':')
        if p > 0 and parser is not None:
            nsname = name[0:p]
            if nsname in parser.namespaces:
                self.ns = parser.namespaces[nsname]
                self.name = self.rawname[p+1:]
        self.data = data

    # Emulate dictionary d
    def __len__(self):
        return len(self.d)

    def __getitem__(self, key):
        return self.d[key]

    def __setitem__(self, key, value):
        self.d[key] = value

    def __delitem__(self, key):
        del self.d[key]

    def __iter__(self):
        return iter(self.d.keys())

    def __contains__(self, key):
        return key in self.d

    def __str__(self):
        """Returns unicode semi human-readable representation of the structure"""
        if self.attrs:
            s = '<%s %s> %s ' % (self.name, self.attrs, self.data)
        else:
            s = '<%s> %s ' % (self.name, self.data)
        for k in self.d:
            if isinstance(self.d[k], Tag):
                s += '|%s: %s|' % (k, str(self.d[k]))
            else:
                s += '|' + ','.join([str(x) for x in self.d[k]]) + '|'
        return s

    def addChild(self, tag):
        """Adds a child to self. tag must be instance of Tag"""
        if tag.name in self.d:
            if isinstance(self.d[tag.name], Tag):
                # If there are multiple sibling tags with same name, form a list :)
                self.d[tag.name] = [self.d[tag.name]]
            self.d[tag.name].append(tag)
        else:
            self.d[tag.name] = tag
        return tag


class XMLDict_Parser:
    """Minimal, forgiving XML parser that produces nested Tag objects."""

    def __init__(self, xml):
        self.xml = xml
        self.p = 0
        self.encoding = sys.getdefaultencoding()
        self.namespaces = {}

    def getnexttag(self):
        """Return (tag, attrs, data) for the next tag; data is the text that
        preceded it.  tag and attrs are None at end of input.

        Raises ValueError when a '<' is never closed.
        """
        ptag = self.xml.find('<', self.p)
        if ptag < 0:
            return None, None, self.xml[self.p:].strip()
        data = self.xml[self.p:ptag].strip()
        self.p = ptag
        self.tagbegin = ptag
        p2 = self.xml.find('>', self.p+1)
        if p2 < 0:
            raise ValueError("Malformed XML - unclosed tag?")
        tag = self.xml[ptag+1:p2]
        self.p = p2+1
        self.tagend = p2+1
        ps = tag.find(' ')
        ### Change By LCJ @ 2017/9/7 from [ if ps > 0: ] ###
        ### for IOS Coda Webdav support ###
        if ps > 0 and tag[-1] != '/':
            tag, attrs = tag.split(' ', 1)
        else:
            attrs = ''
        return tag, attrs, data

    def builddict(self):
        """Builds a nested-dictionary-like structure from the xml. This method
        picks up tags on the main level and calls processTag() for nested tags."""
        d = Tag('', '')
        while True:
            tag, attrs, data = self.getnexttag()
            if data != '':  # data is actually that between the last tag and this one
                sys.stderr.write("Warning: inline data between tags?!\n")
            if not tag:
                break
            if tag[-1] == '/':      # an 'empty' tag (e.g. <empty/>)
                d.addChild(Tag(tag[:-1], attrs, parser=self))
                continue
            elif tag[0] == '?':     # special tag
                t = d.addChild(Tag(tag, attrs, parser=self))
                if tag == '?xml' and 'encoding' in t.attrs:
                    self.encoding = t.attrs['encoding']
            else:
                try:
                    self.processTag(d.addChild(Tag(tag, attrs, parser=self)))
                except Exception:
                    # Deliberately forgiving: keep whatever was parsed so far.
                    sys.stderr.write("Error processing tag %s\n" % tag)
        d.encoding = self.encoding
        return d

    def processTag(self, dtag):
        """Process single tag's data"""
        until = '/'+dtag.rawname
        while True:
            tag, attrs, data = self.getnexttag()
            if data:
                dtag.data += data
            if tag is None:
                sys.stderr.write("Unterminated tag '"+dtag.rawname+"'?\n")
                break
            if tag == until:
                break
            if tag[-1] == '/':
                dtag.addChild(Tag(tag[:-1], attrs, parser=self))
                continue
            self.processTag(dtag.addChild(Tag(tag, attrs, parser=self)))


def splitattrs(att):
    """Extracts name="value" pairs from string; returns them as dictionary"""
    d = {}
    for m in re.findall(r'([a-zA-Z_][a-zA-Z_:0-9]*?)="(.+?)"', att):
        d[m[0]] = m[1]
    return d


def builddict(xml):
    """Wrapper function for straightforward parsing of a UTF-8 bytes body."""
    if sys_debug:
        print('-> XML:', xml)
    p = XMLDict_Parser(xml.decode('utf-8'))
    return p.builddict()


class DAVRequestHandler(BaseHTTPRequestHandler):
    """HTTP request handler implementing WebDAV, a WebSocket endpoint (/ws),
    the /leet/ API proxy and the /proxy-download helper."""

    server_version = "tiny_WebDAV"
    all_props = ['name', 'parentname', 'href', 'ishidden', 'isreadonly', 'getcontenttype',
                 'contentclass', 'getcontentlanguage', 'creationdate', 'lastaccessed', 'getlastmodified',
                 'getcontentlength', 'iscollection', 'isstructureddocument', 'defaultdocument',
                 'displayname', 'isroot', 'resourcetype']
    basic_props = ['name', 'getcontenttype', 'getcontentlength', 'creationdate',
                   'getlastmodified', 'iscollection']
    auth_file = False
    auth_enable = False
    Auserlist = []

    # Largest WebSocket frame payload accepted from a client.
    WS_MAX_FRAME = 16 * 1024 * 1024

    # for debug out
    def send_response(self, code, message=None):
        if sys_debug:
            print(f"<- status code: {code}")
        super().send_response(code, message)
        # Emit CORS headers on every response so the browser doesn't block
        # cross-origin requests (e.g. http://127.0.0.1:5500 -> http://localhost/).
        # Echo the request Origin if present, otherwise allow any origin.
        # Handlers must NOT add their own Access-Control-Allow-Origin: browsers
        # reject responses that carry the header twice.
        origin = self.headers.get('Origin') if hasattr(self, 'headers') else None
        self.send_header('Access-Control-Allow-Origin', origin or '*')
        self.send_header('Vary', 'Origin')
        self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
        self.send_header('Access-Control-Allow-Headers', 'Authorization, Content-Type, Depth, Destination, If, If-Match, If-Modified-Since, If-None-Match, If-Range, Lock-Token, Overwrite, Timeout, X-Requested-With')
        self.send_header('Access-Control-Max-Age', '86400')

    def send_header(self, keyword, value):
        if sys_debug:
            print(f"<- header: {keyword}: {value}")
        super().send_header(keyword, value)

    def end_headers(self):
        if sys_debug:
            print("<- End of headers")
        super().end_headers()

    def handle_one_request(self):
        super().handle_one_request()
        if sys_debug:
            # `command` is unset when the request line could not be parsed.
            print(f'>>>>>> {getattr(self, "command", None)} <<<<<<')

    # ── small shared helpers ───────────────────────────────────────────────────
    def _send_empty(self, code, message=None):
        """Send a complete response that has no body."""
        self.send_response(code, message)
        self.send_header('Content-length', '0')
        self.end_headers()

    def _send_xml(self, code, message, writer, extra_headers=()):
        """Send the XML buffered in `writer` (a BufWriter) as the response body."""
        self.send_response(code, message)
        self.send_header('Content-Type', 'text/xml; charset="utf-8"')
        for keyword, value in extra_headers:
            self.send_header(keyword, value)
        self.send_header('Content-Length', str(writer.getSize()))
        self.end_headers()
        writer.flush()

    def _read_body(self):
        """Read the request body as announced by Content-Length.

        Without a Content-Length the body is treated as empty: an unbounded
        rfile.read() would block until the client closes the connection.
        """
        try:
            length = int(self.headers.get('Content-Length') or 0)
        except ValueError:
            length = 0
        return self.rfile.read(length) if length > 0 else b''

    def _fs_path(self, vpath):
        """Map a URL-decoded virtual path to a path under the WebDAV root.

        Returns None when the path tries to leave the root ('..' components
        or embedded OS path separators).
        """
        parts = [part for part in vpath.split('/') if part not in ('', '.')]
        for part in parts:
            if part == '..' or os.sep in part or (os.altsep and os.altsep in part):
                return None
        return os.path.join(self.server.root.rootdir(), *parts)

    def _destination_path(self):
        """Filesystem path named by the Destination header, or None."""
        dest = self.headers.get('Destination')
        if not dest:
            return None
        # Split the URL first and decode afterwards, so an encoded '?' or '#'
        # inside a file name is not mistaken for a query/fragment delimiter.
        return self._fs_path(urllib.parse.unquote(urllib.parse.urlparse(dest).path))

    # User Auth
    # if success ,return False;
    # Get WebDav User/Pass file : wdusers.conf
    # file formate: user:pass\n user:pass\n
    def WebAuth(self):
        """Check HTTP Basic credentials.

        Returns False when the request may proceed, True when a 401 response
        has already been sent and the caller must stop.
        """
        if sys_debug:
            import inspect
            print(f'## {inspect.stack()[1].function}, PATH: {urllib.parse.unquote(self.path)}')
            print(self.headers)
        if not self.server.auth_enable:
            return False
        header = self.headers.get('Authorization')
        if header is not None:
            # Skip the leading "Basic "; what remains is base64("user:passwd"),
            # the same form the entries of server.userpwd are stored in.
            if header[6:].encode('utf-8') in self.server.userpwd:
                return False    # Auth success
        self.send_response(401, 'Authorization Required')
        self.send_header('WWW-Authenticate', 'Basic realm="WebDav Auth"')
        self.send_header('Content-type', 'text/html')
        self.send_header('Content-length', '0')
        self.end_headers()
        return True

    def do_OPTIONS(self):
        # CORS preflight for /proxy-download and the /leet/ proxy.  Preflight
        # requests carry no credentials, so they are answered before auth.
        # The CORS headers themselves are added by send_response().
        if self.path == '/proxy-download' or self.path.startswith('/leet/'):
            self.send_response(204)
            self.send_header('Content-Length', '0')
            self.end_headers()
            return
        if self.WebAuth():
            return
        self.send_response(200, DAVRequestHandler.server_version)
        self.send_header('Allow', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
        self.send_header('Content-length', '0')
        self.send_header('X-Server-Copyright', DAVRequestHandler.server_version)
        self.send_header('DAV', '1, 2')     # OSX Finder need Ver 2, if Ver 1 -- read only
        self.send_header('MS-Author-Via', 'DAV')
        self.end_headers()

    def do_DELETE(self):
        """Delete a file or a whole directory tree."""
        if self.WebAuth():
            return
        vpath = urllib.parse.unquote(self.path)
        path = self._fs_path(vpath)
        if path is None or not self.split_path(vpath):
            # Path escapes the root, or names the root itself.
            self._send_empty(403, 'Forbidden')
            return
        try:
            if os.path.isfile(path):
                os.remove(path)         # delete file
            elif os.path.isdir(path):
                shutil.rmtree(path)     # delete dir
            else:
                self._send_empty(404, 'Not Found')
                return
        except OSError as e:
            _log('DAV', f'DELETE failed: {path} → {e}')
            self._send_empty(500, 'Cannot delete')
            return
        self._send_empty(204, 'No Content')

    def do_MKCOL(self):
        """Create a directory."""
        if self.WebAuth():
            return
        path = self._fs_path(urllib.parse.unquote(self.path))
        if path is None or os.path.isdir(path):
            self._send_empty(403, 'Forbidden')
            return
        try:
            os.mkdir(path)
        except OSError:
            # Typically a missing parent collection.
            self._send_empty(409, 'Conflict')
            return
        self._send_empty(201, "Created")

    def do_MOVE(self):
        """Rename/move a file or directory.  An existing destination is never
        overwritten."""
        if self.WebAuth():
            return
        vpath = urllib.parse.unquote(self.path)
        oldfile = self._fs_path(vpath)
        newfile = self._destination_path()
        if oldfile is None or newfile is None:
            self._send_empty(400, 'Bad Request')
            return
        if not self.split_path(vpath):      # the root cannot be moved
            self._send_empty(403, 'Forbidden')
            return
        if not os.path.exists(oldfile):
            self._send_empty(404, 'Not Found')
            return
        if os.path.exists(newfile):
            self._send_empty(412, 'Precondition Failed')
            return
        try:
            shutil.move(oldfile, newfile)
        except OSError:
            self._send_empty(409, 'Conflict')
            return
        self._send_empty(201, "Created")

    def do_COPY(self):
        """Copy a single file; an existing destination file is overwritten.
        Copying collections is not supported."""
        if self.WebAuth():
            return
        oldfile = self._fs_path(urllib.parse.unquote(self.path))
        newfile = self._destination_path()
        if oldfile is None or newfile is None:
            self._send_empty(400, 'Bad Request')
            return
        if os.path.isdir(oldfile):
            self._send_empty(403, 'Forbidden')
            return
        if not os.path.isfile(oldfile):
            self._send_empty(404, 'Not Found')
            return
        try:
            shutil.copyfile(oldfile, newfile)
        except OSError:
            self._send_empty(409, 'Conflict')
            return
        self._send_empty(201, "Created")

    def do_LOCK(self):
        """Fake LOCK support: always grants a fresh exclusive write lock.
        Nothing is actually locked."""
        if self.WebAuth():
            return
        d = builddict(self._read_body())
        try:
            clientid = d['lockinfo']['owner']['href'].data
        except (KeyError, TypeError, AttributeError):
            clientid = ''       # no body (lock refresh) or owner given without href
        lockid = str(uuid.uuid1())
        w = BufWriter(self.wfile, sys_debug)
        w.write('<?xml version="1.0" encoding="utf-8" ?>\n'
                '<D:prop xmlns:D="DAV:">\n'
                '<D:lockdiscovery>\n'
                '<D:activelock>\n'
                '<D:locktype><D:write/></D:locktype>\n'
                '<D:lockscope><D:exclusive/></D:lockscope>\n'
                '<D:depth>Infinity</D:depth>\n'
                '<D:owner>\n'
                '<D:href>' + html.escape(clientid, quote=False) + '</D:href>\n'
                '</D:owner>\n'
                '<D:timeout>Infinite</D:timeout>\n'
                '<D:locktoken><D:href>opaquelocktoken:' + lockid + '</D:href></D:locktoken>\n'
                '</D:activelock>\n'
                '</D:lockdiscovery>\n'
                '</D:prop>\n')
        self._send_xml(201, 'Created', w,
                       [('Lock-Token', '<opaquelocktoken:' + lockid + '>')])

    def do_UNLOCK(self):
        if self.WebAuth():
            return
        # The Lock-Token header is ignored because locks are fake.
        self._send_empty(204, 'No Content')     # unlock using 204 for success.

    def _wished_props(self, d):
        """Names of the properties a parsed PROPFIND body asks for."""
        if 'propfind' not in d:
            return DAVRequestHandler.basic_props    # empty body
        propfind = d['propfind']
        if 'allprop' in propfind:
            return DAVRequestHandler.all_props
        if 'prop' not in propfind:
            return DAVRequestHandler.basic_props    # e.g. a <propname/> request
        ### 2017/9/7 Edit By LCJ: for IOS Coda Webdav support the key may
        ### carry trailing attributes (xmlns="DAV:"), so keep the name only.
        return [prop.split(' ')[0] for prop in propfind['prop']]

    def do_PROPFIND(self):
        if self.WebAuth():
            return
        depth = self.headers.get('Depth', 'infinity').lower()
        d = builddict(self._read_body())    # change all http.request to dict stru
        wished_props = self._wished_props(d)
        path, elem = self.path_elem()
        if not elem:
            if len(path) >= 1:      # it's a non-existing file
                self._send_empty(404, 'Not Found')
                return
            elem = self.server.root     # fixup root lookups?

        # The whole body is built before any header is emitted, so a failure
        # while reading the filesystem cannot leave a half-sent response.
        w = BufWriter(self.wfile, sys_debug)
        w.write('<?xml version="1.0" encoding="utf-8" ?>\n')
        w.write('<D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:">\n')

        quota_names = ('quota-available-bytes', 'quota-used-bytes', 'quota', 'quotaused')
        wants_quota = any(name in wished_props for name in quota_names)

        def write_props_member(m):
            # add urllib quote for chinese
            w.write('<D:response>\n<D:href>%s</D:href>\n<D:propstat>\n<D:prop>\n'
                    % urllib.parse.quote(m.virname))
            props = m.getProperties()   # get the file or dir props
            # For OSX Finder : getlastmodified, getcontentlength, resourceType
            if wants_quota:
                try:
                    sDisk = os.statvfs('/')
                except (AttributeError, OSError):
                    pass                # windows don't support os.statvfs
                else:
                    used = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
                    free = sDisk.f_bavail * sDisk.f_frsize
                    props['quota-used-bytes'] = used
                    props['quotaused'] = used
                    props['quota-available-bytes'] = free
                    props['quota'] = free
            for wp in wished_props:
                val = props.get(wp)
                if val is None:         # unknown or valueless property
                    w.write('  <D:%s/>\n' % wp)
                    continue
                val = str(val)
                if wp != 'resourcetype':    # resourcetype carries real XML
                    val = html.escape(val, quote=False)
                w.write('  <D:%s>%s</D:%s>\n' % (wp, val, wp))
            w.write('</D:prop>\n<D:status>HTTP/1.1 200 OK</D:status>\n</D:propstat>\n</D:response>\n')

        def write_recursive(m):
            write_props_member(m)
            if m.type == Member.M_COLLECTION:
                for child in m.getMembers():
                    write_recursive(child)

        write_props_member(elem)
        if elem.type == Member.M_COLLECTION:
            if depth == '1':
                for m in elem.getMembers():
                    write_props_member(m)
            elif depth == 'infinity':
                for m in elem.getMembers():
                    write_recursive(m)
        w.write('</D:multistatus>')
        self._send_xml(207, 'Multi-Status', w)

    def do_PROPPATCH(self):
        """Fake PROPPATCH: properties are not stored, success is reported so
        that Windows Explorer keeps working."""
        if self.WebAuth():
            return
        self._read_body()       # consume the request body; its content is ignored
        href = urllib.parse.quote(urllib.parse.unquote(self.path))
        w = BufWriter(self.wfile, sys_debug)
        w.write('<?xml version="1.0" encoding="utf-8" ?>'
                '<D:multistatus xmlns:D="DAV:">'
                '<D:response>'
                f'<D:href>{href}</D:href>'
                '<D:propstat><D:prop/><D:status>HTTP/1.1 200 OK</D:status></D:propstat>'
                '</D:response>'
                '</D:multistatus>')
        self._send_xml(207, 'Multi-Status', w)

    # ── WebSocket ──────────────────────────────────────────────────────────────
    def _ws_recv(self, sock, n):
        """Read exactly n bytes from sock; raises ConnectionError on EOF."""
        data = bytearray()
        while len(data) < n:
            chunk = sock.recv(n - len(data))
            if not chunk:
                raise ConnectionError('closed')
            data += chunk
        return bytes(data)

    def ws_process_message(self, message: str) -> str:
        """Override this to handle incoming WebSocket messages. Return the response to broadcast."""
        return message      # echo by default

    def _ws_broadcast(self, payload: bytes):
        # Thin wrapper kept for raw payloads; text goes through ws_broadcast().
        with _ws_lock:
            clients = list(_ws_clients)
        for sock in clients:
            try:
                self._ws_send(sock, payload)
            except OSError:
                pass        # best effort; the owning thread cleans the socket up

    def _ws_send(self, sock, payload: bytes):
        """Send payload to one client as a single text frame."""
        sock.sendall(_ws_encode_frame(payload))

    def _handle_websocket(self):
        """Perform the WebSocket handshake and serve frames until the peer
        closes.  Every received message is passed to ws_process_message() and
        the result is broadcast to all clients."""
        key = self.headers.get('Sec-WebSocket-Key', '')
        if not key:
            self._send_empty(400, 'Missing Sec-WebSocket-Key')
            return
        accept = base64.b64encode(
            hashlib.sha1((key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11').encode()).digest()
        ).decode()
        self.send_response(101, 'Switching Protocols')
        self.send_header('Upgrade', 'websocket')
        self.send_header('Connection', 'Upgrade')
        self.send_header('Sec-WebSocket-Accept', accept)
        self.end_headers()
        # The socket now speaks WebSocket; never parse another HTTP request on it.
        self.close_connection = True

        client = f'{self.client_address[0]}:{self.client_address[1]}'
        sock = self.connection
        sock.settimeout(None)
        with _ws_lock:
            _ws_clients.add(sock)
            count = len(_ws_clients)
        _log('WS', f'{client} connected (clients: {count})')
        try:
            while True:
                b0, b1 = self._ws_recv(sock, 2)
                opcode = b0 & 0x0F
                masked = bool(b1 & 0x80)
                length = b1 & 0x7F
                if length == 126:
                    length = struct.unpack('>H', self._ws_recv(sock, 2))[0]
                elif length == 127:
                    length = struct.unpack('>Q', self._ws_recv(sock, 8))[0]
                if length > self.WS_MAX_FRAME:
                    # Refuse to buffer absurd frames: close with 1009 (too big).
                    _log('WS', f'{client} frame too large ({length} bytes)')
                    sock.sendall(b'\x88\x02' + struct.pack('>H', 1009))
                    break
                mask = self._ws_recv(sock, 4) if masked else b''
                payload = self._ws_recv(sock, length)
                if masked:
                    payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
                if opcode == 0x8:       # close
                    _log('WS', f'{client} close frame')
                    sock.sendall(b'\x88\x00')
                    break
                elif opcode == 0x9:     # ping → pong
                    sock.sendall(_ws_encode_frame(payload, 0xA))
                elif opcode in (0x1, 0x2):  # text or binary
                    message = payload.decode('utf-8', errors='replace')
                    _log('WS', f'{client} recv: {message}')
                    response = self.ws_process_message(message)
                    if response is not None:
                        ws_broadcast(response)
        except (ConnectionError, OSError):
            pass                        # peer went away; normal termination
        except Exception as e:
            _log('WS', f'{client} error: {e}')
        finally:
            with _ws_lock:
                _ws_clients.discard(sock)
                count = len(_ws_clients)
            _log('WS', f'{client} disconnected (clients: {count})')

    # ── Leet API proxy ─────────────────────────────────────────────────────────
    # Browsers cannot override the User-Agent header (Fetch spec §forbidden
    # header names). Assembly64 returns HTTP 463 for non-matching UAs.
    # Requests to /leet/ are forwarded to hackerswithstyle.se with the
    # required headers added server-side.
    def _proxy_leet(self):
        target = 'https://hackerswithstyle.se' + self.path
        try:
            req = urllib.request.Request(target, headers={
                'User-Agent': 'Assembly Query',
                'Client-Id': 'Ultimate',
                'Accept': 'application/json',
            })
            with urllib.request.urlopen(req, timeout=15) as resp:
                body = resp.read()
                ct = resp.headers.get('Content-Type', 'application/octet-stream')
            self.send_response(200)
            self.send_header('Content-Type', ct)
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)
            _log('LEET', f'{self.path} → {len(body)} bytes')
        except Exception as e:
            _log('LEET', f'Proxy error: {self.path} → {e}')
            body = json.dumps({'error': str(e)}).encode('utf-8')
            self.send_response(502)
            self.send_header('Content-Type', 'application/json')
            self.send_header('Content-Length', str(len(body)))
            self.end_headers()
            self.wfile.write(body)

    @staticmethod
    def _parse_byte_range(header, size):
        """Parse a single-range `Range: bytes=...` header.

        Returns (first, last) inclusive byte offsets clamped to the resource,
        or None when the header is absent, malformed, multi-range or not
        satisfiable (the caller then serves the whole resource).
        """
        if not header or size <= 0:
            return None
        unit, sep, spec = header.strip().partition('=')
        if not sep or unit.strip().lower() != 'bytes' or ',' in spec:
            return None
        first, dash, last = spec.partition('-')
        if not dash:
            return None
        first, last = first.strip(), last.strip()
        try:
            if not first:               # suffix form: the last N bytes
                suffix = int(last)
                if suffix <= 0:
                    return None
                return max(size - suffix, 0), size - 1
            start = int(first)
            end = int(last) if last else size - 1
        except ValueError:
            return None
        end = min(end, size - 1)
        if start < 0 or start > end:
            return None
        return start, end

    def do_GET(self, onlyhead=False):
        if (self.path == '/ws' and
                self.headers.get('Upgrade', '').lower() == 'websocket'):
            self._handle_websocket()
            return
        if self.path.startswith('/leet/'):
            self._proxy_leet()
            return
        if self.WebAuth():
            return
        path, elem = self.path_elem()
        if not elem:
            self.send_error(404, 'Object not found')
            return
        try:
            props = elem.getProperties()
        except Exception:
            self._send_empty(500, "Error retrieving properties")
            return

        # Collections don't carry a content length — render a directory
        # listing instead of treating them like a file.
        if elem.type == Member.M_COLLECTION:
            ctype = props.get('getcontenttype') or DirCollection.COLLECTION_MIME_TYPE
            self.send_response(200, 'OK')
            self.send_header("Content-type", ctype)
            if 'getlastmodified' in props:
                self.send_header("Last-modified", props['getlastmodified'])
            self.end_headers()
            if not onlyhead:
                elem.sendData(self.wfile)
            return

        # when the client had Range: bytes=3156-3681
        size = props.get('getcontentlength', 0)
        byte_range = self._parse_byte_range(self.headers.get('Range'), size)
        if byte_range:
            bpoint, epoint = byte_range
            length = epoint - bpoint + 1
            self.send_response(206, 'Partial Content')
            # The total after '/' is the size of the whole file, not of the range.
            self.send_header("Content-Range", "bytes %s-%s/%s" % (bpoint, epoint, size))
        else:
            bpoint, epoint, length = 0, 0, size
            self.send_response(200, 'OK')
        # guess_type() stores None for unknown types, so `or` rather than a default.
        self.send_header("Content-type", props.get('getcontenttype') or 'application/octet-stream')
        if 'getlastmodified' in props:
            self.send_header("Last-modified", props['getlastmodified'])
        self.send_header("Content-length", str(length))
        self.end_headers()
        if not onlyhead and length > 0:     # all 0 size file don't need this
            elem.sendData(self.wfile, bpoint, epoint, length)

    def do_HEAD(self):
        self.do_GET(True)   # HEAD should behave like GET, only without contents

    def do_PUT(self):
        if self.WebAuth():
            return
        try:
            if 'Content-length' in self.headers:
                size = int(self.headers['Content-length'])
            elif 'Transfer-Encoding' in self.headers:
                if self.headers['Transfer-Encoding'].lower() != 'chunked':
                    raise ValueError('unsupported Transfer-Encoding')
                size = -2
            else:
                size = -1
            path, elem = self.path_elem_prev()
            ename = path[-1]
        except (ValueError, IndexError):
            self._send_empty(400, 'Cannot parse request')
            return
        # for OSX finder, it's first send a 0 byte file,and you need response a 201 code,and then osx send real file.
        # OSX finder don't using content-length.
        if ename == '.DS_Store':
            self._send_empty(403, 'Forbidden')
            return
        if elem is None or not hasattr(elem, 'recvMember'):
            # The parent collection does not exist (or is a plain file).
            self._send_empty(409, 'Conflict')
            return
        try:
            elem.recvMember(self.rfile, ename, size, self)
        except (OSError, ValueError) as e:
            _log('DAV', f'PUT failed: {ename} → {e}')
            self._send_empty(500, 'Cannot save file')
            return
        # success return
        self._send_empty(201, 'Created')

    # ── Proxy download ─────────────────────────────────────────────────────────
    # The browser cannot fetch third-party binary URLs that return duplicate
    # Access-Control-Allow-Origin headers (e.g. Assembly64 /search/bin/).
    # POST /proxy-download { "url": "...", "dest": "/sd/downloads/file.d64",
    #                        "headers": { "Client-Id": "Ultimate", ... } }
    # The server fetches the URL with Python (no CORS restrictions) and saves
    # the binary to the filesystem under the WebDAV root.
    def _json_response(self, status: int, data: dict):
        """Send `data` as a JSON response body (CORS headers come from send_response)."""
        body = json.dumps(data).encode('utf-8')
        self.send_response(status)
        self.send_header('Content-Type', 'application/json')
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def do_POST(self):
        if self.path != '/proxy-download':
            self.send_error(404, 'Not found')
            return
        # This endpoint writes into the WebDAV root, so it is subject to the
        # same authentication as PUT.
        if self.WebAuth():
            return
        try:
            req_data = json.loads(self._read_body())
            url = req_data['url']
            dest = req_data['dest']
            extra_headers = req_data.get('headers', {})
            if not isinstance(url, str) or not isinstance(dest, str) \
                    or not isinstance(extra_headers, dict):
                raise ValueError('url/dest must be strings, headers an object')
        except Exception as e:
            self._json_response(400, {'error': f'Bad request: {e}'})
            return

        # Only plain web URLs: urlopen() would otherwise also accept file://
        # and copy arbitrary local files into the share.
        if urllib.parse.urlparse(url).scheme.lower() not in ('http', 'https'):
            self._json_response(400, {'error': 'Invalid URL scheme'})
            return

        # Map virtual dest path (/sd/downloads/foo.d64) to filesystem path
        vpath = urllib.parse.unquote(dest)
        fspath = self._fs_path(vpath)
        if fspath is None or not self.split_path('/' + vpath.lstrip('/')):
            self._json_response(400, {'error': 'Invalid path'})
            return

        # Fetch the URL server-side (no browser CORS restrictions apply here)
        try:
            req = urllib.request.Request(url, headers=extra_headers)
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = resp.read()
        except Exception as e:
            _log('PROXY', f'Fetch failed: {url} → {e}')
            self._json_response(502, {'error': f'Fetch failed: {e}'})
            return

        # Save to filesystem, creating directories as needed
        try:
            os.makedirs(os.path.dirname(fspath), exist_ok=True)
            with open(fspath, 'wb') as f:
                f.write(data)
        except OSError as e:
            _log('PROXY', f'Save failed: {fspath} → {e}')
            self._json_response(500, {'error': f'Save failed: {e}'})
            return

        _log('PROXY', f'{url} → {fspath} ({len(data)} bytes)')
        self._json_response(200, {'ok': True, 'bytes': len(data)})

    def split_path(self, path):
        """Splits path string in form '/dir1/dir2/file' into parts.

        When the path ends with '/', the last part keeps a trailing '/' to
        mark it as a collection.
        """
        p = path.split('/')[1:]
        while p and p[-1] in ('', '/'):
            p = p[:-1]
            if len(p) > 0:
                p[-1] += '/'
        return p

    def path_elem(self):
        """Returns split path (see split_path()) and Member object of the last element"""
        path = self.split_path(urllib.parse.unquote(self.path))
        elem = self.server.root
        for e in path:
            elem = elem.findMember(e)
            if elem is None:
                break
        return (path, elem)

    def path_elem_prev(self):
        """Returns split path (see split_path()) and Member object of the next-to-last element"""
        path = self.split_path(urllib.parse.unquote(self.path))
        elem = self.server.root
        for e in path[:-1]:
            elem = elem.findMember(e)
            if elem is None:
                break
        return (path, elem)

    def log_message(self, format, *args):
        # args: requestline, status_code, content_length (from log_request)
        try:
            parts = args[0].split()     # e.g. ['GET', '/path', 'HTTP/1.1']
            method = parts[0]
            path = urllib.parse.unquote(parts[1]) if len(parts) > 1 else '?'
            status = args[1]
            _log('DAV', f'{method} {path} → {status}')
        except Exception:
            _log('DAV', format % args)  # fallback for unexpected call shapes


class BufWriter:
    """Collects text, so the encoded size is known before anything is sent."""

    def __init__(self, w, debug=False):
        self.w = w
        self.parts = []
        self._size = 0
        self.debug = debug

    def write(self, s):
        b = s.encode('utf-8')
        self.parts.append(b)
        self._size += len(b)

    def flush(self):
        data = b''.join(self.parts)
        if self.debug:
            print('<- XML:', data.decode('utf-8', errors='replace'))
        self.w.write(data)
        self.w.flush()

    def getSize(self):
        """Number of UTF-8 bytes written so far."""
        return self._size


class DAVServer(ThreadingMixIn, HTTPServer):
    """Threaded HTTP server that carries the WebDAV root and the user list."""

    # Worker threads (long-lived WebSocket connections in particular) must not
    # keep the process alive after the main thread has stopped.
    daemon_threads = True

    def __init__(self, addr, handler, root, userpwd):
        HTTPServer.__init__(self, addr, handler)
        self.root = root
        self.userpwd = userpwd      # WebDav Auth user:passwd (base64 encoded bytes)
        self.auth_enable = len(userpwd) > 0

    # disable the broken pipe error message
    def finish_request(self, request, client_address):
        try:
            HTTPServer.finish_request(self, request, client_address)
        except socket.error:
            pass


def _load_users(path='wdusers.conf'):
    """Read `user:passwd` lines and return them base64 encoded, the form they
    arrive in through a Basic Authorization header.  A missing or unreadable
    file yields an empty list, which disables authentication."""
    users = []
    try:
        with open(path, 'r') as f:
            for uinfo in f:
                uinfo = uinfo.strip()
                if len(uinfo) > 4:
                    users.append(base64.b64encode(uinfo.encode('utf-8')))
    except OSError:
        pass
    return users


if __name__ == '__main__':
    # WebDav TCP Port
    srvport = 80
    # Get local IP address
    myaddr = get_localip()
    print(f'# WebDav Server run at http://{myaddr}:{srvport}')
    server_address = ('', srvport)

    # WebDav Auth User/Password file
    # if not this file ,the auth function disable.
    # file format: user:passwd\n user:passwd\n
    # or you can change your auth mode and file save format
    userpwd = _load_users('wdusers.conf')

    # first is Server root dir, Second is virtual dir
    # **** Change First ./ to your dir , etc :/mnt/flash/public, d:/share_file/
    root = DirCollection('files/', '/')
    httpd = DAVServer(server_address, DAVRequestHandler, root, userpwd)

    repl = threading.Thread(target=_repl_thread, daemon=True, name='ws-repl')
    repl.start()

    try:
        httpd.serve_forever()
    except KeyboardInterrupt:
        pass
    finally:
        httpd.server_close()
        _log('SRV', 'stopped')