1172 lines
42 KiB
Python
1172 lines
42 KiB
Python
# _*_ coding:utf-8 _*_
|
||
|
||
# Tiny WebDav Server for Pythonista - IOS. (Base on pandav WebDav server )
|
||
#
|
||
# (C)2013/11 By: Lai ChuJiang
|
||
#
|
||
# this code can run not just Pythonista on IOS ,also run on OSX python.
|
||
# Support Client: Windows / OSX / other Webdav client for IOS,etc : FE File explorer / goodreader / iWorks for ios
|
||
#
|
||
# 2024/6/1 Change Log:
|
||
# Python3 support
|
||
# add do_PROPPATCH function, now Windows Explorer full support. (!! windows explorer limit filesize < 50MBytes, you can using windows regedit edit it !!)
|
||
# support show webdav request/response info. ( sys_debug = True )
|
||
# bug fix
|
||
#
|
||
# 20170907 Change Log:
|
||
# 1. Fix Coda WebDav access problem.(because Coda WebDav all xml item add xmlns="DAV:".)
|
||
#
|
||
# 2013/11 Change Log:
|
||
# 1. Combind all files to one file,so can using for Pythonista easy.
|
||
# 2. Add MKCOL(Create dir); MOVE(rename file); DELETE(delete file or dir); COPY (Copy file)
|
||
# 3. Change some decode, Now it's can support Chinese.
|
||
# 4. Pythonista(For IOS) not dircache module ,so change code, don't using this module.
|
||
# 5. change DAV version from 1 to 2, so the OSX finder can write.
|
||
# 6. for DAV version 2 support , add LOCK / UNLOCK fake support. (not real lock file)
|
||
# *** !!!! So Don't using > 1 client sametime write or delete same file. maybe lost files.
|
||
# 7. Change the do_PROPFIND module, now it's simply & right for OSX
|
||
# 8. Change the do_GET module, now support RANGE
|
||
# 9. Change the do_PUT module, add Content-Length=0 support (create empty file) ,so the OSX Finder support.
|
||
# * if not add empty file, the Finder copy files and then delete all this.
|
||
#10. Add WebDav Basic Auth function,now you can set user & passwd
|
||
# ** using wdusers.conf file (just user:passwd), if not this file ,the Auth disable.
|
||
#11. Fix the broken pipe error message
|
||
#
|
||
# WebDav RFC: http://www.ics.uci.edu/~ejw/authoring/protocol/rfc2518.html
|
||
# http://restpatterns.org/HTTP_Methods
|
||
#
|
||
# Base : pandav v0.2
|
||
# Copyright (c) 2005.-2006. Ivan Voras <ivoras@gmail.com>
|
||
# Released under the Artistic License
|
||
#
|
||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||
from socketserver import ThreadingMixIn
|
||
import urllib.request, urllib.parse, urllib.error, urllib.parse
|
||
from time import timezone, strftime, localtime, gmtime
|
||
import os, sys, re, shutil, struct, threading, uuid, hashlib, mimetypes, base64, socket, json
|
||
|
||
_ws_lock = threading.Lock()
|
||
_ws_clients: set = set() # connected WebSocket sockets
|
||
|
||
# Debug message ( True / False )
|
||
sys_debug = False
|
||
|
||
# ── Logging ────────────────────────────────────────────────────────────────────
|
||
|
||
def _log(tag: str, msg: str):
|
||
print(f'[{strftime("%H:%M:%S")}] [{tag}] {msg}', flush=True)
|
||
|
||
# ── WebSocket broadcast (module-level so the REPL can call it) ─────────────────
|
||
|
||
def ws_broadcast(text: str) -> int:
|
||
"""Send a UTF-8 text frame to every connected WebSocket client."""
|
||
payload = text.encode('utf-8')
|
||
n = len(payload)
|
||
if n < 126:
|
||
header = bytes([0x81, n])
|
||
elif n < 65536:
|
||
header = bytes([0x81, 126]) + struct.pack('>H', n)
|
||
else:
|
||
header = bytes([0x81, 127]) + struct.pack('>Q', n)
|
||
frame = header + payload
|
||
with _ws_lock:
|
||
clients = set(_ws_clients)
|
||
sent = 0
|
||
for sock in clients:
|
||
try:
|
||
sock.send(frame)
|
||
sent += 1
|
||
except Exception:
|
||
with _ws_lock:
|
||
_ws_clients.discard(sock)
|
||
return sent
|
||
|
||
# ── REPL ───────────────────────────────────────────────────────────────────────
|
||
|
||
def _repl_thread():
|
||
"""Read lines from stdin and broadcast them to all connected WS clients."""
|
||
_log('REPL', 'ready — type a message and press Enter to broadcast, Ctrl-D to quit')
|
||
while True:
|
||
try:
|
||
sys.stdout.write('> ')
|
||
sys.stdout.flush()
|
||
line = sys.stdin.readline()
|
||
if not line: # EOF / Ctrl-D
|
||
break
|
||
line = line.rstrip('\r\n')
|
||
if not line:
|
||
continue
|
||
n = ws_broadcast(line)
|
||
_log('REPL', f'sent to {n} client(s): {line}')
|
||
except (EOFError, KeyboardInterrupt):
|
||
break
|
||
|
||
# get localhost IP address
|
||
def get_localip():
|
||
try:
|
||
gAdd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
gAdd.connect(('www.bing.com', 80))
|
||
return gAdd.getsockname()[0]
|
||
except:
|
||
return '127.0.0.1'
|
||
|
||
class Member:
|
||
M_MEMBER = 1
|
||
M_COLLECTION = 2
|
||
def getProperties(self):
|
||
return {}
|
||
|
||
class Collection(Member):
|
||
def __init__(self, name):
|
||
self.name = name
|
||
|
||
def getMembers(self):
|
||
return []
|
||
|
||
class FileMember(Member):
|
||
def __init__(self, name, parent):
|
||
self.name = name
|
||
self.parent = parent
|
||
self.name = name
|
||
self.fsname = parent.fsname + name # e.g. '/var/www/mysite/some.txt'
|
||
self.virname = parent.virname + name # e.g. '/mysite/some.txt'
|
||
self.type = Member.M_MEMBER
|
||
|
||
def __str__(self):
|
||
return "%s -> %s" % (self.virname, self.fsname)
|
||
|
||
def getProperties(self):
|
||
"""Return dictionary with WebDAV properties. Values shold be
|
||
formatted according to the WebDAV specs."""
|
||
st = os.stat(self.fsname)
|
||
p = {}
|
||
p['creationdate'] = unixdate2iso8601(st.st_ctime)
|
||
p['getlastmodified'] = unixdate2httpdate(st.st_mtime)
|
||
p['displayname'] = self.name
|
||
# p['getetag'] = md5.new(self.fsname).hexdigest()
|
||
m = hashlib.md5()
|
||
m.update(self.fsname.encode('utf-8'))
|
||
p['getetag'] = m.hexdigest()
|
||
if self.type == Member.M_MEMBER:
|
||
p['getcontentlength'] = st.st_size
|
||
p['getcontenttype'], z = mimetypes.guess_type(self.name)
|
||
p['getcontentlanguage'] = None
|
||
else: # Member.M_COLLECTION
|
||
p['resourcetype'] = '<D:collection/>'
|
||
if self.name[0] == ".":
|
||
p['ishidden'] = 1
|
||
if not os.access(self.fsname, os.W_OK):
|
||
p['isreadonly'] = 1
|
||
if self.name == '/':
|
||
p['isroot'] = 1
|
||
return p
|
||
|
||
def sendData(self, wfile,bpoint=0,epoint=0):
|
||
"""Send the file to the client. Literally."""
|
||
st = os.stat(self.fsname)
|
||
f = open(self.fsname, 'rb')
|
||
writ = 0
|
||
# for send Range xxx-xxx
|
||
if bpoint>0 and bpoint<st.st_size:
|
||
f.seek(bpoint)
|
||
if epoint>bpoint:
|
||
if epoint<=st.st_size:
|
||
rsize = epoint - bpoint + 1
|
||
else:
|
||
rsize = st.st_size - bpoint
|
||
else:
|
||
rsize = st.st_size
|
||
while writ < rsize:
|
||
if (rsize - writ) < 65536:
|
||
buf = f.read(rsize)
|
||
else:
|
||
buf = f.read(65536)
|
||
if not buf:
|
||
break
|
||
writ += len(buf)
|
||
wfile.write(buf)
|
||
f.close()
|
||
|
||
def findMember(self, name):
|
||
''' 2024/6/1 add
|
||
'''
|
||
return None
|
||
|
||
class DirCollection(FileMember, Collection):
|
||
COLLECTION_MIME_TYPE = 'httpd/unix-directory' # application/x-collection ?
|
||
def __init__(self, fsdir, virdir, parent=None):
|
||
if not os.path.exists(fsdir):
|
||
raise "Local directory (fsdir) not found: " + fsdir
|
||
self.fsname = fsdir
|
||
self.name = virdir
|
||
if self.fsname[-1] != os.sep:
|
||
if self.fsname[-1] == '/': # fixup win/dos/mac separators
|
||
self.fsname = self.fsname[:-1] + os.sep
|
||
else:
|
||
self.fsname += os.sep
|
||
self.virname = virdir
|
||
if self.virname[-1] != '/':
|
||
self.virname += '/'
|
||
self.parent = parent
|
||
self.type = Member.M_COLLECTION
|
||
|
||
def getProperties(self):
|
||
p = FileMember.getProperties(self) # inherit file properties
|
||
p['iscollection'] = 1
|
||
p['getcontenttype'] = DirCollection.COLLECTION_MIME_TYPE
|
||
# Inherited displayname is `self.name`, which for a DirCollection is the
|
||
# full virtual path (e.g. "/sd/40 tracks/") because __init__ sets
|
||
# self.name = virdir. WebDAV clients expect a basename, so override it
|
||
# with just the leaf name.
|
||
leaf = os.path.basename(self.fsname.rstrip('/\\'))
|
||
if leaf:
|
||
p['displayname'] = leaf
|
||
return p
|
||
|
||
def getMembers(self):
|
||
"""Get immediate members of this collection."""
|
||
l = os.listdir(self.fsname) # obtain a copy of dirlist
|
||
tcount=0
|
||
for tmpi in l:
|
||
if os.path.isfile(self.fsname+tmpi) == False:
|
||
l[tcount]=l[tcount]+'/'
|
||
tcount += 1
|
||
r = []
|
||
for f in l:
|
||
if f[-1] != '/':
|
||
m = FileMember(f, self) # Member is a file
|
||
else:
|
||
m = DirCollection(self.fsname + f, self.virname + f, self) # Member is a collection
|
||
r.append(m)
|
||
return r
|
||
|
||
# Return WebDav Root Dir info
|
||
def rootdir(self):
|
||
return self.fsname
|
||
|
||
def findMember(self, name):
|
||
"""Search for a particular member."""
|
||
l = os.listdir(self.fsname) # obtain a copy of dirlist
|
||
tcount=0
|
||
for tmpi in l:
|
||
if os.path.isfile(self.fsname+tmpi) == False:
|
||
l[tcount]=l[tcount]+'/'
|
||
tcount += 1
|
||
if name in l:
|
||
if name[-1] != '/':
|
||
return FileMember(name, self)
|
||
else:
|
||
return DirCollection(self.fsname + name, self.virname + name, self)
|
||
elif name[-1] != '/':
|
||
name += '/'
|
||
if name in l:
|
||
return DirCollection(self.fsname + name, self.virname + name, self)
|
||
|
||
def sendData(self, wfile):
|
||
"""Send "file" to the client. Since this is a directory, build some arbitrary HTML."""
|
||
memb = self.getMembers()
|
||
def esc(s):
|
||
return (str(s)
|
||
.replace('&', '&')
|
||
.replace('<', '<')
|
||
.replace('>', '>')
|
||
.replace('"', '"'))
|
||
data = '<html><head><title>%s</title></head><body>' % esc(self.virname)
|
||
data += '<table><tr><th>Name</th><th>Size</th><th>Timestamp</th></tr>'
|
||
for m in memb:
|
||
p = m.getProperties()
|
||
if 'getcontentlength' in p:
|
||
p['size'] = int(p['getcontentlength'])
|
||
p['timestamp'] = p['getlastmodified']
|
||
else:
|
||
p['size'] = 0
|
||
p['timestamp'] = '-DIR-'
|
||
data += '<tr><td>%s</td><td>%d</td><td>%s</td></tr>' % (esc(p['displayname']), p['size'], esc(p['timestamp']))
|
||
data += '</table></body></html>'
|
||
wfile.write(data.encode('utf-8'))
|
||
|
||
def recvMember(self, rfile, name, size, req):
|
||
"""Receive (save) a member file"""
|
||
fname = os.path.join(self.fsname, urllib.parse.unquote(name))
|
||
f = open(fname, 'wb')
|
||
# if size=-1 it's Transfer-Encoding: Chunked mode, like OSX finder using this mode put data
|
||
# so the file size need get here.
|
||
if size == -2:
|
||
l = int(rfile.readline(), 16)
|
||
ltotal = 0
|
||
while l > 0:
|
||
buf = rfile.read(l)
|
||
f.write(buf) #yield buf
|
||
rfile.readline()
|
||
ltotal += l
|
||
l = int(rfile.readline(), 16)
|
||
elif size > 0: # if size=0 ,just save a empty file.
|
||
writ = 0
|
||
bs = 65536
|
||
while True:
|
||
if size != -1 and (bs > size-writ):
|
||
bs = size-writ
|
||
buf = rfile.read(bs)
|
||
if len(buf) == 0:
|
||
break
|
||
f.write(buf)
|
||
writ += len(buf)
|
||
if size != -1 and writ >= size:
|
||
break
|
||
f.close()
|
||
|
||
def unixdate2iso8601(d):
|
||
tz = timezone / 3600 # can it be fractional?
|
||
tz = '%+03d' % tz
|
||
return strftime('%Y-%m-%dT%H:%M:%S', localtime(d)) + tz + ':00'
|
||
|
||
def unixdate2httpdate(d):
|
||
return strftime('%a, %d %b %Y %H:%M:%S GMT', gmtime(d))
|
||
|
||
class Tag:
|
||
def __init__(self, name, attrs, data='', parser=None):
|
||
self.d = {}
|
||
self.name = name
|
||
self.attrs = attrs
|
||
if type(self.attrs) == type(''):
|
||
self.attrs = splitattrs(self.attrs)
|
||
for a in self.attrs:
|
||
if a.startswith('xmlns'):
|
||
# `xmlns:D="DAV:"` should register prefix "D" → "DAV:".
|
||
# `xmlns="DAV:"` (default ns) registers prefix "" → "DAV:".
|
||
nsname = a[5:] # drop "xmlns"; leaves "" (default) or ":D" (prefixed)
|
||
if nsname.startswith(':'):
|
||
nsname = nsname[1:]
|
||
parser.namespaces[nsname] = self.attrs[a]
|
||
self.rawname = self.name
|
||
|
||
p = name.find(':')
|
||
if p > 0:
|
||
nsname = name[0:p]
|
||
if nsname in parser.namespaces:
|
||
self.ns = parser.namespaces[nsname]
|
||
self.name = self.rawname[p+1:]
|
||
else:
|
||
self.ns = ''
|
||
self.data = data
|
||
|
||
# Emulate dictionary d
|
||
def __len__(self):
|
||
return len(self.d)
|
||
|
||
def __getitem__(self, key):
|
||
return self.d[key]
|
||
|
||
def __setitem__(self, key, value):
|
||
self.d[key] = value
|
||
|
||
def __delitem__(self, key):
|
||
del self.d[key]
|
||
|
||
def __iter__(self):
|
||
return iter(self.d.keys())
|
||
|
||
def __contains__(self, key):
|
||
return key in self.d
|
||
|
||
def __str__(self):
|
||
"""Returns unicode semi human-readable representation of the structure"""
|
||
if self.attrs:
|
||
s = '<%s %s> %s ' % (self.name, self.attrs, self.data)
|
||
else:
|
||
s = '<%s> %s ' % (self.name, self.data)
|
||
|
||
for k in self.d:
|
||
if type(self.d[k]) == type(self):
|
||
s += '|%s: %s|' % (k, str(self.d[k]))
|
||
else:
|
||
s += '|' + ','.join([str(x) for x in self.d[k]]) + '|'
|
||
return s
|
||
|
||
def addChild(self, tag):
|
||
"""Adds a child to self. tag must be instance of Tag"""
|
||
if tag.name in self.d:
|
||
if type(self.d[tag.name]) == type(self): # If there are multiple sibiling tags with same name, form a list :)
|
||
self.d[tag.name] = [self.d[tag.name]]
|
||
self.d[tag.name].append(tag)
|
||
else:
|
||
self.d[tag.name] = tag
|
||
return tag
|
||
|
||
class XMLDict_Parser:
|
||
def __init__(self, xml):
|
||
self.xml = xml
|
||
self.p = 0
|
||
self.encoding = sys.getdefaultencoding()
|
||
self.namespaces = {}
|
||
|
||
def getnexttag(self):
|
||
ptag = self.xml.find('<', self.p)
|
||
if ptag < 0:
|
||
return None, None, self.xml[self.p:].strip()
|
||
data = self.xml[self.p:ptag].strip()
|
||
self.p = ptag
|
||
self.tagbegin = ptag
|
||
p2 = self.xml.find('>', self.p+1)
|
||
if p2 < 0:
|
||
raise "Malformed XML - unclosed tag?"
|
||
tag = self.xml[ptag+1:p2]
|
||
self.p = p2+1
|
||
self.tagend = p2+1
|
||
ps = tag.find(' ')
|
||
### Change By LCJ @ 2017/9/7 from [ if ps > 0: ] ###
|
||
### for IOS Coda Webdav support ###
|
||
if ps > 0 and tag[-1] != '/':
|
||
tag, attrs = tag.split(' ', 1)
|
||
else:
|
||
attrs = ''
|
||
return tag, attrs, data
|
||
|
||
def builddict(self):
|
||
"""Builds a nested-dictionary-like structure from the xml. This method
|
||
picks up tags on the main level and calls processTag() for nested tags."""
|
||
d = Tag('<root>', '')
|
||
while True:
|
||
tag, attrs, data = self.getnexttag()
|
||
if data != '': # data is actually that between the last tag and this one
|
||
sys.stderr.write("Warning: inline data between tags?!\n")
|
||
if not tag:
|
||
break
|
||
if tag[-1] == '/': # an 'empty' tag (e.g. <empty/>)
|
||
d.addChild(Tag(tag[:-1], attrs, parser=self))
|
||
continue
|
||
elif tag[0] == '?': # special tag
|
||
t = d.addChild(Tag(tag, attrs, parser=self))
|
||
if tag == '?xml' and 'encoding' in t.attrs:
|
||
self.encoding = t.attrs['encoding']
|
||
else:
|
||
try:
|
||
self.processTag(d.addChild(Tag(tag, attrs, parser=self)))
|
||
except:
|
||
sys.stderr.write("Error processing tag %s\n" % tag)
|
||
d.encoding = self.encoding
|
||
return d
|
||
|
||
def processTag(self, dtag):
|
||
"""Process single tag's data"""
|
||
until = '/'+dtag.rawname
|
||
while True:
|
||
tag, attrs, data = self.getnexttag()
|
||
if data:
|
||
dtag.data += data
|
||
if tag == None:
|
||
sys.stderr.write("Unterminated tag '"+dtag.rawname+"'?\n")
|
||
break
|
||
if tag == until:
|
||
break
|
||
if tag[-1] == '/':
|
||
dtag.addChild(Tag(tag[:-1], attrs, parser=self))
|
||
continue
|
||
self.processTag(dtag.addChild(Tag(tag, attrs, parser=self)))
|
||
|
||
def splitattrs(att):
|
||
"""Extracts name="value" pairs from string; returns them as dictionary"""
|
||
d = {}
|
||
for m in re.findall('([a-zA-Z_][a-zA-Z_:0-9]*?)="(.+?)"', att):
|
||
d[m[0]] = m[1]
|
||
return d
|
||
|
||
def builddict(xml):
|
||
"""Wrapper function for straightforward parsing"""
|
||
if sys_debug: print('-> XML:',xml)
|
||
p = XMLDict_Parser(xml.decode('utf-8'))
|
||
return p.builddict()
|
||
|
||
class DAVRequestHandler(BaseHTTPRequestHandler):
|
||
server_version = "tiny_WebDAV"
|
||
all_props = ['name', 'parentname', 'href', 'ishidden', 'isreadonly', 'getcontenttype',
|
||
'contentclass', 'getcontentlanguage', 'creationdate', 'lastaccessed', 'getlastmodified',
|
||
'getcontentlength', 'iscollection', 'isstructureddocument', 'defaultdocument',
|
||
'displayname', 'isroot', 'resourcetype']
|
||
basic_props = ['name', 'getcontenttype', 'getcontentlength', 'creationdate', 'getlastmodified', 'iscollection']
|
||
auth_file = False
|
||
auth_enable = False
|
||
Auserlist = []
|
||
|
||
# for debug out
|
||
def send_response(self, code, message=None):
|
||
if sys_debug: print(f"<- status code: {code}")
|
||
super().send_response(code, message)
|
||
# Emit CORS headers on every response so the browser doesn't block
|
||
# cross-origin requests (e.g. http://127.0.0.1:5500 -> http://localhost/).
|
||
# Echo the request Origin if present, otherwise allow any origin.
|
||
origin = self.headers.get('Origin') if hasattr(self, 'headers') else None
|
||
self.send_header('Access-Control-Allow-Origin', origin or '*')
|
||
self.send_header('Vary', 'Origin')
|
||
self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
|
||
self.send_header('Access-Control-Allow-Headers', 'Authorization, Content-Type, Depth, Destination, If, If-Match, If-Modified-Since, If-None-Match, If-Range, Lock-Token, Overwrite, Timeout, X-Requested-With')
|
||
self.send_header('Access-Control-Max-Age', '86400')
|
||
|
||
def send_header(self, keyword, value):
|
||
if sys_debug: print(f"<- header: {keyword}: {value}")
|
||
super().send_header(keyword, value)
|
||
|
||
def end_headers(self):
|
||
if sys_debug: print("<- End of headers")
|
||
super().end_headers()
|
||
|
||
def handle_one_request(self):
|
||
super().handle_one_request()
|
||
if sys_debug: print(f'>>>>>> {self.command} <<<<<<')
|
||
|
||
# User Auth
|
||
# if success ,return False;
|
||
# Get WebDav User/Pass file : wdusers.conf
|
||
# file formate: user:pass\n user:pass\n
|
||
def WebAuth(self):
|
||
if sys_debug:
|
||
import inspect
|
||
print(f'## {inspect.stack()[1].function}, PATH: {urllib.parse.unquote(self.path)}')
|
||
print(self.headers)
|
||
|
||
if self.server.auth_enable:
|
||
if 'Authorization' in self.headers:
|
||
try:
|
||
AuthInfo = self.headers['Authorization'][6:].encode('utf-8')
|
||
except:
|
||
AuthInfo = ''
|
||
if AuthInfo in self.server.userpwd:
|
||
return False # Auth success
|
||
self.send_response(401,'Authorization Required')
|
||
self.send_header('WWW-Authenticate', 'Basic realm="WebDav Auth"')
|
||
self.send_header('Content-type', 'text/html')
|
||
self.end_headers()
|
||
return True
|
||
else:
|
||
return False
|
||
|
||
def do_OPTIONS(self):
|
||
# CORS preflight for /proxy-download (called by browser before POST)
|
||
if self.path == '/proxy-download':
|
||
self.send_response(204)
|
||
self.send_header('Access-Control-Allow-Origin', '*')
|
||
self.send_header('Access-Control-Allow-Methods', 'POST, OPTIONS')
|
||
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
|
||
self.send_header('Content-Length', '0')
|
||
self.end_headers()
|
||
return
|
||
# CORS preflight for /leet/ and /commoserve/ proxies
|
||
if self.path.startswith('/leet/') or self.path.startswith('/commoserve/'):
|
||
self.send_response(204)
|
||
self.send_header('Access-Control-Allow-Origin', '*')
|
||
self.send_header('Access-Control-Allow-Methods', 'GET, OPTIONS')
|
||
self.send_header('Access-Control-Allow-Headers', 'Content-Type')
|
||
self.send_header('Content-Length', '0')
|
||
self.end_headers()
|
||
return
|
||
if self.WebAuth():
|
||
return
|
||
self.send_response(200, DAVRequestHandler.server_version)
|
||
self.send_header('Allow', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
|
||
self.send_header('Content-length', '0')
|
||
self.send_header('X-Server-Copyright', DAVRequestHandler.server_version)
|
||
self.send_header('DAV', '1, 2') #OSX Finder need Ver 2, if Ver 1 -- read only
|
||
self.send_header('MS-Author-Via', 'DAV')
|
||
self.end_headers()
|
||
|
||
def do_DELETE(self):
|
||
if self.WebAuth():
|
||
return
|
||
path = urllib.parse.unquote(self.path)
|
||
if path == '':
|
||
self.send_error(404, 'Object not found')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
return
|
||
path = self.server.root.rootdir() + path
|
||
if os.path.isfile(path):
|
||
os.remove(path) #delete file
|
||
elif os.path.isdir(path):
|
||
shutil.rmtree(path) #delete dir
|
||
else:
|
||
self.send_response(404,'Not Found')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
return
|
||
self.send_response(204, 'No Content')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
|
||
def do_MKCOL(self):
|
||
if self.WebAuth():
|
||
return
|
||
path = urllib.parse.unquote(self.path)
|
||
if path != '':
|
||
path = self.server.root.rootdir() + path
|
||
if os.path.isdir(path) == False:
|
||
os.mkdir(path)
|
||
self.send_response(201, "Created")
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
return
|
||
self.send_response(403, "OK")
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
|
||
def do_MOVE(self):
|
||
if self.WebAuth():
|
||
return
|
||
oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path)
|
||
newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path
|
||
if (os.path.isfile(oldfile)==True and os.path.isfile(newfile)==False):
|
||
shutil.move(oldfile,newfile)
|
||
if (os.path.isdir(oldfile)==True and os.path.isdir(newfile)==False):
|
||
os.rename(oldfile,newfile)
|
||
self.send_response(201, "Created")
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
|
||
def do_COPY(self):
|
||
if self.WebAuth():
|
||
return
|
||
oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path)
|
||
newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path
|
||
if (os.path.isfile(oldfile)==True): # and os.path.isfile(newfile)==False): copy can rewrite.
|
||
shutil.copyfile(oldfile,newfile)
|
||
self.send_response(201, "Created")
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
|
||
def do_LOCK(self):
|
||
if self.WebAuth():
|
||
return
|
||
if 'Content-length' in self.headers:
|
||
req = self.rfile.read(int(self.headers['Content-length']))
|
||
else:
|
||
req = self.rfile.read()
|
||
d = builddict(req)
|
||
try:
|
||
clientid = str(d['lockinfo']['owner']['href'])[7:] # temp: need Change other method!!!
|
||
except:
|
||
clientid = ''
|
||
lockid = str(uuid.uuid1())
|
||
retstr = '<?xml version="1.0" encoding="utf-8" ?>\n<D:prop xmlns:D="DAV:">\n<D:lockdiscovery>\n<D:activelock>\n<D:locktype><D:write/></D:locktype>\n<D:lockscope><D:exclusive/></D:lockscope>\n<D:depth>Infinity</D:depth>\n<D:owner>\n<D:href>'+clientid+'</D:href>\n</D:owner>\n<D:timeout>Infinite</D:timeout>\n<D:locktoken><D:href>opaquelocktoken:'+lockid+'</D:href></D:locktoken>\n</D:activelock>\n</D:lockdiscovery>\n</D:prop>\n'
|
||
self.send_response(201,'Created')
|
||
self.send_header("Content-type",'text/xml')
|
||
self.send_header("charset",'"utf-8"')
|
||
self.send_header("Lock-Token",'<opaquelocktoken:'+lockid+'>')
|
||
self.send_header('Content-Length',len(retstr))
|
||
w = BufWriter(self.wfile, sys_debug)
|
||
w.write(retstr)
|
||
self.send_header('Content-Length', str(w.getSize()))
|
||
self.end_headers()
|
||
w.flush()
|
||
|
||
def do_UNLOCK(self):
|
||
if self.WebAuth():
|
||
return
|
||
# frome self.headers get Lock-Token:
|
||
self.send_response(204, 'No Content') # unlock using 204 for sucess.
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
|
||
def do_PROPFIND(self):
|
||
if self.WebAuth():
|
||
return
|
||
depth = 'infinity'
|
||
if 'Depth' in self.headers:
|
||
depth = self.headers['Depth'].lower()
|
||
if 'Content-length' in self.headers:
|
||
req = self.rfile.read(int(self.headers['Content-length']))
|
||
else:
|
||
req = self.rfile.read()
|
||
d = builddict(req) # change all http.request to dict stru
|
||
wished_all = False
|
||
if len(d) == 0:
|
||
wished_props = DAVRequestHandler.basic_props
|
||
else:
|
||
if 'allprop' in d['propfind']:
|
||
wished_props = DAVRequestHandler.all_props
|
||
wished_all = True
|
||
else:
|
||
wished_props = []
|
||
for prop in d['propfind']['prop']:
|
||
### 2017/9/7 Edit By LCJ , Old is [ wished_props.append(prop) ]
|
||
### for IOS Coda Webdav support ###
|
||
wished_props.append(prop.split(' ')[0])
|
||
path, elem = self.path_elem()
|
||
if not elem:
|
||
if len(path) >= 1: # it's a non-existing file
|
||
self.send_response(404, 'Not Found')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
return
|
||
else:
|
||
elem = self.server.root # fixup root lookups?
|
||
if depth != '0' and not elem: #or elem.type != Member.M_COLLECTION:
|
||
self.send_response(406, 'This is not allowed')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
return
|
||
self.send_response(207, 'Multi-Status') #Multi-Status
|
||
self.send_header('Content-Type', 'text/xml')
|
||
self.send_header("charset",'"utf-8"')
|
||
w = BufWriter(self.wfile, sys_debug)
|
||
w.write('<?xml version="1.0" encoding="utf-8" ?>\n')
|
||
w.write('<D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:">\n')
|
||
|
||
def write_props_member(w, m):
|
||
w.write('<D:response>\n<D:href>%s</D:href>\n<D:propstat>\n<D:prop>\n' % urllib.parse.quote(m.virname)) #add urllib.quote for chinese
|
||
props = m.getProperties() # get the file or dir props
|
||
# For OSX Finder : getlastmodified, getcontentlength, resourceType
|
||
if ('quota-available-bytes' in wished_props) or ('quota-used-bytes'in wished_props) or ('quota' in wished_props) or ('quotaused'in wished_props):
|
||
# windows don't support os.statvfs
|
||
try:
|
||
sDisk = os.statvfs('/')
|
||
props['quota-used-bytes'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
|
||
props['quotaused'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
|
||
props['quota-available-bytes'] = sDisk.f_bavail * sDisk.f_frsize
|
||
props['quota'] = sDisk.f_bavail * sDisk.f_frsize
|
||
except:
|
||
pass
|
||
for wp in wished_props:
|
||
if (wp in props) == False:
|
||
w.write(' <D:%s/>\n' % wp)
|
||
else:
|
||
val = str(props[wp])
|
||
if wp != 'resourcetype':
|
||
val = val.replace('&', '&').replace('<', '<').replace('>', '>')
|
||
w.write(' <D:%s>%s</D:%s>\n' % (wp, val, wp))
|
||
w.write('</D:prop>\n<D:status>HTTP/1.1 200 OK</D:status>\n</D:propstat>\n</D:response>\n')
|
||
|
||
def write_recursive(m):
|
||
write_props_member(w, m)
|
||
if m.type == Member.M_COLLECTION:
|
||
for child in m.getMembers():
|
||
write_recursive(child)
|
||
|
||
write_props_member(w, elem)
|
||
if depth == '1' and elem.type == Member.M_COLLECTION:
|
||
for m in elem.getMembers():
|
||
write_props_member(w, m)
|
||
elif depth == 'infinity' and elem.type == Member.M_COLLECTION:
|
||
for m in elem.getMembers():
|
||
write_recursive(m)
|
||
w.write('</D:multistatus>')
|
||
self.send_header('Content-Length', str(w.getSize()))
|
||
self.end_headers()
|
||
w.flush()
|
||
|
||
def do_PROPPATCH(self):
|
||
if self.WebAuth():
|
||
return
|
||
if 'Content-length' in self.headers:
|
||
req = self.rfile.read(int(self.headers['Content-length']))
|
||
else:
|
||
req = self.rfile.read()
|
||
d = builddict(req) # change all http.request to dict stru
|
||
self.send_response(207, 'Multi-Status') #Multi-Status
|
||
self.send_header('Content-Type', 'text/xml')
|
||
self.send_header("charset",'"utf-8"')
|
||
retstr = f'<?xml version="1.0" encoding="utf-8" ?><D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:"><D:response><D:href>{urllib.parse.unquote(self.path)}</D:href><D:propstat><D:status>HTTP/1.1 200 OK</D:status></D:propstat></D:response></D:multistatus>'
|
||
w = BufWriter(self.wfile, sys_debug)
|
||
w.write(retstr)
|
||
self.send_header('Content-Length', str(w.getSize()))
|
||
self.end_headers()
|
||
w.flush()
|
||
|
||
def _ws_recv(self, sock, n):
|
||
data = b''
|
||
while len(data) < n:
|
||
chunk = sock.recv(n - len(data))
|
||
if not chunk:
|
||
raise ConnectionError('closed')
|
||
data += chunk
|
||
return data
|
||
|
||
def ws_process_message(self, message: str) -> str:
|
||
"""Override this to handle incoming WebSocket messages. Return the response to broadcast."""
|
||
return message # echo by default
|
||
|
||
def _ws_broadcast(self, payload: bytes):
|
||
# Thin wrapper kept for binary payloads; text goes through ws_broadcast().
|
||
with _ws_lock:
|
||
clients = set(_ws_clients)
|
||
for sock in clients:
|
||
try:
|
||
self._ws_send(sock, payload)
|
||
except Exception:
|
||
pass
|
||
|
||
def _ws_send(self, sock, payload: bytes):
|
||
length = len(payload)
|
||
if length < 126:
|
||
header = bytes([0x81, length])
|
||
elif length < 65536:
|
||
header = bytes([0x81, 126]) + struct.pack('>H', length)
|
||
else:
|
||
header = bytes([0x81, 127]) + struct.pack('>Q', length)
|
||
sock.send(header + payload)
|
||
|
||
def _handle_websocket(self):
|
||
key = self.headers.get('Sec-WebSocket-Key', '')
|
||
accept = base64.b64encode(
|
||
hashlib.sha1((key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11').encode()).digest()
|
||
).decode()
|
||
self.send_response(101, 'Switching Protocols')
|
||
self.send_header('Upgrade', 'websocket')
|
||
self.send_header('Connection', 'Upgrade')
|
||
self.send_header('Sec-WebSocket-Accept', accept)
|
||
self.end_headers()
|
||
client = f'{self.client_address[0]}:{self.client_address[1]}'
|
||
sock = self.connection
|
||
sock.settimeout(None)
|
||
with _ws_lock:
|
||
_ws_clients.add(sock)
|
||
_log('WS', f'{client} connected (clients: {len(_ws_clients)})')
|
||
try:
|
||
while True:
|
||
b0, b1 = self._ws_recv(sock, 2)
|
||
opcode = b0 & 0x0F
|
||
masked = bool(b1 & 0x80)
|
||
length = b1 & 0x7F
|
||
if length == 126:
|
||
length = struct.unpack('>H', self._ws_recv(sock, 2))[0]
|
||
elif length == 127:
|
||
length = struct.unpack('>Q', self._ws_recv(sock, 8))[0]
|
||
mask = self._ws_recv(sock, 4) if masked else b''
|
||
payload = self._ws_recv(sock, length)
|
||
if masked:
|
||
payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
|
||
if opcode == 0x8: # close
|
||
_log('WS', f'{client} close frame')
|
||
sock.send(b'\x88\x00')
|
||
break
|
||
elif opcode == 0x9: # ping → pong
|
||
sock.send(bytes([0x8A, len(payload)]) + payload)
|
||
elif opcode in (0x1, 0x2): # text or binary
|
||
message = payload.decode('utf-8', errors='replace')
|
||
_log('WS', f'{client} recv: {message}')
|
||
response = self.ws_process_message(message)
|
||
if response is not None:
|
||
ws_broadcast(response)
|
||
except Exception:
|
||
pass
|
||
finally:
|
||
with _ws_lock:
|
||
_ws_clients.discard(sock)
|
||
_log('WS', f'{client} disconnected (clients: {len(_ws_clients)})')
|
||
|
||
# ── Leet API proxy ─────────────────────────────────────────────────────────
|
||
# Browsers cannot override the User-Agent header (Fetch spec §forbidden
|
||
# header names). Assembly64 returns HTTP 463 for non-matching UAs.
|
||
# commoserve.files.commodore.net also lacks CORS headers on AQL endpoints.
|
||
#
|
||
# Routes:
|
||
# GET /leet/* → https://hackerswithstyle.se/leet/*
|
||
# GET /commoserve/* → https://commoserve.files.commodore.net/leet/*
|
||
|
||
def _proxy_leet(self, target: str):
|
||
try:
|
||
req = urllib.request.Request(target, headers={
|
||
'User-Agent': 'Assembly Query',
|
||
'Client-Id': 'Ultimate',
|
||
'Accept': 'application/json',
|
||
})
|
||
with urllib.request.urlopen(req, timeout=15) as resp:
|
||
body = resp.read()
|
||
ct = resp.headers.get('Content-Type', 'application/octet-stream')
|
||
self.send_response(200)
|
||
self.send_header('Access-Control-Allow-Origin', '*')
|
||
self.send_header('Content-Type', ct)
|
||
self.send_header('Content-Length', str(len(body)))
|
||
self.end_headers()
|
||
self.wfile.write(body)
|
||
_log('LEET', f'{self.path} → {len(body)} bytes')
|
||
except Exception as e:
|
||
_log('LEET', f'Proxy error: {self.path} → {e}')
|
||
body = json.dumps({'error': str(e)}).encode('utf-8')
|
||
self.send_response(502)
|
||
self.send_header('Access-Control-Allow-Origin', '*')
|
||
self.send_header('Content-Type', 'application/json')
|
||
self.send_header('Content-Length', str(len(body)))
|
||
self.end_headers()
|
||
self.wfile.write(body)
|
||
|
||
def do_GET(self, onlyhead=False):
|
||
if (self.path == '/ws'
|
||
and self.headers.get('Upgrade', '').lower() == 'websocket'):
|
||
self._handle_websocket()
|
||
return
|
||
if self.path.startswith('/leet/'):
|
||
self._proxy_leet('https://hackerswithstyle.se' + self.path)
|
||
return
|
||
if self.path.startswith('/commoserve/'):
|
||
# /commoserve/search/categories → /leet/search/categories on commoserve server
|
||
leet_path = '/leet' + self.path[len('/commoserve'):]
|
||
self._proxy_leet('https://commoserve.files.commodore.net' + leet_path)
|
||
return
|
||
if self.WebAuth():
|
||
return
|
||
path, elem = self.path_elem()
|
||
if not elem:
|
||
self.send_error(404, 'Object not found')
|
||
return
|
||
try:
|
||
props = elem.getProperties()
|
||
except:
|
||
self.send_response(500, "Error retrieving properties")
|
||
self.end_headers()
|
||
return
|
||
# Collections don't carry a content length — render a directory
|
||
# listing instead of treating them like a file.
|
||
if elem.type == Member.M_COLLECTION:
|
||
ctype = props.get('getcontenttype') or DirCollection.COLLECTION_MIME_TYPE
|
||
self.send_response(200, 'OK')
|
||
self.send_header("Content-type", ctype)
|
||
if 'getlastmodified' in props:
|
||
self.send_header("Last-modified", props['getlastmodified'])
|
||
self.end_headers()
|
||
if not onlyhead:
|
||
elem.sendData(self.wfile)
|
||
return
|
||
# when the client had Range: bytes=3156-3681
|
||
bpoint = 0
|
||
epoint = 0
|
||
fullen = props.get('getcontentlength', 0)
|
||
if 'Range' in self.headers:
|
||
stmp = self.headers['Range'][6:]
|
||
stmp = stmp.split('-')
|
||
try:
|
||
bpoint = int(stmp[0])
|
||
except:
|
||
bpoint = 0
|
||
try:
|
||
epoint = int(stmp[1])
|
||
except:
|
||
epoint = fullen - 1
|
||
if (epoint<=bpoint):
|
||
bpoint = 0
|
||
epoint = fullen - 1
|
||
fullen = epoint - bpoint + 1
|
||
if epoint>0:
|
||
self.send_response(206, 'Partial Content')
|
||
self.send_header("Content-Range", " Bytes %s-%s/%s" % (bpoint, epoint, fullen))
|
||
else:
|
||
self.send_response(200, 'OK')
|
||
self.send_header("Content-type", props.get('getcontenttype', 'application/octet-stream'))
|
||
if 'getlastmodified' in props:
|
||
self.send_header("Last-modified", props['getlastmodified'])
|
||
self.send_header("Content-length", fullen)
|
||
self.end_headers()
|
||
if not onlyhead:
|
||
if fullen >0 : # all 0 size file don't need this
|
||
elem.sendData(self.wfile,bpoint,epoint)
|
||
|
||
def do_HEAD(self):
|
||
self.do_GET(True) # HEAD should behave like GET, only without contents
|
||
|
||
def do_PUT(self):
|
||
if self.WebAuth():
|
||
return
|
||
try:
|
||
if 'Content-length' in self.headers:
|
||
size = int(self.headers['Content-length'])
|
||
elif 'Transfer-Encoding' in self.headers:
|
||
if self.headers['Transfer-Encoding'].lower()=='chunked':
|
||
size = -2
|
||
else:
|
||
size = -1
|
||
path, elem = self.path_elem_prev()
|
||
ename = path[-1]
|
||
if elem is None and len(path) > 1:
|
||
# Parent directory doesn't exist — create it and re-resolve
|
||
parent_parts = [p.rstrip('/') for p in path[:-1]]
|
||
parent_fs = os.path.join(self.server.root.fsname, *parent_parts)
|
||
os.makedirs(parent_fs, exist_ok=True)
|
||
path, elem = self.path_elem_prev()
|
||
except:
|
||
self.send_response(400, 'Cannot parse request')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
return
|
||
# for OSX finder, it's first send a 0 byte file,and you need response a 201 code,and then osx send real file.
|
||
# OSX finder don't using content-length.
|
||
if ename == '.DS_Store':
|
||
self.send_response(403, 'Forbidden')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
else:
|
||
try:
|
||
elem.recvMember(self.rfile, ename, size, self)
|
||
except:
|
||
self.send_response(500, 'Cannot save file')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
return
|
||
# sucess return
|
||
self.send_response(201, 'Created')
|
||
self.send_header('Content-length', '0')
|
||
self.end_headers()
|
||
|
||
# ── Proxy download ─────────────────────────────────────────────────────────
|
||
# The browser cannot fetch third-party binary URLs that return duplicate
|
||
# Access-Control-Allow-Origin headers (e.g. Assembly64 /search/bin/).
|
||
# POST /proxy-download { "url": "...", "dest": "/sd/downloads/file.d64",
|
||
# "headers": { "Client-Id": "Ultimate", ... } }
|
||
# The server fetches the URL with Python (no CORS restrictions) and saves
|
||
# the binary to the filesystem under the WebDAV root.
|
||
|
||
def _json_response(self, status: int, data: dict):
|
||
body = json.dumps(data).encode('utf-8')
|
||
self.send_response(status)
|
||
self.send_header('Content-Type', 'application/json')
|
||
self.send_header('Content-Length', str(len(body)))
|
||
self.send_header('Access-Control-Allow-Origin', '*')
|
||
self.end_headers()
|
||
self.wfile.write(body)
|
||
|
||
def do_POST(self):
|
||
if self.path != '/proxy-download':
|
||
self.send_error(404, 'Not found')
|
||
return
|
||
try:
|
||
size = int(self.headers.get('Content-Length', 0))
|
||
body = self.rfile.read(size)
|
||
req_data = json.loads(body)
|
||
url = req_data['url']
|
||
dest = req_data['dest']
|
||
extra_headers = req_data.get('headers', {})
|
||
except Exception as e:
|
||
self._json_response(400, {'error': f'Bad request: {e}'})
|
||
return
|
||
# Map virtual dest path (/sd/downloads/foo.d64) to filesystem path
|
||
vpath = urllib.parse.unquote(dest).lstrip('/')
|
||
if '..' in vpath.split('/'):
|
||
self._json_response(400, {'error': 'Invalid path'})
|
||
return
|
||
fspath = os.path.join(self.server.root.fsname, *vpath.split('/'))
|
||
# Fetch the URL server-side (no browser CORS restrictions apply here)
|
||
try:
|
||
req = urllib.request.Request(url, headers=extra_headers)
|
||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||
data = resp.read()
|
||
except Exception as e:
|
||
_log('PROXY', f'Fetch failed: {url} → {e}')
|
||
self._json_response(502, {'error': f'Fetch failed: {e}'})
|
||
return
|
||
# Save to filesystem, creating directories as needed
|
||
try:
|
||
os.makedirs(os.path.dirname(fspath), exist_ok=True)
|
||
with open(fspath, 'wb') as f:
|
||
f.write(data)
|
||
except Exception as e:
|
||
_log('PROXY', f'Save failed: {fspath} → {e}')
|
||
self._json_response(500, {'error': f'Save failed: {e}'})
|
||
return
|
||
_log('PROXY', f'{url} → {fspath} ({len(data)} bytes)')
|
||
self._json_response(200, {'ok': True, 'bytes': len(data)})
|
||
|
||
def split_path(self, path):
|
||
"""Splits path string in form '/dir1/dir2/file' into parts"""
|
||
p = path.split('/')[1:]
|
||
while p and p[-1] in ('','/'):
|
||
p = p[:-1]
|
||
if len(p) > 0:
|
||
p[-1] += '/'
|
||
return p
|
||
|
||
def path_elem(self):
|
||
"""Returns split path (see split_path()) and Member object of the last element"""
|
||
path = self.split_path(urllib.parse.unquote(self.path))
|
||
elem = self.server.root
|
||
for e in path:
|
||
elem = elem.findMember(e)
|
||
if elem == None:
|
||
break
|
||
return (path, elem)
|
||
|
||
def path_elem_prev(self):
|
||
"""Returns split path (see split_path()) and Member object of the next-to-last element"""
|
||
path = self.split_path(urllib.parse.unquote(self.path))
|
||
elem = self.server.root
|
||
for e in path[:-1]:
|
||
elem = elem.findMember(e)
|
||
if elem == None:
|
||
break
|
||
return (path, elem)
|
||
|
||
def log_message(self, format, *args):
|
||
# args: requestline, status_code, content_length (from log_request)
|
||
try:
|
||
parts = args[0].split() # e.g. ['GET', '/path', 'HTTP/1.1']
|
||
method = parts[0]
|
||
path = urllib.parse.unquote(parts[1]) if len(parts) > 1 else '?'
|
||
status = args[1]
|
||
_log('DAV', f'{method} {path} → {status}')
|
||
except Exception:
|
||
_log('DAV', format % args) # fallback for unexpected call shapes
|
||
|
||
class BufWriter:
|
||
def __init__(self, w, debug=False):
|
||
self.w = w
|
||
self.parts = []
|
||
self._size = 0
|
||
self.debug = debug
|
||
|
||
def write(self, s):
|
||
b = s.encode('utf-8')
|
||
self.parts.append(b)
|
||
self._size += len(b)
|
||
|
||
def flush(self):
|
||
data = b''.join(self.parts)
|
||
if self.debug: print('<- XML:', data.decode('utf-8', errors='replace'))
|
||
self.w.write(data)
|
||
self.w.flush()
|
||
|
||
def getSize(self):
|
||
return self._size
|
||
|
||
class DAVServer(ThreadingMixIn, HTTPServer):
|
||
def __init__(self, addr, handler, root, userpwd):
|
||
HTTPServer.__init__(self, addr, handler)
|
||
self.root = root
|
||
self.userpwd = userpwd # WebDav Auth user:passwd
|
||
if len(userpwd)>0:
|
||
self.auth_enable = True
|
||
else:
|
||
self.auth_enable = False
|
||
|
||
# disable the broken pipe error message
|
||
def finish_request(self,request,client_address):
|
||
try:
|
||
HTTPServer.finish_request(self, request, client_address)
|
||
except socket.error as e:
|
||
pass
|
||
|
||
if __name__ == '__main__':
|
||
# WebDav TCP Port
|
||
srvport = 80
|
||
# Get local IP address
|
||
myaddr = get_localip()
|
||
print(f'# WebDav Server run at http://{myaddr}:{srvport}')
|
||
server_address = ('', srvport)
|
||
# WebDav Auth User/Password file
|
||
# if not this file ,the auth function disable.
|
||
# file format: user:passwd\n user:passwd\n
|
||
# or you can change your auth mode and file save format
|
||
userpwd = []
|
||
try:
|
||
f = open('wdusers.conf', 'r')
|
||
for uinfo in f.readlines():
|
||
uinfo = uinfo.strip()
|
||
if len(uinfo) > 4:
|
||
userpwd.append(base64.b64encode(uinfo.encode('utf-8')))
|
||
except:
|
||
pass
|
||
# first is Server root dir, Second is virtual dir
|
||
# **** Change First ./ to your dir , etc :/mnt/flash/public, d:/share_file/
|
||
root = DirCollection('files/', '/')
|
||
httpd = DAVServer(server_address, DAVRequestHandler, root, userpwd)
|
||
repl = threading.Thread(target=_repl_thread, daemon=True, name='ws-repl')
|
||
repl.start()
|
||
try:
|
||
httpd.serve_forever()
|
||
except:
|
||
_log('SRV', 'stopped') |