From f766d28c3666cf37a80605b82c3252b8eb851b07 Mon Sep 17 00:00:00 2001 From: fufesou <13586388+fufesou@users.noreply.github.com> Date: Thu, 3 Jul 2025 17:27:50 +0800 Subject: [PATCH] Fix/linux keep terminal sessions (#12222) * fix: linux, keep terminal sessions Signed-off-by: fufesou * fix: terminal service stucked at reader join Signed-off-by: fufesou --------- Signed-off-by: fufesou --- src/ipc.rs | 19 +++++++++ src/platform/linux.rs | 6 +++ src/server/terminal_service.rs | 76 ++++++++++++++++++++++++++++------ 3 files changed, 88 insertions(+), 13 deletions(-) diff --git a/src/ipc.rs b/src/ipc.rs index 0c6a8d57e..1ae048162 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -282,6 +282,8 @@ pub enum Data { not(any(target_os = "android", target_os = "ios")) ))] ControllingSessionCount(usize), + #[cfg(target_os = "linux")] + TerminalSessionCount(usize), #[cfg(target_os = "windows")] PortForwardSessionCount(Option), SocksWs(Option, String)>>), @@ -642,6 +644,11 @@ async fn handle(data: Data, stream: &mut Connection) { Data::ControllingSessionCount(count) => { crate::updater::update_controlling_session_count(count); } + #[cfg(target_os = "linux")] + Data::TerminalSessionCount(_) => { + let count = crate::terminal_service::get_terminal_session_count(true); + allow_err!(stream.send(&Data::TerminalSessionCount(count)).await); + } #[cfg(feature = "hwcodec")] #[cfg(not(any(target_os = "android", target_os = "ios")))] Data::CheckHwcodec => { @@ -1388,6 +1395,18 @@ pub async fn update_controlling_session_count(count: usize) -> ResultType<()> { Ok(()) } +#[cfg(target_os = "linux")] +#[tokio::main(flavor = "current_thread")] +pub async fn get_terminal_session_count() -> ResultType { + let ms_timeout = 1_000; + let mut c = connect(ms_timeout, "").await?; + c.send(&Data::TerminalSessionCount(0)).await?; + if let Some(Data::TerminalSessionCount(c)) = c.next_timeout(ms_timeout).await? { + return Ok(c); + } + Ok(0) +} + async fn handle_wayland_screencast_restore_token( key: String, value: String, diff --git a/src/platform/linux.rs b/src/platform/linux.rs index 9fa69fa63..f17c5b472 100644 --- a/src/platform/linux.rs +++ b/src/platform/linux.rs @@ -370,6 +370,12 @@ fn should_start_server( && ((*cm0 && last_restart.elapsed().as_secs() > 60) || last_restart.elapsed().as_secs() > 3600) { + let terminal_session_count = crate::ipc::get_terminal_session_count().unwrap_or(0); + if terminal_session_count > 0 { + // There are terminal sessions, so we don't restart the server. + // We also need to keep `cm0` unchanged, so that we can reach this branch the next time. + return false; + } // restart server if new connections all closed, or every one hour, // as a workaround to resolve "SpotUdp" (dns resolve) // and x server get displays failure issue diff --git a/src/server/terminal_service.rs b/src/server/terminal_service.rs index 60accd720..d709454c9 100644 --- a/src/server/terminal_service.rs +++ b/src/server/terminal_service.rs @@ -8,6 +8,7 @@ use std::{ collections::{HashMap, VecDeque}, io::{Read, Write}, sync::{ + atomic::{AtomicBool, Ordering}, mpsc::{self, Receiver, SyncSender}, Arc, Mutex, }, @@ -141,11 +142,7 @@ fn remove_service(service_id: &str) { let sessions = service.lock().unwrap().sessions.clone(); for (_, session) in sessions.iter() { let mut session = session.lock().unwrap(); - if let Some(mut child) = session.child.take() { - // Kill the process - let _ = child.kill(); - add_to_reaper(child); - } + session.stop(); } } } @@ -265,6 +262,15 @@ fn ensure_cleanup_task() { } } +#[cfg(target_os = "linux")] +pub fn get_terminal_session_count(include_zombie_tasks: bool) -> usize { + let mut c = TERMINAL_SERVICES.lock().unwrap().len(); + if include_zombie_tasks { + c += TERMINAL_TASKS.lock().unwrap().len(); + } + c +} + pub fn new(service_id: String, is_persistent: bool) -> GenericService { // Create the service with initial persistence setting allow_err!(get_or_create_service(service_id.clone(), is_persistent)); @@ -393,6 +399,7 @@ pub struct TerminalSession { input_tx: Option>>, // Channel for receiving output from the reader thread output_rx: Option>>, + exiting: Arc, // Thread handles reader_thread: Option>, writer_thread: Option>, @@ -414,6 +421,7 @@ impl TerminalSession { child: None, input_tx: None, output_rx: None, + exiting: Arc::new(AtomicBool::new(false)), reader_thread: None, writer_thread: None, output_buffer: OutputBuffer::new(), @@ -428,29 +436,50 @@ impl TerminalSession { fn update_activity(&mut self) { self.last_activity = Instant::now(); } -} -impl Drop for TerminalSession { - fn drop(&mut self) { + // This helper function is to ensure that the threads are joined before the child process is dropped. + // Though this is not strictly necessary on macOS. + fn stop(&mut self) { + self.exiting.store(true, Ordering::SeqCst); + // Drop the input channel to signal writer thread to exit - drop(self.input_tx.take()); + if let Some(input_tx) = self.input_tx.take() { + // Send a final newline to ensure the reader can read some data, and then exit. + // This is required on Windows and Linux. + if let Err(e) = input_tx.send(b"\r\n".to_vec()) { + log::warn!("Failed to send final newline to the terminal: {}", e); + } + drop(input_tx); + } // Wait for threads to finish - if let Some(writer_thread) = self.writer_thread.take() { - let _ = writer_thread.join(); - } + // The reader thread should join before the writer thread on Windows. if let Some(reader_thread) = self.reader_thread.take() { let _ = reader_thread.join(); } - // Ensure child process is properly handled when session is dropped + // The read can read the last "\r\n" after the writer thread (not the child process) exits + // on Linux in my tests. + // But we still send "\r\n" to the writer thread and let the reader thread exit first for safety. + if let Some(writer_thread) = self.writer_thread.take() { + let _ = writer_thread.join(); + } + if let Some(mut child) = self.child.take() { + // Kill the process let _ = child.kill(); add_to_reaper(child); } } } +impl Drop for TerminalSession { + fn drop(&mut self) { + // Ensure child process is properly handled when session is dropped + self.stop(); + } +} + /// Persistent terminal service that can survive connection drops pub struct PersistentTerminalService { service_id: String, @@ -669,6 +698,17 @@ impl TerminalServiceProxy { let terminal_id = open.terminal_id; let writer_thread = thread::spawn(move || { let mut writer = writer; + // Write initial carriage return: + // 1. Windows requires at least one carriage return for `drop()` to work properly. + // Without this, the reader may fail to read the buffer after `input_tx.send(b"\r\n".to_vec()).ok();`. + // 2. This also refreshes the terminal interface on the controlling side (workaround for blank content on connect). + if let Err(e) = writer.write_all(b"\r") { + log::error!("Terminal {} initial write error: {}", terminal_id, e); + } else { + if let Err(e) = writer.flush() { + log::error!("Terminal {} initial flush error: {}", terminal_id, e); + } + } while let Ok(data) = input_rx.recv() { if let Err(e) = writer.write_all(&data) { log::error!("Terminal {} write error: {}", terminal_id, e); @@ -681,6 +721,7 @@ impl TerminalServiceProxy { log::debug!("Terminal {} writer thread exiting", terminal_id); }); + let exiting = session.exiting.clone(); // Spawn reader thread let terminal_id = open.terminal_id; let reader_thread = thread::spawn(move || { @@ -690,9 +731,14 @@ impl TerminalServiceProxy { match reader.read(&mut buf) { Ok(0) => { // EOF + // This branch can be reached when the child process exits on macOS. + // But not on Linux and Windows in my tests. break; } Ok(n) => { + if exiting.load(Ordering::SeqCst) { + break; + } let data = buf[..n].to_vec(); // Try to send, if channel is full, drop the data match output_tx.try_send(data) { @@ -710,6 +756,10 @@ impl TerminalServiceProxy { } } Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => { + // This branch is not reached in my tests, but we still add `exiting` check to ensure we can exit. + if exiting.load(Ordering::SeqCst) { + break; + } // For non-blocking I/O, sleep briefly thread::sleep(Duration::from_millis(10)); }