diff --git a/Cargo.toml b/Cargo.toml index 7623fe3c..795a6a8d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ members = [ exclude = [ "examples/concurrent", "examples/firehose", + "examples/video", ] resolver = "2" diff --git a/examples/video/Cargo.toml b/examples/video/Cargo.toml new file mode 100644 index 00000000..3a6c6f89 --- /dev/null +++ b/examples/video/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "video" +version = "0.1.0" +edition = "2021" + +[dependencies] +atrium-api = { version = "0.24.8", features = ["agent"] } +atrium-xrpc-client.version = "0.5.10" +clap = { version = "4.5.21", features = ["derive"] } +serde = { version = "1.0", features = ["derive"] } +serde_html_form = { version = "0.2.6", default-features = false } +tokio = { version = "1.41.1", features = ["macros", "rt-multi-thread"] } diff --git a/examples/video/README.md b/examples/video/README.md new file mode 100644 index 00000000..d18d4fd0 --- /dev/null +++ b/examples/video/README.md @@ -0,0 +1,14 @@ +## Example code for uploading a video + +1. First, get a token by `com.atproto.server.getServiceAuth`. + +2. Call uploadVideo against the video service (`video.bsky.app`) with the token. + +3. Call `app.bsky.video.getJobStatus` against the video service as well. + +(The same goes for `app.bsky.video.getUploadLimits`, which gets a token and calls the video service with it to get the data, but the process of checking this may be omitted.) + +In Atrium: + +- Since `AtpAgent` cannot process XRPC requests with the token obtained by `getServiceAuth`, we need to prepare a dedicated Client and create an `AtpServiceClient` that uses it. +- The `app.bsky.video.uploadVideo` endpoint is special (weird?) and requires special hacks such as adding query parameters to the request URL and modifying the response to match the schema. \ No newline at end of file diff --git a/examples/video/src/main.rs b/examples/video/src/main.rs new file mode 100644 index 00000000..36670876 --- /dev/null +++ b/examples/video/src/main.rs @@ -0,0 +1,265 @@ +use atrium_api::{ + agent::{store::MemorySessionStore, AtpAgent}, + client::AtpServiceClient, + types::{ + string::{Datetime, Did}, + Collection, TryIntoUnknown, Union, + }, + xrpc::{ + http::{uri::Builder, Request, Response}, + types::AuthorizationToken, + HttpClient, XrpcClient, + }, +}; +use atrium_xrpc_client::reqwest::ReqwestClient; +use clap::Parser; +use serde::Serialize; +use std::{fs::File, io::Read, path::PathBuf, time::Duration}; +use tokio::time; + +const VIDEO_SERVICE: &str = "https://video.bsky.app"; +const VIDEO_SERVICE_DID: &str = "did:web:video.bsky.app"; +const UPLOAD_VIDEO_PATH: &str = "/xrpc/app.bsky.video.uploadVideo"; + +/// Simple program to upload videos by ATrium API agent. +#[derive(Parser, Debug)] +#[command(author, version, about, long_about = None)] +struct Args { + /// Identifier of the login user. + #[arg(short, long)] + identifier: String, + /// App password of the login user. + #[arg(short, long)] + password: String, + /// Video file to upload. + #[arg(long, value_name = "VIDEO FILE")] + video: PathBuf, +} + +#[derive(Serialize)] +struct UploadParams { + did: Did, + name: String, +} + +struct VideoClient { + token: String, + params: Option, + inner: ReqwestClient, +} + +impl VideoClient { + fn new(token: String, params: Option) -> Self { + Self { + token, + params, + inner: ReqwestClient::new( + // Actually, `base_uri` returns `VIDEO_SERVICE`, so there is no need to specify this. + "https://dummy.example.com", + ), + } + } +} + +impl HttpClient for VideoClient { + async fn send_http( + &self, + mut request: Request>, + ) -> Result>, Box> { + let is_upload_video = request.uri().path() == UPLOAD_VIDEO_PATH; + // Hack: Append query parameters + if is_upload_video { + if let Some(params) = &self.params { + *request.uri_mut() = Builder::from(request.uri().clone()) + .path_and_query(format!( + "{UPLOAD_VIDEO_PATH}?{}", + serde_html_form::to_string(params)? + )) + .build()?; + } + } + let mut response = self.inner.send_http(request).await; + // Hack: Formatting an incorrect response body + if is_upload_video { + if let Ok(res) = response.as_mut() { + *res.body_mut() = + [b"{\"jobStatus\":".to_vec(), res.body().to_vec(), b"}".to_vec()].concat(); + } + } + response + } +} + +impl XrpcClient for VideoClient { + fn base_uri(&self) -> String { + VIDEO_SERVICE.to_string() + } + async fn authorization_token(&self, _: bool) -> Option { + Some(AuthorizationToken::Bearer(self.token.clone())) + } +} + +#[tokio::main] +async fn main() -> Result<(), Box> { + let args = Args::parse(); + // Read video file + let data = { + let mut file = File::open(&args.video)?; + let mut buf = Vec::new(); + file.read_to_end(&mut buf)?; + buf + }; + + // Login + println!("Logging in..."); + let agent = + AtpAgent::new(ReqwestClient::new("https://bsky.social"), MemorySessionStore::default()); + let session = agent.login(&args.identifier, &args.password).await?; + + // Check upload limits + println!("Checking upload limits..."); + let limits = { + let service_auth = agent + .api + .com + .atproto + .server + .get_service_auth( + atrium_api::com::atproto::server::get_service_auth::ParametersData { + aud: VIDEO_SERVICE_DID.parse().expect("invalid DID"), + exp: None, + lxm: atrium_api::app::bsky::video::get_upload_limits::NSID.parse().ok(), + } + .into(), + ) + .await?; + let client = AtpServiceClient::new(VideoClient::new(service_auth.data.token, None)); + client.service.app.bsky.video.get_upload_limits().await? + }; + println!("{:?}", limits.data); + if !limits.can_upload + || limits.remaining_daily_bytes.map_or(false, |remain| remain < data.len() as i64) + || limits.remaining_daily_videos.map_or(false, |remain| remain <= 0) + { + eprintln!("You cannot upload a video: {:?}", limits.data); + return Ok(()); + } + + // Upload video + println!("Uploading video..."); + let output = { + let service_auth = agent + .api + .com + .atproto + .server + .get_service_auth( + atrium_api::com::atproto::server::get_service_auth::ParametersData { + aud: format!( + "did:web:{}", + agent.get_endpoint().await.strip_prefix("https://").unwrap() + ) + .parse() + .expect("invalid DID"), + exp: None, + lxm: atrium_api::com::atproto::repo::upload_blob::NSID.parse().ok(), + } + .into(), + ) + .await?; + + let filename = args + .video + .file_name() + .and_then(|s| s.to_os_string().into_string().ok()) + .expect("failed to get filename"); + let client = AtpServiceClient::new(VideoClient::new( + service_auth.data.token, + Some(UploadParams { did: session.did.clone(), name: filename }), + )); + client.service.app.bsky.video.upload_video(data).await? + }; + println!("{:?}", output.job_status.data); + + // Wait for the video to be uploaded + let client = AtpServiceClient::new(ReqwestClient::new(VIDEO_SERVICE)); + let mut status = output.data.job_status.data; + loop { + status = client + .service + .app + .bsky + .video + .get_job_status( + atrium_api::app::bsky::video::get_job_status::ParametersData { + job_id: status.job_id.clone(), + } + .into(), + ) + .await? + .data + .job_status + .data; + println!("{status:?}"); + if status.blob.is_some() + || status.state == "JOB_STATE_CREATED" + || status.state == "JOB_STATE_FAILED" + { + break; + } + time::sleep(Duration::from_millis(100)).await; + } + let Some(video) = status.blob else { + eprintln!("Failed to get blob: {status:?}"); + return Ok(()); + }; + if let Some(message) = status.message { + println!("{message}"); + } + + // Post to feed with the video + println!("Video uploaded: {video:?}"); + let record = atrium_api::app::bsky::feed::post::RecordData { + created_at: Datetime::now(), + embed: Some(Union::Refs( + atrium_api::app::bsky::feed::post::RecordEmbedRefs::AppBskyEmbedVideoMain(Box::new( + atrium_api::app::bsky::embed::video::MainData { + alt: Some(String::from("alt text")), + aspect_ratio: None, + captions: None, + video, + } + .into(), + )), + )), + entities: None, + facets: None, + labels: None, + langs: None, + reply: None, + tags: None, + text: String::new(), + } + .try_into_unknown() + .expect("failed to convert record"); + let output = agent + .api + .com + .atproto + .repo + .create_record( + atrium_api::com::atproto::repo::create_record::InputData { + collection: atrium_api::app::bsky::feed::Post::nsid(), + record, + repo: session.data.did.into(), + rkey: None, + swap_commit: None, + validate: Some(true), + } + .into(), + ) + .await?; + println!("{:?}", output.data); + + Ok(()) +}