Browse Source

LibWeb: Send IPC messages exceeding socket buffer through shared memory

It turned out that some web applications want to send fairly large
messages to WebWorker through IPC (for example, MapLibre GL sends
~1200KiB), which led to failures (at least on macOS) because buffer size
of TransportSocket is limited to 128KiB. This change solves the problem
by wrapping messages that exceed socket buffer size into another message
that holds wrapped message content in shared memory.

Co-Authored-By: Luke Wilde <luke@ladybird.org>
Aliaksandr Kalenik 1 week ago
parent
commit
4b04e97feb

+ 17 - 3
Libraries/LibIPC/Connection.cpp

@@ -70,16 +70,21 @@ bool ConnectionBase::is_open() const
 
 ErrorOr<void> ConnectionBase::post_message(Message const& message)
 {
-    return post_message(TRY(message.encode()));
+    return post_message(message.endpoint_magic(), TRY(message.encode()));
 }
 
-ErrorOr<void> ConnectionBase::post_message(MessageBuffer buffer)
+ErrorOr<void> ConnectionBase::post_message(u32 endpoint_magic, MessageBuffer buffer)
 {
     // NOTE: If this connection is being shut down, but has not yet been destroyed,
     //       the socket will be closed. Don't try to send more messages.
     if (!m_transport.is_open())
         return Error::from_string_literal("Trying to post_message during IPC shutdown");
 
+    if (buffer.data().size() > TransportSocket::SOCKET_BUFFER_SIZE) {
+        auto wrapper = LargeMessageWrapper::create(endpoint_magic, buffer);
+        buffer = MUST(wrapper->encode());
+    }
+
     {
         Threading::MutexLocker locker(m_send_queue->mutex);
         m_send_queue->messages.append(move(buffer));
@@ -114,7 +119,7 @@ void ConnectionBase::handle_messages()
             }
 
             if (auto response = handler_result.release_value()) {
-                if (auto post_result = post_message(*response); post_result.is_error()) {
+                if (auto post_result = post_message(m_local_endpoint_magic, *response); post_result.is_error()) {
                     dbgln("IPC::ConnectionBase::handle_messages: {}", post_result.error());
                 }
             }
@@ -221,6 +226,15 @@ void ConnectionBase::try_parse_messages(Vector<u8> const& bytes, size_t& index)
         auto remaining_bytes = ReadonlyBytes { bytes.data() + index, message_size };
 
         if (auto message = try_parse_message(remaining_bytes, m_unprocessed_fds)) {
+            if (message->message_id() == LargeMessageWrapper::MESSAGE_ID) {
+                LargeMessageWrapper* wrapper = static_cast<LargeMessageWrapper*>(message.ptr());
+                auto wrapped_message = wrapper->wrapped_message_data();
+                m_unprocessed_fds.return_fds_to_front_of_queue(wrapper->take_fds());
+                auto parsed_message = try_parse_message(wrapped_message, m_unprocessed_fds);
+                VERIFY(parsed_message);
+                m_unprocessed_messages.append(parsed_message.release_nonnull());
+                continue;
+            }
             m_unprocessed_messages.append(message.release_nonnull());
             continue;
         }

+ 5 - 4
Libraries/LibIPC/Connection.h

@@ -13,6 +13,7 @@
 #include <LibIPC/File.h>
 #include <LibIPC/Forward.h>
 #include <LibIPC/Transport.h>
+#include <LibIPC/UnprocessedFileDescriptors.h>
 #include <LibThreading/ConditionVariable.h>
 #include <LibThreading/MutexProtected.h>
 #include <LibThreading/Thread.h>
@@ -27,7 +28,7 @@ public:
 
     [[nodiscard]] bool is_open() const;
     ErrorOr<void> post_message(Message const&);
-    ErrorOr<void> post_message(MessageBuffer);
+    ErrorOr<void> post_message(u32 endpoint_magic, MessageBuffer);
 
     void shutdown();
     virtual void die() { }
@@ -40,7 +41,7 @@ protected:
     virtual void may_have_become_unresponsive() { }
     virtual void did_become_responsive() { }
     virtual void shutdown_with_error(Error const&);
-    virtual OwnPtr<Message> try_parse_message(ReadonlyBytes, Queue<IPC::File>&) = 0;
+    virtual OwnPtr<Message> try_parse_message(ReadonlyBytes, UnprocessedFileDescriptors&) = 0;
 
     OwnPtr<IPC::Message> wait_for_specific_endpoint_message_impl(u32 endpoint_magic, int message_id);
     void wait_for_transport_to_become_readable();
@@ -57,7 +58,7 @@ protected:
     RefPtr<Core::Timer> m_responsiveness_timer;
 
     Vector<NonnullOwnPtr<Message>> m_unprocessed_messages;
-    Queue<IPC::File> m_unprocessed_fds; // unused on Windows
+    UnprocessedFileDescriptors m_unprocessed_fds;
     ByteBuffer m_unprocessed_bytes;
 
     u32 m_local_endpoint_magic { 0 };
@@ -113,7 +114,7 @@ protected:
         return {};
     }
 
-    virtual OwnPtr<Message> try_parse_message(ReadonlyBytes bytes, Queue<IPC::File>& fds) override
+    virtual OwnPtr<Message> try_parse_message(ReadonlyBytes bytes, UnprocessedFileDescriptors& fds) override
     {
         auto local_message = LocalEndpoint::decode_message(bytes, fds);
         if (!local_message.is_error())

+ 4 - 3
Libraries/LibIPC/Decoder.h

@@ -23,6 +23,7 @@
 #include <LibIPC/File.h>
 #include <LibIPC/Forward.h>
 #include <LibIPC/Message.h>
+#include <LibIPC/UnprocessedFileDescriptors.h>
 #include <LibURL/Origin.h>
 #include <LibURL/URL.h>
 
@@ -37,7 +38,7 @@ inline ErrorOr<T> decode(Decoder&)
 
 class Decoder {
 public:
-    Decoder(Stream& stream, Queue<IPC::File>& files)
+    Decoder(Stream& stream, UnprocessedFileDescriptors& files)
         : m_stream(stream)
         , m_files(files)
     {
@@ -62,11 +63,11 @@ public:
     ErrorOr<size_t> decode_size();
 
     Stream& stream() { return m_stream; }
-    Queue<IPC::File>& files() { return m_files; }
+    UnprocessedFileDescriptors& files() { return m_files; }
 
 private:
     Stream& m_stream;
-    Queue<IPC::File>& m_files;
+    UnprocessedFileDescriptors& m_files;
 };
 
 template<Arithmetic T>

+ 1 - 1
Libraries/LibIPC/File.cpp

@@ -14,7 +14,7 @@ namespace IPC {
 template<>
 ErrorOr<File> decode(Decoder& decoder)
 {
-    auto file = TRY(decoder.files().try_dequeue());
+    auto file = decoder.files().dequeue();
     TRY(Core::System::set_close_on_exec(file.fd(), true));
     return file;
 }

+ 52 - 0
Libraries/LibIPC/Message.cpp

@@ -5,6 +5,8 @@
  */
 
 #include <AK/Checked.h>
+#include <LibIPC/Decoder.h>
+#include <LibIPC/Encoder.h>
 #include <LibIPC/Message.h>
 
 namespace IPC {
@@ -59,4 +61,54 @@ ErrorOr<void> MessageBuffer::transfer_message(Transport& transport)
     return {};
 }
 
+NonnullOwnPtr<LargeMessageWrapper> LargeMessageWrapper::create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap)
+{
+    auto size = buffer_to_wrap.data().size() - sizeof(MessageSizeType);
+    u8 const* data = buffer_to_wrap.data().data() + sizeof(MessageSizeType);
+    auto wrapped_message_data = MUST(Core::AnonymousBuffer::create_with_size(size));
+    memcpy(wrapped_message_data.data<void>(), data, size);
+    Vector<File> files;
+    for (auto& owned_fd : buffer_to_wrap.take_fds()) {
+        files.append(File::adopt_fd(owned_fd->take_fd()));
+    }
+    return make<LargeMessageWrapper>(endpoint_magic, move(wrapped_message_data), move(files));
+}
+
+LargeMessageWrapper::LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector<File>&& wrapped_fds)
+    : m_endpoint_magic(endpoint_magic)
+    , m_wrapped_message_data(move(wrapped_message_data))
+    , m_wrapped_fds(move(wrapped_fds))
+{
+}
+
+ErrorOr<MessageBuffer> LargeMessageWrapper::encode() const
+{
+    MessageBuffer buffer;
+    Encoder stream { buffer };
+    TRY(stream.encode(m_endpoint_magic));
+    TRY(stream.encode(MESSAGE_ID));
+    TRY(stream.encode(m_wrapped_message_data));
+    TRY(stream.encode(m_wrapped_fds.size()));
+    for (auto const& wrapped_fd : m_wrapped_fds) {
+        TRY(stream.append_file_descriptor(wrapped_fd.take_fd()));
+    }
+
+    return buffer;
+}
+
+ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> LargeMessageWrapper::decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files)
+{
+    Decoder decoder { stream, files };
+    auto wrapped_message_data = TRY(decoder.decode<Core::AnonymousBuffer>());
+
+    Vector<File> wrapped_fds;
+    auto num_fds = TRY(decoder.decode<u32>());
+    for (u32 i = 0; i < num_fds; ++i) {
+        auto fd = TRY(decoder.decode<IPC::File>());
+        wrapped_fds.append(move(fd));
+    }
+
+    return make<LargeMessageWrapper>(endpoint_magic, wrapped_message_data, move(wrapped_fds));
+}
+
 }

+ 45 - 0
Libraries/LibIPC/Message.h

@@ -11,9 +11,11 @@
 #include <AK/RefCounted.h>
 #include <AK/RefPtr.h>
 #include <AK/Vector.h>
+#include <LibCore/AnonymousBuffer.h>
 #include <LibCore/Forward.h>
 #include <LibCore/System.h>
 #include <LibIPC/Transport.h>
+#include <LibIPC/UnprocessedFileDescriptors.h>
 
 namespace IPC {
 
@@ -32,6 +34,13 @@ public:
 
     int value() const { return m_fd; }
 
+    int take_fd()
+    {
+        int fd = m_fd;
+        m_fd = -1;
+        return fd;
+    }
+
 private:
     int m_fd;
 };
@@ -40,6 +49,12 @@ class MessageBuffer {
 public:
     MessageBuffer();
 
+    MessageBuffer(Vector<u8, 1024> data, Vector<NonnullRefPtr<AutoCloseFileDescriptor>, 1> fds)
+        : m_data(move(data))
+        , m_fds(move(fds))
+    {
+    }
+
     ErrorOr<void> extend_data_capacity(size_t capacity);
     ErrorOr<void> append_data(u8 const* values, size_t count);
 
@@ -47,8 +62,12 @@ public:
 
     ErrorOr<void> transfer_message(Transport& transport);
 
+    auto const& data() const { return m_data; }
+    auto take_fds() { return move(m_fds); }
+
 private:
     Vector<u8, 1024> m_data;
+    bool m_fds_taken { false };
     Vector<NonnullRefPtr<AutoCloseFileDescriptor>, 1> m_fds;
 #ifdef AK_OS_WINDOWS
     Vector<size_t> m_handle_offsets;
@@ -75,4 +94,30 @@ protected:
     Message() = default;
 };
 
+class LargeMessageWrapper : public Message {
+public:
+    ~LargeMessageWrapper() override = default;
+
+    static constexpr int MESSAGE_ID = 0x0;
+
+    static NonnullOwnPtr<LargeMessageWrapper> create(u32 endpoint_magic, MessageBuffer& buffer_to_wrap);
+
+    u32 endpoint_magic() const override { return m_endpoint_magic; }
+    int message_id() const override { return MESSAGE_ID; }
+    char const* message_name() const override { return "LargeMessageWrapper"; }
+    ErrorOr<MessageBuffer> encode() const override;
+
+    static ErrorOr<NonnullOwnPtr<LargeMessageWrapper>> decode(u32 endpoint_magic, Stream& stream, UnprocessedFileDescriptors& files);
+
+    ReadonlyBytes wrapped_message_data() const { return ReadonlyBytes { m_wrapped_message_data.data<u8>(), m_wrapped_message_data.size() }; }
+    auto take_fds() { return move(m_wrapped_fds); }
+
+    LargeMessageWrapper(u32 endpoint_magic, Core::AnonymousBuffer wrapped_message_data, Vector<IPC::File>&& wrapped_fds);
+
+private:
+    u32 m_endpoint_magic { 0 };
+    Core::AnonymousBuffer m_wrapped_message_data;
+    Vector<File> m_wrapped_fds;
+};
+
 }

+ 2 - 3
Libraries/LibIPC/TransportSocket.cpp

@@ -15,9 +15,8 @@ namespace IPC {
 TransportSocket::TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket)
     : m_socket(move(socket))
 {
-    socklen_t socket_buffer_size = 128 * KiB;
-    (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &socket_buffer_size, sizeof(socket_buffer_size));
-    (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &socket_buffer_size, sizeof(socket_buffer_size));
+    (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_SNDBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
+    (void)Core::System::setsockopt(m_socket->fd().value(), SOL_SOCKET, SO_RCVBUF, &SOCKET_BUFFER_SIZE, sizeof(SOCKET_BUFFER_SIZE));
 }
 
 TransportSocket::~TransportSocket() = default;

+ 2 - 0
Libraries/LibIPC/TransportSocket.h

@@ -15,6 +15,8 @@ class TransportSocket {
     AK_MAKE_DEFAULT_MOVABLE(TransportSocket);
 
 public:
+    static constexpr socklen_t SOCKET_BUFFER_SIZE = 128 * KiB;
+
     explicit TransportSocket(NonnullOwnPtr<Core::LocalSocket> socket);
     ~TransportSocket();
 

+ 34 - 0
Libraries/LibIPC/UnprocessedFileDescriptors.h

@@ -0,0 +1,34 @@
+/*
+ * Copyright (c) 2025, Aliaksandr Kalenik <kalenik.aliaksandr@gmail.com>
+ *
+ * SPDX-License-Identifier: BSD-2-Clause
+ */
+
+#pragma once
+
+#include <LibIPC/File.h>
+
+namespace IPC {
+
+class UnprocessedFileDescriptors {
+public:
+    void enqueue(File&& fd)
+    {
+        m_fds.append(move(fd));
+    }
+
+    File dequeue()
+    {
+        return m_fds.take_first();
+    }
+
+    void return_fds_to_front_of_queue(Vector<File>&& fds)
+    {
+        m_fds.prepend(move(fds));
+    }
+
+private:
+    Vector<File> m_fds;
+};
+
+}

+ 2 - 1
Libraries/LibWeb/HTML/MessagePort.h

@@ -12,6 +12,7 @@
 #include <LibCore/Socket.h>
 #include <LibIPC/File.h>
 #include <LibIPC/Transport.h>
+#include <LibIPC/UnprocessedFileDescriptors.h>
 #include <LibWeb/Bindings/Transferable.h>
 #include <LibWeb/DOM/EventTarget.h>
 #include <LibWeb/Forward.h>
@@ -98,7 +99,7 @@ private:
         Error,
     } m_socket_state { SocketState::Header };
     size_t m_socket_incoming_message_size { 0 };
-    Queue<IPC::File> m_unprocessed_fds;
+    IPC::UnprocessedFileDescriptors m_unprocessed_fds;
     Vector<u8> m_buffered_data;
 
     GC::Ptr<DOM::EventTarget> m_worker_event_target;

+ 9 - 3
Meta/Lagom/Tools/CodeGenerators/IPCCompiler/main.cpp

@@ -404,7 +404,7 @@ public:)~~~");
     static i32 static_message_id() { return (int)MessageID::@message.pascal_name@; }
     virtual const char* message_name() const override { return "@endpoint.name@::@message.pascal_name@"; }
 
-    static ErrorOr<NonnullOwnPtr<@message.pascal_name@>> decode(Stream& stream, Queue<IPC::File>& files)
+    static ErrorOr<NonnullOwnPtr<@message.pascal_name@>> decode(Stream& stream, IPC::UnprocessedFileDescriptors& files)
     {
         IPC::Decoder decoder { stream, files };)~~~");
 
@@ -649,7 +649,7 @@ void generate_proxy_method(SourceGenerator& message_generator, Endpoint const& e
         }
     } else {
         message_generator.append(R"~~~());
-        MUST(m_connection.post_message(move(message_buffer))); )~~~");
+        MUST(m_connection.post_message(@endpoint.magic@, move(message_buffer))); )~~~");
     }
 
     message_generator.appendln(R"~~~(
@@ -720,7 +720,7 @@ public:
 
     static u32 static_magic() { return @endpoint.magic@; }
 
-    static ErrorOr<NonnullOwnPtr<IPC::Message>> decode_message(ReadonlyBytes buffer, [[maybe_unused]] Queue<IPC::File>& files)
+    static ErrorOr<NonnullOwnPtr<IPC::Message>> decode_message(ReadonlyBytes buffer, [[maybe_unused]] IPC::UnprocessedFileDescriptors& files)
     {
         FixedMemoryStream stream { buffer };
         auto message_endpoint_magic = TRY(stream.read_value<u32>());)~~~");
@@ -757,6 +757,11 @@ public:
             do_decode_message(message.response_name());
     }
 
+    generator.append(R"~~~(
+        case (int)IPC::LargeMessageWrapper::MESSAGE_ID:
+            return TRY(IPC::LargeMessageWrapper::decode(message_endpoint_magic, stream, files));
+)~~~");
+
     generator.append(R"~~~(
         default:)~~~");
     if constexpr (GENERATE_DEBUG) {
@@ -898,6 +903,7 @@ void build(StringBuilder& builder, Vector<Endpoint> const& endpoints)
 #include <LibIPC/File.h>
 #include <LibIPC/Message.h>
 #include <LibIPC/Stub.h>
+#include <LibIPC/UnprocessedFileDescriptors.h>
 
 #if defined(AK_COMPILER_CLANG)
 #pragma clang diagnostic push