Chat can now send and recieve messages perfectly. Needs UI help.

This commit is contained in:
Franklin 2023-02-28 11:45:44 -04:00
parent 6861a3d8c2
commit ea8807faf6
5 changed files with 68 additions and 55 deletions

View File

@ -1,7 +1,8 @@
use std::sync::{Arc}; use std::sync::{Arc};
use chat_types::{dto::server_in::ServerMessageIn, domain::chat_message::{ChatMessageSender, ChatMessageContent}};
use dev_dtos::dtos::user::user_dtos::UserForAuthenticationDto; use dev_dtos::dtos::user::user_dtos::UserForAuthenticationDto;
use tokio::runtime::Runtime; use tokio::{runtime::Runtime, sync::RwLock};
use crate::{utils::storage, ForeignError, client::chat::init_client_connection}; use crate::{utils::storage, ForeignError, client::chat::init_client_connection};
@ -23,7 +24,9 @@ pub trait WebsocketFfi: Send + Sync + std::fmt::Debug {
//fn attempt(&self, string_from_rust: String) -> Result<Option<String>, ForeignError>; //fn attempt(&self, string_from_rust: String) -> Result<Option<String>, ForeignError>;
} }
#[derive(Debug, Default, Clone)] #[derive(Debug, Default, Clone)]
pub struct WebsocketCaller; pub struct WebsocketCaller {
message_queue: Arc<RwLock<Vec<ServerMessageIn>>>,
}
impl<'a> WebsocketCaller { impl<'a> WebsocketCaller {
pub fn new() -> Self { pub fn new() -> Self {
@ -36,14 +39,22 @@ impl<'a> WebsocketCaller {
let ws_ffi_rwlock = Arc::new(websocket_ffi); let ws_ffi_rwlock = Arc::new(websocket_ffi);
let user: UserForAuthenticationDto = storage::read("user".into()).unwrap(); //TODO: Remove unwrap let user: UserForAuthenticationDto = storage::read("user".into()).unwrap(); //TODO: Remove unwrap
//websocket_ffi.message_recieved(ChatMessage { id: 20, from_id: 2, to_id: 1, message: "What the fuck nigga".as_bytes().to_vec(), message_content: MessageContentType::Text, time_sent: 1, time_delivered: vec![], time_seen: vec![] }).unwrap();
let rt = Runtime::new().unwrap(); let rt = Runtime::new().unwrap();
let _ = rt.block_on( let _ = rt.block_on(
init_client_connection(user, ws_ffi_rwlock) init_client_connection(user, ws_ffi_rwlock, self.message_queue.clone())
); );
} }
pub fn send_message(&self, ) { pub fn send_text_message(&self, message: String, to: u32) {
let rt = Runtime::new().unwrap();
let _ = rt.block_on(
add_message_to_queue(&self.message_queue, ServerMessageIn::SendMessage(ChatMessageSender { message: ChatMessageContent::Text(message), to }))
);
} }
} }
pub async fn add_message_to_queue(queue: &Arc<RwLock<Vec<ServerMessageIn>>>, message: ServerMessageIn) {
let mut lock = queue.write().await;
lock.push(message);
}

View File

@ -1,10 +1,10 @@
pub mod utils; pub mod handler; pub mod utils; pub mod handler;
use std::{sync::{RwLock, Arc}}; use std::{sync::{Arc}, time::Duration};
use chat_types::{dto::server_in::ServerMessageIn, domain::error::SocketError}; use chat_types::{dto::server_in::ServerMessageIn, domain::{error::SocketError}};
use dev_dtos::dtos::user::user_dtos::UserForAuthenticationDto; use dev_dtos::dtos::user::user_dtos::UserForAuthenticationDto;
use tokio::sync::Mutex; use tokio::{sync::{Mutex, RwLock}, time::{interval}};
use tokio_tungstenite::{connect_async}; use tokio_tungstenite::{connect_async};
use futures_util::{StreamExt}; use futures_util::{StreamExt};
@ -12,10 +12,10 @@ use crate::{WebsocketFfi};
use self::{utils::send_message, handler::handle_message}; use self::{utils::send_message, handler::handle_message};
// TODO: Message queue for sending const MESSAGE_QUEUE_POLLING_INTERVAL: u64 = 300;
static _MESSAGE_QUEUE: RwLock<Vec<ServerMessageIn>> = RwLock::new(Vec::new()); //const RWLOCK_READ_LOCK_ERROR: &str = "Failed to lock an RwLock for reading purposes. This should never happen.";
pub async fn init_client_connection(user: UserForAuthenticationDto, ws_caller: Arc<Box<dyn WebsocketFfi>>) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { pub async fn init_client_connection(user: UserForAuthenticationDto, ws_caller: Arc<Box<dyn WebsocketFfi>>, message_queue: Arc<RwLock<Vec<ServerMessageIn>>>,) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let ws_stream = match connect_async("ws://0.0.0.0:3000/websocket").await { let ws_stream = match connect_async("ws://0.0.0.0:3000/websocket").await {
Ok((stream, _response)) => { Ok((stream, _response)) => {
stream stream
@ -32,65 +32,68 @@ pub async fn init_client_connection(user: UserForAuthenticationDto, ws_caller: A
// //
//spawn an async sender to push some more messages into the server //spawn an async sender to push some more messages into the server
let caller = ws_caller.clone(); let recv_caller = ws_caller.clone();
//receiver just prints whatever it gets //receiver just prints whatever it gets
let mut recv_task = tokio::spawn(async move { let mut recv_task = tokio::spawn(async move {
while let Some(Ok(msg)) = receiver.next().await { while let Some(Ok(msg)) = receiver.next().await {
// Never break this loop? // Never break this loop?
let a = handle_message(msg, &caller).await; let a = handle_message(msg, &recv_caller).await;
println!("Something happened {:?}", a); println!("Something happened {:?}", a);
} }
}); });
let sender_arc = Arc::new(Mutex::new(sender)); let sender_arc = Arc::new(Mutex::new(sender));
send_message(sender_arc.clone(), ServerMessageIn::Login(user)).await?; send_message(sender_arc.clone(), ServerMessageIn::Login(user)).await?;
// Right here I'm creating the send task. Which should every X amount of ms check the MessageQueue.
// If there's anything in it then send it to the backend and remove it from the queue.
let sender_arc_cloned = sender_arc.clone();
let send_caller = ws_caller.clone();
let message_queue_cloned = message_queue.clone();
let mut send_task = tokio::spawn(async move {
let mut interval = interval(Duration::from_millis(MESSAGE_QUEUE_POLLING_INTERVAL));
loop { // Infinite loop to check for messages sent from the frontend
interval.tick().await; // Wait 200 ms
handle_user_messages(&send_caller, sender_arc_cloned.clone(), message_queue_cloned.clone()).await;
}
});
//wait for either task to finish and kill the other task //wait for either task to finish and kill the other task
tokio::select! { tokio::select! {
/*_ = (&mut send_task) => { _ = (&mut send_task) => {
recv_task.abort(); recv_task.abort();
},*/ },
_ = (&mut recv_task) => { _ = (&mut recv_task) => {
send_task.abort();
} }
} }
Ok(()) Ok(())
} }
/*
/// Function to handle messages we get (with a slight twist that Frame variant is visible /// Function to handle messages sent from the user to the backend via the MessageQueue
/// since we are working with the underlying tungstenite library directly without axum here). /// If no messages exist then this function will just return
fn process_message(msg: Message) -> Result<(), Box<dyn std::error::Error + Send + Sync>> { pub async fn handle_user_messages<S>(ws_caller: &Arc<Box<dyn WebsocketFfi>>, sender: Arc<Mutex<S>>, message_queue: Arc<RwLock<Vec<ServerMessageIn>>>)
match msg { where S: futures_util::Sink<tokio_tungstenite::tungstenite::Message> + Unpin {
Message::Text(t) => { let queue = message_queue.read().await;
println!(">>> got str: {:?}", t); let mut messages_to_remove_from_queue = Vec::new();
if queue.len() > 0 {
// Send message
for (index, message) in queue.iter().enumerate() {
match send_message(sender.clone(), message.clone()).await {
Ok(_) => { // if message send was successful then remove the message from the queue
messages_to_remove_from_queue.push(index);
},
Err(error) => { // if message wasn't successful send the error back to the client
let _ = ws_caller.error(error.to_string()); // Nothing we can do here tbh
},
};
} }
Message::Binary(d) => { drop(queue); // This is to make sure the RwLock doesn't get posioned
println!(">>> got {} bytes: {:?}", d.len(), d); let mut write_lock_for_queue = message_queue.write().await;
} if messages_to_remove_from_queue.len() > 0 {
Message::Close(c) => { for i in messages_to_remove_from_queue {
if let Some(cf) = c { write_lock_for_queue.remove(i);
println!(
">>> got close with code {} and reason `{}`",
cf.code, cf.reason
);
} else {
println!(">>> somehow got close message without CloseFrame");
} }
return ControlFlow::Break(());
}
Message::Pong(v) => {
println!(">>> got pong with {:?}", v);
}
// Just as with axum server, the underlying tungstenite websocket library
// will handle Ping for you automagically by replying with Pong and copying the
// v according to spec. But if you need the contents of the pings you can see them here.
Message::Ping(v) => {
println!(">>> got ping with {:?}", v);
}
Message::Frame(_) => {
unreachable!("This is never supposed to happen")
} }
} }
Ok(()) // Do nothing
}*/ }

View File

@ -7,8 +7,6 @@ use tokio_tungstenite::{tungstenite::{Message}};
/// Este es el metodo para enviar mensajes a un cliente a traves de un websocket /// Este es el metodo para enviar mensajes a un cliente a traves de un websocket
/// Si le pasas un None en el payload tienes que darle un tipo al metodo, ya que
/// El compilador no permite especificarle un metodo default.
pub async fn send_message<S>( pub async fn send_message<S>(
sender: Arc<Mutex<S>>, sender: Arc<Mutex<S>>,
message: ServerMessageIn, message: ServerMessageIn,

View File

@ -23,6 +23,7 @@ pub fn get_all_sports() -> Result<Vec<Sport>, RustError> {
pub fn get_me() -> Result<UserForAuthenticationDto, RustError> { pub fn get_me() -> Result<UserForAuthenticationDto, RustError> {
storage::read("user".into()) storage::read("user".into())
} }
#[macro_export] #[macro_export]
macro_rules! unwrap_rust_error { macro_rules! unwrap_rust_error {
($e:expr) => { ($e:expr) => {

View File

@ -53,7 +53,6 @@ dictionary ChatRoom {
i64 last_updated; i64 last_updated;
u64 session_messages; u64 session_messages;
}; };
enum ClientError { enum ClientError {
"One", "One",
"Two", "Two",
@ -79,6 +78,7 @@ callback interface WebsocketFfi {
interface WebsocketCaller { interface WebsocketCaller {
constructor(); constructor();
void init_ws_connection(WebsocketFfi websocket_ffi); void init_ws_connection(WebsocketFfi websocket_ffi);
void send_text_message(string message, u32 to);
}; };
namespace network { namespace network {
[Throws=RustError] [Throws=RustError]