Skip to content

Commit deb6d8f

Browse files
author
Андрей Ткаченко
committed
Async Mp4Stream
1 parent a4bb30d commit deb6d8f

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+3580
-2427
lines changed

Cargo.toml

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,17 @@ bytes = "1.1.0"
1919
num-rational = { version = "0.4.0", features = ["serde"] }
2020
serde = { version = "1.0", features = ["derive"] }
2121
serde_json = "1.0"
22+
tokio = { version = "1.37.0", features = ["io-util"] }
23+
futures = "0.3.30"
24+
const_format = "0.2.32"
25+
pin-project-lite = "0.2.14"
26+
async-stream = "0.3.5"
2227

2328
[dev-dependencies]
24-
criterion = "0.3"
29+
anyhow = "1.0"
30+
criterion = "0.5.1"
31+
tokio = { version = "1.37.0", features = ["full"] }
32+
tokio-util = "0.7.10"
2533

2634
[[bench]]
2735
name = "bench_main"

benches/bench_main.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
use criterion::BenchmarkId;
22
use criterion::{criterion_group, criterion_main, Criterion};
33

4-
use std::fs::File;
4+
// use std::fs::File;
55

6-
fn read_mp4(filename: &str) -> u64 {
7-
let f = File::open(filename).unwrap();
8-
let m = mp4::read_mp4(f).unwrap();
6+
fn read_mp4(_filename: &str) -> u64 {
7+
// let f = File::open(filename).unwrap();
8+
// let m = mp4::read_mp4(f).unwrap();
99

10-
m.size()
10+
// m.size()
11+
0
1112
}
1213

1314
fn criterion_benchmark(c: &mut Criterion) {

examples/mp4_to_mpeg2ts.rs

Lines changed: 338 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,338 @@
1+
// use std::convert::TryInto;
2+
// use std::env;
3+
// use std::fs::File;
4+
// use std::io::{self, BufReader};
5+
// use std::path::Path;
6+
7+
// use anyhow::bail;
8+
// use bytes::{BufMut, Bytes, BytesMut};
9+
// use futures::SinkExt;
10+
// use mp4::TrackType;
11+
// use std::io::{Cursor, Write};
12+
// use tokio_util::codec::Encoder;
13+
14+
// use bytes::Buf;
15+
// use mpeg2ts::{
16+
// es::{StreamId, StreamType},
17+
// pes::PesHeader,
18+
// time::{ClockReference, Timestamp},
19+
// ts::{
20+
// payload::{self, Pat, Pmt},
21+
// AdaptationField, ContinuityCounter, EsInfo, Pid, ProgramAssociation,
22+
// TransportScramblingControl, TsHeader, TsPacket, TsPacketWriter, TsPayload, VersionNumber,
23+
// WriteTsPacket,
24+
// },
25+
// Error as TsError,
26+
// };
27+
28+
// const PMT_PID: u16 = 4096;
29+
// const VIDEO_ES_PID: u16 = 256;
30+
// // const AUDIO_ES_PID: u16 = 258;
31+
// const PES_VIDEO_STREAM_ID: u8 = 224;
32+
// // const PES_AUDIO_STREAM_ID: u8 = 192;
33+
34+
// #[derive(Default)]
35+
// pub struct TsEncoder {
36+
// video_continuity_counter: ContinuityCounter,
37+
// header_sent: bool,
38+
// timestamp: i64,
39+
// }
40+
41+
// impl TsEncoder {
42+
// fn write_packet(
43+
// &mut self,
44+
// writer: &mut TsPacketWriter<impl Write>,
45+
// pts: Timestamp,
46+
// dts: Timestamp,
47+
// data: &[u8],
48+
// is_keyframe: bool,
49+
// ) -> Result<(), TsError> {
50+
// let mut header = Self::default_ts_header(VIDEO_ES_PID, self.video_continuity_counter)?;
51+
// let mut buf = Cursor::new(data);
52+
// let packet = {
53+
// let data = payload::Bytes::new(&buf.chunk()[..buf.remaining().min(153)])?;
54+
// buf.advance(data.len());
55+
56+
// TsPacket {
57+
// header: header.clone(),
58+
// adaptation_field: is_keyframe.then(|| AdaptationField {
59+
// discontinuity_indicator: false,
60+
// random_access_indicator: true,
61+
// es_priority_indicator: false,
62+
// pcr: Some(ClockReference::from(pts)),
63+
// opcr: None,
64+
// splice_countdown: None,
65+
// transport_private_data: Vec::new(),
66+
// extension: None,
67+
// }),
68+
// payload: Some(TsPayload::Pes(payload::Pes {
69+
// header: PesHeader {
70+
// stream_id: StreamId::new(PES_VIDEO_STREAM_ID),
71+
// priority: false,
72+
// data_alignment_indicator: false,
73+
// copyright: false,
74+
// original_or_copy: false,
75+
// pts: Some(pts),
76+
// dts: if pts == dts { None } else { Some(dts) },
77+
// escr: None,
78+
// },
79+
// pes_packet_len: 0,
80+
// data,
81+
// })),
82+
// }
83+
// };
84+
85+
// writer.write_ts_packet(&packet)?;
86+
// header.continuity_counter.increment();
87+
88+
// while buf.has_remaining() {
89+
// let raw_payload =
90+
// payload::Bytes::new(&buf.chunk()[..buf.remaining().min(payload::Bytes::MAX_SIZE)])?;
91+
92+
// buf.advance(raw_payload.len());
93+
94+
// let packet = TsPacket {
95+
// header: header.clone(),
96+
// adaptation_field: None,
97+
// payload: Some(TsPayload::Raw(raw_payload)),
98+
// };
99+
100+
// writer.write_ts_packet(&packet)?;
101+
// header.continuity_counter.increment();
102+
// }
103+
104+
// self.video_continuity_counter = header.continuity_counter;
105+
// Ok(())
106+
// }
107+
108+
// pub fn new(timestamp: i64) -> TsEncoder {
109+
// Self {
110+
// video_continuity_counter: Default::default(),
111+
// header_sent: false,
112+
// timestamp,
113+
// }
114+
// }
115+
// }
116+
117+
// struct Frame {
118+
// pub pts: i64,
119+
// pub dts: i64,
120+
// pub body: Bytes,
121+
// pub key: bool,
122+
// }
123+
124+
// impl<'a> Encoder<&'a Frame> for TsEncoder {
125+
// type Error = anyhow::Error;
126+
127+
// fn encode(&mut self, frame: &'a Frame, dst: &mut BytesMut) -> Result<(), Self::Error> {
128+
// let mut writer = TsPacketWriter::new(dst.writer());
129+
130+
// if !self.header_sent {
131+
// self.header_sent = true;
132+
// self.write_header(&mut writer, StreamType::H264)?;
133+
// }
134+
135+
// let pts = frame.pts - self.timestamp;
136+
// let dts = frame.dts - self.timestamp;
137+
// let p_ts = Timestamp::new((pts as u64 * 9) / 100 + 1).map_err(TsError::from)?;
138+
// let d_ts = Timestamp::new((dts as u64 * 9) / 100 + 1).map_err(TsError::from)?;
139+
140+
// self.write_packet(&mut writer, p_ts, d_ts, &frame.body, frame.key)?;
141+
142+
// Ok(())
143+
// }
144+
// }
145+
146+
// impl TsEncoder {
147+
// #[inline]
148+
// fn write_header<W: WriteTsPacket>(
149+
// &mut self,
150+
// writer: &mut W,
151+
// stream_type: StreamType,
152+
// ) -> Result<(), TsError> {
153+
// self.write_packets(
154+
// writer,
155+
// [
156+
// &Self::default_pat_packet(),
157+
// &Self::default_pmt_packet(stream_type),
158+
// ],
159+
// )?;
160+
161+
// Ok(())
162+
// }
163+
164+
// #[inline]
165+
// fn write_packets<'a, W: WriteTsPacket, P: IntoIterator<Item = &'a TsPacket>>(
166+
// &mut self,
167+
// writer: &mut W,
168+
// packets: P,
169+
// ) -> Result<(), TsError> {
170+
// packets
171+
// .into_iter()
172+
// .try_for_each(|pak| writer.write_ts_packet(pak))?;
173+
174+
// Ok(())
175+
// }
176+
177+
// fn default_ts_header(
178+
// pid: u16,
179+
// continuity_counter: ContinuityCounter,
180+
// ) -> Result<TsHeader, TsError> {
181+
// Ok(TsHeader {
182+
// transport_error_indicator: false,
183+
// transport_priority: false,
184+
// pid: Pid::new(pid)?,
185+
// transport_scrambling_control: TransportScramblingControl::NotScrambled,
186+
// continuity_counter,
187+
// })
188+
// }
189+
190+
// fn default_pat_packet() -> TsPacket {
191+
// TsPacket {
192+
// header: Self::default_ts_header(0, Default::default()).unwrap(),
193+
// adaptation_field: None,
194+
// payload: Some(TsPayload::Pat(Pat {
195+
// transport_stream_id: 1,
196+
// version_number: VersionNumber::default(),
197+
// table: vec![ProgramAssociation {
198+
// program_num: 1,
199+
// program_map_pid: Pid::new(PMT_PID).unwrap(),
200+
// }],
201+
// })),
202+
// }
203+
// }
204+
205+
// fn default_pmt_packet(stream_type: StreamType) -> TsPacket {
206+
// TsPacket {
207+
// header: Self::default_ts_header(PMT_PID, Default::default()).unwrap(),
208+
// adaptation_field: None,
209+
// payload: Some(TsPayload::Pmt(Pmt {
210+
// program_num: 1,
211+
// pcr_pid: Some(Pid::new(VIDEO_ES_PID).unwrap()),
212+
// version_number: VersionNumber::default(),
213+
// program_info: vec![],
214+
// es_info: vec![EsInfo {
215+
// stream_type,
216+
// elementary_pid: Pid::new(VIDEO_ES_PID).unwrap(),
217+
// descriptors: vec![],
218+
// }],
219+
// })),
220+
// }
221+
// }
222+
// }
223+
224+
// #[tokio::main(flavor = "current_thread")]
225+
// async fn main() {
226+
// let args: Vec<String> = env::args().collect();
227+
228+
// if args.len() < 2 {
229+
// println!("Usage: mp4sample <filename>");
230+
// std::process::exit(1);
231+
// }
232+
233+
// if let Err(err) = samples(&args[1]).await {
234+
// let _ = writeln!(io::stderr(), "{}", err);
235+
// }
236+
// }
237+
238+
// async fn samples<P: AsRef<Path>>(filename: &P) -> anyhow::Result<()> {
239+
// let mut ts_name = filename.as_ref().parent().unwrap().to_path_buf();
240+
// ts_name.push(format!(
241+
// "{}.ts",
242+
// filename.as_ref().file_stem().unwrap().to_str().unwrap()
243+
// ));
244+
245+
// let f = File::open(filename)?;
246+
// let size = f.metadata()?.len();
247+
// let reader = BufReader::new(f);
248+
// let ts_file = tokio::fs::File::create(ts_name).await.unwrap();
249+
250+
// let mut ts = tokio_util::codec::FramedWrite::new(ts_file, TsEncoder::new(-1_400_000));
251+
// let mut mp4 = mp4::Mp4Reader::read_header(reader, size)?;
252+
253+
// if let Some(track_id) = mp4.tracks().iter().find_map(|(k, v)| {
254+
// v.track_type()
255+
// .ok()
256+
// .and_then(|x| matches!(x, TrackType::Video).then_some(*k))
257+
// }) {
258+
// let sample_count = mp4.sample_count(track_id).unwrap();
259+
// let mut params = BytesMut::new();
260+
// let track = mp4.tracks().get(&track_id).unwrap();
261+
// let timescale = track.timescale();
262+
263+
// if let Ok(sps) = track.sequence_parameter_set() {
264+
// params.put_slice(&[0, 0, 0, 1]);
265+
// params.put_slice(sps);
266+
// }
267+
268+
// if let Ok(pps) = track.picture_parameter_set() {
269+
// params.put_slice(&[0, 0, 0, 1]);
270+
// params.put_slice(pps);
271+
// }
272+
273+
// for sample_idx in 0..sample_count {
274+
// let sample_id = sample_idx + 1;
275+
// let sample = mp4.read_sample(track_id, sample_id);
276+
277+
// if let Some(samp) = sample.unwrap() {
278+
// let dts = (samp.start_time as i64 * 1_000_000) / timescale as i64;
279+
// let pts = (samp.start_time as i64 + samp.rendering_offset as i64) * 1_000_000
280+
// / timescale as i64;
281+
282+
// let mut bytes = BytesMut::from(samp.bytes.as_ref());
283+
// convert_h264(&mut bytes).unwrap();
284+
285+
// let mut body = BytesMut::with_capacity(bytes.len() + 6);
286+
287+
// if sample_idx == 0 {
288+
// body.put_slice(&params);
289+
// }
290+
291+
// body.put_slice(&[0, 0, 0, 1, 9, 240]);
292+
// body.put_slice(&bytes);
293+
294+
// ts.send(&Frame {
295+
// pts,
296+
// dts,
297+
// body: body.freeze(),
298+
// key: samp.is_sync,
299+
// })
300+
// .await?;
301+
// }
302+
// }
303+
// }
304+
// Ok(())
305+
// }
306+
307+
// fn convert_h264(data: &mut [u8]) -> anyhow::Result<()> {
308+
// // TODO:
309+
// // * For each IDR frame, copy the SPS and PPS from the stream's
310+
// // parameters, rather than depend on it being present in the frame
311+
// // already. In-band parameters aren't guaranteed. This is awkward
312+
// // with h264_reader v0.5's h264_reader::avcc::AvcDecoderRecord because it
313+
// // strips off the NAL header byte from each parameter. The next major
314+
// // version shouldn't do this.
315+
// // * Copy only the slice data. In particular, don't copy SEI, which confuses
316+
// // Safari: <https://github.com/scottlamb/retina/issues/60#issuecomment-1178369955>
317+
318+
// let mut i = 0;
319+
// while i < data.len() - 3 {
320+
// // Replace each NAL's length with the Annex B start code b"\x00\x00\x00\x01".
321+
// let bytes = &mut data[i..i + 4];
322+
// let nalu_length = u32::from_be_bytes(bytes.try_into().unwrap()) as usize;
323+
// bytes.copy_from_slice(&[0, 0, 0, 1]);
324+
325+
// i += 4 + nalu_length;
326+
327+
// if i > data.len() {
328+
// bail!("partial nal body");
329+
// }
330+
// }
331+
332+
// if i < data.len() {
333+
// bail!("partial nal body");
334+
// }
335+
336+
// Ok(())
337+
// }
338+
fn main() {}

0 commit comments

Comments
 (0)