Skip to content

Commit 4bc46a0

Browse files
Merge pull request #49 from RelationalAI/aa-bulk-delete
Implement bulk delete
2 parents d4a2f8f + a13f103 commit 4bc46a0

4 files changed

+241
-5
lines changed

Project.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name = "RustyObjectStore"
22
uuid = "1b5eed3d-1f46-4baa-87f3-a4a892b23610"
3-
version = "0.11.1"
3+
version = "0.12.3"
44

55
[deps]
66
Base64 = "2a0f44e3-6c83-55bd-87e4-b1978d98bd5f"
@@ -20,7 +20,7 @@ ReTestItems = "1"
2020
Sockets = "1"
2121
Test = "1"
2222
julia = "1.8"
23-
object_store_ffi_jll = "0.11.1"
23+
object_store_ffi_jll = "0.12.3"
2424

2525
[extras]
2626
CloudBase = "85eb1798-d7c4-4918-bb13-c944d38e27ed"

src/RustyObjectStore.jl

+102
Original file line numberDiff line numberDiff line change
@@ -746,6 +746,13 @@ struct DeleteException <: RequestException
746746

747747
DeleteException(msg) = new(msg, rust_message_to_reason(msg))
748748
end
749+
# Used for generic exceptions that are not specific to one of the to be deleted paths
750+
struct BulkDeleteException <: RequestException
751+
msg::String
752+
reason::ErrorReason
753+
754+
BulkDeleteException(msg) = new(msg, rust_message_to_reason(msg))
755+
end
749756
struct ListException <: RequestException
750757
msg::String
751758
reason::ErrorReason
@@ -1058,6 +1065,101 @@ function delete_object(path::String, conf::AbstractConfig)
10581065
end
10591066
end
10601067

1068+
# =========================================================================================
1069+
# Bulk Delete
1070+
struct BulkFailedEntryFFI
1071+
path::Cstring
1072+
error_message::Cstring
1073+
end
1074+
1075+
struct BulkFailedEntry
1076+
path::String
1077+
error_message::String
1078+
end
1079+
1080+
function convert_bulk_failed_entry(entry::BulkFailedEntryFFI)
1081+
return BulkFailedEntry(
1082+
unsafe_string(entry.path),
1083+
unsafe_string(entry.error_message),
1084+
)
1085+
end
1086+
1087+
mutable struct BulkResponseFFI
1088+
result::Cint
1089+
failed_entries::Ptr{BulkFailedEntryFFI}
1090+
failed_count::Culonglong
1091+
error_message::Ptr{Cchar}
1092+
context::Ptr{Cvoid}
1093+
1094+
BulkResponseFFI() = new(-1, C_NULL, 0, C_NULL, C_NULL)
1095+
end
1096+
1097+
"""
1098+
bulk_delete_objects(path, conf)
1099+
1100+
Send a bulk delete request to the object store.
1101+
1102+
# Arguments
1103+
- `paths::Vector{String}`: The locations of the objects to delete.
1104+
- `conf::AbstractConfig`: The configuration to use for the request.
1105+
It includes credentials and other client options.
1106+
1107+
# Throws
1108+
- `BulkDeleteException`: If the request fails for any reason.
1109+
Note that deletions of non-existing objects will be treated as success.
1110+
"""
1111+
function bulk_delete_objects(paths::Vector{String}, conf::AbstractConfig)
1112+
response = BulkResponseFFI()
1113+
ct = current_task()
1114+
event = Base.Event()
1115+
handle = pointer_from_objref(event)
1116+
config = into_config(conf)
1117+
while true
1118+
result = GC.@preserve paths config response event begin
1119+
preserve_task(ct)
1120+
try
1121+
# Pass a pointer to the array of pointers to the Cstrings
1122+
result = @ccall rust_lib.bulk_delete(
1123+
paths::Ptr{Ptr{Cchar}},
1124+
length(paths)::Cuint,
1125+
config::Ref{Config},
1126+
response::Ref{BulkResponseFFI},
1127+
handle::Ptr{Cvoid}
1128+
)::Cint
1129+
1130+
wait_or_cancel(event, response)
1131+
1132+
result
1133+
finally
1134+
unpreserve_task(ct)
1135+
end
1136+
end
1137+
1138+
if result == 2
1139+
# backoff
1140+
sleep(0.01)
1141+
continue
1142+
end
1143+
1144+
@throw_on_error(response, "bulk_delete", BulkDeleteException)
1145+
1146+
entries = if response.failed_count > 0
1147+
raw_entries = unsafe_wrap(Array, response.failed_entries, response.failed_count)
1148+
vector = map(convert_bulk_failed_entry, raw_entries)
1149+
@ccall rust_lib.destroy_bulk_failed_entries(
1150+
response.failed_entries::Ptr{BulkFailedEntryFFI},
1151+
response.failed_count::Culonglong
1152+
)::Cint
1153+
vector
1154+
else
1155+
Vector{BulkFailedEntry}[]
1156+
end
1157+
1158+
return entries
1159+
end
1160+
end
1161+
# =========================================================================================
1162+
10611163
mutable struct ReadResponseFFI
10621164
result::Cint
10631165
length::Culonglong

test/azure_blobs_exception_tests.jl

+109-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
@testitem "Basic BlobStorage exceptions" setup=[InitializeObjectStore] begin
22
using CloudBase.CloudTest: Azurite
3-
import CloudBase
43
using RustyObjectStore: RustyObjectStore, get_object!, put_object, ClientOptions, AzureConfig, AWSConfig
4+
import CloudBase
5+
import HTTP
6+
import Sockets
57

68
# For interactive testing, use Azurite.run() instead of Azurite.with()
79
# conf, p = Azurite.run(; debug=true, public=false); atexit(() -> kill(p))
@@ -173,6 +175,112 @@
173175
@test occursin("The specified resource does not exist.", e.msg)
174176
end
175177
end
178+
179+
@testset "bulk_delete_objects exceptions" begin
180+
# We have to mock to simulate partial failures for some of the
181+
# requested deletes
182+
crafted_res =
183+
"""--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
184+
Content-Type: application/http\r
185+
Content-ID: 0\r
186+
\r
187+
HTTP/1.1 202 Accepted\r
188+
x-ms-delete-type-permanent: true\r
189+
x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e284f\r
190+
x-ms-version: 2018-11-09\r
191+
\r
192+
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
193+
Content-Type: application/http\r
194+
Content-ID: 1\r
195+
\r
196+
HTTP/1.1 403 Forbidden\r
197+
Content-Length: 223\r
198+
Content-Type: application/xml\r
199+
Server: Microsoft-HTTPAPI/2.0\r
200+
x-ms-request-id: 12345678-90ab-cdef-1234-567890abcdef\r
201+
x-ms-version: 2021-12-02\r
202+
Date: Wed, 07 Feb 2025 12:34:56 GMT\r
203+
\r
204+
<?xml version="1.0" encoding="utf-8"?>\r
205+
<Error>\r
206+
<Code>AuthorizationPermissionMismatch</Code>\r
207+
<Message>\r
208+
This request is not authorized to perform this operation using this permission.\r
209+
RequestId: 12345678-90ab-cdef-1234-567890abcdef\r
210+
Time: 2025-02-07T12:34:56.000Z\r
211+
</Message>\r
212+
</Error>\r
213+
Time:2018-06-14T16:46:54.6040685Z</Message></Error>\r
214+
\r
215+
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed\r
216+
Content-Type: application/http\r
217+
Content-ID: 2\r
218+
\r
219+
HTTP/1.1 404 The specified blob does not exist.\r
220+
x-ms-error-code: BlobNotFound\r
221+
x-ms-request-id: 778fdc83-801e-0000-62ff-0334671e2852\r
222+
x-ms-version: 2018-11-09\r
223+
Content-Length: 216\r
224+
Content-Type: application/xml\r
225+
\r
226+
<?xml version=\"1.0\" encoding=\"utf-8\"?>\r
227+
<Error><Code>BlobNotFound</Code><Message>The specified blob does not exist.\r
228+
RequestId:778fdc83-801e-0000-62ff-0334671e2852\r
229+
Time:2018-06-14T16:46:54.6040685Z</Message></Error>\r
230+
--batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed--\r
231+
"""
232+
headers = Ref([
233+
"Transfer-Encoding" => "chunked",
234+
"Content-Type" => "multipart/mixed; boundary=batchresponse_66925647-d0cb-4109-b6d3-28efe3e1e5ed",
235+
"x-ms-request-id" => "778fdc83-801e-0000-62ff-033467000000",
236+
"x-ms-version" => "2018-11-09",
237+
])
238+
(port, tcp_server) = Sockets.listenany(8081)
239+
http_server = HTTP.serve!(tcp_server) do request::HTTP.Request
240+
return HTTP.Response(
241+
200,
242+
headers[],
243+
crafted_res,
244+
)
245+
end
246+
247+
mock_baseurl = "http://127.0.0.1:$port/account/container/"
248+
mock_config = AzureConfig(;
249+
storage_account_name=_credentials.auth.account,
250+
container_name=_container.name,
251+
storage_account_key=_credentials.auth.key,
252+
host=mock_baseurl
253+
)
254+
255+
failed_entries = RustyObjectStore.bulk_delete_objects(
256+
["a", "b", "c"],
257+
mock_config,
258+
)
259+
@test length(failed_entries) == 1
260+
@test failed_entries[1].path == "b"
261+
@test occursin("Forbidden (code: 403)", first(failed_entries).error_message)
262+
263+
# Corrupt response headers to generate a generic exception independent of the paths
264+
# we asked to delete
265+
headers[] = []
266+
try
267+
failed_entries = RustyObjectStore.bulk_delete_objects(
268+
["a", "b", "c"],
269+
mock_config,
270+
)
271+
# should throw because the response is invalid as it misses the
272+
# Content-Type header
273+
@test false
274+
catch e
275+
@test e isa RustyObjectStore.BulkDeleteException
276+
@test occursin("Got invalid bulk delete response", e.msg)
277+
@test e.reason == RustyObjectStore.UnknownError()
278+
finally
279+
close(http_server)
280+
end
281+
wait(http_server)
282+
# Test
283+
end
176284
end # Azurite.with
177285
# Azurite is not running at this point
178286
@testset "Connection error" begin

test/basic_unified_tests.jl

+28-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
@testsetup module ReadWriteCases
22
using RustyObjectStore: get_object!, put_object, get_object_stream, put_object_stream,
3-
AbstractConfig, delete_object, list_objects, list_objects_stream, next_chunk!, finish!
3+
AbstractConfig, bulk_delete_objects, delete_object,
4+
list_objects, list_objects_stream, next_chunk!, finish!
45
using CodecZlib
56
using RustyObjectStore
67

@@ -475,6 +476,32 @@ function run_read_write_test_cases(read_config::AbstractConfig, write_config::Ab
475476
end
476477
end
477478

479+
@testset "bulk_delete_objects" begin
480+
input = "1,2,3,4,5,6,7,8,9,1\n" ^ 5
481+
buffer = Vector{UInt8}(undef, 100)
482+
@assert sizeof(input) == 100
483+
@assert sizeof(buffer) == sizeof(input)
484+
485+
object_names = ["test100B.csv", "test200B.csv"]
486+
for name in object_names
487+
nbytes_written = put_object(codeunits(input), name, write_config)
488+
@test nbytes_written == 100
489+
end
490+
491+
failed_entries = bulk_delete_objects(object_names, write_config)
492+
@test isempty(failed_entries)
493+
494+
for name in object_names
495+
try
496+
nbytes_read = get_object!(buffer, name, read_config)
497+
@test false # should throw
498+
catch e
499+
@test e isa RustyObjectStore.GetException
500+
@test occursin("not found", e.msg)
501+
end
502+
end
503+
end
504+
478505
# Large files should use multipart upload / download requests
479506
@testset "20MB file, 20MB buffer" begin
480507
input = "1,2,3,4,5,6,7,8,9,1\n" ^ 1_000_000
@@ -686,7 +713,6 @@ Azurite.with(; debug=true, public=false) do conf
686713

687714
run_sanity_test_cases(config_padded)
688715
end # Azurite.with
689-
690716
end # @testitem
691717

692718
# NOTE: PUT on azure always requires credentials, while GET on public containers doesn't

0 commit comments

Comments
 (0)