only use fs-legal chars in names (closes #1010);

uploading a folder named COMPLE:X into exfat on linux would fail
because exfat behaves like windows, rejecting <>:|?*"\/

this would also fail on windows, but then due to
sanitize_fn being overly aggressive

fix this by detecting filesystem traits on startup and
also translating vpath early on windows
This commit is contained in:
ed
2025-12-13 19:44:56 +00:00
parent 3bbed1bc46
commit ba017f7b53
6 changed files with 77 additions and 32 deletions

View File

@@ -1204,6 +1204,7 @@ def add_fs(ap):
ap2 = ap.add_argument_group("filesystem options")
rm_re_def = "15/0.1" if ANYWIN else "0/0"
ap2.add_argument("--casechk", metavar="N", type=u, default="auto", help="detect and prevent CI (case-insensitive) behavior if the underlying filesystem is CI? [\033[32my\033[0m] = detect and prevent, [\033[32mn\033[0m] = ignore and allow, [\033[32mauto\033[0m] = \033[32my\033[0m if CI fs detected. NOTE: \033[32my\033[0m is very slow but necessary for correct WebDAV behavior on Windows/Macos (volflag=casechk)")
ap2.add_argument("--fsnt", metavar="OS", type=u, default="auto", help="which characters to allow in file/folder names; [\033[32mwin\033[0m] = windows (not <>:|?*\"\\/), [\033[32mmac\033[0m] = macos (not :), [\033[32mlin\033[0m] = linux (anything goes) (volflag=fsnt)")
ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)")
ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)")
ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)")

View File

@@ -102,6 +102,7 @@ def vf_vmap() -> dict[str, str]:
"du_who",
"ufavico",
"forget_ip",
"fsnt",
"hsortn",
"html_head",
"html_head_s",
@@ -202,10 +203,12 @@ flagcats = {
"noclone": "take dupe data from clients, even if available on HDD",
"nodupe": "rejects existing files (instead of linking/cloning them)",
"nodupem": "rejects existing files during moves as well",
"casechk=auto": "actively prevent case-insensitive filesystem? y/n",
"chmod_d=755": "unix-permission for new dirs/folders",
"chmod_f=644": "unix-permission for new files",
"uid=573": "change owner of new files/folders to unix-user 573",
"gid=999": "change owner of new files/folders to unix-group 999",
"fsnt=auto": "filesystem filename traits (lin/win/mac/auto)",
"wram": "allow uploading into ramdisks",
"sparse": "force use of sparse files, mainly for s3-backed storage",
"nosparse": "deny use of sparse files, mainly for slow storage",
@@ -267,7 +270,6 @@ flagcats = {
"no_db_ip": "never store uploader-IP in the db; disables unpost",
"fat32": "avoid excessive reindexing on android sdcardfs",
"dbd=[acid|swal|wal|yolo]": "database speed-durability tradeoff",
"casechk=auto": "actively prevent case-insensitive filesystem? y/n",
"xlink": "cross-volume dupe detection / linking (dangerous)",
"xdev": "do not descend into other filesystems",
"xvol": "do not follow symlinks leaving the volume root",

View File

@@ -2,6 +2,7 @@
from __future__ import print_function, unicode_literals
import argparse
import json
import os
import re
import time
@@ -9,7 +10,7 @@ import time
from .__init__ import ANYWIN, MACOS
from .authsrv import AXS, VFS, AuthSrv
from .bos import bos
from .util import chkcmd, min_ex, undot
from .util import chkcmd, json_hesc, min_ex, undot
if True: # pylint: disable=using-constant-test
from typing import Optional, Union
@@ -212,19 +213,26 @@ class Fstab(object):
return ret.realpath, ""
_fstab: Optional[Fstab] = None
winfs = set(("msdos", "vfat", "ntfs", "exfat"))
# "msdos" = vfat on macos
def ramdisk_chk(asrv: AuthSrv) -> None:
# should have been in authsrv but that's a circular import
global _fstab
mods = []
ramfs = ("tmpfs", "overlay")
log = asrv.log_func or print
fstab = Fstab(log, asrv.args, False)
if not _fstab:
_fstab = Fstab(log, asrv.args, False)
for vn in asrv.vfs.all_nodes.values():
if not vn.axs.uwrite or "wram" in vn.flags:
continue
ap = vn.realpath
if not ap or os.path.isfile(ap):
continue
fs, mp = fstab.get(ap)
fs, mp = _fstab.get(ap)
mp = "/" + mp.strip("/")
if fs == "tmpfs" or (mp == "/" and fs in ramfs):
mods.append((vn.vpath, ap, fs, mp))
@@ -234,8 +242,24 @@ def ramdisk_chk(asrv: AuthSrv) -> None:
zsl = list(ztsp)
zsl[1] = False
zsl[2] = False
vn.uaxs[un] = zsl
vn.uaxs[un] = tuple(zsl)
if mods:
t = "WARNING: write-access was removed from the following volumes because they are not mapped to an actual HDD for storage! All uploaded data would live in RAM only, and all uploaded files would be LOST on next reboot. To allow uploading and ignore this hazard, enable the 'wram' option (global/volflag). List of affected volumes:"
t2 = ["\n volume=[/%s], abspath=%r, type=%s, root=%r" % x for x in mods]
log("vfs", t + "".join(t2) + "\n", 1)
assume = "mac" if MACOS else "lin"
for vol in asrv.vfs.all_nodes.values():
if not vol.realpath or vol.flags.get("is_file"):
continue
zs = vol.flags["fsnt"].strip()[:3].lower()
if ANYWIN and not zs:
zs = "win"
if zs in ("lin", "win", "mac"):
vol.flags["fsnt"] = zs
continue
fs = _fstab.get(vol.realpath)[0]
fs = "win" if fs in winfs else assume
htm = json.loads(vol.js_htm)
vol.flags["fsnt"] = vol.js_ls["fsnt"] = htm["fsnt"] = fs
vol.js_htm = json_hesc(json.dumps(htm))

View File

@@ -49,6 +49,9 @@ from .util import (
HAVE_SQLITE3,
HTTPCODE,
UTC,
VPTL_MAC,
VPTL_OS,
VPTL_WIN,
Garda,
MultipartParser,
ODict,
@@ -167,6 +170,7 @@ A_FILE = os.stat_result(
)
RE_CC = re.compile(r"[\x00-\x1f]") # search always faster
RE_USAFE = re.compile(r'[\x00-\x1f<>"]') # search always faster
RE_HSAFE = re.compile(r"[\x00-\x1f<>\"'&]") # search always much faster
RE_HOST = re.compile(r"[^][0-9a-zA-Z.:_-]") # search faster <=17ch
RE_MHOST = re.compile(r"^[][0-9a-zA-Z.:_-]+$") # match faster >=18ch
@@ -515,8 +519,7 @@ class HttpCli(object):
self.loud_reply(t, status=400)
return False
ptn_cc = RE_CC
m = ptn_cc.search(self.req)
m = RE_USAFE.search(self.req)
if m:
zs = self.req
t = "malicious user; Cc in req0 %r => %r"
@@ -538,6 +541,7 @@ class HttpCli(object):
vpath = undot(vpath)
re_k = RE_K
ptn_cc = RE_CC
k_safe = UPARAM_CC_OK
for k in arglist.split("&"):
if "=" in k:
@@ -620,17 +624,18 @@ class HttpCli(object):
self.loud_reply("u wot m8", status=400)
return False
if VPTL_OS:
vpath = vpath.translate(VPTL_OS)
self.uparam = uparam
self.cookies = cookies
self.vpath = vpath
self.vpaths = (
self.vpath + "/" if self.trailing_slash and self.vpath else self.vpath
)
self.vpaths = vpath + "/" if self.trailing_slash and vpath else vpath
if "qr" in uparam:
return self.tx_qr()
if relchk(self.vpath) and (self.vpath != "*" or self.mode != "OPTIONS"):
if "\x00" in vpath or (ANYWIN and ("\n" in vpath or "\r" in vpath)):
self.log("illegal relpath; req(%r) => %r" % (self.req, "/" + self.vpath))
self.cbonk(self.conn.hsrv.gmal, self.req, "bad_vp", "invalid relpaths")
return self.tx_404() and False
@@ -2807,6 +2812,11 @@ class HttpCli(object):
raise Pebkac(400, "your client is old; press CTRL-SHIFT-R and try again")
vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True)
fsnt = vfs.flags["fsnt"]
if fsnt != "lin":
tl = VPTL_WIN if fsnt == "win" else VPTL_MAC
rem = rem.translate(tl)
name = name.translate(tl)
dbv, vrem = vfs.get_dbv(rem)
name = sanitize_fn(name, "")

View File

@@ -294,6 +294,23 @@ RE_MEMTOTAL = re.compile("^MemTotal:.* kB")
RE_MEMAVAIL = re.compile("^MemAvailable:.* kB")
if PY2:
def umktrans(s1, s2):
return {ord(c1): ord(c2) for c1, c2 in zip(s1, s2)}
else:
umktrans = str.maketrans
FNTL_WIN = umktrans('<>:|?*"\\/', "")
VPTL_WIN = umktrans('<>:|?*"\\', "")
APTL_WIN = umktrans('<>:|?*"/', "")
FNTL_MAC = VPTL_MAC = APTL_MAC = umktrans(":", "")
FNTL_OS = FNTL_WIN if ANYWIN else FNTL_MAC if MACOS else None
VPTL_OS = VPTL_WIN if ANYWIN else VPTL_MAC if MACOS else None
APTL_OS = APTL_WIN if ANYWIN else APTL_MAC if MACOS else None
BOS_SEP = ("%s" % (os.sep,)).encode("ascii")
@@ -684,7 +701,7 @@ except Exception as ex:
ub64dec = base64.urlsafe_b64decode # type: ignore
b64enc = base64.b64encode # type: ignore
b64dec = base64.b64decode # type: ignore
if not PY36:
if PY36:
print("using fallback base64 codec due to %r" % (ex,))
@@ -2232,32 +2249,22 @@ def sanitize_fn(fn: str, ok: str) -> str:
if "/" not in ok:
fn = fn.replace("\\", "/").split("/")[-1]
if ANYWIN:
remap = [
["<", ""],
[">", ""],
[":", ""],
['"', ""],
["/", ""],
["\\", ""],
["|", ""],
["?", ""],
["*", ""],
]
for a, b in [x for x in remap if x[0] not in ok]:
fn = fn.replace(a, b)
if APTL_OS:
fn = fn.translate(APTL_OS)
if ANYWIN:
bad = ["con", "prn", "aux", "nul"]
for n in range(1, 10):
bad += ("com%s lpt%s" % (n, n)).split(" ")
bad = ["con", "prn", "aux", "nul"]
for n in range(1, 10):
bad += ("com%s lpt%s" % (n, n)).split(" ")
if fn.lower().split(".")[0] in bad:
fn = "_" + fn
if fn.lower().split(".")[0] in bad:
fn = "_" + fn
return fn.strip()
def sanitize_vpath(vp: str, ok: str) -> str:
if not FNTL_OS:
return vp
parts = vp.replace(os.sep, "/").split("/")
ret = [sanitize_fn(x, ok) for x in parts]
return "/".join(ret)

View File

@@ -193,6 +193,7 @@ class Cfg(Namespace):
du_who="all",
dk_salt="b" * 16,
fk_salt="a" * 16,
fsnt="lin",
grp_all="acct",
idp_gsep=re.compile("[|:;+,]"),
iobuf=256 * 1024,