Compare commits
No commits in common. "79d92dc89df39e3d9b830a53abf4115011c36e4b" and "788ec62bb2b8e678cf18387b075d66e10b3135db" have entirely different histories.
79d92dc89d
...
788ec62bb2
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -1,5 +1,4 @@
|
|||
?archives/*
|
||||
dist/*
|
||||
files/*
|
||||
node_modules/*
|
||||
package-lock.json
|
||||
|
|
|
|||
5580
package-lock.json
generated
Normal file
5580
package-lock.json
generated
Normal file
File diff suppressed because it is too large
Load Diff
|
|
@ -62,8 +62,7 @@
|
|||
"sonner": "2.0.3",
|
||||
"tailwind-merge": "3.2.0",
|
||||
"tw-animate-css": "1.3.8",
|
||||
"vaul": "1.1.2",
|
||||
"webdav": "^5.10.0"
|
||||
"vaul": "1.1.2"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@tailwindcss/vite": "4.1.12",
|
||||
|
|
|
|||
Binary file not shown.
Binary file not shown.
|
|
@ -1,97 +0,0 @@
|
|||
import { useState } from 'react';
|
||||
import { Type } from 'lucide-react';
|
||||
|
||||
export type DirectoryFont = 'C64_Pro_Mono' | 'CbmShift';
|
||||
|
||||
const FONTS: { id: DirectoryFont; label: string }[] = [
|
||||
{ id: 'C64_Pro_Mono', label: 'C64 Pro' },
|
||||
{ id: 'CbmShift', label: 'CBM Shift' },
|
||||
];
|
||||
|
||||
export interface DirectoryEntry {
|
||||
blocks: number;
|
||||
name: string;
|
||||
type: string;
|
||||
}
|
||||
|
||||
interface DirectoryListingProps {
|
||||
entries: DirectoryEntry[];
|
||||
footerNote?: string;
|
||||
}
|
||||
|
||||
const TYPE_BADGE: Record<string, string> = {
|
||||
PRG: 'bg-blue-100 text-blue-700',
|
||||
SEQ: 'bg-green-100 text-green-700',
|
||||
DEL: 'bg-neutral-200 text-neutral-500',
|
||||
REL: 'bg-purple-100 text-purple-700',
|
||||
USR: 'bg-orange-100 text-orange-700',
|
||||
};
|
||||
|
||||
export default function DirectoryListing({ entries, footerNote }: DirectoryListingProps) {
|
||||
const [font, setFont] = useState<DirectoryFont>('C64_Pro_Mono');
|
||||
|
||||
return (
|
||||
<div className="flex flex-col h-full" style={{ fontFamily: `'${font}', monospace` }}>
|
||||
{/* Font toggle bar */}
|
||||
<div className="flex items-center justify-end gap-2 px-4 py-2 border-b border-neutral-200 bg-neutral-50">
|
||||
<Type className="w-4 h-4 text-neutral-500" />
|
||||
<span className="text-xs text-neutral-500">Font:</span>
|
||||
<div className="inline-flex rounded-md border border-neutral-300 bg-white overflow-hidden">
|
||||
{FONTS.map((f, idx) => (
|
||||
<button
|
||||
key={f.id}
|
||||
onClick={() => setFont(f.id)}
|
||||
className={`px-3 py-1 text-xs ${
|
||||
font === f.id
|
||||
? 'bg-blue-600 text-white'
|
||||
: 'bg-white text-neutral-700 hover:bg-neutral-100'
|
||||
} ${idx > 0 ? 'border-l border-neutral-300' : ''}`}
|
||||
aria-pressed={font === f.id}
|
||||
>
|
||||
{f.label}
|
||||
</button>
|
||||
))}
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{/* Listing */}
|
||||
<div className="flex-1 overflow-auto text-sm">
|
||||
{entries.length === 0 ? (
|
||||
<div className="p-8 text-center text-neutral-500">
|
||||
Empty directory
|
||||
</div>
|
||||
) : (
|
||||
entries.map((entry, idx) => (
|
||||
<div
|
||||
key={idx}
|
||||
className="px-4 py-2 border-b border-neutral-100 flex items-center hover:bg-neutral-50"
|
||||
>
|
||||
<span className="inline-block w-16 text-neutral-700 flex-shrink-0">
|
||||
{String(entry.blocks).padStart(3, ' ')}
|
||||
</span>
|
||||
<span className="inline-block w-40 truncate text-neutral-900 flex-shrink-0">
|
||||
{entry.name}
|
||||
</span>
|
||||
<span className="inline-block w-16 flex-shrink-0">
|
||||
<span
|
||||
className={`px-1.5 py-0.5 rounded text-xs font-medium ${
|
||||
TYPE_BADGE[entry.type] || 'bg-neutral-200 text-neutral-700'
|
||||
}`}
|
||||
style={{ fontFamily: 'system-ui, sans-serif' }}
|
||||
>
|
||||
{entry.type}
|
||||
</span>
|
||||
</span>
|
||||
</div>
|
||||
))
|
||||
)}
|
||||
</div>
|
||||
|
||||
{footerNote && (
|
||||
<div className="px-4 py-3 text-xs text-neutral-500 border-t border-neutral-200">
|
||||
{footerNote}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
|
@ -3,7 +3,7 @@ import { HardDrive, Activity, Wifi, Signal, Clock, RefreshCw, FolderOpen, Map }
|
|||
import DeviceDetailOverlay from './DeviceDetailOverlay';
|
||||
import { ImageWithFallback } from './figma/ImageWithFallback';
|
||||
import { Dialog, DialogContent, DialogTitle, DialogDescription } from './ui/dialog';
|
||||
import DirectoryListing from './DirectoryListing';
|
||||
import FileBrowser from './FileBrowser';
|
||||
|
||||
interface StatusPageProps {
|
||||
config: any;
|
||||
|
|
@ -74,12 +74,7 @@ export default function StatusPage({ config, setConfig }: StatusPageProps) {
|
|||
className="bg-white border border-neutral-200 rounded-lg p-4 relative"
|
||||
>
|
||||
<div className="flex items-center justify-between mb-4">
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => setShowDeviceOverlay(true)}
|
||||
className="flex items-center gap-3 text-left rounded-lg p-1 -m-1 hover:bg-neutral-50 transition cursor-pointer"
|
||||
aria-label={`Open details for Device #${activeDevice.number}`}
|
||||
>
|
||||
<div className="flex items-center gap-3">
|
||||
<div className="w-10 h-10 bg-blue-100 rounded-full flex items-center justify-center">
|
||||
<HardDrive className="w-5 h-5 text-blue-600" />
|
||||
</div>
|
||||
|
|
@ -87,7 +82,7 @@ export default function StatusPage({ config, setConfig }: StatusPageProps) {
|
|||
<div className="font-medium">Device #{activeDevice.number}</div>
|
||||
<div className="text-sm text-neutral-500">{activeDevice.url}</div>
|
||||
</div>
|
||||
</button>
|
||||
</div>
|
||||
<div className="w-2 h-2 rounded-full bg-green-500" />
|
||||
</div>
|
||||
|
||||
|
|
@ -209,66 +204,13 @@ export default function StatusPage({ config, setConfig }: StatusPageProps) {
|
|||
{showDirectory && (
|
||||
<div className="fixed inset-0 z-50 flex items-center justify-center">
|
||||
<div className="absolute inset-0 bg-black/40 backdrop-blur-md" onClick={() => setShowDirectory(false)} />
|
||||
<div className="relative w-full h-full bg-white/90 shadow-2xl overflow-auto flex flex-col" onClick={(e) => e.stopPropagation()}>
|
||||
<div className="relative w-full h-full max-w-2xl sm:rounded-xl bg-white/90 shadow-2xl overflow-auto flex flex-col mx-0 sm:mx-auto my-0 sm:my-20 p-0 sm:p-0" style={{ maxHeight: '100dvh' }}>
|
||||
<div className="flex items-center justify-between p-4 border-b">
|
||||
<div className="min-w-0">
|
||||
<h2 className="text-xl font-medium">Directory</h2>
|
||||
<div className="text-xs text-neutral-500 truncate mt-0.5">
|
||||
Device #{activeDevice.number} • {activeDevice.url ? activeDevice.url.split('/').pop() : 'No file mounted'}
|
||||
</div>
|
||||
</div>
|
||||
<h2 className="text-xl font-medium">Directory</h2>
|
||||
<button onClick={() => setShowDirectory(false)} className="p-2 -m-2 hover:bg-neutral-100 rounded-lg"><svg xmlns='http://www.w3.org/2000/svg' className='w-6 h-6' fill='none' viewBox='0 0 24 24' stroke='currentColor'><path strokeLinecap='round' strokeLinejoin='round' strokeWidth={2} d='M6 18L18 6M6 6l12 12' /></svg></button>
|
||||
</div>
|
||||
<div className="flex-1 overflow-auto flex flex-col">
|
||||
{(() => {
|
||||
// Derive a directory listing for the currently mounted file.
|
||||
// In a real device this would come from reading the disk's
|
||||
// BAM/ directory sectors. Here we synthesize a plausible
|
||||
// listing based on the mounted file's name.
|
||||
const fileName = activeDevice.url ? activeDevice.url.split('/').pop() : '';
|
||||
if (!fileName) {
|
||||
return (
|
||||
<div className="p-8 text-center text-neutral-500 text-sm">
|
||||
No file mounted on this device.
|
||||
</div>
|
||||
);
|
||||
}
|
||||
|
||||
// Mock directory entries (C64-style: blocks, name, type)
|
||||
const mockEntries = [
|
||||
{ blocks: 42, name: 'PAC-MAN', type: 'PRG' },
|
||||
{ blocks: 38, name: 'GALAGA', type: 'PRG' },
|
||||
{ blocks: 21, name: 'HISCORE', type: 'SEQ' },
|
||||
{ blocks: 12, name: 'LOADER', type: 'PRG' },
|
||||
{ blocks: 5, name: 'TITLE-SCREEN', type: 'SEQ' },
|
||||
{ blocks: 3, name: 'CONFIG', type: 'SEQ' },
|
||||
{ blocks: 1, name: 'PARTICLES', type: 'PRG' },
|
||||
];
|
||||
|
||||
const totalBlocks = 664; // standard D64 capacity
|
||||
const usedBlocks = mockEntries.reduce((sum, e) => sum + e.blocks, 0);
|
||||
const freeBlocks = totalBlocks - usedBlocks;
|
||||
|
||||
return (
|
||||
<>
|
||||
<div
|
||||
className="px-4 py-2 bg-neutral-100 text-xs text-neutral-600 border-b border-neutral-200 flex items-center"
|
||||
style={{ fontFamily: "'C64_Pro_Mono', monospace" }}
|
||||
>
|
||||
<span className="inline-block w-16">BLOCKS</span>
|
||||
<span className="inline-block w-40">NAME</span>
|
||||
<span className="inline-block w-16">TYPE</span>
|
||||
<span className="ml-auto">{usedBlocks} BLOCKS USED · {freeBlocks} FREE</span>
|
||||
</div>
|
||||
<div className="flex-1 overflow-auto">
|
||||
<DirectoryListing
|
||||
entries={mockEntries}
|
||||
footerNote={`${mockEntries.length} FILES · ${fileName}`}
|
||||
/>
|
||||
</div>
|
||||
</>
|
||||
);
|
||||
})()}
|
||||
<div className="flex-1 overflow-auto p-4">
|
||||
<FileBrowser currentPath={activeDevice.url ? activeDevice.url.replace(/\/[^/]+$/, '') : '/'} onSelect={() => {}} onClose={() => setShowDirectory(false)} />
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
|
|
|||
|
|
@ -1,15 +0,0 @@
|
|||
@font-face {
|
||||
font-family: 'C64_Pro_Mono';
|
||||
src: url('/fonts/C64_Pro_Mono-STYLE.woff2') format('woff2');
|
||||
font-weight: normal;
|
||||
font-style: normal;
|
||||
font-display: swap;
|
||||
}
|
||||
|
||||
@font-face {
|
||||
font-family: 'CbmShift';
|
||||
src: url('/fonts/CbmShift-STYLE.woff2') format('woff2');
|
||||
font-weight: normal;
|
||||
font-style: normal;
|
||||
font-display: swap;
|
||||
}
|
||||
871
webdav3.py
871
webdav3.py
|
|
@ -1,871 +0,0 @@
|
|||
# _*_ coding:utf-8 _*_
|
||||
|
||||
# Tiny WebDav Server for Pythonista - IOS. (Base on pandav WebDav server )
|
||||
#
|
||||
# (C)2013/11 By: Lai ChuJiang
|
||||
#
|
||||
# this code can run not just Pythonista on IOS ,also run on OSX python.
|
||||
# Support Client: Windows / OSX / other Webdav client for IOS,etc : FE File explorer / goodreader / iWorks for ios
|
||||
#
|
||||
# 2024/6/1 Change Log:
|
||||
# Python3 support
|
||||
# add do_PROPPATCH function, now Windows Explorer full support. (!! windows explorer limit filesize < 50MBytes, you can using windows regedit edit it !!)
|
||||
# support show webdav request/response info. ( sys_debug = True )
|
||||
# bug fix
|
||||
#
|
||||
# 20170907 Change Log:
|
||||
# 1. Fix Coda WebDav access problem.(because Coda WebDav all xml item add xmlns="DAV:".)
|
||||
#
|
||||
# 2013/11 Change Log:
|
||||
# 1. Combind all files to one file,so can using for Pythonista easy.
|
||||
# 2. Add MKCOL(Create dir); MOVE(rename file); DELETE(delete file or dir); COPY (Copy file)
|
||||
# 3. Change some decode, Now it's can support Chinese.
|
||||
# 4. Pythonista(For IOS) not dircache module ,so change code, don't using this module.
|
||||
# 5. change DAV version from 1 to 2, so the OSX finder can write.
|
||||
# 6. for DAV version 2 support , add LOCK / UNLOCK fake support. (not real lock file)
|
||||
# *** !!!! So Don't using > 1 client sametime write or delete same file. maybe lost files.
|
||||
# 7. Change the do_PROPFIND module, now it's simply & right for OSX
|
||||
# 8. Change the do_GET module, now support RANGE
|
||||
# 9. Change the do_PUT module, add Content-Length=0 support (create empty file) ,so the OSX Finder support.
|
||||
# * if not add empty file, the Finder copy files and then delete all this.
|
||||
#10. Add WebDav Basic Auth function,now you can set user & passwd
|
||||
# ** using wdusers.conf file (just user:passwd), if not this file ,the Auth disable.
|
||||
#11. Fix the broken pipe error message
|
||||
#
|
||||
# WebDav RFC: http://www.ics.uci.edu/~ejw/authoring/protocol/rfc2518.html
|
||||
# http://restpatterns.org/HTTP_Methods
|
||||
#
|
||||
# Base : pandav v0.2
|
||||
# Copyright (c) 2005.-2006. Ivan Voras <ivoras@gmail.com>
|
||||
# Released under the Artistic License
|
||||
#
|
||||
from http.server import HTTPServer, BaseHTTPRequestHandler
|
||||
from socketserver import ThreadingMixIn
|
||||
import urllib.request, urllib.parse, urllib.error, urllib.parse
|
||||
from time import timezone, strftime, localtime, gmtime
|
||||
import os, sys, re, shutil, uuid, hashlib, mimetypes, base64, socket
|
||||
|
||||
# Debug message ( True / False )
|
||||
sys_debug = False
|
||||
|
||||
# get localhost IP address
|
||||
def get_localip():
|
||||
try:
|
||||
gAdd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
gAdd.connect(('www.bing.com', 80))
|
||||
return gAdd.getsockname()[0]
|
||||
except:
|
||||
return '127.0.0.1'
|
||||
|
||||
class Member:
|
||||
M_MEMBER = 1
|
||||
M_COLLECTION = 2
|
||||
def getProperties(self):
|
||||
return {}
|
||||
|
||||
class Collection(Member):
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def getMembers(self):
|
||||
return []
|
||||
|
||||
class FileMember(Member):
|
||||
def __init__(self, name, parent):
|
||||
self.name = name
|
||||
self.parent = parent
|
||||
self.name = name
|
||||
self.fsname = parent.fsname + name # e.g. '/var/www/mysite/some.txt'
|
||||
self.virname = parent.virname + name # e.g. '/mysite/some.txt'
|
||||
self.type = Member.M_MEMBER
|
||||
|
||||
def __str__(self):
|
||||
return "%s -> %s" % (self.virname, self.fsname)
|
||||
|
||||
def getProperties(self):
|
||||
"""Return dictionary with WebDAV properties. Values shold be
|
||||
formatted according to the WebDAV specs."""
|
||||
st = os.stat(self.fsname)
|
||||
p = {}
|
||||
p['creationdate'] = unixdate2iso8601(st.st_ctime)
|
||||
p['getlastmodified'] = unixdate2httpdate(st.st_mtime)
|
||||
p['displayname'] = self.name
|
||||
# p['getetag'] = md5.new(self.fsname).hexdigest()
|
||||
m = hashlib.md5()
|
||||
m.update(self.fsname.encode('utf-8'))
|
||||
p['getetag'] = m.hexdigest()
|
||||
if self.type == Member.M_MEMBER:
|
||||
p['getcontentlength'] = st.st_size
|
||||
p['getcontenttype'], z = mimetypes.guess_type(self.name)
|
||||
p['getcontentlanguage'] = None
|
||||
else: # Member.M_COLLECTION
|
||||
p['resourcetype'] = '<D:collection/>'
|
||||
if self.name[0] == ".":
|
||||
p['ishidden'] = 1
|
||||
if not os.access(self.fsname, os.W_OK):
|
||||
p['isreadonly'] = 1
|
||||
if self.name == '/':
|
||||
p['isroot'] = 1
|
||||
return p
|
||||
|
||||
def sendData(self, wfile,bpoint=0,epoint=0):
|
||||
"""Send the file to the client. Literally."""
|
||||
st = os.stat(self.fsname)
|
||||
f = open(self.fsname, 'rb')
|
||||
writ = 0
|
||||
# for send Range xxx-xxx
|
||||
if bpoint>0 and bpoint<st.st_size:
|
||||
f.seek(bpoint)
|
||||
if epoint>bpoint:
|
||||
if epoint<=st.st_size:
|
||||
rsize = epoint - bpoint + 1
|
||||
else:
|
||||
rsize = st.st_size - bpoint
|
||||
else:
|
||||
rsize = st.st_size
|
||||
while writ < rsize:
|
||||
if (rsize - writ) < 65536:
|
||||
buf = f.read(rsize)
|
||||
else:
|
||||
buf = f.read(65536)
|
||||
if not buf:
|
||||
break
|
||||
writ += len(buf)
|
||||
wfile.write(buf)
|
||||
f.close()
|
||||
|
||||
def findMember(self, name):
|
||||
''' 2024/6/1 add
|
||||
'''
|
||||
return None
|
||||
|
||||
class DirCollection(FileMember, Collection):
|
||||
COLLECTION_MIME_TYPE = 'httpd/unix-directory' # application/x-collection ?
|
||||
def __init__(self, fsdir, virdir, parent=None):
|
||||
if not os.path.exists(fsdir):
|
||||
raise "Local directory (fsdir) not found: " + fsdir
|
||||
self.fsname = fsdir
|
||||
self.name = virdir
|
||||
if self.fsname[-1] != os.sep:
|
||||
if self.fsname[-1] == '/': # fixup win/dos/mac separators
|
||||
self.fsname = self.fsname[:-1] + os.sep
|
||||
else:
|
||||
self.fsname += os.sep
|
||||
self.virname = virdir
|
||||
if self.virname[-1] != '/':
|
||||
self.virname += '/'
|
||||
self.parent = parent
|
||||
self.type = Member.M_COLLECTION
|
||||
|
||||
def getProperties(self):
|
||||
p = FileMember.getProperties(self) # inherit file properties
|
||||
p['iscollection'] = 1
|
||||
p['getcontenttype'] = DirCollection.COLLECTION_MIME_TYPE
|
||||
return p
|
||||
|
||||
def getMembers(self):
|
||||
"""Get immediate members of this collection."""
|
||||
l = os.listdir(self.fsname) # obtain a copy of dirlist
|
||||
tcount=0
|
||||
for tmpi in l:
|
||||
if os.path.isfile(self.fsname+tmpi) == False:
|
||||
l[tcount]=l[tcount]+'/'
|
||||
tcount += 1
|
||||
r = []
|
||||
for f in l:
|
||||
if f[-1] != '/':
|
||||
m = FileMember(f, self) # Member is a file
|
||||
else:
|
||||
m = DirCollection(self.fsname + f, self.virname + f, self) # Member is a collection
|
||||
r.append(m)
|
||||
return r
|
||||
|
||||
# Return WebDav Root Dir info
|
||||
def rootdir(self):
|
||||
return self.fsname
|
||||
|
||||
def findMember(self, name):
|
||||
"""Search for a particular member."""
|
||||
l = os.listdir(self.fsname) # obtain a copy of dirlist
|
||||
tcount=0
|
||||
for tmpi in l:
|
||||
if os.path.isfile(self.fsname+tmpi) == False:
|
||||
l[tcount]=l[tcount]+'/'
|
||||
tcount += 1
|
||||
if name in l:
|
||||
if name[-1] != '/':
|
||||
return FileMember(name, self)
|
||||
else:
|
||||
return DirCollection(self.fsname + name, self.virname + name, self)
|
||||
elif name[-1] != '/':
|
||||
name += '/'
|
||||
if name in l:
|
||||
return DirCollection(self.fsname + name, self.virname + name, self)
|
||||
|
||||
def sendData(self, wfile):
|
||||
"""Send "file" to the client. Since this is a directory, build some arbitrary HTML."""
|
||||
memb = self.getMembers()
|
||||
def esc(s):
|
||||
return (str(s)
|
||||
.replace('&', '&')
|
||||
.replace('<', '<')
|
||||
.replace('>', '>')
|
||||
.replace('"', '"'))
|
||||
data = '<html><head><title>%s</title></head><body>' % esc(self.virname)
|
||||
data += '<table><tr><th>Name</th><th>Size</th><th>Timestamp</th></tr>'
|
||||
for m in memb:
|
||||
p = m.getProperties()
|
||||
if 'getcontentlength' in p:
|
||||
p['size'] = int(p['getcontentlength'])
|
||||
p['timestamp'] = p['getlastmodified']
|
||||
else:
|
||||
p['size'] = 0
|
||||
p['timestamp'] = '-DIR-'
|
||||
data += '<tr><td>%s</td><td>%d</td><td>%s</td></tr>' % (esc(p['displayname']), p['size'], esc(p['timestamp']))
|
||||
data += '</table></body></html>'
|
||||
wfile.write(data.encode('utf-8'))
|
||||
|
||||
def recvMember(self, rfile, name, size, req):
|
||||
"""Receive (save) a member file"""
|
||||
fname = os.path.join(self.fsname, urllib.parse.unquote(name))
|
||||
f = open(fname, 'wb')
|
||||
# if size=-1 it's Transfer-Encoding: Chunked mode, like OSX finder using this mode put data
|
||||
# so the file size need get here.
|
||||
if size == -2:
|
||||
l = int(rfile.readline(), 16)
|
||||
ltotal = 0
|
||||
while l > 0:
|
||||
buf = rfile.read(l)
|
||||
f.write(buf) #yield buf
|
||||
rfile.readline()
|
||||
ltotal += l
|
||||
l = int(rfile.readline(), 16)
|
||||
elif size > 0: # if size=0 ,just save a empty file.
|
||||
writ = 0
|
||||
bs = 65536
|
||||
while True:
|
||||
if size != -1 and (bs > size-writ):
|
||||
bs = size-writ
|
||||
buf = rfile.read(bs)
|
||||
if len(buf) == 0:
|
||||
break
|
||||
f.write(buf)
|
||||
writ += len(buf)
|
||||
if size != -1 and writ >= size:
|
||||
break
|
||||
f.close()
|
||||
|
||||
def unixdate2iso8601(d):
|
||||
tz = timezone / 3600 # can it be fractional?
|
||||
tz = '%+03d' % tz
|
||||
return strftime('%Y-%m-%dT%H:%M:%S', localtime(d)) + tz + ':00'
|
||||
|
||||
def unixdate2httpdate(d):
|
||||
return strftime('%a, %d %b %Y %H:%M:%S GMT', gmtime(d))
|
||||
|
||||
class Tag:
|
||||
def __init__(self, name, attrs, data='', parser=None):
|
||||
self.d = {}
|
||||
self.name = name
|
||||
self.attrs = attrs
|
||||
if type(self.attrs) == type(''):
|
||||
self.attrs = splitattrs(self.attrs)
|
||||
for a in self.attrs:
|
||||
if a.startswith('xmlns'):
|
||||
nsname = a[6:]
|
||||
parser.namespaces[nsname] = self.attrs[a]
|
||||
self.rawname = self.name
|
||||
|
||||
p = name.find(':')
|
||||
if p > 0:
|
||||
nsname = name[0:p]
|
||||
if nsname in parser.namespaces:
|
||||
self.ns = parser.namespaces[nsname]
|
||||
self.name = self.rawname[p+1:]
|
||||
else:
|
||||
self.ns = ''
|
||||
self.data = data
|
||||
|
||||
# Emulate dictionary d
|
||||
def __len__(self):
|
||||
return len(self.d)
|
||||
|
||||
def __getitem__(self, key):
|
||||
return self.d[key]
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
self.d[key] = value
|
||||
|
||||
def __delitem__(self, key):
|
||||
del self.d[key]
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.d.keys())
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.d
|
||||
|
||||
def __str__(self):
|
||||
"""Returns unicode semi human-readable representation of the structure"""
|
||||
if self.attrs:
|
||||
s = '<%s %s> %s ' % (self.name, self.attrs, self.data)
|
||||
else:
|
||||
s = '<%s> %s ' % (self.name, self.data)
|
||||
|
||||
for k in self.d:
|
||||
if type(self.d[k]) == type(self):
|
||||
s += '|%s: %s|' % (k, str(self.d[k]))
|
||||
else:
|
||||
s += '|' + ','.join([str(x) for x in self.d[k]]) + '|'
|
||||
return s
|
||||
|
||||
def addChild(self, tag):
|
||||
"""Adds a child to self. tag must be instance of Tag"""
|
||||
if tag.name in self.d:
|
||||
if type(self.d[tag.name]) == type(self): # If there are multiple sibiling tags with same name, form a list :)
|
||||
self.d[tag.name] = [self.d[tag.name]]
|
||||
self.d[tag.name].append(tag)
|
||||
else:
|
||||
self.d[tag.name] = tag
|
||||
return tag
|
||||
|
||||
class XMLDict_Parser:
|
||||
def __init__(self, xml):
|
||||
self.xml = xml
|
||||
self.p = 0
|
||||
self.encoding = sys.getdefaultencoding()
|
||||
self.namespaces = {}
|
||||
|
||||
def getnexttag(self):
|
||||
ptag = self.xml.find('<', self.p)
|
||||
if ptag < 0:
|
||||
return None, None, self.xml[self.p:].strip()
|
||||
data = self.xml[self.p:ptag].strip()
|
||||
self.p = ptag
|
||||
self.tagbegin = ptag
|
||||
p2 = self.xml.find('>', self.p+1)
|
||||
if p2 < 0:
|
||||
raise "Malformed XML - unclosed tag?"
|
||||
tag = self.xml[ptag+1:p2]
|
||||
self.p = p2+1
|
||||
self.tagend = p2+1
|
||||
ps = tag.find(' ')
|
||||
### Change By LCJ @ 2017/9/7 from [ if ps > 0: ] ###
|
||||
### for IOS Coda Webdav support ###
|
||||
if ps > 0 and tag[-1] != '/':
|
||||
tag, attrs = tag.split(' ', 1)
|
||||
else:
|
||||
attrs = ''
|
||||
return tag, attrs, data
|
||||
|
||||
def builddict(self):
|
||||
"""Builds a nested-dictionary-like structure from the xml. This method
|
||||
picks up tags on the main level and calls processTag() for nested tags."""
|
||||
d = Tag('<root>', '')
|
||||
while True:
|
||||
tag, attrs, data = self.getnexttag()
|
||||
if data != '': # data is actually that between the last tag and this one
|
||||
sys.stderr.write("Warning: inline data between tags?!\n")
|
||||
if not tag:
|
||||
break
|
||||
if tag[-1] == '/': # an 'empty' tag (e.g. <empty/>)
|
||||
d.addChild(Tag(tag[:-1], attrs, parser=self))
|
||||
continue
|
||||
elif tag[0] == '?': # special tag
|
||||
t = d.addChild(Tag(tag, attrs, parser=self))
|
||||
if tag == '?xml' and 'encoding' in t.attrs:
|
||||
self.encoding = t.attrs['encoding']
|
||||
else:
|
||||
try:
|
||||
self.processTag(d.addChild(Tag(tag, attrs, parser=self)))
|
||||
except:
|
||||
sys.stderr.write("Error processing tag %s\n" % tag)
|
||||
d.encoding = self.encoding
|
||||
return d
|
||||
|
||||
def processTag(self, dtag):
|
||||
"""Process single tag's data"""
|
||||
until = '/'+dtag.rawname
|
||||
while True:
|
||||
tag, attrs, data = self.getnexttag()
|
||||
if data:
|
||||
dtag.data += data
|
||||
if tag == None:
|
||||
sys.stderr.write("Unterminated tag '"+dtag.rawname+"'?\n")
|
||||
break
|
||||
if tag == until:
|
||||
break
|
||||
if tag[-1] == '/':
|
||||
dtag.addChild(Tag(tag[:-1], attrs, parser=self))
|
||||
continue
|
||||
self.processTag(dtag.addChild(Tag(tag, attrs, parser=self)))
|
||||
|
||||
def splitattrs(att):
|
||||
"""Extracts name="value" pairs from string; returns them as dictionary"""
|
||||
d = {}
|
||||
for m in re.findall('([a-zA-Z_][a-zA-Z_:0-9]*?)="(.+?)"', att):
|
||||
d[m[0]] = m[1]
|
||||
return d
|
||||
|
||||
def builddict(xml):
|
||||
"""Wrapper function for straightforward parsing"""
|
||||
if sys_debug: print('-> XML:',xml)
|
||||
p = XMLDict_Parser(xml.decode('utf-8'))
|
||||
return p.builddict()
|
||||
|
||||
class DAVRequestHandler(BaseHTTPRequestHandler):
|
||||
server_version = "tiny_WebDAV"
|
||||
all_props = ['name', 'parentname', 'href', 'ishidden', 'isreadonly', 'getcontenttype',
|
||||
'contentclass', 'getcontentlanguage', 'creationdate', 'lastaccessed', 'getlastmodified',
|
||||
'getcontentlength', 'iscollection', 'isstructureddocument', 'defaultdocument',
|
||||
'displayname', 'isroot', 'resourcetype']
|
||||
basic_props = ['name', 'getcontenttype', 'getcontentlength', 'creationdate', 'getlastmodified', 'iscollection']
|
||||
auth_file = False
|
||||
auth_enable = False
|
||||
Auserlist = []
|
||||
|
||||
# for debug out
|
||||
def send_response(self, code, message=None):
|
||||
if sys_debug: print(f"<- status code: {code}")
|
||||
super().send_response(code, message)
|
||||
# Emit CORS headers on every response so the browser doesn't block
|
||||
# cross-origin requests (e.g. http://127.0.0.1:5500 -> http://localhost/).
|
||||
# Echo the request Origin if present, otherwise allow any origin.
|
||||
origin = self.headers.get('Origin') if hasattr(self, 'headers') else None
|
||||
self.send_header('Access-Control-Allow-Origin', origin or '*')
|
||||
self.send_header('Vary', 'Origin')
|
||||
self.send_header('Access-Control-Allow-Methods', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
|
||||
self.send_header('Access-Control-Allow-Headers', 'Authorization, Content-Type, Depth, Destination, If, If-Match, If-Modified-Since, If-None-Match, If-Range, Lock-Token, Overwrite, Timeout, X-Requested-With')
|
||||
self.send_header('Access-Control-Max-Age', '86400')
|
||||
|
||||
def send_header(self, keyword, value):
|
||||
if sys_debug: print(f"<- header: {keyword}: {value}")
|
||||
super().send_header(keyword, value)
|
||||
|
||||
def end_headers(self):
|
||||
if sys_debug: print("<- End of headers")
|
||||
super().end_headers()
|
||||
|
||||
def handle_one_request(self):
|
||||
super().handle_one_request()
|
||||
if sys_debug: print(f'>>>>>> {self.command} <<<<<<')
|
||||
|
||||
# User Auth
|
||||
# if success ,return False;
|
||||
# Get WebDav User/Pass file : wdusers.conf
|
||||
# file formate: user:pass\n user:pass\n
|
||||
def WebAuth(self):
|
||||
if sys_debug:
|
||||
import inspect
|
||||
print(f'## {inspect.stack()[1].function}, PATH: {urllib.parse.unquote(self.path)}')
|
||||
print(self.headers)
|
||||
|
||||
if self.server.auth_enable:
|
||||
if 'Authorization' in self.headers:
|
||||
try:
|
||||
AuthInfo = self.headers['Authorization'][6:].encode('utf-8')
|
||||
except:
|
||||
AuthInfo = ''
|
||||
if AuthInfo in self.server.userpwd:
|
||||
return False # Auth success
|
||||
self.send_response(401,'Authorization Required')
|
||||
self.send_header('WWW-Authenticate', 'Basic realm="WebDav Auth"')
|
||||
self.send_header('Content-type', 'text/html')
|
||||
self.end_headers()
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def do_OPTIONS(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
self.send_response(200, DAVRequestHandler.server_version)
|
||||
self.send_header('Allow', 'GET, HEAD, POST, PUT, DELETE, OPTIONS, PROPFIND, PROPPATCH, MKCOL, LOCK, UNLOCK, MOVE, COPY')
|
||||
self.send_header('Content-length', '0')
|
||||
self.send_header('X-Server-Copyright', DAVRequestHandler.server_version)
|
||||
self.send_header('DAV', '1, 2') #OSX Finder need Ver 2, if Ver 1 -- read only
|
||||
self.send_header('MS-Author-Via', 'DAV')
|
||||
self.end_headers()
|
||||
|
||||
def do_DELETE(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
path = urllib.parse.unquote(self.path)
|
||||
if path == '':
|
||||
self.send_error(404, 'Object not found')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
return
|
||||
path = self.server.root.rootdir() + path
|
||||
if os.path.isfile(path):
|
||||
os.remove(path) #delete file
|
||||
elif os.path.isdir(path):
|
||||
shutil.rmtree(path) #delete dir
|
||||
else:
|
||||
self.send_response(404,'Not Found')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
return
|
||||
self.send_response(204, 'No Content')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def do_MKCOL(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
path = urllib.parse.unquote(self.path)
|
||||
if path != '':
|
||||
path = self.server.root.rootdir() + path
|
||||
if os.path.isdir(path) == False:
|
||||
os.mkdir(path)
|
||||
self.send_response(201, "Created")
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
return
|
||||
self.send_response(403, "OK")
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def do_MOVE(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path)
|
||||
newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path
|
||||
if (os.path.isfile(oldfile)==True and os.path.isfile(newfile)==False):
|
||||
shutil.move(oldfile,newfile)
|
||||
if (os.path.isdir(oldfile)==True and os.path.isdir(newfile)==False):
|
||||
os.rename(oldfile,newfile)
|
||||
self.send_response(201, "Created")
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def do_COPY(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
oldfile = self.server.root.rootdir() + urllib.parse.unquote(self.path)
|
||||
newfile = self.server.root.rootdir() + urllib.parse.urlparse(urllib.parse.unquote(self.headers['Destination'])).path
|
||||
if (os.path.isfile(oldfile)==True): # and os.path.isfile(newfile)==False): copy can rewrite.
|
||||
shutil.copyfile(oldfile,newfile)
|
||||
self.send_response(201, "Created")
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def do_LOCK(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
if 'Content-length' in self.headers:
|
||||
req = self.rfile.read(int(self.headers['Content-length']))
|
||||
else:
|
||||
req = self.rfile.read()
|
||||
d = builddict(req)
|
||||
try:
|
||||
clientid = str(d['lockinfo']['owner']['href'])[7:] # temp: need Change other method!!!
|
||||
except:
|
||||
clientid = ''
|
||||
lockid = str(uuid.uuid1())
|
||||
retstr = '<?xml version="1.0" encoding="utf-8" ?>\n<D:prop xmlns:D="DAV:">\n<D:lockdiscovery>\n<D:activelock>\n<D:locktype><D:write/></D:locktype>\n<D:lockscope><D:exclusive/></D:lockscope>\n<D:depth>Infinity</D:depth>\n<D:owner>\n<D:href>'+clientid+'</D:href>\n</D:owner>\n<D:timeout>Infinite</D:timeout>\n<D:locktoken><D:href>opaquelocktoken:'+lockid+'</D:href></D:locktoken>\n</D:activelock>\n</D:lockdiscovery>\n</D:prop>\n'
|
||||
self.send_response(201,'Created')
|
||||
self.send_header("Content-type",'text/xml')
|
||||
self.send_header("charset",'"utf-8"')
|
||||
self.send_header("Lock-Token",'<opaquelocktoken:'+lockid+'>')
|
||||
self.send_header('Content-Length',len(retstr))
|
||||
w = BufWriter(self.wfile, sys_debug)
|
||||
w.write(retstr)
|
||||
self.send_header('Content-Length', str(w.getSize()))
|
||||
self.end_headers()
|
||||
w.flush()
|
||||
|
||||
def do_UNLOCK(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
# frome self.headers get Lock-Token:
|
||||
self.send_response(204, 'No Content') # unlock using 204 for sucess.
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def do_PROPFIND(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
depth = 'infinity'
|
||||
if 'Depth' in self.headers:
|
||||
depth = self.headers['Depth'].lower()
|
||||
if 'Content-length' in self.headers:
|
||||
req = self.rfile.read(int(self.headers['Content-length']))
|
||||
else:
|
||||
req = self.rfile.read()
|
||||
d = builddict(req) # change all http.request to dict stru
|
||||
wished_all = False
|
||||
if len(d) == 0:
|
||||
wished_props = DAVRequestHandler.basic_props
|
||||
else:
|
||||
if 'allprop' in d['propfind']:
|
||||
wished_props = DAVRequestHandler.all_props
|
||||
wished_all = True
|
||||
else:
|
||||
wished_props = []
|
||||
for prop in d['propfind']['prop']:
|
||||
### 2017/9/7 Edit By LCJ , Old is [ wished_props.append(prop) ]
|
||||
### for IOS Coda Webdav support ###
|
||||
wished_props.append(prop.split(' ')[0])
|
||||
path, elem = self.path_elem()
|
||||
if not elem:
|
||||
if len(path) >= 1: # it's a non-existing file
|
||||
self.send_response(404, 'Not Found')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
return
|
||||
else:
|
||||
elem = self.server.root # fixup root lookups?
|
||||
if depth != '0' and not elem: #or elem.type != Member.M_COLLECTION:
|
||||
self.send_response(406, 'This is not allowed')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
return
|
||||
self.send_response(207, 'Multi-Status') #Multi-Status
|
||||
self.send_header('Content-Type', 'text/xml')
|
||||
self.send_header("charset",'"utf-8"')
|
||||
w = BufWriter(self.wfile, sys_debug)
|
||||
w.write('<?xml version="1.0" encoding="utf-8" ?>\n')
|
||||
w.write('<D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:">\n')
|
||||
|
||||
def write_props_member(w, m):
|
||||
w.write('<D:response>\n<D:href>%s</D:href>\n<D:propstat>\n<D:prop>\n' % urllib.parse.quote(m.virname)) #add urllib.quote for chinese
|
||||
props = m.getProperties() # get the file or dir props
|
||||
# For OSX Finder : getlastmodified, getcontentlength, resourceType
|
||||
if ('quota-available-bytes' in wished_props) or ('quota-used-bytes'in wished_props) or ('quota' in wished_props) or ('quotaused'in wished_props):
|
||||
# windows don't support os.statvfs
|
||||
try:
|
||||
sDisk = os.statvfs('/')
|
||||
props['quota-used-bytes'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
|
||||
props['quotaused'] = (sDisk.f_blocks - sDisk.f_bavail) * sDisk.f_frsize
|
||||
props['quota-available-bytes'] = sDisk.f_bavail * sDisk.f_frsize
|
||||
props['quota'] = sDisk.f_bavail * sDisk.f_frsize
|
||||
except:
|
||||
pass
|
||||
for wp in wished_props:
|
||||
if (wp in props) == False:
|
||||
w.write(' <D:%s/>\n' % wp)
|
||||
else:
|
||||
w.write(' <D:%s>%s</D:%s>\n' % (wp, str(props[wp]), wp))
|
||||
w.write('</D:prop>\n<D:status>HTTP/1.1 200 OK</D:status>\n</D:propstat>\n</D:response>\n')
|
||||
|
||||
write_props_member(w, elem)
|
||||
if depth == '1':
|
||||
for m in elem.getMembers():
|
||||
write_props_member(w,m)
|
||||
w.write('</D:multistatus>')
|
||||
self.send_header('Content-Length', str(w.getSize()))
|
||||
self.end_headers()
|
||||
w.flush()
|
||||
|
||||
def do_PROPPATCH(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
if 'Content-length' in self.headers:
|
||||
req = self.rfile.read(int(self.headers['Content-length']))
|
||||
else:
|
||||
req = self.rfile.read()
|
||||
d = builddict(req) # change all http.request to dict stru
|
||||
self.send_response(207, 'Multi-Status') #Multi-Status
|
||||
self.send_header('Content-Type', 'text/xml')
|
||||
self.send_header("charset",'"utf-8"')
|
||||
retstr = f'<?xml version="1.0" encoding="utf-8" ?><D:multistatus xmlns:D="DAV:" xmlns:Z="urn:schemas-microsoft-com:"><D:response><D:href>{urllib.parse.unquote(self.path)}</D:href><D:propstat><D:status>HTTP/1.1 200 OK</D:status></D:propstat></D:response></D:multistatus>'
|
||||
w = BufWriter(self.wfile, sys_debug)
|
||||
w.write(retstr)
|
||||
self.send_header('Content-Length', str(w.getSize()))
|
||||
self.end_headers()
|
||||
w.flush()
|
||||
|
||||
def do_GET(self, onlyhead=False):
|
||||
if self.WebAuth():
|
||||
return
|
||||
path, elem = self.path_elem()
|
||||
if not elem:
|
||||
self.send_error(404, 'Object not found')
|
||||
return
|
||||
try:
|
||||
props = elem.getProperties()
|
||||
except:
|
||||
self.send_response(500, "Error retrieving properties")
|
||||
self.end_headers()
|
||||
return
|
||||
# Collections don't carry a content length — render a directory
|
||||
# listing instead of treating them like a file.
|
||||
if elem.type == Member.M_COLLECTION:
|
||||
ctype = props.get('getcontenttype') or DirCollection.COLLECTION_MIME_TYPE
|
||||
self.send_response(200, 'OK')
|
||||
self.send_header("Content-type", ctype)
|
||||
if 'getlastmodified' in props:
|
||||
self.send_header("Last-modified", props['getlastmodified'])
|
||||
self.end_headers()
|
||||
if not onlyhead:
|
||||
elem.sendData(self.wfile)
|
||||
return
|
||||
# when the client had Range: bytes=3156-3681
|
||||
bpoint = 0
|
||||
epoint = 0
|
||||
fullen = props.get('getcontentlength', 0)
|
||||
if 'Range' in self.headers:
|
||||
stmp = self.headers['Range'][6:]
|
||||
stmp = stmp.split('-')
|
||||
try:
|
||||
bpoint = int(stmp[0])
|
||||
except:
|
||||
bpoint = 0
|
||||
try:
|
||||
epoint = int(stmp[1])
|
||||
except:
|
||||
epoint = fullen - 1
|
||||
if (epoint<=bpoint):
|
||||
bpoint = 0
|
||||
epoint = fullen - 1
|
||||
fullen = epoint - bpoint + 1
|
||||
if epoint>0:
|
||||
self.send_response(206, 'Partial Content')
|
||||
self.send_header("Content-Range", " Bytes %s-%s/%s" % (bpoint, epoint, fullen))
|
||||
else:
|
||||
self.send_response(200, 'OK')
|
||||
self.send_header("Content-type", props.get('getcontenttype', 'application/octet-stream'))
|
||||
if 'getlastmodified' in props:
|
||||
self.send_header("Last-modified", props['getlastmodified'])
|
||||
self.send_header("Content-length", fullen)
|
||||
self.end_headers()
|
||||
if not onlyhead:
|
||||
if fullen >0 : # all 0 size file don't need this
|
||||
elem.sendData(self.wfile,bpoint,epoint)
|
||||
|
||||
def do_HEAD(self):
|
||||
self.do_GET(True) # HEAD should behave like GET, only without contents
|
||||
|
||||
def do_PUT(self):
|
||||
if self.WebAuth():
|
||||
return
|
||||
try:
|
||||
if 'Content-length' in self.headers:
|
||||
size = int(self.headers['Content-length'])
|
||||
elif 'Transfer-Encoding' in self.headers:
|
||||
if self.headers['Transfer-Encoding'].lower()=='chunked':
|
||||
size = -2
|
||||
else:
|
||||
size = -1
|
||||
path, elem = self.path_elem_prev()
|
||||
ename = path[-1]
|
||||
except:
|
||||
self.send_response(400, 'Cannot parse request')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
return
|
||||
# for OSX finder, it's first send a 0 byte file,and you need response a 201 code,and then osx send real file.
|
||||
# OSX finder don't using content-length.
|
||||
if ename == '.DS_Store':
|
||||
self.send_response(403, 'Forbidden')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
else:
|
||||
try:
|
||||
elem.recvMember(self.rfile, ename, size, self)
|
||||
except:
|
||||
self.send_response(500, 'Cannot save file')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
return
|
||||
# sucess return
|
||||
self.send_response(201, 'Created')
|
||||
self.send_header('Content-length', '0')
|
||||
self.end_headers()
|
||||
|
||||
def split_path(self, path):
|
||||
"""Splits path string in form '/dir1/dir2/file' into parts"""
|
||||
p = path.split('/')[1:]
|
||||
while p and p[-1] in ('','/'):
|
||||
p = p[:-1]
|
||||
if len(p) > 0:
|
||||
p[-1] += '/'
|
||||
return p
|
||||
|
||||
def path_elem(self):
|
||||
"""Returns split path (see split_path()) and Member object of the last element"""
|
||||
path = self.split_path(urllib.parse.unquote(self.path))
|
||||
elem = self.server.root
|
||||
for e in path:
|
||||
elem = elem.findMember(e)
|
||||
if elem == None:
|
||||
break
|
||||
return (path, elem)
|
||||
|
||||
def path_elem_prev(self):
|
||||
"""Returns split path (see split_path()) and Member object of the next-to-last element"""
|
||||
path = self.split_path(urllib.parse.unquote(self.path))
|
||||
elem = self.server.root
|
||||
for e in path[:-1]:
|
||||
elem = elem.findMember(e)
|
||||
if elem == None:
|
||||
break
|
||||
return (path, elem)
|
||||
|
||||
# disable log info output to screen
|
||||
def log_message(self,format,*args):
|
||||
pass
|
||||
|
||||
class BufWriter:
|
||||
def __init__(self, w, debug=False):
|
||||
self.w = w
|
||||
self.buf = ''
|
||||
self.debug = debug
|
||||
|
||||
def write(self, s):
|
||||
self.buf += s
|
||||
|
||||
def flush(self):
|
||||
if self.debug: print('<- XML:', self.buf) # sys.stderr.write(self.buf)
|
||||
self.w.write(self.buf.encode('utf-8'))
|
||||
self.w.flush()
|
||||
|
||||
def getSize(self):
|
||||
return len(self.buf.encode('utf-8'))
|
||||
|
||||
class DAVServer(ThreadingMixIn, HTTPServer):
|
||||
def __init__(self, addr, handler, root, userpwd):
|
||||
HTTPServer.__init__(self, addr, handler)
|
||||
self.root = root
|
||||
self.userpwd = userpwd # WebDav Auth user:passwd
|
||||
if len(userpwd)>0:
|
||||
self.auth_enable = True
|
||||
else:
|
||||
self.auth_enable = False
|
||||
|
||||
# disable the broken pipe error message
|
||||
def finish_request(self,request,client_address):
|
||||
try:
|
||||
HTTPServer.finish_request(self, request, client_address)
|
||||
except socket.error as e:
|
||||
pass
|
||||
|
||||
if __name__ == '__main__':
|
||||
# WebDav TCP Port
|
||||
srvport = 80
|
||||
# Get local IP address
|
||||
myaddr = get_localip()
|
||||
print(f'# WebDav Server run at http://{myaddr}:{srvport}')
|
||||
server_address = ('', srvport)
|
||||
# WebDav Auth User/Password file
|
||||
# if not this file ,the auth function disable.
|
||||
# file format: user:passwd\n user:passwd\n
|
||||
# or you can change your auth mode and file save format
|
||||
userpwd = []
|
||||
try:
|
||||
f = open('wdusers.conf', 'r')
|
||||
for uinfo in f.readlines():
|
||||
uinfo = uinfo.strip()
|
||||
if len(uinfo) > 4:
|
||||
userpwd.append(base64.b64encode(uinfo.encode('utf-8')))
|
||||
except:
|
||||
pass
|
||||
# first is Server root dir, Second is virtual dir
|
||||
# **** Change First ./ to your dir , etc :/mnt/flash/public, d:/share_file/
|
||||
root = DirCollection('files/', '/')
|
||||
httpd = DAVServer(server_address, DAVRequestHandler, root, userpwd)
|
||||
try:
|
||||
httpd.serve_forever() # todo: add some control over starting and stopping the server
|
||||
except:
|
||||
print('# WebDav Server Stop.')
|
||||
Loading…
Reference in New Issue
Block a user