Skip to content
Snippets Groups Projects
Commit f6c52de1 authored by insert's avatar insert
Browse files

Add basic presence tracking.

parent 5e70ceea
No related merge requests found
Pipeline #447 passed with stage
in 2 minutes and 38 seconds
......@@ -29,4 +29,6 @@ pub struct User {
// ? This should never be pushed to the collection.
#[serde(skip_serializing_if = "Option::is_none")]
pub relationship: Option<RelationshipStatus>,
#[serde(skip_serializing_if = "Option::is_none")]
pub online: Option<bool>,
}
......@@ -11,7 +11,7 @@ pub enum WebSocketError {
#[snafu(display("This error has not been labelled."))]
LabelMe,
#[snafu(display("Internal server error."))]
InternalError,
InternalError { at: String },
#[snafu(display("Invalid session."))]
InvalidSession,
#[snafu(display("User hasn't completed onboarding."))]
......@@ -90,11 +90,17 @@ pub enum ClientboundNotification {
GuildDelete {
id: String,
},*/
UserRelationship {
id: String,
user: String,
status: RelationshipStatus,
},
UserPresence {
id: String,
online: bool
}
}
impl ClientboundNotification {
......
......@@ -9,7 +9,9 @@ use mongodb::{
options::FindOptions,
};
pub async fn generate_ready(user: User) -> Result<ClientboundNotification> {
use super::websocket::is_online;
pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
let mut users = vec![];
if let Some(relationships) = &user.relations {
......@@ -52,11 +54,14 @@ pub async fn generate_ready(user: User) -> Result<ClientboundNotification> {
.clone(),
);
user.online = Some(is_online(&user.id));
users.push(user);
}
}
}
user.online = Some(is_online(&user.id));
users.push(user);
Ok(ClientboundNotification::Ready { users })
......
......@@ -63,81 +63,104 @@ async fn accept(stream: TcpStream) {
}
};
let mut session: Option<Session> = None;
let session: Arc<Mutex<Option<Session>>> = Arc::new(Mutex::new(None));
let mutex_generator = || { session.clone() };
let fwd = rx.map(Ok).forward(write);
let incoming = read.try_for_each(|msg| {
let incoming = read.try_for_each(async move |msg| {
let mutex = mutex_generator();
//dbg!(&mutex.lock().unwrap());
if let Message::Text(text) = msg {
if let Ok(notification) = serde_json::from_str::<ServerboundNotification>(&text) {
match notification {
ServerboundNotification::Authenticate(new_session) => {
if session.is_some() {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
));
return future::ok(());
{
if mutex.lock().unwrap().is_some() {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
));
return Ok(())
}
}
match task::block_on(
Auth::new(get_collection("accounts")).verify_session(new_session),
) {
Ok(validated_session) => {
match task::block_on(
Ref {
id: validated_session.user_id.clone(),
}
.fetch_user(),
) {
Ok(user) => {
if let Ok(mut map) = USERS.write() {
map.insert(validated_session.user_id.clone(), addr);
session = Some(validated_session);
if let Ok(_) = task::block_on(
subscriptions::generate_subscriptions(&user),
) {
send(ClientboundNotification::Authenticated);
match task::block_on(
super::payload::generate_ready(user),
) {
Ok(payload) => {
send(payload);
}
Err(_) => {
send(ClientboundNotification::Error(
WebSocketError::InternalError,
));
}
}
} else {
send(ClientboundNotification::Error(
WebSocketError::InternalError,
));
}
} else {
if let Ok(validated_session) = Auth::new(get_collection("accounts"))
.verify_session(new_session)
.await {
let id = validated_session.user_id.clone();
if let Ok(user) = (
Ref {
id: id.clone()
}
)
.fetch_user()
.await {
let was_online = is_online(&id);
{
match USERS.write() {
Ok(mut map) => {
map.insert(id.clone(), addr);
}
Err(_) => {
send(ClientboundNotification::Error(
WebSocketError::InternalError,
WebSocketError::InternalError { at: "Writing users map.".to_string() },
));
return Ok(())
}
}
}
*mutex.lock().unwrap() = Some(validated_session);
if let Err(_) = subscriptions::generate_subscriptions(&user).await {
send(ClientboundNotification::Error(
WebSocketError::InternalError { at: "Generating subscriptions.".to_string() },
));
return Ok(())
}
send(ClientboundNotification::Authenticated);
match super::payload::generate_ready(user).await {
Ok(payload) => {
send(payload);
if !was_online {
ClientboundNotification::UserPresence {
id: id.clone(),
online: true
}
.publish(id)
.await
.ok();
}
}
Err(_) => {
send(ClientboundNotification::Error(
WebSocketError::OnboardingNotFinished,
WebSocketError::InternalError { at: "Generating payload.".to_string() },
));
return Ok(())
}
}
}
Err(_) => {
} else {
send(ClientboundNotification::Error(
WebSocketError::InvalidSession,
WebSocketError::OnboardingNotFinished,
));
}
} else {
send(ClientboundNotification::Error(
WebSocketError::InvalidSession,
));
}
}
}
}
}
future::ok(())
Ok(())
});
pin_mut!(fwd, incoming);
......@@ -146,7 +169,8 @@ async fn accept(stream: TcpStream) {
info!("User {} disconnected.", &addr);
CONNECTIONS.lock().unwrap().remove(&addr);
if let Some(session) = session {
let session = session.lock().unwrap();
if let Some(session) = session.as_ref() {
let mut users = USERS.write().unwrap();
users.remove(&session.user_id, &addr);
if users.get_left(&session.user_id).is_none() {
......@@ -184,3 +208,7 @@ pub fn publish(ids: Vec<String>, notification: ClientboundNotification) {
}
}
}
pub fn is_online(user: &String) -> bool {
USERS.read().unwrap().get_left(&user).is_some()
}
use crate::database::*;
use crate::{database::*, notifications::websocket::is_online};
use crate::util::result::{Error, Result};
use rocket_contrib::json::JsonValue;
......@@ -31,5 +31,7 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> {
target.relationship = Some(RelationshipStatus::User);
}
target.online = Some(is_online(&target.id));
Ok(json!(target))
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment