Limit the cluster cache by memory instead of number of clusters #956

Open

wants to merge 9 commits into base: cache_config
41 changes: 21 additions & 20 deletions include/zim/archive.h
@@ -42,6 +42,27 @@ namespace zim
efficientOrder
};

/** Get the maximum size of the cluster cache.
*
* @return The maximum memory size used by the cluster cache.
*/
size_t LIBZIM_API get_cluster_cache_max_size();

/** Get the current size of the cluster cache.
*
* @return The current memory size used by the cluster cache.
*/
size_t LIBZIM_API get_cluster_cache_current_size();

/** Set the size of the cluster cache.
*
* If the new size is lower than the memory currently used by the stored clusters,
* some clusters will be dropped from the cache to respect the new size.
*
* @param size_in_b The memory limit (in bytes) for the cluster cache.
*/
void LIBZIM_API set_cluster_cache_max_size(size_t size_in_b);

/**
* The Archive class to access content in a zim file.
*
@@ -534,26 +555,6 @@ namespace zim
*/
std::shared_ptr<FileImpl> getImpl() const { return m_impl; }

/** Get the maximum size of the cluster cache.
*
* @return The maximum number of clusters stored in the cache.
*/
size_t get_cluster_cache_max_size() const;

/** Get the current size of the cluster cache.
*
* @return The number of clusters currently stored in the cache.
*/
size_t get_cluster_cache_current_size() const;

/** Set the size of the cluster cache.
*
* If the new size is lower than the number of currently stored clusters
* some clusters will be dropped from cache to respect the new size.
*
* @param nb_clusters The maximum number of clusters stored in the cache.
*/
void set_cluster_cache_max_size(size_t nb_clusters);

/** Get the size of the dirent cache.
*
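For context, here is a minimal usage sketch of the new free functions (the file name and the 32 MiB limit are illustrative assumptions, not values taken from this PR). Since the cache is now managed globally rather than per `Archive`, the limit applies to all open archives in the process:

```cpp
#include <zim/archive.h>
#include <iostream>

int main() {
  // Cap the process-wide cluster cache at 32 MiB (illustrative value).
  zim::set_cluster_cache_max_size(32 * 1024 * 1024);

  zim::Archive archive("example.zim");  // hypothetical ZIM file
  std::cout << archive.getRandomEntry().getTitle() << "\n";

  // Query current usage against the configured limit, in bytes.
  std::cout << zim::get_cluster_cache_current_size() << " / "
            << zim::get_cluster_cache_max_size() << "\n";
  return 0;
}
```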
4 changes: 2 additions & 2 deletions meson_options.txt
@@ -1,5 +1,5 @@
option('CLUSTER_CACHE_SIZE', type : 'string', value : '16',
description : 'set cluster cache size to number (default:16)')
option('CLUSTER_CACHE_SIZE', type : 'string', value : '67108864',
description : 'set cluster cache size to number (default:64MB)')
option('DIRENT_CACHE_SIZE', type : 'string', value : '512',
description : 'set dirent cache size to number (default:512)')
option('DIRENT_LOOKUP_CACHE_SIZE', type : 'string', value : '1024',
13 changes: 6 additions & 7 deletions src/archive.cpp
@@ -504,19 +504,19 @@ namespace zim
return m_impl->hasNewNamespaceScheme();
}

size_t Archive::get_cluster_cache_max_size() const
size_t get_cluster_cache_max_size()
{
return m_impl->get_cluster_cache_max_size();
return getClusterCache().get_max_size();
}

size_t Archive::get_cluster_cache_current_size() const
size_t get_cluster_cache_current_size()
{
return m_impl->get_cluster_cache_current_size();
return getClusterCache().get_current_size();
}

void Archive::set_cluster_cache_max_size(size_t nb_clusters)
void set_cluster_cache_max_size(size_t size_in_b)
{
m_impl->set_cluster_cache_max_size(nb_clusters);
getClusterCache().set_max_size(size_in_b);
}

size_t Archive::get_dirent_cache_max_size() const
@@ -534,7 +534,6 @@ namespace zim
m_impl->set_dirent_cache_max_size(nb_dirents);
}


size_t Archive::get_dirent_lookup_cache_max_size() const
{
return m_impl->get_dirent_lookup_cache_max_size();
33 changes: 32 additions & 1 deletion src/cluster.cpp
@@ -86,7 +86,8 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
Cluster::Cluster(std::unique_ptr<IStreamReader> reader_, Compression comp, bool isExtended)
: compression(comp),
isExtended(isExtended),
m_reader(std::move(reader_))
m_reader(std::move(reader_)),
m_memorySize(0)
{
if (isExtended) {
read_header<uint64_t>();
@@ -180,3 +181,33 @@ getClusterReader(const Reader& zimReader, offset_t offset, Cluster::Compression*
}

}

// This function must return a constant size for a given cluster.
// This is important as we want to remove the same size as the one we added when the
// cluster is removed from the cache.
// However, because of partial decompression, this size can change:
// - As we advance in the decompression, we can create new blob readers in `m_blobReaders`
// - The stream itself may allocate memory.
// To solve this, we take the average and assume that half of a cluster's blob readers
// will be created, so we count the readers as half the full uncompressed cluster data size.
// It also appears that once we query the size of the stream, it has reached a state where
// no further allocation will be done by it. Probably because:
// - We have already started to decompress the stream to read the offsets
// - Cluster data size is smaller than the window size associated with the compression level (?)
// We check this anyway and print a warning if it is not the case, hoping that the user will
// open an issue allowing further analysis.
size_t zim::ClusterMemorySize::get_cluster_size(const Cluster& cluster) {
if (!cluster.m_memorySize) {
auto base_struct = sizeof(Cluster);
auto offsets_size = sizeof(offset_t) * cluster.m_blobOffsets.size();
auto readers_size = cluster.m_blobOffsets.back().v / 2;
cluster.m_streamSize = cluster.m_reader->getSize();
cluster.m_memorySize = base_struct + offsets_size + readers_size + cluster.m_streamSize;
Collaborator:

If you want to be that precise in your approximate estimation of the memory usage by the cluster, shouldn't you multiply readers_size by the additional memory usage of an individual blob reader?

}
auto streamSize = cluster.m_reader->getSize();
if (streamSize != cluster.m_streamSize) {
std::cerr << "WARNING: stream size have changed from " << cluster.m_streamSize << " to " << streamSize << std::endl;
std::cerr << "Please open an issue on https://github.com/openzim/libzim/issues with this message and the zim file you use" << std::endl;
}
return cluster.m_memorySize;
}
Comment on lines +185 to +213
Collaborator:

I think that for our next revision of the ZIM file format we should consider including the uncompressed size of a compressed cluster in the cluster header.

Also, though this is not the best place for discussing such an idea, I propose to think about possible benefits of introducing an item data cache. In some scenarios, clusters may be too large a unit of caching. Consider a usage pattern when multiple relatively small items all from different clusters are constantly used. The total memory consumed by those items can be orders of magnitude smaller than the size of their clusters. And if the latter exceeds the cluster cache limit, libzim may keep thrashing for no good reason.

Collaborator Author:

> I think that for our next revision of the ZIM file format we should consider including the uncompressed size of a compressed cluster in the cluster header.

This is what I have done in Jubako.
I would like to consider it as the basis for the next major revision of the ZIM file format, but that is a discussion far beyond the current one.
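To make the estimation heuristic in `ClusterMemorySize::get_cluster_size` concrete, here is a back-of-the-envelope sketch for a hypothetical cluster holding 100 blobs and 1 MiB of uncompressed data. Every per-term size below is an illustrative assumption, not a measured value:

```cpp
#include <cstddef>
#include <iostream>

int main() {
  // Mirrors base_struct + offsets_size + readers_size + stream_size with made-up numbers.
  const std::size_t base_struct  = 200;             // stand-in for sizeof(Cluster)
  const std::size_t offsets_size = 8 * 101;         // ~101 stored offsets of 8 bytes each
  const std::size_t readers_size = (1 << 20) / 2;   // half of the 1 MiB uncompressed data
  const std::size_t stream_size  = 1 << 20;         // assumed decompression stream state
  std::cout << "estimated cost: "
            << (base_struct + offsets_size + readers_size + stream_size)
            << " bytes\n";                          // roughly 1.5 MiB charged to the cache
  return 0;
}
```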

13 changes: 13 additions & 0 deletions src/cluster.h
@@ -36,6 +36,8 @@ namespace zim
class Blob;
class Reader;
class IStreamReader;
class Cluster;


class LIBZIM_PRIVATE_API Cluster : public std::enable_shared_from_this<Cluster> {
typedef std::vector<offset_t> BlobOffsets;
@@ -70,6 +72,8 @@ namespace zim

mutable std::mutex m_readerAccessMutex;
mutable BlobReaders m_blobReaders;
mutable size_t m_memorySize;
mutable size_t m_streamSize;


template<typename OFFSET_TYPE>
@@ -91,6 +95,15 @@
Blob getBlob(blob_index_t n, offset_t offset, zsize_t size) const;

static std::shared_ptr<Cluster> read(const Reader& zimReader, offset_t clusterOffset);
friend struct ClusterMemorySize;
Collaborator:

Isn't it better to have a public Cluster::getMemorySize()?

};

struct ClusterMemorySize {
static size_t cost(const std::shared_ptr<const Cluster>& cluster) {
return get_cluster_size(*cluster);
}

static size_t get_cluster_size(const Cluster& cluster);
};

}
13 changes: 13 additions & 0 deletions src/compression.cpp
@@ -60,6 +60,11 @@ void LZMA_INFO::stream_end_decode(stream_t* stream)
lzma_end(stream);
}

size_t LZMA_INFO::state_size(const stream_t& stream)
{
return lzma_memusage(&stream);
}


const std::string ZSTD_INFO::name = "zstd";

@@ -170,3 +175,11 @@ void ZSTD_INFO::stream_end_decode(stream_t* stream)
void ZSTD_INFO::stream_end_encode(stream_t* stream)
{
}

size_t ZSTD_INFO::state_size(const stream_t& stream) {
if (stream.decoder_stream) {
return ZSTD_sizeof_DStream(stream.decoder_stream);
} else {
return ZSTD_sizeof_CStream(stream.encoder_stream);
}
}
2 changes: 2 additions & 0 deletions src/compression.h
@@ -65,6 +65,7 @@ struct LZMA_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static CompStatus stream_run(stream_t* stream, CompStep step);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


@@ -94,6 +95,7 @@ struct LIBZIM_PRIVATE_API ZSTD_INFO {
static CompStatus stream_run_decode(stream_t* stream, CompStep step);
static void stream_end_encode(stream_t* stream);
static void stream_end_decode(stream_t* stream);
static size_t state_size(const stream_t& stream);
};


39 changes: 37 additions & 2 deletions src/concurrent_cache.h
@@ -30,6 +30,31 @@
namespace zim
{

template<typename CostEstimation>
struct FutureToValueCostEstimation {
template<typename T>
static size_t cost(const std::shared_future<T>& future) {
// The future is the value stored in the cache.
// When calling getOrPut, if the key is not in the cache,
// we add a future and then compute the value and set the future.
// But lru_cache calls us when we add the future, i.e. before we have
// computed the value. If we waited here (or used future.get), we would deadlock,
// as we need to return before the value is set.
// So in this case we return 0. `ConcurrentCache::getOrPut` will correctly increase
// the current cache size once it has an actual value.
// We still need to compute the size of the value when the future is ready, as this
// cost is also used to decrease the cache size when the value is dropped.
using namespace std::chrono_literals;
std::future_status status = future.wait_for(0s);
if (status == std::future_status::ready) {
return CostEstimation::cost(future.get());
} else {
return 0;
}
}

};

/**
ConcurrentCache implements a concurrent thread-safe cache

@@ -39,12 +64,12 @@ namespace zim
safe, and, in case of a cache miss, will block until that element becomes
available.
*/
template <typename Key, typename Value>
template <typename Key, typename Value, typename CostEstimation>
class ConcurrentCache
{
private: // types
typedef std::shared_future<Value> ValuePlaceholder;
typedef lru_cache<Key, ValuePlaceholder> Impl;
typedef lru_cache<Key, ValuePlaceholder, FutureToValueCostEstimation<CostEstimation>> Impl;

public: // types
explicit ConcurrentCache(size_t maxEntries)
@@ -70,6 +95,10 @@ class ConcurrentCache
if ( x.miss() ) {
try {
valuePromise.set_value(f());
{
std::unique_lock<std::mutex> l(lock_);
impl_.increaseSize(CostEstimation::cost(x.value().get()));
}
} catch (std::exception& e) {
drop(key);
throw;
@@ -85,6 +114,12 @@ class ConcurrentCache
return impl_.drop(key);
}

template<class F>
void drop_all(F f) {
std::unique_lock<std::mutex> l(lock_);
impl_.drop_all(f);
}

size_t get_max_size() const {
std::unique_lock<std::mutex> l(lock_);
return impl_.get_max_size();
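A standalone sketch of how the two-level cost policy composes. `StringCostEstimation` and `FutureCost` are hypothetical stand-ins for `ClusterMemorySize` and `FutureToValueCostEstimation`; the point is only to show why an unready future is counted as zero and re-measured once a value exists:

```cpp
#include <chrono>
#include <cstddef>
#include <future>
#include <iostream>
#include <memory>
#include <string>

// Hypothetical inner policy: charge the cache for the size of a string payload.
struct StringCostEstimation {
  static std::size_t cost(const std::shared_ptr<const std::string>& s) {
    return sizeof(std::string) + s->capacity();
  }
};

// Same idea as FutureToValueCostEstimation: an unready future costs 0 so that
// inserting the placeholder never blocks on the value being computed.
template<typename CostEstimation>
struct FutureCost {
  template<typename T>
  static std::size_t cost(const std::shared_future<T>& f) {
    using namespace std::chrono_literals;
    return f.wait_for(0s) == std::future_status::ready ? CostEstimation::cost(f.get()) : 0;
  }
};

int main() {
  std::promise<std::shared_ptr<const std::string>> p;
  auto f = p.get_future().share();
  std::cout << FutureCost<StringCostEstimation>::cost(f) << "\n";  // 0: value not computed yet
  p.set_value(std::make_shared<const std::string>("some cached payload"));
  std::cout << FutureCost<StringCostEstimation>::cost(f) << "\n";  // real cost once ready
  return 0;
}
```

In the actual change, `ConcurrentCache::getOrPut` compensates for the zero placeholder cost by calling `impl_.increaseSize(CostEstimation::cost(...))` once the value has been set, so the accounting ends up correct.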
5 changes: 5 additions & 0 deletions src/decoderstreamreader.h
@@ -23,6 +23,7 @@

#include "compression.h"
#include "istreamreader.h"
#include <cstddef>

namespace zim
{
@@ -49,6 +50,10 @@ class DecoderStreamReader
Decoder::stream_end_decode(&m_decoderState);
}

size_t getSize() const override {
return m_encodedDataReader->size().v + m_encodedDataChunk.size().v + Decoder::state_size(m_decoderState);
}

private: // functions
void readNextChunk()
{
2 changes: 1 addition & 1 deletion src/dirent_accessor.h
@@ -67,7 +67,7 @@ class LIBZIM_PRIVATE_API DirectDirentAccessor
std::unique_ptr<const Reader> mp_pathPtrReader;
entry_index_t m_direntCount;

mutable lru_cache<entry_index_type, std::shared_ptr<const Dirent>> m_direntCache;
mutable lru_cache<entry_index_type, std::shared_ptr<const Dirent>, UnitCostEstimation> m_direntCache;
mutable std::mutex m_direntCacheLock;

mutable std::vector<char> m_bufferDirentZone;
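`UnitCostEstimation` is referenced here but its definition is not part of this diff; presumably it is the policy that preserves the previous entry-count behaviour for the dirent cache. A minimal sketch of what such a policy would look like (an assumption, the real definition may differ):

```cpp
#include <cstddef>

// Sketch only: every cached entry costs 1, so the lru_cache size limit
// behaves as a limit on the number of entries rather than on bytes.
struct UnitCostEstimation {
  template<typename T>
  static std::size_t cost(const T& /*value*/) {
    return 1;
  }
};
```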