Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lab2 #12

Closed
wants to merge 5 commits into from
Closed

lab2 #12

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 40 additions & 40 deletions src/server/include/storage_engine/buffer/frame_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,75 +11,75 @@
* @details 管理内存中的页帧。内存是有限的,内存中能够存放的页帧个数也是有限的。
* 当内存中的页帧不够用时,需要从内存中淘汰一些页帧,以便为新的页帧腾出空间。
* 这个管理器负责为所有的BufferPool提供页帧管理服务,也就是所有的磁盘文件在访问时都使用这个管理器映射到内存。
*/
*/
class FrameManager
{
public:
FrameManager(const char *tag);
public:
FrameManager(const char *tag);

/**
/**
* @brief 初始化FrameManager
* @param pool_num 指定FrameManager的内存池数量
*/
RC init(int pool_num);
*/
RC init(int pool_num);

/**
/**
* @brief 清理所有的frame
* @details 一般在关闭数据库时调用
*/
RC cleanup();
/**
*/
RC cleanup();
/**
* @brief 分配一个新的页面:先从LRUCache中找,如果找到就直接返回;如果没找到再用Allocator分配。
* @param file_desc 文件描述符
* @param page_num 页面编号
* @return Frame* 页帧指针
*/
Frame *alloc(int file_desc, PageNum page_num);
*/
Frame *alloc(int file_desc, PageNum page_num);

/**
/**
* @brief 从LRUCache中获取指定的页面
* @param file_desc 文件描述符,也可以当做buffer pool文件的标识
* @param page_num 页面号
* @return Frame* 页帧指针, 如果没有找到,返回nullptr
*/
Frame *get(int file_desc, PageNum page_num);
*/
Frame *get(int file_desc, PageNum page_num);

/**
/**
* 当分配的frame已满时,就尝试驱逐一些pin count=0的frame
* @param count 想要驱逐多少个frame
* @param evict_action 需要在释放frame之前,对页面做些什么操作,应该是把脏数据刷到磁盘
* @return 返回本次驱逐了多少个frame
*/
int evict_frames(int count, std::function<RC(Frame *frame)> evict_action);
*/
int evict_frames(int count, std::function<RC(Frame *frame)> evict_action);

/**
/**
* @brief 列出所有指定文件的页面
* @param file_desc 文件描述符
* @return std::list<Frame *> 页帧列表
*/
std::list<Frame *> find_list(int file_desc);

size_t frame_num() const { return frames_.count(); }
*/
std::list<Frame *> find_list(int file_desc);

RC free(int file_desc, PageNum page_num, Frame *frame);
size_t frame_num() const { return frames_.count(); }

private:
Frame *get_internal(const FrameId &frame_id);
RC free_internal(const FrameId &frame_id, Frame *frame);
RC free(int file_desc, PageNum page_num, Frame *frame);
RC free_frame(Frame *buf,FrameId frame_id);
private:
Frame *get_internal(const FrameId &frame_id);
RC free_internal(const FrameId &frame_id, Frame *frame);

private:
class FrameIdHasher {
public:
size_t operator()(const FrameId &frame_id) const
{
return frame_id.hash();
}
};
private:
class FrameIdHasher {
public:
size_t operator()(const FrameId &frame_id) const
{
return frame_id.hash();
}
};

using FrameLruCache = common::LruCache<FrameId, Frame *, FrameIdHasher>;
using FrameAllocator = common::MemPoolSimple<Frame>;
using FrameLruCache = common::LruCache<FrameId, Frame *, FrameIdHasher>;
using FrameAllocator = common::MemPoolSimple<Frame>;

std::mutex lock_; // 对frames_进行操作时需要加锁
FrameLruCache frames_; // 用于存放Frame,但内存有限
FrameAllocator allocator_; // 用于分配新的Frame
std::mutex lock_; // 对frames_进行操作时需要加锁
FrameLruCache frames_; // 用于存放Frame,但内存有限
FrameAllocator allocator_; // 用于分配新的Frame
};
Original file line number Diff line number Diff line change
Expand Up @@ -42,14 +42,29 @@ RC IndexScanPhysicalOperator::next()
{
RID rid;
record_page_handler_.cleanup();

// TODO [Lab2] 通过IndexScanner循环获取下一个RID,然后通过RecordHandler获取对应的Record
// 在现有的查询实现中,会在调用next()方法后通过current_tuple()获取当前的Tuple,
// 在现有的查询实现中,会在调用next()方法后通过current_tuple()获取当前的Tuple,
// 从current_tuple()的实现中不难看出, 数据会通过current_record_传递到Tuple中并返回,
// 因此该next()方法的主要目的就是将recordHandler获取到的数据填充到current_record_中
// while(){}

return RC::SUCCESS;
RC rc = RC::SUCCESS;
bool filter_result = false;
while (RC::SUCCESS == (rc = index_scanner_->next_entry(&rid,false))) {
rc = record_handler_->get_record(record_page_handler_, &rid, readonly_, &current_record_);
if (rc != RC::SUCCESS) {
return rc;
}
tuple_._set_record(&current_record_);
rc = filter(tuple_, filter_result);
if (rc != RC::SUCCESS) {
return rc;
}
if (!filter_result) {
continue;
}
break;
}
return rc;
}

RC IndexScanPhysicalOperator::close()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
#include "include/query_engine/planner/operator/group_by_physical_operator.h"
#include "common/log/log.h"
#include "include/storage_engine/recorder/table.h"

#include "include/query_engine/structor/expression/comparison_expression.h"
#include "include/query_engine/structor/expression/value_expression.h"
#include "include/query_engine/planner/operator/index_scan_physical_operator.h"
using namespace std;

RC PhysicalOperatorGenerator::create(LogicalNode &logical_operator, unique_ptr<PhysicalOperator> &oper, bool is_delete)
Expand Down Expand Up @@ -78,27 +80,47 @@ RC PhysicalOperatorGenerator::create(LogicalNode &logical_operator, unique_ptr<P
}
}

// TODO [Lab2]
// TODO [Lab2]
// 在原有的实现中,会直接生成TableScanOperator对所需的数据进行全表扫描,但其实在生成执行计划时,我们可以进行简单的优化:
// 首先检查扫描的table是否存在索引,如果存在可以使用的索引,那么我们可以直接生成IndexScanOperator来减少磁盘的扫描
RC PhysicalOperatorGenerator::create_plan(
TableGetLogicalNode &table_get_oper, unique_ptr<PhysicalOperator> &oper, bool is_delete)
{
vector<unique_ptr<Expression>> &predicates = table_get_oper.predicates();
Index *index = nullptr;
// TODO [Lab2] 生成IndexScanOperator的准备工作,主要包含:
// 1. 通过predicates获取具体的值表达式, 目前应该只支持等值表达式的索引查找
// example:
// if(predicate.type == ExprType::COMPARE){
// auto compare_expr = dynamic_cast<ComparisonExpr*>(predicate.get());
// if(compare_expr->comp != EQUAL_TO) continue;
// [process]
// }
// 2. 对应上面example里的process阶段, 找到等值表达式中对应的FieldExpression和ValueExpression(左值和右值)
// 通过FieldExpression找到对应的Index, 通过ValueExpression找到对应的Value
// ps: 由于我们只支持单键索引,所以只需要找到一个等值表达式即可

Table*table= table_get_oper.table();
ValueExpr*value_expression=nullptr;
for(auto& predicate:predicates)
{
if(predicate->type() == ExprType::COMPARISON) {
auto compare_expr = dynamic_cast<ComparisonExpr *>(predicate.get());
if(compare_expr->comp() != EQUAL_TO) continue;
unique_ptr<Expression> &left_expr = compare_expr->left();
unique_ptr<Expression> &right_expr = compare_expr->right();
if (left_expr->type() != ExprType::VALUE && right_expr->type() != ExprType::VALUE) {
continue;
}
FieldExpr *field_expr = nullptr;
if (left_expr->type() == ExprType::FIELD) {
field_expr = static_cast<FieldExpr *>(left_expr.get());
value_expression = static_cast<ValueExpr *>(right_expr.get());
} else{
field_expr = static_cast<FieldExpr *>(right_expr.get());
value_expression = static_cast<ValueExpr *>(left_expr.get());
}
if (field_expr == nullptr) {
continue;
}
const Field &field = field_expr->field();
index = table->find_index_by_field(field.field_name());
if (nullptr != index) {
break;
}
}
}
//index = nullptr;
if(index == nullptr){
cout<<"use table scan"<< endl;
Table *table = table_get_oper.table();
auto table_scan_oper = new TableScanPhysicalOperator(table, table_get_oper.table_alias(), table_get_oper.readonly());
table_scan_oper->isdelete_ = is_delete;
Expand All @@ -107,13 +129,15 @@ RC PhysicalOperatorGenerator::create_plan(
LOG_TRACE("use table scan");
}else{
// TODO [Lab2] 生成IndexScanOperator, 并放置在算子树上,下面是一个实现参考,具体实现可以根据需要进行修改
// IndexScanner 在设计时,考虑了范围查找索引的情况,但此处我们只需要考虑单个键的情况
// const Value &value = value_expression->get_value();
// IndexScanPhysicalOperator *operator =
// new IndexScanPhysicalOperator(table, index, readonly, &value, true, &value, true);
// oper = unique_ptr<PhysicalOperator>(operator);
}
//IndexScanner 在设计时,考虑了范围查找索引的情况,但此处我们只需要考虑单个键的情况
cout<<"use index scan"<< endl;
const Value &value = value_expression->get_value();
IndexScanPhysicalOperator * index_operator =
new IndexScanPhysicalOperator(table, index,table_get_oper.readonly(), &value, true, &value, true);
index_operator->set_predicates(predicates);
oper = unique_ptr<PhysicalOperator>(index_operator);

}
return RC::SUCCESS;
}

Expand Down
56 changes: 49 additions & 7 deletions src/server/storage_engine/buffer/buffer_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ RC FileBufferPool::allocate_page(Frame **frame)

if (file_header_->page_count >= FileHeader::MAX_PAGE_NUM) {
LOG_WARN("file buffer pool is full. page count %d, max page count %d",
file_header_->page_count, FileHeader::MAX_PAGE_NUM);
file_header_->page_count, FileHeader::MAX_PAGE_NUM);
lock_.unlock();
return RC::BUFFERPOOL_NOBUF;
}
Expand Down Expand Up @@ -213,11 +213,26 @@ RC FileBufferPool::flush_page(Frame &frame)
*/
RC FileBufferPool::flush_page_internal(Frame &frame)
{
// 1. 获取页面Page
// 2. 计算该Page在文件中的偏移量
// 3. 写入数据到文件的目标位置
// 4. 清除frame的脏标记
// 5. 记录和返回成功
// 1. 获取页面Page
Page &page=frame.page();
int64_t page_num=page.page_num;
// 2. 计算该Page在文件中的偏移量
int64_t offset = (page_num) * BP_PAGE_SIZE;
if (lseek(file_desc_, offset, SEEK_SET) == -1) {
LOG_ERROR("Failed to load page %s:%d, due to failed to lseek:%s.", file_name_.c_str(), page_num, strerror(errno));

return RC::IOERR_SEEK;
}
// 3. 写入数据到文件的目标位置
int ret = writen(file_desc_, &page, BP_PAGE_SIZE);
if (ret != 0) {
LOG_ERROR("Failed to write page %s, file_desc:%d, page num:%d, due to failed to write data:%s, ret=%d, page count=%d",
file_name_.c_str(), file_desc_, page_num, strerror(errno), ret, file_header_->allocated_pages);
return RC::IOERR_WRITE;
}
// 4. 清除frame的脏标记
frame.clear_dirty();
// 5. 记录和返回成功
return RC::SUCCESS;
}

Expand All @@ -226,13 +241,36 @@ RC FileBufferPool::flush_page_internal(Frame &frame)
*/
RC FileBufferPool::evict_page(PageNum page_num, Frame *buf)
{
lock_.lock();
buf=frame_manager_.get(file_desc_,page_num);
if(buf->dirty())//脏页需要刷盘
{
flush_page(*buf);
}
frame_manager_.free_frame(buf,FrameId(file_desc_,page_num));
disposed_pages_.insert(page_num);
lock_.unlock();
return RC::SUCCESS;
}
/**
* TODO [Lab1] 需要同学们实现该文件所有页面的驱逐
*/
RC FileBufferPool::evict_all_pages()
{
lock_.lock();
for (PageNum page_num = 0; page_num < file_header_->page_count; ++page_num) {
Frame *frame = frame_manager_.get(file_desc_, page_num);
if (frame == nullptr) {
// 页面不存在,跳过该页面
continue;
}
else
{
evict_page(page_num,frame);
}
}

lock_.unlock();
return RC::SUCCESS;
}

Expand Down Expand Up @@ -486,6 +524,10 @@ RC BufferPoolManager::close_file(const char *_file_name)
*/
RC BufferPoolManager::flush_page(Frame &frame)
{
lock_.lock();
FileBufferPool * bufferPool=fd_buffer_pools_[frame.file_desc()];
bufferPool->flush_page(frame);
lock_.unlock();
return RC::SUCCESS;
}

Expand All @@ -501,4 +543,4 @@ void BufferPoolManager::set_instance(BufferPoolManager *bpm)
BufferPoolManager &BufferPoolManager::instance()
{
return *default_bpm;
}
}
Loading