@@ -27,6 +27,111 @@ pub fn test_file_write_read<S: squeue::EntryMarker, C: cqueue::EntryMarker>(
27
27
Ok ( ( ) )
28
28
}
29
29
30
+ pub fn test_pipe_write_read_multi < S : squeue:: EntryMarker , C : cqueue:: EntryMarker > (
31
+ ring : & mut IoUring < S , C > ,
32
+ test : & Test ,
33
+ ) -> anyhow:: Result < ( ) > {
34
+ require ! (
35
+ test;
36
+ test. probe. is_supported( opcode:: Write :: CODE ) ;
37
+ test. probe. is_supported( opcode:: ReadMulti :: CODE ) ;
38
+ test. probe. is_supported( opcode:: ProvideBuffers :: CODE ) ;
39
+ ) ;
40
+
41
+ println ! ( "test pipe_write_read_multi" ) ;
42
+
43
+ // Create a pipe for testing
44
+ let mut pipefds = [ 0 ; 2 ] ;
45
+ let res = unsafe { libc:: pipe ( pipefds. as_mut_ptr ( ) ) } ;
46
+ if res < 0 {
47
+ return Err ( anyhow:: anyhow!( "Failed to create pipe" ) ) ;
48
+ }
49
+
50
+ let read_fd = types:: Fd ( pipefds[ 0 ] ) ;
51
+ let write_fd = types:: Fd ( pipefds[ 1 ] ) ;
52
+
53
+ // Prepare data to write and buffers to read into
54
+ let mut input = vec ! [ 0xde ; 1024 ] ;
55
+ input. extend_from_slice ( & [ 0xad ; 256 ] ) ;
56
+
57
+ // NOTE: we use 3 buffers for the last EOF is also taking up a buffer
58
+ let mut bufs = vec ! [ 0 ; 3 * 1024 ] ;
59
+
60
+ // Provide buffers for multi-shot reads
61
+ let provide_bufs_e = opcode:: ProvideBuffers :: new ( bufs. as_mut_ptr ( ) , 1024 , 3 , 0xbeef , 0 ) ;
62
+
63
+ unsafe {
64
+ ring. submission ( )
65
+ . push ( & provide_bufs_e. build ( ) . user_data ( 0x31 ) . into ( ) )
66
+ . expect ( "queue is full" ) ;
67
+ }
68
+
69
+ ring. submit_and_wait ( 1 ) ?;
70
+
71
+ let cqe: cqueue:: Entry = ring. completion ( ) . next ( ) . expect ( "cqueue is empty" ) . into ( ) ;
72
+ assert_eq ! ( cqe. user_data( ) , 0x31 ) ;
73
+ assert_eq ! ( cqe. result( ) , 0 ) ;
74
+
75
+ // Write data to the pipe
76
+ let write_e = opcode:: Write :: new ( write_fd, input. as_ptr ( ) , input. len ( ) as _ ) ;
77
+
78
+ unsafe {
79
+ ring. submission ( )
80
+ . push ( & write_e. build ( ) . user_data ( 0x32 ) . into ( ) )
81
+ . expect ( "queue is full" ) ;
82
+ }
83
+
84
+ ring. submit_and_wait ( 1 ) ?;
85
+
86
+ let cqe: cqueue:: Entry = ring. completion ( ) . next ( ) . expect ( "cqueue is empty" ) . into ( ) ;
87
+ assert_eq ! ( cqe. user_data( ) , 0x32 ) ;
88
+ assert_eq ! ( cqe. result( ) , input. len( ) as i32 ) ;
89
+
90
+ // Issue multi-shot read using a buf_group with 1024 length buffers
91
+ let read_multi_e = opcode:: ReadMulti :: new ( read_fd, 0xbeef )
92
+ . build ( )
93
+ . user_data ( 0x33 )
94
+ . into ( ) ;
95
+
96
+ unsafe {
97
+ ring. submission ( )
98
+ . push ( & read_multi_e)
99
+ . expect ( "queue is full" ) ;
100
+ }
101
+
102
+ // Close the write end to ensure we get an EOF completion
103
+ let cres = unsafe { libc:: close ( write_fd. 0 ) } ;
104
+ if cres < 0 {
105
+ return Err ( anyhow:: anyhow!( "Failed to close file descriptor" ) ) ;
106
+ }
107
+
108
+ ring. submit_and_wait ( 3 ) ?;
109
+
110
+ let cqes: Vec < cqueue:: Entry > = ring. completion ( ) . map ( Into :: into) . collect ( ) ;
111
+ assert_eq ! ( cqes. len( ) , 3 ) ;
112
+
113
+ // First read should get 1024 bytes
114
+ assert_eq ! ( cqes[ 0 ] . user_data( ) , 0x33 ) ;
115
+ assert_eq ! ( cqes[ 0 ] . result( ) , 1024 ) ;
116
+ assert ! ( cqueue:: more( cqes[ 0 ] . flags( ) ) ) ;
117
+ assert_eq ! ( cqueue:: buffer_select( cqes[ 0 ] . flags( ) ) , Some ( 0 ) ) ;
118
+ assert_eq ! ( & bufs[ ..1024 ] , & input[ ..1024 ] ) ;
119
+
120
+ // Second read should get the remaining 256 bytes
121
+ assert_eq ! ( cqes[ 1 ] . user_data( ) , 0x33 ) ;
122
+ assert_eq ! ( cqes[ 1 ] . result( ) , 256 ) ;
123
+ assert ! ( cqueue:: more( cqes[ 1 ] . flags( ) ) ) ;
124
+ assert_eq ! ( cqueue:: buffer_select( cqes[ 1 ] . flags( ) ) , Some ( 1 ) ) ;
125
+ assert_eq ! ( & bufs[ 1024 ..] [ ..256 ] , & input[ 1024 ..] [ ..256 ] ) ;
126
+
127
+ // Final completion should indicate EOF
128
+ assert_eq ! ( cqes[ 2 ] . user_data( ) , 0x33 ) ;
129
+ assert ! ( !cqueue:: more( cqes[ 2 ] . flags( ) ) ) ;
130
+ assert_eq ! ( cqes[ 2 ] . result( ) , 0 ) ; // 0 indicates EOF for read operations
131
+
132
+ Ok ( ( ) )
133
+ }
134
+
30
135
pub fn test_file_writev_readv < S : squeue:: EntryMarker , C : cqueue:: EntryMarker > (
31
136
ring : & mut IoUring < S , C > ,
32
137
test : & Test ,
0 commit comments