Add ws::Hub + ws::Listener — WebSocket pub/sub hub

Lifted from fewo-webapp src/ws/ — zero fewo-webapp domain coupling in
the public surface. Classes renamed WSHub→Hub, WSListener→Listener and
namespaced under oatpp_authkit::ws.

Features:
- 64 KB per-message cap (rejects fragmented frames exceeding the buffer)
- 500-socket cap
- Detached housekeeper thread pinging idle sockets >90 s, closing >180 s
- Per-socket SocketInfo (userId, role, property scopes) populated via
  thread_local handoff from the HTTP controller that served the upgrade

Consumers construct a Hub and pass it to oatpp's
HttpConnectionHandler::setSocketInstanceListener. No other integration
required.

Unblocks fewo-webapp #452.
This commit is contained in:
Uwe Schuster 2026-04-22 23:19:40 +02:00
parent f9a244bf2b
commit ccb77daac5
2 changed files with 550 additions and 0 deletions

View file

@ -0,0 +1,393 @@
#pragma once
#include <atomic>
#include <chrono>
#include <map>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "oatpp-websocket/ConnectionHandler.hpp"
#include "oatpp-websocket/WebSocket.hpp"
#include "Listener.hpp"
namespace oatpp_authkit::ws {
/**
* @brief Per-socket authentication and property-access metadata.
*
* Populated by WSController during the WebSocket handshake and picked up
* by Hub::onAfterCreate via thread_local storage.
*/
struct SocketInfo {
std::string userId;
std::string username;
std::string role;
std::set<std::string> propertyIds; ///< Empty = all (admin or no restrictions).
};
/**
* @brief Singleton that owns all active WebSocket connections and dispatches
* server-push notifications and presence tracking.
*
* Implements `oatpp::websocket::ConnectionHandler::SocketInstanceListener`
* so that it is notified whenever a WebSocket connection is established or
* torn down. All state (socket set, presence maps) is protected by a single
* static mutex and is therefore safe to access from multiple server threads.
*
* Only authenticated connections (validated by WSController before the
* handshake) are accepted. Each socket stores the user's identity and
* property-access set so that booking notifications can be scoped to
* authorised recipients.
*
* **Serverclient change notifications**
* @code
* {"type":"booking_updated","id":"<uuid>"}
* {"type":"booking_created","id":"<uuid>"}
* {"type":"booking_deleted","id":"<uuid>"}
* {"type":"person_updated","id":"<uuid>"}
* {"type":"feature_request_updated","id":"<uuid>"}
* @endcode
*
* **Clientserver presence messages** (handled in Listener):
* @code
* {"type":"presence_open","booking_id":"<uuid>"}
* {"type":"presence_close","booking_id":"<uuid>"}
* @endcode
*
* **Serverclient presence update** (broadcast whenever presence changes):
* @code
* {"type":"presence_update","booking_id":"<uuid>","users":["alice","bob"]}
* @endcode
*/
struct HubHousekeeper; // forward-declare for friend (#439)
class Hub
: public oatpp::websocket::ConnectionHandler::SocketInstanceListener {
friend struct HubHousekeeper;
public:
using WebSocket = oatpp::websocket::WebSocket;
/**
* @brief Thread-local slot used by WSController to pass authenticated
* user context to onAfterCreate (which runs on the same thread).
*/
static inline thread_local std::optional<SocketInfo> t_pendingAuth;
public:
/** @brief Hard cap on simultaneously-connected WebSocket clients (#439).
* When reached, new connections are accepted by oatpp's transport layer
* but immediately closed with code 1013 (Try Again Later). */
static constexpr std::size_t kMaxSockets = 500;
/** @brief Idle durations (#439). Any socket that has not sent a frame or
* answered a pong within kIdlePing receives a ping; if it does not produce
* any traffic within kIdleClose total, it is closed with code 1001. */
static constexpr std::chrono::seconds kIdlePing {90};
static constexpr std::chrono::seconds kIdleClose{180};
private:
static std::mutex s_mx;
static std::unordered_map<const WebSocket*, SocketInfo> s_sockets;
/** @brief Last time a frame (any opcode) arrived from the peer, used by the
* housekeeper thread to expire silent sockets (#439). */
static std::unordered_map<const WebSocket*, std::chrono::steady_clock::time_point> s_lastSeen;
// Presence: booking entity_id → set of usernames currently editing it
static std::map<std::string, std::set<std::string>> s_presence;
// Per-socket presence: socket → map of booking entity_id → username
static std::map<const WebSocket*, std::map<std::string, std::string>> s_socketPresence;
/**
* @brief Serialise a presence-update notification as a JSON string.
*/
static std::string buildPresenceMsg(const std::string& bookingId, const std::set<std::string>& users) {
std::string list = "[";
bool first = true;
for (const auto& u : users) {
if (!first) list += ",";
list += "\"" + u + "\"";
first = false;
}
list += "]";
return "{\"type\":\"presence_update\",\"booking_id\":\""
+ bookingId + "\",\"users\":" + list + "}";
}
/**
* @brief Check whether a socket has access to a given property.
*
* Admins and users with no explicit permission rows (empty propertyIds)
* have access to all properties.
*/
static bool socketHasPropertyAccess(const SocketInfo& info, const std::string& propertyId) {
if (info.role == "admin") return true;
if (info.propertyIds.empty()) return true; // no restrictions
return info.propertyIds.find(propertyId) != info.propertyIds.end();
}
public:
// --- SocketInstanceListener interface (1.3.0) ---
/**
* @brief Called by oatpp after a new WebSocket connection is established.
*
* Picks up authenticated user context from the thread_local slot set by
* WSController. If no auth context is present, the socket is immediately
* closed (should not happen since WSController rejects unauthenticated
* upgrade requests).
*/
void onAfterCreate(const WebSocket& socket,
const std::shared_ptr<const ParameterMap>&) override
{
socket.setListener(std::make_shared<Listener>());
if (!t_pendingAuth.has_value()) {
// Should not happen — WSController validates before handshake.
OATPP_LOGW("Hub", "WebSocket connected without auth context — closing");
try { socket.sendClose(4001, "Unauthorized"); } catch (...) {}
return;
}
{
std::lock_guard<std::mutex> g(s_mx);
if (s_sockets.size() >= kMaxSockets) {
// #439: refuse extra connections beyond the cap rather than
// allowing unbounded growth of s_sockets / presence maps.
OATPP_LOGW("Hub", "socket cap %zu hit — rejecting", kMaxSockets);
t_pendingAuth.reset();
try { socket.sendClose(1013, "Server Busy"); } catch (...) {}
return;
}
s_sockets[&socket] = std::move(*t_pendingAuth);
s_lastSeen[&socket] = std::chrono::steady_clock::now();
}
t_pendingAuth.reset();
OATPP_LOGD("Hub", "client connected: %s (total=%zu)",
s_sockets[&socket].username.c_str(), s_sockets.size());
}
/**
* @brief Bump the last-seen timestamp for a socket. Called by Listener
* on every incoming frame/pong so the idle housekeeper can
* distinguish live from dead peers (#439).
*/
static void touchSocket(const WebSocket* socket) {
std::lock_guard<std::mutex> g(s_mx);
auto it = s_lastSeen.find(socket);
if (it != s_lastSeen.end()) it->second = std::chrono::steady_clock::now();
}
/**
* @brief Called by oatpp before a WebSocket connection is closed.
*/
void onBeforeDestroy(const WebSocket& socket) override {
{
std::lock_guard<std::mutex> g(s_mx);
s_sockets.erase(&socket);
s_lastSeen.erase(&socket);
OATPP_LOGD("Hub", "client disconnected (total=%zu)", s_sockets.size());
}
presenceCleanup(&socket);
}
// --- Broadcast ---
/**
* @brief Send a JSON string to every connected client. Thread-safe.
*/
static void broadcast(const std::string& json) {
oatpp::String msg = json.c_str();
std::lock_guard<std::mutex> g(s_mx);
for (auto& [ws, info] : s_sockets) {
try { ws->sendOneFrameText(msg); }
catch (...) { /* ignore dead sockets */ }
}
}
/**
* @brief Send a JSON string only to sockets that have access to the
* given property. If propertyId is empty, broadcasts to all.
*/
static void broadcastToProperty(const std::string& json, const std::string& propertyId) {
if (propertyId.empty()) { broadcast(json); return; }
oatpp::String msg = json.c_str();
std::lock_guard<std::mutex> g(s_mx);
for (auto& [ws, info] : s_sockets) {
if (socketHasPropertyAccess(info, propertyId)) {
try { ws->sendOneFrameText(msg); }
catch (...) {}
}
}
}
/**
* @brief Broadcast a booking lifecycle event, scoped to a property.
* @param type Event type: `"booking_created"`, `"booking_updated"`, or `"booking_deleted"`.
* @param id The booking entity_id affected.
* @param propertyId The property this booking belongs to (empty = broadcast to all).
*/
static void notifyBooking(const char* type, const std::string& id, const std::string& propertyId) {
broadcastToProperty(
std::string("{\"type\":\"") + type + "\",\"id\":\"" + id + "\"}",
propertyId);
}
/**
* @brief Broadcast a booking lifecycle event to all connected clients.
*
* Legacy overload for call sites that do not have the property ID readily
* available. Sends to all authenticated sockets.
*/
static void notifyBooking(const char* type, const std::string& id) {
broadcast(std::string("{\"type\":\"") + type
+ "\",\"id\":\"" + id + "\"}");
}
/**
* @brief Broadcast a person lifecycle event to all connected clients.
*
* Persons are cross-cutting (linked to bookings across properties), so
* notifications are not property-scoped.
*/
static void notifyPerson(const char* type, const std::string& id) {
broadcast(std::string("{\"type\":\"") + type
+ "\",\"id\":\"" + id + "\"}");
}
// --- Presence ---
/**
* @brief Look up the authenticated username for a socket.
* @return The username, or empty string if not found.
*/
static std::string getSocketUsername(const WebSocket* socket) {
std::lock_guard<std::mutex> g(s_mx);
auto it = s_sockets.find(socket);
if (it != s_sockets.end()) return it->second.username;
return "";
}
/**
* @brief Register that a user has opened the edit modal for a booking.
*
* Uses the server-validated username from the socket's auth context
* instead of trusting the client-sent username.
*/
static void presenceOpen(const WebSocket* socket, const std::string& bookingId, const std::string& /* clientUser */) {
std::string username = getSocketUsername(socket);
if (username.empty()) return;
std::string msg;
{
std::lock_guard<std::mutex> g(s_mx);
s_presence[bookingId].insert(username);
s_socketPresence[socket][bookingId] = username;
msg = buildPresenceMsg(bookingId, s_presence[bookingId]);
}
broadcast(msg);
}
/**
* @brief Deregister a user from the presence set for a booking.
*/
static void presenceClose(const WebSocket* socket, const std::string& bookingId) {
std::string msg;
{
std::lock_guard<std::mutex> g(s_mx);
auto sockIt = s_socketPresence.find(socket);
if (sockIt == s_socketPresence.end()) return;
auto bidIt = sockIt->second.find(bookingId);
if (bidIt == sockIt->second.end()) return;
s_presence[bookingId].erase(bidIt->second);
const auto& remaining = s_presence[bookingId];
msg = buildPresenceMsg(bookingId, remaining);
if (remaining.empty()) s_presence.erase(bookingId);
sockIt->second.erase(bidIt);
}
broadcast(msg);
}
/**
* @brief Remove all presence entries owned by a disconnecting socket.
*/
static void presenceCleanup(const WebSocket* socket) {
std::vector<std::string> msgs;
{
std::lock_guard<std::mutex> g(s_mx);
auto sockIt = s_socketPresence.find(socket);
if (sockIt == s_socketPresence.end()) return;
for (auto& [bookingId, username] : sockIt->second) {
s_presence[bookingId].erase(username);
msgs.push_back(buildPresenceMsg(bookingId, s_presence[bookingId]));
if (s_presence[bookingId].empty()) s_presence.erase(bookingId);
}
s_socketPresence.erase(sockIt);
}
for (const auto& m : msgs) broadcast(m);
}
};
inline std::mutex Hub::s_mx;
inline std::unordered_map<const oatpp::websocket::WebSocket*, SocketInfo> Hub::s_sockets;
inline std::unordered_map<const oatpp::websocket::WebSocket*, std::chrono::steady_clock::time_point> Hub::s_lastSeen;
inline std::map<std::string, std::set<std::string>> Hub::s_presence;
inline std::map<const oatpp::websocket::WebSocket*, std::map<std::string, std::string>> Hub::s_socketPresence;
/**
* @brief Background sweeper that pings silent WebSocket peers and closes
* ones past the idle-close threshold (#439).
*
* Started once at static-init time, detached. Wakes every 30 s, iterates
* Hub::s_sockets under its mutex to build a work list, then releases
* the lock before issuing any pings/closes to avoid holding s_mx across
* I/O. The thread runs for the process lifetime; a clean-shutdown signal
* would be nice but is not required oatpp tears the listener down
* first and subsequent send{Ping,Close} calls no-op on a dead socket.
*/
struct HubHousekeeper {
std::thread t;
HubHousekeeper() {
t = std::thread([]{
using namespace std::chrono_literals;
while (true) {
std::this_thread::sleep_for(30s);
auto now = std::chrono::steady_clock::now();
std::vector<const oatpp::websocket::WebSocket*> toPing, toClose;
{
std::lock_guard<std::mutex> g(Hub::s_mx);
for (auto& kv : Hub::s_lastSeen) {
auto dt = now - kv.second;
if (dt > Hub::kIdleClose) toClose.push_back(kv.first);
else if (dt > Hub::kIdlePing) toPing.push_back(kv.first);
}
}
for (auto* ws : toPing) { try { ws->sendPing(oatpp::String("")); } catch (...) {} }
for (auto* ws : toClose) { try { ws->sendClose(1001, "Idle timeout"); } catch (...) {} }
}
});
t.detach();
}
};
inline HubHousekeeper s_wsHubHousekeeper;
inline void Listener::touchActivity(const WebSocket* socket) { Hub::touchSocket(socket); }
// Listener::handleMessage defined here (after Hub) to break the circular dependency
inline void Listener::handleMessage(const WebSocket& socket, const std::string& text) {
Hub::touchSocket(&socket); // #439: record activity to suppress idle close
std::string type = jsonStr(text, "type");
std::string bookingId = jsonStr(text, "booking_id");
if (bookingId.empty()) return;
if (type == "presence_open") {
// Client-sent "user" field is ignored; server uses the authenticated username.
Hub::presenceOpen(&socket, bookingId, "");
} else if (type == "presence_close") {
Hub::presenceClose(&socket, bookingId);
}
}
} // namespace oatpp_authkit::ws

View file

@ -0,0 +1,157 @@
#pragma once
#include "oatpp-websocket/WebSocket.hpp"
#include "oatpp/core/data/stream/BufferStream.hpp"
#include <cctype>
#include <string>
namespace oatpp_authkit::ws {
/**
* @brief Per-connection WebSocket listener.
*
* One instance is created per accepted WebSocket connection by Hub::onAfterCreate().
* Handles ping/pong housekeeping, reassembles fragmented text frames, and
* dispatches fully received messages to handleMessage().
*
* The following clientserver presence messages are parsed:
* @code
* {"type":"presence_open","booking_id":42,"user":"alice"}
* {"type":"presence_close","booking_id":42}
* @endcode
*
* @note handleMessage() is defined at the bottom of Hub.hpp (after Hub is
* fully declared) to avoid a circular include dependency between
* Listener.hpp and Hub.hpp.
*/
class Listener : public oatpp::websocket::WebSocket::Listener {
public:
/** @brief Hard cap on a single reassembled WS message (#439). Frames that
* push the accumulated buffer past this are dropped and the connection
* closed with code 1009 (Message Too Big). */
static constexpr std::size_t kMaxMessageBytes = 64 * 1024;
private:
oatpp::data::stream::BufferOutputStream m_buffer{256}; ///< Accumulates frame payloads until end-of-message.
bool m_overflowed = false; ///< Set when kMaxMessageBytes was exceeded; drop remainder of the current message.
/**
* @brief Extract a JSON string value for the given key from a JSON object.
* @param json The raw JSON text.
* @param key The field name to look up.
* @return The string value, or an empty string if the key is absent.
*/
static std::string jsonStr(const std::string& json, const std::string& key) {
auto kpos = json.find("\"" + key + "\"");
if (kpos == std::string::npos) return "";
auto cpos = json.find(':', kpos + key.size() + 2);
if (cpos == std::string::npos) return "";
auto qpos = json.find('"', cpos + 1);
if (qpos == std::string::npos) return "";
auto epos = json.find('"', qpos + 1);
if (epos == std::string::npos) return "";
return json.substr(qpos + 1, epos - qpos - 1);
}
/**
* @brief Extract a JSON integer value for the given key from a JSON object.
* @param json The raw JSON text.
* @param key The field name to look up.
* @return The integer value, or -1 if the key is absent or not a digit sequence.
*/
static int jsonInt(const std::string& json, const std::string& key) {
auto kpos = json.find("\"" + key + "\"");
if (kpos == std::string::npos) return -1;
auto cpos = json.find(':', kpos + key.size() + 2);
if (cpos == std::string::npos) return -1;
cpos++;
while (cpos < json.size() && (json[cpos] == ' ' || json[cpos] == '\t')) cpos++;
if (cpos >= json.size() || !std::isdigit((unsigned char)json[cpos])) return -1;
return std::stoi(json.substr(cpos));
}
/**
* @brief Dispatch a fully received text-frame message to Hub presence handlers.
*
* Defined in Hub.hpp after Hub is fully declared to avoid a circular
* include dependency.
*
* @param socket The WebSocket connection the message arrived on.
* @param text The complete UTF-8 text of the message.
*/
void handleMessage(const WebSocket& socket, const std::string& text);
public:
/**
* @brief Respond to a WebSocket ping frame with a pong.
* @param socket The connection that sent the ping.
* @param msg The ping payload to echo back.
*/
void onPing(const WebSocket& socket, const oatpp::String& msg) override {
socket.sendPong(msg);
touchActivity(&socket);
}
/** @brief Bump activity timestamp on pong so the idle sweeper treats the
* peer as live even if they never send application traffic (#439). */
void onPong(const WebSocket& socket, const oatpp::String&) override {
touchActivity(&socket);
}
private:
/** @brief Forward declaration; defined in Hub.hpp alongside handleMessage
* to break the HubListener circular include. */
static void touchActivity(const WebSocket* socket);
public:
/**
* @brief Log the close frame code when the client initiates a close.
* @param code The WebSocket close status code.
*/
void onClose(const WebSocket&, v_uint16 code, const oatpp::String&) override {
OATPP_LOGD("WS", "client closed (code=%d)", (int)code);
}
/**
* @brief Accumulate frame chunks and dispatch the message when complete.
*
* oatpp calls this method once per frame chunk. A `size` of 0 signals
* the end of the message; at that point the buffer is flushed and, if
* the opcode is a text frame (opcode == 1), handleMessage() is called.
*
* @param socket The WebSocket connection.
* @param opcode WebSocket opcode (1 = text, 2 = binary, etc.).
* @param data Pointer to the chunk payload bytes.
* @param size Number of bytes in this chunk, or 0 at end of message.
*/
void readMessage(const WebSocket& socket, v_uint8 opcode,
p_char8 data, oatpp::v_io_size size) override {
touchActivity(&socket); // #439: any inbound frame counts as activity
if (size > 0) {
if (m_overflowed) return; // ignore remaining frames of a too-large message
if (m_buffer.getCurrentPosition() + (std::size_t)size > kMaxMessageBytes) {
// #439: cap a single authenticated client from OOMing the
// process by streaming gigabytes into a single text frame.
m_overflowed = true;
m_buffer.setCurrentPosition(0);
OATPP_LOGW("WS", "client exceeded %zu B message cap — closing", kMaxMessageBytes);
try { socket.sendClose(1009, "Message Too Big"); } catch (...) {}
return;
}
m_buffer.writeSimple(data, size);
} else {
// size == 0 signals end of message
if (m_overflowed) {
m_overflowed = false; // reset for next message (though the socket is closing)
m_buffer.setCurrentPosition(0);
return;
}
std::string text = m_buffer.toString();
m_buffer.setCurrentPosition(0);
if (opcode == 1 && !text.empty()) { // text frame
handleMessage(socket, text);
}
}
}
};
} // namespace oatpp_authkit::ws