diff --git a/Cargo.lock b/Cargo.lock index 9814238ee3ae..eb1bf21c2636 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3122,6 +3122,7 @@ dependencies = [ "async-backtrace", "async-trait", "borsh", + "bytes", "bytesize", "chrono", "ctrlc", @@ -4132,6 +4133,7 @@ dependencies = [ "reqwest", "serde", "thiserror", + "url", ] [[package]] diff --git a/src/common/arrow/src/arrow/array/binview/mod.rs b/src/common/arrow/src/arrow/array/binview/mod.rs index 69bda66794b5..2d0ebc49e5b9 100644 --- a/src/common/arrow/src/arrow/array/binview/mod.rs +++ b/src/common/arrow/src/arrow/array/binview/mod.rs @@ -166,6 +166,18 @@ impl BinaryViewArrayGeneric { total_bytes_len: usize, total_buffer_len: usize, ) -> Self { + #[cfg(debug_assertions)] + { + if total_bytes_len != UNKNOWN_LEN as usize { + let total = views.iter().map(|v| v.length as usize).sum::(); + assert_eq!(total, total_bytes_len); + } + + if total_buffer_len != UNKNOWN_LEN as usize { + let total = buffers.iter().map(|v| v.len()).sum::(); + assert_eq!(total, total_buffer_len); + } + } // # Safety // The caller must ensure // - the data is valid utf8 (if required) diff --git a/src/common/arrow/src/arrow/array/binview/mutable.rs b/src/common/arrow/src/arrow/array/binview/mutable.rs index abf2530b6a38..119fa085bd08 100644 --- a/src/common/arrow/src/arrow/array/binview/mutable.rs +++ b/src/common/arrow/src/arrow/array/binview/mutable.rs @@ -155,12 +155,11 @@ impl MutableBinaryViewArray { #[inline] pub(crate) unsafe fn push_view_unchecked(&mut self, v: View, buffers: &[Buffer]) { let len = v.length; - self.total_bytes_len += len as usize; if len <= 12 { + self.total_bytes_len += len as usize; debug_assert!(self.views.capacity() > self.views.len()); self.views.push(v) } else { - self.total_buffer_len += len as usize; let data = buffers.get_unchecked(v.buffer_idx as usize); let offset = v.offset as usize; let bytes = data.get_unchecked(offset..offset + len as usize); @@ -263,12 +262,19 @@ impl MutableBinaryViewArray { // Push and pop to get the properly encoded value. // For long string this leads to a dictionary encoding, // as we push the string only once in the buffers + + let old_bytes_len = self.total_bytes_len; + let view_value = value .map(|v| { self.push_value_ignore_validity(v); self.views.pop().unwrap() }) .unwrap_or_default(); + + self.total_bytes_len += + (self.total_bytes_len - old_bytes_len) * additional.saturating_sub(1); + self.views .extend(std::iter::repeat(view_value).take(additional)); } diff --git a/src/common/arrow/src/arrow/array/growable/binview.rs b/src/common/arrow/src/arrow/array/growable/binview.rs index a23ba22bffe4..6fa525471e82 100644 --- a/src/common/arrow/src/arrow/array/growable/binview.rs +++ b/src/common/arrow/src/arrow/array/growable/binview.rs @@ -74,7 +74,6 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> { capacity: usize, ) -> Self { let data_type = arrays[0].data_type().clone(); - // if any of the arrays has nulls, insertions from any array requires setting bits // as there is at least one array with nulls. if !use_validity & arrays.iter().any(|array| array.null_count() > 0) { @@ -91,11 +90,8 @@ impl<'a, T: ViewType + ?Sized> GrowableBinaryViewArray<'a, T> { .map(|buf| BufferKey { inner: buf }) }) .collect::>(); - let total_buffer_len = arrays - .iter() - .map(|arr| arr.data_buffers().len()) - .sum::(); + let total_buffer_len = buffers.iter().map(|v| v.inner.len()).sum(); Self { arrays, data_type, diff --git a/src/common/arrow/tests/it/arrow/array/binview/mutable_values.rs b/src/common/arrow/tests/it/arrow/array/binview/mutable_values.rs index 0c23a157f65c..e0384ad7bd39 100644 --- a/src/common/arrow/tests/it/arrow/array/binview/mutable_values.rs +++ b/src/common/arrow/tests/it/arrow/array/binview/mutable_values.rs @@ -29,3 +29,17 @@ fn extend_from_iter() { .as_box() ) } + +#[test] +fn extend_from_repeats() { + let mut b = MutableBinaryViewArray::::new(); + b.extend_constant(4, Some("databend")); + + let a = b.clone(); + b.extend_trusted_len_values(a.values_iter()); + + assert_eq!( + b.as_box(), + MutableBinaryViewArray::::from_values_iter(vec!["databend"; 8].into_iter()).as_box() + ) +} diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index 81ddc3e8e9b2..0560b19d3a25 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -26,6 +26,7 @@ databend-common-exception = { workspace = true } async-backtrace = { workspace = true } async-trait = { workspace = true } borsh = { workspace = true } +bytes = { workspace = true } bytesize = { workspace = true } chrono = { workspace = true } ctrlc = { workspace = true } diff --git a/src/common/base/src/base/dma.rs b/src/common/base/src/base/dma.rs index 7aeda6307fdd..8851e2acab9b 100644 --- a/src/common/base/src/base/dma.rs +++ b/src/common/base/src/base/dma.rs @@ -28,6 +28,7 @@ use std::path::Path; use std::ptr; use std::ptr::NonNull; +use bytes::Bytes; use rustix::fs::OFlags; use tokio::fs::File; use tokio::io::AsyncSeekExt; @@ -116,10 +117,6 @@ impl DmaAllocator { Layout::from_size_align(layout.size(), self.0.as_usize()).unwrap() } } - - fn real_cap(&self, cap: usize) -> usize { - self.0.align_up(cap) - } } unsafe impl Allocator for DmaAllocator { @@ -131,6 +128,10 @@ unsafe impl Allocator for DmaAllocator { Global {}.allocate_zeroed(self.real_layout(layout)) } + unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: Layout) { + Global {}.deallocate(ptr, self.real_layout(layout)) + } + unsafe fn grow( &self, ptr: NonNull, @@ -157,20 +158,38 @@ unsafe impl Allocator for DmaAllocator { ) } - unsafe fn deallocate(&self, ptr: std::ptr::NonNull, layout: Layout) { - Global {}.deallocate(ptr, self.real_layout(layout)) + unsafe fn shrink( + &self, + ptr: NonNull, + old_layout: Layout, + new_layout: Layout, + ) -> Result, AllocError> { + Global {}.shrink( + ptr, + self.real_layout(old_layout), + self.real_layout(new_layout), + ) } } type DmaBuffer = Vec; -pub fn dma_buffer_as_vec(mut buf: DmaBuffer) -> Vec { - let ptr = buf.as_mut_ptr(); - let len = buf.len(); - let cap = buf.allocator().real_cap(buf.capacity()); - std::mem::forget(buf); - - unsafe { Vec::from_raw_parts(ptr, len, cap) } +pub fn dma_buffer_to_bytes(buf: DmaBuffer) -> Bytes { + if buf.is_empty() { + return Bytes::new(); + } + let (ptr, len, cap, alloc) = buf.into_raw_parts_with_alloc(); + // Memory fitting + let old_layout = Layout::from_size_align(cap, alloc.0.as_usize()).unwrap(); + let new_layout = Layout::from_size_align(len, std::mem::align_of::()).unwrap(); + let data = unsafe { + let p = Global {} + .shrink(NonNull::new(ptr).unwrap(), old_layout, new_layout) + .unwrap(); + let cap = p.len(); + Vec::from_raw_parts(p.cast().as_mut(), len, cap) + }; + Bytes::from(data) } /// A `DmaFile` is similar to a `File`, but it is opened with the `O_DIRECT` file in order to @@ -697,4 +716,28 @@ mod tests { let _ = std::fs::remove_file(filename); } + + #[test] + fn test_dma_buffer_to_bytes() { + let want = (0..10_u8).collect::>(); + let alloc = DmaAllocator::new(Alignment::new(4096).unwrap()); + let mut buf = DmaBuffer::with_capacity_in(3000, alloc); + buf.extend_from_slice(&want); + + println!("{:?} {}", buf.as_ptr(), buf.capacity()); + buf.shrink_to_fit(); + println!("{:?} {}", buf.as_ptr(), buf.capacity()); + buf.reserve(3000 - buf.capacity()); + println!("{:?} {}", buf.as_ptr(), buf.capacity()); + + // let slice = buf.into_boxed_slice(); + // println!("{:?}", slice.as_ptr()); + + let got = dma_buffer_to_bytes(buf); + println!("{:?}", got.as_ptr()); + assert_eq!(&want, &got); + + let buf = got.to_vec(); + println!("{:?} {}", buf.as_ptr(), buf.capacity()); + } } diff --git a/src/common/base/src/base/mod.rs b/src/common/base/src/base/mod.rs index 72e96459220c..e77671ec3cb4 100644 --- a/src/common/base/src/base/mod.rs +++ b/src/common/base/src/base/mod.rs @@ -28,7 +28,7 @@ mod take_mut; mod uniq_id; mod watch_notify; -pub use dma::dma_buffer_as_vec; +pub use dma::dma_buffer_to_bytes; pub use dma::dma_read_file; pub use dma::dma_read_file_range; pub use dma::dma_write_file_vectored; diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 790a43c49dd0..73e20267daa4 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -25,6 +25,7 @@ #![feature(slice_swap_unchecked)] #![feature(variant_count)] #![feature(ptr_alignment_type)] +#![feature(vec_into_raw_parts)] pub mod base; pub mod containers; @@ -35,7 +36,6 @@ pub mod http_client; pub mod mem_allocator; pub mod rangemap; pub mod runtime; -pub mod slice_ext; pub mod vec_ext; pub mod version; diff --git a/src/common/base/src/slice_ext.rs b/src/common/base/src/slice_ext.rs deleted file mode 100644 index a2ff8e30cea2..000000000000 --- a/src/common/base/src/slice_ext.rs +++ /dev/null @@ -1,62 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::slice::SliceIndex; - -pub trait GetSaferUnchecked { - /// # Safety - /// - /// Calling this method with an out-of-bounds index is *[undefined behavior]* - /// even if the resulting reference is not used. - unsafe fn get_unchecked_release(&self, index: I) -> &>::Output - where I: SliceIndex<[T]>; - - /// # Safety - /// - /// Calling this method with an out-of-bounds index is *[undefined behavior]* - /// even if the resulting reference is not used. - unsafe fn get_unchecked_release_mut( - &mut self, - index: I, - ) -> &mut >::Output - where - I: SliceIndex<[T]>; -} - -impl GetSaferUnchecked for [T] { - #[inline(always)] - unsafe fn get_unchecked_release(&self, index: I) -> &>::Output - where I: SliceIndex<[T]> { - if cfg!(debug_assertions) { - &self[index] - } else { - unsafe { self.get_unchecked(index) } - } - } - - #[inline(always)] - unsafe fn get_unchecked_release_mut( - &mut self, - index: I, - ) -> &mut >::Output - where - I: SliceIndex<[T]>, - { - if cfg!(debug_assertions) { - &mut self[index] - } else { - unsafe { self.get_unchecked_mut(index) } - } - } -} diff --git a/src/common/storage/Cargo.toml b/src/common/storage/Cargo.toml index a25e3702c3ef..0f197f4dddd2 100644 --- a/src/common/storage/Cargo.toml +++ b/src/common/storage/Cargo.toml @@ -34,6 +34,7 @@ regex = { workspace = true } reqwest = { workspace = true } serde = { workspace = true } thiserror = { workspace = true } +url = { workspace = true } [dev-dependencies] diff --git a/src/common/storage/src/http_client.rs b/src/common/storage/src/http_client.rs index 7c2c99f2b955..d3d81ab171aa 100644 --- a/src/common/storage/src/http_client.rs +++ b/src/common/storage/src/http_client.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::future; use std::mem; use std::str::FromStr; @@ -26,6 +27,7 @@ use opendal::raw::parse_content_length; use opendal::raw::HttpBody; use opendal::raw::HttpFetch; use opendal::Buffer; +use url::Url; pub struct StorageHttpClient { client: reqwest::Client, @@ -46,13 +48,14 @@ impl HttpFetch for StorageHttpClient { let uri = req.uri().clone(); let is_head = req.method() == http::Method::HEAD; - let host = uri.host().unwrap_or_default(); + let url = Url::parse(uri.to_string().as_str()).expect("input request url must be valid"); + let host = url.host_str().unwrap_or_default(); let method = match req.method() { &http::Method::GET => { - if uri.path() == "/" { - "LIST" - } else { - "GET" + let query: HashMap<_, _> = url.query_pairs().collect(); + match query.get("list-type") { + Some(_) => "LIST", + None => "GET", } } m => m.as_str(), diff --git a/src/query/expression/src/kernels/filter.rs b/src/query/expression/src/kernels/filter.rs index 11bfffe08812..1af12f7b046f 100644 --- a/src/query/expression/src/kernels/filter.rs +++ b/src/query/expression/src/kernels/filter.rs @@ -358,7 +358,7 @@ impl<'a> FilterVisitor<'a> { new_views, values.data.data_buffers().clone(), None, - Some(values.data.total_buffer_len()), + None, ) }; StringColumn::new(new_col) diff --git a/src/query/expression/src/kernels/take.rs b/src/query/expression/src/kernels/take.rs index f4957a1c19f3..0828c27c9643 100644 --- a/src/query/expression/src/kernels/take.rs +++ b/src/query/expression/src/kernels/take.rs @@ -19,7 +19,6 @@ use databend_common_arrow::arrow::array::Array; use databend_common_arrow::arrow::array::Utf8ViewArray; use databend_common_arrow::arrow::bitmap::Bitmap; use databend_common_arrow::arrow::buffer::Buffer; -use databend_common_base::slice_ext::GetSaferUnchecked; use databend_common_exception::Result; use string::StringColumnBuilder; @@ -258,7 +257,7 @@ where I: databend_common_arrow::arrow::types::Index let result: Vec = self .indices .iter() - .map(|index| unsafe { *col.get_unchecked_release(index.to_usize()) }) + .map(|index| unsafe { *col.get_unchecked(index.to_usize()) }) .collect(); result.into() } @@ -292,7 +291,7 @@ where I: databend_common_arrow::arrow::types::Index new_views, col.data.data_buffers().clone(), None, - Some(col.data.total_buffer_len()), + None, ) }; StringColumn::new(new_col) diff --git a/src/query/expression/src/kernels/take_compact.rs b/src/query/expression/src/kernels/take_compact.rs index 2cf400264b6d..3353f467c5fe 100644 --- a/src/query/expression/src/kernels/take_compact.rs +++ b/src/query/expression/src/kernels/take_compact.rs @@ -238,7 +238,7 @@ impl<'a> TakeCompactVisitor<'a> { new_views, col.data.data_buffers().clone(), None, - Some(col.data.total_buffer_len()), + None, ) }; StringColumn::new(new_col) diff --git a/src/query/expression/src/kernels/take_ranges.rs b/src/query/expression/src/kernels/take_ranges.rs index 872f3f5829ef..f8ed6e1d84f5 100644 --- a/src/query/expression/src/kernels/take_ranges.rs +++ b/src/query/expression/src/kernels/take_ranges.rs @@ -239,7 +239,7 @@ impl<'a> TakeRangeVisitor<'a> { new_views, col.data.data_buffers().clone(), None, - Some(col.data.total_buffer_len()), + None, ) }; StringColumn::new(new_col) diff --git a/src/query/expression/src/types/string.rs b/src/query/expression/src/types/string.rs index bc06218b358f..185e2a991fbc 100644 --- a/src/query/expression/src/types/string.rs +++ b/src/query/expression/src/types/string.rs @@ -18,7 +18,6 @@ use std::ops::Range; use databend_common_arrow::arrow::array::MutableBinaryViewArray; use databend_common_arrow::arrow::array::Utf8ViewArray; use databend_common_arrow::arrow::trusted_len::TrustedLen; -use databend_common_base::slice_ext::GetSaferUnchecked; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -293,8 +292,8 @@ impl StringColumn { } pub fn compare(col_i: &Self, i: usize, col_j: &Self, j: usize) -> Ordering { - let view_i = unsafe { col_i.data.views().as_slice().get_unchecked_release(i) }; - let view_j = unsafe { col_j.data.views().as_slice().get_unchecked_release(j) }; + let view_i = unsafe { col_i.data.views().as_slice().get_unchecked(i) }; + let view_j = unsafe { col_j.data.views().as_slice().get_unchecked(j) }; if view_i.prefix == view_j.prefix { unsafe { @@ -311,7 +310,7 @@ impl StringColumn { } pub fn compare_str(col: &Self, i: usize, value: &str) -> Ordering { - let view = unsafe { col.data.views().as_slice().get_unchecked_release(i) }; + let view = unsafe { col.data.views().as_slice().get_unchecked(i) }; let prefix = load_prefix(value.as_bytes()); if view.prefix == prefix { diff --git a/src/query/service/src/clusters/cluster.rs b/src/query/service/src/clusters/cluster.rs index d4b46ee4cee4..039ce494402b 100644 --- a/src/query/service/src/clusters/cluster.rs +++ b/src/query/service/src/clusters/cluster.rs @@ -137,28 +137,23 @@ impl ClusterHelper for Cluster { ))) } - let mut futures = Vec::with_capacity(message.len()); + let mut response = HashMap::with_capacity(message.len()); for (id, message) in message { let node = get_node(&self.nodes, &id)?; - futures.push({ - let config = GlobalConfig::instance(); - let flight_address = node.flight_address.clone(); - let node_secret = node.secret.clone(); - - async move { - let mut conn = create_client(&config, &flight_address).await?; - Ok::<_, ErrorCode>(( - id, - conn.do_action::<_, Res>(path, node_secret, message, timeout) - .await?, - )) - } - }); + let config = GlobalConfig::instance(); + let flight_address = node.flight_address.clone(); + let node_secret = node.secret.clone(); + + let mut conn = create_client(&config, &flight_address).await?; + response.insert( + id, + conn.do_action::<_, Res>(path, node_secret, message, timeout) + .await?, + ); } - let responses: Vec<(String, Res)> = futures::future::try_join_all(futures).await?; - Ok(responses.into_iter().collect::>()) + Ok(response) } } diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index 6246d86a2ffc..e545d6c89ce0 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -20,8 +20,7 @@ use std::ops::Range; use std::sync::Arc; use std::time::Instant; -use bytes::Bytes; -use databend_common_base::base::dma_buffer_as_vec; +use databend_common_base::base::dma_buffer_to_bytes; use databend_common_base::base::dma_read_file_range; use databend_common_base::base::Alignment; use databend_common_base::base::DmaWriteBuf; @@ -277,7 +276,7 @@ impl Spiller { None => { let file_size = path.size(); let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) + Buffer::from(dma_buffer_to_bytes(buf)).slice(range) } } } @@ -330,7 +329,7 @@ impl Spiller { ); let (buf, range) = dma_read_file_range(path, 0..file_size as u64).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) + Buffer::from(dma_buffer_to_bytes(buf)).slice(range) } (Location::Local(path), Some(ref local)) => { local @@ -371,7 +370,7 @@ impl Spiller { } None => { let (buf, range) = dma_read_file_range(path, data_range).await?; - Buffer::from(dma_buffer_as_vec(buf)).slice(range) + Buffer::from(dma_buffer_to_bytes(buf)).slice(range) } }, Location::Remote(loc) => self.operator.read_with(loc).range(data_range).await?, @@ -410,7 +409,7 @@ impl Spiller { let buf = buf .into_data() .into_iter() - .map(|x| Bytes::from(dma_buffer_as_vec(x))) + .map(dma_buffer_to_bytes) .collect::(); let written = buf.len(); writer.write(buf).await?; diff --git a/tests/sqllogictests/suites/base/01_system/01_0001_system_tables.test b/tests/sqllogictests/suites/base/01_system/01_0001_system_tables.test index f07c0150dfcb..8a0dfda36bfd 100644 --- a/tests/sqllogictests/suites/base/01_system/01_0001_system_tables.test +++ b/tests/sqllogictests/suites/base/01_system/01_0001_system_tables.test @@ -71,6 +71,12 @@ select name, database, owner from system.tables where database='c' and name ='t1 ---- t100 c account_admin +statement ok +select * from system.malloc_stats_totals; + +statement ok +select * from system.malloc_stats; + statement ok drop database if exists a; diff --git a/tests/sqllogictests/suites/query/join/left_outer.test b/tests/sqllogictests/suites/query/join/left_outer.test index ec53a1887605..d8edb290f8da 100644 --- a/tests/sqllogictests/suites/query/join/left_outer.test +++ b/tests/sqllogictests/suites/query/join/left_outer.test @@ -298,8 +298,18 @@ SELECT * FROM t1 LEFT JOIN t2 ON t1.a = t2.a; 4 5 NULL NULL 5 6 NULL NULL + +statement ok +create or replace table t1 (a string) as select number from numbers(100); + +statement ok +create or replace table t2 (a string) as select number from numbers(100); + +## just check it works or not statement ok -set max_block_size = 65536; +select * from ( + select 'SN0LL' as k from t1 +) as a1 left join (select * from t2) as a2 on a1.k = a2.a; statement ok DROP TABLE IF EXISTS t1;