Skip to content

Commit

Permalink
Merge branch 'master' into distribute_planner_for_cloud_mode
Browse files Browse the repository at this point in the history
  • Loading branch information
924060929 authored Jan 23, 2025
2 parents 183b79b + d7c99f8 commit aa21f2f
Show file tree
Hide file tree
Showing 139 changed files with 4,195 additions and 1,027 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,9 @@ cloud/cmake-build*/
cloud/ut_build*/

## tools
tools/ssb-tools/ssb-data/
tools/ssb-tools/ssb-dbgen/
tools/ssb-tools/bin/ssb-data/
tools/ssb-tools/bin/ssb-dbgen/
tools/ssb-tools/bin/*.tar.gz
tools/**/TPC-H_Tools_v*.zip
tools/**/TPC-H_Tools_v*/
tools/**/tpc-h_v*.docx
Expand Down
2 changes: 1 addition & 1 deletion be/src/geo/geo_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ GeoCircle::~GeoCircle() = default;

void print_s2point(std::ostream& os, const S2Point& point) {
S2LatLng coord(point);
os << std::setprecision(12) << coord.lng().degrees() << " " << coord.lat().degrees();
os << std::setprecision(15) << coord.lng().degrees() << " " << coord.lat().degrees();
}

static inline bool is_valid_lng_lat(double lng, double lat) {
Expand Down
153 changes: 107 additions & 46 deletions be/src/geo/wkb_parse.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,108 +122,169 @@ WkbParseContext* WkbParse::read(std::istream& is, WkbParseContext* ctx) {
auto size = is.tellg();
is.seekg(0, std::ios::beg);

std::vector<unsigned char> buf(static_cast<size_t>(size));
is.read(reinterpret_cast<char*>(buf.data()), static_cast<std::streamsize>(size));

ctx->dis = ByteOrderDataInStream(buf.data(), buf.size()); // will default to machine endian
// Check if size is valid
if (size <= 0) {
ctx->parse_status = GEO_PARSE_WKB_SYNTAX_ERROR;
return ctx;
}

ctx->shape = readGeometry(ctx).release();
std::vector<unsigned char> buf(static_cast<size_t>(size));
if (!is.read(reinterpret_cast<char*>(buf.data()), static_cast<std::streamsize>(size))) {
ctx->parse_status = GEO_PARSE_WKB_SYNTAX_ERROR;
return ctx;
}

if (!ctx->shape) {
// Ensure we have at least one byte for byte order
if (buf.empty()) {
ctx->parse_status = GEO_PARSE_WKB_SYNTAX_ERROR;
return ctx;
}
return ctx;
}

std::unique_ptr<GeoShape> WkbParse::readGeometry(WkbParseContext* ctx) {
// determine byte order
unsigned char byteOrder = ctx->dis.readByte();
// First read the byte order using machine endian
auto byteOrder = buf[0];

// default is machine endian
// Create ByteOrderDataInStream with the correct byte order
if (byteOrder == byteOrder::wkbNDR) {
ctx->dis = ByteOrderDataInStream(buf.data(), buf.size());
ctx->dis.setOrder(ByteOrderValues::ENDIAN_LITTLE);
} else if (byteOrder == byteOrder::wkbXDR) {
ctx->dis = ByteOrderDataInStream(buf.data(), buf.size());
ctx->dis.setOrder(ByteOrderValues::ENDIAN_BIG);
} else {
ctx->parse_status = GEO_PARSE_WKB_SYNTAX_ERROR;
return ctx;
}

std::unique_ptr<GeoShape> shape = readGeometry(ctx);
if (!shape) {
ctx->parse_status = GEO_PARSE_WKB_SYNTAX_ERROR;
return ctx;
}

uint32_t typeInt = ctx->dis.readUnsigned();
ctx->shape = shape.release();
return ctx;
}

uint32_t geometryType = (typeInt & 0xffff) % 1000;
std::unique_ptr<GeoShape> WkbParse::readGeometry(WkbParseContext* ctx) {
try {
// Ensure we have enough data to read
if (ctx->dis.size() < 5) { // At least 1 byte for order and 4 bytes for type
return nullptr;
}

std::unique_ptr<GeoShape> shape;
// Skip the byte order as we've already handled it
ctx->dis.readByte();

switch (geometryType) {
case wkbType::wkbPoint:
shape.reset(readPoint(ctx).release());
break;
case wkbType::wkbLine:
shape.reset(readLine(ctx).release());
break;
case wkbType::wkbPolygon:
shape.reset(readPolygon(ctx).release());
break;
default:
uint32_t typeInt = ctx->dis.readUnsigned();

// Check if geometry has SRID
bool has_srid = (typeInt & WKB_SRID_FLAG) != 0;

// Read SRID if present
if (has_srid) {
ctx->dis.readUnsigned(); // Read and store SRID if needed
}

// Get the base geometry type
uint32_t geometryType = typeInt & WKB_TYPE_MASK;

std::unique_ptr<GeoShape> shape;

switch (geometryType) {
case wkbType::wkbPoint:
shape = readPoint(ctx);
break;
case wkbType::wkbLine:
shape = readLine(ctx);
break;
case wkbType::wkbPolygon:
shape = readPolygon(ctx);
break;
default:
return nullptr;
}

return shape;
} catch (...) {
// Handle any exceptions from reading operations
return nullptr;
}
return shape;
}

std::unique_ptr<GeoPoint> WkbParse::readPoint(WkbParseContext* ctx) {
GeoCoordinateList coords = WkbParse::readCoordinateList(1, ctx);
std::unique_ptr<GeoPoint> point = GeoPoint::create_unique();
if (coords.list.empty()) {
return nullptr;
}

if (point->from_coord(coords.list[0]) == GEO_PARSE_OK) {
return point;
} else {
std::unique_ptr<GeoPoint> point = GeoPoint::create_unique();
if (!point || point->from_coord(coords.list[0]) != GEO_PARSE_OK) {
return nullptr;
}

return point;
}

std::unique_ptr<GeoLine> WkbParse::readLine(WkbParseContext* ctx) {
uint32_t size = ctx->dis.readUnsigned();
minMemSize(wkbLine, size, ctx);
if (minMemSize(wkbLine, size, ctx) != GEO_PARSE_OK) {
return nullptr;
}

GeoCoordinateList coords = WkbParse::readCoordinateList(size, ctx);
std::unique_ptr<GeoLine> line = GeoLine::create_unique();
if (coords.list.empty()) {
return nullptr;
}

if (line->from_coords(coords) == GEO_PARSE_OK) {
return line;
} else {
std::unique_ptr<GeoLine> line = GeoLine::create_unique();
if (!line || line->from_coords(coords) != GEO_PARSE_OK) {
return nullptr;
}

return line;
}

std::unique_ptr<GeoPolygon> WkbParse::readPolygon(WkbParseContext* ctx) {
uint32_t num_loops = ctx->dis.readUnsigned();
minMemSize(wkbPolygon, num_loops, ctx);
if (minMemSize(wkbPolygon, num_loops, ctx) != GEO_PARSE_OK) {
return nullptr;
}

GeoCoordinateListList coordss;
for (int i = 0; i < num_loops; ++i) {
for (uint32_t i = 0; i < num_loops; ++i) {
uint32_t size = ctx->dis.readUnsigned();
GeoCoordinateList* coords = new GeoCoordinateList();
if (size < 3) { // A polygon loop must have at least 3 points
return nullptr;
}

auto coords = std::make_unique<GeoCoordinateList>();
*coords = WkbParse::readCoordinateList(size, ctx);
coordss.add(coords);
if (coords->list.empty()) {
return nullptr;
}
coordss.add(coords.release());
}

std::unique_ptr<GeoPolygon> polygon = GeoPolygon::create_unique();

if (polygon->from_coords(coordss) == GEO_PARSE_OK) {
return polygon;
} else {
if (!polygon || polygon->from_coords(coordss) != GEO_PARSE_OK) {
return nullptr;
}

return polygon;
}

GeoCoordinateList WkbParse::readCoordinateList(unsigned size, WkbParseContext* ctx) {
GeoCoordinateList coords;
for (uint32_t i = 0; i < size; i++) {
readCoordinate(ctx);
if (!readCoordinate(ctx)) {
return GeoCoordinateList();
}
unsigned int j = 0;
GeoCoordinate coord;
coord.x = ctx->ordValues[j++];
coord.y = ctx->ordValues[j++];
coords.add(coord);
}

return coords;
}

Expand Down
14 changes: 12 additions & 2 deletions be/src/geo/wkb_parse.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

#pragma once

#include <stdint.h>

#include <cstdint>
#include <iosfwd>
#include <memory>

Expand All @@ -34,6 +33,17 @@ class GeoLine;
class GeoPoint;
class GeoPolygon;

// WKB format constants
// According to OpenGIS Implementation Specification:
// The high bit of the type value is set to 1 if the WKB contains a SRID.
// Reference: OpenGIS Implementation Specification for Geographic information - Simple feature access - Part 1: Common architecture
// Bit mask to check if WKB contains SRID
constexpr uint32_t WKB_SRID_FLAG = 0x20000000;

// The geometry type is stored in the least significant byte of the type value
// Bit mask to extract the base geometry type
constexpr uint32_t WKB_TYPE_MASK = 0xFF;

class WkbParse {
public:
static GeoParseStatus parse_wkb(std::istream& is, GeoShape** shape);
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/hll.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ class HyperLogLog {
static bool is_valid(const Slice& slice);

// only for debug
std::string to_string() {
std::string to_string() const {
switch (_type) {
case HLL_DATA_EMPTY:
return {};
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/memtable_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,8 @@ Status MemTableWriter::flush_async() {
}

VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
<< _mem_table->memory_usage() << ", tablet: " << _req.tablet_id
<< ", load id: " << print_id(_req.load_id);
<< PrettyPrinter::print_bytes(_mem_table->memory_usage())
<< ", tablet: " << _req.tablet_id << ", load id: " << print_id(_req.load_id);
auto s = _flush_memtable_async();
_reset_mem_table();
return s;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,10 @@ struct OlapReaderStatistics {

int64_t rows_vec_cond_filtered = 0;
int64_t rows_short_circuit_cond_filtered = 0;
int64_t rows_expr_cond_filtered = 0;
int64_t vec_cond_input_rows = 0;
int64_t short_circuit_cond_input_rows = 0;
int64_t expr_cond_input_rows = 0;
int64_t rows_vec_del_cond_filtered = 0;
int64_t vec_cond_ns = 0;
int64_t short_cond_ns = 0;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2299,12 +2299,14 @@ Status SegmentIterator::_execute_common_expr(uint16_t* sel_rowid_idx, uint16_t&
DCHECK(!_remaining_conjunct_roots.empty());
DCHECK(block->rows() != 0);
size_t prev_columns = block->columns();
_opts.stats->expr_cond_input_rows += selected_size;

vectorized::IColumn::Filter filter;
RETURN_IF_ERROR(vectorized::VExprContext::execute_conjuncts_and_filter_block(
_common_expr_ctxs_push_down, block, _columns_to_filter, prev_columns, filter));

selected_size = _evaluate_common_expr_filter(sel_rowid_idx, selected_size, filter);
_opts.stats->rows_expr_cond_filtered += selected_size;
return Status::OK();
}

Expand Down
33 changes: 32 additions & 1 deletion be/src/olap/snapshot_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,35 @@ using std::vector;
namespace doris {
using namespace ErrorCode;

LocalSnapshotLockGuard LocalSnapshotLock::acquire(const std::string& path) {
std::unique_lock<std::mutex> l(_lock);
auto& ctx = _local_snapshot_contexts[path];
while (ctx._is_locked) {
ctx._waiting_count++;
ctx._cv.wait(l);
ctx._waiting_count--;
}

ctx._is_locked = true;
return {path};
}

void LocalSnapshotLock::release(const std::string& path) {
std::lock_guard<std::mutex> l(_lock);
auto iter = _local_snapshot_contexts.find(path);
if (iter == _local_snapshot_contexts.end()) {
return;
}

auto& ctx = iter->second;
ctx._is_locked = false;
if (ctx._waiting_count > 0) {
ctx._cv.notify_one();
} else {
_local_snapshot_contexts.erase(iter);
}
}

SnapshotManager::SnapshotManager(StorageEngine& engine) : _engine(engine) {
_mem_tracker =
MemTrackerLimiter::create_shared(MemTrackerLimiter::Type::OTHER, "SnapshotManager");
Expand Down Expand Up @@ -118,6 +147,8 @@ Status SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* s
}

Status SnapshotManager::release_snapshot(const string& snapshot_path) {
auto local_snapshot_guard = LocalSnapshotLock::instance().acquire(snapshot_path);

// If the requested snapshot_path is located in the root/snapshot folder, it is considered legal and can be deleted.
// Otherwise, it is considered an illegal request and returns an error result.
SCOPED_ATTACH_TASK(_mem_tracker);
Expand Down Expand Up @@ -448,7 +479,7 @@ Status SnapshotManager::_create_snapshot_files(const TabletSharedPtr& ref_tablet
}
}
// be would definitely set it as true no matter has missed version or not
// but it would take no effets on the following range loop
// but it would take no effects on the following range loop
if (!is_single_rowset_clone && request.__isset.missing_version) {
for (int64_t missed_version : request.missing_version) {
Version version = {missed_version, missed_version};
Expand Down
Loading

0 comments on commit aa21f2f

Please sign in to comment.