Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
No results found
Show changes
Showing
with 775 additions and 164 deletions
......@@ -9,16 +9,23 @@ use validator::Validate;
#[derive(Validate, Serialize, Deserialize)]
pub struct Ref {
#[validate(length(min = 26, max = 26))]
#[validate(length(min = 1, max = 26))]
pub id: String,
}
impl Ref {
pub fn from_unchecked(id: String) -> Ref {
Ref { id }
}
pub fn from(id: String) -> Result<Ref> {
Ok(Ref { id })
let r = Ref { id };
r.validate()
.map_err(|error| Error::FailedValidation { error })?;
Ok(r)
}
pub async fn fetch<T: DeserializeOwned>(&self, collection: &'static str) -> Result<T> {
async fn fetch<T: DeserializeOwned>(&self, collection: &'static str) -> Result<T> {
let doc = get_collection(&collection)
.find_one(
doc! {
......@@ -31,7 +38,7 @@ impl Ref {
operation: "find_one",
with: &collection,
})?
.ok_or_else(|| Error::UnknownUser)?;
.ok_or_else(|| Error::NotFound)?;
Ok(from_document::<T>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
......@@ -47,6 +54,60 @@ impl Ref {
self.fetch("channels").await
}
pub async fn fetch_server(&self) -> Result<Server> {
self.fetch("servers").await
}
pub async fn fetch_invite(&self) -> Result<Invite> {
self.fetch("channel_invites").await
}
pub async fn fetch_member(&self, server: &str) -> Result<Member> {
let doc = get_collection("server_members")
.find_one(
doc! {
"_id.user": &self.id,
"_id.server": server
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_member",
})?
.ok_or_else(|| Error::NotFound)?;
Ok(
from_document::<Member>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server_member",
})?,
)
}
pub async fn fetch_ban(&self, server: &str) -> Result<Ban> {
let doc = get_collection("server_bans")
.find_one(
doc! {
"_id.user": &self.id,
"_id.server": server
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_ban",
})?
.ok_or_else(|| Error::NotFound)?;
Ok(from_document::<Ban>(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server_ban",
})?)
}
pub async fn fetch_message(&self, channel: &Channel) -> Result<Message> {
let message: Message = self.fetch("messages").await?;
if &message.channel != channel.id() {
......
......@@ -29,18 +29,30 @@ pub async fn create_database() {
.await
.expect("Failed to create servers collection.");
db.create_collection("migrations", None)
db.create_collection("server_members", None)
.await
.expect("Failed to create migrations collection.");
.expect("Failed to create server_members collection.");
db.create_collection("attachments", None)
db.create_collection("server_bans", None)
.await
.expect("Failed to create attachments collection.");
.expect("Failed to create server_bans collection.");
db.create_collection("channel_invites", None)
.await
.expect("Failed to create channel_invites collection.");
db.create_collection("channel_unreads", None)
.await
.expect("Failed to create channel_unreads collection.");
db.create_collection("migrations", None)
.await
.expect("Failed to create migrations collection.");
db.create_collection("attachments", None)
.await
.expect("Failed to create attachments collection.");
db.create_collection("user_settings", None)
.await
.expect("Failed to create user_settings collection.");
......@@ -110,6 +122,23 @@ pub async fn create_database() {
.await
.expect("Failed to create username index.");
db.run_command(
doc! {
"createIndexes": "messages",
"indexes": [
{
"key": {
"content": "text"
},
"name": "content"
}
]
},
None,
)
.await
.expect("Failed to create message index.");
db.collection("migrations")
.insert_one(
doc! {
......
use crate::database::{get_collection, get_db};
use crate::database::{permissions, get_collection, get_db, PermissionTuple};
use log::info;
use futures::StreamExt;
use log::info;
use mongodb::{bson::{doc, from_document, to_document}, options::FindOptions};
use serde::{Deserialize, Serialize};
use mongodb::{bson::{doc, from_document}, options::FindOptions};
#[derive(Serialize, Deserialize)]
struct MigrationInfo {
......@@ -11,7 +11,7 @@ struct MigrationInfo {
revision: i32,
}
pub const LATEST_REVISION: i32 = 4;
pub const LATEST_REVISION: i32 = 7;
pub async fn migrate_database() {
let migrations = get_collection("migrations");
......@@ -92,34 +92,36 @@ pub async fn run_migrations(revision: i32) -> i32 {
info!("Running migration [revision 3 / 2021-05-25]: Support multiple file uploads, add channel_unreads and user_settings.");
let messages = get_collection("messages");
let mut cursor = messages.find(
doc! {
"attachment": {
"$exists": 1
}
},
FindOptions::builder()
.projection(doc! {
"_id": 1,
"attachments": [ "$attachment" ]
})
.build()
)
.await
.expect("Failed to fetch messages.");
let mut cursor = messages
.find(
doc! {
"attachment": {
"$exists": 1
}
},
FindOptions::builder()
.projection(doc! {
"_id": 1,
"attachments": [ "$attachment" ]
})
.build(),
)
.await
.expect("Failed to fetch messages.");
while let Some(result) = cursor.next().await {
let doc = result.unwrap();
let id = doc.get_str("_id").unwrap();
let attachments = doc.get_array("attachments").unwrap();
messages.update_one(
doc! { "_id": id },
doc! { "$unset": { "attachment": 1 }, "$set": { "attachments": attachments } },
None
)
.await
.unwrap();
messages
.update_one(
doc! { "_id": id },
doc! { "$unset": { "attachment": 1 }, "$set": { "attachments": attachments } },
None,
)
.await
.unwrap();
}
get_db()
......@@ -133,6 +135,74 @@ pub async fn run_migrations(revision: i32) -> i32 {
.expect("Failed to create user_settings collection.");
}
if revision <= 4 {
info!("Running migration [revision 4 / 2021-06-01]: Add more server collections.");
get_db()
.create_collection("server_members", None)
.await
.expect("Failed to create server_members collection.");
get_db()
.create_collection("server_bans", None)
.await
.expect("Failed to create server_bans collection.");
get_db()
.create_collection("channel_invites", None)
.await
.expect("Failed to create channel_invites collection.");
}
if revision <= 5 {
info!("Running migration [revision 5 / 2021-06-26]: Add permissions.");
#[derive(Serialize)]
struct Server {
pub default_permissions: PermissionTuple,
}
let server = Server {
default_permissions: (
*permissions::server::DEFAULT_PERMISSION as i32,
*permissions::channel::DEFAULT_PERMISSION_SERVER as i32
)
};
get_collection("servers")
.update_many(
doc! { },
doc! {
"$set": to_document(&server).unwrap()
},
None
)
.await
.expect("Failed to migrate servers.");
}
if revision <= 6 {
info!("Running migration [revision 6 / 2021-07-09]: Add message text index.");
get_db()
.run_command(
doc! {
"createIndexes": "messages",
"indexes": [
{
"key": {
"content": "text"
},
"name": "content"
}
]
},
None,
)
.await
.expect("Failed to create message index.");
}
// Reminder to update LATEST_REVISION when adding new migrations.
LATEST_REVISION
}
use crate::database::*;
use crate::util::result::Result;
use crate::util::result::{Error, Result};
use super::PermissionCalculator;
......@@ -9,11 +9,33 @@ use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)]
pub enum ChannelPermission {
View = 1,
SendMessage = 2,
ManageMessages = 4,
ManageChannel = 8,
VoiceCall = 16,
View = 0b00000000000000000000000000000001, // 1
SendMessage = 0b00000000000000000000000000000010, // 2
ManageMessages = 0b00000000000000000000000000000100, // 4
ManageChannel = 0b00000000000000000000000000001000, // 8
VoiceCall = 0b00000000000000000000000000010000, // 16
InviteOthers = 0b00000000000000000000000000100000, // 32
EmbedLinks = 0b00000000000000000000000001000000, // 64
UploadFiles = 0b00000000000000000000000010000000, // 128
}
lazy_static! {
pub static ref DEFAULT_PERMISSION_DM: u32 =
ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::ManageChannel
+ ChannelPermission::VoiceCall
+ ChannelPermission::InviteOthers
+ ChannelPermission::EmbedLinks
+ ChannelPermission::UploadFiles;
pub static ref DEFAULT_PERMISSION_SERVER: u32 =
ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::VoiceCall
+ ChannelPermission::InviteOthers
+ ChannelPermission::EmbedLinks
+ ChannelPermission::UploadFiles;
}
impl_op_ex!(+ |a: &ChannelPermission, b: &ChannelPermission| -> u32 { *a as u32 | *b as u32 });
......@@ -27,6 +49,9 @@ bitfield! {
pub get_manage_messages, _: 29;
pub get_manage_channel, _: 28;
pub get_voice_call, _: 27;
pub get_invite_others, _: 26;
pub get_embed_links, _: 25;
pub get_upload_files, _: 24;
}
impl<'a> PermissionCalculator<'a> {
......@@ -40,7 +65,7 @@ impl<'a> PermissionCalculator<'a> {
match channel {
Channel::SavedMessages { user: owner, .. } => {
if &self.perspective.id == owner {
Ok(u32::MAX - ChannelPermission::VoiceCall as u32)
Ok(u32::MAX)
} else {
Ok(0)
}
......@@ -56,9 +81,7 @@ impl<'a> PermissionCalculator<'a> {
let perms = self.for_user(recipient).await?;
if perms.get_send_message() {
return Ok(ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::VoiceCall);
return Ok(*DEFAULT_PERMISSION_DM);
}
return Ok(ChannelPermission::View as u32);
......@@ -67,20 +90,63 @@ impl<'a> PermissionCalculator<'a> {
Ok(0)
}
Channel::Group { recipients, .. } => {
Channel::Group { recipients, permissions, owner, .. } => {
if &self.perspective.id == owner {
return Ok(*DEFAULT_PERMISSION_DM)
}
if recipients
.iter()
.find(|x| *x == &self.perspective.id)
.is_some()
{
Ok(ChannelPermission::View
+ ChannelPermission::SendMessage
+ ChannelPermission::ManageChannel
+ ChannelPermission::VoiceCall)
if let Some(permissions) = permissions {
Ok(permissions.clone() as u32)
} else {
Ok(*DEFAULT_PERMISSION_DM)
}
} else {
Ok(0)
}
}
Channel::TextChannel { server, default_permissions, role_permissions, .. }
| Channel::VoiceChannel { server, default_permissions, role_permissions, .. } => {
let server = Ref::from_unchecked(server.clone()).fetch_server().await?;
if self.perspective.id == server.owner {
Ok(u32::MAX)
} else {
match Ref::from_unchecked(self.perspective.id.clone()).fetch_member(&server.id).await {
Ok(member) => {
let mut perm = if let Some(permission) = default_permissions {
*permission as u32
} else {
server.default_permissions.1 as u32
};
if let Some(roles) = member.roles {
for role in roles {
if let Some(permission) = role_permissions.get(&role) {
perm |= *permission as u32;
}
if let Some(server_role) = server.roles.get(&role) {
perm |= server_role.permissions.1 as u32;
}
}
}
Ok(perm)
}
Err(error) => {
match &error {
Error::NotFound => Ok(0),
_ => Err(error)
}
}
}
}
}
}
}
......
pub use crate::database::*;
pub mod channel;
pub mod server;
pub mod user;
pub use user::get_relationship;
......@@ -11,6 +12,8 @@ pub struct PermissionCalculator<'a> {
user: Option<&'a User>,
relationship: Option<&'a RelationshipStatus>,
channel: Option<&'a Channel>,
server: Option<&'a Server>,
// member: Option<&'a Member>,
has_mutual_connection: bool,
}
......@@ -23,6 +26,8 @@ impl<'a> PermissionCalculator<'a> {
user: None,
relationship: None,
channel: None,
server: None,
// member: None,
has_mutual_connection: false,
}
......@@ -49,6 +54,20 @@ impl<'a> PermissionCalculator<'a> {
}
}
pub fn with_server(self, server: &'a Server) -> PermissionCalculator {
PermissionCalculator {
server: Some(&server),
..self
}
}
/* pub fn with_member(self, member: &'a Member) -> PermissionCalculator {
PermissionCalculator {
member: Some(&member),
..self
}
} */
pub fn with_mutual_connection(self) -> PermissionCalculator<'a> {
PermissionCalculator {
has_mutual_connection: true,
......
use crate::util::result::{Error, Result};
use super::PermissionCalculator;
use super::Ref;
use num_enum::TryFromPrimitive;
use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)]
pub enum ServerPermission {
View = 0b00000000000000000000000000000001, // 1
ManageRoles = 0b00000000000000000000000000000010, // 2
ManageChannels = 0b00000000000000000000000000000100, // 4
ManageServer = 0b00000000000000000000000000001000, // 8
KickMembers = 0b00000000000000000000000000010000, // 16
BanMembers = 0b00000000000000000000000000100000, // 32
// 6 bits of space
ChangeNickname = 0b00000000000000000001000000000000, // 4096
ManageNicknames = 0b00000000000000000010000000000000, // 8192
ChangeAvatar = 0b00000000000000000100000000000000, // 16382
RemoveAvatars = 0b00000000000000001000000000000000, // 32768
// 16 bits of space
}
lazy_static! {
pub static ref DEFAULT_PERMISSION: u32 =
ServerPermission::View
+ ServerPermission::ChangeNickname
+ ServerPermission::ChangeAvatar;
}
impl_op_ex!(+ |a: &ServerPermission, b: &ServerPermission| -> u32 { *a as u32 | *b as u32 });
impl_op_ex_commutative!(+ |a: &u32, b: &ServerPermission| -> u32 { *a | *b as u32 });
bitfield! {
pub struct ServerPermissions(MSB0 [u32]);
u32;
pub get_view, _: 31;
pub get_manage_roles, _: 30;
pub get_manage_channels, _: 29;
pub get_manage_server, _: 28;
pub get_kick_members, _: 27;
pub get_ban_members, _: 26;
pub get_change_nickname, _: 19;
pub get_manage_nicknames, _: 18;
pub get_change_avatar, _: 17;
pub get_remove_avatars, _: 16;
}
impl<'a> PermissionCalculator<'a> {
pub async fn calculate_server(self) -> Result<u32> {
let server = if let Some(server) = self.server {
server
} else {
unreachable!()
};
if self.perspective.id == server.owner {
Ok(u32::MAX)
} else {
match Ref::from_unchecked(self.perspective.id.clone()).fetch_member(&server.id).await {
Ok(member) => {
let mut perm = server.default_permissions.0 as u32;
if let Some(roles) = member.roles {
for role in roles {
if let Some(server_role) = server.roles.get(&role) {
perm |= server_role.permissions.0 as u32;
}
}
}
Ok(perm)
}
Err(error) => {
match &error {
Error::NotFound => Ok(0),
_ => Err(error)
}
}
}
}
}
pub async fn for_server(self) -> Result<ServerPermissions<[u32; 1]>> {
Ok(ServerPermissions([self.calculate_server().await?]))
}
}
......@@ -10,10 +10,10 @@ use std::ops;
#[derive(Debug, PartialEq, Eq, TryFromPrimitive, Copy, Clone)]
#[repr(u32)]
pub enum UserPermission {
Access = 1,
ViewProfile = 2,
SendMessage = 4,
Invite = 8,
Access = 0b00000000000000000000000000000001, // 1
ViewProfile = 0b00000000000000000000000000000010, // 2
SendMessage = 0b00000000000000000000000000000100, // 4
Invite = 0b00000000000000000000000000001000, // 8
}
bitfield! {
......@@ -30,7 +30,7 @@ impl_op_ex_commutative!(+ |a: &u32, b: &UserPermission| -> u32 { *a | *b as u32
pub fn get_relationship(a: &User, b: &str) -> RelationshipStatus {
if a.id == b {
return RelationshipStatus::Friend;
return RelationshipStatus::User;
}
if let Some(relations) = &a.relations {
......@@ -55,7 +55,7 @@ impl<'a> PermissionCalculator<'a> {
.map(|v| v.to_owned())
.unwrap_or_else(|| get_relationship(&self.perspective, &target))
{
RelationshipStatus::Friend => return Ok(u32::MAX),
RelationshipStatus::Friend | RelationshipStatus::User => return Ok(u32::MAX),
RelationshipStatus::Blocked | RelationshipStatus::BlockedOther => {
return Ok(UserPermission::Access as u32)
}
......@@ -64,12 +64,36 @@ impl<'a> PermissionCalculator<'a> {
// ! INFO: if we add boolean switch for permission to
// ! message people who have mutual, we need to get
// ! rid of this return statement.
return Ok(permissions);
// return Ok(permissions);
}
_ => {}
}
let check_server_overlap = async || {
let server_ids = User::fetch_server_ids(&self.perspective.id).await?;
Ok(
get_collection("server_members")
.find_one(
doc! {
"_id.user": &target,
"_id.server": {
"$in": server_ids
}
},
None
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find_one",
with: "server_members",
})?
.is_some()
)
};
if self.has_mutual_connection
|| check_server_overlap().await?
|| get_collection("channels")
.find_one(
doc! {
......@@ -84,7 +108,7 @@ impl<'a> PermissionCalculator<'a> {
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
operation: "find_one",
with: "channels",
})?
.is_some()
......
......@@ -86,14 +86,19 @@ async fn launch_web() {
templates: Templates {
verify_email: Template {
title: "Verify your REVOLT account.",
text: "Verify your email here: {{url}}",
html: include_str!("../assets/templates/verify.html"),
title: "Verify your Revolt account.",
text: "You're almost there!
If you did not perform this action you can safely ignore this email.
Please verify your account here: {{url}}",
html: None,
},
reset_password: Template {
title: "Reset your REVOLT password.",
text: "Reset your password here: {{url}}",
html: include_str!("../assets/templates/reset.html"),
title: "Reset your Revolt password.",
text: "You requested a password reset, if you did not perform this action you can safely ignore this email.
Reset your password here: {{url}}",
html: None,
},
welcome: None,
},
......
use hive_pubsub::PubSub;
use mongodb::bson::doc;
use rauth::auth::Session;
use rocket_contrib::json::JsonValue;
use serde::{Deserialize, Serialize};
use super::hive::{get_hive, subscribe_if_exists};
use crate::database::*;
use crate::{database::*, util::result::Result};
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "error")]
pub enum WebSocketError {
LabelMe,
......@@ -24,7 +25,7 @@ pub enum ServerboundNotification {
EndTyping { channel: String },
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveUserField {
ProfileContent,
ProfileBackground,
......@@ -32,19 +33,40 @@ pub enum RemoveUserField {
Avatar,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveChannelField {
Icon,
Description
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveServerField {
Icon,
Banner,
Description,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveRoleField {
Colour,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub enum RemoveMemberField {
Nickname,
Avatar,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type")]
pub enum ClientboundNotification {
Error(WebSocketError),
Authenticated,
Ready {
users: Vec<User>,
servers: Vec<Server>,
channels: Vec<Channel>,
members: Vec<Member>
},
Message(Message),
......@@ -65,6 +87,9 @@ pub enum ClientboundNotification {
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveChannelField>,
},
ChannelDelete {
id: String,
},
ChannelGroupJoin {
id: String,
user: String,
......@@ -73,9 +98,6 @@ pub enum ClientboundNotification {
id: String,
user: String,
},
ChannelDelete {
id: String,
},
ChannelStartTyping {
id: String,
user: String,
......@@ -84,6 +106,46 @@ pub enum ClientboundNotification {
id: String,
user: String,
},
ChannelAck {
id: String,
user: String,
message_id: String,
},
ServerUpdate {
id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveServerField>,
},
ServerDelete {
id: String,
},
ServerMemberUpdate {
id: MemberCompositeKey,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveMemberField>,
},
ServerMemberJoin {
id: String,
user: String,
},
ServerMemberLeave {
id: String,
user: String,
},
ServerRoleUpdate {
id: String,
role_id: String,
data: JsonValue,
#[serde(skip_serializing_if = "Option::is_none")]
clear: Option<RemoveRoleField>
},
ServerRoleDelete {
id: String,
role_id: String
},
UserUpdate {
id: String,
......@@ -96,24 +158,36 @@ pub enum ClientboundNotification {
user: User,
status: RelationshipStatus,
},
UserPresence {
UserSettingsUpdate {
id: String,
online: bool,
update: JsonValue,
},
}
impl ClientboundNotification {
pub fn publish(self, topic: String) {
async_std::task::spawn(async move {
prehandle_hook(&self); // ! TODO: this should be moved to pubsub
prehandle_hook(&self).await.ok(); // ! FIXME: this should be moved to pubsub
hive_pubsub::backend::mongo::publish(get_hive(), &topic, self)
.await
.ok();
});
}
pub fn publish_as_user(self, user: String) {
self.clone().publish(user.clone());
async_std::task::spawn(async move {
if let Ok(server_ids) = User::fetch_server_ids(&user).await {
for server in server_ids {
self.clone().publish(server.clone());
}
}
});
}
}
pub fn prehandle_hook(notification: &ClientboundNotification) {
pub async fn prehandle_hook(notification: &ClientboundNotification) -> Result<()> {
match &notification {
ClientboundNotification::ChannelGroupJoin { id, user } => {
subscribe_if_exists(user.clone(), id.clone()).ok();
......@@ -129,6 +203,23 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
subscribe_if_exists(recipient.clone(), channel_id.to_string()).ok();
}
}
Channel::TextChannel { server, .. }
| Channel::VoiceChannel { server, .. } => {
// ! FIXME: write a better algorithm?
let members = Server::fetch_member_ids(server).await?;
for member in members {
subscribe_if_exists(member.clone(), channel_id.to_string()).ok();
}
}
}
}
ClientboundNotification::ServerMemberJoin { id, user } => {
let server = Ref::from_unchecked(id.clone()).fetch_server().await?;
subscribe_if_exists(user.clone(), id.clone()).ok();
for channel in server.channels {
subscribe_if_exists(user.clone(), channel).ok();
}
}
ClientboundNotification::UserRelationship { id, user, status } => {
......@@ -138,27 +229,35 @@ pub fn prehandle_hook(notification: &ClientboundNotification) {
}
_ => {}
}
Ok(())
}
pub fn posthandle_hook(notification: &ClientboundNotification) {
pub async fn posthandle_hook(notification: &ClientboundNotification) {
match &notification {
ClientboundNotification::ChannelDelete { id } => {
get_hive().hive.drop_topic(&id).ok();
}
ClientboundNotification::ChannelGroupLeave { id, user } => {
get_hive().hive.unsubscribe(user, id).ok();
}
ClientboundNotification::ServerDelete { id } => {
get_hive().hive.drop_topic(&id).ok();
}
ClientboundNotification::ServerMemberLeave { id, user } => {
get_hive().hive.unsubscribe(user, id).ok();
if let Ok(server) = Ref::from_unchecked(id.clone()).fetch_server().await {
for channel in server.channels {
get_hive().hive.unsubscribe(user, &channel).ok();
}
}
}
ClientboundNotification::UserRelationship { id, user, status } => {
if status == &RelationshipStatus::None {
get_hive()
.hive
.unsubscribe(&id.to_string(), &user.id.to_string())
.ok();
get_hive().hive.unsubscribe(id, &user.id).ok();
}
}
ClientboundNotification::ChannelGroupLeave { id, user } => {
get_hive()
.hive
.unsubscribe(&user.to_string(), &id.to_string())
.ok();
}
_ => {}
}
}
......@@ -13,8 +13,11 @@ static HIVE: OnceCell<Hive> = OnceCell::new();
pub async fn init_hive() {
let hive = MongodbPubSub::new(
|ids, notification| {
super::events::posthandle_hook(&notification);
|ids, notification: ClientboundNotification| {
let notif = notification.clone();
async_std::task::spawn(async move {
super::events::posthandle_hook(&notif).await;
});
if let Ok(data) = to_string(&notification) {
debug!("Pushing out notification. {}", data);
......
......@@ -19,10 +19,49 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
);
}
let members = User::fetch_memberships(&user.id).await?;
let server_ids: Vec<String> = members.iter()
.map(|x| x.id.server.clone())
.collect();
let mut cursor = get_collection("servers")
.find(
doc! {
"_id": {
"$in": server_ids
}
},
None,
)
.await
.map_err(|_| Error::DatabaseError {
operation: "find",
with: "servers",
})?;
let mut servers = vec![];
let mut channel_ids = vec![];
while let Some(result) = cursor.next().await {
if let Ok(doc) = result {
let server: Server = from_document(doc).map_err(|_| Error::DatabaseError {
operation: "from_document",
with: "server",
})?;
channel_ids.extend(server.channels.iter().cloned());
servers.push(server);
}
}
let mut cursor = get_collection("channels")
.find(
doc! {
"$or": [
{
"_id": {
"$in": channel_ids
}
},
{
"channel_type": "SavedMessages",
"user": &user.id
......@@ -75,5 +114,10 @@ pub async fn generate_ready(mut user: User) -> Result<ClientboundNotification> {
user.online = Some(true);
users.push(user);
Ok(ClientboundNotification::Ready { users, channels })
Ok(ClientboundNotification::Ready {
users,
servers,
channels,
members
})
}
......@@ -4,6 +4,7 @@ use super::hive::get_hive;
use futures::StreamExt;
use hive_pubsub::PubSub;
use mongodb::bson::doc;
use mongodb::bson::Document;
use mongodb::options::FindOptions;
pub async fn generate_subscriptions(user: &User) -> Result<(), String> {
......@@ -16,6 +17,43 @@ pub async fn generate_subscriptions(user: &User) -> Result<(), String> {
}
}
let server_ids = User::fetch_server_ids(&user.id)
.await
.map_err(|_| "Failed to fetch memberships.".to_string())?;
let channel_ids = get_collection("servers")
.find(
doc! {
"_id": {
"$in": &server_ids
}
},
None,
)
.await
.map_err(|_| "Failed to fetch servers.".to_string())?
.filter_map(async move |s| s.ok())
.collect::<Vec<Document>>()
.await
.into_iter()
.filter_map(|x| {
x.get_array("channels").ok().map(|v| {
v.into_iter()
.filter_map(|x| x.as_str().map(|x| x.to_string()))
.collect::<Vec<String>>()
})
})
.flatten()
.collect::<Vec<String>>();
for id in server_ids {
hive.subscribe(user.id.clone(), id)?;
}
for id in channel_ids {
hive.subscribe(user.id.clone(), id)?;
}
let mut cursor = get_collection("channels")
.find(
doc! {
......
......@@ -129,11 +129,14 @@ async fn accept(stream: TcpStream) {
send(payload);
if !was_online {
ClientboundNotification::UserPresence {
ClientboundNotification::UserUpdate {
id: id.clone(),
online: true,
data: json!({
"online": true
}),
clear: None
}
.publish(id);
.publish_as_user(id);
}
}
Err(_) => {
......@@ -228,11 +231,14 @@ async fn accept(stream: TcpStream) {
}
if let Some(id) = offline {
ClientboundNotification::UserPresence {
ClientboundNotification::UserUpdate {
id: id.clone(),
online: false,
data: json!({
"online": false
}),
clear: None
}
.publish(id);
.publish_as_user(id);
}
}
......@@ -242,10 +248,15 @@ pub fn publish(ids: Vec<String>, notification: ClientboundNotification) {
let users = USERS.read().unwrap();
for id in ids {
// Block certain notifications from reaching users that aren't meant to see them.
if let ClientboundNotification::UserRelationship { id: user_id, .. } = &notification {
if &id != user_id {
continue;
match &notification {
ClientboundNotification::UserRelationship { id: user_id, .. }
| ClientboundNotification::UserSettingsUpdate { id: user_id, .. }
| ClientboundNotification::ChannelAck { user: user_id, .. } => {
if &id != user_id {
continue;
}
}
_ => {}
}
if let Some(mut arr) = users.get_left(&id) {
......
use crate::database::*;
use crate::notifications::events::ClientboundNotification;
use crate::util::result::{Error, Result};
use mongodb::bson::doc;
use mongodb::options::UpdateOptions;
#[put("/<target>/ack/<message>")]
pub async fn req(user: User, target: Ref, message: Ref) -> Result<()> {
let target = target.fetch_channel().await?;
let perm = permissions::PermissionCalculator::new(&user)
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
let id = target.id();
get_collection("channel_unreads")
.update_one(
doc! {
"_id.channel": id,
"_id.user": &user.id
},
doc! {
"$unset": {
"mentions": 1
},
"$set": {
"last_id": &message.id
}
},
UpdateOptions::builder().upsert(true).build(),
)
.await
.map_err(|_| Error::DatabaseError {
operation: "update_one",
with: "channel_unreads",
})?;
ClientboundNotification::ChannelAck {
id: id.to_string(),
user: user.id.clone(),
message_id: message.id,
}
.publish(user.id);
Ok(())
}
......@@ -11,6 +11,7 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
......@@ -97,16 +98,20 @@ pub async fn req(user: User, target: Ref) -> Result<()> {
}
.publish(id.clone());
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::UserLeft { id: user.id }),
)
.publish(&target)
.await
.ok();
Content::SystemMessage(SystemMessage::UserLeft { id: user.id })
.send_as_system(&target)
.await
.ok();
Ok(())
}
Channel::TextChannel { .. } |
Channel::VoiceChannel { .. } => {
if perm.get_manage_channel() {
target.delete().await
} else {
Err(Error::MissingPermission)
}
}
}
}
......@@ -45,7 +45,9 @@ pub async fn req(user: User, target: Ref, data: Json<Data>) -> Result<()> {
}
match &target {
Channel::Group { id, icon, .. } => {
Channel::Group { id, icon, .. }
| Channel::TextChannel { id, icon, .. }
| Channel::VoiceChannel { id, icon, .. } => {
let mut set = doc! {};
let mut unset = doc! {};
......@@ -56,6 +58,9 @@ pub async fn req(user: User, target: Ref, data: Json<Data>) -> Result<()> {
unset.insert("icon", 1);
remove_icon = true;
}
RemoveChannelField::Description => {
unset.insert("description", 1);
}
}
}
......@@ -69,7 +74,7 @@ pub async fn req(user: User, target: Ref, data: Json<Data>) -> Result<()> {
if let Some(attachment_id) = &data.icon {
let attachment =
File::find_and_use(&attachment_id, "icons", "object", &user.id).await?;
File::find_and_use(&attachment_id, "icons", "object", target.id()).await?;
set.insert(
"icon",
to_document(&attachment).map_err(|_| Error::DatabaseError {
......@@ -107,37 +112,32 @@ pub async fn req(user: User, target: Ref, data: Json<Data>) -> Result<()> {
}
.publish(id.clone());
if let Some(name) = data.name {
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::ChannelRenamed { name, by: user.id.clone() }),
)
.publish(&target)
.await
.ok();
}
if let Channel::Group { .. } = &target {
if let Some(name) = data.name {
Content::SystemMessage(SystemMessage::ChannelRenamed {
name,
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.description {
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::ChannelDescriptionChanged { by: user.id.clone() }),
)
.publish(&target)
.await
.ok();
}
if let Some(_) = data.description {
Content::SystemMessage(SystemMessage::ChannelDescriptionChanged {
by: user.id.clone(),
})
.send_as_system(&target)
.await
.ok();
}
if let Some(_) = data.icon {
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::ChannelIconChanged { by: user.id }),
)
.publish(&target)
.await
.ok();
if let Some(_) = data.icon {
Content::SystemMessage(SystemMessage::ChannelIconChanged { by: user.id })
.send_as_system(&target)
.await
.ok();
}
}
if remove_icon {
......
......@@ -11,6 +11,7 @@ pub async fn req(user: User, target: Ref) -> Result<JsonValue> {
.with_channel(&target)
.for_channel()
.await?;
if !perm.get_view() {
Err(Error::MissingPermission)?
}
......
......@@ -15,7 +15,8 @@ pub async fn req(user: User, target: Ref, member: Ref) -> Result<()> {
.with_channel(&channel)
.for_channel()
.await?;
if !perm.get_view() {
if !perm.get_invite_others() {
Err(Error::MissingPermission)?
}
......@@ -54,15 +55,11 @@ pub async fn req(user: User, target: Ref, member: Ref) -> Result<()> {
}
.publish(id.clone());
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::UserAdded {
id: member.id,
by: user.id,
}),
)
.publish(&channel)
Content::SystemMessage(SystemMessage::UserAdded {
id: member.id,
by: user.id,
})
.send_as_system(&channel)
.await
.ok();
......
......@@ -24,6 +24,7 @@ pub struct Data {
#[post("/create", data = "<info>")]
pub async fn req(user: User, info: Json<Data>) -> Result<JsonValue> {
let info = info.into_inner();
info.validate()
.map_err(|error| Error::FailedValidation { error })?;
......@@ -37,8 +38,11 @@ pub async fn req(user: User, info: Json<Data>) -> Result<JsonValue> {
}
for target in &set {
if get_relationship(&user, target) != RelationshipStatus::Friend {
Err(Error::NotFriends)?
match get_relationship(&user, target) {
RelationshipStatus::Friend | RelationshipStatus::User => {}
_ => {
return Err(Error::NotFriends);
}
}
}
......@@ -62,16 +66,14 @@ pub async fn req(user: User, info: Json<Data>) -> Result<JsonValue> {
let id = Ulid::new().to_string();
let channel = Channel::Group {
id,
nonce: Some(info.nonce.clone()),
name: info.name.clone(),
description: info
.description
.clone()
.unwrap_or_else(|| "A group.".to_string()),
nonce: Some(info.nonce),
name: info.name,
description: info.description,
owner: user.id,
recipients: set.into_iter().collect::<Vec<String>>(),
icon: None,
last_message: None,
permissions: None
};
channel.clone().publish().await?;
......
......@@ -51,15 +51,11 @@ pub async fn req(user: User, target: Ref, member: Ref) -> Result<()> {
}
.publish(id.clone());
Message::create(
"00000000000000000000000000".to_string(),
id.clone(),
Content::SystemMessage(SystemMessage::UserRemove {
id: member.id,
by: user.id,
}),
)
.publish(&channel)
Content::SystemMessage(SystemMessage::UserRemove {
id: member.id,
by: user.id,
})
.send_as_system(&channel)
.await
.ok();
......