From f6c52de171177fd3d662a058c6c1cf48423e6a57 Mon Sep 17 00:00:00 2001 From: Paul Makles <paulmakles@gmail.com> Date: Sat, 9 Jan 2021 20:49:36 +0000 Subject: [PATCH] Add basic presence tracking. --- src/database/entities/user.rs | 2 + src/notifications/events.rs | 8 +- src/notifications/payload.rs | 7 +- src/notifications/websocket.rs | 130 ++++++++++++++++++++------------- src/routes/users/fetch_user.rs | 4 +- 5 files changed, 97 insertions(+), 54 deletions(-) diff --git a/src/database/entities/user.rs b/src/database/entities/user.rs index b40bf3b..ca4c706 100644 --- a/src/database/entities/user.rs +++ b/src/database/entities/user.rs @@ -29,4 +29,6 @@ pub struct User { // ? This should never be pushed to the collection. #[serde(skip_serializing_if = "Option::is_none")] pub relationship: Option<RelationshipStatus>, + #[serde(skip_serializing_if = "Option::is_none")] + pub online: Option<bool>, } diff --git a/src/notifications/events.rs b/src/notifications/events.rs index 6b4ad66..96c57d4 100644 --- a/src/notifications/events.rs +++ b/src/notifications/events.rs @@ -11,7 +11,7 @@ pub enum WebSocketError { #[snafu(display("This error has not been labelled."))] LabelMe, #[snafu(display("Internal server error."))] - InternalError, + InternalError { at: String }, #[snafu(display("Invalid session."))] InvalidSession, #[snafu(display("User hasn't completed onboarding."))] @@ -90,11 +90,17 @@ pub enum ClientboundNotification { GuildDelete { id: String, },*/ + UserRelationship { id: String, user: String, status: RelationshipStatus, }, + + UserPresence { + id: String, + online: bool + } } impl ClientboundNotification { diff --git a/src/notifications/payload.rs b/src/notifications/payload.rs index 0769e57..c63a730 100644 --- a/src/notifications/payload.rs +++ b/src/notifications/payload.rs @@ -9,7 +9,9 @@ use mongodb::{ options::FindOptions, }; -pub async fn generate_ready(user: User) -> Result<ClientboundNotification> { +use super::websocket::is_online; + +pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> { let mut users = vec![]; if let Some(relationships) = &user.relations { @@ -52,11 +54,14 @@ pub async fn generate_ready(user: User) -> Result<ClientboundNotification> { .clone(), ); + user.online = Some(is_online(&user.id)); + users.push(user); } } } + user.online = Some(is_online(&user.id)); users.push(user); Ok(ClientboundNotification::Ready { users }) diff --git a/src/notifications/websocket.rs b/src/notifications/websocket.rs index b4ecc2f..e218323 100644 --- a/src/notifications/websocket.rs +++ b/src/notifications/websocket.rs @@ -63,81 +63,104 @@ async fn accept(stream: TcpStream) { } }; - let mut session: Option<Session> = None; + let session: Arc<Mutex<Option<Session>>> = Arc::new(Mutex::new(None)); + let mutex_generator = || { session.clone() }; let fwd = rx.map(Ok).forward(write); - let incoming = read.try_for_each(|msg| { + let incoming = read.try_for_each(async move |msg| { + let mutex = mutex_generator(); + //dbg!(&mutex.lock().unwrap()); + if let Message::Text(text) = msg { if let Ok(notification) = serde_json::from_str::<ServerboundNotification>(&text) { match notification { ServerboundNotification::Authenticate(new_session) => { - if session.is_some() { - send(ClientboundNotification::Error( - WebSocketError::AlreadyAuthenticated, - )); - return future::ok(()); + { + if mutex.lock().unwrap().is_some() { + send(ClientboundNotification::Error( + WebSocketError::AlreadyAuthenticated, + )); + + return Ok(()) + } } - match task::block_on( - Auth::new(get_collection("accounts")).verify_session(new_session), - ) { - Ok(validated_session) => { - match task::block_on( - Ref { - id: validated_session.user_id.clone(), - } - .fetch_user(), - ) { - Ok(user) => { - if let Ok(mut map) = USERS.write() { - map.insert(validated_session.user_id.clone(), addr); - session = Some(validated_session); - if let Ok(_) = task::block_on( - subscriptions::generate_subscriptions(&user), - ) { - send(ClientboundNotification::Authenticated); - - match task::block_on( - super::payload::generate_ready(user), - ) { - Ok(payload) => { - send(payload); - } - Err(_) => { - send(ClientboundNotification::Error( - WebSocketError::InternalError, - )); - } - } - } else { - send(ClientboundNotification::Error( - WebSocketError::InternalError, - )); - } - } else { + if let Ok(validated_session) = Auth::new(get_collection("accounts")) + .verify_session(new_session) + .await { + let id = validated_session.user_id.clone(); + if let Ok(user) = ( + Ref { + id: id.clone() + } + ) + .fetch_user() + .await { + let was_online = is_online(&id); + { + match USERS.write() { + Ok(mut map) => { + map.insert(id.clone(), addr); + } + Err(_) => { send(ClientboundNotification::Error( - WebSocketError::InternalError, + WebSocketError::InternalError { at: "Writing users map.".to_string() }, )); + + return Ok(()) + } + } + } + + *mutex.lock().unwrap() = Some(validated_session); + + if let Err(_) = subscriptions::generate_subscriptions(&user).await { + send(ClientboundNotification::Error( + WebSocketError::InternalError { at: "Generating subscriptions.".to_string() }, + )); + + return Ok(()) + } + + send(ClientboundNotification::Authenticated); + + match super::payload::generate_ready(user).await { + Ok(payload) => { + send(payload); + + if !was_online { + ClientboundNotification::UserPresence { + id: id.clone(), + online: true + } + .publish(id) + .await + .ok(); } } Err(_) => { send(ClientboundNotification::Error( - WebSocketError::OnboardingNotFinished, + WebSocketError::InternalError { at: "Generating payload.".to_string() }, )); + + return Ok(()) } } - } - Err(_) => { + } else { send(ClientboundNotification::Error( - WebSocketError::InvalidSession, + WebSocketError::OnboardingNotFinished, )); } + } else { + send(ClientboundNotification::Error( + WebSocketError::InvalidSession, + )); } } } } } - future::ok(()) + Ok(()) }); pin_mut!(fwd, incoming); @@ -146,7 +169,8 @@ async fn accept(stream: TcpStream) { info!("User {} disconnected.", &addr); CONNECTIONS.lock().unwrap().remove(&addr); - if let Some(session) = session { + let session = session.lock().unwrap(); + if let Some(session) = session.as_ref() { let mut users = USERS.write().unwrap(); users.remove(&session.user_id, &addr); if users.get_left(&session.user_id).is_none() { @@ -184,3 +208,7 @@ pub fn publish(ids: Vec<String>, notification: ClientboundNotification) { } } } + +pub fn is_online(user: &String) -> bool { + USERS.read().unwrap().get_left(&user).is_some() +} diff --git a/src/routes/users/fetch_user.rs b/src/routes/users/fetch_user.rs index 21f4ceb..4ae40aa 100644 --- a/src/routes/users/fetch_user.rs +++ b/src/routes/users/fetch_user.rs @@ -1,4 +1,4 @@ -use crate::database::*; +use crate::{database::*, notifications::websocket::is_online}; use crate::util::result::{Error, Result}; use rocket_contrib::json::JsonValue; @@ -31,5 +31,7 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> { target.relationship = Some(RelationshipStatus::User); } + target.online = Some(is_online(&target.id)); + Ok(json!(target)) } -- GitLab