Skip to content

Commit 4740031

Browse files
committed
add cache manager to fuse
Signed-off-by: sitan liu <[email protected]>
1 parent 2f2c7f7 commit 4740031

File tree

10 files changed

+377
-27
lines changed

10 files changed

+377
-27
lines changed

modules/fuse/cache_manager/manager.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/** Copyright 2020-2022 Alibaba Group Holding Limited.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License.
14+
*/
15+
#ifndef MODULES_FUSE_CACHE_MANAGER_MANAGER_H_
16+
#define MODULES_FUSE_CACHE_MANAGER_MANAGER_H_
17+
#include <iostream>
18+
#include <list>
19+
#include <memory>
20+
#include <type_traits>
21+
#include <unordered_map>
22+
#include <vector>
23+
24+
#include "arrow/buffer.h"
25+
#include "common/util/logging.h"
26+
#include "common/util/status.h"
27+
namespace vineyard {
28+
namespace fuse {
29+
30+
namespace cache_manager {
31+
template <typename K, typename V>
32+
struct KeyValue {
33+
using KeyType = K;
34+
using ValType = std::shared_ptr<V>;
35+
const KeyType key;
36+
ValType value;
37+
KeyValue(KeyType k, ValType v) : key(k), value(v) { ; }
38+
KeyValue(const KeyValue<K, V>& kv) : key(kv.key), value(kv.value) { ; }
39+
};
40+
41+
template <typename KeyValue>
42+
class CacheManager {
43+
private:
44+
std::list<KeyValue> myList;
45+
std::unordered_map<typename KeyValue::KeyType,
46+
typename std::list<KeyValue>::iterator>
47+
myMap;
48+
size_t capacityBytes;
49+
size_t curBytes;
50+
void popToNBytes(size_t n);
51+
bool WithInCapacity(size_t data);
52+
53+
public:
54+
explicit CacheManager(size_t capacity);
55+
CacheManager();
56+
void resize(size_t capacity);
57+
Status put(const typename KeyValue::KeyType& key,
58+
typename KeyValue::ValType val);
59+
typename KeyValue::ValType get(const typename KeyValue::KeyType& key);
60+
bool has(const typename KeyValue::KeyType& key);
61+
std::list<KeyValue> getLinkedList();
62+
typename KeyValue::ValType operator[](const typename KeyValue::KeyType& key);
63+
size_t getCurBytes();
64+
size_t getCapacityBytes();
65+
void destroy();
66+
};
67+
68+
} // namespace cache_manager
69+
} // namespace fuse
70+
}; // namespace vineyard
71+
#endif // MODULES_FUSE_CACHE_MANAGER_MANAGER_H_
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
#ifndef MODULES_FUSE_CACHE_MANAGER_MANAGER_HPP_
2+
#define MODULES_FUSE_CACHE_MANAGER_MANAGER_HPP_
3+
4+
#include "fuse/cache_manager/manager.h"
5+
#include "arrow/buffer.h"
6+
7+
namespace vineyard {
8+
namespace fuse {
9+
10+
namespace cache_manager {
11+
template <typename KV>
12+
void CacheManager<KV>::popToNBytes(size_t n) {
13+
while (this->curBytes > n) {
14+
auto keyToBeDel = myList.back().key;
15+
auto dataToBeDel = myList.back().value;
16+
this->curBytes -= dataToBeDel->capacity();
17+
myList.pop_back();
18+
myMap.erase(keyToBeDel);
19+
DLOG(INFO) << "remove key: " << keyToBeDel << " value: " << dataToBeDel->ToString()<< " remaining bytes: "<<this->curBytes;
20+
}
21+
}
22+
template <class KV>
23+
bool CacheManager<KV>::WithInCapacity(size_t data) {
24+
return data <= capacityBytes;
25+
}
26+
template<class KV>
27+
CacheManager<KV>::CacheManager(size_t capacityBytes):capacityBytes(capacityBytes),curBytes(0){
28+
}
29+
template<class KV>
30+
CacheManager<KV>::CacheManager():capacityBytes(0),curBytes(0){
31+
}
32+
template<class KV>
33+
void CacheManager<KV>::resize(size_t targetCapacityBytes){
34+
capacityBytes = targetCapacityBytes;
35+
}
36+
template<class KV>
37+
void CacheManager<KV>::destroy(){
38+
this->~CacheManager();
39+
}
40+
template<class KV>
41+
bool CacheManager<KV>::has(const typename KV::KeyType& key){
42+
return myMap.find(key)!= myMap.end();
43+
}
44+
template<class KV>
45+
typename KV::ValType CacheManager<KV>::operator[](const typename KV::KeyType& key) {
46+
return get(key);
47+
}
48+
49+
template<class KV>
50+
size_t CacheManager<KV>::getCapacityBytes(){
51+
return this->capacityBytes;
52+
}
53+
template<class KV>
54+
size_t CacheManager<KV>::getCurBytes(){
55+
return this->curBytes;
56+
}
57+
template <class KV>
58+
Status CacheManager<KV>::put(const typename KV::KeyType& key, typename KV::ValType v) {
59+
60+
if (WithInCapacity(v->capacity())) {
61+
62+
auto found_map_iter = myMap.find(key);
63+
64+
if (found_map_iter != myMap.end()) {
65+
DLOG(INFO) << "update key: " << key << " value: " << v->ToString()<<std::endl;
66+
67+
auto found_key = found_map_iter->first;
68+
auto& found_kv = found_map_iter->second;
69+
70+
curBytes -= found_kv->value->capacity();
71+
popToNBytes(capacityBytes - v->capacity());
72+
myList.splice(myList.begin(), this->myList, found_kv);
73+
found_kv->value = v;
74+
return Status::OK();
75+
} else {
76+
DLOG(INFO) << "put key: " << key << " value: " << v->ToString()<<std::endl;
77+
popToNBytes(capacityBytes - v->capacity());
78+
myList.emplace_front(key,v);
79+
// decltype(myMap[key])::nothing;
80+
myMap[key] = myList.begin();
81+
this->curBytes += v->capacity();
82+
return Status::OK();
83+
}
84+
} else {
85+
DLOG(INFO)<<"this keyvalue is too large to put int"<<std::endl;
86+
return Status::NotEnoughMemory("");
87+
}
88+
}
89+
template <class KV>
90+
91+
std::list<KV> CacheManager<KV>::getLinkedList(){
92+
return myList;
93+
94+
}
95+
96+
template <class KV>
97+
typename KV::ValType CacheManager<KV>::get(const typename KV::KeyType& key) {
98+
auto found_iter = myMap.find(key);
99+
if (found_iter == myMap.end()) // key doesn't exist
100+
{
101+
DLOG(INFO)<< "not found key " << key;
102+
103+
return nullptr;}
104+
DLOG(INFO)<< "found key " << key;
105+
106+
myList.splice(
107+
myList.begin(), myList,
108+
found_iter->second); // move the node corresponding to key to front
109+
return found_iter->second->value;
110+
}
111+
} // namespace cache_manager
112+
113+
} // namespace fuse
114+
} // namespace vineyard
115+
#endif // MODULES_FUSE_CACHE_MANAGER_MANAGER_HPP_

modules/fuse/fuse_impl.cc

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,8 @@ int fs::fuse_getattr(const char* path, struct stat* stbuf,
7878
stbuf->st_nlink = 1;
7979

8080
{
81-
auto iter = state.views.find(path);
82-
if (iter != state.views.end()) {
83-
stbuf->st_size = iter->second->size();
81+
if (state.views.has(path)) {
82+
stbuf->st_size = state.views.get(path)->size();
8483
return 0;
8584
}
8685
}
@@ -111,8 +110,7 @@ int fs::fuse_getattr(const char* path, struct stat* stbuf,
111110
auto d =
112111
fs::state.ipc_desearilizer_registry.at(obj->meta().GetTypeName());
113112
auto buffer = d(obj);
114-
state.views[path_string] = buffer;
115-
113+
state.views.put(path_string, buffer);
116114
stbuf->st_size = buffer->size();
117115
} else {
118116
auto obj = state.client->GetObject(ObjectIDFromString(prefix));
@@ -124,7 +122,7 @@ int fs::fuse_getattr(const char* path, struct stat* stbuf,
124122
auto d =
125123
fs::state.ipc_desearilizer_registry.at(obj->meta().GetTypeName());
126124
auto buffer = d(obj);
127-
state.views[path_string] = buffer;
125+
state.views.put(path_string, buffer);
128126
stbuf->st_size = buffer->size();
129127
}
130128
return 0;
@@ -145,10 +143,9 @@ int fs::fuse_open(const char* path, struct fuse_file_info* fi) {
145143
// the opened file referenced by the user-defined name
146144
auto filename = name_from_path(path);
147145
auto target = InvalidObjectID();
148-
auto loc = state.views.find(path);
149146
std::string path_string(path);
150147
std::shared_ptr<vineyard::Object> object = nullptr;
151-
if (loc == state.views.end()) {
148+
if (!state.views.has(path)) {
152149
if (state.client->GetName(filename, target).ok()) {
153150
object = state.client->GetObject(target);
154151
}
@@ -161,7 +158,7 @@ int fs::fuse_open(const char* path, struct fuse_file_info* fi) {
161158
} else {
162159
auto d =
163160
fs::state.ipc_desearilizer_registry.at(object->meta().GetTypeName());
164-
state.views[path_string] = d(object);
161+
state.views.put(path_string, d(object));
165162
}
166163
}
167164

@@ -177,16 +174,17 @@ int fs::fuse_read(const char* path, char* buf, size_t size, off_t offset,
177174
DLOG(INFO) << "fuse: read " << path << " from " << offset << ", expect "
178175
<< size << " bytes";
179176

180-
std::unordered_map<std::string,
181-
std::shared_ptr<arrow::Buffer>>::const_iterator loc;
177+
std::shared_ptr<arrow::Buffer> buffer;
182178
{
183179
std::lock_guard<std::mutex> guard(state.mtx_);
184-
loc = state.views.find(path);
185-
}
186-
if (loc == state.views.end()) {
187-
return -ENOENT;
180+
181+
std::string path_string(path);
182+
183+
if (!state.views.has(path)) {
184+
return -ENOENT;
185+
}
186+
buffer = state.views[path_string];
188187
}
189-
auto buffer = loc->second;
190188
if (offset >= buffer->size()) {
191189
return 0;
192190
} else {
@@ -319,7 +317,7 @@ void* fs::fuse_init(struct fuse_conn_info* conn, struct fuse_config* cfg) {
319317
void fs::fuse_destroy(void* private_data) {
320318
DLOG(INFO) << "fuse: destroy";
321319

322-
state.views.clear();
320+
state.views.destroy();
323321
state.mutable_views.clear();
324322
state.client->Disconnect();
325323
}

modules/fuse/fuse_impl.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,8 @@ limitations under the License.
3737

3838
#include "adaptors/arrow_ipc/deserializer_registry.h"
3939

40-
namespace arrow {
41-
class Buffer;
42-
}
43-
40+
#include "cache_manager/manager.h"
41+
#include "cache_manager/manager.hpp"
4442
namespace vineyard {
4543

4644
namespace fuse {
@@ -51,7 +49,9 @@ struct fs {
5149
std::string vineyard_socket;
5250
std::shared_ptr<Client> client;
5351
std::mutex mtx_;
54-
std::unordered_map<std::string, std::shared_ptr<arrow::Buffer>> views;
52+
cache_manager::CacheManager<
53+
cache_manager::KeyValue<std::string, arrow::Buffer>>
54+
views;
5555
std::unordered_map<std::string, std::shared_ptr<arrow::BufferBuilder>>
5656
mutable_views;
5757
std::unordered_map<std::string, vineyard::fuse::vineyard_deserializer_nt>

modules/fuse/fusermount.cc

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ limitations under the License.
2424
#include "common/util/env.h"
2525
#include "common/util/logging.h"
2626
#include "fuse/fuse_impl.h"
27+
#include "modules/fuse/cache_manager/manager.h"
2728

2829
/*
2930
* Command line options
@@ -34,6 +35,7 @@ limitations under the License.
3435
*/
3536
static struct options {
3637
const char* vineyard_socket;
38+
size_t cache_size;
3739
int show_help;
3840
} options;
3941

@@ -42,14 +44,16 @@ static struct options {
4244

4345
static const struct fuse_opt option_spec[] = {
4446
OPTION("--vineyard-socket=%s", vineyard_socket),
45-
OPTION("--help", show_help), OPTION("-h", show_help), FUSE_OPT_END};
46-
47+
OPTION("--max-cache-size=%d", cache_size), OPTION("--help", show_help),
48+
OPTION("-h", show_help), FUSE_OPT_END};
4749
static void print_help(const char* progname) {
4850
printf("usage: %s [options] <mountpoint>\n\n", progname);
4951
printf(
5052
"Vineyard specific options:\n"
5153
" --vineyard-socket=<s> Path of UNIX-domain socket of vineyard "
5254
"server\n"
55+
" --max-cache-size=<size> Size of cache in bytes\n"
56+
5357
" (default: \"$VINEYARD_IPC_SOCKET\")\n"
5458
"\n");
5559
}
@@ -62,6 +66,9 @@ static int process_args(struct fuse_args& args, int argc, char** argv) {
6266

6367
options.vineyard_socket = strdup(env.c_str());
6468
}
69+
if (!options.cache_size) {
70+
options.cache_size = 1 * 1024 * 1024 * 1024;
71+
}
6572

6673
/* Parse options */
6774
if (fuse_opt_parse(&args, &options, option_spec, NULL) == -1) {
@@ -86,9 +93,10 @@ static int process_args(struct fuse_args& args, int argc, char** argv) {
8693

8794
// populate state
8895
vineyard::fuse::fs::state.vineyard_socket = options.vineyard_socket;
96+
vineyard::fuse::fs::state.views.resize(options.cache_size);
8997
LOG(INFO) << "prepare to conncet to socket"
90-
<< vineyard::fuse::fs::state.vineyard_socket;
91-
98+
<< vineyard::fuse::fs::state.vineyard_socket << " with cache size "
99+
<< options.cache_size;
92100
vineyard::fuse::fs::state.ipc_desearilizer_registry =
93101
vineyard::fuse::arrow_ipc_register_once();
94102
return 0;

modules/fuse/tests/conftest.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,17 @@ def pytest_addoption(parser):
4646
default='/tmp/vineyard_fuse.default',
4747
help='fusermount directory',
4848
)
49+
parser.addoption(
50+
'--vineyard-fuse-process-pid',
51+
action='store',
52+
default=None,
53+
help='fusermount directory',
54+
)
55+
56+
57+
@pytest.fixture(scope='session')
58+
def vineyard_fuse_process_pid(request):
59+
return request.config.option.vineyard_fuse_process_pid
4960

5061

5162
@pytest.fixture(scope='session')

0 commit comments

Comments
 (0)