[package]
name = "example-chat"
version = "0.1.0"
edition = "2021"
publish = false
[dependencies]
axum = { path = "../../axum", features = ["ws"] }
futures = "0.3"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
//! Example chat application.
//!
//! Run with
//!
//! ```not_rust
//! cargo run -p example-chat
//! ```
use axum::{
extract::{
ws::{Message, Utf8Bytes, WebSocket, WebSocketUpgrade},
State,
},
response::{Html, IntoResponse},
routing::get,
Router,
};
use futures::{sink::SinkExt, stream::StreamExt};
use std::{
collections::HashSet,
sync::{Arc, Mutex},
};
use tokio::sync::broadcast;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
// Our shared state
struct AppState {
// We require unique usernames. This tracks which usernames have been taken.
user_set: Mutex<HashSet<String>>,
// Channel used to send messages to all connected clients.
tx: broadcast::Sender<String>,
}
#[tokio::main]
async fn main() {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| format!("{}=trace", env!("CARGO_CRATE_NAME")).into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
// Set up application state for use with with_state().
let user_set = Mutex::new(HashSet::new());
let (tx, _rx) = broadcast::channel(100);
let app_state = Arc::new(AppState { user_set, tx });
let app = Router::new()
.route("/", get(index))
.route("/websocket", get(websocket_handler))
.with_state(app_state);
let listener = tokio::net::TcpListener::bind("127.0.0.1:3000")
.await
.unwrap();
tracing::debug!("listening on {}", listener.local_addr().unwrap());
axum::serve(listener, app).await.unwrap();
}
async fn websocket_handler(
ws: WebSocketUpgrade,
State(state): State<Arc<AppState>>,
) -> impl IntoResponse {
ws.on_upgrade(|socket| websocket(socket, state))
}
// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending chat messages).
async fn websocket(stream: WebSocket, state: Arc<AppState>) {
// By splitting, we can send and receive at the same time.
let (mut sender, mut receiver) = stream.split();
// Username gets set in the receive loop, if it's valid.
let mut username = String::new();
// Loop until a text message is found.
while let Some(Ok(message)) = receiver.next().await {
if let Message::Text(name) = message {
// If username that is sent by client is not taken, fill username string.
check_username(&state, &mut username, name.as_str());
// If not empty we want to quit the loop else we want to quit function.
if !username.is_empty() {
break;
} else {
// Only send our client that username is taken.
let _ = sender
.send(Message::Text(Utf8Bytes::from_static(
"Username already taken.",
)))
.await;
return;
}
}
}
// We subscribe *before* sending the "joined" message, so that we will also
// display it to our client.
let mut rx = state.tx.subscribe();
// Now send the "joined" message to all subscribers.
let msg = format!("{username} joined.");
tracing::debug!("{msg}");
let _ = state.tx.send(msg);
// Spawn the first task that will receive broadcast messages and send text
// messages over the websocket to our client.
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
// In any websocket error, break loop.
if sender.send(Message::text(msg)).await.is_err() {
break;
}
}
});
// Clone things we want to pass (move) to the receiving task.
let tx = state.tx.clone();
let name = username.clone();
// Spawn a task that takes messages from the websocket, prepends the user
// name, and sends them to all broadcast subscribers.
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
// Add username before message.
let _ = tx.send(format!("{name}: {text}"));
}
});
// If any one of the tasks run to completion, we abort the other.
tokio::select! {
_ = &mut send_task => recv_task.abort(),
_ = &mut recv_task => send_task.abort(),
};
// Send "user left" message (similar to "joined" above).
let msg = format!("{username} left.");
tracing::debug!("{msg}");
let _ = state.tx.send(msg);
// Remove username from map so new clients can take it again.
state.user_set.lock().unwrap().remove(&username);
}
fn check_username(state: &AppState, string: &mut String, name: &str) {
let mut user_set = state.user_set.lock().unwrap();
if !user_set.contains(name) {
user_set.insert(name.to_owned());
string.push_str(name);
}
}
// Include utf-8 file at **compile** time.
async fn index() -> Html<&'static str> {
Html(std::include_str!("../chat.html"))
}
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>WebSocket Chat</title>
</head>
<body>
<h1>WebSocket Chat Example</h1>
<input id="username" style="display:block; width:100px; box-sizing: border-box" type="text" placeholder="username">
<button id="join-chat" type="button">Join Chat</button>
<textarea id="chat" style="display:block; width:600px; height:400px; box-sizing: border-box" cols="30" rows="10"></textarea>
<input id="input" style="display:block; width:600px; box-sizing: border-box" type="text" placeholder="chat">
<script>
const username = document.querySelector("#username");
const join_btn = document.querySelector("#join-chat");
const textarea = document.querySelector("#chat");
const input = document.querySelector("#input");
join_btn.addEventListener("click", function(e) {
this.disabled = true;
const websocket = new WebSocket("ws://localhost:3000/websocket");
websocket.onopen = function() {
console.log("connection opened");
websocket.send(username.value);
}
const btn = this;
websocket.onclose = function() {
console.log("connection closed");
btn.disabled = false;
}
websocket.onmessage = function(e) {
console.log("received message: "+e.data);
textarea.value += e.data+"\r\n";
}
input.onkeydown = function(e) {
if (e.key == "Enter") {
websocket.send(input.value);
input.value = "";
}
}
});
</script>
</body>
</html>