Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 781 additions and 1809 deletions
use crate::database;
use crate::util::vec_to_set;
use hashbrown::{HashMap, HashSet};
use mongodb::bson::doc;
use mongodb::options::FindOneOptions;
use once_cell::sync::OnceCell;
use std::sync::RwLock;
use ws::Sender;
pub enum StateResult {
DatabaseError,
InvalidToken,
Success(String),
}
static CONNECTIONS: OnceCell<RwLock<HashMap<String, Sender>>> = OnceCell::new();
pub fn add_connection(id: String, sender: Sender) {
CONNECTIONS
.get()
.unwrap()
.write()
.unwrap()
.insert(id, sender);
}
pub struct User {
connections: HashSet<String>,
guilds: HashSet<String>,
}
impl User {
pub fn new() -> User {
User {
connections: HashSet::new(),
guilds: HashSet::new(),
}
}
}
pub struct Guild {
users: HashSet<String>,
}
impl Guild {
pub fn new() -> Guild {
Guild {
users: HashSet::new(),
}
}
}
pub struct GlobalState {
users: HashMap<String, User>,
guilds: HashMap<String, Guild>,
}
impl GlobalState {
pub fn new() -> GlobalState {
GlobalState {
users: HashMap::new(),
guilds: HashMap::new(),
}
}
pub fn push_to_guild(&mut self, guild: String, user: String) {
if !self.guilds.contains_key(&guild) {
self.guilds.insert(guild.clone(), Guild::new());
}
self.guilds.get_mut(&guild).unwrap().users.insert(user);
}
pub fn try_authenticate(&mut self, connection: String, access_token: String) -> StateResult {
if let Ok(result) = database::get_collection("users").find_one(
doc! {
"access_token": access_token,
},
FindOneOptions::builder()
.projection(doc! { "_id": 1 })
.build(),
) {
if let Some(user) = result {
let user_id = user.get_str("_id").unwrap();
if self.users.contains_key(user_id) {
self.users
.get_mut(user_id)
.unwrap()
.connections
.insert(connection);
return StateResult::Success(user_id.to_string());
}
if let Ok(results) =
database::get_collection("members").find(doc! { "_id.user": &user_id }, None)
{
let mut guilds = vec![];
for result in results {
if let Ok(entry) = result {
guilds.push(
entry
.get_document("_id")
.unwrap()
.get_str("guild")
.unwrap()
.to_string(),
);
}
}
let mut user = User::new();
for guild in guilds {
user.guilds.insert(guild.clone());
self.push_to_guild(guild, user_id.to_string());
}
user.connections.insert(connection);
self.users.insert(user_id.to_string(), user);
StateResult::Success(user_id.to_string())
} else {
StateResult::DatabaseError
}
} else {
StateResult::InvalidToken
}
} else {
StateResult::DatabaseError
}
}
pub fn disconnect<U: Into<Option<String>>>(&mut self, user_id: U, connection: String) {
if let Some(user_id) = user_id.into() {
let user = self.users.get_mut(&user_id).unwrap();
user.connections.remove(&connection);
if user.connections.len() == 0 {
for guild in &user.guilds {
self.guilds.get_mut(guild).unwrap().users.remove(&user_id);
}
self.users.remove(&user_id);
}
}
CONNECTIONS
.get()
.unwrap()
.write()
.unwrap()
.remove(&connection);
}
}
pub static DATA: OnceCell<RwLock<GlobalState>> = OnceCell::new();
pub fn init() {
if CONNECTIONS.set(RwLock::new(HashMap::new())).is_err() {
panic!("Failed to set global connections map.");
}
if DATA.set(RwLock::new(GlobalState::new())).is_err() {
panic!("Failed to set global state.");
}
}
pub fn send_message(users: Option<Vec<String>>, guild: Option<String>, data: String) {
let state = DATA.get().unwrap().read().unwrap();
let mut connections = HashSet::new();
let mut users = vec_to_set(&users.unwrap_or(vec![]));
if let Some(guild) = guild {
if let Some(entry) = state.guilds.get(&guild) {
for user in &entry.users {
users.insert(user.to_string());
}
}
}
for user in users {
if let Some(entry) = state.users.get(&user) {
for connection in &entry.connections {
connections.insert(connection.clone());
}
}
}
let targets = CONNECTIONS.get().unwrap().read().unwrap();
for conn in connections {
if let Some(sender) = targets.get(&conn) {
if sender.send(data.clone()).is_err() {
eprintln!("Failed to send a notification to a websocket. [{}]", &conn);
}
}
}
}
use crate::database::*;
use super::hive::get_hive;
use futures::StreamExt;
use hive_pubsub::PubSub;
use mongodb::bson::doc;
use mongodb::bson::Document;
use mongodb::options::FindOptions;
pub async fn generate_subscriptions(user: &User) -> Result<(), String> {
let hive = get_hive();
hive.subscribe(user.id.clone(), user.id.clone())?;
if let Some(relations) = &user.relations {
for relation in relations {
hive.subscribe(user.id.clone(), relation.id.clone())?;
}
}
let server_ids = User::fetch_server_ids(&user.id)
.await
.map_err(|_| "Failed to fetch memberships.".to_string())?;
let channel_ids = get_collection("servers")
.find(
doc! {
"_id": {
"$in": &server_ids
}
},
None,
)
.await
.map_err(|_| "Failed to fetch servers.".to_string())?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
x.get_array("channels").ok().map(|v| {
v.into_iter()
.filter_map(|x| x.as_str().map(|x| x.to_string()))
.collect::<Vec<String>>()
})
})
.flatten()
.collect::<Vec<String>>();
for id in server_ids {
hive.subscribe(user.id.clone(), id)?;
}
for id in channel_ids {
hive.subscribe(user.id.clone(), id)?;
}
let mut cursor = get_collection("channels")
.find(
doc! {
"$or": [
{
"channel_type": "SavedMessages",
"user": &user.id
},
{
"channel_type": "DirectMessage",
"recipients": &user.id
},
{
"channel_type": "Group",
"recipients": &user.id
}
]
},
FindOptions::builder().projection(doc! { "_id": 1 }).build(),
)
.await
.map_err(|_| "Failed to fetch channels.".to_string())?;
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
hive.subscribe(user.id.clone(), doc.get_str("_id").unwrap().to_string())?;
}
}
Ok(())
}
use crate::database::*;
use crate::util::variables::WS_HOST;
use super::subscriptions;
use async_std::net::{TcpListener, TcpStream};
use async_std::task;
use async_tungstenite::tungstenite::Message;
use futures::channel::mpsc::{unbounded, UnboundedSender};
use futures::stream::TryStreamExt;
use futures::{pin_mut, prelude::*};
use hive_pubsub::PubSub;
use log::{debug, info};
use many_to_many::ManyToMany;
use rauth::{
auth::{Auth, Session},
options::Options,
};
use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex, RwLock};
use super::{
events::{ClientboundNotification, ServerboundNotification, WebSocketError},
hive::get_hive,
};
type Tx = UnboundedSender<Message>;
type PeerMap = Arc<Mutex<HashMap<SocketAddr, Tx>>>;
lazy_static! {
static ref CONNECTIONS: PeerMap = Arc::new(Mutex::new(HashMap::new()));
static ref USERS: Arc<RwLock<ManyToMany<String, SocketAddr>>> =
Arc::new(RwLock::new(ManyToMany::new()));
}
pub async fn launch_server() {
let try_socket = TcpListener::bind(WS_HOST.to_string()).await;
let listener = try_socket.expect("Failed to bind");
info!("Listening on: {}", *WS_HOST);
while let Ok((stream, _)) = listener.accept().await {
task::spawn(accept(stream));
}
}
async fn accept(stream: TcpStream) {
let addr = stream
.peer_addr()
.expect("Connected streams should have a peer address.");
let ws_stream = async_tungstenite::accept_async(stream)
.await
.expect("Error during websocket handshake.");
info!("User established WebSocket connection from {}.", &addr);
let (write, read) = ws_stream.split();
let (tx, rx) = unbounded();
CONNECTIONS.lock().unwrap().insert(addr, tx.clone());
let send = |notification: ClientboundNotification| {
if let Ok(response) = serde_json::to_string(&notification) {
if let Err(_) = tx.unbounded_send(Message::Text(response)) {
debug!("Failed unbounded_send to websocket stream.");
}
}
};
let session: Arc<Mutex<Option<Session>>> = Arc::new(Mutex::new(None));
let mutex_generator = || session.clone();
let fwd = rx.map(Ok).forward(write);
let incoming = read.try_for_each(async move |msg| {
let mutex = mutex_generator();
if let Message::Text(text) = msg {
if let Ok(notification) = serde_json::from_str::<ServerboundNotification>(&text) {
match notification {
ServerboundNotification::Authenticate(new_session) => {
{
if mutex.lock().unwrap().is_some() {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
));
return Ok(());
}
}
if let Ok(validated_session) =
Auth::new(get_collection("accounts"), Options::new())
.verify_session(new_session)
.await
{
let id = validated_session.user_id.clone();
if let Ok(user) = (Ref { id: id.clone() }).fetch_user().await {
let was_online = is_online(&id);
{
match USERS.write() {
Ok(mut map) => {
map.insert(id.clone(), addr);
}
Err(_) => {
send(ClientboundNotification::Error(
WebSocketError::InternalError {
at: "Writing users map.".to_string(),
},
));
return Ok(());
}
}
}
*mutex.lock().unwrap() = Some(validated_session);
if let Err(_) = subscriptions::generate_subscriptions(&user).await {
send(ClientboundNotification::Error(
WebSocketError::InternalError {
at: "Generating subscriptions.".to_string(),
},
));
return Ok(());
}
send(ClientboundNotification::Authenticated);
match super::payload::generate_ready(user).await {
Ok(payload) => {
send(payload);
if !was_online {
ClientboundNotification::UserUpdate {
id: id.clone(),
data: json!({
"online": true
}),
clear: None
}
.publish_as_user(id);
}
}
Err(_) => {
send(ClientboundNotification::Error(
WebSocketError::InternalError {
at: "Generating payload.".to_string(),
},
));
return Ok(());
}
}
} else {
send(ClientboundNotification::Error(
WebSocketError::OnboardingNotFinished,
));
}
} else {
send(ClientboundNotification::Error(
WebSocketError::InvalidSession,
));
}
}
// ! TEMP: verify user part of channel
// ! Could just run permission check here.
ServerboundNotification::BeginTyping { channel } => {
if mutex.lock().unwrap().is_some() {
let user = {
let mutex = mutex.lock().unwrap();
let session = mutex.as_ref().unwrap();
session.user_id.clone()
};
ClientboundNotification::ChannelStartTyping {
id: channel.clone(),
user,
}
.publish(channel);
} else {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
));
return Ok(());
}
}
ServerboundNotification::EndTyping { channel } => {
if mutex.lock().unwrap().is_some() {
let user = {
let mutex = mutex.lock().unwrap();
let session = mutex.as_ref().unwrap();
session.user_id.clone()
};
ClientboundNotification::ChannelStopTyping {
id: channel.clone(),
user,
}
.publish(channel);
} else {
send(ClientboundNotification::Error(
WebSocketError::AlreadyAuthenticated,
));
return Ok(());
}
}
}
}
}
Ok(())
});
pin_mut!(fwd, incoming);
future::select(fwd, incoming).await;
info!("User {} disconnected.", &addr);
CONNECTIONS.lock().unwrap().remove(&addr);
let mut offline = None;
{
let session = session.lock().unwrap();
if let Some(session) = session.as_ref() {
let mut users = USERS.write().unwrap();
users.remove(&session.user_id, &addr);
if users.get_left(&session.user_id).is_none() {
get_hive().drop_client(&session.user_id).unwrap();
offline = Some(session.user_id.clone());
}
}
}
if let Some(id) = offline {
ClientboundNotification::UserUpdate {
id: id.clone(),
data: json!({
"online": false
}),
clear: None
}
.publish_as_user(id);
}
}
pub fn publish(ids: Vec<String>, notification: ClientboundNotification) {
let mut targets = vec![];
{
let users = USERS.read().unwrap();
for id in ids {
// Block certain notifications from reaching users that aren't meant to see them.
match &notification {
ClientboundNotification::UserRelationship { id: user_id, .. }
| ClientboundNotification::UserSettingsUpdate { id: user_id, .. }
| ClientboundNotification::ChannelAck { user: user_id, .. } => {
if &id != user_id {
continue;
}
}
_ => {}
}
if let Some(mut arr) = users.get_left(&id) {
targets.append(&mut arr);
}
}
}
let msg = Message::Text(serde_json::to_string(&notification).unwrap());
let connections = CONNECTIONS.lock().unwrap();
for target in targets {
if let Some(conn) = connections.get(&target) {
if let Err(_) = conn.unbounded_send(msg.clone()) {
debug!("Failed unbounded_send.");
}
}
}
}
pub fn is_online(user: &String) -> bool {
USERS.read().unwrap().get_left(&user).is_some()
}
use super::state::{self, StateResult};
use crate::util::variables::WS_HOST;
use serde_json::{from_str, json, Value};
use ulid::Ulid;
use ws::{listen, CloseCode, Error, Handler, Handshake, Message, Result, Sender};
struct Server {
sender: Sender,
user_id: Option<String>,
id: String,
}
impl Handler for Server {
fn on_open(&mut self, _: Handshake) -> Result<()> {
state::add_connection(self.id.clone(), self.sender.clone());
Ok(())
}
fn on_message(&mut self, msg: Message) -> Result<()> {
if let Message::Text(text) = msg {
if let Ok(data) = from_str(&text) as std::result::Result<Value, _> {
if let Value::String(packet_type) = &data["type"] {
if packet_type == "authenticate" {
if self.user_id.is_some() {
self.sender.send(
json!({
"type": "authenticate",
"success": false,
"error": "Already authenticated!"
})
.to_string(),
)
} else if let Value::String(token) = &data["token"] {
let mut state = unsafe { state::DATA.get().unwrap().write().unwrap() };
match state.try_authenticate(self.id.clone(), token.to_string()) {
StateResult::Success(user_id) => {
let user = crate::database::user::fetch_user(&user_id)
.unwrap()
.unwrap();
self.user_id = Some(user_id);
self.sender.send(
json!({
"type": "authenticate",
"success": true,
})
.to_string(),
)?;
if let Ok(payload) = user.create_payload() {
self.sender.send(
json!({
"type": "ready",
"data": payload
})
.to_string(),
)
} else {
// ! TODO: FIXME: ALL THE NOTIFICATIONS CODE NEEDS TO BE
// ! RESTRUCTURED, IT IS UTTER GARBAGE. :)))))
Ok(())
}
}
StateResult::DatabaseError => self.sender.send(
json!({
"type": "authenticate",
"success": false,
"error": "Had database error."
})
.to_string(),
),
StateResult::InvalidToken => self.sender.send(
json!({
"type": "authenticate",
"success": false,
"error": "Invalid token."
})
.to_string(),
),
}
} else {
self.sender.send(
json!({
"type": "authenticate",
"success": false,
"error": "Token not present."
})
.to_string(),
)
}
} else {
Ok(())
}
} else {
Ok(())
}
} else {
Ok(())
}
} else {
Ok(())
}
}
fn on_close(&mut self, _code: CloseCode, _reason: &str) {
unsafe {
state::DATA
.get()
.unwrap()
.write()
.unwrap()
.disconnect(self.user_id.clone(), self.id.clone());
}
println!("User disconnected. [{}]", self.id);
}
fn on_error(&mut self, err: Error) {
println!("The server encountered an error: {:?}", err);
}
}
pub fn launch_server() {
state::init();
listen(WS_HOST.to_string(), |sender| Server {
sender,
user_id: None,
id: Ulid::new().to_string(),
})
.unwrap()
}
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserJoin {
pub id: String,
pub user: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserLeave {
pub id: String,
pub user: String,
}
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserJoin {
pub id: String,
pub user: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct UserLeave {
pub id: String,
pub user: String,
pub banned: bool,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChannelCreate {
pub id: String,
pub channel: String,
pub name: String,
pub description: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ChannelDelete {
pub id: String,
pub channel: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Delete {
pub id: String,
}
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Create {
pub id: String,
pub nonce: Option<String>,
pub channel: String,
pub author: String,
pub content: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Edit {
pub id: String,
pub channel: String,
pub author: String,
pub content: String,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Delete {
pub id: String,
}
use serde::{Deserialize, Serialize};
pub mod groups;
pub mod guilds;
pub mod message;
pub mod users;
#[allow(non_camel_case_types)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type", content = "data")]
pub enum Notification {
message_create(message::Create),
message_edit(message::Edit),
message_delete(message::Delete),
group_user_join(groups::UserJoin),
group_user_leave(groups::UserLeave),
guild_user_join(guilds::UserJoin),
guild_user_leave(guilds::UserLeave),
guild_channel_create(guilds::ChannelCreate),
guild_channel_delete(guilds::ChannelDelete),
guild_delete(guilds::Delete),
user_friend_status(users::FriendStatus),
}
impl Notification {
pub fn push_to_cache(&self) {
//crate::database::channel::process_event(&self);
//crate::database::guild::process_event(&self);
//crate::database::user::process_event(&self);
}
}
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct FriendStatus {
pub id: String,
pub user: String,
pub status: i32,
}
pub mod events;
pub mod hive;
pub mod websocket;
use super::state;
use crate::database::get_collection;
use crate::pubsub::hive;
use log::{error, info};
use mongodb::bson::doc;
use mongodb::options::FindOneOptions;
use serde_json::{from_str, json, Value};
use ulid::Ulid;
use ws::{CloseCode, Error, Handler, Handshake, Message, Result, Sender};
pub struct Client {
id: String,
sender: Sender,
user_id: Option<String>,
}
impl Client {
pub fn new(sender: Sender) -> Client {
Client {
id: Ulid::new().to_string(),
user_id: None,
sender,
}
}
}
impl Handler for Client {
fn on_open(&mut self, handshake: Handshake) -> Result<()> {
info!("Client connected. [{}] {:?}", self.id, handshake.peer_addr);
Ok(())
}
// Client sends { "type": "authenticate", "token": token }.
// Receives { "type": "authorised" } and waits.
// Client then receives { "type": "ready", "data": payload }.
// If at any point we hit an error, send { "type": "error", "error": error }.
fn on_message(&mut self, msg: Message) -> Result<()> {
if let Message::Text(text) = msg {
if let Ok(data) = from_str(&text) as std::result::Result<Value, _> {
if let Value::String(packet_type) = &data["type"] {
if packet_type == "authenticate" {
if self.user_id.is_some() {
return self.sender.send(
json!({
"type": "error",
"error": "Already authenticated!"
})
.to_string(),
);
} else if let Value::String(token) = &data["token"] {
let user = get_collection("users").find_one(
doc! {
"access_token": token
},
FindOneOptions::builder()
.projection(doc! { "_id": 1 })
.build(),
);
if let Ok(result) = user {
if let Some(doc) = result {
self.sender.send(
json!({
"type": "authorised"
})
.to_string(),
)?;
// FIXME: fetch above when we switch to new token system
// or auth cache system, something like that
let user = crate::database::user::fetch_user(
doc.get_str("_id").unwrap(),
)
.unwrap()
.unwrap(); // this should be guranteed, I think, maybe? I'm getting rid of it later. FIXME
self.user_id = Some(user.id.clone());
match user.create_payload() {
Ok(payload) => {
// ! Grab the ids from the payload,
// ! there's probably a better way to
// ! do this. I'll rewrite it at some point.
let mut ids = vec![
self.user_id.as_ref().unwrap().clone()
];
{
// This is bad code. But to be fair
// it should work just fine.
for user in payload.get("users").unwrap().as_array().unwrap() {
ids.push(user.as_object().unwrap().get("id").unwrap().as_str().unwrap().to_string());
}
for channel in payload.get("channels").unwrap().as_array().unwrap() {
ids.push(channel.as_object().unwrap().get("id").unwrap().as_str().unwrap().to_string());
}
for guild in payload.get("guilds").unwrap().as_array().unwrap() {
ids.push(guild.as_object().unwrap().get("id").unwrap().as_str().unwrap().to_string());
}
}
if let Err(err) = hive::subscribe(self.user_id.as_ref().unwrap().clone(), ids) {
self.sender.send(
json!({
"type": "warn",
"error": "Failed to subscribe you to the Hive. You may not receive all notifications."
})
.to_string(),
)?;
}
self.sender.send(
json!({
"type": "ready",
"data": payload
})
.to_string(),
)?;
if state::accept(
self.id.clone(),
self.user_id.as_ref().unwrap().clone(),
self.sender.clone(),
)
.is_err()
{
self.sender.send(
json!({
"type": "warn",
"error": "Failed to accept your connection. You will not receive any notifications."
})
.to_string(),
)?;
}
}
Err(error) => {
error!("Failed to create payload! {}", error);
self.sender.send(
json!({
"type": "error",
"error": "Failed to create payload."
})
.to_string(),
)?;
}
}
} else {
self.sender.send(
json!({
"type": "error",
"error": "Invalid token."
})
.to_string(),
)?;
}
} else {
self.sender.send(
json!({
"type": "error",
"error": "Failed to fetch from database."
})
.to_string(),
)?;
}
} else {
self.sender.send(
json!({
"type": "error",
"error": "Missing token."
})
.to_string(),
)?;
}
}
}
}
}
Ok(())
}
fn on_close(&mut self, _code: CloseCode, reason: &str) {
info!("Client disconnected. [{}] {}", self.id, reason);
if let Err(error) = state::drop(&self.id) {
error!("Also failed to drop client from state! {}", error);
}
}
fn on_error(&mut self, err: Error) {
error!(
"A client disconnected due to an error. [{}] {}",
self.id, err
);
}
}
use crate::util::variables::WS_HOST;
use log::{error, info};
use std::thread;
use ws::listen;
mod client;
mod state;
pub use state::publish;
pub fn launch_server() {
thread::spawn(|| {
if listen(WS_HOST.to_string(), |sender| client::Client::new(sender)).is_err() {
error!(
"Failed to listen for WebSocket connections on {:?}!",
*WS_HOST
);
} else {
info!("Listening for WebSocket connections on {:?}", *WS_HOST);
}
});
}
use crate::pubsub::hive;
use many_to_many::ManyToMany;
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use log::{error, info};
use ws::Sender;
lazy_static! {
static ref CONNECTIONS: Arc<RwLock<HashMap<String, Sender>>> =
Arc::new(RwLock::new(HashMap::new()));
static ref CLIENTS: Arc<RwLock<ManyToMany<String, String>>> =
Arc::new(RwLock::new(ManyToMany::new()));
}
pub fn accept(id: String, user_id: String, sender: Sender) -> Result<(), String> {
let mut conns = CONNECTIONS
.write()
.map_err(|_| "Failed to lock connections for writing.")?;
conns.insert(id.clone(), sender);
let mut clients = CLIENTS
.write()
.map_err(|_| "Failed to lock clients for writing.")?;
clients.insert(user_id.clone(), id.clone());
info!("Accepted user [{}] for connection {}.", user_id, id);
Ok(())
}
pub fn drop(id: &String) -> Result<(), String> {
let mut conns = CONNECTIONS
.write()
.map_err(|_| "Failed to lock connections for writing.")?;
conns.remove(id);
let mut clients = CLIENTS
.write()
.map_err(|_| "Failed to lock clients for writing.")?;
let uid = if let Some(ids) = clients.get_right(id) {
let user_id: String = ids.into_iter().next().unwrap();
info!("Dropped user [{}] for connection {}.", user_id, id);
Some(user_id)
} else {
None
};
clients.remove_right(id);
if let Some(user_id) = &uid {
if let None = clients.get_left(user_id) {
if let Err(error) = hive::drop_user(user_id) {
error!("Failed to drop user from hive! {}", error);
} else {
info!("User [{}] has completed disconnected from node.", user_id);
}
}
}
Ok(())
}
pub fn publish(clients: Vec<String>, data: String) -> Result<(), String> {
let conns = CONNECTIONS
.read()
.map_err(|_| "Failed to lock connections for reading.")?;
let client_map = CLIENTS
.read()
.map_err(|_| "Failed to lock clients for reading.")?;
for client in clients {
if let Some(targets) = client_map.get_left(&client) {
for target in &targets {
if let Some(connection) = conns.get(target) {
if let Err(err) = connection.send(data.clone()) {
error!("Failed to publish notification to client [{}]! {}", target, err);
}
}
}
}
}
Ok(())
}
use super::Response;
use crate::database;
use crate::util::variables::{DISABLE_REGISTRATION, USE_EMAIL};
use crate::util::{captcha, email, gen_token};
use bcrypt::{hash, verify};
use chrono::prelude::*;
use database::user::User;
use log::error;
use mongodb::bson::{doc, from_bson, Bson};
use rocket_contrib::json::Json;
use serde::{Deserialize, Serialize};
use ulid::Ulid;
use validator::validate_email;
#[derive(Serialize, Deserialize)]
pub struct Create {
username: String,
password: String,
email: String,
captcha: Option<String>,
}
/// create a new Revolt account
/// (1) validate input
/// [username] 2 to 32 characters
/// [password] 8 to 72 characters
/// [email] validate against RFC
/// (2) check email existence
/// (3) add user and send email verification
#[post("/create", data = "<info>")]
pub fn create(info: Json<Create>) -> Response {
if let Err(error) = captcha::verify(&info.captcha) {
return Response::BadRequest(json!({ "error": error }));
}
if *DISABLE_REGISTRATION {
return Response::BadRequest(json!({ "error": "Registration disabled." }));
}
let col = database::get_collection("users");
if info.username.len() < 2 || info.username.len() > 32 {
return Response::NotAcceptable(
json!({ "error": "Username needs to be at least 2 chars and less than 32 chars." }),
);
}
if info.password.len() < 8 || info.password.len() > 72 {
return Response::NotAcceptable(
json!({ "error": "Password needs to be at least 8 chars and at most 72." }),
);
}
if !validate_email(info.email.clone()) {
return Response::UnprocessableEntity(json!({ "error": "Invalid email." }));
}
if let Some(_) = col
.find_one(doc! { "email": info.email.clone() }, None)
.expect("Failed user lookup")
{
return Response::Conflict(json!({ "error": "Email already in use!" }));
}
if let Some(_) = col
.find_one(doc! { "username": info.username.clone() }, None)
.expect("Failed user lookup")
{
return Response::Conflict(json!({ "error": "Username already in use!" }));
}
if let Ok(hashed) = hash(info.password.clone(), 10) {
let access_token = gen_token(92);
let code = gen_token(48);
let email_verification = match *USE_EMAIL {
true => doc! {
"verified": false,
"target": info.email.clone(),
"expiry": Bson::DateTime(Utc::now() + chrono::Duration::days(1)),
"rate_limit": Bson::DateTime(Utc::now() + chrono::Duration::minutes(1)),
"code": code.clone(),
},
false => doc! {
"verified": true
},
};
let id = Ulid::new().to_string();
match col.insert_one(
doc! {
"_id": &id,
"email": info.email.clone(),
"username": info.username.clone(),
"display_name": info.username.clone(),
"password": hashed,
"access_token": &access_token,
"email_verification": email_verification
},
None,
) {
Ok(_) => {
if *USE_EMAIL {
let sent = email::send_verification_email(info.email.clone(), code);
Response::Success(json!({
"email_sent": sent,
}))
} else {
Response::Success(json!({
"id": id,
"access_token": access_token
}))
}
}
Err(_) => {
Response::InternalServerError(json!({ "error": "Failed to create account." }))
}
}
} else {
Response::InternalServerError(json!({ "error": "Failed to hash." }))
}
}
/// verify an email for a Revolt account
/// (1) check if code is valid
/// (2) check if it expired yet
/// (3) set account as verified
#[get("/verify/<code>")]
pub fn verify_email(code: String) -> Response {
let col = database::get_collection("users");
if let Some(u) = col
.find_one(doc! { "email_verification.code": code.clone() }, None)
.expect("Failed user lookup")
{
let user: User = from_bson(Bson::Document(u)).expect("Failed to unwrap user.");
let ev = user.email_verification;
if Utc::now() > *ev.expiry.unwrap() {
Response::Gone(json!({
"success": false,
"error": "Token has expired!",
}))
} else {
let target = ev.target.unwrap();
col.update_one(
doc! { "_id": user.id },
doc! {
"$unset": {
"email_verification.code": "",
"email_verification.expiry": "",
"email_verification.target": "",
"email_verification.rate_limit": "",
},
"$set": {
"email_verification.verified": true,
"email": target.clone(),
},
},
None,
)
.expect("Failed to update user!");
if *USE_EMAIL {
if let Err(err) = email::send_welcome_email(target.to_string(), user.username) {
error!("Failed to send welcome email! {}", err);
}
}
Response::Redirect(super::Redirect::to("https://app.revolt.chat"))
}
} else {
Response::BadRequest(json!({ "error": "Invalid code." }))
}
}
#[derive(Serialize, Deserialize)]
pub struct Resend {
email: String,
captcha: Option<String>,
}
/// resend a verification email
/// (1) check if verification is pending for x email
/// (2) check for rate limit
/// (3) resend the email
#[post("/resend", data = "<info>")]
pub fn resend_email(info: Json<Resend>) -> Response {
if let Err(error) = captcha::verify(&info.captcha) {
return Response::BadRequest(json!({ "error": error }));
}
let col = database::get_collection("users");
if let Some(u) = col
.find_one(
doc! { "email_verification.target": info.email.clone() },
None,
)
.expect("Failed user lookup.")
{
let user: User = from_bson(Bson::Document(u)).expect("Failed to unwrap user.");
let ev = user.email_verification;
let expiry = ev.expiry.unwrap();
let rate_limit = ev.rate_limit.unwrap();
if Utc::now() < *rate_limit {
Response::TooManyRequests(
json!({ "error": "You are being rate limited, please try again in a while." }),
)
} else {
let mut new_expiry = Bson::DateTime(Utc::now() + chrono::Duration::days(1));
if info.email.clone() != user.email {
if Utc::now() > *expiry {
return Response::Gone(
json!({ "error": "To help protect your account, please login and change your email again. The original request was made over one day ago." }),
);
}
new_expiry = Bson::DateTime(*expiry);
}
let code = gen_token(48);
col.update_one(
doc! { "_id": user.id },
doc! {
"$set": {
"email_verification.code": code.clone(),
"email_verification.expiry": new_expiry,
"email_verification.rate_limit": Bson::DateTime(Utc::now() + chrono::Duration::minutes(1)),
},
},
None,
).expect("Failed to update user!");
if let Err(err) = email::send_verification_email(info.email.clone(), code) {
return Response::InternalServerError(json!({ "error": err }));
}
Response::Result(super::Status::Ok)
}
} else {
Response::NotFound(json!({ "error": "Email not found or pending verification!" }))
}
}
#[derive(Serialize, Deserialize)]
pub struct Login {
email: String,
password: String,
captcha: Option<String>,
}
/// login to a Revolt account
/// (1) find user by email
/// (2) verify password
/// (3) return access token
#[post("/login", data = "<info>")]
pub fn login(info: Json<Login>) -> Response {
if let Err(error) = captcha::verify(&info.captcha) {
return Response::BadRequest(json!({ "error": error }));
}
let col = database::get_collection("users");
if let Some(u) = col
.find_one(doc! { "email": info.email.clone() }, None)
.expect("Failed user lookup")
{
let user: User = from_bson(Bson::Document(u)).expect("Failed to unwrap user.");
match verify(info.password.clone(), &user.password)
.expect("Failed to check hash of password.")
{
true => {
let token = match user.access_token {
Some(t) => t.to_string(),
None => {
let token = gen_token(92);
if col
.update_one(
doc! { "_id": &user.id },
doc! { "$set": { "access_token": token.clone() } },
None,
)
.is_err()
{
return Response::InternalServerError(
json!({ "error": "Failed database operation." }),
);
}
token
}
};
Response::Success(json!({ "access_token": token, "id": user.id }))
}
false => Response::Unauthorized(json!({ "error": "Invalid password." })),
}
} else {
Response::NotFound(json!({ "error": "Email is not registered." }))
}
}
#[derive(Serialize, Deserialize)]
pub struct Token {
token: String,
}
/// login to a Revolt account via token
#[post("/token", data = "<info>")]
pub fn token(info: Json<Token>) -> Response {
let col = database::get_collection("users");
if let Ok(result) = col.find_one(doc! { "access_token": info.token.clone() }, None) {
if let Some(user) = result {
Response::Success(json!({
"id": user.get_str("_id").unwrap(),
}))
} else {
Response::Unauthorized(json!({
"error": "Invalid token!",
}))
}
} else {
Response::InternalServerError(json!({
"error": "Failed database query.",
}))
}
}
#[options("/create")]
pub fn create_preflight() -> Response {
Response::Result(super::Status::Ok)
}
#[options("/verify/<_code>")]
pub fn verify_email_preflight(_code: String) -> Response {
Response::Result(super::Status::Ok)
}
#[options("/resend")]
pub fn resend_email_preflight() -> Response {
Response::Result(super::Status::Ok)
}
#[options("/login")]
pub fn login_preflight() -> Response {
Response::Result(super::Status::Ok)
}
#[options("/token")]
pub fn token_preflight() -> Response {
Response::Result(super::Status::Ok)
}
use super::Response;
use crate::database::{
self, channel::Channel, get_relationship, get_relationship_internal, message::Message,
user::User, Permission, PermissionCalculator, Relationship,
};
use crate::notifications::{
self,
events::{groups::*, guilds::ChannelDelete, message::*, Notification},
};
use crate::util::vec_to_set;
use chrono::prelude::*;
use mongodb::bson::{doc, from_bson, Bson};
use mongodb::options::FindOptions;
use num_enum::TryFromPrimitive;
use rocket::request::Form;
use rocket_contrib::json::Json;
use serde::{Deserialize, Serialize};
use ulid::Ulid;
const MAXGROUPSIZE: usize = 50;
#[derive(Debug, TryFromPrimitive)]
#[repr(usize)]
pub enum ChannelType {
DM = 0,
GROUPDM = 1,
GUILDCHANNEL = 2,
}
macro_rules! with_permissions {
($user: expr, $target: expr) => {{
let permissions = PermissionCalculator::new($user.clone())
.channel($target.clone())
.fetch_data();
let value = permissions.as_permission();
if !value.get_access() {
return None;
}
value
}};
}
#[derive(Serialize, Deserialize)]
pub struct CreateGroup {
name: String,
nonce: String,
users: Vec<String>,
}
/// create a new group
#[post("/create", data = "<info>")]
pub fn create_group(user: User, info: Json<CreateGroup>) -> Response {
let name: String = info.name.chars().take(32).collect();
let nonce: String = info.nonce.chars().take(32).collect();
let mut set = vec_to_set(&info.users);
set.insert(user.id.clone());
if set.len() > MAXGROUPSIZE {
return Response::BadRequest(json!({ "error": "Maximum group size is 50." }));
}
let col = database::get_collection("channels");
if let Some(_) = col.find_one(doc! { "nonce": nonce.clone() }, None).unwrap() {
return Response::BadRequest(json!({ "error": "Group already created!" }));
}
let mut query = vec![];
for item in &set {
if item == &user.id {
continue;
}
query.push(Bson::String(item.clone()));
}
if let Ok(result) = database::get_collection("users").find(
doc! {
"_id": {
"$in": &query
}
},
FindOptions::builder().limit(query.len() as i64).build(),
) {
if result.count() != query.len() {
return Response::BadRequest(json!({ "error": "Specified non-existant user(s)." }));
}
for item in set {
if item == user.id {
continue;
}
if get_relationship_internal(&user.id, &item, &user.relations) != Relationship::Friend {
return Response::BadRequest(json!({ "error": "Not friends with user(s)." }));
}
}
query.push(Bson::String(user.id.clone()));
let id = Ulid::new().to_string();
if col
.insert_one(
doc! {
"_id": id.clone(),
"nonce": nonce,
"type": ChannelType::GROUPDM as u32,
"recipients": &query,
"name": name,
"owner": &user.id,
},
None,
)
.is_ok()
{
Response::Success(json!({ "id": id }))
} else {
Response::InternalServerError(json!({ "error": "Failed to create guild channel." }))
}
} else {
Response::InternalServerError(json!({ "error": "Failed to validate users." }))
}
}
/// fetch channel information
#[get("/<target>")]
pub fn channel(user: User, target: Channel) -> Option<Response> {
with_permissions!(user, target);
Some(Response::Success(target.serialise()))
}
/// [groups] add user to channel
#[put("/<target>/recipients/<member>")]
pub fn add_member(user: User, target: Channel, member: User) -> Option<Response> {
if target.channel_type != 1 {
return Some(Response::BadRequest(json!({ "error": "Not a group DM." })));
}
with_permissions!(user, target);
let recp = target.recipients.as_ref().unwrap();
if recp.len() == 50 {
return Some(Response::BadRequest(
json!({ "error": "Maximum group size is 50." }),
));
}
let set = vec_to_set(&recp);
if set.get(&member.id).is_some() {
return Some(Response::BadRequest(
json!({ "error": "User already in group!" }),
));
}
match get_relationship(&user, &member) {
Relationship::Friend => {
if database::get_collection("channels")
.update_one(
doc! { "_id": target.id.clone() },
doc! {
"$push": {
"recipients": &member.id
}
},
None,
)
.is_ok()
{
if (Message {
id: Ulid::new().to_string(),
nonce: None,
channel: target.id.clone(),
author: "system".to_string(),
content: format!("<@{}> added <@{}> to the group.", &user.id, &member.id),
edited: None,
previous_content: vec![],
})
.send(&target)
{
notifications::send_message_given_channel(
Notification::group_user_join(UserJoin {
id: target.id.clone(),
user: member.id.clone(),
}),
&target,
);
Some(Response::Result(super::Status::Ok))
} else {
Some(Response::PartialStatus(
json!({ "error": "Failed to send join message, but user has been added." }),
))
}
} else {
Some(Response::InternalServerError(
json!({ "error": "Failed to add user to group." }),
))
}
}
_ => Some(Response::BadRequest(
json!({ "error": "Not friends with user." }),
)),
}
}
/// [groups] remove user from channel
#[delete("/<target>/recipients/<member>")]
pub fn remove_member(user: User, target: Channel, member: User) -> Option<Response> {
if target.channel_type != 1 {
return Some(Response::BadRequest(json!({ "error": "Not a group DM." })));
}
if &user.id == &member.id {
return Some(Response::BadRequest(
json!({ "error": "Cannot kick yourself, leave the channel instead." }),
));
}
let permissions = with_permissions!(user, target);
if !permissions.get_kick_members() {
return Some(Response::LackingPermission(Permission::KickMembers));
}
let set = vec_to_set(target.recipients.as_ref().unwrap());
if set.get(&member.id).is_none() {
return Some(Response::BadRequest(
json!({ "error": "User not in group!" }),
));
}
if database::get_collection("channels")
.update_one(
doc! { "_id": &target.id },
doc! {
"$pull": {
"recipients": &member.id
}
},
None,
)
.is_ok()
{
if (Message {
id: Ulid::new().to_string(),
nonce: None,
channel: target.id.clone(),
author: "system".to_string(),
content: format!("<@{}> removed <@{}> from the group.", &user.id, &member.id),
edited: None,
previous_content: vec![],
})
.send(&target)
{
notifications::send_message_given_channel(
Notification::group_user_leave(UserLeave {
id: target.id.clone(),
user: member.id.clone(),
}),
&target,
);
Some(Response::Result(super::Status::Ok))
} else {
Some(Response::PartialStatus(
json!({ "error": "Failed to send join message, but user has been removed." }),
))
}
} else {
Some(Response::InternalServerError(
json!({ "error": "Failed to add user to group." }),
))
}
}
/// delete channel
/// or leave group DM
/// or close DM conversation
#[delete("/<target>")]
pub fn delete(user: User, target: Channel) -> Option<Response> {
let permissions = with_permissions!(user, target);
if !permissions.get_manage_channels() {
return Some(Response::LackingPermission(Permission::ManageChannels));
}
let col = database::get_collection("channels");
let target_id = target.id.clone();
let try_delete = || {
let messages = database::get_collection("messages");
if messages
.delete_many(doc! { "channel": &target_id }, None)
.is_ok()
{
if col.delete_one(doc! { "_id": &target_id }, None).is_ok() {
Some(Response::Result(super::Status::Ok))
} else {
Some(Response::InternalServerError(
json!({ "error": "Failed to delete channel." }),
))
}
} else {
Some(Response::InternalServerError(
json!({ "error": "Failed to delete messages." }),
))
}
};
match target.channel_type {
0 => {
if col
.update_one(
doc! { "_id": &target_id },
doc! { "$set": { "active": false } },
None,
)
.is_ok()
{
Some(Response::Result(super::Status::Ok))
} else {
Some(Response::InternalServerError(
json!({ "error": "Failed to close channel." }),
))
}
}
1 => {
let mut recipients = vec_to_set(
target
.recipients
.as_ref()
.expect("Missing recipients on Group DM."),
);
let owner = target.owner.as_ref().expect("Missing owner on Group DM.");
if recipients.len() == 1 {
try_delete()
} else {
recipients.remove(&user.id);
let new_owner = if owner == &user.id {
recipients.iter().next().unwrap()
} else {
&owner
};
if col
.update_one(
doc! { "_id": target_id },
doc! {
"$set": {
"owner": new_owner,
},
"$pull": {
"recipients": &user.id,
}
},
None,
)
.is_ok()
{
if (Message {
id: Ulid::new().to_string(),
nonce: None,
channel: target.id.clone(),
author: "system".to_string(),
content: format!("<@{}> left the group.", &user.id),
edited: None,
previous_content: vec![],
})
.send(&target)
{
notifications::send_message_given_channel(
Notification::group_user_leave(UserLeave {
id: target.id.clone(),
user: user.id.clone(),
}),
&target,
);
Some(Response::Result(super::Status::Ok))
} else {
Some(Response::PartialStatus(
json!({ "error": "Failed to send leave message, but you have left the group." }),
))
}
} else {
Some(Response::InternalServerError(
json!({ "error": "Failed to remove you from the group." }),
))
}
}
}
2 => {
let guild_id = target.guild.unwrap();
if database::get_collection("guilds")
.update_one(
doc! { "_id": &guild_id },
doc! {
"$pull": {
"invites": {
"channel": &target.id
},
"channels": &target.id
}
},
None,
)
.is_ok()
{
notifications::send_message_threaded(
None,
guild_id.clone(),
Notification::guild_channel_delete(ChannelDelete {
id: guild_id.clone(),
channel: target.id.clone(),
}),
);
try_delete()
} else {
Some(Response::InternalServerError(
json!({ "error": "Failed to remove invites." }),
))
}
}
_ => Some(Response::InternalServerError(
json!({ "error": "Unknown error has occurred." }),
)),
}
}
#[derive(Serialize, Deserialize, FromForm)]
pub struct MessageFetchOptions {
limit: Option<i64>,
before: Option<String>,
after: Option<String>,
}
/// fetch channel messages
#[get("/<target>/messages?<options..>")]
pub fn messages(
user: User,
target: Channel,
options: Form<MessageFetchOptions>,
) -> Option<Response> {
let permissions = with_permissions!(user, target);
if !permissions.get_read_messages() {
return Some(Response::LackingPermission(Permission::ReadMessages));
}
// ! FIXME: update wiki to reflect changes
let mut query = doc! { "channel": target.id };
if let Some(before) = &options.before {
query.insert("_id", doc! { "$lt": before });
}
if let Some(after) = &options.after {
query.insert("_id", doc! { "$gt": after });
}
let limit = if let Some(limit) = options.limit {
limit.min(100).max(0)
} else {
50
};
let col = database::get_collection("messages");
let result = col
.find(
query,
FindOptions::builder()
.limit(limit)
.sort(doc! {
"_id": -1
})
.build(),
)
.unwrap();
let mut messages = Vec::new();
for item in result {
let message: Message =
from_bson(Bson::Document(item.unwrap())).expect("Failed to unwrap message.");
messages.push(json!({
"id": message.id,
"author": message.author,
"content": message.content,
"edited": if let Some(t) = message.edited { Some(t.timestamp()) } else { None }
}));
}
Some(Response::Success(json!(messages)))
}
#[derive(Serialize, Deserialize)]
pub struct SendMessage {
content: String,
nonce: String,
}
/// send a message to a channel
#[post("/<target>/messages", data = "<message>")]
pub fn send_message(user: User, target: Channel, message: Json<SendMessage>) -> Option<Response> {
let permissions = with_permissions!(user, target);
if !permissions.get_send_messages() {
if target.channel_type == 0 {
return Some(Response::LackingPermission(Permission::SendDirectMessages));
}
return Some(Response::LackingPermission(Permission::SendMessages));
}
let content: String = message.content.chars().take(2000).collect();
let nonce: String = message.nonce.chars().take(32).collect();
if content.len() == 0 {
return Some(Response::NotAcceptable(
json!({ "error": "No message content!" }),
));
}
let col = database::get_collection("messages");
if col
.find_one(doc! { "nonce": nonce.clone() }, None)
.unwrap()
.is_some()
{
return Some(Response::BadRequest(
json!({ "error": "Message already sent!" }),
));
}
let id = Ulid::new().to_string();
let message = Message {
id: id.clone(),
nonce: Some(nonce),
channel: target.id.clone(),
author: user.id,
content,
edited: None,
previous_content: vec![],
};
if message.send(&target) {
Some(Response::Success(json!({ "id": id })))
} else {
Some(Response::BadRequest(
json!({ "error": "Failed to send message." }),
))
}
}
/// get a message
#[get("/<target>/messages/<message>")]
pub fn get_message(user: User, target: Channel, message: Message) -> Option<Response> {
let permissions = with_permissions!(user, target);
if !permissions.get_read_messages() {
return Some(Response::LackingPermission(Permission::ReadMessages));
}
// ! CHECK IF USER HAS PERMISSION TO VIEW EDITS OF MESSAGES
let mut entries = vec![];
for entry in message.previous_content {
entries.push(json!({
"content": entry.content,
"time": entry.time.timestamp(),
}));
}
Some(Response::Success(json!({
"id": message.id,
"author": message.author,
"content": message.content,
"edited": if let Some(t) = message.edited { Some(t.timestamp()) } else { None },
"previous_content": entries,
})))
}
#[derive(Serialize, Deserialize)]
pub struct EditMessage {
content: String,
}
/// edit a message
#[patch("/<target>/messages/<message>", data = "<edit>")]
pub fn edit_message(
user: User,
target: Channel,
message: Message,
edit: Json<EditMessage>,
) -> Option<Response> {
with_permissions!(user, target);
if message.author != user.id {
return Some(Response::Unauthorized(
json!({ "error": "You did not send this message." }),
));
}
let col = database::get_collection("messages");
let time = if let Some(edited) = message.edited {
edited.0
} else {
Ulid::from_string(&message.id).unwrap().datetime()
};
let edited = Utc::now();
match col.update_one(
doc! { "_id": message.id.clone() },
doc! {
"$set": {
"content": &edit.content,
"edited": Bson::DateTime(edited)
},
"$push": {
"previous_content": {
"content": &message.content,
"time": time,
}
},
},
None,
) {
Ok(_) => {
notifications::send_message_given_channel(
Notification::message_edit(Edit {
id: message.id.clone(),
channel: target.id.clone(),
author: message.author.clone(),
content: edit.content.clone(),
}),
&target,
);
Some(Response::Result(super::Status::Ok))
}
Err(_) => Some(Response::InternalServerError(
json!({ "error": "Failed to update message." }),
)),
}
}
/// delete a message
#[delete("/<target>/messages/<message>")]
pub fn delete_message(user: User, target: Channel, message: Message) -> Option<Response> {
let permissions = with_permissions!(user, target);
if !permissions.get_manage_messages() && message.author != user.id {
return Some(Response::LackingPermission(Permission::ManageMessages));
}
let col = database::get_collection("messages");
match col.delete_one(doc! { "_id": &message.id }, None) {
Ok(_) => {
notifications::send_message_given_channel(
Notification::message_delete(Delete {
id: message.id.clone(),
}),
&target,
);
Some(Response::Result(super::Status::Ok))
}
Err(_) => Some(Response::InternalServerError(
json!({ "error": "Failed to delete message." }),
)),
}
}
#[options("/create")]
pub fn create_group_preflight() -> Response {
Response::Result(super::Status::Ok)
}
#[options("/<_target>")]
pub fn channel_preflight(_target: String) -> Response {
Response::Result(super::Status::Ok)
}
#[options("/<_target>/recipients/<_member>")]
pub fn member_preflight(_target: String, _member: String) -> Response {
Response::Result(super::Status::Ok)
}
#[options("/<_target>/messages")]
pub fn messages_preflight(_target: String) -> Response {
Response::Result(super::Status::Ok)
}
#[options("/<_target>/messages/<_message>")]
pub fn message_preflight(_target: String, _message: String) -> Response {
Response::Result(super::Status::Ok)
}
use crate::database::*;
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use mongodb::bson::doc;
use mongodb::options::UpdateOptions;
#[put("/<target>/ack/<message>")]
pub async fn req(user: User, target: Ref, message: Ref) -> Result<()> {
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
let id = target.id();
get_collection("channel_unreads")
.update_one(
doc! {
"_id.channel": id,
"_id.user": &user.id
},
doc! {
"$unset": {
"mentions": 1
},
"$set": {
"last_id": &message.id
}
},
UpdateOptions::builder().upsert(true).build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel_unreads",
})?;
ClientboundNotification::ChannelAck {
id: id.to_string(),
user: user.id.clone(),
message_id: message.id,
}
.publish(user.id);
Ok(())
}
use crate::util::result::{Error, Result};
use crate::{database::*, notifications::events::ClientboundNotification};
use mongodb::bson::doc;
#[delete("/<target>")]
pub async fn req(user: User, target: Ref) -> Result<()> {
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
match &target {
Channel::SavedMessages { .. } => Err(Error::NoEffect),
Channel::DirectMessage { .. } => {
get_collection("channels")
.update_one(
doc! {
"_id": target.id()
},
doc! {
"$set": {
"active": false
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
Ok(())
}
Channel::Group {
id,
owner,
recipients,
..
} => {
if &user.id == owner {
if let Some(new_owner) = recipients.iter().find(|x| *x != &user.id) {
get_collection("channels")
.update_one(
doc! {
"_id": &id
},
doc! {
"$set": {
"owner": new_owner
},
"$pull": {
"recipients": &user.id
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
target.publish_update(json!({ "owner": new_owner })).await?;
} else {
return target.delete().await;
}
} else {
get_collection("channels")
.update_one(
doc! {
"_id": &id
},
doc! {
"$pull": {
"recipients": &user.id
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
}
ClientboundNotification::ChannelGroupLeave {
id: id.clone(),
user: user.id.clone(),
}
.publish(id.clone());
Content::SystemMessage(SystemMessage::UserLeft { id: user.id })
.send_as_system(&target)
.await
.ok();
Ok(())
}
Channel::TextChannel { .. } |
Channel::VoiceChannel { .. } => {
if perm.get_manage_channel() {
target.delete().await
} else {
Err(Error::MissingPermission)
}
}
}
}
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use crate::{database::*, notifications::events::RemoveChannelField};
use mongodb::bson::{doc, to_document};
use rocket_contrib::json::Json;
use serde::{Deserialize, Serialize};
use validator::Validate;
#[derive(Validate, Serialize, Deserialize)]
pub struct Data {
#[validate(length(min = 1, max = 32))]
#[serde(skip_serializing_if = "Option::is_none")]
name: Option<String>,
#[validate(length(min = 0, max = 1024))]
#[serde(skip_serializing_if = "Option::is_none")]
description: Option<String>,
#[validate(length(min = 1, max = 128))]
icon: Option<String>,
remove: Option<RemoveChannelField>,
}
#[patch("/<target>", data = "<data>")]
pub async fn req(user: User, target: Ref, data: Json<Data>) -> Result<()> {
let data = data.into_inner();
data.validate()
.map_err(|error| Error::FailedValidation { error })?;
if data.name.is_none()
&& data.description.is_none()
&& data.icon.is_none()
&& data.remove.is_none()
{
return Ok(());
}
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_manage_channel() {
Err(Error::MissingPermission)?
}
match &target {
Channel::Group { id, icon, .. }
| Channel::TextChannel { id, icon, .. }
| Channel::VoiceChannel { id, icon, .. } => {
let mut set = doc! {};
let mut unset = doc! {};
let mut remove_icon = false;
if let Some(remove) = &data.remove {
match remove {
RemoveChannelField::Icon => {
unset.insert("icon", 1);
remove_icon = true;
}
RemoveChannelField::Description => {
unset.insert("description", 1);
}
}
}
if let Some(name) = &data.name {
set.insert("name", name);
}
if let Some(description) = &data.description {
set.insert("description", description);
}
if let Some(attachment_id) = &data.icon {
let attachment =
File::find_and_use(&attachment_id, "icons", "object", target.id()).await?;
set.insert(
"icon",
to_document(&attachment).map_err(|_| Error::DatabaseError {
operation: "to_document",
with: "attachment",
})?,
);
remove_icon = true;
}
let mut operations = doc! {};
if set.len() > 0 {
operations.insert("$set", &set);
}
if unset.len() > 0 {
operations.insert("$unset", unset);
}
if operations.len() > 0 {
get_collection("channels")
.update_one(doc! { "_id": &id }, operations, None)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
}
ClientboundNotification::ChannelUpdate {
id: id.clone(),
data: json!(set),
clear: data.remove,
}
.publish(id.clone());
if let Channel::Group { .. } = &target {
if let Some(name) = data.name {
Content::SystemMessage(SystemMessage::ChannelRenamed {
name,
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.description {
Content::SystemMessage(SystemMessage::ChannelDescriptionChanged {
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.icon {
Content::SystemMessage(SystemMessage::ChannelIconChanged { by: user.id })
.send_as_system(&target)
.await
.ok();
}
}
if remove_icon {
if let Some(old_icon) = icon {
old_icon.delete().await?;
}
}
Ok(())
}
_ => Err(Error::InvalidOperation),
}
}
use crate::database::*;
use crate::util::result::{Error, Result};
use rocket_contrib::json::JsonValue;
#[get("/<target>")]
pub async fn req(user: User, target: Ref) -> Result<JsonValue> {
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
Ok(json!(target))
}
use crate::util::result::{Error, Result};
use crate::util::variables::MAX_GROUP_SIZE;
use crate::{database::*, notifications::events::ClientboundNotification};
use mongodb::bson::doc;
#[put("/<target>/recipients/<member>")]
pub async fn req(user: User, target: Ref, member: Ref) -> Result<()> {
if get_relationship(&user, &member.id) != RelationshipStatus::Friend {
Err(Error::NotFriends)?
}
let channel = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&channel)
.for_channel()
.await?;
if !perm.get_invite_others() {
Err(Error::MissingPermission)?
}
if let Channel::Group { id, recipients, .. } = &channel {
if recipients.len() >= *MAX_GROUP_SIZE {
Err(Error::GroupTooLarge {
max: *MAX_GROUP_SIZE,
})?
}
if recipients.iter().find(|x| *x == &member.id).is_some() {
Err(Error::AlreadyInGroup)?
}
get_collection("channels")
.update_one(
doc! {
"_id": &id
},
doc! {
"$push": {
"recipients": &member.id
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel",
})?;
ClientboundNotification::ChannelGroupJoin {
id: id.clone(),
user: member.id.clone(),
}
.publish(id.clone());
Content::SystemMessage(SystemMessage::UserAdded {
id: member.id,
by: user.id,
})
.send_as_system(&channel)
.await
.ok();
Ok(())
} else {
Err(Error::InvalidOperation)
}
}