meatloaf-config/webdav3.py

1172 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# _*_ coding:utf-8 _*_
# Tiny WebDav Server for Pythonista - IOS. (Base on pandav WebDav server )
#
# (C)2013/11 By: Lai ChuJiang
#
# this code can run not just Pythonista on IOS ,also run on OSX python.
# Support Client: Windows / OSX / other Webdav client for IOS,etc : FE File explorer / goodreader / iWorks for ios
#
# 2024/6/1 Change Log:
# Python3 support
# add do_PROPPATCH function, now Windows Explorer full support. (!! windows explorer limit filesize < 50MBytes, you can using windows regedit edit it !!)
# support show webdav request/response info. ( sys_debug = True )
# bug fix
#
# 20170907 Change Log:
# 1. Fix Coda WebDav access problem.(because Coda WebDav all xml item add xmlns="DAV:".)
#
# 2013/11 Change Log:
# 1. Combind all files to one file,so can using for Pythonista easy.
# 2. Add MKCOL(Create dir); MOVE(rename file); DELETE(delete file or dir); COPY (Copy file)
# 3. Change some decode, Now it's can support Chinese.
# 4. Pythonista(For IOS) not dircache module ,so change code, don't using this module.
# 5. change DAV version from 1 to 2, so the OSX finder can write.
# 6. for DAV version 2 support , add LOCK / UNLOCK fake support. (not real lock file)
# *** !!!! So Don't using > 1 client sametime write or delete same file. maybe lost files.
# 7. Change the do_PROPFIND module, now it's simply & right for OSX
# 8. Change the do_GET module, now support RANGE
# 9. Change the do_PUT module, add Content-Length=0 support (create empty file) ,so the OSX Finder support.
# * if not add empty file, the Finder copy files and then delete all this.
#10. Add WebDav Basic Auth function,now you can set user & passwd
# ** using wdusers.conf file (just user:passwd), if not this file ,the Auth disable.
#11. Fix the broken pipe error message
#
# WebDav RFC: http://www.ics.uci.edu/~ejw/authoring/protocol/rfc2518.html
# http://restpatterns.org/HTTP_Methods
#
# Base : pandav v0.2
# Copyright (c) 2005.-2006. Ivan Voras <ivoras@gmail.com>
# Released under the Artistic License
#
from http.server import HTTPServer, BaseHTTPRequestHandler
from socketserver import ThreadingMixIn
import urllib.request, urllib.parse, urllib.error, urllib.parse
from time import timezone, strftime, localtime, gmtime
import os, sys, re, shutil, struct, threading, uuid, hashlib, mimetypes, base64, socket, json
_ws_lock = threading.Lock()
_ws_clients: set = set() # connected WebSocket sockets
# Debug message ( True / False )
sys_debug = False
# ── Logging ────────────────────────────────────────────────────────────────────
def _log(tag: str, msg: str):
print(f'[{strftime("%H:%M:%S")}] [{tag}] {msg}', flush=True)
# ── WebSocket broadcast (module-level so the REPL can call it) ─────────────────
def ws_broadcast(text: str) -> int:
"""Send a UTF-8 text frame to every connected WebSocket client."""
payload = text.encode('utf-8')
n = len(payload)
if n < 126:
header = bytes([0x81, n])
elif n < 65536:
header = bytes([0x81, 126]) + struct.pack('>H', n)
else:
header = bytes([0x81, 127]) + struct.pack('>Q', n)
frame = header + payload
with _ws_lock:
clients = set(_ws_clients)
sent = 0
for sock in clients:
try:
sock.send(frame)
sent += 1
except Exception:
with _ws_lock:
_ws_clients.discard(sock)
return sent
# ── REPL ───────────────────────────────────────────────────────────────────────
def _repl_thread():
"""Read lines from stdin and broadcast them to all connected WS clients."""
_log('REPL', 'ready — type a message and press Enter to broadcast, Ctrl-D to quit')
while True:
try:
sys.stdout.write('> ')
sys.stdout.flush()
line = sys.stdin.readline()
if not line: # EOF / Ctrl-D
break
line = line.rstrip('\r\n')
if not line:
continue
n = ws_broadcast(line)
_log('REPL', f'sent to {n} client(s): {line}')
except (EOFError, KeyboardInterrupt):
break
# get localhost IP address
def get_localip():
try:
gAdd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
gAdd.connect(('www.bing.com', 80))
return gAdd.getsockname()[0]
except:
return '127.0.0.1'
class Member:
M_MEMBER = 1
M_COLLECTION = 2
def getProperties(self):
return {}
class Collection(Member):
def __init__(self, name):
self.name = name
def getMembers(self):
return []
class FileMember(Member):
def __init__(self, name, parent):
self.name = name
self.parent = parent
self.name = name
self.fsname = parent.fsname + name # e.g. '/var/www/mysite/some.txt'
self.virname = parent.virname + name # e.g. '/mysite/some.txt'
self.type = Member.M_MEMBER
def __str__(self):
return "%s -> %s" % (self.virname, self.fsname)
def getProperties(self):
"""Return dictionary with WebDAV properties. Values shold be
formatted according to the WebDAV specs."""
st = os.stat(self.fsname)
p = {}
p['creationdate'] = unixdate2iso8601(st.st_ctime)
p['getlastmodified'] = unixdate2httpdate(st.st_mtime)
p['displayname'] = self.name
# p['getetag'] = md5.new(self.fsname).hexdigest()
m = hashlib.md5()
m.update(self.fsname.encode('utf-8'))
p['getetag'] = m.hexdigest()
if self.type == Member.M_MEMBER:
p['getcontentlength'] = st.st_size
p['getcontenttype'], z = mimetypes.guess_type(self.name)
p['getcontentlanguage'] = None
else: # Member.M_COLLECTION
p['resourcetype'] = '<D:collection/>'
if self.name[0] == ".":
p['ishidden'] = 1
if not os.access(self.fsname, os.W_OK):
p['isreadonly'] = 1
if self.name == '/':
p['isroot'] = 1
return p
def sendData(self, wfile,bpoint=0,epoint=0):
"""Send the file to the client. Literally."""
st = os.stat(self.fsname)
f = open(self.fsname, 'rb')
writ = 0
# for send Range xxx-xxx
if bpoint>0 and bpoint<st.st_size:
f.seek(bpoint)
if epoint>bpoint:
if epoint<=st.st_size:
rsize = epoint - bpoint + 1
else:
rsize = st.st_size - bpoint
else:
rsize = st.st_size
while writ < rsize:
if (rsize - writ) < 65536:
buf = f.read(rsize)
else:
buf = f.read(65536)
if not buf:
break
writ += len(buf)
wfile.write(buf)
f.close()
def findMember(self, name):
''' 2024/6/1 add
'''
return None
class DirCollection(FileMember, Collection):
COLLECTION_MIME_TYPE = 'httpd/unix-directory' # application/x-collection
def __init__(self, fsdir, virdir, parent=None):
if not os.path.exists(fsdir):
raise "Local directory (fsdir) not found: " + fsdir
self.fsname = fsdir
self.name = virdir
if self.fsname[-1] != os.sep:
if self.fsname[-1] == '/': # fixup win/dos/mac separators
self.fsname = self.fsname[:-1] + os.sep
else:
self.fsname += os.sep
self.virname = virdir
if self.virname[-1] != '/':
self.virname += '/'
self.parent = parent
self.type = Member.M_COLLECTION
def getProperties(self):
p = FileMember.getProperties(self) # inherit file properties
p['iscollection'] = 1
p['getcontenttype'] = DirCollection.COLLECTION_MIME_TYPE
# Inherited displayname is `self.name`, which for a DirCollection is the
# full virtual path (e.g. "/sd/40 tracks/") because __init__ sets
# self.name = virdir. WebDAV clients expect a basename, so override it
# with just the leaf name.
leaf = os.path.basename(self.fsname.rstrip('/\\'))
if leaf:
p['displayname'] = leaf
return p
def getMembers(self):
"""Get immediate members of this collection."""
l = os.listdir(self.fsname) # obtain a copy of dirlist
tcount=0
for tmpi in l:
if os.path.isfile(self.fsname+tmpi) == False:
l[tcount]=l[tcount]+'/'
tcount += 1
r = []
for f in l:
if f[-1] != '/':
m = FileMember(f, self) # Member is a file
else:
m = DirCollection(self.fsname + f, self.virname + f, self) # Member is a collection
r.append(m)
return r
# Return WebDav Root Dir info
def rootdir(self):
return self.fsname
def findMember(self, name):
"""Search for a particular member."""
l = os.listdir(self.fsname) # obtain a copy of dirlist
tcount=0
for tmpi in l:
if os.path.isfile(self.fsname+tmpi) == False:
l[tcount]=l[tcount]+'/'
tcount += 1
if name in l:
if name[-1] != '/':
return FileMember(name, self)
else:
return DirCollection(self.fsname + name, self.virname + name, self)
elif name[-1] != '/':
name += '/'
if name in l:
return DirCollection(self.fsname + name, self.virname + name, self)
def sendData(self, wfile):
"""Send "file" to the client. Since this is a directory, build some arbitrary HTML."""
memb = self.getMembers()
def esc(s):
return (str(s)
.replace('&', '&amp;')
.replace('<', '&lt;')
.replace('>', '&gt;')
.replace('"', '&quot;'))
data = '<html><head><title>%s</title></head><body>' % esc(self.virname)
data += '<table><tr><th>Name</th><th>Size</th><th>Timestamp</th></tr>'
for m in memb:
p = m.getProperties()
if 'getcontentlength' in p:
p['size'] = int(p['getcontentlength'])
p['timestamp'] = p['getlastmodified']
else:
p['size'] = 0
p['timestamp'] = '-DIR-'
data += '<tr><td>%s</td><td>%d</td><td>%s</td></tr>' % (esc(p['displayname']), p['size'], esc(p['timestamp']))
data += '</table></body></html>'
wfile.write(data.encode('utf-8'))
def recvMember(self, rfile, name, size, req):
"""Receive (save) a member file"""
fname = os.path.join(self.fsname, urllib.parse.unquote(name))
f = open(fname, 'wb')
# if size=-1 it's Transfer-Encoding: Chunked mode, like OSX finder using this mode put data
# so the file size need get here.
if size == -2:
l = int(rfile.readline(), 16)
ltotal = 0
while l > 0:
buf = rfile.read(l)
f.write(buf) #yield buf
rfile.readline()
ltotal += l
l = int(rfile.readline(), 16)
elif size > 0: # if size=0 ,just save a empty file.
writ = 0
bs = 65536
while True:
if size != -1 and (bs > size-writ):
bs = size-writ
buf = rfile.read(bs)
if len(buf) == 0:
break
f.write(buf)
writ += len(buf)
if size != -1 and writ >= size:
break
f.close()
def unixdate2iso8601(d):
tz = timezone / 3600 # can it be fractional?
tz = '%+03d' % tz
return strftime('%Y-%m-%dT%H:%M:%S', localtime(d)) + tz + ':00'
def unixdate2httpdate(d):
return strftime('%a, %d %b %Y %H:%M:%S GMT', gmtime(d))
class Tag:
def __init__(self, name, attrs, data='', parser=None):
self.d = {}
self.name = name
self.attrs = attrs
if type(self.attrs) == type(''):
self.attrs = splitattrs(self.attrs)
for a in self.attrs:
if a.startswith('xmlns'):
# `xmlns:D="DAV:"` should register prefix "D" → "DAV:".
# `xmlns="DAV:"` (default ns) registers prefix "" → "DAV:".
nsname = a[5:] # drop "xmlns"; leaves "" (default) or ":D" (prefixed)
if nsname.startswith(':'):
nsname = nsname[1:]
parser.namespaces[nsname] = self.attrs[a]
self.rawname = self.name
p = name.find(':')
if p > 0:
nsname = name[0:p]
if nsname in parser.namespaces:
self.ns = parser.namespaces[nsname]
self.name = self.rawname[p+1:]
else:
self.ns = ''
self.data = data
# Emulate dictionary d
def __len__(self):
return len(self.d)
def __getitem__(self, key):
return self.d[key]
def __setitem__(self, key, value):
self.d[key] = value
def __delitem__(self, key):
del self.d[key]
def __iter__(self):
return iter(self.d.keys())
def __contains__(self, key):
return key in self.d
def __str__(self):
"""Returns unicode semi human-readable representation of the structure"""
if self.attrs:
s = '<%s %s> %s ' % (self.name, self.attrs, self.data)
else:
s = '<%s> %s ' % (self.name, self.data)
for k in self.d:
if type(self.d[k]) == type(self):
s += '|%s: %s|' % (k, str(self.d[k]))
else:
s += '|' + ','.join([str(x) for x in self.d[k]]) + '|'
return s
def addChild(self, tag):
"""Adds a child to self. tag must be instance of Tag"""
if tag.name in self.d:
if type(self.d[tag.name]) == type(self): # If there are multiple sibiling tags with same name, form a list :)
self.d[tag.name] = [self.d[tag.name]]
self.d[tag.name].append(tag)
else:
self.d[tag.name] = tag
return tag
class XMLDict_Parser:
def __init__(self, xml):
self.xml = xml
self.p = 0
self.encoding = sys.getdefaultencoding()
self.namespaces = {}
def getnexttag(self):
ptag = self.xml.find('<', self.p)
if ptag < 0:
return None, None, self.xml[self.p:].strip()
data = self.xml[self.p:ptag].strip()
self.p = ptag
self.tagbegin = ptag
p2 = self.xml.find('>', self.p+1)
if p2 < 0:
raise "Malformed XML - unclosed tag?"
tag = self.xml[ptag+1:p2]
self.p = p2+1
self.tagend = p2+1
ps = tag.find(' ')
### Change By LCJ @ 2017/9/7 from [ if ps > 0: ] ###
### for IOS Coda Webdav support ###
if ps > 0 and tag[-1] != '/':
tag, attrs = tag.split(' ', 1)
else:
attrs = ''
return tag, attrs, data
def builddict(self):
"""Builds a nested-dictionary-like structure from the xml. This method
picks up tags on the main level and calls processTag() for nested tags."""
d = Tag('<root>', '')
while True:
tag, attrs, data = self.getnexttag()
if data != '': # data is actually that between the last tag and this one
sys.stderr.write("Warning: inline data between tags?!\n")
if not tag:
break
if tag[-1] == '/': # an 'empty' tag (e.g. <empty/>)
d.addChild(Tag(tag[:-1], attrs, parser=self))
continue
elif tag[0] == '?': # special tag
t = d.addChild(Tag(tag, attrs, parser=self))
if tag == '?xml' and 'encoding' in t.attrs:
self.encoding = t.attrs['encoding']
else:
try:
self.processTag(d.addChild(Tag(tag, attrs, parser=self)))
except:
sys.stderr.write("Error processing tag %s\n" % tag)
d.encoding = self.encoding
return d
def processTag(self, dtag):
"""Process single tag's data"""
until = '/'+dtag.rawname
while True:
tag, attrs, data = self.getnexttag()
if data:
dtag.data += data
if tag == None:
sys.stderr.write("Unterminated tag '"+dtag.rawname+"'?\n")
break
if tag == until:
break
if tag[-1] == '/':
dtag.addChild(Tag(tag[:-1], attrs, parser=self))
continue
self.processTag(dtag.addChild(Tag(tag, attrs, parser=self)))
def splitattrs(att):
"""Extracts name="value" pairs from string; returns them as dictionary"""
d = {}
for m in re.findall('([a-zA-Z_][a-zA-Z_:0-9]*?)="(.+?)"', att):
d[m[0]] = m[1]
return d
def builddict(xml):
"""Wrapper function for straightforward parsing"""
if sys_debug: print('-> XML:',xml)
p = XMLDict_Parser(xml.decode('utf-8'))
return p.builddict()
class DAVRequestHandler(BaseHTTPRequestHandler):
server_version = "tiny_WebDAV"
all_props = ['name', 'parentname', 'href', 'ishidden', 'isreadonly', 'getcontenttype',
'contentclass', 'getcontentlanguage', 'creationdate', 'lastaccessed', 'getlastmodified',
'getcontentlength', 'iscollection', 'isstructureddocument', 'defaultdocument',
'displayname', 'isroot', 'resourcetype']
basic_props = ['name', 'getcontenttype', 'getcontentlength', 'creationdate', 'getlastmodified', 'iscollection']
auth_file = False
auth_enable = False
Auserlist = []
# for debug out
def send_response(self, code, message=None):
if sys_debug: print(f"<- status code: {code}")
super().send_response(code, message)
# Emit CORS headers on every response so the browser doesn't block
# cross-origin requests (e.g. http://127.0.0.1:5500 -> http://localhost/).
# Echo the request Origin if present, otherwise allow any origin.
origin = self.headers.get('Origin') if hasattr(self, 'headers') else None
self.send_header('Access-Control-Allow-Origin', origin or '*')
self.send_header('Vary', 'Origin')
self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
self.send_header('Access-Control-Allow-Headers', 'Authorization, Content-Type, Depth, Destination, If, If-Match, If-Modified-Since, If-None-Match, If-Range, Lock-Token, Overwrite, Timeout, X-Requested-With')
self.send_header('Access-Control-Max-Age', '86400')
def send_header(self, keyword, value):
if sys_debug: print(f"<- header: {keyword}: {value}")
super().send_header(keyword, value)
def end_headers(self):
if sys_debug: print("<- End of headers")
super().end_headers()
def handle_one_request(self):
super().handle_one_request()
if sys_debug: print(f'>>>>>> {self.command} <<<<<<')
# User Auth
# if success ,return False;
# Get WebDav User/Pass file : wdusers.conf
# file formate: user:pass\n user:pass\n
def WebAuth(self):
if sys_debug:
import inspect
print(f'## {inspect.stack()[1].function}, PATH: {urllib.parse.unquote(self.path)}')
print(self.headers)
if self.server.auth_enable:
if 'Authorization' in self.headers:
try:
AuthInfo = self.headers['Authorization'][6:].encode('utf-8')
except:
AuthInfo = ''
if AuthInfo in self.server.userpwd:
return False # Auth success
self.send_response(401,'Authorization Required')
self.send_header('WWW-Authenticate', 'Basic realm="WebDav Auth"')
self.send_header('Content-type', 'text/html')
self.end_headers()
return True
else:
return False
def do_OPTIONS(self):
# CORS preflight for /proxy-download (called by browser before POST)
if self.path == '/proxy-download':
self.send_response(204)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.send_header('Content-Length', '0')
self.end_headers()
return
# CORS preflight for /leet/ and /commoserve/ proxies
if self.path.startswith('/leet/') or self.path.startswith('/commoserve/'):
self.send_response(204)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
self.send_header('Content-Length', '0')
self.end_headers()
return
if self.WebAuth():
return
self.send_response(200, DAVRequestHandler.server_version)
self.send_header('Allow', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
self.send_header('Content-length', '0')
self.send_header('X-Server-Copyright', DAVRequestHandler.server_version)
self.send_header('DAV', '1, 2') #OSX Finder need Ver 2, if Ver 1 -- read only
self.send_header('MS-Author-Via', 'DAV')
self.end_headers()
def do_DELETE(self):
if self.WebAuth():
return
path = urllib.parse.unquote(self.path)
if path == '':
self.send_error(404, 'Object not found')
self.send_header('Content-length', '0')
self.end_headers()
return
path = self.server.root.rootdir() + path
if os.path.isfile(path):
os.remove(path) #delete file
elif os.path.isdir(path):
shutil.rmtree(path) #delete dir
else:
self.send_response(404,'Not Found')
self.send_header('Content-length', '0')
self.end_headers()
return
self.send_response(204, 'No Content')
self.send_header('Content-length', '0')
self.end_headers()
def do_MKCOL(self):
if self.WebAuth():
return
path = urllib.parse.unquote(self.path)
if path != '':
path = self.server.root.rootdir() + path
if os.path.isdir(path) == False:
os.mkdir(path)
self.send_response(201, "Created")
self.send_header('Content-length', '0')
self.end_headers()
return
self.send_response(403, "OK")
self.send_header('Content-length', '0')
self.end_headers()
def do_MOVE(self):
if self.WebAuth():
return
oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path)
newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path
if (os.path.isfile(oldfile)==True and os.path.isfile(newfile)==False):
shutil.move(oldfile,newfile)
if (os.path.isdir(oldfile)==True and os.path.isdir(newfile)==False):
os.rename(oldfile,newfile)
self.send_response(201, "Created")
self.send_header('Content-length', '0')
self.end_headers()
def do_COPY(self):
if self.WebAuth():
return
oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path)
newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path
if (os.path.isfile(oldfile)==True): # and os.path.isfile(newfile)==False): copy can rewrite.
shutil.copyfile(oldfile,newfile)
self.send_response(201, "Created")
self.send_header('Content-length', '0')
self.end_headers()
def do_LOCK(self):
if self.WebAuth():
return
if 'Content-length' in self.headers:
req = self.rfile.read(int(self.headers['Content-length']))
else:
req = self.rfile.read()
d = builddict(req)
try:
clientid = str(d['lockinfo']['owner']['href'])[7:] # temp: need Change other method!!!
except:
clientid = ''
lockid = str(uuid.uuid1())
retstr = '<?xml version="1.0" encoding="utf-8" ?>\n<D:prop xmlns:D="DAV:">\n<D:lockdiscovery>\n<D:activelock>\n<D:locktype><D:write/></D:locktype>\n<D:lockscope><D:exclusive/></D:lockscope>\n<D:depth>Infinity</D:depth>\n<D:owner>\n<D:href>'+clientid+'</D:href>\n</D:owner>\n<D:timeout>Infinite</D:timeout>\n<D:locktoken><D:href>opaquelocktoken:'+lockid+'</D:href></D:locktoken>\n</D:activelock>\n</D:lockdiscovery>\n</D:prop>\n'
self.send_response(201,'Created')
self.send_header("Content-type",'text/xml')
self.send_header("charset",'"utf-8"')
self.send_header("Lock-Token",'<opaquelocktoken:'+lockid+'>')
self.send_header('Content-Length',len(retstr))
w = BufWriter(self.wfile, sys_debug)
w.write(retstr)
self.send_header('Content-Length', str(w.getSize()))
self.end_headers()
w.flush()
def do_UNLOCK(self):
if self.WebAuth():
return
# frome self.headers get Lock-Token:
self.send_response(204, 'No Content') # unlock using 204 for sucess.
self.send_header('Content-length', '0')
self.end_headers()
def do_PROPFIND(self):
if self.WebAuth():
return
depth = 'infinity'
if 'Depth' in self.headers:
depth = self.headers['Depth'].lower()
if 'Content-length' in self.headers:
req = self.rfile.read(int(self.headers['Content-length']))
else:
req = self.rfile.read()
d = builddict(req) # change all http.request to dict stru
wished_all = False
if len(d) == 0:
wished_props = DAVRequestHandler.basic_props
else:
if 'allprop' in d['propfind']:
wished_props = DAVRequestHandler.all_props
wished_all = True
else:
wished_props = []
for prop in d['propfind']['prop']:
### 2017/9/7 Edit By LCJ , Old is [ wished_props.append(prop) ]
### for IOS Coda Webdav support ###
wished_props.append(prop.split(' ')[0])
path, elem = self.path_elem()
if not elem:
if len(path) >= 1: # it's a non-existing file
self.send_response(404, 'Not Found')
self.send_header('Content-length', '0')
self.end_headers()
return
else:
elem = self.server.root # fixup root lookups?
if depth != '0' and not elem: #or elem.type != Member.M_COLLECTION:
self.send_response(406, 'This is not allowed')
self.send_header('Content-length', '0')
self.end_headers()
return
self.send_response(207, 'Multi-Status') #Multi-Status
self.send_header('Content-Type', 'text/xml')
self.send_header("charset",'"utf-8"')
w = BufWriter(self.wfile, sys_debug)
w.write('<?xml version="1.0" encoding="utf-8" ?>\n')
w.write('<D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:">\n')
def write_props_member(w, m):
w.write('<D:response>\n<D:href>%s</D:href>\n<D:propstat>\n<D:prop>\n' % urllib.parse.quote(m.virname)) #add urllib.quote for chinese
props = m.getProperties() # get the file or dir props
# For OSX Finder : getlastmodified, getcontentlength, resourceType
if ('quota-available-bytes' in wished_props) or ('quota-used-bytes'in wished_props) or ('quota' in wished_props) or ('quotaused'in wished_props):
# windows don't support os.statvfs
try:
sDisk = os.statvfs('/')
props['quota-used-bytes'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
props['quotaused'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
props['quota-available-bytes'] = sDisk.f_bavail * sDisk.f_frsize
props['quota'] = sDisk.f_bavail * sDisk.f_frsize
except:
pass
for wp in wished_props:
if (wp in props) == False:
w.write(' <D:%s/>\n' % wp)
else:
val = str(props[wp])
if wp != 'resourcetype':
val = val.replace('&', '&amp;').replace('<', '&lt;').replace('>', '&gt;')
w.write(' <D:%s>%s</D:%s>\n' % (wp, val, wp))
w.write('</D:prop>\n<D:status>HTTP/1.1 200 OK</D:status>\n</D:propstat>\n</D:response>\n')
def write_recursive(m):
write_props_member(w, m)
if m.type == Member.M_COLLECTION:
for child in m.getMembers():
write_recursive(child)
write_props_member(w, elem)
if depth == '1' and elem.type == Member.M_COLLECTION:
for m in elem.getMembers():
write_props_member(w, m)
elif depth == 'infinity' and elem.type == Member.M_COLLECTION:
for m in elem.getMembers():
write_recursive(m)
w.write('</D:multistatus>')
self.send_header('Content-Length', str(w.getSize()))
self.end_headers()
w.flush()
def do_PROPPATCH(self):
if self.WebAuth():
return
if 'Content-length' in self.headers:
req = self.rfile.read(int(self.headers['Content-length']))
else:
req = self.rfile.read()
d = builddict(req) # change all http.request to dict stru
self.send_response(207, 'Multi-Status') #Multi-Status
self.send_header('Content-Type', 'text/xml')
self.send_header("charset",'"utf-8"')
retstr = f'<?xml version="1.0" encoding="utf-8" ?><D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:"><D:response><D:href>{urllib.parse.unquote(self.path)}</D:href><D:propstat><D:status>HTTP/1.1 200 OK</D:status></D:propstat></D:response></D:multistatus>'
w = BufWriter(self.wfile, sys_debug)
w.write(retstr)
self.send_header('Content-Length', str(w.getSize()))
self.end_headers()
w.flush()
def _ws_recv(self, sock, n):
data = b''
while len(data) < n:
chunk = sock.recv(n - len(data))
if not chunk:
raise ConnectionError('closed')
data += chunk
return data
def ws_process_message(self, message: str) -> str:
"""Override this to handle incoming WebSocket messages. Return the response to broadcast."""
return message # echo by default
def _ws_broadcast(self, payload: bytes):
# Thin wrapper kept for binary payloads; text goes through ws_broadcast().
with _ws_lock:
clients = set(_ws_clients)
for sock in clients:
try:
self._ws_send(sock, payload)
except Exception:
pass
def _ws_send(self, sock, payload: bytes):
length = len(payload)
if length < 126:
header = bytes([0x81, length])
elif length < 65536:
header = bytes([0x81, 126]) + struct.pack('>H', length)
else:
header = bytes([0x81, 127]) + struct.pack('>Q', length)
sock.send(header + payload)
def _handle_websocket(self):
key = self.headers.get('Sec-WebSocket-Key', '')
accept = base64.b64encode(
hashlib.sha1((key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11').encode()).digest()
).decode()
self.send_response(101, 'Switching Protocols')
self.send_header('Upgrade', 'websocket')
self.send_header('Connection', 'Upgrade')
self.send_header('Sec-WebSocket-Accept', accept)
self.end_headers()
client = f'{self.client_address[0]}:{self.client_address[1]}'
sock = self.connection
sock.settimeout(None)
with _ws_lock:
_ws_clients.add(sock)
_log('WS', f'{client} connected (clients: {len(_ws_clients)})')
try:
while True:
b0, b1 = self._ws_recv(sock, 2)
opcode = b0 & 0x0F
masked = bool(b1 & 0x80)
length = b1 & 0x7F
if length == 126:
length = struct.unpack('>H', self._ws_recv(sock, 2))[0]
elif length == 127:
length = struct.unpack('>Q', self._ws_recv(sock, 8))[0]
mask = self._ws_recv(sock, 4) if masked else b''
payload = self._ws_recv(sock, length)
if masked:
payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
if opcode == 0x8: # close
_log('WS', f'{client} close frame')
sock.send(b'\x88\x00')
break
elif opcode == 0x9: # ping → pong
sock.send(bytes([0x8A, len(payload)]) + payload)
elif opcode in (0x1, 0x2): # text or binary
message = payload.decode('utf-8', errors='replace')
_log('WS', f'{client} recv: {message}')
response = self.ws_process_message(message)
if response is not None:
ws_broadcast(response)
except Exception:
pass
finally:
with _ws_lock:
_ws_clients.discard(sock)
_log('WS', f'{client} disconnected (clients: {len(_ws_clients)})')
# ── Leet API proxy ─────────────────────────────────────────────────────────
# Browsers cannot override the User-Agent header (Fetch spec §forbidden
# header names). Assembly64 returns HTTP 463 for non-matching UAs.
# commoserve.files.commodore.net also lacks CORS headers on AQL endpoints.
#
# Routes:
# GET /leet/* → https://hackerswithstyle.se/leet/*
# GET /commoserve/* → https://commoserve.files.commodore.net/leet/*
def _proxy_leet(self, target: str):
try:
req = urllib.request.Request(target, headers={
'User-Agent': 'Assembly Query',
'Client-Id': 'Ultimate',
'Accept': 'application/json',
})
with urllib.request.urlopen(req, timeout=15) as resp:
body = resp.read()
ct = resp.headers.get('Content-Type', 'application/octet-stream')
self.send_response(200)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Content-Type', ct)
self.send_header('Content-Length', str(len(body)))
self.end_headers()
self.wfile.write(body)
_log('LEET', f'{self.path}{len(body)} bytes')
except Exception as e:
_log('LEET', f'Proxy error: {self.path}{e}')
body = json.dumps({'error': str(e)}).encode('utf-8')
self.send_response(502)
self.send_header('Access-Control-Allow-Origin', '*')
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(body)))
self.end_headers()
self.wfile.write(body)
def do_GET(self, onlyhead=False):
if (self.path == '/ws'
and self.headers.get('Upgrade', '').lower() == 'websocket'):
self._handle_websocket()
return
if self.path.startswith('/leet/'):
self._proxy_leet('https://hackerswithstyle.se' + self.path)
return
if self.path.startswith('/commoserve/'):
# /commoserve/search/categories → /leet/search/categories on commoserve server
leet_path = '/leet' + self.path[len('/commoserve'):]
self._proxy_leet('https://commoserve.files.commodore.net' + leet_path)
return
if self.WebAuth():
return
path, elem = self.path_elem()
if not elem:
self.send_error(404, 'Object not found')
return
try:
props = elem.getProperties()
except:
self.send_response(500, "Error retrieving properties")
self.end_headers()
return
# Collections don't carry a content length — render a directory
# listing instead of treating them like a file.
if elem.type == Member.M_COLLECTION:
ctype = props.get('getcontenttype') or DirCollection.COLLECTION_MIME_TYPE
self.send_response(200, 'OK')
self.send_header("Content-type", ctype)
if 'getlastmodified' in props:
self.send_header("Last-modified", props['getlastmodified'])
self.end_headers()
if not onlyhead:
elem.sendData(self.wfile)
return
# when the client had Range: bytes=3156-3681
bpoint = 0
epoint = 0
fullen = props.get('getcontentlength', 0)
if 'Range' in self.headers:
stmp = self.headers['Range'][6:]
stmp = stmp.split('-')
try:
bpoint = int(stmp[0])
except:
bpoint = 0
try:
epoint = int(stmp[1])
except:
epoint = fullen - 1
if (epoint<=bpoint):
bpoint = 0
epoint = fullen - 1
fullen = epoint - bpoint + 1
if epoint>0:
self.send_response(206, 'Partial Content')
self.send_header("Content-Range", " Bytes %s-%s/%s" % (bpoint, epoint, fullen))
else:
self.send_response(200, 'OK')
self.send_header("Content-type", props.get('getcontenttype', 'application/octet-stream'))
if 'getlastmodified' in props:
self.send_header("Last-modified", props['getlastmodified'])
self.send_header("Content-length", fullen)
self.end_headers()
if not onlyhead:
if fullen >0 : # all 0 size file don't need this
elem.sendData(self.wfile,bpoint,epoint)
def do_HEAD(self):
self.do_GET(True) # HEAD should behave like GET, only without contents
def do_PUT(self):
if self.WebAuth():
return
try:
if 'Content-length' in self.headers:
size = int(self.headers['Content-length'])
elif 'Transfer-Encoding' in self.headers:
if self.headers['Transfer-Encoding'].lower()=='chunked':
size = -2
else:
size = -1
path, elem = self.path_elem_prev()
ename = path[-1]
if elem is None and len(path) > 1:
# Parent directory doesn't exist — create it and re-resolve
parent_parts = [p.rstrip('/') for p in path[:-1]]
parent_fs = os.path.join(self.server.root.fsname, *parent_parts)
os.makedirs(parent_fs, exist_ok=True)
path, elem = self.path_elem_prev()
except:
self.send_response(400, 'Cannot parse request')
self.send_header('Content-length', '0')
self.end_headers()
return
# for OSX finder, it's first send a 0 byte file,and you need response a 201 code,and then osx send real file.
# OSX finder don't using content-length.
if ename == '.DS_Store':
self.send_response(403, 'Forbidden')
self.send_header('Content-length', '0')
self.end_headers()
else:
try:
elem.recvMember(self.rfile, ename, size, self)
except:
self.send_response(500, 'Cannot save file')
self.send_header('Content-length', '0')
self.end_headers()
return
# sucess return
self.send_response(201, 'Created')
self.send_header('Content-length', '0')
self.end_headers()
# ── Proxy download ─────────────────────────────────────────────────────────
# The browser cannot fetch third-party binary URLs that return duplicate
# Access-Control-Allow-Origin headers (e.g. Assembly64 /search/bin/).
# POST /proxy-download { "url": "...", "dest": "/sd/downloads/file.d64",
# "headers": { "Client-Id": "Ultimate", ... } }
# The server fetches the URL with Python (no CORS restrictions) and saves
# the binary to the filesystem under the WebDAV root.
def _json_response(self, status: int, data: dict):
body = json.dumps(data).encode('utf-8')
self.send_response(status)
self.send_header('Content-Type', 'application/json')
self.send_header('Content-Length', str(len(body)))
self.send_header('Access-Control-Allow-Origin', '*')
self.end_headers()
self.wfile.write(body)
def do_POST(self):
if self.path != '/proxy-download':
self.send_error(404, 'Not found')
return
try:
size = int(self.headers.get('Content-Length', 0))
body = self.rfile.read(size)
req_data = json.loads(body)
url = req_data['url']
dest = req_data['dest']
extra_headers = req_data.get('headers', {})
except Exception as e:
self._json_response(400, {'error': f'Bad request: {e}'})
return
# Map virtual dest path (/sd/downloads/foo.d64) to filesystem path
vpath = urllib.parse.unquote(dest).lstrip('/')
if '..' in vpath.split('/'):
self._json_response(400, {'error': 'Invalid path'})
return
fspath = os.path.join(self.server.root.fsname, *vpath.split('/'))
# Fetch the URL server-side (no browser CORS restrictions apply here)
try:
req = urllib.request.Request(url, headers=extra_headers)
with urllib.request.urlopen(req, timeout=30) as resp:
data = resp.read()
except Exception as e:
_log('PROXY', f'Fetch failed: {url}{e}')
self._json_response(502, {'error': f'Fetch failed: {e}'})
return
# Save to filesystem, creating directories as needed
try:
os.makedirs(os.path.dirname(fspath), exist_ok=True)
with open(fspath, 'wb') as f:
f.write(data)
except Exception as e:
_log('PROXY', f'Save failed: {fspath}{e}')
self._json_response(500, {'error': f'Save failed: {e}'})
return
_log('PROXY', f'{url}{fspath} ({len(data)} bytes)')
self._json_response(200, {'ok': True, 'bytes': len(data)})
def split_path(self, path):
"""Splits path string in form '/dir1/dir2/file' into parts"""
p = path.split('/')[1:]
while p and p[-1] in ('','/'):
p = p[:-1]
if len(p) > 0:
p[-1] += '/'
return p
def path_elem(self):
"""Returns split path (see split_path()) and Member object of the last element"""
path = self.split_path(urllib.parse.unquote(self.path))
elem = self.server.root
for e in path:
elem = elem.findMember(e)
if elem == None:
break
return (path, elem)
def path_elem_prev(self):
"""Returns split path (see split_path()) and Member object of the next-to-last element"""
path = self.split_path(urllib.parse.unquote(self.path))
elem = self.server.root
for e in path[:-1]:
elem = elem.findMember(e)
if elem == None:
break
return (path, elem)
def log_message(self, format, *args):
# args: requestline, status_code, content_length (from log_request)
try:
parts = args[0].split() # e.g. ['GET', '/path', 'HTTP/1.1']
method = parts[0]
path = urllib.parse.unquote(parts[1]) if len(parts) > 1 else '?'
status = args[1]
_log('DAV', f'{method} {path}{status}')
except Exception:
_log('DAV', format % args) # fallback for unexpected call shapes
class BufWriter:
def __init__(self, w, debug=False):
self.w = w
self.parts = []
self._size = 0
self.debug = debug
def write(self, s):
b = s.encode('utf-8')
self.parts.append(b)
self._size += len(b)
def flush(self):
data = b''.join(self.parts)
if self.debug: print('<- XML:', data.decode('utf-8', errors='replace'))
self.w.write(data)
self.w.flush()
def getSize(self):
return self._size
class DAVServer(ThreadingMixIn, HTTPServer):
def __init__(self, addr, handler, root, userpwd):
HTTPServer.__init__(self, addr, handler)
self.root = root
self.userpwd = userpwd # WebDav Auth user:passwd
if len(userpwd)>0:
self.auth_enable = True
else:
self.auth_enable = False
# disable the broken pipe error message
def finish_request(self,request,client_address):
try:
HTTPServer.finish_request(self, request, client_address)
except socket.error as e:
pass
if __name__ == '__main__':
# WebDav TCP Port
srvport = 80
# Get local IP address
myaddr = get_localip()
print(f'# WebDav Server run at http://{myaddr}:{srvport}')
server_address = ('', srvport)
# WebDav Auth User/Password file
# if not this file ,the auth function disable.
# file format: user:passwd\n user:passwd\n
# or you can change your auth mode and file save format
userpwd = []
try:
f = open('wdusers.conf', 'r')
for uinfo in f.readlines():
uinfo = uinfo.strip()
if len(uinfo) > 4:
userpwd.append(base64.b64encode(uinfo.encode('utf-8')))
except:
pass
# first is Server root dir, Second is virtual dir
# **** Change First ./ to your dir , etc :/mnt/flash/public, d:/share_file/
root = DirCollection('files/', '/')
httpd = DAVServer(server_address, DAVRequestHandler, root, userpwd)
repl = threading.Thread(target=_repl_thread, daemon=True, name='ws-repl')
repl.start()
try:
httpd.serve_forever()
except:
_log('SRV', 'stopped')