diff --git a/connect/src/context_resolver.rs b/connect/src/context_resolver.rs new file mode 100644 index 000000000..278fc0899 --- /dev/null +++ b/connect/src/context_resolver.rs @@ -0,0 +1,347 @@ +use crate::{ + core::{Error, Session}, + protocol::{ + autoplay_context_request::AutoplayContextRequest, context::Context, + transfer_state::TransferState, + }, + state::{ + context::{ContextType, UpdateContext}, + ConnectState, + }, +}; +use std::cmp::PartialEq; +use std::{ + collections::{HashMap, VecDeque}, + fmt::{Display, Formatter}, + hash::Hash, + time::Duration, +}; +use thiserror::Error as ThisError; +use tokio::time::Instant; + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +enum Resolve { + Uri(String), + Context(Context), +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub(super) enum ContextAction { + Append, + Replace, +} + +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub(super) struct ResolveContext { + resolve: Resolve, + fallback: Option, + update: UpdateContext, + action: ContextAction, +} + +impl ResolveContext { + fn append_context(uri: impl Into) -> Self { + Self { + resolve: Resolve::Uri(uri.into()), + fallback: None, + update: UpdateContext::Default, + action: ContextAction::Append, + } + } + + pub fn from_uri( + uri: impl Into, + fallback: impl Into, + update: UpdateContext, + action: ContextAction, + ) -> Self { + let fallback_uri = fallback.into(); + Self { + resolve: Resolve::Uri(uri.into()), + fallback: (!fallback_uri.is_empty()).then_some(fallback_uri), + update, + action, + } + } + + pub fn from_context(context: Context, update: UpdateContext, action: ContextAction) -> Self { + Self { + resolve: Resolve::Context(context), + fallback: None, + update, + action, + } + } + + /// the uri which should be used to resolve the context, might not be the context uri + fn resolve_uri(&self) -> Option<&str> { + // it's important to call this always, or at least for every ResolveContext + // otherwise we might not even check if we need to fallback and just use the fallback uri + match self.resolve { + Resolve::Uri(ref uri) => ConnectState::valid_resolve_uri(uri), + Resolve::Context(ref ctx) => ConnectState::get_context_uri_from_context(ctx), + } + .or(self.fallback.as_deref()) + } + + /// the actual context uri + fn context_uri(&self) -> &str { + match self.resolve { + Resolve::Uri(ref uri) => uri, + Resolve::Context(ref ctx) => ctx.uri.as_deref().unwrap_or_default(), + } + } +} + +impl Display for ResolveContext { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "resolve_uri: <{:?}>, context_uri: <{}>, update: <{:?}>", + self.resolve_uri(), + self.context_uri(), + self.update, + ) + } +} + +#[derive(Debug, ThisError)] +enum ContextResolverError { + #[error("no next context to resolve")] + NoNext, + #[error("tried appending context with {0} pages")] + UnexpectedPagesSize(usize), + #[error("tried resolving not allowed context: {0:?}")] + NotAllowedContext(String), +} + +impl From for Error { + fn from(value: ContextResolverError) -> Self { + Error::failed_precondition(value) + } +} + +pub struct ContextResolver { + session: Session, + queue: VecDeque, + unavailable_contexts: HashMap, +} + +// time after which an unavailable context is retried +const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600); + +impl ContextResolver { + pub fn new(session: Session) -> Self { + Self { + session, + queue: VecDeque::new(), + unavailable_contexts: HashMap::new(), + } + } + + pub fn add(&mut self, resolve: ResolveContext) { + let last_try = self + .unavailable_contexts + .get(&resolve) + .map(|i| i.duration_since(Instant::now())); + + let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) { + let _ = self.unavailable_contexts.remove(&resolve); + debug!( + "context was requested {}s ago, trying again to resolve the requested context", + last_try.expect("checked by condition").as_secs() + ); + None + } else { + last_try + }; + + if last_try.is_some() { + debug!("tried loading unavailable context: {resolve}"); + return; + } else if self.queue.contains(&resolve) { + debug!("update for {resolve} is already added"); + return; + } else { + trace!( + "added {} to resolver queue", + resolve.resolve_uri().unwrap_or(resolve.context_uri()) + ) + } + + self.queue.push_back(resolve) + } + + pub fn add_list(&mut self, resolve: Vec) { + for resolve in resolve { + self.add(resolve) + } + } + + pub fn remove_used_and_invalid(&mut self) { + if let Some((_, _, remove)) = self.find_next() { + let _ = self.queue.drain(0..remove); // remove invalid + } + self.queue.pop_front(); // remove used + } + + pub fn clear(&mut self) { + self.queue = VecDeque::new() + } + + fn find_next(&self) -> Option<(&ResolveContext, &str, usize)> { + for idx in 0..self.queue.len() { + let next = self.queue.get(idx)?; + match next.resolve_uri() { + None => { + warn!("skipped {idx} because of invalid resolve_uri: {next}"); + continue; + } + Some(uri) => return Some((next, uri, idx)), + } + } + None + } + + pub fn has_next(&self) -> bool { + self.find_next().is_some() + } + + pub async fn get_next_context( + &self, + recent_track_uri: impl Fn() -> Vec, + ) -> Result { + let (next, resolve_uri, _) = self.find_next().ok_or(ContextResolverError::NoNext)?; + + match next.update { + UpdateContext::Default => { + let mut ctx = self.session.spclient().get_context(resolve_uri).await; + if let Ok(ctx) = ctx.as_mut() { + ctx.uri = Some(next.context_uri().to_string()); + ctx.url = ctx.uri.as_ref().map(|s| format!("context://{s}")); + } + + ctx + } + UpdateContext::Autoplay => { + if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:") + { + // autoplay is not supported for podcasts + Err(ContextResolverError::NotAllowedContext( + resolve_uri.to_string(), + ))? + } + + let request = AutoplayContextRequest { + context_uri: Some(resolve_uri.to_string()), + recent_track_uri: recent_track_uri(), + ..Default::default() + }; + self.session.spclient().get_autoplay_context(&request).await + } + } + } + + pub fn mark_next_unavailable(&mut self) { + if let Some((next, _, _)) = self.find_next() { + self.unavailable_contexts + .insert(next.clone(), Instant::now()); + } + } + + pub fn apply_next_context( + &self, + state: &mut ConnectState, + mut context: Context, + ) -> Result>, Error> { + let (next, _, _) = self.find_next().ok_or(ContextResolverError::NoNext)?; + + let remaining = match next.action { + ContextAction::Append if context.pages.len() == 1 => state + .fill_context_from_page(context.pages.remove(0)) + .map(|_| None), + ContextAction::Replace => { + let remaining = state.update_context(context, next.update); + if let Resolve::Context(ref ctx) = next.resolve { + state.merge_context(Some(ctx.clone())); + } + + remaining + } + ContextAction::Append => { + warn!("unexpected page size: {context:#?}"); + Err(ContextResolverError::UnexpectedPagesSize(context.pages.len()).into()) + } + }?; + + Ok(remaining.map(|remaining| { + remaining + .into_iter() + .map(ResolveContext::append_context) + .collect::>() + })) + } + + pub fn try_finish( + &self, + state: &mut ConnectState, + transfer_state: &mut Option, + ) -> bool { + let (next, _, _) = match self.find_next() { + None => return false, + Some(next) => next, + }; + + // when there is only one update type, we are the last of our kind, so we should update the state + if self + .queue + .iter() + .filter(|resolve| resolve.update == next.update) + .count() + != 1 + { + return false; + } + + match (next.update, state.active_context) { + (UpdateContext::Default, ContextType::Default) | (UpdateContext::Autoplay, _) => { + debug!( + "last item of type <{:?}>, finishing state setup", + next.update + ); + } + (UpdateContext::Default, _) => { + debug!("skipped finishing default, because it isn't the active context"); + return false; + } + } + + let active_ctx = state.get_context(state.active_context); + let res = if let Some(transfer_state) = transfer_state.take() { + state.finish_transfer(transfer_state) + } else if state.shuffling_context() { + state.shuffle() + } else if matches!(active_ctx, Ok(ctx) if ctx.index.track == 0) { + // has context, and context is not touched + // when the index is not zero, the next index was already evaluated elsewhere + let ctx = active_ctx.expect("checked by precondition"); + let idx = ConnectState::find_index_in_context(ctx, |t| { + state.current_track(|c| t.uri == c.uri) + }) + .ok(); + + state.reset_playback_to_position(idx) + } else { + state.fill_up_next_tracks() + }; + + if let Err(why) = res { + error!("setup of state failed: {why}, last used resolve {next:#?}") + } + + state.update_restrictions(); + state.update_queue_revision(); + + true + } +} diff --git a/connect/src/lib.rs b/connect/src/lib.rs index 3cfbbca19..11a651863 100644 --- a/connect/src/lib.rs +++ b/connect/src/lib.rs @@ -5,6 +5,7 @@ use librespot_core as core; use librespot_playback as playback; use librespot_protocol as protocol; +mod context_resolver; mod model; pub mod spirc; pub mod state; diff --git a/connect/src/model.rs b/connect/src/model.rs index a080f9686..8315ee29b 100644 --- a/connect/src/model.rs +++ b/connect/src/model.rs @@ -1,8 +1,4 @@ -use crate::state::ConnectState; use librespot_core::dealer::protocol::SkipTo; -use librespot_protocol::context::Context; -use std::fmt::{Display, Formatter}; -use std::hash::{Hash, Hasher}; #[derive(Debug)] pub struct SpircLoadCommand { @@ -13,7 +9,11 @@ pub struct SpircLoadCommand { pub shuffle: bool, pub repeat: bool, pub repeat_track: bool, - pub playing_track: PlayingTrack, + /// Decides the starting position in the given context + /// + /// ## Remarks: + /// If none is provided and shuffle true, a random track is played, otherwise the first + pub playing_track: Option, } #[derive(Debug)] @@ -23,19 +23,20 @@ pub enum PlayingTrack { Uid(String), } -impl From for PlayingTrack { - fn from(value: SkipTo) -> Self { +impl TryFrom for PlayingTrack { + type Error = (); + + fn try_from(value: SkipTo) -> Result { // order of checks is important, as the index can be 0, but still has an uid or uri provided, // so we only use the index as last resort if let Some(uri) = value.track_uri { - PlayingTrack::Uri(uri) + Ok(PlayingTrack::Uri(uri)) } else if let Some(uid) = value.track_uid { - PlayingTrack::Uid(uid) + Ok(PlayingTrack::Uid(uid)) + } else if let Some(index) = value.track_index { + Ok(PlayingTrack::Index(index)) } else { - PlayingTrack::Index(value.track_index.unwrap_or_else(|| { - warn!("SkipTo didn't provided any point to skip to, falling back to index 0"); - 0 - })) + Err(()) } } } @@ -58,131 +59,3 @@ pub(super) enum SpircPlayStatus { preloading_of_next_track_triggered: bool, }, } - -#[derive(Debug, Clone)] -pub(super) struct ResolveContext { - context: Context, - fallback: Option, - autoplay: bool, - /// if `true` updates the entire context, otherwise only fills the context from the next - /// retrieve page, it is usually used when loading the next page of an already established context - /// - /// like for example: - /// - playing an artists profile - update: bool, -} - -impl ResolveContext { - pub fn from_uri(uri: impl Into, fallback: impl Into, autoplay: bool) -> Self { - let fallback_uri = fallback.into(); - Self { - context: Context { - uri: Some(uri.into()), - ..Default::default() - }, - fallback: (!fallback_uri.is_empty()).then_some(fallback_uri), - autoplay, - update: true, - } - } - - pub fn from_context(context: Context, autoplay: bool) -> Self { - Self { - context, - fallback: None, - autoplay, - update: true, - } - } - - // expected page_url: hm://artistplaycontext/v1/page/spotify/album/5LFzwirfFwBKXJQGfwmiMY/km_artist - pub fn from_page_url(page_url: String) -> Self { - let split = if let Some(rest) = page_url.strip_prefix("hm://") { - rest.split('/') - } else { - warn!("page_url didn't started with hm://. got page_url: {page_url}"); - page_url.split('/') - }; - - let uri = split - .skip_while(|s| s != &"spotify") - .take(3) - .collect::>() - .join(":"); - - trace!("created an ResolveContext from page_url <{page_url}> as uri <{uri}>"); - - Self { - context: Context { - uri: Some(uri), - ..Default::default() - }, - fallback: None, - update: false, - autoplay: false, - } - } - - /// the uri which should be used to resolve the context, might not be the context uri - pub fn resolve_uri(&self) -> Option<&String> { - // it's important to call this always, or at least for every ResolveContext - // otherwise we might not even check if we need to fallback and just use the fallback uri - ConnectState::get_context_uri_from_context(&self.context) - .and_then(|s| (!s.is_empty()).then_some(s)) - .or(self.fallback.as_ref()) - } - - /// the actual context uri - pub fn context_uri(&self) -> &str { - self.context.uri.as_deref().unwrap_or_default() - } - - pub fn autoplay(&self) -> bool { - self.autoplay - } - - pub fn update(&self) -> bool { - self.update - } -} - -impl Display for ResolveContext { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!( - f, - "resolve_uri: <{:?}>, context_uri: <{:?}>, autoplay: <{}>, update: <{}>", - self.resolve_uri(), - self.context.uri, - self.autoplay, - self.update - ) - } -} - -impl PartialEq for ResolveContext { - fn eq(&self, other: &Self) -> bool { - let eq_context = self.context_uri() == other.context_uri(); - let eq_resolve = self.resolve_uri() == other.resolve_uri(); - let eq_autoplay = self.autoplay == other.autoplay; - let eq_update = self.update == other.update; - - eq_context && eq_resolve && eq_autoplay && eq_update - } -} - -impl Eq for ResolveContext {} - -impl Hash for ResolveContext { - fn hash(&self, state: &mut H) { - self.context_uri().hash(state); - self.resolve_uri().hash(state); - self.autoplay.hash(state); - self.update.hash(state); - } -} - -impl From for Context { - fn from(value: ResolveContext) -> Self { - value.context - } -} diff --git a/connect/src/spirc.rs b/connect/src/spirc.rs index e82865233..5526a8537 100644 --- a/connect/src/spirc.rs +++ b/connect/src/spirc.rs @@ -1,6 +1,6 @@ pub use crate::model::{PlayingTrack, SpircLoadCommand}; -use crate::state::{context::ResetContext, metadata::Metadata}; use crate::{ + context_resolver::{ContextAction, ContextResolver, ResolveContext}, core::{ authentication::Credentials, dealer::{ @@ -10,12 +10,12 @@ use crate::{ session::UserAttributes, Error, Session, SpotifyId, }, + model::SpircPlayStatus, playback::{ mixer::Mixer, player::{Player, PlayerEvent, PlayerEventChannel}, }, protocol::{ - autoplay_context_request::AutoplayContextRequest, connect::{Cluster, ClusterUpdate, LogoutCommand, SetVolumeCommand}, context::Context, explicit_content_pubsub::UserAttributesUpdate, @@ -24,19 +24,17 @@ use crate::{ transfer_state::TransferState, user_attributes::UserAttributesMutation, }, -}; -use crate::{ - model::{ResolveContext, SpircPlayStatus}, state::{ - context::{ContextType, LoadNext, UpdateContext}, + context::{ + ResetContext, {ContextType, UpdateContext}, + }, + metadata::Metadata, provider::IsProvider, {ConnectState, ConnectStateConfig}, }, }; use futures_util::StreamExt; use protobuf::MessageField; -use std::collections::HashMap; -use std::time::Instant; use std::{ future::Future, sync::atomic::{AtomicUsize, Ordering}, @@ -96,17 +94,11 @@ struct SpircTask { commands: Option>, player_events: Option, + context_resolver: ContextResolver, + shutdown: bool, session: Session, - /// the list of contexts to resolve - resolve_context: Vec, - - /// contexts may not be resolvable at the moment so we should ignore any further request - /// - /// an unavailable context is retried after [RETRY_UNAVAILABLE] - unavailable_contexts: HashMap, - /// is set when transferring, and used after resolving the contexts to finish the transfer pub transfer_state: Option, @@ -114,6 +106,10 @@ struct SpircTask { /// when no other future resolves, otherwise resets the delay update_volume: bool, + /// when set to true, it will update the volume after [UPDATE_STATE_DELAY], + /// when no other future resolves, otherwise resets the delay + update_state: bool, + spirc_id: usize, } @@ -143,12 +139,10 @@ const CONTEXT_FETCH_THRESHOLD: usize = 2; const VOLUME_STEP_SIZE: u16 = 1024; // (u16::MAX + 1) / VOLUME_STEPS -// delay to resolve a bundle of context updates, delaying the update prevents duplicate context updates of the same type -const RESOLVE_CONTEXT_DELAY: Duration = Duration::from_millis(500); -// time after which an unavailable context is retried -const RETRY_UNAVAILABLE: Duration = Duration::from_secs(3600); // delay to update volume after a certain amount of time, instead on each update request const VOLUME_UPDATE_DELAY: Duration = Duration::from_secs(2); +// to reduce updates to remote, we group some request by waiting for a set amount of time +const UPDATE_STATE_DELAY: Duration = Duration::from_millis(200); pub struct Spirc { commands: mpsc::UnboundedSender, @@ -246,13 +240,14 @@ impl Spirc { commands: Some(cmd_rx), player_events: Some(player_events), + context_resolver: ContextResolver::new(session.clone()), + shutdown: false, session, - resolve_context: Vec::new(), - unavailable_contexts: HashMap::new(), transfer_state: None, update_volume: false, + update_state: false, spirc_id, }; @@ -355,6 +350,10 @@ impl SpircTask { let commands = self.commands.as_mut(); let player_events = self.player_events.as_mut(); + // when state and volume update have a higher priority than context resolving + // because of that the context resolving has to wait, so that the other tasks can finish + let allow_context_resolving = !self.update_state && !self.update_volume; + tokio::select! { // startup of the dealer requires a connection_id, which is retrieved at the very beginning connection_id_update = self.connection_id_update.next() => unwrap! { @@ -417,13 +416,15 @@ impl SpircTask { } }, event = async { player_events?.recv().await }, if player_events.is_some() => if let Some(event) = event { - if let Err(e) = self.handle_player_event(event).await { + if let Err(e) = self.handle_player_event(event) { error!("could not dispatch player event: {}", e); } }, - _ = async { sleep(RESOLVE_CONTEXT_DELAY).await }, if !self.resolve_context.is_empty() => { - if let Err(why) = self.handle_resolve_context().await { - error!("ContextError: {why}") + _ = async { sleep(UPDATE_STATE_DELAY).await }, if self.update_state => { + self.update_state = false; + + if let Err(why) = self.notify().await { + error!("state update: {why}") } }, _ = async { sleep(VOLUME_UPDATE_DELAY).await }, if self.update_volume => { @@ -441,7 +442,27 @@ impl SpircTask { error!("error updating connect state for volume update: {why}") } }, - else => break, + // context resolver handling, the idea/reason behind it the following: + // + // when we request a context that has multiple pages (for example an artist) + // resolving all pages at once can take around ~1-30sec, when we resolve + // everything at once that would block our main loop for that time + // + // to circumvent this behavior, we request each context separately here and + // finish after we received our last item of a type + next_context = async { + self.context_resolver.get_next_context(|| { + self.connect_state.recent_track_uris() + }).await + }, if allow_context_resolving && self.context_resolver.has_next() => { + let update_state = self.handle_next_context(next_context); + if update_state { + if let Err(why) = self.notify().await { + error!("update after context resolving failed: {why}") + } + } + }, + else => break } } @@ -455,154 +476,48 @@ impl SpircTask { self.session.dealer().close().await; } - async fn handle_resolve_context(&mut self) -> Result<(), Error> { - let mut last_resolve = None::; - while let Some(resolve) = self.resolve_context.pop() { - if matches!(last_resolve, Some(ref last_resolve) if last_resolve == &resolve) { - debug!("did already update the context for {resolve}"); - continue; - } else { - last_resolve = Some(resolve.clone()); - - let resolve_uri = match resolve.resolve_uri() { - Some(resolve) => resolve, - None => { - warn!("tried to resolve context without resolve_uri: {resolve}"); - return Ok(()); - } - }; - - debug!("resolving: {resolve}"); - // the autoplay endpoint can return a 404, when it tries to retrieve an - // autoplay context for an empty playlist as it seems - if let Err(why) = self - .resolve_context( - resolve_uri, - resolve.context_uri(), - resolve.autoplay(), - resolve.update(), - ) - .await - { - error!("failed resolving context <{resolve}>: {why}"); - self.unavailable_contexts.insert(resolve, Instant::now()); - continue; - } - - self.connect_state.merge_context(Some(resolve.into())); + fn handle_next_context(&mut self, next_context: Result) -> bool { + let next_context = match next_context { + Err(why) => { + self.context_resolver.mark_next_unavailable(); + self.context_resolver.remove_used_and_invalid(); + error!("{why}"); + return false; } - } + Ok(ctx) => ctx, + }; - if let Some(transfer_state) = self.transfer_state.take() { - self.connect_state.finish_transfer(transfer_state)? - } + debug!("handling next context {:?}", next_context.uri); - if matches!(self.connect_state.active_context, ContextType::Default) { - let ctx = self.connect_state.context.as_ref(); - if matches!(ctx, Some(ctx) if ctx.tracks.is_empty()) { - self.connect_state.clear_next_tracks(true); - self.handle_next(None)?; + match self + .context_resolver + .apply_next_context(&mut self.connect_state, next_context) + { + Ok(remaining) => { + if let Some(remaining) = remaining { + self.context_resolver.add_list(remaining) + } } - } - - self.connect_state.fill_up_next_tracks()?; - self.connect_state.update_restrictions(); - self.connect_state.update_queue_revision(); - - self.preload_autoplay_when_required(); - - self.notify().await - } - - async fn resolve_context( - &mut self, - resolve_uri: &str, - context_uri: &str, - autoplay: bool, - update: bool, - ) -> Result<(), Error> { - if !autoplay { - let mut ctx = self.session.spclient().get_context(resolve_uri).await?; - - if update { - ctx.uri = Some(context_uri.to_string()); - ctx.url = Some(format!("context://{context_uri}")); - - self.connect_state - .update_context(ctx, UpdateContext::Default)? - } else if matches!(ctx.pages.first(), Some(p) if !p.tracks.is_empty()) { - debug!( - "update context from single page, context {:?} had {} pages", - ctx.uri, - ctx.pages.len() - ); - self.connect_state - .fill_context_from_page(ctx.pages.remove(0))?; - } else { - error!("resolving context should only update the tracks, but had no page, or track. {ctx:#?}"); - }; - - if let Err(why) = self.notify().await { - error!("failed to update connect state, after updating the context: {why}") + Err(why) => { + error!("{why}") } - - return Ok(()); - } - - if resolve_uri.contains("spotify:show:") || resolve_uri.contains("spotify:episode:") { - // autoplay is not supported for podcasts - Err(SpircError::NotAllowedContext(resolve_uri.to_string()))? } - let previous_tracks = self.connect_state.prev_autoplay_track_uris(); - - debug!( - "requesting autoplay context <{resolve_uri}> with {} previous tracks", - previous_tracks.len() - ); - - let ctx_request = AutoplayContextRequest { - context_uri: Some(resolve_uri.to_string()), - recent_track_uri: previous_tracks, - ..Default::default() - }; - - let context = self - .session - .spclient() - .get_autoplay_context(&ctx_request) - .await?; - - self.connect_state - .update_context(context, UpdateContext::Autoplay) - } - - fn add_resolve_context(&mut self, resolve: ResolveContext) { - let last_try = self - .unavailable_contexts - .get(&resolve) - .map(|i| i.duration_since(Instant::now())); - - let last_try = if matches!(last_try, Some(last_try) if last_try > RETRY_UNAVAILABLE) { - let _ = self.unavailable_contexts.remove(&resolve); - debug!( - "context was requested {}s ago, trying again to resolve the requested context", - last_try.expect("checked by condition").as_secs() - ); - None + let update_state = if self + .context_resolver + .try_finish(&mut self.connect_state, &mut self.transfer_state) + { + self.add_autoplay_resolving_when_required(); + true } else { - last_try + false }; - if last_try.is_none() { - debug!("add resolve request: {resolve}"); - self.resolve_context.push(resolve); - } else { - debug!("tried loading unavailable context: {resolve}") - } + self.context_resolver.remove_used_and_invalid(); + update_state } - // todo: time_delta still necessary? + // todo: is the time_delta still necessary? fn now_ms(&self) -> i64 { let dur = SystemTime::now() .duration_since(UNIX_EPOCH) @@ -612,96 +527,56 @@ impl SpircTask { } async fn handle_command(&mut self, cmd: SpircCommand) -> Result<(), Error> { - if matches!(cmd, SpircCommand::Shutdown) { - trace!("Received SpircCommand::Shutdown"); - self.handle_disconnect().await?; - self.shutdown = true; - if let Some(rx) = self.commands.as_mut() { - rx.close() - } - Ok(()) - } else if self.connect_state.is_active() { - trace!("Received SpircCommand::{:?}", cmd); - match cmd { - SpircCommand::Play => { - self.handle_play(); - self.notify().await - } - SpircCommand::PlayPause => { - self.handle_play_pause(); - self.notify().await - } - SpircCommand::Pause => { - self.handle_pause(); - self.notify().await - } - SpircCommand::Prev => { - self.handle_prev()?; - self.notify().await - } - SpircCommand::Next => { - self.handle_next(None)?; - self.notify().await - } - SpircCommand::VolumeUp => { - self.handle_volume_up(); - self.notify().await - } - SpircCommand::VolumeDown => { - self.handle_volume_down(); - self.notify().await - } - SpircCommand::Disconnect { pause } => { - if pause { - self.handle_pause() - } - self.handle_disconnect().await?; - self.notify().await - } - SpircCommand::Shuffle(shuffle) => { - self.connect_state.handle_shuffle(shuffle)?; - self.notify().await - } - SpircCommand::Repeat(repeat) => { - self.connect_state.set_repeat_context(repeat); - self.notify().await - } - SpircCommand::RepeatTrack(repeat) => { - self.connect_state.set_repeat_track(repeat); - self.notify().await - } - SpircCommand::SetPosition(position) => { - self.handle_seek(position); - self.notify().await - } - SpircCommand::SetVolume(volume) => { - self.set_volume(volume); - self.notify().await + trace!("Received SpircCommand::{:?}", cmd); + match cmd { + SpircCommand::Shutdown => { + trace!("Received SpircCommand::Shutdown"); + self.handle_disconnect().await?; + self.shutdown = true; + if let Some(rx) = self.commands.as_mut() { + rx.close() } - SpircCommand::Load(command) => { - self.handle_load(command, None).await?; - self.notify().await - } - _ => Ok(()), } - } else { - match cmd { - SpircCommand::Activate => { - trace!("Received SpircCommand::{:?}", cmd); - self.handle_activate(); - self.notify().await - } - _ => { - warn!("SpircCommand::{:?} will be ignored while Not Active", cmd); - Ok(()) + SpircCommand::Activate if !self.connect_state.is_active() => { + trace!("Received SpircCommand::{:?}", cmd); + self.handle_activate(); + return self.notify().await; + } + SpircCommand::Activate => warn!( + "SpircCommand::{:?} will be ignored while already active", + cmd + ), + _ if !self.connect_state.is_active() => { + warn!("SpircCommand::{:?} will be ignored while Not Active", cmd) + } + SpircCommand::Disconnect { pause } => { + if pause { + self.handle_pause() } + return self.handle_disconnect().await; } - } + SpircCommand::Play => self.handle_play(), + SpircCommand::PlayPause => self.handle_play_pause(), + SpircCommand::Pause => self.handle_pause(), + SpircCommand::Prev => self.handle_prev()?, + SpircCommand::Next => self.handle_next(None)?, + SpircCommand::VolumeUp => self.handle_volume_up(), + SpircCommand::VolumeDown => self.handle_volume_down(), + SpircCommand::Shuffle(shuffle) => self.connect_state.handle_shuffle(shuffle)?, + SpircCommand::Repeat(repeat) => self.connect_state.set_repeat_context(repeat), + SpircCommand::RepeatTrack(repeat) => self.connect_state.set_repeat_track(repeat), + SpircCommand::SetPosition(position) => self.handle_seek(position), + SpircCommand::SetVolume(volume) => self.set_volume(volume), + SpircCommand::Load(command) => self.handle_load(command, None).await?, + }; + + self.notify().await } - async fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { + fn handle_player_event(&mut self, event: PlayerEvent) -> Result<(), Error> { if let PlayerEvent::TrackChanged { audio_item } = event { self.connect_state.update_duration(audio_item.duration_ms); + self.update_state = true; return Ok(()); } @@ -710,122 +585,125 @@ impl SpircTask { self.play_request_id = Some(play_request_id); return Ok(()); } + + let is_current_track = matches! { + (event.get_play_request_id(), self.play_request_id), + (Some(event_id), Some(current_id)) if event_id == current_id + }; + // we only process events if the play_request_id matches. If it doesn't, it is // an event that belongs to a previous track and only arrives now due to a race // condition. In this case we have updated the state already and don't want to // mess with it. - if let Some(play_request_id) = event.get_play_request_id() { - if Some(play_request_id) == self.play_request_id { - match event { - PlayerEvent::EndOfTrack { .. } => self.handle_end_of_track().await, - PlayerEvent::Loading { .. } => { - match self.play_status { - SpircPlayStatus::LoadingPlay { position_ms } => { - self.connect_state - .update_position(position_ms, self.now_ms()); - trace!("==> kPlayStatusPlay"); - } - SpircPlayStatus::LoadingPause { position_ms } => { - self.connect_state - .update_position(position_ms, self.now_ms()); - trace!("==> kPlayStatusPause"); - } - _ => { - self.connect_state.update_position(0, self.now_ms()); - trace!("==> kPlayStatusLoading"); - } - } - self.notify().await - } - PlayerEvent::Playing { position_ms, .. } - | PlayerEvent::PositionCorrection { position_ms, .. } - | PlayerEvent::Seeked { position_ms, .. } => { - trace!("==> kPlayStatusPlay"); - let new_nominal_start_time = self.now_ms() - position_ms as i64; - match self.play_status { - SpircPlayStatus::Playing { - ref mut nominal_start_time, - .. - } => { - if (*nominal_start_time - new_nominal_start_time).abs() > 100 { - *nominal_start_time = new_nominal_start_time; - self.connect_state - .update_position(position_ms, self.now_ms()); - self.notify().await - } else { - Ok(()) - } - } - SpircPlayStatus::LoadingPlay { .. } - | SpircPlayStatus::LoadingPause { .. } => { - self.connect_state - .update_position(position_ms, self.now_ms()); - self.play_status = SpircPlayStatus::Playing { - nominal_start_time: new_nominal_start_time, - preloading_of_next_track_triggered: false, - }; - self.notify().await - } - _ => Ok(()), - } - } - PlayerEvent::Paused { - position_ms: new_position_ms, + if !is_current_track { + return Ok(()); + } + + match event { + PlayerEvent::EndOfTrack { .. } => { + let next_track = self + .connect_state + .repeat_track() + .then(|| self.connect_state.current_track(|t| t.uri.clone())); + + self.handle_next(next_track)? + } + PlayerEvent::Loading { .. } => match self.play_status { + SpircPlayStatus::LoadingPlay { position_ms } => { + self.connect_state + .update_position(position_ms, self.now_ms()); + trace!("==> LoadingPlay"); + } + SpircPlayStatus::LoadingPause { position_ms } => { + self.connect_state + .update_position(position_ms, self.now_ms()); + trace!("==> LoadingPause"); + } + _ => { + self.connect_state.update_position(0, self.now_ms()); + trace!("==> Loading"); + } + }, + PlayerEvent::Seeked { position_ms, .. } => { + trace!("==> Seeked"); + self.connect_state + .update_position(position_ms, self.now_ms()) + } + PlayerEvent::Playing { position_ms, .. } + | PlayerEvent::PositionCorrection { position_ms, .. } => { + trace!("==> Playing"); + let new_nominal_start_time = self.now_ms() - position_ms as i64; + match self.play_status { + SpircPlayStatus::Playing { + ref mut nominal_start_time, .. } => { - trace!("==> kPlayStatusPause"); - match self.play_status { - SpircPlayStatus::Paused { .. } | SpircPlayStatus::Playing { .. } => { - self.connect_state - .update_position(new_position_ms, self.now_ms()); - self.play_status = SpircPlayStatus::Paused { - position_ms: new_position_ms, - preloading_of_next_track_triggered: false, - }; - self.notify().await - } - SpircPlayStatus::LoadingPlay { .. } - | SpircPlayStatus::LoadingPause { .. } => { - self.connect_state - .update_position(new_position_ms, self.now_ms()); - self.play_status = SpircPlayStatus::Paused { - position_ms: new_position_ms, - preloading_of_next_track_triggered: false, - }; - self.notify().await - } - _ => Ok(()), + if (*nominal_start_time - new_nominal_start_time).abs() > 100 { + *nominal_start_time = new_nominal_start_time; + self.connect_state + .update_position(position_ms, self.now_ms()); + } else { + return Ok(()); } } - PlayerEvent::Stopped { .. } => { - trace!("==> kPlayStatusStop"); - match self.play_status { - SpircPlayStatus::Stopped => Ok(()), - _ => { - self.play_status = SpircPlayStatus::Stopped; - self.notify().await - } - } + SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. } => { + self.connect_state + .update_position(position_ms, self.now_ms()); + self.play_status = SpircPlayStatus::Playing { + nominal_start_time: new_nominal_start_time, + preloading_of_next_track_triggered: false, + }; } - PlayerEvent::TimeToPreloadNextTrack { .. } => { - self.handle_preload_next_track(); - Ok(()) + _ => return Ok(()), + } + } + PlayerEvent::Paused { + position_ms: new_position_ms, + .. + } => { + trace!("==> Paused"); + match self.play_status { + SpircPlayStatus::Paused { .. } | SpircPlayStatus::Playing { .. } => { + self.connect_state + .update_position(new_position_ms, self.now_ms()); + self.play_status = SpircPlayStatus::Paused { + position_ms: new_position_ms, + preloading_of_next_track_triggered: false, + }; } - PlayerEvent::Unavailable { track_id, .. } => { - self.handle_unavailable(track_id)?; - if self.connect_state.current_track(|t| &t.uri) == &track_id.to_uri()? { - self.handle_next(None)?; - } - self.notify().await + SpircPlayStatus::LoadingPlay { .. } | SpircPlayStatus::LoadingPause { .. } => { + self.connect_state + .update_position(new_position_ms, self.now_ms()); + self.play_status = SpircPlayStatus::Paused { + position_ms: new_position_ms, + preloading_of_next_track_triggered: false, + }; } - _ => Ok(()), + _ => return Ok(()), } - } else { - Ok(()) } - } else { - Ok(()) + PlayerEvent::Stopped { .. } => { + trace!("==> Stopped"); + match self.play_status { + SpircPlayStatus::Stopped => return Ok(()), + _ => self.play_status = SpircPlayStatus::Stopped, + } + } + PlayerEvent::TimeToPreloadNextTrack { .. } => { + self.handle_preload_next_track(); + return Ok(()); + } + PlayerEvent::Unavailable { track_id, .. } => { + self.handle_unavailable(track_id)?; + if self.connect_state.current_track(|t| &t.uri) == &track_id.to_uri()? { + self.handle_next(None)? + } + } + _ => return Ok(()), } + + self.update_state = true; + Ok(()) } async fn handle_connection_id_update(&mut self, connection_id: String) -> Result<(), Error> { @@ -924,7 +802,7 @@ impl SpircTask { self.player .emit_auto_play_changed_event(matches!(new_value, "1")); - self.preload_autoplay_when_required() + self.add_autoplay_resolving_when_required() } } else { trace!( @@ -958,7 +836,7 @@ impl SpircTask { // fixme: workaround fix, because of missing information why it behaves like it does // background: when another device sends a connect-state update, some player's position de-syncs // tried: providing session_id, playback_id, track-metadata "track_player" - self.notify().await?; + self.update_state = true; } } else if self.connect_state.is_active() { self.connect_state.became_inactive(&self.session).await?; @@ -1008,16 +886,18 @@ impl SpircTask { update_context.context.uri, self.connect_state.context_uri() ) } else { - self.add_resolve_context(ResolveContext::from_context( + self.context_resolver.add(ResolveContext::from_context( update_context.context, - false, + super::state::context::UpdateContext::Default, + ContextAction::Replace, )) } return Ok(()); } // modification and update of the connect_state Transfer(transfer) => { - self.handle_transfer(transfer.data.expect("by condition checked"))? + self.handle_transfer(transfer.data.expect("by condition checked"))?; + return self.notify().await; } Play(play) => { let shuffle = play @@ -1025,19 +905,19 @@ impl SpircTask { .player_options_override .as_ref() .map(|o| o.shuffling_context.unwrap_or_default()) - .unwrap_or_else(|| self.connect_state.shuffling_context()); + .unwrap_or_default(); let repeat = play .options .player_options_override .as_ref() .map(|o| o.repeating_context.unwrap_or_default()) - .unwrap_or_else(|| self.connect_state.repeat_context()); + .unwrap_or_default(); let repeat_track = play .options .player_options_override .as_ref() .map(|o| o.repeating_track.unwrap_or_default()) - .unwrap_or_else(|| self.connect_state.repeat_track()); + .unwrap_or_default(); let context_uri = play .context @@ -1050,7 +930,7 @@ impl SpircTask { context_uri, start_playing: true, seek_to: play.options.seek_to.unwrap_or_default(), - playing_track: play.options.skip_to.unwrap_or_default().into(), + playing_track: play.options.skip_to.and_then(|s| s.try_into().ok()), shuffle, repeat, repeat_track, @@ -1094,7 +974,8 @@ impl SpircTask { Resume(_) => self.handle_play(), } - self.notify().await + self.update_state = true; + Ok(()) } fn handle_transfer(&mut self, mut transfer: TransferState) -> Result<(), Error> { @@ -1121,7 +1002,12 @@ impl SpircTask { let fallback = self.connect_state.current_track(|t| &t.uri).clone(); - self.add_resolve_context(ResolveContext::from_uri(ctx_uri.clone(), &fallback, false)); + self.context_resolver.add(ResolveContext::from_uri( + ctx_uri.clone(), + &fallback, + UpdateContext::Default, + ContextAction::Replace, + )); let timestamp = self.now_ms(); let state = &mut self.connect_state; @@ -1129,6 +1015,14 @@ impl SpircTask { state.set_active(true); state.handle_initial_transfer(&mut transfer); + // adjust active context, so resolve knows for which context it should set up the state + state.active_context = if autoplay { + ContextType::Autoplay + } else { + ContextType::Default + }; + + // update position if the track continued playing let transfer_timestamp = transfer.playback.timestamp.unwrap_or_default(); let position = match transfer.playback.position_as_of_timestamp { Some(position) if transfer.playback.is_paused.unwrap_or_default() => position.into(), @@ -1145,7 +1039,12 @@ impl SpircTask { if self.connect_state.current_track(|t| t.is_autoplay()) || autoplay { debug!("currently in autoplay context, async resolving autoplay for {ctx_uri}"); - self.add_resolve_context(ResolveContext::from_uri(ctx_uri, fallback, true)) + self.context_resolver.add(ResolveContext::from_uri( + ctx_uri, + fallback, + UpdateContext::Autoplay, + ContextAction::Replace, + )) } self.transfer_state = Some(transfer); @@ -1154,6 +1053,9 @@ impl SpircTask { } async fn handle_disconnect(&mut self) -> Result<(), Error> { + self.context_resolver.clear(); + + self.play_status = SpircPlayStatus::Stopped {}; self.connect_state .update_position_in_relation(self.now_ms()); self.notify().await?; @@ -1175,9 +1077,9 @@ impl SpircTask { fn handle_stop(&mut self) { self.player.stop(); self.connect_state.update_position(0, self.now_ms()); - self.connect_state.clear_next_tracks(true); + self.connect_state.clear_next_tracks(); - if let Err(why) = self.connect_state.fill_up_next_tracks() { + if let Err(why) = self.connect_state.reset_playback_to_position(None) { warn!("failed filling up next_track during stopping: {why}") } } @@ -1219,6 +1121,8 @@ impl SpircTask { self.connect_state .reset_context(ResetContext::WhenDifferent(&cmd.context_uri)); + self.connect_state.reset_options(); + if !self.connect_state.is_active() { self.handle_activate(); } @@ -1231,35 +1135,46 @@ impl SpircTask { } } else { &cmd.context_uri - } - .clone(); + }; if current_context_uri == &cmd.context_uri && fallback == cmd.context_uri { debug!("context <{current_context_uri}> didn't change, no resolving required") } else { debug!("resolving context for load command"); - self.resolve_context(&fallback, &cmd.context_uri, false, true) - .await?; + self.context_resolver.clear(); + self.context_resolver.add(ResolveContext::from_uri( + &cmd.context_uri, + fallback, + UpdateContext::Default, + ContextAction::Replace, + )); + let context = self.context_resolver.get_next_context(Vec::new).await; + self.handle_next_context(context); } // for play commands with skip by uid, the context of the command contains // tracks with uri and uid, so we merge the new context with the resolved/existing context self.connect_state.merge_context(context); - self.connect_state.clear_next_tracks(false); + + // load here, so that we clear the queue only after we definitely retrieved a new context + self.connect_state.clear_next_tracks(); self.connect_state.clear_restrictions(); debug!("play track <{:?}>", cmd.playing_track); let index = match cmd.playing_track { - PlayingTrack::Index(i) => i as usize, - PlayingTrack::Uri(uri) => { - let ctx = self.connect_state.context.as_ref(); - ConnectState::find_index_in_context(ctx, |t| t.uri == uri)? - } - PlayingTrack::Uid(uid) => { - let ctx = self.connect_state.context.as_ref(); - ConnectState::find_index_in_context(ctx, |t| t.uid == uid)? - } + None => None, + Some(playing_track) => Some(match playing_track { + PlayingTrack::Index(i) => i as usize, + PlayingTrack::Uri(uri) => { + let ctx = self.connect_state.get_context(ContextType::Default)?; + ConnectState::find_index_in_context(ctx, |t| t.uri == uri)? + } + PlayingTrack::Uid(uid) => { + let ctx = self.connect_state.get_context(ContextType::Default)?; + ConnectState::find_index_in_context(ctx, |t| t.uid == uid)? + } + }), }; debug!( @@ -1269,18 +1184,28 @@ impl SpircTask { self.connect_state.set_shuffle(cmd.shuffle); self.connect_state.set_repeat_context(cmd.repeat); + self.connect_state.set_repeat_track(cmd.repeat_track); if cmd.shuffle { - self.connect_state.set_current_track(index)?; - self.connect_state.shuffle()?; + if let Some(index) = index { + self.connect_state.set_current_track(index)?; + } else { + self.connect_state.set_current_track_random()?; + } + + if self.context_resolver.has_next() { + self.connect_state.update_queue_revision() + } else { + self.connect_state.shuffle()?; + self.add_autoplay_resolving_when_required(); + } } else { - // manually overwrite a possible current queued track - self.connect_state.set_current_track(index)?; - self.connect_state.reset_playback_to_position(Some(index))?; + self.connect_state + .set_current_track(index.unwrap_or_default())?; + self.connect_state.reset_playback_to_position(index)?; + self.add_autoplay_resolving_when_required(); } - self.connect_state.set_repeat_track(cmd.repeat_track); - if self.connect_state.current_track(MessageField::is_some) { self.load_track(cmd.start_playing, cmd.seek_to)?; } else { @@ -1288,8 +1213,6 @@ impl SpircTask { self.handle_stop() } - self.preload_autoplay_when_required(); - Ok(()) } @@ -1408,47 +1331,41 @@ impl SpircTask { Ok(()) } - fn preload_autoplay_when_required(&mut self) { + fn add_autoplay_resolving_when_required(&mut self) { let require_load_new = !self .connect_state - .has_next_tracks(Some(CONTEXT_FETCH_THRESHOLD)); + .has_next_tracks(Some(CONTEXT_FETCH_THRESHOLD)) + && self.session.autoplay(); if !require_load_new { return; } - match self.connect_state.try_load_next_context() { - Err(why) => error!("failed loading next context: {why}"), - Ok(next) => { - match next { - LoadNext::Done => info!("loaded next context"), - LoadNext::PageUrl(page_url) => { - self.add_resolve_context(ResolveContext::from_page_url(page_url)) - } - LoadNext::Empty if self.session.autoplay() => { - let current_context = self.connect_state.context_uri(); - let fallback = self.connect_state.current_track(|t| &t.uri); - let resolve = ResolveContext::from_uri(current_context, fallback, true); + let current_context = self.connect_state.context_uri(); + let fallback = self.connect_state.current_track(|t| &t.uri); - self.add_resolve_context(resolve) - } - LoadNext::Empty => { - debug!("next context is empty and autoplay isn't enabled, no preloading required") - } - } - } - } - } + let has_tracks = self + .connect_state + .get_context(ContextType::Autoplay) + .map(|c| !c.tracks.is_empty()) + .unwrap_or_default(); + + let resolve = ResolveContext::from_uri( + current_context, + fallback, + UpdateContext::Autoplay, + if has_tracks { + ContextAction::Append + } else { + ContextAction::Replace + }, + ); - fn is_playing(&self) -> bool { - matches!( - self.play_status, - SpircPlayStatus::Playing { .. } | SpircPlayStatus::LoadingPlay { .. } - ) + self.context_resolver.add(resolve); } fn handle_next(&mut self, track_uri: Option) -> Result<(), Error> { - let continue_playing = self.is_playing(); + let continue_playing = self.connect_state.is_playing(); let current_uri = self.connect_state.current_track(|t| &t.uri); let mut has_next_track = @@ -1467,13 +1384,11 @@ impl SpircTask { }; }; - self.preload_autoplay_when_required(); - if has_next_track { + self.add_autoplay_resolving_when_required(); self.load_track(continue_playing, 0) } else { info!("Not playing next track because there are no more tracks left in queue."); - self.connect_state.reset_playback_to_position(None)?; self.handle_stop(); Ok(()) } @@ -1491,7 +1406,7 @@ impl SpircTask { self.connect_state.reset_playback_to_position(None)?; self.handle_stop() } - Some(_) => self.load_track(self.is_playing(), 0)?, + Some(_) => self.load_track(self.connect_state.is_playing(), 0)?, } } else { self.handle_seek(0); @@ -1512,16 +1427,6 @@ impl SpircTask { self.set_volume(volume); } - async fn handle_end_of_track(&mut self) -> Result<(), Error> { - let next_track = self - .connect_state - .repeat_track() - .then(|| self.connect_state.current_track(|t| t.uri.clone())); - - self.handle_next(next_track)?; - self.notify().await - } - fn handle_playlist_modification( &mut self, playlist_modification_info: PlaylistModificationInfo, @@ -1537,10 +1442,11 @@ impl SpircTask { } debug!("playlist modification for current context: {uri}"); - self.add_resolve_context(ResolveContext::from_uri( + self.context_resolver.add(ResolveContext::from_uri( uri, self.connect_state.current_track(|t| &t.uri), - false, + UpdateContext::Default, + ContextAction::Replace, )); Ok(()) @@ -1619,7 +1525,7 @@ impl SpircTask { async fn notify(&mut self) -> Result<(), Error> { self.connect_state.set_status(&self.play_status); - if self.is_playing() { + if self.connect_state.is_playing() { self.connect_state .update_position_in_relation(self.now_ms()); } diff --git a/connect/src/state.rs b/connect/src/state.rs index eff83bacc..c1fb73ab9 100644 --- a/connect/src/state.rs +++ b/connect/src/state.rs @@ -15,7 +15,6 @@ use crate::{ }, protocol::{ connect::{Capabilities, Device, DeviceInfo, MemberType, PutStateReason, PutStateRequest}, - context_page::ContextPage, player::{ ContextIndex, ContextPlayerOptions, PlayOrigin, PlayerState, ProvidedTrack, Suppressions, @@ -105,19 +104,17 @@ pub struct ConnectState { unavailable_uri: Vec, - pub active_since: Option, + active_since: Option, queue_count: u64, // separation is necessary because we could have already loaded // the autoplay context but are still playing from the default context /// to update the active context use [switch_active_context](ConnectState::set_active_context) pub active_context: ContextType, - pub fill_up_context: ContextType, + fill_up_context: ContextType, /// the context from which we play, is used to top up prev and next tracks - pub context: Option, - /// upcoming contexts, directly provided by the context-resolver - next_contexts: Vec, + context: Option, /// a context to keep track of our shuffled context, /// should be only available when `player.option.shuffling_context` is true @@ -240,6 +237,22 @@ impl ConnectState { self.request.is_active } + /// Returns the `is_playing` value as perceived by other connect devices + /// + /// see [ConnectState::set_status] + pub fn is_playing(&self) -> bool { + let player = self.player(); + player.is_playing && !player.is_paused + } + + /// Returns the `is_paused` state value as perceived by other connect devices + /// + /// see [ConnectState::set_status] + pub fn is_pause(&self) -> bool { + let player = self.player(); + player.is_playing && player.is_paused && player.is_buffering + } + pub fn set_volume(&mut self, volume: u32) { self.device_mut() .device_info @@ -297,6 +310,12 @@ impl ConnectState { | SpircPlayStatus::Stopped ); + if player.is_paused { + player.playback_speed = 0.; + } else { + player.playback_speed = 1.; + } + // desktop and mobile require all 'states' set to true, when we are paused, // otherwise the play button (desktop) is grayed out or the preview (mobile) can't be opened player.is_buffering = player.is_paused @@ -349,9 +368,15 @@ impl ConnectState { } pub fn reset_playback_to_position(&mut self, new_index: Option) -> Result<(), Error> { + debug!( + "reset_playback with active ctx <{:?}> fill_up ctx <{:?}>", + self.active_context, self.fill_up_context + ); + let new_index = new_index.unwrap_or(0); self.update_current_index(|i| i.track = new_index as u32); self.update_context_index(self.active_context, new_index + 1)?; + self.fill_up_context = self.active_context; if !self.current_track(|t| t.is_queue()) { self.set_current_track(new_index)?; @@ -360,7 +385,7 @@ impl ConnectState { self.clear_prev_track(); if new_index > 0 { - let context = self.get_context(&self.active_context)?; + let context = self.get_context(self.active_context)?; let before_new_track = context.tracks.len() - new_index; self.player_mut().prev_tracks = context @@ -375,7 +400,7 @@ impl ConnectState { debug!("has {} prev tracks", self.prev_tracks().len()) } - self.clear_next_tracks(true); + self.clear_next_tracks(); self.fill_up_next_tracks()?; self.update_restrictions(); diff --git a/connect/src/state/context.rs b/connect/src/state/context.rs index 11827cc53..93c44923e 100644 --- a/connect/src/state/context.rs +++ b/connect/src/state/context.rs @@ -7,10 +7,15 @@ use crate::{ player::{ContextIndex, ProvidedTrack}, restrictions::Restrictions, }, - state::{metadata::Metadata, provider::Provider, ConnectState, StateError}, + state::{ + metadata::Metadata, + provider::{IsProvider, Provider}, + ConnectState, StateError, SPOTIFY_MAX_NEXT_TRACKS_SIZE, + }, }; use protobuf::MessageField; use std::collections::HashMap; +use std::ops::Deref; use uuid::Uuid; const LOCAL_FILES_IDENTIFIER: &str = "spotify:local-files"; @@ -25,7 +30,7 @@ pub struct StateContext { pub index: ContextIndex, } -#[derive(Default, Debug, Copy, Clone)] +#[derive(Default, Debug, Copy, Clone, PartialEq)] pub enum ContextType { #[default] Default, @@ -33,57 +38,81 @@ pub enum ContextType { Autoplay, } -pub enum LoadNext { - Done, - PageUrl(String), - Empty, -} - -#[derive(Debug)] +#[derive(Debug, Hash, Copy, Clone, PartialEq, Eq)] pub enum UpdateContext { Default, Autoplay, } +impl Deref for UpdateContext { + type Target = ContextType; + + fn deref(&self) -> &Self::Target { + match self { + UpdateContext::Default => &ContextType::Default, + UpdateContext::Autoplay => &ContextType::Autoplay, + } + } +} + pub enum ResetContext<'s> { Completely, DefaultIndex, WhenDifferent(&'s str), } +/// Extracts the spotify uri from a given page_url +/// +/// Just extracts "spotify/album/5LFzwirfFwBKXJQGfwmiMY" and replaces the slash's with colon's +/// +/// Expected `page_url` should look something like the following: +/// `hm://artistplaycontext/v1/page/spotify/album/5LFzwirfFwBKXJQGfwmiMY/km_artist` +fn page_url_to_uri(page_url: &str) -> String { + let split = if let Some(rest) = page_url.strip_prefix("hm://") { + rest.split('/') + } else { + warn!("page_url didn't start with hm://. got page_url: {page_url}"); + page_url.split('/') + }; + + split + .skip_while(|s| s != &"spotify") + .take(3) + .collect::>() + .join(":") +} + impl ConnectState { pub fn find_index_in_context bool>( - context: Option<&StateContext>, + ctx: &StateContext, f: F, ) -> Result { - let ctx = context - .as_ref() - .ok_or(StateError::NoContext(ContextType::Default))?; - ctx.tracks .iter() .position(f) .ok_or(StateError::CanNotFindTrackInContext(None, ctx.tracks.len())) } - pub(super) fn get_context(&self, ty: &ContextType) -> Result<&StateContext, StateError> { + pub fn get_context(&self, ty: ContextType) -> Result<&StateContext, StateError> { match ty { ContextType::Default => self.context.as_ref(), ContextType::Shuffle => self.shuffle_context.as_ref(), ContextType::Autoplay => self.autoplay_context.as_ref(), } - .ok_or(StateError::NoContext(*ty)) + .ok_or(StateError::NoContext(ty)) } pub fn context_uri(&self) -> &String { &self.player().context_uri } - pub fn reset_context(&mut self, mut reset_as: ResetContext) { - self.set_active_context(ContextType::Default); - self.fill_up_context = ContextType::Default; + fn different_context_uri(&self, uri: &str) -> bool { + // search identifier is always different + self.context_uri() != uri || uri.starts_with(SEARCH_IDENTIFIER) + } - if matches!(reset_as, ResetContext::WhenDifferent(ctx) if self.context_uri() != ctx) { + pub fn reset_context(&mut self, mut reset_as: ResetContext) { + if matches!(reset_as, ResetContext::WhenDifferent(ctx) if self.different_context_uri(ctx)) { reset_as = ResetContext::Completely } self.shuffle_context = None; @@ -92,7 +121,6 @@ impl ConnectState { ResetContext::Completely => { self.context = None; self.autoplay_context = None; - self.next_contexts.clear(); } ResetContext::WhenDifferent(_) => debug!("context didn't change, no reset"), ResetContext::DefaultIndex => { @@ -106,28 +134,40 @@ impl ConnectState { } } + self.fill_up_context = ContextType::Default; + self.set_active_context(ContextType::Default); self.update_restrictions() } - pub fn get_context_uri_from_context(context: &Context) -> Option<&String> { - let context_uri = context.uri.as_ref()?; - - if !context_uri.starts_with(SEARCH_IDENTIFIER) { - return Some(context_uri); + pub fn valid_resolve_uri(uri: &str) -> Option<&str> { + if uri.is_empty() || uri.starts_with(SEARCH_IDENTIFIER) { + None + } else { + Some(uri) } + } - context - .pages - .first() - .and_then(|p| p.tracks.first().and_then(|t| t.uri.as_ref())) + pub fn get_context_uri_from_context(context: &Context) -> Option<&str> { + let uri = context.uri.as_deref().unwrap_or_default(); + Self::valid_resolve_uri(uri).or_else(|| { + context + .pages + .first() + .and_then(|p| p.tracks.first().and_then(|t| t.uri.as_deref())) + }) } pub fn set_active_context(&mut self, new_context: ContextType) { self.active_context = new_context; - let ctx = match self.get_context(&new_context) { + let player = self.player_mut(); + + player.context_metadata = Default::default(); + player.restrictions = Some(Default::default()).into(); + + let ctx = match self.get_context(new_context) { Err(why) => { - debug!("couldn't load context info because: {why}"); + warn!("couldn't load context info because: {why}"); return; } Ok(ctx) => ctx, @@ -138,9 +178,6 @@ impl ConnectState { let player = self.player_mut(); - player.context_metadata.clear(); - player.restrictions.clear(); - if let Some(restrictions) = restrictions.take() { player.restrictions = MessageField::some(restrictions.into()); } @@ -150,24 +187,25 @@ impl ConnectState { } } - pub fn update_context(&mut self, mut context: Context, ty: UpdateContext) -> Result<(), Error> { + pub fn update_context( + &mut self, + mut context: Context, + ty: UpdateContext, + ) -> Result>, Error> { if context.pages.iter().all(|p| p.tracks.is_empty()) { error!("context didn't have any tracks: {context:#?}"); - return Err(StateError::ContextHasNoTracks.into()); + Err(StateError::ContextHasNoTracks)?; } else if matches!(context.uri, Some(ref uri) if uri.starts_with(LOCAL_FILES_IDENTIFIER)) { - return Err(StateError::UnsupportedLocalPlayBack.into()); - } - - if matches!(ty, UpdateContext::Default) { - self.next_contexts.clear(); + Err(StateError::UnsupportedLocalPlayBack)?; } + let mut next_contexts = Vec::new(); let mut first_page = None; for page in context.pages { if first_page.is_none() && !page.tracks.is_empty() { first_page = Some(page); } else { - self.next_contexts.push(page) + next_contexts.push(page) } } @@ -176,17 +214,8 @@ impl ConnectState { Some(p) => p, }; - let prev_context = match ty { - UpdateContext::Default => self.context.as_ref(), - UpdateContext::Autoplay => self.autoplay_context.as_ref(), - }; - debug!( - "updated context {ty:?} from <{:?}> ({} tracks) to <{:?}> ({} tracks)", - self.context_uri(), - prev_context - .map(|c| c.tracks.len().to_string()) - .unwrap_or_else(|| "-".to_string()), + "updated context {ty:?} to <{:?}> ({} tracks)", context.uri, page.tracks.len() ); @@ -195,32 +224,32 @@ impl ConnectState { UpdateContext::Default => { let mut new_context = self.state_context_from_page( page, + context.metadata, context.restrictions.take(), context.uri.as_deref(), None, ); // when we update the same context, we should try to preserve the previous position - // otherwise we might load the entire context twice - if !self.context_uri().contains(SEARCH_IDENTIFIER) + // otherwise we might load the entire context twice, unless it's the search context + if !self.context_uri().starts_with(SEARCH_IDENTIFIER) && matches!(context.uri, Some(ref uri) if uri == self.context_uri()) { - match Self::find_index_in_context(Some(&new_context), |t| { - self.current_track(|t| &t.uri) == &t.uri - }) { - Ok(new_pos) => { - debug!("found new index of current track, updating new_context index to {new_pos}"); - new_context.index.track = (new_pos + 1) as u32; + if let Some(new_index) = self.find_last_index_in_new_context(&new_context) { + new_context.index.track = match new_index { + Ok(i) => i, + Err(i) => { + self.player_mut().index = MessageField::none(); + i + } + }; + + // enforce reloading the context + if let Some(autoplay_ctx) = self.autoplay_context.as_mut() { + autoplay_ctx.index.track = 0 } - // the track isn't anymore in the context - Err(_) if matches!(self.active_context, ContextType::Default) => { - warn!("current track was removed, setting pos to last known index"); - new_context.index.track = self.player().index.track - } - Err(_) => {} + self.clear_next_tracks(); } - // enforce reloading the context - self.clear_next_tracks(true); } self.context = Some(new_context); @@ -235,6 +264,7 @@ impl ConnectState { UpdateContext::Autoplay => { self.autoplay_context = Some(self.state_context_from_page( page, + context.metadata, context.restrictions.take(), context.uri.as_deref(), Some(Provider::Autoplay), @@ -242,12 +272,81 @@ impl ConnectState { } } - Ok(()) + if next_contexts.is_empty() { + return Ok(None); + } + + // load remaining contexts + let next_contexts = next_contexts + .into_iter() + .flat_map(|page| { + if !page.tracks.is_empty() { + self.fill_context_from_page(page).ok()?; + None + } else if matches!(page.page_url, Some(ref url) if !url.is_empty()) { + Some(page_url_to_uri( + &page.page_url.expect("checked by precondition"), + )) + } else { + warn!("unhandled context page: {page:#?}"); + None + } + }) + .collect(); + + Ok(Some(next_contexts)) + } + + fn find_first_prev_track_index(&self, ctx: &StateContext) -> Option { + let prev_tracks = self.prev_tracks(); + for i in (0..prev_tracks.len()).rev() { + let prev_track = prev_tracks.get(i)?; + if let Ok(idx) = Self::find_index_in_context(ctx, |t| prev_track.uri == t.uri) { + return Some(idx); + } + } + None + } + + fn find_last_index_in_new_context( + &self, + new_context: &StateContext, + ) -> Option> { + let ctx = self.context.as_ref()?; + + let is_queued_item = self.current_track(|t| t.is_queue() || t.is_from_queue()); + + let new_index = if ctx.index.track as usize >= SPOTIFY_MAX_NEXT_TRACKS_SIZE { + Some(ctx.index.track as usize - SPOTIFY_MAX_NEXT_TRACKS_SIZE) + } else if is_queued_item { + self.find_first_prev_track_index(new_context) + } else { + Self::find_index_in_context(new_context, |current| { + self.current_track(|t| t.uri == current.uri) + }) + .ok() + } + .map(|i| i as u32 + 1); + + Some(new_index.ok_or_else(|| { + info!( + "couldn't distinguish index from current or previous tracks in the updated context" + ); + let fallback_index = self + .player() + .index + .as_ref() + .map(|i| i.track) + .unwrap_or_default(); + info!("falling back to index {fallback_index}"); + fallback_index + })) } fn state_context_from_page( &mut self, page: ContextPage, + metadata: HashMap, restrictions: Option, new_context_uri: Option<&str>, provider: Option, @@ -258,8 +357,12 @@ impl ConnectState { .tracks .iter() .flat_map(|track| { - match self.context_to_provided_track(track, Some(new_context_uri), provider.clone()) - { + match self.context_to_provided_track( + track, + Some(new_context_uri), + Some(&page.metadata), + provider.clone(), + ) { Ok(t) => Some(t), Err(why) => { error!("couldn't convert {track:#?} into ProvidedTrack: {why}"); @@ -272,7 +375,7 @@ impl ConnectState { StateContext { tracks, restrictions, - metadata: page.metadata, + metadata, index: ContextIndex::new(), } } @@ -293,12 +396,11 @@ impl ConnectState { let new_track_uri = new_track.uri.unwrap_or_default(); if let Ok(position) = - Self::find_index_in_context(Some(current_context), |t| t.uri == new_track_uri) + Self::find_index_in_context(current_context, |t| t.uri == new_track_uri) { let context_track = current_context.tracks.get_mut(position)?; for (key, value) in new_track.metadata { - warn!("merging metadata {key} {value}"); context_track.metadata.insert(key, value); } @@ -334,10 +436,10 @@ impl ConnectState { &self, ctx_track: &ContextTrack, context_uri: Option<&str>, + page_metadata: Option<&HashMap>, provider: Option, ) -> Result { let id = match (ctx_track.uri.as_ref(), ctx_track.gid.as_ref()) { - (None, None) => Err(StateError::InvalidTrackUri(None))?, (Some(uri), _) if uri.contains(['?', '%']) => { Err(StateError::InvalidTrackUri(Some(uri.clone())))? } @@ -363,7 +465,7 @@ impl ConnectState { _ => Uuid::new_v4().as_simple().to_string(), }; - let mut metadata = HashMap::new(); + let mut metadata = page_metadata.cloned().unwrap_or_default(); for (k, v) in &ctx_track.metadata { metadata.insert(k.to_string(), v.to_string()); } @@ -389,7 +491,7 @@ impl ConnectState { } pub fn fill_context_from_page(&mut self, page: ContextPage) -> Result<(), Error> { - let context = self.state_context_from_page(page, None, None, None); + let context = self.state_context_from_page(page, HashMap::new(), None, None, None); let ctx = self .context .as_mut() @@ -401,26 +503,4 @@ impl ConnectState { Ok(()) } - - pub fn try_load_next_context(&mut self) -> Result { - let next = match self.next_contexts.first() { - None => return Ok(LoadNext::Empty), - Some(_) => self.next_contexts.remove(0), - }; - - if next.tracks.is_empty() { - let next_page_url = match next.page_url { - Some(page_url) if !page_url.is_empty() => page_url, - _ => Err(StateError::NoContext(ContextType::Default))?, - }; - - self.update_current_index(|i| i.page += 1); - return Ok(LoadNext::PageUrl(next_page_url)); - } - - self.fill_context_from_page(next)?; - self.fill_up_next_tracks()?; - - Ok(LoadNext::Done) - } } diff --git a/connect/src/state/handle.rs b/connect/src/state/handle.rs index a69e1ebe1..1c1a4b325 100644 --- a/connect/src/state/handle.rs +++ b/connect/src/state/handle.rs @@ -1,5 +1,10 @@ -use crate::state::{context::ResetContext, ConnectState}; -use librespot_core::{dealer::protocol::SetQueueCommand, Error}; +use crate::{ + core::{dealer::protocol::SetQueueCommand, Error}, + state::{ + context::{ContextType, ResetContext}, + ConnectState, + }, +}; use protobuf::MessageField; impl ConnectState { @@ -16,7 +21,7 @@ impl ConnectState { return Ok(()); } - let ctx = self.context.as_ref(); + let ctx = self.get_context(ContextType::Default)?; let current_index = ConnectState::find_index_in_context(ctx, |c| self.current_track(|t| c.uri == t.uri))?; @@ -52,7 +57,7 @@ impl ConnectState { self.set_shuffle(false); self.reset_context(ResetContext::DefaultIndex); - let ctx = self.context.as_ref(); + let ctx = self.get_context(ContextType::Default)?; let current_track = ConnectState::find_index_in_context(ctx, |t| { self.current_track(|t| &t.uri) == &t.uri })?; diff --git a/connect/src/state/options.rs b/connect/src/state/options.rs index b9c2c5766..97484f675 100644 --- a/connect/src/state/options.rs +++ b/connect/src/state/options.rs @@ -33,6 +33,12 @@ impl ConnectState { } } + pub fn reset_options(&mut self) { + self.set_shuffle(false); + self.set_repeat_track(false); + self.set_repeat_context(false); + } + pub fn shuffle(&mut self) -> Result<(), Error> { if let Some(reason) = self .player() @@ -47,16 +53,12 @@ impl ConnectState { } self.clear_prev_track(); - self.clear_next_tracks(true); + self.clear_next_tracks(); let current_uri = self.current_track(|t| &t.uri); - let ctx = self - .context - .as_ref() - .ok_or(StateError::NoContext(ContextType::Default))?; - - let current_track = Self::find_index_in_context(Some(ctx), |t| &t.uri == current_uri)?; + let ctx = self.get_context(ContextType::Default)?; + let current_track = Self::find_index_in_context(ctx, |t| &t.uri == current_uri)?; let mut shuffle_context = ctx.clone(); // we don't need to include the current track, because it is already being played diff --git a/connect/src/state/restrictions.rs b/connect/src/state/restrictions.rs index a0f269331..03495c680 100644 --- a/connect/src/state/restrictions.rs +++ b/connect/src/state/restrictions.rs @@ -17,14 +17,18 @@ impl ConnectState { const ENDLESS_CONTEXT: &str = "endless_context"; let prev_tracks_is_empty = self.prev_tracks().is_empty(); + + let is_paused = self.is_pause(); + let is_playing = self.is_playing(); + let player = self.player_mut(); if let Some(restrictions) = player.restrictions.as_mut() { - if player.is_playing { + if is_playing { restrictions.disallow_pausing_reasons.clear(); restrictions.disallow_resuming_reasons = vec!["not_paused".to_string()] } - if player.is_paused { + if is_paused { restrictions.disallow_resuming_reasons.clear(); restrictions.disallow_pausing_reasons = vec!["not_playing".to_string()] } diff --git a/connect/src/state/tracks.rs b/connect/src/state/tracks.rs index 2dc1b9af4..14e3abcc2 100644 --- a/connect/src/state/tracks.rs +++ b/connect/src/state/tracks.rs @@ -1,12 +1,15 @@ -use crate::state::{ - context::ContextType, - metadata::Metadata, - provider::{IsProvider, Provider}, - ConnectState, StateError, SPOTIFY_MAX_NEXT_TRACKS_SIZE, SPOTIFY_MAX_PREV_TRACKS_SIZE, +use crate::{ + core::{Error, SpotifyId}, + protocol::player::ProvidedTrack, + state::{ + context::ContextType, + metadata::Metadata, + provider::{IsProvider, Provider}, + ConnectState, StateError, SPOTIFY_MAX_NEXT_TRACKS_SIZE, SPOTIFY_MAX_PREV_TRACKS_SIZE, + }, }; -use librespot_core::{Error, SpotifyId}; -use librespot_protocol::player::ProvidedTrack; use protobuf::MessageField; +use rand::Rng; // identifier used as part of the uid pub const IDENTIFIER_DELIMITER: &str = "delimiter"; @@ -64,8 +67,14 @@ impl<'ct> ConnectState { &self.player().next_tracks } + pub fn set_current_track_random(&mut self) -> Result<(), Error> { + let max_tracks = self.get_context(self.active_context)?.tracks.len(); + let rng_track = rand::thread_rng().gen_range(0..max_tracks); + self.set_current_track(rng_track) + } + pub fn set_current_track(&mut self, index: usize) -> Result<(), Error> { - let context = self.get_context(&self.active_context)?; + let context = self.get_context(self.active_context)?; let new_track = context .tracks @@ -77,8 +86,8 @@ impl<'ct> ConnectState { debug!( "set track to: {} at {} of {} tracks", - index, new_track.uri, + index, context.tracks.len() ); @@ -132,7 +141,7 @@ impl<'ct> ConnectState { self.set_active_context(ContextType::Autoplay); None } else { - let ctx = self.context.as_ref(); + let ctx = self.get_context(ContextType::Default)?; let new_index = Self::find_index_in_context(ctx, |c| c.uri == new_track.uri); match new_index { Ok(new_index) => Some(new_index as u32), @@ -251,12 +260,7 @@ impl<'ct> ConnectState { self.prev_tracks_mut().clear() } - pub fn clear_next_tracks(&mut self, keep_queued: bool) { - if !keep_queued { - self.next_tracks_mut().clear(); - return; - } - + pub fn clear_next_tracks(&mut self) { // respect queued track and don't throw them out of our next played tracks let first_non_queued_track = self .next_tracks() @@ -271,13 +275,13 @@ impl<'ct> ConnectState { } } - pub fn fill_up_next_tracks(&mut self) -> Result<(), StateError> { - let ctx = self.get_context(&self.fill_up_context)?; + pub fn fill_up_next_tracks(&mut self) -> Result<(), Error> { + let ctx = self.get_context(self.fill_up_context)?; let mut new_index = ctx.index.track as usize; let mut iteration = ctx.index.page; while self.next_tracks().len() < SPOTIFY_MAX_NEXT_TRACKS_SIZE { - let ctx = self.get_context(&self.fill_up_context)?; + let ctx = self.get_context(self.fill_up_context)?; let track = match ctx.tracks.get(new_index) { None if self.repeat_context() => { let delimiter = Self::new_delimiter(iteration.into()); @@ -292,14 +296,14 @@ impl<'ct> ConnectState { // transition to autoplay as fill up context self.fill_up_context = ContextType::Autoplay; - new_index = self.get_context(&ContextType::Autoplay)?.index.track as usize; + new_index = self.get_context(ContextType::Autoplay)?.index.track as usize; // add delimiter to only display the current context Self::new_delimiter(iteration.into()) } None if self.autoplay_context.is_some() => { match self - .get_context(&ContextType::Autoplay)? + .get_context(ContextType::Autoplay)? .tracks .get(new_index) { @@ -324,6 +328,11 @@ impl<'ct> ConnectState { self.next_tracks_mut().push(track); } + debug!( + "finished filling up next_tracks ({})", + self.next_tracks().len() + ); + self.update_context_index(self.fill_up_context, new_index)?; // the web-player needs a revision update, otherwise the queue isn't updated in the ui @@ -350,17 +359,14 @@ impl<'ct> ConnectState { } } - pub fn prev_autoplay_track_uris(&self) -> Vec { + pub fn recent_track_uris(&self) -> Vec { let mut prev = self .prev_tracks() .iter() - .flat_map(|t| t.is_autoplay().then_some(t.uri.clone())) + .map(|t| t.uri.clone()) .collect::>(); - if self.current_track(|t| t.is_autoplay()) { - prev.push(self.current_track(|t| t.uri.clone())); - } - + prev.push(self.current_track(|t| t.uri.clone())); prev } diff --git a/connect/src/state/transfer.rs b/connect/src/state/transfer.rs index 53d420a12..7404bf550 100644 --- a/connect/src/state/transfer.rs +++ b/connect/src/state/transfer.rs @@ -25,10 +25,12 @@ impl ConnectState { self.context_to_provided_track( track, transfer.current_session.context.uri.as_deref(), + None, transfer .queue .is_playing_queue - .and_then(|b| b.then_some(Provider::Queue)), + .unwrap_or_default() + .then_some(Provider::Queue), ) } @@ -72,7 +74,8 @@ impl ConnectState { } self.clear_prev_track(); - self.clear_next_tracks(false); + self.clear_next_tracks(); + self.update_queue_revision() } /// completes the transfer, loading the queue and updating metadata @@ -91,7 +94,7 @@ impl ConnectState { self.set_active_context(context_ty); self.fill_up_context = context_ty; - let ctx = self.get_context(&self.active_context).ok(); + let ctx = self.get_context(self.active_context)?; let current_index = match transfer.current_session.current_uid.as_ref() { Some(uid) if track.is_queue() => Self::find_index_in_context(ctx, |c| &c.uid == uid) @@ -103,7 +106,7 @@ impl ConnectState { "active track is <{}> with index {current_index:?} in {:?} context, has {} tracks", track.uri, self.active_context, - ctx.map(|c| c.tracks.len()).unwrap_or_default() + ctx.tracks.len() ); if self.player().track.is_none() { @@ -130,6 +133,7 @@ impl ConnectState { if let Ok(queued_track) = self.context_to_provided_track( track, Some(self.context_uri()), + None, Some(Provider::Queue), ) { self.add_to_queue(queued_track, false); diff --git a/core/src/dealer/protocol.rs b/core/src/dealer/protocol.rs index e6b7f2dc3..c774a1194 100644 --- a/core/src/dealer/protocol.rs +++ b/core/src/dealer/protocol.rs @@ -174,7 +174,9 @@ fn handle_transfer_encoding( ) -> Result, Error> { let encoding = headers.get("Transfer-Encoding").map(String::as_str); if let Some(encoding) = encoding { - trace!("message was send with {encoding} encoding "); + trace!("message was sent with {encoding} encoding "); + } else { + trace!("message was sent with no encoding "); } if !matches!(encoding, Some("gzip")) { diff --git a/core/src/spclient.rs b/core/src/spclient.rs index 42213a574..a66804636 100644 --- a/core/src/spclient.rs +++ b/core/src/spclient.rs @@ -813,7 +813,7 @@ impl SpClient { /// **will** contain the query /// - artists /// - returns 2 pages with tracks: 10 most popular tracks and latest/popular album - /// - remaining pages are albums of the artists and are only provided as page_url + /// - remaining pages are artist albums sorted by popularity (only provided as page_url) /// - search /// - is massively influenced by the provided query /// - the query result shown by the search expects no query at all diff --git a/examples/play_connect.rs b/examples/play_connect.rs index 9a033da23..60cf631fb 100644 --- a/examples/play_connect.rs +++ b/examples/play_connect.rs @@ -84,7 +84,7 @@ async fn main() { repeat: false, repeat_track: false, // the index specifies which track in the context starts playing, in this case the first in the album - playing_track: PlayingTrack::Index(0), + playing_track: PlayingTrack::Index(0).into(), }) .unwrap(); }); diff --git a/protocol/src/impl_trait.rs b/protocol/src/impl_trait.rs new file mode 100644 index 000000000..c936a5f0b --- /dev/null +++ b/protocol/src/impl_trait.rs @@ -0,0 +1,2 @@ +mod context; +mod player; diff --git a/protocol/src/impl_trait/context.rs b/protocol/src/impl_trait/context.rs new file mode 100644 index 000000000..875ef9ad2 --- /dev/null +++ b/protocol/src/impl_trait/context.rs @@ -0,0 +1,13 @@ +use crate::context::Context; +use protobuf::Message; +use std::hash::{Hash, Hasher}; + +impl Hash for Context { + fn hash(&self, state: &mut H) { + if let Ok(ctx) = self.write_to_bytes() { + ctx.hash(state) + } + } +} + +impl Eq for Context {} diff --git a/protocol/src/conversion.rs b/protocol/src/impl_trait/player.rs similarity index 100% rename from protocol/src/conversion.rs rename to protocol/src/impl_trait/player.rs diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index 05aef10fd..c905ceb87 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -1,6 +1,6 @@ // This file is parsed by build.rs // Each included module will be compiled from the matching .proto definition. -mod conversion; +mod impl_trait; include!(concat!(env!("OUT_DIR"), "/mod.rs"));