Skip to content

Commit

Permalink
Merge branch 'databendlabs:main' into add-glue-catalog
Browse files Browse the repository at this point in the history
  • Loading branch information
Rowlandev authored Nov 15, 2024
2 parents fd9e76f + 182b744 commit fdc709c
Show file tree
Hide file tree
Showing 21 changed files with 147 additions and 123 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions src/common/arrow/src/arrow/array/binview/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,18 @@ impl<T: ViewType + ?Sized> BinaryViewArrayGeneric<T> {
total_bytes_len: usize,
total_buffer_len: usize,
) -> Self {
#[cfg(debug_assertions)]
{
if total_bytes_len != UNKNOWN_LEN as usize {
let total = views.iter().map(|v| v.length as usize).sum::<usize>();
assert_eq!(total, total_bytes_len);
}

if total_buffer_len != UNKNOWN_LEN as usize {
let total = buffers.iter().map(|v| v.len()).sum::<usize>();
assert_eq!(total, total_buffer_len);
}
}
// # Safety
// The caller must ensure
// - the data is valid utf8 (if required)
Expand Down
10 changes: 8 additions & 2 deletions src/common/arrow/src/arrow/array/binview/mutable.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,12 +155,11 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
#[inline]
pub(crate) unsafe fn push_view_unchecked(&mut self, v: View, buffers: &[Buffer<u8>]) {
let len = v.length;
self.total_bytes_len += len as usize;
if len <= 12 {
self.total_bytes_len += len as usize;
debug_assert!(self.views.capacity() > self.views.len());
self.views.push(v)
} else {
self.total_buffer_len += len as usize;
let data = buffers.get_unchecked(v.buffer_idx as usize);
let offset = v.offset as usize;
let bytes = data.get_unchecked(offset..offset + len as usize);
Expand Down Expand Up @@ -263,12 +262,19 @@ impl<T: ViewType + ?Sized> MutableBinaryViewArray<T> {
// Push and pop to get the properly encoded value.
// For long string this leads to a dictionary encoding,
// as we push the string only once in the buffers

let old_bytes_len = self.total_bytes_len;

let view_value = value
.map(|v| {
self.push_value_ignore_validity(v);
self.views.pop().unwrap()
})
.unwrap_or_default();

self.total_bytes_len +=
(self.total_bytes_len - old_bytes_len) * additional.saturating_sub(1);

self.views
.extend(std::iter::repeat(view_value).take(additional));
}
Expand Down
6 changes: 1 addition & 5 deletions src/common/arrow/src/arrow/array/growable/binview.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
capacity: usize,
) -> Self {
let data_type = arrays[0].data_type().clone();

// if any of the arrays has nulls, insertions from any array requires setting bits
// as there is at least one array with nulls.
if !use_validity & arrays.iter().any(|array| array.null_count() > 0) {
Expand All @@ -91,11 +90,8 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> {
.map(|buf| BufferKey { inner: buf })
})
.collect::<ArrowIndexSet<_>>();
let total_buffer_len = arrays
.iter()
.map(|arr| arr.data_buffers().len())
.sum::<usize>();

let total_buffer_len = buffers.iter().map(|v| v.inner.len()).sum();
Self {
arrays,
data_type,
Expand Down
14 changes: 14 additions & 0 deletions src/common/arrow/tests/it/arrow/array/binview/mutable_values.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,17 @@ fn extend_from_iter() {
.as_box()
)
}

#[test]
fn extend_from_repeats() {
let mut b = MutableBinaryViewArray::<str>::new();
b.extend_constant(4, Some("databend"));

let a = b.clone();
b.extend_trusted_len_values(a.values_iter());

assert_eq!(
b.as_box(),
MutableBinaryViewArray::<str>::from_values_iter(vec!["databend"; 8].into_iter()).as_box()
)
}
1 change: 1 addition & 0 deletions src/common/base/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ databend-common-exception = { workspace = true }
async-backtrace = { workspace = true }
async-trait = { workspace = true }
borsh = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
chrono = { workspace = true }
ctrlc = { workspace = true }
Expand Down
69 changes: 56 additions & 13 deletions src/common/base/src/base/dma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use std::path::Path;
use std::ptr;
use std::ptr::NonNull;

use bytes::Bytes;
use rustix::fs::OFlags;
use tokio::fs::File;
use tokio::io::AsyncSeekExt;
Expand Down Expand Up @@ -116,10 +117,6 @@ impl DmaAllocator {
Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap()
}
}

fn real_cap(&self, cap: usize) -> usize {
self.0.align_up(cap)
}
}

unsafe impl Allocator for DmaAllocator {
Expand All @@ -131,6 +128,10 @@ unsafe impl Allocator for DmaAllocator {
Global {}.allocate_zeroed(self.real_layout(layout))
}

unsafe fn deallocate(&self, ptr: std::ptr::NonNull<u8>, layout: Layout) {
Global {}.deallocate(ptr, self.real_layout(layout))
}

unsafe fn grow(
&self,
ptr: NonNull<u8>,
Expand All @@ -157,20 +158,38 @@ unsafe impl Allocator for DmaAllocator {
)
}

unsafe fn deallocate(&self, ptr: std::ptr::NonNull<u8>, layout: Layout) {
Global {}.deallocate(ptr, self.real_layout(layout))
unsafe fn shrink(
&self,
ptr: NonNull<u8>,
old_layout: Layout,
new_layout: Layout,
) -> Result<NonNull<[u8]>, AllocError> {
Global {}.shrink(
ptr,
self.real_layout(old_layout),
self.real_layout(new_layout),
)
}
}

type DmaBuffer = Vec<u8, DmaAllocator>;

pub fn dma_buffer_as_vec(mut buf: DmaBuffer) -> Vec<u8> {
let ptr = buf.as_mut_ptr();
let len = buf.len();
let cap = buf.allocator().real_cap(buf.capacity());
std::mem::forget(buf);

unsafe { Vec::from_raw_parts(ptr, len, cap) }
pub fn dma_buffer_to_bytes(buf: DmaBuffer) -> Bytes {
if buf.is_empty() {
return Bytes::new();
}
let (ptr, len, cap, alloc) = buf.into_raw_parts_with_alloc();
// Memory fitting
let old_layout = Layout::from_size_align(cap, alloc.0.as_usize()).unwrap();
let new_layout = Layout::from_size_align(len, std::mem::align_of::<u8>()).unwrap();
let data = unsafe {
let p = Global {}
.shrink(NonNull::new(ptr).unwrap(), old_layout, new_layout)
.unwrap();
let cap = p.len();
Vec::from_raw_parts(p.cast().as_mut(), len, cap)
};
Bytes::from(data)
}

/// A `DmaFile` is similar to a `File`, but it is opened with the `O_DIRECT` file in order to
Expand Down Expand Up @@ -697,4 +716,28 @@ mod tests {

let _ = std::fs::remove_file(filename);
}

#[test]
fn test_dma_buffer_to_bytes() {
let want = (0..10_u8).collect::<Vec<_>>();
let alloc = DmaAllocator::new(Alignment::new(4096).unwrap());
let mut buf = DmaBuffer::with_capacity_in(3000, alloc);
buf.extend_from_slice(&want);

println!("{:?} {}", buf.as_ptr(), buf.capacity());
buf.shrink_to_fit();
println!("{:?} {}", buf.as_ptr(), buf.capacity());
buf.reserve(3000 - buf.capacity());
println!("{:?} {}", buf.as_ptr(), buf.capacity());

// let slice = buf.into_boxed_slice();
// println!("{:?}", slice.as_ptr());

let got = dma_buffer_to_bytes(buf);
println!("{:?}", got.as_ptr());
assert_eq!(&want, &got);

let buf = got.to_vec();
println!("{:?} {}", buf.as_ptr(), buf.capacity());
}
}
2 changes: 1 addition & 1 deletion src/common/base/src/base/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ mod take_mut;
mod uniq_id;
mod watch_notify;

pub use dma::dma_buffer_as_vec;
pub use dma::dma_buffer_to_bytes;
pub use dma::dma_read_file;
pub use dma::dma_read_file_range;
pub use dma::dma_write_file_vectored;
Expand Down
2 changes: 1 addition & 1 deletion src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#![feature(slice_swap_unchecked)]
#![feature(variant_count)]
#![feature(ptr_alignment_type)]
#![feature(vec_into_raw_parts)]

pub mod base;
pub mod containers;
Expand All @@ -35,7 +36,6 @@ pub mod http_client;
pub mod mem_allocator;
pub mod rangemap;
pub mod runtime;
pub mod slice_ext;
pub mod vec_ext;
pub mod version;

Expand Down
62 changes: 0 additions & 62 deletions src/common/base/src/slice_ext.rs

This file was deleted.

1 change: 1 addition & 0 deletions src/common/storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ regex = { workspace = true }
reqwest = { workspace = true }
serde = { workspace = true }
thiserror = { workspace = true }
url = { workspace = true }

[dev-dependencies]

Expand Down
13 changes: 8 additions & 5 deletions src/common/storage/src/http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::future;
use std::mem;
use std::str::FromStr;
Expand All @@ -26,6 +27,7 @@ use opendal::raw::parse_content_length;
use opendal::raw::HttpBody;
use opendal::raw::HttpFetch;
use opendal::Buffer;
use url::Url;

pub struct StorageHttpClient {
client: reqwest::Client,
Expand All @@ -46,13 +48,14 @@ impl HttpFetch for StorageHttpClient {
let uri = req.uri().clone();
let is_head = req.method() == http::Method::HEAD;

let host = uri.host().unwrap_or_default();
let url = Url::parse(uri.to_string().as_str()).expect("input request url must be valid");
let host = url.host_str().unwrap_or_default();
let method = match req.method() {
&http::Method::GET => {
if uri.path() == "/" {
"LIST"
} else {
"GET"
let query: HashMap<_, _> = url.query_pairs().collect();
match query.get("list-type") {
Some(_) => "LIST",
None => "GET",
}
}
m => m.as_str(),
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/kernels/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ impl<'a> FilterVisitor<'a> {
new_views,
values.data.data_buffers().clone(),
None,
Some(values.data.total_buffer_len()),
None,
)
};
StringColumn::new(new_col)
Expand Down
5 changes: 2 additions & 3 deletions src/query/expression/src/kernels/take.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use databend_common_arrow::arrow::array::Array;
use databend_common_arrow::arrow::array::Utf8ViewArray;
use databend_common_arrow::arrow::bitmap::Bitmap;
use databend_common_arrow::arrow::buffer::Buffer;
use databend_common_base::slice_ext::GetSaferUnchecked;
use databend_common_exception::Result;
use string::StringColumnBuilder;

Expand Down Expand Up @@ -258,7 +257,7 @@ where I: databend_common_arrow::arrow::types::Index
let result: Vec<T> = self
.indices
.iter()
.map(|index| unsafe { *col.get_unchecked_release(index.to_usize()) })
.map(|index| unsafe { *col.get_unchecked(index.to_usize()) })
.collect();
result.into()
}
Expand Down Expand Up @@ -292,7 +291,7 @@ where I: databend_common_arrow::arrow::types::Index
new_views,
col.data.data_buffers().clone(),
None,
Some(col.data.total_buffer_len()),
None,
)
};
StringColumn::new(new_col)
Expand Down
2 changes: 1 addition & 1 deletion src/query/expression/src/kernels/take_compact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ impl<'a> TakeCompactVisitor<'a> {
new_views,
col.data.data_buffers().clone(),
None,
Some(col.data.total_buffer_len()),
None,
)
};
StringColumn::new(new_col)
Expand Down
Loading

0 comments on commit fdc709c

Please sign in to comment.