Skip to content
Snippets Groups Projects
Commit af56f5e2 authored by insert's avatar insert
Browse files

Add hive to main join!().

parent c704f13d
No related merge requests found
Pipeline #409 passed with stage
in 2 minutes and 18 seconds
...@@ -36,13 +36,18 @@ async fn entry() { ...@@ -36,13 +36,18 @@ async fn entry() {
util::variables::preflight_checks(); util::variables::preflight_checks();
database::connect().await; database::connect().await;
notifications::hive::init_hive().await;
ctrlc::set_handler(move || { ctrlc::set_handler(move || {
// Force ungraceful exit to avoid hang. // Force ungraceful exit to avoid hang.
std::process::exit(0); std::process::exit(0);
}).expect("Error setting Ctrl-C handler"); }).expect("Error setting Ctrl-C handler");
join!(launch_web(), notifications::websocket::launch_server()); join!(
launch_web(),
notifications::websocket::launch_server(),
notifications::hive::listen(),
);
} }
async fn launch_web() { async fn launch_web() {
......
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use rauth::auth::Session;
use snafu::Snafu;
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Snafu)]
#[serde(tag = "type")] #[serde(tag = "type")]
pub enum Notification { pub enum WebSocketError {
MessageCreate { #[snafu(display("This error has not been labelled."))]
LabelMe,
#[snafu(display("Internal server error."))]
InternalError,
}
#[derive(Deserialize, Debug)]
#[serde(tag = "type")]
pub enum ServerboundNotification {
Authenticate(Session)
}
#[derive(Serialize, Deserialize, Debug)]
#[serde(tag = "type")]
pub enum ClientboundNotification {
Error(WebSocketError),
/*MessageCreate {
id: String, id: String,
nonce: Option<String>, nonce: Option<String>,
channel: String, channel: String,
...@@ -57,7 +77,7 @@ pub enum Notification { ...@@ -57,7 +77,7 @@ pub enum Notification {
GuildDelete { GuildDelete {
id: String, id: String,
}, },*/
UserRelationship { UserRelationship {
id: String, id: String,
......
use super::events::Notification; use super::events::ClientboundNotification;
// use super::websocket;
use crate::database::get_collection; use crate::database::get_collection;
use hive_pubsub::backend::mongo::MongodbPubSub; use hive_pubsub::backend::mongo::MongodbPubSub;
use hive_pubsub::PubSub; use hive_pubsub::PubSub;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use serde_json::to_string; use serde_json::to_string;
use futures::FutureExt;
use log::{error, debug}; use log::{error, debug};
static HIVE: OnceCell<MongodbPubSub<String, String, Notification>> = OnceCell::new(); static HIVE: OnceCell<MongodbPubSub<String, String, ClientboundNotification>> = OnceCell::new();
pub async fn init_hive() { pub async fn init_hive() {
let hive = MongodbPubSub::new( let hive = MongodbPubSub::new(
...@@ -20,7 +20,7 @@ pub async fn init_hive() { ...@@ -20,7 +20,7 @@ pub async fn init_hive() {
error!("Failed to serialise notification."); error!("Failed to serialise notification.");
} }
}, },
get_collection("hive"), get_collection("pubsub"),
); );
if HIVE.set(hive).is_err() { if HIVE.set(hive).is_err() {
...@@ -28,7 +28,18 @@ pub async fn init_hive() { ...@@ -28,7 +28,18 @@ pub async fn init_hive() {
} }
} }
pub fn publish(topic: &String, data: Notification) -> Result<(), String> { pub async fn listen() {
HIVE.get()
.unwrap()
.listen()
.fuse()
.await
.expect("Hive hit an error");
dbg!("a");
}
pub fn publish(topic: &String, data: ClientboundNotification) -> Result<(), String> {
let hive = HIVE.get().unwrap(); let hive = HIVE.get().unwrap();
hive.publish(topic, data) hive.publish(topic, data)
} }
......
use crate::util::variables::WS_HOST; use crate::util::variables::WS_HOST;
use log::info; use log::info;
use ulid::Ulid;
use async_std::task; use async_std::task;
use futures::prelude::*; use futures::prelude::*;
use std::str::from_utf8;
use std::sync::{Arc, RwLock};
use many_to_many::ManyToMany;
use std::collections::HashMap;
use futures::stream::SplitSink;
use async_tungstenite::WebSocketStream;
use async_tungstenite::tungstenite::Message;
use async_std::net::{TcpListener, TcpStream}; use async_std::net::{TcpListener, TcpStream};
lazy_static! {
static ref CONNECTIONS: Arc<RwLock<HashMap<String, SplitSink<WebSocketStream<TcpStream>, Message>>>> =
Arc::new(RwLock::new(HashMap::new()));
static ref USERS: Arc<RwLock<ManyToMany<String, String>>> =
Arc::new(RwLock::new(ManyToMany::new()));
}
pub async fn launch_server() { pub async fn launch_server() {
let try_socket = TcpListener::bind(WS_HOST.to_string()).await; let try_socket = TcpListener::bind(WS_HOST.to_string()).await;
let listener = try_socket.expect("Failed to bind"); let listener = try_socket.expect("Failed to bind");
...@@ -23,6 +38,20 @@ async fn accept(stream: TcpStream) { ...@@ -23,6 +38,20 @@ async fn accept(stream: TcpStream) {
info!("User established WebSocket connection from {}.", addr); info!("User established WebSocket connection from {}.", addr);
let id = Ulid::new().to_string();
let (write, read) = ws_stream.split(); let (write, read) = ws_stream.split();
read.forward(write).await.expect("Failed to forward message")
CONNECTIONS
.write()
.unwrap()
.insert(id, write);
read
.for_each(|message| async {
let data = message.unwrap().into_data();
// if you mess with the data, you get the bazooki
let string = from_utf8(&data).unwrap();
println!("{}", string);
})
.await;
} }
...@@ -10,7 +10,7 @@ mod channels; ...@@ -10,7 +10,7 @@ mod channels;
pub fn mount(rocket: Rocket) -> Rocket { pub fn mount(rocket: Rocket) -> Rocket {
rocket rocket
.mount("/", routes![root::root, root::teapot]) .mount("/", routes![root::root])
.mount("/onboard", onboard::routes()) .mount("/onboard", onboard::routes())
.mount("/users", users::routes()) .mount("/users", users::routes())
.mount("/channels", channels::routes()) .mount("/channels", channels::routes())
......
...@@ -3,7 +3,6 @@ use crate::util::variables::{DISABLE_REGISTRATION, HCAPTCHA_SITEKEY, USE_EMAIL, ...@@ -3,7 +3,6 @@ use crate::util::variables::{DISABLE_REGISTRATION, HCAPTCHA_SITEKEY, USE_EMAIL,
use rocket_contrib::json::JsonValue; use rocket_contrib::json::JsonValue;
use mongodb::bson::doc; use mongodb::bson::doc;
/// root
#[get("/")] #[get("/")]
pub async fn root() -> JsonValue { pub async fn root() -> JsonValue {
json!({ json!({
...@@ -18,12 +17,3 @@ pub async fn root() -> JsonValue { ...@@ -18,12 +17,3 @@ pub async fn root() -> JsonValue {
} }
}) })
} }
/// I'm a teapot.
#[delete("/")]
pub async fn teapot() -> JsonValue {
json!({
"teapot": true,
"can_delete": false
})
}
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment