From ba017f7b532b67fe9d3cd35076e0aba2bb8f3c4a Mon Sep 17 00:00:00 2001 From: ed Date: Sat, 13 Dec 2025 19:44:56 +0000 Subject: [PATCH] only use fs-legal chars in names (closes #1010); uploading a folder named COMPLE:X into exfat on linux would fail because exfat behaves like windows, rejecting <>:|?*"\/ this would also fail on windows, but then due to sanitize_fn being overly aggressive fix this by detecting filesystem traits on startup and also translating vpath early on windows --- copyparty/__main__.py | 1 + copyparty/cfg.py | 4 +++- copyparty/fsutil.py | 32 ++++++++++++++++++++++++---- copyparty/httpcli.py | 22 +++++++++++++------ copyparty/util.py | 49 ++++++++++++++++++++++++------------------- tests/util.py | 1 + 6 files changed, 77 insertions(+), 32 deletions(-) diff --git a/copyparty/__main__.py b/copyparty/__main__.py index 137bfd07..f1881cc1 100644 --- a/copyparty/__main__.py +++ b/copyparty/__main__.py @@ -1204,6 +1204,7 @@ def add_fs(ap): ap2 = ap.add_argument_group("filesystem options") rm_re_def = "15/0.1" if ANYWIN else "0/0" ap2.add_argument("--casechk", metavar="N", type=u, default="auto", help="detect and prevent CI (case-insensitive) behavior if the underlying filesystem is CI? [\033[32my\033[0m] = detect and prevent, [\033[32mn\033[0m] = ignore and allow, [\033[32mauto\033[0m] = \033[32my\033[0m if CI fs detected. 
NOTE: \033[32my\033[0m is very slow but necessary for correct WebDAV behavior on Windows/Macos (volflag=casechk)") + ap2.add_argument("--fsnt", metavar="OS", type=u, default="auto", help="which characters to allow in file/folder names; [\033[32mwin\033[0m] = windows (not <>:|?*\"\\/), [\033[32mmac\033[0m] = macos (not :), [\033[32mlin\033[0m] = linux (anything goes) (volflag=fsnt)") ap2.add_argument("--rm-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be deleted because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=rm_retry)") ap2.add_argument("--mv-retry", metavar="T/R", type=u, default=rm_re_def, help="if a file cannot be renamed because it is busy, continue trying for \033[33mT\033[0m seconds, retry every \033[33mR\033[0m seconds; disable with 0/0 (volflag=mv_retry)") ap2.add_argument("--iobuf", metavar="BYTES", type=int, default=256*1024, help="file I/O buffer-size; if your volumes are on a network drive, try increasing to \033[32m524288\033[0m or even \033[32m4194304\033[0m (and let me know if that improves your performance)") diff --git a/copyparty/cfg.py b/copyparty/cfg.py index 799ab9df..69903225 100644 --- a/copyparty/cfg.py +++ b/copyparty/cfg.py @@ -102,6 +102,7 @@ def vf_vmap() -> dict[str, str]: "du_who", "ufavico", "forget_ip", + "fsnt", "hsortn", "html_head", "html_head_s", @@ -202,10 +203,12 @@ flagcats = { "noclone": "take dupe data from clients, even if available on HDD", "nodupe": "rejects existing files (instead of linking/cloning them)", "nodupem": "rejects existing files during moves as well", + "casechk=auto": "actively prevent case-insensitive filesystem? 
y/n", "chmod_d=755": "unix-permission for new dirs/folders", "chmod_f=644": "unix-permission for new files", "uid=573": "change owner of new files/folders to unix-user 573", "gid=999": "change owner of new files/folders to unix-group 999", + "fsnt=auto": "filesystem filename traits (lin/win/mac/auto)", "wram": "allow uploading into ramdisks", "sparse": "force use of sparse files, mainly for s3-backed storage", "nosparse": "deny use of sparse files, mainly for slow storage", @@ -267,7 +270,6 @@ flagcats = { "no_db_ip": "never store uploader-IP in the db; disables unpost", "fat32": "avoid excessive reindexing on android sdcardfs", "dbd=[acid|swal|wal|yolo]": "database speed-durability tradeoff", - "casechk=auto": "actively prevent case-insensitive filesystem? y/n", "xlink": "cross-volume dupe detection / linking (dangerous)", "xdev": "do not descend into other filesystems", "xvol": "do not follow symlinks leaving the volume root", diff --git a/copyparty/fsutil.py b/copyparty/fsutil.py index 2151c8c8..e5ee78e3 100644 --- a/copyparty/fsutil.py +++ b/copyparty/fsutil.py @@ -2,6 +2,7 @@ from __future__ import print_function, unicode_literals import argparse +import json import os import re import time @@ -9,7 +10,7 @@ import time from .__init__ import ANYWIN, MACOS from .authsrv import AXS, VFS, AuthSrv from .bos import bos -from .util import chkcmd, min_ex, undot +from .util import chkcmd, json_hesc, min_ex, undot if True: # pylint: disable=using-constant-test from typing import Optional, Union @@ -212,19 +213,26 @@ class Fstab(object): return ret.realpath, "" +_fstab: Optional[Fstab] = None +winfs = set(("msdos", "vfat", "ntfs", "exfat")) +# "msdos" = vfat on macos + + def ramdisk_chk(asrv: AuthSrv) -> None: # should have been in authsrv but that's a circular import + global _fstab mods = [] ramfs = ("tmpfs", "overlay") log = asrv.log_func or print - fstab = Fstab(log, asrv.args, False) + if not _fstab: + _fstab = Fstab(log, asrv.args, False) for vn in 
asrv.vfs.all_nodes.values(): if not vn.axs.uwrite or "wram" in vn.flags: continue ap = vn.realpath if not ap or os.path.isfile(ap): continue - fs, mp = fstab.get(ap) + fs, mp = _fstab.get(ap) mp = "/" + mp.strip("/") if fs == "tmpfs" or (mp == "/" and fs in ramfs): mods.append((vn.vpath, ap, fs, mp)) @@ -234,8 +242,24 @@ def ramdisk_chk(asrv: AuthSrv) -> None: zsl = list(ztsp) zsl[1] = False zsl[2] = False - vn.uaxs[un] = zsl + vn.uaxs[un] = tuple(zsl) if mods: t = "WARNING: write-access was removed from the following volumes because they are not mapped to an actual HDD for storage! All uploaded data would live in RAM only, and all uploaded files would be LOST on next reboot. To allow uploading and ignore this hazard, enable the 'wram' option (global/volflag). List of affected volumes:" t2 = ["\n volume=[/%s], abspath=%r, type=%s, root=%r" % x for x in mods] log("vfs", t + "".join(t2) + "\n", 1) + + assume = "mac" if MACOS else "lin" + for vol in asrv.vfs.all_nodes.values(): + if not vol.realpath or vol.flags.get("is_file"): + continue + zs = vol.flags["fsnt"].strip()[:3].lower() + if ANYWIN and not zs: + zs = "win" + if zs in ("lin", "win", "mac"): + vol.flags["fsnt"] = zs + continue + fs = _fstab.get(vol.realpath)[0] + fs = "win" if fs in winfs else assume + htm = json.loads(vol.js_htm) + vol.flags["fsnt"] = vol.js_ls["fsnt"] = htm["fsnt"] = fs + vol.js_htm = json_hesc(json.dumps(htm)) diff --git a/copyparty/httpcli.py b/copyparty/httpcli.py index 73ceaa82..ff3f4b9a 100644 --- a/copyparty/httpcli.py +++ b/copyparty/httpcli.py @@ -49,6 +49,9 @@ from .util import ( HAVE_SQLITE3, HTTPCODE, UTC, + VPTL_MAC, + VPTL_OS, + VPTL_WIN, Garda, MultipartParser, ODict, @@ -167,6 +170,7 @@ A_FILE = os.stat_result( ) RE_CC = re.compile(r"[\x00-\x1f]") # search always faster +RE_USAFE = re.compile(r'[\x00-\x1f<>"]') # search always faster RE_HSAFE = re.compile(r"[\x00-\x1f<>\"'&]") # search always much faster RE_HOST = re.compile(r"[^][0-9a-zA-Z.:_-]") # search faster <=17ch 
RE_MHOST = re.compile(r"^[][0-9a-zA-Z.:_-]+$") # match faster >=18ch @@ -515,8 +519,7 @@ class HttpCli(object): self.loud_reply(t, status=400) return False - ptn_cc = RE_CC - m = ptn_cc.search(self.req) + m = RE_USAFE.search(self.req) if m: zs = self.req t = "malicious user; Cc in req0 %r => %r" @@ -538,6 +541,7 @@ class HttpCli(object): vpath = undot(vpath) re_k = RE_K + ptn_cc = RE_CC k_safe = UPARAM_CC_OK for k in arglist.split("&"): if "=" in k: @@ -620,17 +624,18 @@ class HttpCli(object): self.loud_reply("u wot m8", status=400) return False + if VPTL_OS: + vpath = vpath.translate(VPTL_OS) + self.uparam = uparam self.cookies = cookies self.vpath = vpath - self.vpaths = ( - self.vpath + "/" if self.trailing_slash and self.vpath else self.vpath - ) + self.vpaths = vpath + "/" if self.trailing_slash and vpath else vpath if "qr" in uparam: return self.tx_qr() - if relchk(self.vpath) and (self.vpath != "*" or self.mode != "OPTIONS"): + if "\x00" in vpath or (ANYWIN and ("\n" in vpath or "\r" in vpath)): self.log("illegal relpath; req(%r) => %r" % (self.req, "/" + self.vpath)) self.cbonk(self.conn.hsrv.gmal, self.req, "bad_vp", "invalid relpaths") return self.tx_404() and False @@ -2807,6 +2812,11 @@ class HttpCli(object): raise Pebkac(400, "your client is old; press CTRL-SHIFT-R and try again") vfs, rem = self.asrv.vfs.get(self.vpath, self.uname, False, True) + fsnt = vfs.flags["fsnt"] + if fsnt != "lin": + tl = VPTL_WIN if fsnt == "win" else VPTL_MAC + rem = rem.translate(tl) + name = name.translate(tl) dbv, vrem = vfs.get_dbv(rem) name = sanitize_fn(name, "") diff --git a/copyparty/util.py b/copyparty/util.py index b16dd548..2b7ee321 100644 --- a/copyparty/util.py +++ b/copyparty/util.py @@ -294,6 +294,23 @@ RE_MEMTOTAL = re.compile("^MemTotal:.* kB") RE_MEMAVAIL = re.compile("^MemAvailable:.* kB") +if PY2: + + def umktrans(s1, s2): + return {ord(c1): ord(c2) for c1, c2 in zip(s1, s2)} + +else: + umktrans = str.maketrans + +FNTL_WIN = umktrans('<>:|?*"\\/', 
"＜＞：｜？＊＂＼／") +VPTL_WIN = umktrans('<>:|?*"\\', "＜＞：｜？＊＂＼") +APTL_WIN = umktrans('<>:|?*"/', "＜＞：｜？＊＂／") +FNTL_MAC = VPTL_MAC = APTL_MAC = umktrans(":", "：") +FNTL_OS = FNTL_WIN if ANYWIN else FNTL_MAC if MACOS else None +VPTL_OS = VPTL_WIN if ANYWIN else VPTL_MAC if MACOS else None +APTL_OS = APTL_WIN if ANYWIN else APTL_MAC if MACOS else None + + BOS_SEP = ("%s" % (os.sep,)).encode("ascii") @@ -684,7 +701,7 @@ except Exception as ex: ub64dec = base64.urlsafe_b64decode # type: ignore b64enc = base64.b64encode # type: ignore b64dec = base64.b64decode # type: ignore - if not PY36: + if PY36: print("using fallback base64 codec due to %r" % (ex,)) @@ -2232,32 +2249,22 @@ def sanitize_fn(fn: str, ok: str) -> str: if "/" not in ok: fn = fn.replace("\\", "/").split("/")[-1] - if ANYWIN: - remap = [ - ["<", "＜"], - [">", "＞"], - [":", "："], - ['"', "＂"], - ["/", "／"], - ["\\", "＼"], - ["|", "｜"], - ["?", "？"], - ["*", "＊"], - ] - for a, b in [x for x in remap if x[0] not in ok]: - fn = fn.replace(a, b) + if APTL_OS: + fn = fn.translate(APTL_OS) + if ANYWIN: + bad = ["con", "prn", "aux", "nul"] + for n in range(1, 10): + bad += ("com%s lpt%s" % (n, n)).split(" ") - bad = ["con", "prn", "aux", "nul"] - for n in range(1, 10): - bad += ("com%s lpt%s" % (n, n)).split(" ") - - if fn.lower().split(".")[0] in bad: - fn = "_" + fn + if fn.lower().split(".")[0] in bad: + fn = "_" + fn return fn.strip() def sanitize_vpath(vp: str, ok: str) -> str: + if not FNTL_OS: + return vp parts = vp.replace(os.sep, "/").split("/") ret = [sanitize_fn(x, ok) for x in parts] return "/".join(ret) diff --git a/tests/util.py b/tests/util.py index 91be4e0b..d4696893 100644 --- a/tests/util.py +++ b/tests/util.py @@ -193,6 +193,7 @@ class Cfg(Namespace): du_who="all", dk_salt="b" * 16, fk_salt="a" * 16, + fsnt="lin", grp_all="acct", idp_gsep=re.compile("[|:;+,]"), iobuf=256 * 1024,