diff --git a/python/client.cc b/python/client.cc index 5c4d73f8d..ecbd8277f 100644 --- a/python/client.cc +++ b/python/client.cc @@ -486,6 +486,14 @@ void bind_client(py::module& mod) { .def( "clear", [](ClientBase* self) { throw_on_error(self->Clear()); }, doc::ClientBase_clear) + .def( + "memory_trim", + [](ClientBase* self) -> bool { + bool trimmed = false; + throw_on_error(self->MemoryTrim(trimmed)); + return trimmed; + }, + doc::ClientBase_memory_trim) .def( "label", [](ClientBase* self, ObjectID id, std::string const& key, diff --git a/python/pybind11_docs.cc b/python/pybind11_docs.cc index 1e23bbaaf..c9ca54bfb 100644 --- a/python/pybind11_docs.cc +++ b/python/pybind11_docs.cc @@ -773,6 +773,16 @@ const char* ClientBase_clear = R"doc( Drop all objects that visible to the current instance in the vineyard cluster. )doc"; +const char* ClientBase_memory_trim = R"doc( +.. method:: memory_trim() -> bool + :noindex: + +Trim the memory pool inside the shared memory allocator to return the unused +physical memory back to the OS kernel, like the `malloc_trim` API from glibc. + +Returns True if it actually released any memory. +)doc"; + const char* ClientBase_reset = R"doc( .. method:: reset() -> None :noindex: diff --git a/python/pybind11_docs.h b/python/pybind11_docs.h index 38436beca..0a9a6b05c 100644 --- a/python/pybind11_docs.h +++ b/python/pybind11_docs.h @@ -105,6 +105,7 @@ extern const char* ClientBase_list_names; extern const char* ClientBase_drop_name; extern const char* ClientBase_sync_meta; extern const char* ClientBase_clear; +extern const char* ClientBase_memory_trim; extern const char* ClientBase_reset; extern const char* ClientBase_connected; extern const char* ClientBase_instance_id; diff --git a/python/vineyard/core/tests/test_client.py b/python/vineyard/core/tests/test_client.py index 8a0c09561..fcd241d1c 100644 --- a/python/vineyard/core/tests/test_client.py +++ b/python/vineyard/core/tests/test_client.py @@ -281,3 +281,43 @@ def start_requests(rs, state, ipc_socket): r, message = rs.get(block=True) if not r: pytest.fail(message) + + +def parse_shared_memory_usage(): + '''Parse the shared memory usage from /proc/meminfo, in KB.''' + with open('/proc/meminfo', 'r', encoding='utf-8') as f: + lines = f.readlines() + for line in lines: + if line.startswith('Shmem:'): + parts = line.split() + return int(parts[1]) + + +def test_memory_trim(vineyard_client): + data = np.ones((1000, 1000, 16)) + + # cleanup the instance + vineyard_client.clear() + vineyard_client.memory_trim() + + original_memory_usage = parse_shared_memory_usage() + + data = np.ones((1000, 1000, 16)) + data_kbytes = data.nbytes / 1024 + + rs = [] + for i in range(8): + r = vineyard_client.put(data) + rs.append(r) + current_memory_usage = parse_shared_memory_usage() + assert current_memory_usage >= original_memory_usage + i * data_kbytes + + for r in rs: + vineyard_client.delete(r) + + current_memory_usage = parse_shared_memory_usage() + assert current_memory_usage >= original_memory_usage + (8 - 1) * data_kbytes + + vineyard_client.memory_trim() + # there might be some fragmentation overhead + assert parse_shared_memory_usage() <= original_memory_usage + 1 * data_kbytes diff --git a/src/client/client_base.cc b/src/client/client_base.cc index 9bcf37ca7..23ef2ab86 100644 --- a/src/client/client_base.cc +++ b/src/client/client_base.cc @@ -366,6 +366,17 @@ Status ClientBase::Clear() { return Status::OK(); } +Status ClientBase::MemoryTrim(bool& trimmed) { + ENSURE_CONNECTED(this); + std::string message_out; + WriteMemoryTrimRequest(message_out); + RETURN_ON_ERROR(doWrite(message_out)); + json message_in; + RETURN_ON_ERROR(doRead(message_in)); + RETURN_ON_ERROR(ReadMemoryTrimReply(message_in, trimmed)); + return Status::OK(); +} + Status ClientBase::Label(const ObjectID object, std::string const& key, std::string const& value) { ENSURE_CONNECTED(this); diff --git a/src/client/client_base.h b/src/client/client_base.h index 4f0a2fa9b..acb0fa7e9 100644 --- a/src/client/client_base.h +++ b/src/client/client_base.h @@ -423,6 +423,13 @@ class ClientBase { */ Status Clear(); + /** + * @brief Trim the memory pool inside the shared memory allocator to return + * the unused physical memory back to the OS kernel, like the + * `malloc_trim` API from glibc. + */ + Status MemoryTrim(bool& trimmed); + /** * @brief Associate given labels to an existing object. * diff --git a/src/common/memory/payload.h b/src/common/memory/payload.h index 4a66ca0b3..b327fe00c 100644 --- a/src/common/memory/payload.h +++ b/src/common/memory/payload.h @@ -69,6 +69,8 @@ struct Payload { bool operator==(const Payload& other) const; + inline ObjectID id() const { return object_id; } + inline void Reset() { is_sealed = false, is_owner = true; } inline void MarkAsSealed() { is_sealed = true; } @@ -162,6 +164,8 @@ struct PlasmaPayload : public Payload { (plasma_id == other.plasma_id) && (data_size == other.data_size)); } + inline PlasmaID id() const { return plasma_id; } + Payload ToNormalPayload() const { return Payload(object_id, data_size, pointer, store_fd, arena_fd, map_size, data_offset); diff --git a/src/common/util/env.cc b/src/common/util/env.cc index e5d8ff871..0efd16298 100644 --- a/src/common/util/env.cc +++ b/src/common/util/env.cc @@ -249,14 +249,14 @@ int64_t get_maximum_shared_memory() { * @brief Return the memory size in human readable way. */ std::string prettyprint_memory_size(size_t nbytes) { - if (nbytes > (1L << 40)) { - return std::to_string(nbytes * 1.0 / (1L << 40)) + " TB"; - } else if (nbytes > (1L << 30)) { - return std::to_string(nbytes * 1.0 / (1L << 30)) + " GB"; - } else if (nbytes > (1L << 20)) { - return std::to_string(nbytes * 1.0 / (1L << 20)) + " MB"; - } else if (nbytes > (1L << 10)) { - return std::to_string(nbytes * 1.0 / (1L << 10)) + " KB"; + if (nbytes >= (1LL << 40)) { + return std::to_string(nbytes * 1.0 / (1LL << 40)) + " TB"; + } else if (nbytes >= (1LL << 30)) { + return std::to_string(nbytes * 1.0 / (1LL << 30)) + " GB"; + } else if (nbytes >= (1LL << 20)) { + return std::to_string(nbytes * 1.0 / (1LL << 20)) + " MB"; + } else if (nbytes >= (1LL << 10)) { + return std::to_string(nbytes * 1.0 / (1LL << 10)) + " KB"; } else { return std::to_string(nbytes) + " B"; } diff --git a/src/common/util/protocols.cc b/src/common/util/protocols.cc index 4718c0fa5..4e3a0e297 100644 --- a/src/common/util/protocols.cc +++ b/src/common/util/protocols.cc @@ -123,6 +123,8 @@ const std::string command_t::LABEL_REQUEST = "label_request"; const std::string command_t::LABEL_REPLY = "label_reply"; const std::string command_t::CLEAR_REQUEST = "clear_request"; const std::string command_t::CLEAR_REPLY = "clear_reply"; +const std::string command_t::MEMORY_TRIM_REQUEST = "memory_trim_request"; +const std::string command_t::MEMORY_TRIM_REPLY = "memory_trim_reply"; // Stream APIs const std::string command_t::CREATE_STREAM_REQUEST = "create_stream_request"; @@ -1301,6 +1303,33 @@ Status ReadClearReply(const json& root) { CHECK_IPC_ERROR(root, command_t::CLEAR_REPLY); return Status::OK(); } + +void WriteMemoryTrimRequest(std::string& msg) { + json root; + root["type"] = command_t::MEMORY_TRIM_REQUEST; + + encode_msg(root, msg); +} + +Status ReadMemoryTrimRequest(const json& root) { + CHECK_IPC_ERROR(root, command_t::MEMORY_TRIM_REQUEST); + return Status::OK(); +} + +void WriteMemoryTrimReply(const bool trimmed, std::string& msg) { + json root; + root["type"] = command_t::MEMORY_TRIM_REPLY; + root["trimmed"] = trimmed; + + encode_msg(root, msg); +} + +Status ReadMemoryTrimReply(const json& root, bool& trimmed) { + CHECK_IPC_ERROR(root, command_t::MEMORY_TRIM_REPLY); + trimmed = root.value("trimmed", false); + return Status::OK(); +} + void WriteCreateStreamRequest(const ObjectID& object_id, std::string& msg) { json root; root["type"] = command_t::CREATE_STREAM_REQUEST; diff --git a/src/common/util/protocols.h b/src/common/util/protocols.h index a8767cfb2..5c3434718 100644 --- a/src/common/util/protocols.h +++ b/src/common/util/protocols.h @@ -99,6 +99,8 @@ struct command_t { static const std::string LABEL_REPLY; static const std::string CLEAR_REQUEST; static const std::string CLEAR_REPLY; + static const std::string MEMORY_TRIM_REQUEST; + static const std::string MEMORY_TRIM_REPLY; // Stream APIs static const std::string CREATE_STREAM_REQUEST; @@ -516,6 +518,14 @@ void WriteClearReply(std::string& msg); Status ReadClearReply(const json& root); +void WriteMemoryTrimRequest(std::string& msg); + +Status ReadMemoryTrimRequest(const json& root); + +void WriteMemoryTrimReply(const bool trimmed, std::string& msg); + +Status ReadMemoryTrimReply(const json& root, bool& trimmed); + void WriteCreateStreamRequest(const ObjectID& object_id, std::string& msg); Status ReadCreateStreamRequest(const json& root, ObjectID& object_id); diff --git a/src/common/util/uuid.h b/src/common/util/uuid.h index 4ad38d2ef..3ea673813 100644 --- a/src/common/util/uuid.h +++ b/src/common/util/uuid.h @@ -168,8 +168,6 @@ inline ObjectID GenerateSignature() { return 0x7FFFFFFFFFFFFFFFUL & detail::cycleclock::now(); } -inline bool IsBlob(ObjectID id) { return id & 0x8000000000000000UL; } - const std::string ObjectIDToString(const ObjectID id); const std::string ObjectIDToString(const PlasmaID id); @@ -267,10 +265,27 @@ inline ID GenerateBlobID(const void* ptr) { } template -ID EmptyBlobID() { +inline ID EmptyBlobID() { return GenerateBlobID(0x8000000000000000UL); } +template +inline ID PlaceholderBlobID() { + return GenerateBlobID(std::numeric_limits::max()); +} + +inline bool IsBlob(ObjectID id) { return id & 0x8000000000000000UL; } + +template +inline bool IsEmptyBlobID(ID id) { + return EmptyBlobID() == id; +} + +template +inline bool IsPlaceholderBlobID(ID id) { + return PlaceholderBlobID() == id; +} + template std::string IDToString(ID id); diff --git a/src/server/async/socket_server.cc b/src/server/async/socket_server.cc index 4f5a2065e..152d6ec5d 100644 --- a/src/server/async/socket_server.cc +++ b/src/server/async/socket_server.cc @@ -297,6 +297,8 @@ bool SocketConnection::processMessage(const std::string& message_in) { return doLabelObject(root); } else if (cmd == command_t::CLEAR_REQUEST) { return doClear(root); + } else if (cmd == command_t::MEMORY_TRIM_REQUEST) { + return doMemoryTrim(root); } else if (cmd == command_t::CREATE_STREAM_REQUEST) { return doCreateStream(root); } else if (cmd == command_t::OPEN_STREAM_REQUEST) { @@ -1078,6 +1080,17 @@ bool SocketConnection::doClear(const json& root) { return false; } +bool SocketConnection::doMemoryTrim(const json& root) { + auto self(shared_from_this()); + std::string message_out; + + TRY_READ_REQUEST(ReadMemoryTrimRequest, root); + bool trimmed = bulk_store_->MemoryTrim(); + WriteMemoryTrimReply(trimmed, message_out); + self->doWrite(message_out); + return false; +} + bool SocketConnection::doCreateStream(const json& root) { auto self(shared_from_this()); ObjectID stream_id; diff --git a/src/server/async/socket_server.h b/src/server/async/socket_server.h index c6b6f533b..e8ec8d081 100644 --- a/src/server/async/socket_server.h +++ b/src/server/async/socket_server.h @@ -104,6 +104,7 @@ class SocketConnection : public std::enable_shared_from_this { bool doIfPersist(json const& root); bool doLabelObject(json const& root); bool doClear(json const& root); + bool doMemoryTrim(json const& root); bool doCreateStream(json const& root); bool doOpenStream(json const& root); diff --git a/src/server/memory/allocator.cc b/src/server/memory/allocator.cc index 930e5f4f2..f2895038a 100644 --- a/src/server/memory/allocator.cc +++ b/src/server/memory/allocator.cc @@ -39,6 +39,7 @@ #include "server/memory/allocator.h" +#include "common/util/logging.h" #include "server/memory/dlmalloc.h" #include "server/memory/mimalloc.h" diff --git a/src/server/memory/malloc.cc b/src/server/memory/malloc.cc index 4f49a06ce..1b3119d3e 100644 --- a/src/server/memory/malloc.cc +++ b/src/server/memory/malloc.cc @@ -236,10 +236,11 @@ void* mmap_buffer(int64_t size, bool* is_committed, bool* is_zero) { size += kMmapRegionsGap; int fd = create_buffer(size); - return mmap_buffer(fd, size, is_committed, is_zero); + return mmap_buffer(fd, size, true, is_committed, is_zero); } -void* mmap_buffer(int fd, int64_t size, bool* is_committed, bool* is_zero) { +void* mmap_buffer(int fd, int64_t size, bool gap, bool* is_committed, + bool* is_zero) { if (fd < 0) { LOG(ERROR) << "failed to create buffer during mmap: " << strerror(errno); return nullptr; @@ -268,9 +269,12 @@ void* mmap_buffer(int fd, int64_t size, bool* is_committed, bool* is_zero) { MmapRecord& record = mmap_records[pointer]; record.fd = fd; record.size = size; + record.kind = MmapRecord::Kind::kMalloc; - // We lie to dlmalloc/mimalloc about where mapped memory actually lives. - pointer = pointer_advance(pointer, kMmapRegionsGap); + if (gap) { + // We lie to dlmalloc/mimalloc about where mapped memory actually lives. + pointer = pointer_advance(pointer, kMmapRegionsGap); + } return pointer; } diff --git a/src/server/memory/malloc.h b/src/server/memory/malloc.h index a5d2691a8..9c9dd1d50 100644 --- a/src/server/memory/malloc.h +++ b/src/server/memory/malloc.h @@ -48,6 +48,12 @@ void GetMallocMapinfo(void* addr, int* fd, int64_t* map_length, struct MmapRecord { int fd = -1; int64_t size = -1; + enum class Kind { + kMalloc = 0, + kAllocator = 1, + kDiskMMap = 2, + }; + Kind kind = Kind::kMalloc; }; /// Hashtable that contains one entry per segment that we got from the OS @@ -68,7 +74,8 @@ int create_buffer(int64_t size, std::string const& path); void* mmap_buffer(int64_t size, bool* is_committed, bool* is_zero); // Create a buffer, and mmap the buffer as the shared memory space. -void* mmap_buffer(int fd, int64_t size, bool* is_committed, bool* is_zero); +void* mmap_buffer(int fd, int64_t size, bool gap, bool* is_committed, + bool* is_zero); // Unmap the buffer. int munmap_buffer(void* addr, int64_t size); diff --git a/src/server/memory/memory.cc b/src/server/memory/memory.cc index b0d6a4d3b..7a4619933 100644 --- a/src/server/memory/memory.cc +++ b/src/server/memory/memory.cc @@ -36,6 +36,7 @@ #include #include #include +#include #include #include "common/util/logging.h" // IWYU pragma: keep @@ -66,8 +67,9 @@ static inline uintptr_t align_down(const uintptr_t address, return address & ~(alignment - 1); } -static inline void recycle_resident_memory(const uintptr_t aligned_left, - const uintptr_t aligned_right) { +static inline size_t recycle_resident_memory(const uintptr_t aligned_left, + const uintptr_t aligned_right, + const bool release_immediately) { if (aligned_left < aligned_right) { /** * Notes [Recycle Pages with madvise]: @@ -78,27 +80,40 @@ static inline void recycle_resident_memory(const uintptr_t aligned_left, * * See also: https://man7.org/linux/man-pages/man2/madvise.2.html */ +#if defined(__linux__) || defined(__linux) || defined(linux) || \ + defined(__gnu_linux__) + int advice = release_immediately ? MADV_REMOVE : MADV_DONTNEED; +#else + int advice = MADV_DONTNEED; +#endif if (madvise(reinterpret_cast(aligned_left), - aligned_right - aligned_left, MADV_DONTNEED)) { + aligned_right - aligned_left, advice)) { LOG(ERROR) << "madvise failed: " << errno << " -> " << strerror(errno) << ", arguments: base = " << reinterpret_cast(aligned_left) << ", length = " << (aligned_right - aligned_left) - << ", advice = MADV_DONTNEED"; + << ", advice = " + << (advice == MADV_DONTNEED ? "MADV_DONTNEED" : "MADV_REMOVE"); + } else { + return aligned_right - aligned_left; } } + return 0; } -static inline void recycle_resident_memory(const uintptr_t base, size_t left, - size_t right) { +static inline size_t recycle_resident_memory(const uintptr_t base, size_t left, + size_t right, + const bool release_immediately) { static size_t page_size = system_page_size(); uintptr_t aligned_left = align_up(base + left, page_size), aligned_right = align_down(base + right, page_size); DVLOG(10) << "recycle memory: " << reinterpret_cast(base + left) << "(" << reinterpret_cast(aligned_left) << ") to " << reinterpret_cast(base + right) << "(" - << reinterpret_cast(aligned_right) << ")"; - recycle_resident_memory(aligned_left, aligned_right); + << reinterpret_cast(aligned_right) + << "), sizes: " << (right - left); + return recycle_resident_memory(aligned_left, aligned_right, + release_immediately); } /** @@ -106,30 +121,34 @@ static inline void recycle_resident_memory(const uintptr_t base, size_t left, * * n.b.: the intervals may overlap. */ -static void recycle_arena(const uintptr_t base, const size_t size, - std::vector const& offsets, - std::vector const& sizes) { - std::map points; - points[0] = 0; - points[size] = 0; +size_t recycle_arena(const uintptr_t base, const size_t size, + std::vector const& offsets, + std::vector const& sizes, + const bool release_immediately = false) { + std::map pointers; + pointers[0] = 0; + pointers[size] = 0; for (size_t idx = 0; idx < offsets.size(); ++idx) { - points[offsets[idx]] += 1; - points[offsets[idx] + sizes[idx]] -= 1; + pointers[offsets[idx]] += 1; + pointers[offsets[idx] + sizes[idx]] -= 1; } - auto head = points.begin(); + auto head = pointers.begin(); int markersum = 0; + size_t released = 0; while (true) { markersum += head->second; auto next = std::next(head); - if (next == points.end()) { + if (next == pointers.end()) { break; } if (markersum == 0) { // release memory in the untouched interval. - recycle_resident_memory(base, head->first, next->first); + released += recycle_resident_memory(base, head->first, next->first, + release_immediately); } head = next; } + return released; } } // namespace memory @@ -341,7 +360,7 @@ Status BulkStoreBase::Delete(ID const& object_id) { << "), recycle: (" << std::max(lower, lower_bound) << ", " << std::min(upper, upper_bound) << ")"; memory::recycle_resident_memory(std::max(lower, lower_bound), - std::min(upper, upper_bound)); + std::min(upper, upper_bound), false); } } return Status::OK(); @@ -381,6 +400,51 @@ bool BulkStoreBase::Exists(const ID& object_id) { return objects_.contains(object_id); } +template +bool BulkStoreBase::MemoryTrim() { + auto locked = objects_.lock_table(); + std::map /*offsets*/, + std::vector /* sizes */>> + kepts; + + for (auto const& record : memory::mmap_records) { + if (record.second.kind == memory::MmapRecord::Kind::kMalloc) { + kepts.emplace(record.second.fd, + std::make_tuple(reinterpret_cast(record.first), + record.second.size, std::vector(), + std::vector())); + } + } + for (auto iter = locked.begin(); iter != locked.end(); iter++) { + auto& object = iter->second; + if (IsPlaceholderBlobID(object->id())) { + continue; + } + if (!object->pointer) { + continue; + } + if (kepts.find(object->store_fd) == kepts.end()) { + continue; + } + auto& kept = kepts[object->store_fd]; + auto& offsets = std::get<2>(kept); + auto& sizes = std::get<3>(kept); + offsets.emplace_back(object->data_offset); + sizes.emplace_back(object->data_size); + } + size_t released = 0; + for (auto& kept : kepts) { + auto& base = std::get<0>(kept.second); + auto& size = std::get<1>(kept.second); + auto& offsets = std::get<2>(kept.second); + auto& sizes = std::get<3>(kept.second); + released += memory::recycle_arena(base, size, offsets, sizes, true); + } + return released > 0; +} + template size_t BulkStoreBase::Footprint() const { return BulkAllocator::Allocated(); @@ -428,7 +492,7 @@ Status BulkStoreBase::PreAllocate(const size_t size, } // insert a special marker for obtaining the whole shared memory range - ID object_id = GenerateBlobID(std::numeric_limits::max()); + ID object_id = PlaceholderBlobID(); int fd = -1; int64_t map_size = 0; ptrdiff_t offset = 0; @@ -478,6 +542,7 @@ Status BulkStoreBase::FinalizeArena(const int fd, memory::mmap_records[reinterpret_cast(mmap_base)]; record.fd = fd; record.size = mmap_size; + record.kind = memory::MmapRecord::Kind::kAllocator; arenas_.erase(arena); } return Status::OK(); @@ -647,7 +712,8 @@ Status BulkStore::CreateDisk(const size_t data_size, const std::string& path, fd = memory::create_buffer(data_size, path); } uint8_t* pointer = static_cast( - memory::mmap_buffer(fd, data_size, &is_committed, &is_zero)); + memory::mmap_buffer(fd, data_size, false, &is_committed, &is_zero)); + memory::mmap_records[pointer].kind = memory::MmapRecord::Kind::kDiskMMap; if (data_size == 0) { object_id = EmptyBlobID(); object = Payload::MakeEmpty(); diff --git a/src/server/memory/memory.h b/src/server/memory/memory.h index 676e53005..3ceacece1 100644 --- a/src/server/memory/memory.h +++ b/src/server/memory/memory.h @@ -82,6 +82,8 @@ class BulkStoreBase { object_map_t& List() { return objects_; } + bool MemoryTrim(); + size_t Footprint() const; size_t FootprintLimit() const; size_t FootprintGPU() const; diff --git a/test/hashmap_mvcc_test.cc b/test/hashmap_mvcc_test.cc index 783daea34..c82902a84 100644 --- a/test/hashmap_mvcc_test.cc +++ b/test/hashmap_mvcc_test.cc @@ -31,7 +31,8 @@ limitations under the License. using namespace vineyard; // NOLINT(build/namespaces) void testHashmapMVCC(Client& client) { - using hashmap_t = HashmapMVCC; + using hashmap_t = HashmapMVCC; + LOG(INFO) << "entry element size: " << sizeof(hashmap_t::Entry) << " bytes"; std::shared_ptr hashmap; VINEYARD_CHECK_OK(hashmap_t::Make(client, 1, hashmap));