From ccb77daac54b9164fb24c0d49813a5b40f38284a Mon Sep 17 00:00:00 2001 From: Uwe Schuster Date: Wed, 22 Apr 2026 23:19:40 +0200 Subject: [PATCH] =?UTF-8?q?Add=20ws::Hub=20+=20ws::Listener=20=E2=80=94=20?= =?UTF-8?q?WebSocket=20pub/sub=20hub?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lifted from fewo-webapp src/ws/ — zero fewo-webapp domain coupling in the public surface. Classes renamed WSHub→Hub, WSListener→Listener and namespaced under oatpp_authkit::ws. Features: - 64 KB per-message cap (rejects fragmented frames exceeding the buffer) - 500-socket cap - Detached housekeeper thread pinging idle sockets >90 s, closing >180 s - Per-socket SocketInfo (userId, role, property scopes) populated via thread_local handoff from the HTTP controller that served the upgrade Consumers construct a Hub and pass it to oatpp's HttpConnectionHandler::setSocketInstanceListener. No other integration required. Unblocks fewo-webapp #452. --- include/oatpp-authkit/ws/Hub.hpp | 393 ++++++++++++++++++++++++++ include/oatpp-authkit/ws/Listener.hpp | 157 ++++++++++ 2 files changed, 550 insertions(+) create mode 100644 include/oatpp-authkit/ws/Hub.hpp create mode 100644 include/oatpp-authkit/ws/Listener.hpp diff --git a/include/oatpp-authkit/ws/Hub.hpp b/include/oatpp-authkit/ws/Hub.hpp new file mode 100644 index 0000000..6c70e22 --- /dev/null +++ b/include/oatpp-authkit/ws/Hub.hpp @@ -0,0 +1,393 @@ +#pragma once +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "oatpp-websocket/ConnectionHandler.hpp" +#include "oatpp-websocket/WebSocket.hpp" +#include "Listener.hpp" + +namespace oatpp_authkit::ws { + +/** + * @brief Per-socket authentication and property-access metadata. + * + * Populated by WSController during the WebSocket handshake and picked up + * by Hub::onAfterCreate via thread_local storage. + */ +struct SocketInfo { + std::string userId; + std::string username; + std::string role; + std::set propertyIds; ///< Empty = all (admin or no restrictions). +}; + +/** + * @brief Singleton that owns all active WebSocket connections and dispatches + * server-push notifications and presence tracking. + * + * Implements `oatpp::websocket::ConnectionHandler::SocketInstanceListener` + * so that it is notified whenever a WebSocket connection is established or + * torn down. All state (socket set, presence maps) is protected by a single + * static mutex and is therefore safe to access from multiple server threads. + * + * Only authenticated connections (validated by WSController before the + * handshake) are accepted. Each socket stores the user's identity and + * property-access set so that booking notifications can be scoped to + * authorised recipients. + * + * **Server→client change notifications** + * @code + * {"type":"booking_updated","id":""} + * {"type":"booking_created","id":""} + * {"type":"booking_deleted","id":""} + * {"type":"person_updated","id":""} + * {"type":"feature_request_updated","id":""} + * @endcode + * + * **Client→server presence messages** (handled in Listener): + * @code + * {"type":"presence_open","booking_id":""} + * {"type":"presence_close","booking_id":""} + * @endcode + * + * **Server→client presence update** (broadcast whenever presence changes): + * @code + * {"type":"presence_update","booking_id":"","users":["alice","bob"]} + * @endcode + */ +struct HubHousekeeper; // forward-declare for friend (#439) + +class Hub + : public oatpp::websocket::ConnectionHandler::SocketInstanceListener { + friend struct HubHousekeeper; +public: + using WebSocket = oatpp::websocket::WebSocket; + + /** + * @brief Thread-local slot used by WSController to pass authenticated + * user context to onAfterCreate (which runs on the same thread). + */ + static inline thread_local std::optional t_pendingAuth; + +public: + /** @brief Hard cap on simultaneously-connected WebSocket clients (#439). + * When reached, new connections are accepted by oatpp's transport layer + * but immediately closed with code 1013 (Try Again Later). */ + static constexpr std::size_t kMaxSockets = 500; + + /** @brief Idle durations (#439). Any socket that has not sent a frame or + * answered a pong within kIdlePing receives a ping; if it does not produce + * any traffic within kIdleClose total, it is closed with code 1001. */ + static constexpr std::chrono::seconds kIdlePing {90}; + static constexpr std::chrono::seconds kIdleClose{180}; + +private: + static std::mutex s_mx; + static std::unordered_map s_sockets; + /** @brief Last time a frame (any opcode) arrived from the peer, used by the + * housekeeper thread to expire silent sockets (#439). */ + static std::unordered_map s_lastSeen; + + // Presence: booking entity_id → set of usernames currently editing it + static std::map> s_presence; + // Per-socket presence: socket → map of booking entity_id → username + static std::map> s_socketPresence; + + /** + * @brief Serialise a presence-update notification as a JSON string. + */ + static std::string buildPresenceMsg(const std::string& bookingId, const std::set& users) { + std::string list = "["; + bool first = true; + for (const auto& u : users) { + if (!first) list += ","; + list += "\"" + u + "\""; + first = false; + } + list += "]"; + return "{\"type\":\"presence_update\",\"booking_id\":\"" + + bookingId + "\",\"users\":" + list + "}"; + } + + /** + * @brief Check whether a socket has access to a given property. + * + * Admins and users with no explicit permission rows (empty propertyIds) + * have access to all properties. + */ + static bool socketHasPropertyAccess(const SocketInfo& info, const std::string& propertyId) { + if (info.role == "admin") return true; + if (info.propertyIds.empty()) return true; // no restrictions + return info.propertyIds.find(propertyId) != info.propertyIds.end(); + } + +public: + // --- SocketInstanceListener interface (1.3.0) --- + + /** + * @brief Called by oatpp after a new WebSocket connection is established. + * + * Picks up authenticated user context from the thread_local slot set by + * WSController. If no auth context is present, the socket is immediately + * closed (should not happen since WSController rejects unauthenticated + * upgrade requests). + */ + void onAfterCreate(const WebSocket& socket, + const std::shared_ptr&) override + { + socket.setListener(std::make_shared()); + + if (!t_pendingAuth.has_value()) { + // Should not happen — WSController validates before handshake. + OATPP_LOGW("Hub", "WebSocket connected without auth context — closing"); + try { socket.sendClose(4001, "Unauthorized"); } catch (...) {} + return; + } + + { + std::lock_guard g(s_mx); + if (s_sockets.size() >= kMaxSockets) { + // #439: refuse extra connections beyond the cap rather than + // allowing unbounded growth of s_sockets / presence maps. + OATPP_LOGW("Hub", "socket cap %zu hit — rejecting", kMaxSockets); + t_pendingAuth.reset(); + try { socket.sendClose(1013, "Server Busy"); } catch (...) {} + return; + } + s_sockets[&socket] = std::move(*t_pendingAuth); + s_lastSeen[&socket] = std::chrono::steady_clock::now(); + } + t_pendingAuth.reset(); + + OATPP_LOGD("Hub", "client connected: %s (total=%zu)", + s_sockets[&socket].username.c_str(), s_sockets.size()); + } + + /** + * @brief Bump the last-seen timestamp for a socket. Called by Listener + * on every incoming frame/pong so the idle housekeeper can + * distinguish live from dead peers (#439). + */ + static void touchSocket(const WebSocket* socket) { + std::lock_guard g(s_mx); + auto it = s_lastSeen.find(socket); + if (it != s_lastSeen.end()) it->second = std::chrono::steady_clock::now(); + } + + /** + * @brief Called by oatpp before a WebSocket connection is closed. + */ + void onBeforeDestroy(const WebSocket& socket) override { + { + std::lock_guard g(s_mx); + s_sockets.erase(&socket); + s_lastSeen.erase(&socket); + OATPP_LOGD("Hub", "client disconnected (total=%zu)", s_sockets.size()); + } + presenceCleanup(&socket); + } + + // --- Broadcast --- + + /** + * @brief Send a JSON string to every connected client. Thread-safe. + */ + static void broadcast(const std::string& json) { + oatpp::String msg = json.c_str(); + std::lock_guard g(s_mx); + for (auto& [ws, info] : s_sockets) { + try { ws->sendOneFrameText(msg); } + catch (...) { /* ignore dead sockets */ } + } + } + + /** + * @brief Send a JSON string only to sockets that have access to the + * given property. If propertyId is empty, broadcasts to all. + */ + static void broadcastToProperty(const std::string& json, const std::string& propertyId) { + if (propertyId.empty()) { broadcast(json); return; } + oatpp::String msg = json.c_str(); + std::lock_guard g(s_mx); + for (auto& [ws, info] : s_sockets) { + if (socketHasPropertyAccess(info, propertyId)) { + try { ws->sendOneFrameText(msg); } + catch (...) {} + } + } + } + + /** + * @brief Broadcast a booking lifecycle event, scoped to a property. + * @param type Event type: `"booking_created"`, `"booking_updated"`, or `"booking_deleted"`. + * @param id The booking entity_id affected. + * @param propertyId The property this booking belongs to (empty = broadcast to all). + */ + static void notifyBooking(const char* type, const std::string& id, const std::string& propertyId) { + broadcastToProperty( + std::string("{\"type\":\"") + type + "\",\"id\":\"" + id + "\"}", + propertyId); + } + + /** + * @brief Broadcast a booking lifecycle event to all connected clients. + * + * Legacy overload for call sites that do not have the property ID readily + * available. Sends to all authenticated sockets. + */ + static void notifyBooking(const char* type, const std::string& id) { + broadcast(std::string("{\"type\":\"") + type + + "\",\"id\":\"" + id + "\"}"); + } + + /** + * @brief Broadcast a person lifecycle event to all connected clients. + * + * Persons are cross-cutting (linked to bookings across properties), so + * notifications are not property-scoped. + */ + static void notifyPerson(const char* type, const std::string& id) { + broadcast(std::string("{\"type\":\"") + type + + "\",\"id\":\"" + id + "\"}"); + } + + // --- Presence --- + + /** + * @brief Look up the authenticated username for a socket. + * @return The username, or empty string if not found. + */ + static std::string getSocketUsername(const WebSocket* socket) { + std::lock_guard g(s_mx); + auto it = s_sockets.find(socket); + if (it != s_sockets.end()) return it->second.username; + return ""; + } + + /** + * @brief Register that a user has opened the edit modal for a booking. + * + * Uses the server-validated username from the socket's auth context + * instead of trusting the client-sent username. + */ + static void presenceOpen(const WebSocket* socket, const std::string& bookingId, const std::string& /* clientUser */) { + std::string username = getSocketUsername(socket); + if (username.empty()) return; + std::string msg; + { + std::lock_guard g(s_mx); + s_presence[bookingId].insert(username); + s_socketPresence[socket][bookingId] = username; + msg = buildPresenceMsg(bookingId, s_presence[bookingId]); + } + broadcast(msg); + } + + /** + * @brief Deregister a user from the presence set for a booking. + */ + static void presenceClose(const WebSocket* socket, const std::string& bookingId) { + std::string msg; + { + std::lock_guard g(s_mx); + auto sockIt = s_socketPresence.find(socket); + if (sockIt == s_socketPresence.end()) return; + auto bidIt = sockIt->second.find(bookingId); + if (bidIt == sockIt->second.end()) return; + s_presence[bookingId].erase(bidIt->second); + const auto& remaining = s_presence[bookingId]; + msg = buildPresenceMsg(bookingId, remaining); + if (remaining.empty()) s_presence.erase(bookingId); + sockIt->second.erase(bidIt); + } + broadcast(msg); + } + + /** + * @brief Remove all presence entries owned by a disconnecting socket. + */ + static void presenceCleanup(const WebSocket* socket) { + std::vector msgs; + { + std::lock_guard g(s_mx); + auto sockIt = s_socketPresence.find(socket); + if (sockIt == s_socketPresence.end()) return; + for (auto& [bookingId, username] : sockIt->second) { + s_presence[bookingId].erase(username); + msgs.push_back(buildPresenceMsg(bookingId, s_presence[bookingId])); + if (s_presence[bookingId].empty()) s_presence.erase(bookingId); + } + s_socketPresence.erase(sockIt); + } + for (const auto& m : msgs) broadcast(m); + } +}; + +inline std::mutex Hub::s_mx; +inline std::unordered_map Hub::s_sockets; +inline std::unordered_map Hub::s_lastSeen; +inline std::map> Hub::s_presence; +inline std::map> Hub::s_socketPresence; + +/** + * @brief Background sweeper that pings silent WebSocket peers and closes + * ones past the idle-close threshold (#439). + * + * Started once at static-init time, detached. Wakes every 30 s, iterates + * Hub::s_sockets under its mutex to build a work list, then releases + * the lock before issuing any pings/closes to avoid holding s_mx across + * I/O. The thread runs for the process lifetime; a clean-shutdown signal + * would be nice but is not required — oatpp tears the listener down + * first and subsequent send{Ping,Close} calls no-op on a dead socket. + */ +struct HubHousekeeper { + std::thread t; + HubHousekeeper() { + t = std::thread([]{ + using namespace std::chrono_literals; + while (true) { + std::this_thread::sleep_for(30s); + auto now = std::chrono::steady_clock::now(); + std::vector toPing, toClose; + { + std::lock_guard g(Hub::s_mx); + for (auto& kv : Hub::s_lastSeen) { + auto dt = now - kv.second; + if (dt > Hub::kIdleClose) toClose.push_back(kv.first); + else if (dt > Hub::kIdlePing) toPing.push_back(kv.first); + } + } + for (auto* ws : toPing) { try { ws->sendPing(oatpp::String("")); } catch (...) {} } + for (auto* ws : toClose) { try { ws->sendClose(1001, "Idle timeout"); } catch (...) {} } + } + }); + t.detach(); + } +}; +inline HubHousekeeper s_wsHubHousekeeper; + +inline void Listener::touchActivity(const WebSocket* socket) { Hub::touchSocket(socket); } + +// Listener::handleMessage defined here (after Hub) to break the circular dependency +inline void Listener::handleMessage(const WebSocket& socket, const std::string& text) { + Hub::touchSocket(&socket); // #439: record activity to suppress idle close + std::string type = jsonStr(text, "type"); + std::string bookingId = jsonStr(text, "booking_id"); + if (bookingId.empty()) return; + + if (type == "presence_open") { + // Client-sent "user" field is ignored; server uses the authenticated username. + Hub::presenceOpen(&socket, bookingId, ""); + } else if (type == "presence_close") { + Hub::presenceClose(&socket, bookingId); + } +} +} // namespace oatpp_authkit::ws diff --git a/include/oatpp-authkit/ws/Listener.hpp b/include/oatpp-authkit/ws/Listener.hpp new file mode 100644 index 0000000..b8c054e --- /dev/null +++ b/include/oatpp-authkit/ws/Listener.hpp @@ -0,0 +1,157 @@ +#pragma once +#include "oatpp-websocket/WebSocket.hpp" +#include "oatpp/core/data/stream/BufferStream.hpp" + +#include +#include + +namespace oatpp_authkit::ws { + +/** + * @brief Per-connection WebSocket listener. + * + * One instance is created per accepted WebSocket connection by Hub::onAfterCreate(). + * Handles ping/pong housekeeping, reassembles fragmented text frames, and + * dispatches fully received messages to handleMessage(). + * + * The following client→server presence messages are parsed: + * @code + * {"type":"presence_open","booking_id":42,"user":"alice"} + * {"type":"presence_close","booking_id":42} + * @endcode + * + * @note handleMessage() is defined at the bottom of Hub.hpp (after Hub is + * fully declared) to avoid a circular include dependency between + * Listener.hpp and Hub.hpp. + */ +class Listener : public oatpp::websocket::WebSocket::Listener { +public: + /** @brief Hard cap on a single reassembled WS message (#439). Frames that + * push the accumulated buffer past this are dropped and the connection + * closed with code 1009 (Message Too Big). */ + static constexpr std::size_t kMaxMessageBytes = 64 * 1024; + +private: + oatpp::data::stream::BufferOutputStream m_buffer{256}; ///< Accumulates frame payloads until end-of-message. + bool m_overflowed = false; ///< Set when kMaxMessageBytes was exceeded; drop remainder of the current message. + + /** + * @brief Extract a JSON string value for the given key from a JSON object. + * @param json The raw JSON text. + * @param key The field name to look up. + * @return The string value, or an empty string if the key is absent. + */ + static std::string jsonStr(const std::string& json, const std::string& key) { + auto kpos = json.find("\"" + key + "\""); + if (kpos == std::string::npos) return ""; + auto cpos = json.find(':', kpos + key.size() + 2); + if (cpos == std::string::npos) return ""; + auto qpos = json.find('"', cpos + 1); + if (qpos == std::string::npos) return ""; + auto epos = json.find('"', qpos + 1); + if (epos == std::string::npos) return ""; + return json.substr(qpos + 1, epos - qpos - 1); + } + + /** + * @brief Extract a JSON integer value for the given key from a JSON object. + * @param json The raw JSON text. + * @param key The field name to look up. + * @return The integer value, or -1 if the key is absent or not a digit sequence. + */ + static int jsonInt(const std::string& json, const std::string& key) { + auto kpos = json.find("\"" + key + "\""); + if (kpos == std::string::npos) return -1; + auto cpos = json.find(':', kpos + key.size() + 2); + if (cpos == std::string::npos) return -1; + cpos++; + while (cpos < json.size() && (json[cpos] == ' ' || json[cpos] == '\t')) cpos++; + if (cpos >= json.size() || !std::isdigit((unsigned char)json[cpos])) return -1; + return std::stoi(json.substr(cpos)); + } + + /** + * @brief Dispatch a fully received text-frame message to Hub presence handlers. + * + * Defined in Hub.hpp after Hub is fully declared to avoid a circular + * include dependency. + * + * @param socket The WebSocket connection the message arrived on. + * @param text The complete UTF-8 text of the message. + */ + void handleMessage(const WebSocket& socket, const std::string& text); + +public: + /** + * @brief Respond to a WebSocket ping frame with a pong. + * @param socket The connection that sent the ping. + * @param msg The ping payload to echo back. + */ + void onPing(const WebSocket& socket, const oatpp::String& msg) override { + socket.sendPong(msg); + touchActivity(&socket); + } + + /** @brief Bump activity timestamp on pong so the idle sweeper treats the + * peer as live even if they never send application traffic (#439). */ + void onPong(const WebSocket& socket, const oatpp::String&) override { + touchActivity(&socket); + } + +private: + /** @brief Forward declaration; defined in Hub.hpp alongside handleMessage + * to break the Hub↔Listener circular include. */ + static void touchActivity(const WebSocket* socket); +public: + + /** + * @brief Log the close frame code when the client initiates a close. + * @param code The WebSocket close status code. + */ + void onClose(const WebSocket&, v_uint16 code, const oatpp::String&) override { + OATPP_LOGD("WS", "client closed (code=%d)", (int)code); + } + + /** + * @brief Accumulate frame chunks and dispatch the message when complete. + * + * oatpp calls this method once per frame chunk. A `size` of 0 signals + * the end of the message; at that point the buffer is flushed and, if + * the opcode is a text frame (opcode == 1), handleMessage() is called. + * + * @param socket The WebSocket connection. + * @param opcode WebSocket opcode (1 = text, 2 = binary, etc.). + * @param data Pointer to the chunk payload bytes. + * @param size Number of bytes in this chunk, or 0 at end of message. + */ + void readMessage(const WebSocket& socket, v_uint8 opcode, + p_char8 data, oatpp::v_io_size size) override { + touchActivity(&socket); // #439: any inbound frame counts as activity + if (size > 0) { + if (m_overflowed) return; // ignore remaining frames of a too-large message + if (m_buffer.getCurrentPosition() + (std::size_t)size > kMaxMessageBytes) { + // #439: cap a single authenticated client from OOMing the + // process by streaming gigabytes into a single text frame. + m_overflowed = true; + m_buffer.setCurrentPosition(0); + OATPP_LOGW("WS", "client exceeded %zu B message cap — closing", kMaxMessageBytes); + try { socket.sendClose(1009, "Message Too Big"); } catch (...) {} + return; + } + m_buffer.writeSimple(data, size); + } else { + // size == 0 signals end of message + if (m_overflowed) { + m_overflowed = false; // reset for next message (though the socket is closing) + m_buffer.setCurrentPosition(0); + return; + } + std::string text = m_buffer.toString(); + m_buffer.setCurrentPosition(0); + if (opcode == 1 && !text.empty()) { // text frame + handleMessage(socket, text); + } + } + } +}; +} // namespace oatpp_authkit::ws