Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ptp sync callback #35

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions ap2-receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from ap2.dxxp import parse_dxxp
from enum import IntFlag

from ap2.connections.ptp_time import PTP

"""
# No Auth - coreutils, PairSetupMfi
Expand Down Expand Up @@ -471,7 +472,7 @@ def do_SETUP(self):
else:
print("Sending CONTROL/DATA:")
buff = 8388608 # determines how many CODEC frame size 1024 we can hold
stream = Stream(plist["streams"][0], buff)
stream = Stream(plist["streams"][0], buff, self.server.ptp_link)
set_volume_pid(stream.data_proc.pid)
self.server.streams.append(stream)
device_setup_data["streams"][0]["controlPort"] = stream.control_port
Expand All @@ -492,7 +493,19 @@ def do_SETUP(self):
self.send_header("CSeq", self.headers["CSeq"])
self.end_headers()
self.wfile.write(res)

if "timingProtocol" in plist:
if plist["timingProtocol"] == "PTP":
if self.server.ptp_proc is None:
print("PTP Startup")
mac = int(
(ifen[ni.AF_LINK][0]["addr"]).replace(":", ""), 16
)
self.server.ptp_proc, self.server.ptp_link = PTP.spawn(mac)
else:
print("PTP reusing")
return

self.send_error(404)

def do_GET_PARAMETER(self):
Expand Down Expand Up @@ -587,10 +600,19 @@ def do_SETRATEANCHORTIME(self):
try:
if content_len > 0:
body = self.rfile.read(content_len)

plist = readPlistFromString(body)
if plist["rate"] == 1:
self.server.streams[0].audio_connection.send("play-%i" % plist["rtpTime"])
networkTime = plist["networkTimeSecs"] * (10 ** 9)
sample_bytes = plist["networkTimeFrac"].to_bytes(
8, byteorder="big", signed=True
)
uint64_sample = int.from_bytes(sample_bytes, byteorder="big")
nthFactor = 0.5 ** 64
nanos = int(uint64_sample * nthFactor * (10 ** 9))
networkTime += nanos
self.server.streams[0].audio_connection.send(
f'play-{plist["rtpTime"]}-{networkTime}'
)
if plist["rate"] == 0:
self.server.streams[0].audio_connection.send("pause")
self.pp.pprint(plist)
Expand Down Expand Up @@ -629,8 +651,9 @@ def do_TEARDOWN(self):
# Erase the hap() instance, otherwise reconnects fail
self.server.hap = None

# terminate the forked event_proc, otherwise a zombie process consumes 100% cpu
self.event_proc.terminate()
# # terminate the forked event_proc, otherwise a zombie process consumes 100% cpu
# this is causing skip track to not work
# self.event_proc.terminate()

def do_SETPEERS(self):
print("SETPEERS %s" % self.path)
Expand Down Expand Up @@ -978,6 +1001,8 @@ def __init__(self, addr_port, handler):
self.hap = None
self.enc_layer = False
self.streams = []
self.event_proc = None
self.ptp_proc = None

# Override
def get_request(self):
Expand Down
113 changes: 83 additions & 30 deletions ap2/connections/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from ..utils import get_logger, get_free_tcp_socket, get_free_udp_socket

from ap2.connections.ptp_time import PTP

class RTP:
def __init__(self, data):
Expand Down Expand Up @@ -119,6 +120,11 @@ def next(self):

return buffered_object

def previous(self):
self.read_index = self.decrement_index(self.read_index)
buffered_object = self.buffer_array[self.read_index]
return buffered_object

def get_fullness(self):
# get distance between read and write in relation to buff size
return ((self.BUFFER_SIZE + self.write_index - self.read_index)
Expand Down Expand Up @@ -268,7 +274,9 @@ def init_audio_sink(self):
self.sink = self.pa.open(format=self.pa.get_format_from_width(2),
channels=self.channel_count,
rate=self.sample_rate,
output=True)
output=True,
stream_callback=self.callback,
start=False,)
# nice Python3 crash if we don't check self.sink is null. Not harmful, but should check.
if not self.sink:
exit()
Expand Down Expand Up @@ -351,21 +359,21 @@ def process(self, rtp):
frame = self.resampler.resample(frame)
return frame.planes[0].to_bytes()

def run(self, parent_reader_connection):
def run(self, parent_reader_connection, ptp_link):
# This pipe is between player (read data) and server (write data)
parent_writer_connection, writer_connection = multiprocessing.Pipe()
server_thread = threading.Thread(target=self.serve, args=(writer_connection,))
player_thread = threading.Thread(target=self.play, args=(parent_reader_connection, parent_writer_connection))
player_thread = threading.Thread(target=self.play, args=(parent_reader_connection, parent_writer_connection, ptp_link))

server_thread.start()
player_thread.start()

@classmethod
def spawn(cls, session_key, audio_format, buff):
def spawn(cls, session_key, audio_format, buff, ptp_link=None):
audio = cls(session_key, audio_format, buff)
# This pipe is reachable from receiver
parent_reader_connection, audio.audio_connection = multiprocessing.Pipe()
mainprocess = multiprocessing.Process(target=audio.run, args=(parent_reader_connection,))
mainprocess = multiprocessing.Process(target=audio.run, args=(parent_reader_connection, ptp_link))
mainprocess.start()

return audio.port, mainprocess, audio.audio_connection
Expand Down Expand Up @@ -429,6 +437,49 @@ def get_min_timestamp(self):

return res


def callback(self, in_data, frame_count, time_info, status):
self.ptp_link.send("get_ptp_master_nanos_timestamped")
if self.ptp_link.poll(1):
network_time_ns, network_time_monotonic_ts = self.ptp_link.recv()
time_monotonic_ns = time.monotonic_ns()
network_time_ns += time_monotonic_ns - network_time_monotonic_ts
else:
return

rtp = self.rtp_buffer.next()
if not rtp:
print(f"callback {frame_count} no more data")
return (None, pyaudio.paAbort)

dac_offset = time_info["output_buffer_dac_time"] - time_info["current_time"]

rtp_timestamp = (
(network_time_ns - self.anchorNetworkTime) / (10 ** 9) + dac_offset
) * self.sample_rate + self.anchorRtpTime

# print(
# f"callback {frame_count} {rtp.timestamp} {time_info['output_buffer_dac_time']} {time_info['current_time']} ts: {rtp_timestamp} dac offset {dac_offset}"
# )
skip = 0
while rtp_timestamp - rtp.timestamp > 1024:
rtp = self.rtp_buffer.next()
if rtp is None:
return
skip += 1
if skip != 0:
print(f"skipped {skip}")

back = 0
while skip == 0 and rtp.timestamp - rtp_timestamp > 1024:
rtp = self.rtp_buffer.previous()
back += 1
if back > 0:
print(f"went back {back}")

audio = self.process(rtp)
return (audio, pyaudio.paContinue)

def forward(self, requested_timestamp):
finished = False
while not finished:
Expand All @@ -444,10 +495,11 @@ def forward(self, requested_timestamp):
finished = True

# player moves readindex in buffer
def play(self, rtspconn, serverconn):
def play(self, rtspconn, serverconn, ptp_link):
playing = False
data_ready = False
data_ontime = True
self.ptp_link = ptp_link
i = 0
while True:
if not playing:
Expand All @@ -459,13 +511,17 @@ def play(self, rtspconn, serverconn):
else:
server_timeout = 0

if self.rtp_buffer.can_read():
if not data_ready and self.rtp_buffer.get_fullness() > 0.2:
print(
f"setting data ready at buffer fullness {self.rtp_buffer.get_fullness()}"
)
data_ready = True

if serverconn.poll(server_timeout):
message = serverconn.recv()
if message == "data_ready":
data_ready = True
print(f"setting data ready at from server")
elif message == "data_ontime_response":
print("player: ontime data response received")
ts = self.get_min_timestamp()
Expand All @@ -478,43 +534,40 @@ def play(self, rtspconn, serverconn):
message = rtspconn.recv()
if str.startswith(message, "play"):
self.anchorMonotonicTime = time.monotonic_ns()
self.anchorRtpTime = int(str.split(message, "-")[1])
msg_data = str.split(message, "-")
self.anchorRtpTime = int(msg_data[1])
self.anchorNetworkTime = int(msg_data[2])

playing = True

elif message == "pause":
playing = False
data_ready = False
print("pause event")
self.sink.stop_stream()

elif str.startswith(message, "flush_from_until_seq"):
pending_flush_from_seq, pending_flush_until_seq = str.split(message, "-")[-2:]
pending_flush_from_seq, pending_flush_until_seq = str.split(
message, "-"
)[-2:]
pending_flush_from_seq = int(pending_flush_from_seq)
pending_flush_until_seq = int(pending_flush_until_seq)

print("player: request flush received from-until %i-%i" % (pending_flush_from_seq, pending_flush_until_seq))
print("player: relay message to server to flush from-until sequence %i-%i" % (pending_flush_from_seq, pending_flush_until_seq))
print(
"player: request flush received from-until %i-%i"
% (pending_flush_from_seq, pending_flush_until_seq)
)
print(
"player: relay message to server to flush from-until sequence %i-%i"
% (pending_flush_from_seq, pending_flush_until_seq)
)
serverconn.send(message)

if playing and data_ready:
rtp = self.rtp_buffer.next()
if rtp:
time_offset_ms = self.get_time_offset(rtp.timestamp)
if i % 1000 == 0:
# pass
print("player: offset is %i ms" % time_offset_ms)
if time_offset_ms >= (self.sample_delay * 1000):
# print("player: offset %i ms too big - seq = %i - sleeping %s sec" % (time_offset_ms, rtp.sequence_no, "{:05.2f}".format(time_offset_ms /1000)))
# time.sleep(time_offset_ms / 1000)
time.sleep((self.sample_delay / 2) - 0.001)
elif time_offset_ms < -100:
print("player: offset %i ms too low - seq = %i - sending ontime data request" % (time_offset_ms, rtp.sequence_no))
# request on_time data message
serverconn.send("on_time_data_request")
data_ontime = False

audio = self.process(rtp)
self.sink.write(audio)
i += 1
if not self.sink.is_active():
print("starting stream")
self.sink.start_stream()
continue # use callback

# server moves write index in buffer
# the exception to this rule is the buffer initialization (init call)
Expand Down
Loading