Skip to content

Commit c564a16

Browse files
committed
std.Progress: IPC fixes
Reduce node_storage_buffer_len from 200 to 83. This makes messages over the pipe fit in a single packet (4096 bytes). There is now a comptime assert to ensure this. In practice this is plenty of storage because typical terminal heights are significantly less than 83 rows. Handling of split reads is fixed; instead of using a global `remaining_read_trash_bytes`, the value is stored in the "saved metadata" for the IPC node. Saved metadata is split into two arrays so that the "find" operation can quickly scan over fds for a match, looking at 332 bytes maximum, and only reading the memory for the other data upon match. More typical number of bytes read for this operation would be 0 (no child processes), 4 (1 child process), or 64 (16 child processes reporting progress). Removed an align(4) that was leftover from an older design. This also includes part of Jacob Young's not-yet-landed patch that implements `writevNonblock`.
1 parent 30a35a8 commit c564a16

File tree

1 file changed

+83
-35
lines changed

1 file changed

+83
-35
lines changed

lib/std/Progress.zig

Lines changed: 83 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -324,7 +324,7 @@ var global_progress: Progress = .{
324324
.node_end_index = 0,
325325
};
326326

327-
const node_storage_buffer_len = 200;
327+
const node_storage_buffer_len = 83;
328328
var node_parents_buffer: [node_storage_buffer_len]Node.Parent = undefined;
329329
var node_storage_buffer: [node_storage_buffer_len]Node.Storage = undefined;
330330
var node_freelist_buffer: [node_storage_buffer_len]Node.OptionalIndex = undefined;
@@ -755,8 +755,10 @@ const Serialized = struct {
755755

756756
parents_copy: [node_storage_buffer_len]Node.Parent,
757757
storage_copy: [node_storage_buffer_len]Node.Storage,
758+
ipc_metadata_fds_copy: [node_storage_buffer_len]Fd,
758759
ipc_metadata_copy: [node_storage_buffer_len]SavedMetadata,
759760

761+
ipc_metadata_fds: [node_storage_buffer_len]Fd,
760762
ipc_metadata: [node_storage_buffer_len]SavedMetadata,
761763
};
762764
};
@@ -810,36 +812,39 @@ fn serialize(serialized_buffer: *Serialized.Buffer) Serialized {
810812
}
811813

812814
const SavedMetadata = struct {
813-
ipc_fd: u16,
815+
remaining_read_trash_bytes: u16,
814816
main_index: u8,
815817
start_index: u8,
816818
nodes_len: u8,
819+
};
817820

818-
fn getIpcFd(metadata: SavedMetadata) posix.fd_t {
819-
return if (is_windows)
820-
@ptrFromInt(@as(usize, metadata.ipc_fd) << 2)
821-
else
822-
metadata.ipc_fd;
821+
const Fd = enum(i32) {
822+
_,
823+
824+
fn init(fd: posix.fd_t) Fd {
825+
return @enumFromInt(if (is_windows) @as(isize, @bitCast(@intFromPtr(fd))) else fd);
823826
}
824827

825-
fn setIpcFd(fd: posix.fd_t) u16 {
826-
return @intCast(if (is_windows)
827-
@shrExact(@intFromPtr(fd), 2)
828+
fn get(fd: Fd) posix.fd_t {
829+
return if (is_windows)
830+
@ptrFromInt(@as(usize, @bitCast(@as(isize, @intFromEnum(fd)))))
828831
else
829-
fd);
832+
@intFromEnum(fd);
830833
}
831834
};
832835

833836
var ipc_metadata_len: u8 = 0;
834-
var remaining_read_trash_bytes: usize = 0;
835837

836838
fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buffer) usize {
839+
const ipc_metadata_fds_copy = &serialized_buffer.ipc_metadata_fds_copy;
837840
const ipc_metadata_copy = &serialized_buffer.ipc_metadata_copy;
841+
const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds;
838842
const ipc_metadata = &serialized_buffer.ipc_metadata;
839843

840844
var serialized_len = start_serialized_len;
841-
var pipe_buf: [2 * 4096]u8 align(4) = undefined;
845+
var pipe_buf: [2 * 4096]u8 = undefined;
842846

847+
const old_ipc_metadata_fds = ipc_metadata_fds_copy[0..ipc_metadata_len];
843848
const old_ipc_metadata = ipc_metadata_copy[0..ipc_metadata_len];
844849
ipc_metadata_len = 0;
845850

@@ -850,6 +855,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
850855
) |main_parent, *main_storage, main_index| {
851856
if (main_parent == .unused) continue;
852857
const fd = main_storage.getIpcFd() orelse continue;
858+
const opt_saved_metadata = findOld(fd, old_ipc_metadata_fds, old_ipc_metadata);
853859
var bytes_read: usize = 0;
854860
while (true) {
855861
const n = posix.read(fd, pipe_buf[bytes_read..]) catch |err| switch (err) {
@@ -862,24 +868,26 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
862868
},
863869
};
864870
if (n == 0) break;
865-
if (remaining_read_trash_bytes > 0) {
866-
assert(bytes_read == 0);
867-
if (remaining_read_trash_bytes >= n) {
868-
remaining_read_trash_bytes -= n;
871+
if (opt_saved_metadata) |m| {
872+
if (m.remaining_read_trash_bytes > 0) {
873+
assert(bytes_read == 0);
874+
if (m.remaining_read_trash_bytes >= n) {
875+
m.remaining_read_trash_bytes = @intCast(m.remaining_read_trash_bytes - n);
876+
continue;
877+
}
878+
const src = pipe_buf[m.remaining_read_trash_bytes..n];
879+
std.mem.copyForwards(u8, &pipe_buf, src);
880+
m.remaining_read_trash_bytes = 0;
881+
bytes_read = src.len;
869882
continue;
870883
}
871-
const src = pipe_buf[remaining_read_trash_bytes..n];
872-
std.mem.copyForwards(u8, &pipe_buf, src);
873-
remaining_read_trash_bytes = 0;
874-
bytes_read = src.len;
875-
continue;
876884
}
877885
bytes_read += n;
878886
}
879887
// Ignore all but the last message on the pipe.
880888
var input: []u8 = pipe_buf[0..bytes_read];
881889
if (input.len == 0) {
882-
serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, old_ipc_metadata);
890+
serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, 0, fd);
883891
continue;
884892
}
885893

@@ -888,9 +896,8 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
888896
const expected_bytes = 1 + subtree_len * (@sizeOf(Node.Storage) + @sizeOf(Node.Parent));
889897
if (input.len < expected_bytes) {
890898
// Ignore short reads. We'll handle the next full message when it comes instead.
891-
assert(remaining_read_trash_bytes == 0);
892-
remaining_read_trash_bytes = expected_bytes - input.len;
893-
serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, old_ipc_metadata);
899+
const remaining_read_trash_bytes: u16 = @intCast(expected_bytes - input.len);
900+
serialized_len = useSavedIpcData(serialized_len, serialized_buffer, main_storage, main_index, opt_saved_metadata, remaining_read_trash_bytes, fd);
894901
continue :main_loop;
895902
}
896903
if (input.len > expected_bytes) {
@@ -908,8 +915,9 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
908915
const nodes_len: u8 = @intCast(@min(parents.len - 1, serialized_buffer.storage.len - serialized_len));
909916

910917
// Remember in case the pipe is empty on next update.
918+
ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd);
911919
ipc_metadata[ipc_metadata_len] = .{
912-
.ipc_fd = SavedMetadata.setIpcFd(fd),
920+
.remaining_read_trash_bytes = 0,
913921
.start_index = @intCast(serialized_len),
914922
.nodes_len = nodes_len,
915923
.main_index = @intCast(main_index),
@@ -950,6 +958,7 @@ fn serializeIpc(start_serialized_len: usize, serialized_buffer: *Serialized.Buff
950958
// Save a copy in case any pipes are empty on the next update.
951959
@memcpy(serialized_buffer.parents_copy[0..serialized_len], serialized_buffer.parents[0..serialized_len]);
952960
@memcpy(serialized_buffer.storage_copy[0..serialized_len], serialized_buffer.storage[0..serialized_len]);
961+
@memcpy(ipc_metadata_fds_copy[0..ipc_metadata_len], ipc_metadata_fds[0..ipc_metadata_len]);
953962
@memcpy(ipc_metadata_copy[0..ipc_metadata_len], ipc_metadata[0..ipc_metadata_len]);
954963

955964
return serialized_len;
@@ -963,9 +972,13 @@ fn copyRoot(dest: *Node.Storage, src: *align(1) Node.Storage) void {
963972
};
964973
}
965974

966-
fn findOld(ipc_fd: posix.fd_t, old_metadata: []const SavedMetadata) ?*const SavedMetadata {
967-
for (old_metadata) |*m| {
968-
if (m.getIpcFd() == ipc_fd)
975+
fn findOld(
976+
ipc_fd: posix.fd_t,
977+
old_metadata_fds: []Fd,
978+
old_metadata: []SavedMetadata,
979+
) ?*SavedMetadata {
980+
for (old_metadata_fds, old_metadata) |fd, *m| {
981+
if (fd.get() == ipc_fd)
969982
return m;
970983
}
971984
return null;
@@ -976,25 +989,38 @@ fn useSavedIpcData(
976989
serialized_buffer: *Serialized.Buffer,
977990
main_storage: *Node.Storage,
978991
main_index: usize,
979-
old_metadata: []const SavedMetadata,
992+
opt_saved_metadata: ?*SavedMetadata,
993+
remaining_read_trash_bytes: u16,
994+
fd: posix.fd_t,
980995
) usize {
981996
const parents_copy = &serialized_buffer.parents_copy;
982997
const storage_copy = &serialized_buffer.storage_copy;
998+
const ipc_metadata_fds = &serialized_buffer.ipc_metadata_fds;
983999
const ipc_metadata = &serialized_buffer.ipc_metadata;
9841000

985-
const ipc_fd = main_storage.getIpcFd().?;
986-
const saved_metadata = findOld(ipc_fd, old_metadata) orelse {
1001+
const saved_metadata = opt_saved_metadata orelse {
9871002
main_storage.completed_count = 0;
9881003
main_storage.estimated_total_count = 0;
1004+
if (remaining_read_trash_bytes > 0) {
1005+
ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd);
1006+
ipc_metadata[ipc_metadata_len] = .{
1007+
.remaining_read_trash_bytes = remaining_read_trash_bytes,
1008+
.start_index = @intCast(start_serialized_len),
1009+
.nodes_len = 0,
1010+
.main_index = @intCast(main_index),
1011+
};
1012+
ipc_metadata_len += 1;
1013+
}
9891014
return start_serialized_len;
9901015
};
9911016

9921017
const start_index = saved_metadata.start_index;
9931018
const nodes_len = @min(saved_metadata.nodes_len, serialized_buffer.storage.len - start_serialized_len);
9941019
const old_main_index = saved_metadata.main_index;
9951020

1021+
ipc_metadata_fds[ipc_metadata_len] = Fd.init(fd);
9961022
ipc_metadata[ipc_metadata_len] = .{
997-
.ipc_fd = SavedMetadata.setIpcFd(ipc_fd),
1023+
.remaining_read_trash_bytes = remaining_read_trash_bytes,
9981024
.start_index = @intCast(start_serialized_len),
9991025
.nodes_len = nodes_len,
10001026
.main_index = @intCast(main_index),
@@ -1209,6 +1235,11 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
12091235
.{ .base = parents.ptr, .len = parents.len },
12101236
};
12111237

1238+
// Ensures the packet can fit in the pipe buffer.
1239+
const upper_bound_msg_len = 1 + node_storage_buffer_len * @sizeOf(Node.Storage) +
1240+
node_storage_buffer_len * @sizeOf(Node.OptionalIndex);
1241+
comptime assert(upper_bound_msg_len <= 4096);
1242+
12121243
while (remaining_write_trash_bytes > 0) {
12131244
// We do this in a separate write call to give a better chance for the
12141245
// writev below to be in a single packet.
@@ -1228,7 +1259,7 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
12281259

12291260
// If this write would block we do not want to keep trying, but we need to
12301261
// know if a partial message was written.
1231-
if (posix.writev(fd, &vecs)) |written| {
1262+
if (writevNonblock(fd, &vecs)) |written| {
12321263
const total = header.len + storage.len + parents.len;
12331264
if (written < total) {
12341265
remaining_write_trash_bytes = total - written;
@@ -1243,6 +1274,23 @@ fn writeIpc(fd: posix.fd_t, serialized: Serialized) error{BrokenPipe}!void {
12431274
}
12441275
}
12451276

1277+
fn writevNonblock(fd: posix.fd_t, iov: []posix.iovec_const) posix.WriteError!usize {
1278+
var iov_index: usize = 0;
1279+
var written: usize = 0;
1280+
var total_written: usize = 0;
1281+
while (true) {
1282+
while (if (iov_index < iov.len)
1283+
written >= iov[iov_index].len
1284+
else
1285+
return total_written) : (iov_index += 1) written -= iov[iov_index].len;
1286+
iov[iov_index].base += written;
1287+
iov[iov_index].len -= written;
1288+
written = try posix.writev(fd, iov[iov_index..]);
1289+
if (written == 0) return total_written;
1290+
total_written += written;
1291+
}
1292+
}
1293+
12461294
fn maybeUpdateSize(resize_flag: bool) void {
12471295
if (!resize_flag) return;
12481296

0 commit comments

Comments
 (0)