31
31
//! }
32
32
//! ```
33
33
34
+ use std:: io:: { Cursor , Read } ;
34
35
use std:: task:: Context ;
35
36
use std:: task:: Poll ;
36
37
use std:: { fmt:: Debug , pin:: Pin , str:: FromStr } ;
37
38
38
39
use futures_core:: stream:: Stream ;
40
+ use futures_lite:: { io, prelude:: * } ;
41
+ use futures_util:: stream:: TryStreamExt ;
39
42
use multipart:: server:: Multipart as Parser ;
40
- use std:: io:: { Cursor , Read } ;
41
43
42
- use crate :: { format_err, Mime , Status } ;
44
+ use crate :: mime;
45
+ use crate :: { format_err, Body , Mime , Status } ;
43
46
pub use entry:: Entry ;
44
47
45
48
mod entry;
@@ -67,7 +70,6 @@ impl Multipart {
67
70
68
71
/// Parse a `Body` stream as a `Multipart` instance.
69
72
pub async fn from_req ( req : & mut crate :: Request ) -> crate :: Result < Self > {
70
- let body = req. take_body ( ) . into_string ( ) . await ?;
71
73
let boundary = req
72
74
. content_type ( )
73
75
. map ( |ct| ct. param ( "boundary" ) . cloned ( ) )
@@ -83,6 +85,9 @@ impl Multipart {
83
85
}
84
86
} ;
85
87
88
+ // Not ideal, but done for now so we can avoid implementing all of Multipart ourselves for the time being.
89
+ let body = req. take_body ( ) . into_string ( ) . await ?;
90
+
86
91
let multipart = Parser :: with_body ( Cursor :: new ( body) , boundary) ;
87
92
Ok ( Self {
88
93
entries : vec ! [ ] ,
@@ -96,6 +101,11 @@ impl Multipart {
96
101
E : Into < Entry > ,
97
102
{
98
103
self . entries . push ( entry. into ( ) ) ;
104
+ // if let Some(entries) = self.entries.as_mut() {
105
+ // entries.push(entry.into());
106
+ // } else {
107
+ // self.entries = Some(vec![entry.into()]);
108
+ // }
99
109
}
100
110
}
101
111
@@ -120,20 +130,91 @@ impl Stream for Multipart {
120
130
. content_type
121
131
. map ( |ct| Mime :: from_str ( & ct. to_string ( ) ) )
122
132
. transpose ( ) ?;
123
- entry. set_content_type ( mime) ;
133
+ if let Some ( mime) = mime {
134
+ entry. set_mime ( mime) ;
135
+ } else {
136
+ // https://tools.ietf.org/html/rfc7578#section-4.4
137
+ entry. set_mime ( mime:: PLAIN ) ;
138
+ }
124
139
125
140
Poll :: Ready ( Some ( Ok ( entry) ) )
126
141
}
127
142
Ok ( None ) => Poll :: Ready ( None ) ,
128
- Err ( _e) => {
129
- // TODO: forward error?
130
- let mut err = format_err ! ( "Invalid multipart entry" ) ;
143
+ Err ( e) => {
144
+ let mut err = format_err ! ( "Invalid multipart entry: {}" , e) ;
131
145
err. set_status ( 400 ) ;
132
146
Poll :: Ready ( Some ( Err ( err) ) )
133
147
}
134
148
}
135
149
}
136
150
}
137
151
138
- // TODO
139
- // impl From<Multipart> for Body {}
152
+ // struct MultipartReader {
153
+ // entry_iter: Box<dyn Iterator<Item = Entry>>,
154
+ // }
155
+
156
+ // impl From<Multipart> for MultipartReader {
157
+ // fn from(multipart: Multipart) -> Self {
158
+ // Self {
159
+ // entry_iter: Box::new(multipart.entries.into_iter())
160
+ // }
161
+ // }
162
+ // }
163
+
164
+ // impl AsyncRead for MultipartReader {
165
+ // #[allow(missing_doc_code_examples)]
166
+ // fn poll_read(
167
+ // mut self: Pin<&mut Self>,
168
+ // cx: &mut Context<'_>,
169
+ // buf: &mut [u8],
170
+ // ) -> Poll<io::Result<usize>> {
171
+ // if let Some(entry) = self.entry_iter.next() {
172
+ // Pin::new(&mut entry).poll_read(cx, buf)
173
+ // } else {
174
+ // Poll::Ready()
175
+ // }
176
+ // }
177
+ // }
178
+
179
+ // impl AsyncBufRead for MultipartReader {
180
+ // #[allow(missing_doc_code_examples)]
181
+ // fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&'_ [u8]>> {
182
+ // let this = self.project();
183
+ // this.reader.poll_fill_buf(cx)
184
+ // }
185
+
186
+ // fn consume(mut self: Pin<&mut Self>, amt: usize) {
187
+ // Pin::new(&mut self.reader).consume(amt)
188
+ // }
189
+ // }
190
+
191
+ struct BufReader < R : AsyncRead > {
192
+ inner : io:: BufReader < R > ,
193
+ }
194
+
195
+ impl < R : AsyncRead > BufReader < R > {
196
+ pub fn new ( inner : R ) -> Self {
197
+ Self {
198
+ inner : io:: BufReader :: new ( inner) ,
199
+ }
200
+ }
201
+ }
202
+
203
+ impl < R : AsyncRead > AsRef < [ u8 ] > for BufReader < R > {
204
+ fn as_ref ( & self ) -> & [ u8 ] {
205
+ self . inner . buffer ( )
206
+ }
207
+ }
208
+
209
+ impl From < Multipart > for Body {
210
+ fn from ( multipart : Multipart ) -> Self {
211
+ let stream = multipart. map ( |maybe_entry| {
212
+ maybe_entry
213
+ . map ( |entry| BufReader :: new ( entry) )
214
+ . map_err ( |err| {
215
+ std:: io:: Error :: new ( std:: io:: ErrorKind :: Other , err. to_string ( ) . to_owned ( ) )
216
+ } )
217
+ } ) ;
218
+ Body :: from_reader ( io:: BufReader :: new ( stream. into_async_read ( ) ) , None )
219
+ }
220
+ }
0 commit comments