From ea8807faf627f321705c39fa02abf01886998f8f Mon Sep 17 00:00:00 2001 From: Franklin Date: Tue, 28 Feb 2023 11:45:44 -0400 Subject: [PATCH] Chat can now send and recieve messages perfectly. Needs UI help. --- src/callbacks/chat.rs | 21 ++++++--- src/client/chat/mod.rs | 97 +++++++++++++++++++++------------------- src/client/chat/utils.rs | 2 - src/lib.rs | 1 + src/network.udl | 2 +- 5 files changed, 68 insertions(+), 55 deletions(-) diff --git a/src/callbacks/chat.rs b/src/callbacks/chat.rs index 0e7ab34..9a63ccb 100644 --- a/src/callbacks/chat.rs +++ b/src/callbacks/chat.rs @@ -1,7 +1,8 @@ use std::sync::{Arc}; +use chat_types::{dto::server_in::ServerMessageIn, domain::chat_message::{ChatMessageSender, ChatMessageContent}}; use dev_dtos::dtos::user::user_dtos::UserForAuthenticationDto; -use tokio::runtime::Runtime; +use tokio::{runtime::Runtime, sync::RwLock}; use crate::{utils::storage, ForeignError, client::chat::init_client_connection}; @@ -23,7 +24,9 @@ pub trait WebsocketFfi: Send + Sync + std::fmt::Debug { //fn attempt(&self, string_from_rust: String) -> Result, ForeignError>; } #[derive(Debug, Default, Clone)] -pub struct WebsocketCaller; +pub struct WebsocketCaller { + message_queue: Arc>>, +} impl<'a> WebsocketCaller { pub fn new() -> Self { @@ -36,14 +39,22 @@ impl<'a> WebsocketCaller { let ws_ffi_rwlock = Arc::new(websocket_ffi); let user: UserForAuthenticationDto = storage::read("user".into()).unwrap(); //TODO: Remove unwrap - //websocket_ffi.message_recieved(ChatMessage { id: 20, from_id: 2, to_id: 1, message: "What the fuck nigga".as_bytes().to_vec(), message_content: MessageContentType::Text, time_sent: 1, time_delivered: vec![], time_seen: vec![] }).unwrap(); let rt = Runtime::new().unwrap(); let _ = rt.block_on( - init_client_connection(user, ws_ffi_rwlock) + init_client_connection(user, ws_ffi_rwlock, self.message_queue.clone()) ); } - pub fn send_message(&self, ) { + pub fn send_text_message(&self, message: String, to: u32) { + let rt = Runtime::new().unwrap(); + let _ = rt.block_on( + add_message_to_queue(&self.message_queue, ServerMessageIn::SendMessage(ChatMessageSender { message: ChatMessageContent::Text(message), to })) + ); } +} + +pub async fn add_message_to_queue(queue: &Arc>>, message: ServerMessageIn) { + let mut lock = queue.write().await; + lock.push(message); } \ No newline at end of file diff --git a/src/client/chat/mod.rs b/src/client/chat/mod.rs index 1a881f6..c982cc9 100644 --- a/src/client/chat/mod.rs +++ b/src/client/chat/mod.rs @@ -1,10 +1,10 @@ pub mod utils; pub mod handler; -use std::{sync::{RwLock, Arc}}; +use std::{sync::{Arc}, time::Duration}; -use chat_types::{dto::server_in::ServerMessageIn, domain::error::SocketError}; +use chat_types::{dto::server_in::ServerMessageIn, domain::{error::SocketError}}; use dev_dtos::dtos::user::user_dtos::UserForAuthenticationDto; -use tokio::sync::Mutex; +use tokio::{sync::{Mutex, RwLock}, time::{interval}}; use tokio_tungstenite::{connect_async}; use futures_util::{StreamExt}; @@ -12,10 +12,10 @@ use crate::{WebsocketFfi}; use self::{utils::send_message, handler::handle_message}; -// TODO: Message queue for sending -static _MESSAGE_QUEUE: RwLock> = RwLock::new(Vec::new()); +const MESSAGE_QUEUE_POLLING_INTERVAL: u64 = 300; +//const RWLOCK_READ_LOCK_ERROR: &str = "Failed to lock an RwLock for reading purposes. This should never happen."; -pub async fn init_client_connection(user: UserForAuthenticationDto, ws_caller: Arc>) -> Result<(), Box> { +pub async fn init_client_connection(user: UserForAuthenticationDto, ws_caller: Arc>, message_queue: Arc>>,) -> Result<(), Box> { let ws_stream = match connect_async("ws://0.0.0.0:3000/websocket").await { Ok((stream, _response)) => { stream @@ -32,65 +32,68 @@ pub async fn init_client_connection(user: UserForAuthenticationDto, ws_caller: A // //spawn an async sender to push some more messages into the server - let caller = ws_caller.clone(); + let recv_caller = ws_caller.clone(); //receiver just prints whatever it gets let mut recv_task = tokio::spawn(async move { while let Some(Ok(msg)) = receiver.next().await { // Never break this loop? - let a = handle_message(msg, &caller).await; + let a = handle_message(msg, &recv_caller).await; println!("Something happened {:?}", a); } }); let sender_arc = Arc::new(Mutex::new(sender)); send_message(sender_arc.clone(), ServerMessageIn::Login(user)).await?; - + + // Right here I'm creating the send task. Which should every X amount of ms check the MessageQueue. + // If there's anything in it then send it to the backend and remove it from the queue. + let sender_arc_cloned = sender_arc.clone(); + let send_caller = ws_caller.clone(); + let message_queue_cloned = message_queue.clone(); + let mut send_task = tokio::spawn(async move { + let mut interval = interval(Duration::from_millis(MESSAGE_QUEUE_POLLING_INTERVAL)); + loop { // Infinite loop to check for messages sent from the frontend + interval.tick().await; // Wait 200 ms + handle_user_messages(&send_caller, sender_arc_cloned.clone(), message_queue_cloned.clone()).await; + } + }); //wait for either task to finish and kill the other task tokio::select! { - /*_ = (&mut send_task) => { + _ = (&mut send_task) => { recv_task.abort(); - },*/ + }, _ = (&mut recv_task) => { + send_task.abort(); } } Ok(()) } -/* -/// Function to handle messages we get (with a slight twist that Frame variant is visible -/// since we are working with the underlying tungstenite library directly without axum here). -fn process_message(msg: Message) -> Result<(), Box> { - match msg { - Message::Text(t) => { - println!(">>> got str: {:?}", t); + +/// Function to handle messages sent from the user to the backend via the MessageQueue +/// If no messages exist then this function will just return +pub async fn handle_user_messages(ws_caller: &Arc>, sender: Arc>, message_queue: Arc>>) +where S: futures_util::Sink + Unpin { + let queue = message_queue.read().await; + let mut messages_to_remove_from_queue = Vec::new(); + if queue.len() > 0 { + // Send message + for (index, message) in queue.iter().enumerate() { + match send_message(sender.clone(), message.clone()).await { + Ok(_) => { // if message send was successful then remove the message from the queue + messages_to_remove_from_queue.push(index); + }, + Err(error) => { // if message wasn't successful send the error back to the client + let _ = ws_caller.error(error.to_string()); // Nothing we can do here tbh + }, + }; + } - Message::Binary(d) => { - println!(">>> got {} bytes: {:?}", d.len(), d); - } - Message::Close(c) => { - if let Some(cf) = c { - println!( - ">>> got close with code {} and reason `{}`", - cf.code, cf.reason - ); - } else { - println!(">>> somehow got close message without CloseFrame"); + drop(queue); // This is to make sure the RwLock doesn't get posioned + let mut write_lock_for_queue = message_queue.write().await; + if messages_to_remove_from_queue.len() > 0 { + for i in messages_to_remove_from_queue { + write_lock_for_queue.remove(i); } - return ControlFlow::Break(()); - } - - Message::Pong(v) => { - println!(">>> got pong with {:?}", v); - } - // Just as with axum server, the underlying tungstenite websocket library - // will handle Ping for you automagically by replying with Pong and copying the - // v according to spec. But if you need the contents of the pings you can see them here. - Message::Ping(v) => { - println!(">>> got ping with {:?}", v); - } - - Message::Frame(_) => { - unreachable!("This is never supposed to happen") } } - Ok(()) -}*/ - + // Do nothing +} \ No newline at end of file diff --git a/src/client/chat/utils.rs b/src/client/chat/utils.rs index 1856869..a5f8696 100644 --- a/src/client/chat/utils.rs +++ b/src/client/chat/utils.rs @@ -7,8 +7,6 @@ use tokio_tungstenite::{tungstenite::{Message}}; /// Este es el metodo para enviar mensajes a un cliente a traves de un websocket -/// Si le pasas un None en el payload tienes que darle un tipo al metodo, ya que -/// El compilador no permite especificarle un metodo default. pub async fn send_message( sender: Arc>, message: ServerMessageIn, diff --git a/src/lib.rs b/src/lib.rs index f7cbd34..d1c37ff 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,6 +23,7 @@ pub fn get_all_sports() -> Result, RustError> { pub fn get_me() -> Result { storage::read("user".into()) } + #[macro_export] macro_rules! unwrap_rust_error { ($e:expr) => { diff --git a/src/network.udl b/src/network.udl index 0912427..c2c92ee 100644 --- a/src/network.udl +++ b/src/network.udl @@ -53,7 +53,6 @@ dictionary ChatRoom { i64 last_updated; u64 session_messages; }; - enum ClientError { "One", "Two", @@ -79,6 +78,7 @@ callback interface WebsocketFfi { interface WebsocketCaller { constructor(); void init_ws_connection(WebsocketFfi websocket_ffi); + void send_text_message(string message, u32 to); }; namespace network { [Throws=RustError]