Skip to content

Commit

Permalink
Implement enum FutureState<T>, embed fstat into ReqInner
Browse files Browse the repository at this point in the history
  • Loading branch information
j-devel committed Jun 26, 2024
1 parent e05bd1d commit 40df8bc
Showing 1 changed file with 71 additions and 74 deletions.
145 changes: 71 additions & 74 deletions examples/xbd-net/src/xbd/gcoap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,43 +150,37 @@ impl Future for Req {
//

#[derive(Debug)]
pub struct Progress<T>(Option<AtomicWaker>, pub Option<AtomicWaker>, pub Option<T>);

#[derive(Debug)]
pub enum FutureState<T> {// !!!!
New,
Registered,
Resolved(T),
pub enum FutureState<T> {
New(Option<AtomicWaker>),
Registered(Option<AtomicWaker>),
Resolved(Option<T>),
}

impl<T> Progress<T> {
impl<T> FutureState<T> {
pub fn new() -> Self {
Self(Some(AtomicWaker::new()), None, None)
}

pub fn is_new(&self) -> bool {
self.0.is_some() && self.1.is_none() && self.2.is_none()
Self::New(Some(AtomicWaker::new()))
}

pub fn register(&mut self, cx_waker: &Waker) {
assert!(self.is_new());

let waker = self.0.take().unwrap();
waker.register(cx_waker);
self.1.replace(waker);
if let Self::New(waker) = self {
let waker = waker.take().unwrap();
waker.register(cx_waker);
*self = Self::Registered(Some(waker));
} else { panic!(); }
}

pub fn resolve(&mut self, ret: T) {
assert!(self.0.is_none() && self.1.is_some() && self.2.is_none()); // registered

self.2.replace(ret);
self.1.take().unwrap().wake();
if let Self::Registered(waker) = self {
let waker = waker.take().unwrap();
*self = Self::Resolved(Some(ret));
waker.wake();
} else { panic!(); }
}

pub fn take(&mut self) -> T {
assert!(self.0.is_none() && self.1.is_none() && self.2.is_some()); // resolved

self.2.take().unwrap()
if let Self::Resolved(ret) = self {
ret.take().unwrap()
} else { panic!(); }
}

pub fn as_mut_ptr(&self) -> *mut Self {
Expand All @@ -209,8 +203,7 @@ pub struct ReqInner {
blockwise: bool,
blockwise_state_index: Option<usize>,
blockwise_hdr: Option<Vec<u8, BLOCKWISE_HDR_MAX>>,
progress: Progress<GcoapMemoState>,
// fstat: FutureState<GcoapMemoState>,
fstat: FutureState<GcoapMemoState>,
}

impl ReqInner {
Expand All @@ -227,7 +220,7 @@ impl ReqInner {
blockwise,
blockwise_state_index,
blockwise_hdr,
progress: Progress::new(),
fstat: FutureState::new(),
}
}
}
Expand All @@ -236,42 +229,46 @@ impl Future for ReqInner {
type Output = GcoapMemoState;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<<Self as Future>::Output> {
if self.progress.is_new() {
self.progress.register(cx.waker());
let progress_ptr = self.progress.as_mut_ptr();

match self.method {
COAP_METHOD_GET => {
if self.blockwise {
let idx = self.blockwise_state_index.unwrap();

if BlockwiseData::state_is_valid(idx) {
BlockwiseData::set_state_last(Some(idx));
BlockwiseData::update_state(idx,
self.addr.as_bytes(),
self.uri.as_bytes(),
self.blockwise_hdr.as_deref());

gcoap_get_blockwise_inner(&self.addr, &self.uri, idx, progress_ptr);
} else { // blockwise stream could be already closed
BlockwiseData::set_state_last(None);

return Poll::Ready(GcoapMemoState::Err)
match &mut self.fstat {
FutureState::New(_) => {
self.fstat.register(cx.waker());
let fstat_ptr = self.fstat.as_mut_ptr();

match self.method {
COAP_METHOD_GET => {
if self.blockwise {
let idx = self.blockwise_state_index.unwrap();

if BlockwiseData::state_is_valid(idx) {
BlockwiseData::set_state_last(Some(idx));
BlockwiseData::update_state(idx,
self.addr.as_bytes(),
self.uri.as_bytes(),
self.blockwise_hdr.as_deref());

gcoap_get_blockwise_inner(&self.addr, &self.uri, idx, fstat_ptr);
} else { // blockwise stream could be already closed
BlockwiseData::set_state_last(None);

return Poll::Ready(GcoapMemoState::Err)
}
} else {
gcoap_get_inner(&self.addr, &self.uri, fstat_ptr);
}
} else {
gcoap_get_inner(&self.addr, &self.uri, progress_ptr);
}
},
COAP_METHOD_POST => gcoap_post_inner(
&self.addr, &self.uri, self.payload.as_ref().unwrap().as_slice(), progress_ptr),
COAP_METHOD_PUT => gcoap_put_inner(
&self.addr, &self.uri, self.payload.as_ref().unwrap().as_slice(), progress_ptr),
_ => todo!(),
}

Poll::Pending
} else {
Poll::Ready(self.progress.take())
},
COAP_METHOD_POST => gcoap_post_inner(
&self.addr, &self.uri, self.payload.as_ref().unwrap().as_slice(), fstat_ptr),
COAP_METHOD_PUT => gcoap_put_inner(
&self.addr, &self.uri, self.payload.as_ref().unwrap().as_slice(), fstat_ptr),
_ => todo!(),
}

Poll::Pending
},
FutureState::Resolved(_) => {
Poll::Ready(self.fstat.take())
},
_ => panic!(),
}
}
}
Expand All @@ -282,25 +279,25 @@ unsafe impl Send for ReqInner {

//

fn gcoap_get_blockwise_inner(addr: &str, uri: &str, blockwise_state_index: usize, progress_ptr: *mut Progress<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_GET, None, true, Some(blockwise_state_index), progress_ptr);
fn gcoap_get_blockwise_inner(addr: &str, uri: &str, blockwise_state_index: usize, fstat_ptr: *mut FutureState<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_GET, None, true, Some(blockwise_state_index), fstat_ptr);
}

fn gcoap_get_inner(addr: &str, uri: &str, progress_ptr: *mut Progress<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_GET, None, false, None, progress_ptr);
fn gcoap_get_inner(addr: &str, uri: &str, fstat_ptr: *mut FutureState<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_GET, None, false, None, fstat_ptr);
}

fn gcoap_post_inner(addr: &str, uri: &str, payload: &[u8], progress_ptr: *mut Progress<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_POST, Some(payload), false, None, progress_ptr);
fn gcoap_post_inner(addr: &str, uri: &str, payload: &[u8], fstat_ptr: *mut FutureState<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_POST, Some(payload), false, None, fstat_ptr);
}

fn gcoap_put_inner(addr: &str, uri: &str, payload: &[u8], progress_ptr: *mut Progress<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_PUT, Some(payload), false, None, progress_ptr);
fn gcoap_put_inner(addr: &str, uri: &str, payload: &[u8], fstat_ptr: *mut FutureState<GcoapMemoState>) {
gcoap_req(addr, uri, COAP_METHOD_PUT, Some(payload), false, None, fstat_ptr);
}

fn gcoap_req(addr: &str, uri: &str, method: CoapMethod,
payload: Option<&[u8]>, blockwise: bool, blockwise_state_index: Option<usize>,
progress_ptr: *mut Progress<GcoapMemoState>) {
fstat_ptr: *mut FutureState<GcoapMemoState>) {
let payload_ptr = payload.map_or(core::ptr::null(), |payload| payload.as_ptr());
let payload_len = payload.map_or(0, |payload| payload.len());

Expand All @@ -323,7 +320,7 @@ fn gcoap_req(addr: &str, uri: &str, method: CoapMethod,
uri_cstr.as_ptr(),
method, payload_ptr, payload_len,
blockwise, blockwise_state_index.unwrap_or(0 /* to be ignored */),
progress_ptr as *const c_void, // context
fstat_ptr as *const c_void, // context
gcoap_req_resp_handler as *const c_void);
}
}
Expand Down Expand Up @@ -356,5 +353,5 @@ fn gcoap_req_resp_handler(memo: *const c_void, pdu: *const c_void, remote: *cons
};

let memo = GcoapMemoState::new(memo_state, payload);
Progress::get_mut_ref(context as *mut _).resolve(memo);
FutureState::get_mut_ref(context as *mut _).resolve(memo);
}

0 comments on commit 40df8bc

Please sign in to comment.