From 969ea28d064688c97c1784596f330ac69bc8b76d Mon Sep 17 00:00:00 2001 From: fufesou <13586388+fufesou@users.noreply.github.com> Date: Sun, 28 Dec 2025 15:39:35 +0800 Subject: [PATCH] feat(fs): delegate win --server file reading to CM (#13736) - Route Windows server-to-client file reads through CM instead of the connection layer - Add FS IPC commands (ReadFile, CancelRead, SendConfirmForRead, ReadAllFiles) and CM data messages (ReadJobInitResult, FileBlockFromCM, FileReadDone, FileReadError, FileDigestFromCM, AllFilesResult) - Track pending read validations and read jobs to coordinate CM-driven file transfers and clean them up on completion, cancellation, and errors - Enforce a configurable file-transfer-max-files limit for ReadAllFiles and add stronger file name/path validation on the CM side - Improve Flutter file transfer UX and robustness: - Use explicit percent/percentText progress fields - Derive speed and cancel actions from the active job - Handle job errors via FileModel.handleJobError and complete pending recursive tasks on failure - Wrap recursive directory operations in try/catch and await sendRemoveEmptyDir when removing empty directories Signed-off-by: fufesou --- .../lib/desktop/pages/file_manager_page.dart | 6 +- .../lib/mobile/pages/file_manager_page.dart | 14 +- flutter/lib/models/cm_file_model.dart | 2 +- flutter/lib/models/file_model.dart | 68 +- flutter/lib/models/model.dart | 2 +- src/client/io_loop.rs | 1 + src/common.rs | 12 + src/ipc.rs | 93 +++ src/server/connection.rs | 457 ++++++++-- src/server/input_service.rs | 7 +- src/ui_cm_interface.rs | 784 +++++++++++++++++- 11 files changed, 1349 insertions(+), 97 deletions(-) diff --git a/flutter/lib/desktop/pages/file_manager_page.dart b/flutter/lib/desktop/pages/file_manager_page.dart index 6dc89d09f..9e554cbe8 100644 --- a/flutter/lib/desktop/pages/file_manager_page.dart +++ b/flutter/lib/desktop/pages/file_manager_page.dart @@ -282,11 +282,9 @@ class _FileManagerPageState extends State item.state != JobState.inProgress, child: LinearPercentIndicator( animateFromLastPercent: true, - center: Text( - '${(item.finishedSize / item.totalSize * 100).toStringAsFixed(0)}%', - ), + center: Text(item.percentText), barRadius: Radius.circular(15), - percent: item.finishedSize / item.totalSize, + percent: item.percent, progressColor: MyTheme.accent, backgroundColor: Theme.of(context).hoverColor, lineHeight: kDesktopFileTransferRowHeight, diff --git a/flutter/lib/mobile/pages/file_manager_page.dart b/flutter/lib/mobile/pages/file_manager_page.dart index c7b183d35..745df67b5 100644 --- a/flutter/lib/mobile/pages/file_manager_page.dart +++ b/flutter/lib/mobile/pages/file_manager_page.dart @@ -355,15 +355,21 @@ class _FileManagerPageState extends State { return Offstage(); } - switch (jobTable.last.state) { + // Find the first job that is in progress (the one actually transferring data) + // Rust backend processes jobs sequentially, so the first inProgress job is the active one + final activeJob = jobTable + .firstWhereOrNull((job) => job.state == JobState.inProgress) ?? + jobTable.last; + + switch (activeJob.state) { case JobState.inProgress: return BottomSheetBody( leading: CircularProgressIndicator(), title: translate("Waiting"), text: - "${translate("Speed")}: ${readableFileSize(jobTable.last.speed)}/s", + "${translate("Speed")}: ${readableFileSize(activeJob.speed)}/s", onCanceled: () { - model.jobController.cancelJob(jobTable.last.id); + model.jobController.cancelJob(activeJob.id); jobTable.clear(); }, ); @@ -371,7 +377,7 @@ class _FileManagerPageState extends State { return BottomSheetBody( leading: Icon(Icons.check), title: "${translate("Successful")}!", - text: jobTable.last.display(), + text: activeJob.display(), onCanceled: () => jobTable.clear(), ); case JobState.error: diff --git a/flutter/lib/models/cm_file_model.dart b/flutter/lib/models/cm_file_model.dart index 6609f1191..46935c188 100644 --- a/flutter/lib/models/cm_file_model.dart +++ b/flutter/lib/models/cm_file_model.dart @@ -275,7 +275,7 @@ class TransferJobSerdeData { : this( connId: d['connId'] ?? 0, id: int.tryParse(d['id'].toString()) ?? 0, - path: d['path'] ?? '', + path: d['dataSource'] ?? '', isRemote: d['isRemote'] ?? false, totalSize: d['totalSize'] ?? 0, finishedSize: d['finishedSize'] ?? 0, diff --git a/flutter/lib/models/file_model.dart b/flutter/lib/models/file_model.dart index d2ae7cff2..35001cbf2 100644 --- a/flutter/lib/models/file_model.dart +++ b/flutter/lib/models/file_model.dart @@ -113,6 +113,34 @@ class FileModel { fileFetcher.tryCompleteEmptyDirsTask(evt['value'], evt['is_local']); } + // This method fixes a deadlock that occurred when the previous code directly + // called jobController.jobError(evt) in the job_error event handler. + // + // The problem with directly calling jobController.jobError(): + // 1. fetchDirectoryRecursiveToRemove(jobID) registers readRecursiveTasks[jobID] + // and waits for completion + // 2. If the remote has no permission (or some other errors), it returns a FileTransferError + // 3. The error triggers job_error event, which called jobController.jobError() + // 4. jobController.jobError() calls getJob(jobID) to find the job in jobTable + // 5. But addDeleteDirJob() is called AFTER fetchDirectoryRecursiveToRemove(), + // so the job doesn't exist yet in jobTable + // 6. Result: jobController.jobError() does nothing useful, and + // readRecursiveTasks[jobID] never completes, causing a 2s timeout + // + // Solution: Before calling jobController.jobError(), we first check if there's + // a pending readRecursiveTasks with this ID and complete it with the error. + void handleJobError(Map evt) { + final id = int.tryParse(evt['id']?.toString() ?? ''); + if (id != null) { + final err = evt['err']?.toString() ?? 'Unknown error'; + fileFetcher.tryCompleteRecursiveTaskWithError(id, err); + } + // Always call jobController.jobError(evt) to ensure all error events are processed, + // even if the event does not have a valid job ID. This allows for generic error handling + // or logging of unexpected errors. + jobController.jobError(evt); + } + Future postOverrideFileConfirm(Map evt) async { evtLoop.pushEvent( _FileDialogEvent(WeakReference(this), FileDialogType.overwrite, evt)); @@ -591,8 +619,21 @@ class FileController { } else if (item.isDirectory) { title = translate("Not an empty directory"); dialogManager?.showLoading(translate("Waiting")); - final fd = await fileFetcher.fetchDirectoryRecursiveToRemove( - jobID, item.path, items.isLocal, true); + final FileDirectory fd; + try { + fd = await fileFetcher.fetchDirectoryRecursiveToRemove( + jobID, item.path, items.isLocal, true); + } catch (e) { + dialogManager?.dismissAll(); + final dm = dialogManager; + if (dm != null) { + msgBox(sessionId, 'custom-error-nook-nocancel-hasclose', + translate("Error"), e.toString(), '', dm); + } else { + debugPrint("removeAction error msgbox failed: $e"); + } + return; + } if (fd.path.isEmpty) { fd.path = item.path; } @@ -606,7 +647,7 @@ class FileController { item.name, false); if (confirm == true) { - sendRemoveEmptyDir( + await sendRemoveEmptyDir( item.path, 0, deleteJobId, @@ -647,7 +688,7 @@ class FileController { // handle remove res; if (item.isDirectory && res['file_num'] == (entries.length - 1).toString()) { - sendRemoveEmptyDir(item.path, i, deleteJobId); + await sendRemoveEmptyDir(item.path, i, deleteJobId); } } else { jobController.updateJobStatus(deleteJobId, @@ -660,7 +701,7 @@ class FileController { final res = await jobController.jobResultListener.start(); if (item.isDirectory && res['file_num'] == (entries.length - 1).toString()) { - sendRemoveEmptyDir(item.path, i, deleteJobId); + await sendRemoveEmptyDir(item.path, i, deleteJobId); } } } else { @@ -755,9 +796,9 @@ class FileController { fileNum: fileNum); } - void sendRemoveEmptyDir(String path, int fileNum, int actId) { + Future sendRemoveEmptyDir(String path, int fileNum, int actId) async { history.removeWhere((element) => element.contains(path)); - bind.sessionRemoveAllEmptyDirs( + await bind.sessionRemoveAllEmptyDirs( sessionId: sessionId, actId: actId, path: path, isRemote: !isLocal); } @@ -1275,6 +1316,15 @@ class FileFetcher { } } + // Complete a pending recursive read task with an error. + // See FileModel.handleJobError() for why this is necessary. + void tryCompleteRecursiveTaskWithError(int id, String error) { + final completer = readRecursiveTasks.remove(id); + if (completer != null && !completer.isCompleted) { + completer.completeError(error); + } + } + Future> readEmptyDirs( String path, bool isLocal, bool showHidden) async { try { @@ -1438,6 +1488,10 @@ class JobProgress { var err = ""; int lastTransferredSize = 0; + double get percent => + totalSize > 0 ? (finishedSize.toDouble() / totalSize) : 0.0; + String get percentText => '${(percent * 100).toStringAsFixed(0)}%'; + clear() { type = JobType.none; state = JobState.none; diff --git a/flutter/lib/models/model.dart b/flutter/lib/models/model.dart index 6e3d77c54..e2f509c13 100644 --- a/flutter/lib/models/model.dart +++ b/flutter/lib/models/model.dart @@ -363,7 +363,7 @@ class FfiModel with ChangeNotifier { parent.target?.fileModel.refreshAll(); } } else if (name == 'job_error') { - parent.target?.fileModel.jobController.jobError(evt); + parent.target?.fileModel.handleJobError(evt); } else if (name == 'override_file_confirm') { parent.target?.fileModel.postOverrideFileConfirm(evt); } else if (name == 'load_last_job') { diff --git a/src/client/io_loop.rs b/src/client/io_loop.rs index 2b52c7233..e0b3fcd6d 100644 --- a/src/client/io_loop.rs +++ b/src/client/io_loop.rs @@ -1676,6 +1676,7 @@ impl Remote { } Some(file_response::Union::Error(e)) => { let job_type = fs::remove_job(e.id, &mut self.write_jobs) + .or_else(|| fs::remove_job(e.id, &mut self.read_jobs)) .map(|j| j.r#type) .unwrap_or(fs::JobType::Generic); match job_type { diff --git a/src/common.rs b/src/common.rs index 6decd2d04..0dc944d83 100644 --- a/src/common.rs +++ b/src/common.rs @@ -181,6 +181,18 @@ pub fn is_server() -> bool { *IS_SERVER } +#[inline] +pub fn need_fs_cm_send_files() -> bool { + #[cfg(windows)] + { + is_server() + } + #[cfg(not(windows))] + { + false + } +} + #[inline] pub fn is_main() -> bool { *IS_MAIN diff --git a/src/ipc.rs b/src/ipc.rs index 2281686ac..e5f163c2e 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -112,6 +112,33 @@ pub enum FS { path: String, new_name: String, }, + // CM-side file reading operations (Windows only) + // These enable Connection Manager to read files and stream them back to Connection + ReadFile { + path: String, + id: i32, + file_num: i32, + include_hidden: bool, + conn_id: i32, + overwrite_detection: bool, + }, + CancelRead { + id: i32, + conn_id: i32, + }, + SendConfirmForRead { + id: i32, + file_num: i32, + skip: bool, + offset_blk: u32, + conn_id: i32, + }, + ReadAllFiles { + path: String, + id: i32, + include_hidden: bool, + conn_id: i32, + }, } #[cfg(target_os = "windows")] @@ -268,6 +295,72 @@ pub enum Data { #[cfg(windows)] ControlledSessionCount(usize), CmErr(String), + // CM-side file reading responses (Windows only) + // These are sent from CM back to Connection when CM handles file reading + /// Response to ReadFile: contains initial file list or error + ReadJobInitResult { + id: i32, + file_num: i32, + include_hidden: bool, + conn_id: i32, + /// Serialized protobuf bytes of FileDirectory, or error string + result: Result, String>, + }, + /// File data block read by CM. + /// + /// The actual data is sent separately via `send_raw()` after this message to avoid + /// JSON encoding overhead for large binary data. This mirrors the `WriteBlock` pattern. + /// + /// **Protocol:** + /// - Sender: `send(FileBlockFromCM{...})` then `send_raw(data)` + /// - Receiver: `next()` returns `FileBlockFromCM`, then `next_raw()` returns data bytes + /// + /// **Note on empty data (e.g., empty files):** + /// Empty data is supported. The IPC connection uses `BytesCodec` with `raw=false` (default), + /// which prefixes each frame with a length header. So `send_raw(Bytes::new())` sends a + /// 1-byte frame (length=0), and `next_raw()` correctly returns an empty `BytesMut`. + /// See `libs/hbb_common/src/bytes_codec.rs` test `test_codec2` for verification. + FileBlockFromCM { + id: i32, + file_num: i32, + /// Data is sent separately via `send_raw()` to avoid JSON encoding overhead. + /// This field is skipped during serialization; sender must call `send_raw()` after sending. + /// Receiver must call `next_raw()` and populate this field manually. + #[serde(skip)] + data: bytes::Bytes, + compressed: bool, + conn_id: i32, + }, + /// File read completed successfully + FileReadDone { + id: i32, + file_num: i32, + conn_id: i32, + }, + /// File read failed with error + FileReadError { + id: i32, + file_num: i32, + err: String, + conn_id: i32, + }, + /// Digest info from CM for overwrite detection + FileDigestFromCM { + id: i32, + file_num: i32, + last_modified: u64, + file_size: u64, + is_resume: bool, + conn_id: i32, + }, + /// Response to ReadAllFiles: recursive directory listing + AllFilesResult { + id: i32, + conn_id: i32, + path: String, + /// Serialized protobuf bytes of FileDirectory, or error string + result: Result, String>, + }, CheckHwcodec, #[cfg(feature = "flutter")] VideoConnCount(Option), diff --git a/src/server/connection.rs b/src/server/connection.rs index af4892eb0..3670fb7cf 100644 --- a/src/server/connection.rs +++ b/src/server/connection.rs @@ -50,6 +50,7 @@ use serde_json::{json, value::Value}; #[cfg(not(any(target_os = "android", target_os = "ios")))] use std::sync::atomic::Ordering; use std::{ + collections::HashSet, net::Ipv6Addr, num::NonZeroI64, path::PathBuf, @@ -63,8 +64,6 @@ use windows::Win32::Foundation::{CloseHandle, HANDLE}; #[cfg(windows)] use crate::virtual_display_manager; -#[cfg(not(any(target_os = "ios")))] -use std::collections::HashSet; pub type Sender = mpsc::UnboundedSender<(Instant, Arc)>; lazy_static::lazy_static! { @@ -287,6 +286,11 @@ pub struct Connection { // For post requests that need to be sent sequentially. // eg. post_conn_audit tx_post_seq: mpsc::UnboundedSender<(String, Value)>, + // Tracks read job IDs delegated to CM process. + // When a read job is delegated to CM (via FS::ReadFile), the job id is added here. + // Used to filter stale responses (FileBlockFromCM, FileReadDone, etc.) for + // cancelled or unknown jobs. + cm_read_job_ids: HashSet, terminal_service_id: String, terminal_persistent: bool, // The user token must be set when terminal is enabled. @@ -459,6 +463,7 @@ impl Connection { tx_from_authed, printer_data: Vec::new(), tx_post_seq, + cm_read_job_ids: HashSet::new(), terminal_service_id: "".to_owned(), terminal_persistent: false, #[cfg(not(any(target_os = "android", target_os = "ios")))] @@ -717,6 +722,36 @@ impl Connection { let msg = new_voice_call_request(false); conn.send(msg).await; } + ipc::Data::ReadJobInitResult { id, file_num, include_hidden, conn_id, result } => { + if conn_id == conn.inner.id() { + conn.handle_read_job_init_result(id, file_num, include_hidden, result).await; + } + } + ipc::Data::FileBlockFromCM { id, file_num, data, compressed, conn_id } => { + if conn_id == conn.inner.id() { + conn.handle_file_block_from_cm(id, file_num, data, compressed).await; + } + } + ipc::Data::FileReadDone { id, file_num, conn_id } => { + if conn_id == conn.inner.id() { + conn.handle_file_read_done(id, file_num).await; + } + } + ipc::Data::FileReadError { id, file_num, err, conn_id } => { + if conn_id == conn.inner.id() { + conn.handle_file_read_error(id, file_num, err).await; + } + } + ipc::Data::FileDigestFromCM { id, file_num, last_modified, file_size, is_resume, conn_id } => { + if conn_id == conn.inner.id() { + conn.handle_file_digest_from_cm(id, file_num, last_modified, file_size, is_resume).await; + } + } + ipc::Data::AllFilesResult { id, conn_id, path, result } => { + if conn_id == conn.inner.id() { + conn.handle_all_files_result(id, path, result).await; + } + } _ => {} } }, @@ -2666,28 +2701,74 @@ impl Connection { self.read_dir(&rd.path, rd.include_hidden); } Some(file_action::Union::AllFiles(f)) => { - match fs::get_recursive_files(&f.path, f.include_hidden) { - Err(err) => { - self.send(fs::new_error(f.id, err, -1)).await; - } - Ok(files) => { - self.send(fs::new_dir(f.id, f.path, files)).await; + if crate::common::need_fs_cm_send_files() { + self.send_fs(ipc::FS::ReadAllFiles { + path: f.path, + id: f.id, + include_hidden: f.include_hidden, + conn_id: self.inner.id(), + }); + } else { + match fs::get_recursive_files(&f.path, f.include_hidden) { + Err(err) => { + log::error!( + "Failed to get recursive files for {}: {}", + f.path, + err + ); + self.send(fs::new_error(f.id, err, -1)).await; + } + Ok(files) => { + if let Err(msg) = + crate::ui_cm_interface::check_file_count_limit( + files.len(), + ) + { + self.send(fs::new_error(f.id, msg, -1)).await; + } else { + self.send(fs::new_dir(f.id, f.path, files)).await; + } + } } } } Some(file_action::Union::Send(s)) => { // server to client let id = s.id; - let od = can_enable_overwrite_detection(get_version_number( - &self.lr.version, - )); let path = s.path.clone(); - let r#type = JobType::from_proto(s.file_type); - let data_source; - match r#type { + let job_type = JobType::from_proto(s.file_type); + match job_type { JobType::Generic => { - data_source = - fs::DataSource::FilePath(PathBuf::from(&path)); + let od = can_enable_overwrite_detection( + get_version_number(&self.lr.version), + ); + if crate::common::need_fs_cm_send_files() { + // Delegate file reading to CM on Windows + self.cm_read_job_ids.insert(id); + self.send_fs(ipc::FS::ReadFile { + path, + id, + file_num: s.file_num, + include_hidden: s.include_hidden, + conn_id: self.inner.id(), + overwrite_detection: od, + }); + } else { + // Handle file reading in Connection on non-Windows + let data_source = + fs::DataSource::FilePath(PathBuf::from(&path)); + self.create_and_start_read_job( + id, + job_type, + data_source, + s.file_num, + s.include_hidden, + od, + path, + true, // check file count limit + ) + .await; + } } JobType::Printer => { if let Some((_, _, data)) = self @@ -2696,49 +2777,26 @@ impl Connection { .position(|(_, p, _)| *p == path) .map(|index| self.printer_data.remove(index)) { - data_source = fs::DataSource::MemoryCursor( + let data_source = fs::DataSource::MemoryCursor( std::io::Cursor::new(data), ); + // Printer jobs don't need file count limit check + self.create_and_start_read_job( + id, + job_type, + data_source, + s.file_num, + s.include_hidden, + true, // always enable overwrite detection for printer + path, + false, // no file count limit for printer + ) + .await; } else { // Ignore this message if the printer data is not found return true; } } - }; - match fs::TransferJob::new_read( - id, - r#type, - "".to_string(), - data_source, - s.file_num, - s.include_hidden, - false, - od, - ) { - Err(err) => { - self.send(fs::new_error(id, err, 0)).await; - } - Ok(mut job) => { - self.send(fs::new_dir(id, path, job.files().to_vec())) - .await; - let files = job.files().to_owned(); - job.is_remote = true; - job.conn_id = self.inner.id(); - let job_type = job.r#type; - self.read_jobs.push(job); - self.file_timer = - crate::rustdesk_interval(time::interval(MILLI1)); - self.post_file_audit( - FileAuditType::RemoteSend, - if job_type == fs::JobType::Printer { - "Remote print" - } else { - &s.path - }, - Self::get_files_for_audit(job_type, files), - json!({}), - ); - } } self.file_transferred = true; } @@ -2805,6 +2863,11 @@ impl Connection { } Some(file_action::Union::Cancel(c)) => { self.send_fs(ipc::FS::CancelWrite { id: c.id }); + let _ = self.cm_read_job_ids.remove(&c.id); + self.send_fs(ipc::FS::CancelRead { + id: c.id, + conn_id: self.inner.id(), + }); if let Some(job) = fs::remove_job(c.id, &mut self.read_jobs) { self.send_to_cm(ipc::Data::FileTransferLog(( "transfer".to_string(), @@ -2815,6 +2878,15 @@ impl Connection { Some(file_action::Union::SendConfirm(r)) => { if let Some(job) = fs::get_job(r.id, &mut self.read_jobs) { job.confirm(&r).await; + } else if self.cm_read_job_ids.contains(&r.id) { + // Forward to CM for CM-read jobs + self.send_fs(ipc::FS::SendConfirmForRead { + id: r.id, + file_num: r.file_num, + skip: r.skip(), + offset_blk: r.offset_blk(), + conn_id: self.inner.id(), + }); } else { if let Ok(sc) = r.write_to_bytes() { self.send_fs(ipc::FS::SendConfirm(sc)); @@ -4013,6 +4085,219 @@ impl Connection { raii::AuthedConnID::check_remove_session(self.inner.id(), self.session_key()); } + async fn handle_read_job_init_result( + &mut self, + id: i32, + _file_num: i32, + _include_hidden: bool, + result: Result, String>, + ) { + // Check if this response is still expected (not stale/cancelled) + if !self.cm_read_job_ids.contains(&id) { + log::warn!( + "Received ReadJobInitResult for unknown or stale job id={}, ignoring", + id + ); + return; + } + + match result { + Err(error) => { + self.cm_read_job_ids.remove(&id); + self.send(fs::new_error(id, error, 0)).await; + } + Ok(dir_bytes) => { + // Deserialize FileDirectory from protobuf bytes + let dir = match FileDirectory::parse_from_bytes(&dir_bytes) { + Ok(d) => d, + Err(e) => { + log::error!("Failed to parse FileDirectory: {}", e); + self.cm_read_job_ids.remove(&id); + self.send(fs::new_error(id, "internal error".to_string(), 0)) + .await; + return; + } + }; + + let path_str = dir.path.clone(); + let file_entries: Vec = dir.entries.into(); + + // Send file directory to client + self.send(fs::new_dir(id, path_str.clone(), file_entries.clone())) + .await; + + // Post audit for file transfer + self.post_file_audit( + FileAuditType::RemoteSend, + &path_str, + Self::get_files_for_audit(fs::JobType::Generic, file_entries), + json!({}), + ); + + // CM will handle the actual file reading and send blocks via IPC + self.file_transferred = true; + } + } + } + + async fn handle_file_block_from_cm( + &mut self, + id: i32, + file_num: i32, + data: bytes::Bytes, + compressed: bool, + ) { + // Check if the job is still valid (not cancelled) + if !self.cm_read_job_ids.contains(&id) { + log::debug!( + "Dropping file block for cancelled/unknown job id={}, file_num={}", + id, + file_num + ); + return; + } + + // Forward file block to client + let mut block = FileTransferBlock::new(); + block.id = id; + block.file_num = file_num; + block.data = data.to_vec().into(); + block.compressed = compressed; + + let mut msg = Message::new(); + let mut fr = FileResponse::new(); + fr.set_block(block); + msg.set_file_response(fr); + self.send(msg).await; + } + + async fn handle_file_read_done(&mut self, id: i32, file_num: i32) { + // Drop stale completions for cancelled/unknown jobs + if !self.cm_read_job_ids.remove(&id) { + log::debug!( + "Dropping FileReadDone for cancelled/unknown job id={}, file_num={}", + id, + file_num + ); + return; + } + + // Forward done message to client + let mut done = FileTransferDone::new(); + done.id = id; + done.file_num = file_num; + + let mut msg = Message::new(); + let mut fr = FileResponse::new(); + fr.set_done(done); + msg.set_file_response(fr); + self.send(msg).await; + } + + async fn handle_file_read_error(&mut self, id: i32, file_num: i32, err: String) { + // Drop stale errors for cancelled/unknown jobs + if !self.cm_read_job_ids.remove(&id) { + log::debug!( + "Dropping FileReadError for cancelled/unknown job id={}, file_num={}", + id, + file_num + ); + return; + } + + // Forward error to client + self.send(fs::new_error(id, err, file_num)).await; + } + + async fn handle_file_digest_from_cm( + &mut self, + id: i32, + file_num: i32, + last_modified: u64, + file_size: u64, + is_resume: bool, + ) { + // Check if the job is still valid (not cancelled) + if !self.cm_read_job_ids.contains(&id) { + log::debug!( + "Dropping digest for cancelled/unknown job id={}, file_num={}", + id, + file_num + ); + return; + } + + // Forward digest to client for overwrite detection + let mut digest = FileTransferDigest::new(); + digest.id = id; + digest.file_num = file_num; + digest.last_modified = last_modified; + digest.file_size = file_size; + digest.is_upload = false; // Server sending to client + digest.is_resume = is_resume; + + let mut msg = Message::new(); + let mut fr = FileResponse::new(); + fr.set_digest(digest); + msg.set_file_response(fr); + self.send(msg).await; + } + + async fn process_new_read_job(&mut self, mut job: fs::TransferJob, path: String) { + let files = job.files().to_owned(); + let job_type = job.r#type; + self.send(fs::new_dir(job.id, path.clone(), files.clone())) + .await; + job.is_remote = true; + job.conn_id = self.inner.id(); + self.read_jobs.push(job); + self.file_timer = crate::rustdesk_interval(time::interval(MILLI1)); + let audit_path = if job_type == fs::JobType::Printer { + "Remote print".to_owned() + } else { + path + }; + self.post_file_audit( + FileAuditType::RemoteSend, + &audit_path, + Self::get_files_for_audit(job_type, files), + json!({}), + ); + } + + async fn handle_all_files_result( + &mut self, + id: i32, + path: String, + result: Result, String>, + ) { + match result { + Err(err) => { + self.send(fs::new_error(id, err, -1)).await; + } + Ok(bytes) => { + // Deserialize FileDirectory from protobuf bytes and send as FileResponse + match FileDirectory::parse_from_bytes(&bytes) { + Ok(fd) => { + let mut msg = Message::new(); + let mut fr = FileResponse::new(); + fr.set_dir(fd); + msg.set_file_response(fr); + self.send(msg).await; + } + Err(e) => { + self.send(fs::new_error( + id, + format!("deserialize failed for {}: {}", path, e), + -1, + )) + .await; + } + } + } + } + } + fn read_empty_dirs(&mut self, dir: &str, include_hidden: bool) { let dir = dir.to_string(); self.send_fs(ipc::FS::ReadEmptyDirs { @@ -4029,6 +4314,57 @@ impl Connection { }); } + /// Create a new read job and start processing it (Connection-side). + /// + /// This is a generic Connection-side read job creation helper used for: + /// - Generic file transfers on non-Windows platforms + /// - Printer jobs on all platforms (including Windows) + /// + /// On Windows, generic file reads are delegated to CM via `start_read_job()` in + /// `src/ui_cm_interface.rs` for elevated access. Printer jobs bypass this delegation + /// since they read from in-memory data (`MemoryCursor`), not the filesystem. + /// + /// Both Connection-side and CM-side implementations use `TransferJob::new_read()` + /// with similar parameters. When modifying job creation logic, ensure both paths + /// stay in sync. + async fn create_and_start_read_job( + &mut self, + id: i32, + job_type: fs::JobType, + data_source: fs::DataSource, + file_num: i32, + include_hidden: bool, + overwrite_detection: bool, + path: String, + check_file_limit: bool, + ) { + match fs::TransferJob::new_read( + id, + job_type, + "".to_string(), + data_source, + file_num, + include_hidden, + false, + overwrite_detection, + ) { + Err(err) => { + self.send(fs::new_error(id, err, 0)).await; + } + Ok(job) => { + if check_file_limit { + if let Err(msg) = + crate::ui_cm_interface::check_file_count_limit(job.files().len()) + { + self.send(fs::new_error(id, msg, -1)).await; + return; + } + } + self.process_new_read_job(job, path).await; + } + } + } + #[inline] async fn send(&mut self, msg: Message) { allow_err!(self.stream.send(&msg).await); @@ -4436,6 +4772,23 @@ async fn start_ipc( let data = ipc::Data::ClickTime(ct); stream.send(&data).await?; } + // FileBlockFromCM: data is always sent separately via send_raw. + // The data field has #[serde(skip)], so it's empty after deserialization. + // Read the raw data bytes following this message. + // + // Note: Empty data (for empty files) is correctly handled. BytesCodec with + // raw=false adds a length prefix, so next_raw() returns empty BytesMut for + // zero-length frames. This mirrors the WriteBlock pattern below. + ipc::Data::FileBlockFromCM { id, file_num, data: _, compressed, conn_id } => { + let raw_data = stream.next_raw().await?; + tx_from_cm.send(ipc::Data::FileBlockFromCM { + id, + file_num, + data: raw_data.into(), + compressed, + conn_id, + })?; + } _ => { tx_from_cm.send(data)?; } diff --git a/src/server/input_service.rs b/src/server/input_service.rs index 203651b58..adb6a7a97 100644 --- a/src/server/input_service.rs +++ b/src/server/input_service.rs @@ -17,13 +17,12 @@ use rdev::{self, EventType, Key as RdevKey, KeyCode, RawKey}; use rdev::{CGEventSourceStateID, CGEventTapLocation, VirtualInput}; #[cfg(target_os = "linux")] use scrap::wayland::pipewire::RDP_SESSION_INFO; +#[cfg(target_os = "linux")] +use std::sync::mpsc; use std::{ convert::TryFrom, ops::{Deref, DerefMut}, - sync::{ - atomic::{AtomicBool, Ordering}, - mpsc, - }, + sync::atomic::{AtomicBool, Ordering}, thread, time::{self, Duration, Instant}, }; diff --git a/src/ui_cm_interface.rs b/src/ui_cm_interface.rs index 959187cb9..d1c1d21ef 100644 --- a/src/ui_cm_interface.rs +++ b/src/ui_cm_interface.rs @@ -6,13 +6,14 @@ use crate::ipc::{self, Data}; use crate::{clipboard::ClipboardSide, ipc::ClipboardNonFile}; #[cfg(target_os = "windows")] use clipboard::ContextSend; +#[cfg(not(any(target_os = "ios")))] +use hbb_common::fs::serialize_transfer_job; #[cfg(not(any(target_os = "android", target_os = "ios")))] use hbb_common::tokio::sync::mpsc::unbounded_channel; use hbb_common::{ - allow_err, - config::Config, - fs::is_write_need_confirmation, - fs::{self, get_string, new_send_confirm, DigestCheckResult}, + allow_err, bail, + config::{keys::OPTION_FILE_TRANSFER_MAX_FILES, Config}, + fs::{self, get_string, is_write_need_confirmation, new_send_confirm, DigestCheckResult}, log, message_proto::*, protobuf::Message as _, @@ -21,16 +22,18 @@ use hbb_common::{ sync::mpsc::{self, UnboundedSender}, task::spawn_blocking, }, + ResultType, }; #[cfg(target_os = "windows")] use hbb_common::{ config::{keys::*, option2bool}, tokio::sync::Mutex as TokioMutex, - ResultType, }; use serde_derive::Serialize; #[cfg(any(target_os = "android", target_os = "ios", feature = "flutter"))] use std::iter::FromIterator; +#[cfg(not(any(target_os = "ios")))] +use std::path::PathBuf; #[cfg(target_os = "windows")] use std::sync::Arc; use std::{ @@ -42,6 +45,85 @@ use std::{ }, }; +/// Default maximum number of files allowed per transfer request. +/// Unit: number of files (not bytes). +#[cfg(not(any(target_os = "ios")))] +const DEFAULT_MAX_VALIDATED_FILES: usize = 10_000; + +/// Maximum number of files allowed in a single file transfer request. +/// +/// This limit prevents excessive I/O and memory usage when dealing with +/// large directories. It applies to: +/// - CM-side read jobs (server to client file transfers on Windows) +/// - `AllFiles` recursive directory listing operations +/// - Connection-side read jobs (non-Windows platforms) +/// +/// Unit: number of files (not bytes). +/// Default: 10,000 files. +/// Configured via: `OPTION_FILE_TRANSFER_MAX_FILES` ("file-transfer-max-files") +#[cfg(not(any(target_os = "ios")))] +static MAX_VALIDATED_FILES: std::sync::OnceLock = std::sync::OnceLock::new(); + +/// Get the maximum number of files allowed per transfer request. +/// +/// Initializes the value from configuration (`OPTION_FILE_TRANSFER_MAX_FILES`) +/// on first call. Semantics: +/// - If the option is set to `0`, `DEFAULT_MAX_VALIDATED_FILES` (10,000) is used as a safe upper bound. +/// - If the option is unset, negative, or non-integer, +/// `usize::MAX` is used to represent "no limit" for backward compatibility with older versions +/// that did not enforce any file‑count restriction. +/// (Note: negative values are not valid for `usize` and will cause parsing to fail.) +/// +/// Unit: number of files. +#[cfg(not(any(target_os = "ios")))] +#[inline] +pub fn get_max_validated_files() -> usize { + // If `OPTION_FILE_TRANSFER_MAX_FILES` unset, negative, or non-integer, use + // `usize::MAX` to represent "no limit", maintaining backward compatibility + // with versions that had no file transfer restrictions. + const NO_LIMIT_FILE_COUNT: usize = usize::MAX; + *MAX_VALIDATED_FILES.get_or_init(|| { + let c = crate::get_builtin_option(OPTION_FILE_TRANSFER_MAX_FILES) + .trim() + .parse::() + .unwrap_or(NO_LIMIT_FILE_COUNT); + if c == 0 { + DEFAULT_MAX_VALIDATED_FILES + } else { + c + } + }) +} + +/// Check if file count exceeds the maximum allowed limit. +/// +/// This check is enforced in: +/// - `start_read_job()` for CM-side read jobs +/// - `read_all_files()` for recursive directory listings +/// - `Connection::on_message()` for connection-side read jobs +/// +/// # Arguments +/// * `file_count` - Number of files in the transfer request +/// +/// # Returns +/// * `Ok(())` if within limit +/// * `Err(String)` with error message if limit exceeded +#[cfg(not(any(target_os = "ios")))] +pub fn check_file_count_limit(file_count: usize) -> Result<(), String> { + let max_files = get_max_validated_files(); + if file_count > max_files { + let msg = format!( + "file transfer rejected: too many files ({} files exceeds limit of {}). \ + Adjust '{}' option to increase limit.", + file_count, max_files, OPTION_FILE_TRANSFER_MAX_FILES + ); + log::warn!("{}", msg); + Err(msg) + } else { + Ok(()) + } +} + #[derive(Serialize, Clone)] pub struct Client { pub id: i32, @@ -81,6 +163,8 @@ struct IpcTaskRunner { file_transfer_enabled: bool, #[cfg(target_os = "windows")] file_transfer_enabled_peer: bool, + /// Read jobs for CM-side file reading (server to client transfers) + read_jobs: Vec, } lazy_static::lazy_static! { @@ -348,9 +432,16 @@ pub fn switch_back(id: i32) { impl IpcTaskRunner { async fn run(&mut self) { use hbb_common::config::LocalConfig; + use hbb_common::tokio::time::{self, Duration, Instant}; + + const MILLI5: Duration = Duration::from_millis(5); + const SEC30: Duration = Duration::from_secs(30); // for tmp use, without real conn id let mut write_jobs: Vec = Vec::new(); + // File timer for processing read_jobs + let mut file_timer = + crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30)); #[cfg(target_os = "windows")] let is_authorized = self.cm.is_authorized(self.conn_id); @@ -443,10 +534,16 @@ impl IpcTaskRunner { if let ipc::FS::WriteBlock { id, file_num, data: _, compressed } = fs { if let Ok(bytes) = self.stream.next_raw().await { fs = ipc::FS::WriteBlock{id, file_num, data:bytes.into(), compressed}; - handle_fs(fs, &mut write_jobs, &self.tx, Some(&tx_log)).await; + handle_fs(fs, &mut write_jobs, &mut self.read_jobs, &self.tx, Some(&tx_log), self.conn_id).await; } } else { - handle_fs(fs, &mut write_jobs, &self.tx, Some(&tx_log)).await; + handle_fs(fs, &mut write_jobs, &mut self.read_jobs, &self.tx, Some(&tx_log), self.conn_id).await; + } + // Activate fast timer immediately when read jobs exist. + // This ensures new jobs start processing without waiting for the slow 30s timer. + // Deactivation (back to 30s) happens in tick handler when jobs are exhausted. + if !self.read_jobs.is_empty() { + file_timer = crate::rustdesk_interval(time::interval(MILLI5)); } let log = fs::serialize_transfer_jobs(&write_jobs); self.cm.ui_handler.file_transfer_log("transfer", &log); @@ -550,6 +647,31 @@ impl IpcTaskRunner { } } Some(data) = self.rx.recv() => { + // For FileBlockFromCM, data is sent separately via send_raw (data field has #[serde(skip)]). + // This avoids JSON encoding overhead for large binary data. + // This mirrors the WriteBlock pattern in start_ipc (see rx_to_cm handler). + // + // Note: Empty data (for empty files) is correctly handled. BytesCodec with raw=false + // (the default for IPC connections) adds a length prefix, so send_raw(Bytes::new()) + // sends a 1-byte frame that next_raw() can correctly receive as empty data. + if let Data::FileBlockFromCM { id, file_num, ref data, compressed, conn_id } = data { + // Send metadata first (data field is skipped by serde), then raw data bytes + if let Err(e) = self.stream.send(&Data::FileBlockFromCM { + id, + file_num, + data: bytes::Bytes::new(), // placeholder, skipped by serde + compressed, + conn_id, + }).await { + log::error!("error sending FileBlockFromCM metadata: {}", e); + break; + } + if let Err(e) = self.stream.send_raw(data.clone()).await { + log::error!("error sending FileBlockFromCM data: {}", e); + break; + } + continue; + } if let Err(e) = self.stream.send(&data).await { log::error!("error encountered in IPC task, quitting: {}", e); break; @@ -600,6 +722,18 @@ impl IpcTaskRunner { Some(job_log) = rx_log.recv() => { self.cm.ui_handler.file_transfer_log("transfer", &job_log); } + _ = file_timer.tick() => { + if !self.read_jobs.is_empty() { + let conn_id = self.conn_id; + if let Err(e) = handle_read_jobs_tick(&mut self.read_jobs, &self.tx, conn_id).await { + log::error!("Error processing read jobs: {}", e); + } + let log = fs::serialize_transfer_jobs(&self.read_jobs); + self.cm.ui_handler.file_transfer_log("transfer", &log); + } else { + file_timer = crate::rustdesk_interval(time::interval_at(Instant::now() + SEC30, SEC30)); + } + } } } } @@ -619,6 +753,7 @@ impl IpcTaskRunner { file_transfer_enabled: false, #[cfg(target_os = "windows")] file_transfer_enabled_peer: false, + read_jobs: Vec::new(), }; while task_runner.running { @@ -720,7 +855,17 @@ pub async fn start_listen( cm.new_message(current_id, text); } Some(Data::FS(fs)) => { - handle_fs(fs, &mut write_jobs, &tx, None).await; + // Android doesn't need CM-side file reading (no need_validate_file_read_access) + let mut read_jobs_placeholder: Vec = Vec::new(); + handle_fs( + fs, + &mut write_jobs, + &mut read_jobs_placeholder, + &tx, + None, + current_id, + ) + .await; } Some(Data::Close) => { break; @@ -747,13 +892,11 @@ pub async fn start_listen( async fn handle_fs( fs: ipc::FS, write_jobs: &mut Vec, + read_jobs: &mut Vec, tx: &UnboundedSender, tx_log: Option<&UnboundedSender>, + _conn_id: i32, ) { - use std::path::PathBuf; - - use hbb_common::fs::serialize_transfer_job; - match fs { ipc::FS::ReadEmptyDirs { dir, @@ -789,6 +932,25 @@ async fn handle_fs( total_size, conn_id, } => { + // Validate file names to prevent path traversal attacks. + // This must be done BEFORE any path operations to ensure attackers cannot + // escape the target directory using names like "../../malicious.txt" + if let Err(e) = validate_transfer_file_names(&files) { + log::warn!("Path traversal attempt detected for {}: {}", path, e); + send_raw(fs::new_error(id, e, file_num), tx); + return; + } + + // Convert files to FileEntry + let file_entries: Vec = files + .drain(..) + .map(|f| FileEntry { + name: f.0, + modified_time: f.1, + ..Default::default() + }) + .collect(); + // cm has no show_hidden context // dummy remote, show_hidden, is_remote let mut job = fs::TransferJob::new_write( @@ -799,14 +961,7 @@ async fn handle_fs( file_num, false, false, - files - .drain(..) - .map(|f| FileEntry { - name: f.0, - modified_time: f.1, - ..Default::default() - }) - .collect(), + file_entries, overwrite_detection, ); job.total_size = total_size; @@ -816,9 +971,11 @@ async fn handle_fs( ipc::FS::CancelWrite { id } => { if let Some(job) = fs::remove_job(id, write_jobs) { job.remove_download_file(); - tx_log.map(|tx: &UnboundedSender| { - tx.send(serialize_transfer_job(&job, false, true, "")) - }); + if let Some(tx) = tx_log { + if let Err(e) = tx.send(serialize_transfer_job(&job, false, true, "")) { + log::error!("error sending transfer job log via IPC: {}", e); + } + } } } ipc::FS::WriteDone { id, file_num } => { @@ -922,10 +1079,436 @@ async fn handle_fs( ipc::FS::Rename { id, path, new_name } => { rename_file(path, new_name, id, tx).await; } + ipc::FS::ReadFile { + path, + id, + file_num, + include_hidden, + conn_id, + overwrite_detection, + } => { + start_read_job( + path, + file_num, + include_hidden, + id, + conn_id, + overwrite_detection, + read_jobs, + tx, + ) + .await; + } + // Cancel an ongoing read job (file transfer from server to client). + // Note: This only cancels jobs in `read_jobs`. It does NOT cancel `ReadAllFiles` + // operations, which are one-shot directory scans that complete quickly and don't + // have persistent job tracking. + ipc::FS::CancelRead { id, conn_id: _ } => { + if let Some(job) = fs::remove_job(id, read_jobs) { + if let Some(tx) = tx_log { + if let Err(e) = tx.send(serialize_transfer_job(&job, false, true, "")) { + log::error!("error sending transfer job log via IPC: {}", e); + } + } + } + } + ipc::FS::SendConfirmForRead { + id, + file_num: _, + skip, + offset_blk, + conn_id: _, + } => { + if let Some(job) = fs::get_job(id, read_jobs) { + let req = FileTransferSendConfirmRequest { + id, + file_num: job.file_num(), + union: if skip { + Some(file_transfer_send_confirm_request::Union::Skip(true)) + } else { + Some(file_transfer_send_confirm_request::Union::OffsetBlk( + offset_blk, + )) + }, + ..Default::default() + }; + job.confirm(&req).await; + } + } + // Recursively list all files in a directory. + // This is a one-shot operation that cannot be cancelled via CancelRead. + // The operation typically completes quickly as it only reads directory metadata, + // not file contents. File count is limited by `check_file_count_limit()`. + ipc::FS::ReadAllFiles { + path, + id, + include_hidden, + conn_id, + } => { + read_all_files(path, include_hidden, id, conn_id, tx).await; + } _ => {} } } +/// Validates that a file name does not contain path traversal sequences. +/// This prevents attackers from escaping the base directory by using names like +/// "../../../etc/passwd" or "..\\..\\Windows\\System32\\malicious.dll". +#[cfg(not(any(target_os = "ios")))] +fn validate_file_name_no_traversal(name: &str) -> ResultType<()> { + // Check for null bytes which could cause path truncation in some APIs + if name.bytes().any(|b| b == 0) { + bail!("file name contains null bytes"); + } + + // Check for path traversal patterns + // We check for both Unix and Windows path separators + if name + .split(|c| c == '/' || c == '\\') + .filter(|s| !s.is_empty()) + .any(|component| component == "..") + { + bail!("path traversal detected in file name"); + } + + // On Windows, also check for drive letters (e.g., "C:") + #[cfg(windows)] + { + if name.len() >= 2 { + let bytes = name.as_bytes(); + if bytes[0].is_ascii_alphabetic() && bytes[1] == b':' { + bail!("absolute path detected in file name"); + } + } + } + + // Check for names starting with path separator: + // - Unix absolute paths (e.g., "/etc/passwd") + // - Windows UNC paths (e.g., "\\server\share") + if name.starts_with('/') || name.starts_with('\\') { + bail!("absolute path detected in file name"); + } + + Ok(()) +} + +#[inline] +fn is_single_file_with_empty_name(files: &[(String, u64)]) -> bool { + files.len() == 1 && files.first().map_or(false, |f| f.0.is_empty()) +} + +/// Validates all file names in a transfer request to prevent path traversal attacks. +/// Returns an error if any file name contains dangerous path components. +#[cfg(not(any(target_os = "ios")))] +fn validate_transfer_file_names(files: &[(String, u64)]) -> ResultType<()> { + if is_single_file_with_empty_name(files) { + // Allow empty name for single file. + // The full path is provided in the `path` parameter for single file transfers. + return Ok(()); + } + + for (name, _) in files { + // In multi-file transfers, empty names are not allowed. + // Each file must have a valid name to construct the destination path. + if name.is_empty() { + bail!("empty file name in multi-file transfer"); + } + validate_file_name_no_traversal(name)?; + } + Ok(()) +} + +/// Start a read job in CM for file transfer from server to client (Windows only). +/// +/// This creates a `TransferJob` using `new_read()`, validates it, and sends the +/// initial file list back to Connection via IPC. +/// +/// NOTE: This is the CM-side equivalent of `create_and_start_read_job()` in +/// `src/server/connection.rs`. On non-Windows platforms, Connection handles +/// read jobs directly. Both use `TransferJob::new_read()` with similar logic. +/// When modifying job creation or validation, ensure both paths stay in sync. +#[cfg(not(any(target_os = "ios")))] +async fn start_read_job( + path: String, + file_num: i32, + include_hidden: bool, + id: i32, + conn_id: i32, + overwrite_detection: bool, + read_jobs: &mut Vec, + tx: &UnboundedSender, +) { + let path_clone = path.clone(); + let result = spawn_blocking(move || -> ResultType { + let data_source = fs::DataSource::FilePath(PathBuf::from(&path)); + fs::TransferJob::new_read( + id, + fs::JobType::Generic, + "".to_string(), + data_source, + file_num, + include_hidden, + true, + overwrite_detection, + ) + }) + .await; + + match result { + Ok(Ok(mut job)) => { + // Optional: enforce file count limit for CM-side jobs to avoid + // excessive I/O. This is applied on the job's file list produced + // by `new_read`, similar to how AllFiles uses the same helper. + if let Err(msg) = check_file_count_limit(job.files().len()) { + if let Err(e) = tx.send(Data::ReadJobInitResult { + id, + file_num, + include_hidden, + conn_id, + result: Err(msg), + }) { + log::error!("error sending ReadJobInitResult via IPC: {}", e); + } + return; + } + + // Build FileDirectory from the job's file list and serialize + let files = job.files().to_owned(); + let mut dir = FileDirectory::new(); + dir.id = id; + dir.path = path_clone.clone(); + dir.entries = files.clone().into(); + + let dir_bytes = match dir.write_to_bytes() { + Ok(bytes) => bytes, + Err(e) => { + if let Err(e) = tx.send(Data::ReadJobInitResult { + id, + file_num, + include_hidden, + conn_id, + result: Err(format!("serialize failed: {}", e)), + }) { + log::error!("error sending ReadJobInitResult via IPC: {}", e); + } + return; + } + }; + + if let Err(e) = tx.send(Data::ReadJobInitResult { + id, + file_num, + include_hidden, + conn_id, + result: Ok(dir_bytes), + }) { + log::error!("error sending ReadJobInitResult via IPC: {}", e); + } + + // Attach connection id so CM can route read blocks back correctly + job.conn_id = conn_id; + read_jobs.push(job); + } + Ok(Err(e)) => { + if let Err(e) = tx.send(Data::ReadJobInitResult { + id, + file_num, + include_hidden, + conn_id, + result: Err(format!("validation failed: {}", e)), + }) { + log::error!("error sending ReadJobInitResult via IPC: {}", e); + } + } + Err(e) => { + if let Err(e) = tx.send(Data::ReadJobInitResult { + id, + file_num, + include_hidden, + conn_id, + result: Err(format!("validation task failed: {}", e)), + }) { + log::error!("error sending ReadJobInitResult via IPC: {}", e); + } + } + } +} + +/// Process read jobs periodically, reading file blocks and sending them via IPC. +/// +/// NOTE: This is the CM-side equivalent of `handle_read_jobs()` in +/// `libs/hbb_common/src/fs.rs`. The logic mirrors that implementation +/// but communicates via IPC instead of direct network stream. +/// When modifying job processing logic, ensure both implementations stay in sync. +#[cfg(not(any(target_os = "ios")))] +async fn handle_read_jobs_tick( + jobs: &mut Vec, + tx: &UnboundedSender, + conn_id: i32, +) -> ResultType<()> { + let mut finished = Vec::new(); + + for job in jobs.iter_mut() { + if job.is_last_job { + continue; + } + + // Initialize data stream if needed (opens file, sends digest for overwrite detection) + if let Err(err) = init_read_job_for_cm(job, tx, conn_id).await { + if let Err(e) = tx.send(Data::FileReadError { + id: job.id, + file_num: job.file_num(), + err: format!("{}", err), + conn_id, + }) { + log::error!("error sending FileReadError via IPC: {}", e); + } + finished.push(job.id); + continue; + } + + // Read a block from the file + match job.read().await { + Err(err) => { + if let Err(e) = tx.send(Data::FileReadError { + id: job.id, + file_num: job.file_num(), + err: format!("{}", err), + conn_id, + }) { + log::error!("error sending FileReadError via IPC: {}", e); + } + // Mark job as finished to prevent infinite retries. + // Connection side will have already removed cm_read_job_ids + // after receiving FileReadError, so continuing would be pointless. + finished.push(job.id); + } + Ok(Some(block)) => { + if let Err(e) = tx.send(Data::FileBlockFromCM { + id: block.id, + file_num: block.file_num, + data: block.data, + compressed: block.compressed, + conn_id, + }) { + log::error!("error sending FileBlockFromCM via IPC: {}", e); + } + } + Ok(None) => { + if job.job_completed() { + finished.push(job.id); + match job.job_error() { + Some(err) => { + if let Err(e) = tx.send(Data::FileReadError { + id: job.id, + file_num: job.file_num(), + err, + conn_id, + }) { + log::error!("error sending FileReadError via IPC: {}", e); + } + } + None => { + if let Err(e) = tx.send(Data::FileReadDone { + id: job.id, + file_num: job.file_num(), + conn_id, + }) { + log::error!("error sending FileReadDone via IPC: {}", e); + } + } + } + } + // else: waiting for confirmation from peer + } + } + // Break to handle jobs one by one. + break; + } + + for id in finished { + let _ = fs::remove_job(id, jobs); + } + + Ok(()) +} + +/// Initialize a read job's data stream and handle digest sending for overwrite detection. +/// +/// NOTE: This is the CM-side equivalent of `TransferJob::init_data_stream()` in +/// `libs/hbb_common/src/fs.rs`. It calls `init_data_stream_for_cm()` and sends +/// digest via IPC instead of direct network stream. +/// When modifying initialization or digest logic, ensure both paths stay in sync. +#[cfg(not(any(target_os = "ios")))] +async fn init_read_job_for_cm( + job: &mut fs::TransferJob, + tx: &UnboundedSender, + conn_id: i32, +) -> ResultType<()> { + // Initialize data stream and get digest info if overwrite detection is needed + match job.init_data_stream_for_cm().await? { + Some((last_modified, file_size)) => { + // Send digest via IPC for overwrite detection + if let Err(e) = tx.send(Data::FileDigestFromCM { + id: job.id, + file_num: job.file_num(), + last_modified, + file_size, + is_resume: job.is_resume, + conn_id, + }) { + log::error!("error sending FileDigestFromCM via IPC: {}", e); + } + } + None => { + // Job done or already initialized, nothing to do + } + } + Ok(()) +} + +#[cfg(not(any(target_os = "ios")))] +async fn read_all_files( + path: String, + include_hidden: bool, + id: i32, + conn_id: i32, + tx: &UnboundedSender, +) { + let path_clone = path.clone(); + let result = spawn_blocking(move || fs::get_recursive_files(&path, include_hidden)).await; + + let result = match result { + Ok(Ok(files)) => { + // Check file count limit to prevent excessive I/O and resource usage + if let Err(msg) = check_file_count_limit(files.len()) { + Err(msg) + } else { + // Serialize FileDirectory to protobuf bytes + let mut fd = FileDirectory::new(); + fd.id = id; + fd.path = path_clone.clone(); + fd.entries = files.into(); + match fd.write_to_bytes() { + Ok(bytes) => Ok(bytes), + Err(e) => Err(format!("serialize failed: {}", e)), + } + } + } + Ok(Err(e)) => Err(format!("{}", e)), + Err(e) => Err(format!("task failed: {}", e)), + }; + + if let Err(e) = tx.send(Data::AllFilesResult { + id, + conn_id, + path: path_clone, + result, + }) { + log::error!("error sending AllFilesResult via IPC: {}", e); + } +} + #[cfg(not(any(target_os = "ios")))] async fn read_empty_dirs(dir: &str, include_hidden: bool, tx: &UnboundedSender) { let path = dir.to_owned(); @@ -1009,7 +1592,16 @@ async fn create_dir(path: String, id: i32, tx: &UnboundedSender) { #[cfg(not(any(target_os = "ios")))] async fn rename_file(path: String, new_name: String, id: i32, tx: &UnboundedSender) { handle_result( - spawn_blocking(move || fs::rename_file(&path, &new_name)).await, + spawn_blocking(move || { + // Rename target must not be empty + if new_name.is_empty() { + bail!("new file name cannot be empty"); + } + // Validate that new_name doesn't contain path traversal + validate_file_name_no_traversal(&new_name)?; + fs::rename_file(&path, &new_name) + }) + .await, id, 0, tx, @@ -1106,3 +1698,147 @@ pub fn quit_cm() { CLIENTS.write().unwrap().clear(); crate::platform::quit_gui(); } + +#[cfg(test)] +mod tests { + use super::*; + + use crate::ipc::Data; + use hbb_common::{ + message_proto::{FileDirectory, Message}, + tokio::{runtime::Runtime, sync::mpsc::unbounded_channel}, + }; + use std::fs; + + #[test] + #[cfg(not(any(target_os = "ios")))] + fn read_all_files_success() { + let rt = Runtime::new().unwrap(); + rt.block_on(async { + let (tx, mut rx) = unbounded_channel(); + let dir = std::env::temp_dir().join("rustdesk_read_all_test"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + fs::write(dir.join("test.txt"), b"hello").unwrap(); + + let path_str = dir.to_string_lossy().to_string(); + super::read_all_files(path_str.clone(), false, 1, 2, &tx).await; + + match rx.recv().await.unwrap() { + Data::AllFilesResult { result, .. } => { + let bytes = result.unwrap(); + let fd = FileDirectory::parse_from_bytes(&bytes).unwrap(); + assert!(!fd.entries.is_empty()); + } + _ => panic!("unexpected data"), + } + let _ = fs::remove_dir_all(&dir); + }); + } + + #[test] + #[cfg(not(any(target_os = "ios")))] + fn read_dir_success() { + let rt = Runtime::new().unwrap(); + rt.block_on(async { + let (tx, mut rx) = unbounded_channel(); + let dir = std::env::temp_dir().join("rustdesk_read_dir_test"); + let _ = fs::remove_dir_all(&dir); + fs::create_dir_all(&dir).unwrap(); + + super::read_dir(&dir.to_string_lossy(), false, &tx).await; + + match rx.recv().await.unwrap() { + Data::RawMessage(bytes) => { + let mut msg = Message::new(); + msg.merge_from_bytes(&bytes).unwrap(); + assert!(msg + .file_response() + .dir() + .path + .contains("rustdesk_read_dir_test")); + } + _ => panic!("unexpected data"), + } + let _ = fs::remove_dir_all(&dir); + }); + } + + #[test] + #[cfg(not(any(target_os = "ios")))] + fn validate_file_name_security() { + // Null byte injection + assert!(super::validate_file_name_no_traversal("file\0.txt").is_err()); + assert!(super::validate_file_name_no_traversal("test\0").is_err()); + + // Path traversal + assert!(super::validate_file_name_no_traversal("../etc/passwd").is_err()); + assert!(super::validate_file_name_no_traversal("foo/../bar").is_err()); + assert!(super::validate_file_name_no_traversal("..").is_err()); + + // Absolute paths + assert!(super::validate_file_name_no_traversal("/etc/passwd").is_err()); + assert!(super::validate_file_name_no_traversal("\\Windows").is_err()); + #[cfg(windows)] + assert!(super::validate_file_name_no_traversal("C:\\Windows").is_err()); + + // Valid paths + assert!(super::validate_file_name_no_traversal("file.txt").is_ok()); + assert!(super::validate_file_name_no_traversal("subdir/file.txt").is_ok()); + assert!(super::validate_file_name_no_traversal("").is_ok()); + } + + #[test] + #[cfg(not(any(target_os = "ios")))] + fn validate_transfer_file_names_security() { + assert!(super::validate_transfer_file_names(&[("file.txt".into(), 100)]).is_ok()); + assert!(super::validate_transfer_file_names(&[("".into(), 100)]).is_ok()); + assert!( + super::validate_transfer_file_names(&[("".into(), 100), ("file.txt".into(), 100)]) + .is_err() + ); + assert!(super::validate_transfer_file_names(&[("../passwd".into(), 100)]).is_err()); + } + + /// Tests that symlink creation works on this platform. + /// This is a helper to verify the test environment supports symlinks. + #[test] + #[cfg(not(any(target_os = "ios")))] + fn test_symlink_creation_works() { + let base_dir = std::env::temp_dir().join("rustdesk_symlink_test"); + let _ = fs::remove_dir_all(&base_dir); + fs::create_dir_all(&base_dir).unwrap(); + + // Create target file in a subdirectory + let target_dir = base_dir.join("target_dir"); + fs::create_dir_all(&target_dir).unwrap(); + let target_file = target_dir.join("target.txt"); + fs::write(&target_file, b"content").unwrap(); + + // Create symlink in a different directory + let link_dir = base_dir.join("link_dir"); + fs::create_dir_all(&link_dir).unwrap(); + let link_path = link_dir.join("link.txt"); + + #[cfg(unix)] + { + use std::os::unix::fs::symlink; + if symlink(&target_file, &link_path).is_err() { + let _ = fs::remove_dir_all(&base_dir); + return; + } + } + + #[cfg(windows)] + { + use std::os::windows::fs::symlink_file; + if symlink_file(&target_file, &link_path).is_err() { + // Skip if no permission (needs admin or dev mode on Windows) + let _ = fs::remove_dir_all(&base_dir); + return; + } + } + + let _ = fs::remove_dir_all(&base_dir); + } +}