Skip to content

Commit

Permalink
Added WebSocket servers.
Browse files Browse the repository at this point in the history
  • Loading branch information
mkrueger committed Feb 9, 2025
1 parent 450ee74 commit c3b2ac2
Show file tree
Hide file tree
Showing 14 changed files with 284 additions and 144 deletions.
171 changes: 75 additions & 96 deletions crates/icboard/src/bbs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,23 +11,28 @@ use icy_board_engine::{
},
vm::TerminalTarget,
};
use icy_net::{telnet::TelnetConnection, termcap_detect::TerminalCaps, Connection, ConnectionType};
use icy_net::{
telnet::TelnetConnection,
termcap_detect::TerminalCaps,
websocket::{accept_sec_websocket, accept_websocket},
Connection, ConnectionType,
};
use tokio::{net::TcpListener, sync::Mutex};

use crate::menu_runner::PcbBoardCommand;

pub async fn await_telnet_connections(telnet: Telnet, board: Arc<tokio::sync::Mutex<IcyBoard>>, bbs: Arc<Mutex<BBS>>) -> Res<()> {
let addr = if telnet.address.is_empty() {
format!("0.0.0.0:{}", telnet.port)
pub async fn await_telnet_connections(con: Telnet, board: Arc<tokio::sync::Mutex<IcyBoard>>, bbs: Arc<Mutex<BBS>>) -> Res<()> {
let addr = if con.address.is_empty() {
format!("0.0.0.0:{}", con.port)
} else {
format!("{}:{}", telnet.address, telnet.port)
format!("{}:{}", con.address, con.port)
};
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, _addr) = listener.accept().await?;
let bbs2 = bbs.clone();
let node = bbs.lock().await.create_new_node(ConnectionType::Telnet).await;
let node_list = bbs.lock().await.get_open_connections().await.clone();
let node_list: Arc<Mutex<Vec<Option<NodeState>>>> = bbs.lock().await.get_open_connections().await.clone();
let board = board.clone();
let handle = std::thread::Builder::new()
.name("Telnet handle".to_string())
Expand Down Expand Up @@ -58,157 +63,131 @@ pub async fn await_telnet_connections(telnet: Telnet, board: Arc<tokio::sync::Mu
}
}

pub fn await_ssh_connections(_ssh: SSH, _board: Arc<tokio::sync::Mutex<IcyBoard>>, _bbs: Arc<Mutex<BBS>>) -> Res<()> {
/*
let addr = if ssh.address.is_empty() {
format!("127.0.0.1:{}", ssh.port)
pub async fn await_ssh_connections(_ssh: SSH, _board: Arc<tokio::sync::Mutex<IcyBoard>>, _bbs: Arc<Mutex<BBS>>) -> Res<()> {
/* let addr = if ssh.address.is_empty() {
format!("0.0.0.0:{}", ssh.port)
} else {
format!("{}:{}", ssh.address, ssh.port)
};
let listener = match TcpListener::bind(addr) {
Ok(listener) => listener,
Err(e) => panic!("could not read start TCP listener: {}", e),
};
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let board = board.clone();
let node = bbs.lock().unwrap().create_new_node(ConnectionType::SSH);
let node_list = bbs.lock().unwrap().get_open_connections().clone();
let handle = thread::spawn(move || {
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, _addr) = listener.accept().await?;
let bbs2 = bbs.clone();
let node = bbs.lock().await.create_new_node(ConnectionType::Telnet).await;
let node_list = bbs.lock().await.get_open_connections().await.clone();
let board = board.clone();
let handle: thread::JoinHandle<()> = std::thread::Builder::new()
.name("Telnet handle".to_string())
.spawn(move || {
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
let orig_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
log::error!("IcyBoard thread crashed at {:?}", panic_info.location());
log::error!("full info: {:?}", panic_info);
orig_hook(panic_info);
}));
match TelnetConnection::accept(stream) {
match SSH::accept(stream) {
Ok(connection) => {
// connection succeeded
if let Err(err) = handle_client(board, node_list, node, Box::new(connection)) {
if let Err(err) = handle_client(bbs2, board, node_list, node, Box::new(connection), None, "").await {
log::error!("Error running backround client: {}", err);
}
}
Err(e) => {
log::error!("ssh connection failed {}", e);
log::error!("telnet connection failed {}", e);
}
}
Ok(())
});
bbs.lock().unwrap().get_open_connections().lock().unwrap()[node].as_mut().unwrap().handle = Some(handle);
}
Err(e) => {
log::error!("connection failed {}", e);
}
}
}
drop(listener); */
})
.unwrap();
bbs.lock().await.get_open_connections().await.lock().await[node].as_mut().unwrap().handle = Some(handle);
}*/
Ok(())
}

pub fn await_websocket_connections(_ssh: Websocket, _board: Arc<tokio::sync::Mutex<IcyBoard>>, _bbs: Arc<Mutex<BBS>>) -> Res<()> {
/*
let addr = if ssh.address.is_empty() {
format!("127.0.0.1:{}", ssh.port)
pub async fn await_websocket_connections(con: Websocket, board: Arc<tokio::sync::Mutex<IcyBoard>>, bbs: Arc<Mutex<BBS>>) -> Res<()> {
let addr = if con.address.is_empty() {
format!("0.0.0.0:{}", con.port)
} else {
format!("{}:{}", ssh.address, ssh.port)
format!("{}:{}", con.address, con.port)
};
let listener = match TcpListener::bind(addr) {
Ok(listener) => listener,
Err(e) => panic!("could not read start TCP listener: {}", e),
};
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let board = board.clone();
let node = bbs.lock().unwrap().create_new_node(ConnectionType::Websocket);
let node_list = bbs.lock().unwrap().get_open_connections().clone();
let handle = thread::spawn(move || {
let orig_hook = std::panic::take_hook();
let listener = TcpListener::bind(&addr).await?;
loop {
let (stream, _addr) = listener.accept().await?;
let bbs2 = bbs.clone();
let node = bbs.lock().await.create_new_node(ConnectionType::Telnet).await;
let node_list = bbs.lock().await.get_open_connections().await.clone();
let board = board.clone();
let handle = std::thread::Builder::new()
.name("Websocket handle".to_string())
.spawn(move || {
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
let orig_hook: Box<dyn Fn(&std::panic::PanicHookInfo<'_>) + Send + Sync> = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
log::error!("IcyBoard thread crashed at {:?}", panic_info.location());
log::error!("full info: {:?}", panic_info);
orig_hook(panic_info);
}));

match accept_websocket(stream) {
match accept_websocket(stream).await {
Ok(connection) => {
// connection succeeded
if let Err(err) = handle_client(board, node_list, node, Box::new(connection)) {
if let Err(err) = handle_client(bbs2, board, node_list, node, Box::new(connection), None, "").await {
log::error!("Error running backround client: {}", err);
}
}
Err(e) => {
log::error!("webserver connection failed {}", e);
log::error!("telnet connection failed {}", e);
}
}
Ok(())
});
bbs.lock().unwrap().get_open_connections().lock().unwrap()[node].as_mut().unwrap().handle = Some(handle);
}
Err(e) => {
log::error!("connection failed {}", e);
}
}
})
.unwrap();
bbs.lock().await.get_open_connections().await.lock().await[node].as_mut().unwrap().handle = Some(handle);
}
drop(listener); */
Ok(())
}

pub fn await_securewebsocket_connections(_ssh: SecureWebsocket, _board: Arc<tokio::sync::Mutex<IcyBoard>>, _bbs: Arc<Mutex<BBS>>) -> Res<()> {
/*
let addr = if ssh.address.is_empty() {
format!("127.0.0.1:{}", ssh.port)
pub async fn await_securewebsocket_connections(con: SecureWebsocket, board: Arc<tokio::sync::Mutex<IcyBoard>>, bbs: Arc<Mutex<BBS>>) -> Res<()> {
let addr = if con.address.is_empty() {
format!("0.0.0.0:{}", con.port)
} else {
format!("{}:{}", ssh.address, ssh.port)
};
let listener = match TcpListener::bind(addr) {
Ok(listener) => listener,
Err(e) => panic!("could not read start TCP listener: {}", e),
format!("{}:{}", con.address, con.port)
};
for stream in listener.incoming() {
match stream {
Ok(stream) => {
let board = board.clone();
let node = bbs.lock().unwrap().create_new_node(ConnectionType::SecureWebsocket);
let node_list = bbs.lock().unwrap().get_open_connections().clone();
let cp = ssh.cert_pem.clone();
let kp = ssh.key_pem.clone();
let handle = thread::spawn(move || {
let listener = TcpListener::bind(&addr).await?;
loop {
let (stream, _addr) = listener.accept().await?;
let bbs2 = bbs.clone();
let node: usize = bbs.lock().await.create_new_node(ConnectionType::Telnet).await;
let node_list = bbs.lock().await.get_open_connections().await.clone();
let board = board.clone();
let handle = std::thread::Builder::new()
.name("Secure Websocket handle".to_string())
.spawn(move || {
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
let orig_hook = std::panic::take_hook();
std::panic::set_hook(Box::new(move |panic_info| {
log::error!("IcyBoard thread crashed at {:?}", panic_info.location());
log::error!("full info: {:?}", panic_info);
orig_hook(panic_info);
}));

match accept_websocket_secure(stream, &cp, &kp) {
match accept_sec_websocket(stream).await {
Ok(connection) => {
// connection succeeded
if let Err(err) = handle_client(board, node_list, node, Box::new(connection)) {
if let Err(err) = handle_client(bbs2, board, node_list, node, Box::new(connection), None, "").await {
log::error!("Error running backround client: {}", err);
}
}
Err(e) => {
log::error!("secure webserver connection failed {}", e);
log::error!("telnet connection failed {}", e);
}
}
Ok(())
});
bbs.lock().unwrap().get_open_connections().lock().unwrap()[node].as_mut().unwrap().handle = Some(handle);
}
Err(e) => {
log::error!("connection failed {}", e);
}
}
})
.unwrap();
bbs.lock().await.get_open_connections().await.lock().await[node].as_mut().unwrap().handle = Some(handle);
}
drop(listener);*/
Ok(())
}

#[async_recursion(?Send)]
Expand Down
14 changes: 10 additions & 4 deletions crates/icboard/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -138,12 +138,14 @@ async fn start_icy_board(arguments: &Cli, file: PathBuf) -> Res<()> {

let ssh_connection = board.lock().await.config.login_server.ssh.clone();
if ssh_connection.is_enabled {
let bbs = bbs.clone();
let bbs: Arc<Mutex<BBS>> = bbs.clone();
let board = board.clone();
std::thread::Builder::new()
.name("SSH connect".to_string())
.spawn(move || {
let _ = await_ssh_connections(ssh_connection, board, bbs);
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
let _ = await_ssh_connections(ssh_connection, board, bbs).await;
});
})
.unwrap();
}
Expand All @@ -155,7 +157,9 @@ async fn start_icy_board(arguments: &Cli, file: PathBuf) -> Res<()> {
std::thread::Builder::new()
.name("Websocket connect".to_string())
.spawn(move || {
let _ = await_websocket_connections(websocket_connection, board, bbs);
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
let _ = await_websocket_connections(websocket_connection, board, bbs).await;
});
})
.unwrap();
}
Expand All @@ -167,7 +171,9 @@ async fn start_icy_board(arguments: &Cli, file: PathBuf) -> Res<()> {
std::thread::Builder::new()
.name("Secure Websocket connect".to_string())
.spawn(move || {
let _ = await_securewebsocket_connections(secure_websocket_connection, board, bbs);
tokio::runtime::Builder::new_current_thread().enable_all().build().unwrap().block_on(async {
let _ = await_securewebsocket_connections(secure_websocket_connection, board, bbs).await;
});
})
.unwrap();
}
Expand Down
3 changes: 3 additions & 0 deletions crates/icbsetup/src/create/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@ impl IcyBoardCreator {
self.logger.start_action("Creating main configuration… at {}".to_string());

let mut config = IcbConfig::new();
config.board.allow_iemsi = false;
config.login_server.telnet.port = 1337;
config.login_server.ssh.port = 1338;

self.logger.start_action("Creating required paths.".to_string());
fs::create_dir_all(&self.destination.join(&config.paths.help_path))?;
Expand Down
5 changes: 4 additions & 1 deletion crates/icbsetup/src/import/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ impl PCBoardImporter {
color_configuration.file_new_file = IcbColor::Dos(color_file[start + 18]);
}

let icb_cfg = IcbConfig {
let mut icb_cfg = IcbConfig {
sysop: SysopInformation {
name: self.data.sysop_info.sysop.clone(),
password: Password::from_str(self.data.sysop_info.password.as_str()).unwrap(),
Expand Down Expand Up @@ -502,6 +502,9 @@ impl PCBoardImporter {
logoff_file: accounting_logoff_file,
},
};
icb_cfg.board.allow_iemsi = false;
icb_cfg.login_server.telnet.port = 1337;
icb_cfg.login_server.ssh.port = 1338;

let destination = self.output_directory.join(icy_board_engine::DEFAULT_ICYBOARD_FILE);
self.output.start_action(format!("Create main configuration {}…", destination.display()));
Expand Down
20 changes: 20 additions & 0 deletions crates/icy_net/src/connection/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,26 @@ impl Connection for ChannelConnection {
}
}

async fn try_read(&mut self, buf: &mut [u8]) -> crate::Result<usize> {
if !self.buffer.is_empty() {
let len = self.buffer.len().min(buf.len());
buf[..len].copy_from_slice(&self.buffer.drain(..len).collect::<Vec<u8>>());
return Ok(len);
}
match self.rx.try_recv() {
Ok(data) => {
if data.is_empty() {
return Ok(0);
}
let len = data.len().min(buf.len());
buf[..len].copy_from_slice(&data[..len]);
self.buffer.extend(data.into_iter().skip(len));
Ok(len)
}
_ => Ok(0),
}
}

async fn send(&mut self, buf: &[u8]) -> crate::Result<()> {
self.tx.send(buf.to_vec())?;
Ok(())
Expand Down
5 changes: 5 additions & 0 deletions crates/icy_net/src/connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ pub trait Connection: Send + Unpin {
fn get_connection_type(&self) -> ConnectionType;

async fn read(&mut self, buf: &mut [u8]) -> crate::Result<usize>;
async fn try_read(&mut self, buf: &mut [u8]) -> crate::Result<usize>;

async fn send(&mut self, buf: &[u8]) -> crate::Result<()>;

Expand Down Expand Up @@ -71,6 +72,10 @@ impl Connection for NullConnection {
Ok(0)
}

async fn try_read(&mut self, _buf: &mut [u8]) -> crate::Result<usize> {
Ok(0)
}

async fn send(&mut self, _buf: &[u8]) -> crate::Result<()> {
Err(NetError::Unsupported.into())
}
Expand Down
9 changes: 9 additions & 0 deletions crates/icy_net/src/connection/modem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,15 @@ impl Connection for ModemConnection {
Ok(res)
}

async fn try_read(&mut self, buf: &mut [u8]) -> crate::Result<usize> {
if !self.port.read_dsr().unwrap_or_default() {
return Ok(0);
}
let res = self.port.read(buf).await?;
// println!("Read {:?} bytes", &buf[..res]);
Ok(res)
}

async fn send(&mut self, buf: &[u8]) -> crate::Result<()> {
self.port.write_all(buf).await?;
Ok(())
Expand Down
Loading

0 comments on commit c3b2ac2

Please sign in to comment.