-
Notifications
You must be signed in to change notification settings - Fork 2
/
example.exs
83 lines (72 loc) · 2.3 KB
/
example.exs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
# Install the Membrane plugins used by the pipeline defined in this script
# (file source, H264 and AAC parsers, realtimer and the Agora sink).
Mix.install([
{:membrane_agora_plugin, "~> 0.1.0"},
{:membrane_file_plugin, "~> 0.15.0"},
{:membrane_h264_plugin, "~> 0.7.2"},
{:membrane_realtimer_plugin, "~> 0.7.0"},
{:membrane_aac_plugin, "~> 0.16.0"}
])
defmodule Pipeline do
  @moduledoc """
  Example pipeline that reads an H264 video file and an AAC audio file and
  feeds both into a single `Membrane.Agora.Sink`.

  The Agora connection settings come from the `AGORA_CHANNEL_NAME`,
  `AGORA_TOKEN`, `AGORA_APP_ID` and `AGORA_USER_ID` environment variables,
  which are read when this module is compiled.

  The pipeline terminates itself once the sink has reported end of stream on
  both its audio and its video pad.
  """
  use Membrane.Pipeline

  # Input fixtures and the framerate used to generate video timestamps.
  @video_path "test/fixtures/in_video.h264"
  @audio_path "test/fixtures/in_audio.aac"
  @framerate 30

  # Agora connection settings; every variable falls back to a default value.
  @channel_name System.get_env("AGORA_CHANNEL_NAME", "")
  @token System.get_env("AGORA_TOKEN", "")
  @app_id System.get_env("AGORA_APP_ID", "")
  @user_id System.get_env("AGORA_USER_ID", "0")

  # Builds the children spec (video branch + audio branch, both ending in the
  # same sink) and starts with an empty list of finished tracks.
  @impl true
  def handle_init(_ctx, _options) do
    sink_config = %Membrane.Agora.Sink{
      channel_name: @channel_name,
      token: @token,
      app_id: @app_id,
      user_id: @user_id
    }

    video_parser = %Membrane.H264.Parser{
      generate_best_effort_timestamps: %{framerate: {@framerate, 1}},
      repeat_parameter_sets: true,
      output_stream_structure: :annexb
    }

    audio_parser = %Membrane.AAC.Parser{
      out_encapsulation: :none,
      samples_per_frame: 1024
    }

    # The video branch is the one that creates the :agora_sink child.
    video_branch =
      child(:video_source, %Membrane.File.Source{location: @video_path})
      |> child(:video_parser, video_parser)
      |> child(:video_realtimer, Membrane.Realtimer)
      |> via_in(Pad.ref(:video, 0))
      |> child(:agora_sink, sink_config)

    # The audio branch links to the sink already declared by the video branch.
    audio_branch =
      child(:audio_source, %Membrane.File.Source{location: @audio_path})
      |> child(:audio_parser, audio_parser)
      |> child(:audio_realtimer, Membrane.Realtimer)
      |> via_in(Pad.ref(:audio, 0))
      |> get_child(:agora_sink)

    {[spec: [video_branch, audio_branch]], %{terminated_tracks: []}}
  end

  # End of stream on one of the sink's pads: remember the pad and request
  # pipeline termination once both the audio and the video pad have finished.
  @impl true
  def handle_element_end_of_stream(:agora_sink, pad, _context, state) do
    finished_pads = [pad | state.terminated_tracks]
    updated_state = %{state | terminated_tracks: finished_pads}

    actions = if all_tracks_terminated?(finished_pads), do: [terminate: :normal], else: []

    {actions, updated_state}
  end

  # End of stream on any other child requires no action.
  def handle_element_end_of_stream(_child, _pad, _context, state), do: {[], state}

  # True when both the audio and the video sink pads are in the given list.
  defp all_tracks_terminated?(terminated_tracks) do
    Enum.all?([Pad.ref(:audio, 0), Pad.ref(:video, 0)], &(&1 in terminated_tracks))
  end

  @doc """
  Blocks the calling process until the `:DOWN` message for the monitor `ref`
  arrives, then returns `nil`.
  """
  @spec wait_for_termination(reference()) :: nil
  def wait_for_termination(ref) do
    receive do
      {:DOWN, ^ref, :process, _monitored_pid, _exit_reason} -> nil
    end
  end
end
# Start the pipeline, monitor its process and keep the script alive until
# that process goes down.
{:ok, _supervisor, pipeline_pid} = Pipeline.start()

pipeline_pid
|> Process.monitor()
|> Pipeline.wait_for_termination()