-
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(output): implement SMC FTP support (#552)
- Loading branch information
Showing
5 changed files
with
207 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,102 @@ | ||
import logging | ||
from datetime import timedelta | ||
from ftplib import FTP_TLS | ||
from io import BytesIO | ||
|
||
import configargparse | ||
|
||
from ..track import Track | ||
from .base import TrackObserver | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
class SmcFtpTrackObserver(TrackObserver): | ||
"""Update track metadata for DLS and DL+ to the SMC FTP server.""" | ||
|
||
name = "SMC FTP" | ||
|
||
class Options(TrackObserver.Options): # pragma: no coverage | ||
@classmethod | ||
def args(cls, args: configargparse.ArgParser): | ||
args.add_argument( | ||
"--dab-smc", | ||
help="Enable SMC FTP delivery", | ||
type=bool, | ||
default=False, | ||
) | ||
args.add_argument( | ||
"--dab-smc-ftp-hostname", | ||
help="Hostname of SMC FTP server", | ||
default=[], | ||
) | ||
args.add_argument( | ||
"--dab-smc-ftp-username", | ||
help="Username for SMC FTP server", | ||
) | ||
args.add_argument( | ||
"--dab-smc-ftp-password", help="Password for SMC FTP server" | ||
) | ||
|
||
def __init__(self, hostname: str, username: str, password: str) -> None: | ||
self.hostname: str = hostname | ||
self.username: str = username | ||
self.password: str = password | ||
|
||
def __init__(self, options: Options): | ||
self._options = options | ||
|
||
def track_started(self, track: Track): | ||
logger.info(f"Updating DAB+ DLS for track: {track.artist} - {track.title}") | ||
|
||
if track.get_duration() < timedelta(seconds=5): | ||
logger.info("Track is less than 5 seconds, not sending to SMC") | ||
return | ||
|
||
dls, dlplus = _dls_from_track(track) | ||
|
||
# check for too long meta and shorten to just artist | ||
if dls.getbuffer().nbytes > 128: # pragma: no cover | ||
logger.warning(f"SMC DLS to long {dls=}") | ||
dls, dlplus = _dls_from_track(track, title=False) | ||
|
||
ftp = FTP_TLS() | ||
ftp.connect(self._options.hostname) | ||
ftp.sendcmd(f"USER {self._options.username}") | ||
ftp.sendcmd(f"PASS {self._options.password}") | ||
|
||
ftp.storlines("STOR /dls/nowplaying.dls", dls) | ||
ftp.storlines("STOR /dlplus/nowplaying.dls", dlplus) | ||
|
||
ftp.close() | ||
|
||
logger.info( | ||
f"SMC FTP Server: {self._options.hostname} DLS: {dls} DL+: {dlplus}" | ||
) | ||
|
||
def track_finished(self, track): | ||
return True | ||
|
||
|
||
def _dls_from_track(track: Track, title=True) -> (BytesIO, BytesIO): | ||
# track.artist contains station name if no artist is set | ||
dls = f"{track.artist} - {track.show.name}" if title else track.artist | ||
dlplus = "" | ||
|
||
if not track.has_default_title() and not track.has_default_artist(): | ||
dls = f"{track.artist} - {track.title}" if title else track.artist | ||
dlplus = ( | ||
f"artist={track.artist}\ntitle={track.title}\n" | ||
if title | ||
else f"artist={track.artist}\n" | ||
) | ||
|
||
return (_bytes_from_string(dls), _bytes_from_string(dlplus)) | ||
|
||
|
||
def _bytes_from_string(string: str) -> BytesIO: | ||
b = BytesIO() | ||
# encode as latin1 since that is what DAB supports | ||
b.write(string.encode("latin1")) | ||
b.seek(0) | ||
return b |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
"""Tests for :class:`SmcFtpTrackObserver`.""" | ||
from unittest.mock import ANY, Mock, call, patch | ||
|
||
from nowplaying.track.observers.smc_ftp import SmcFtpTrackObserver | ||
from nowplaying.track.track import Track | ||
|
||
|
||
def test_init(): | ||
"""Test class:`SmcFrpTrackObserver`'s :meth:`.__init__` method.""" | ||
SmcFtpTrackObserver( | ||
options=SmcFtpTrackObserver.Options( | ||
hostname="hostname", | ||
username="username", | ||
password="password", | ||
) | ||
) | ||
|
||
|
||
@patch("nowplaying.track.observers.smc_ftp.FTP_TLS") | ||
def test_track_started(mock_ftp, track_factory, show_factory): | ||
"""Test :class:`SmcFtpTrackObserver`'s :meth:`track_started` method.""" | ||
mock_ftp_instance = Mock() | ||
mock_ftp.return_value = mock_ftp_instance | ||
|
||
track = track_factory() | ||
track.show = show_factory() | ||
|
||
smc_ftp_track_observer = SmcFtpTrackObserver( | ||
options=SmcFtpTrackObserver.Options( | ||
hostname="hostname", | ||
username="username", | ||
password="password", | ||
) | ||
) | ||
smc_ftp_track_observer._ftp_cls = mock_ftp | ||
smc_ftp_track_observer.track_started(track) | ||
mock_ftp.assert_called_once() | ||
mock_ftp_instance.assert_has_calls( | ||
calls=[ | ||
call.connect("hostname"), | ||
call.sendcmd("USER username"), | ||
call.sendcmd("PASS password"), | ||
call.storlines( | ||
"STOR /dls/nowplaying.dls", | ||
ANY, | ||
), | ||
call.storlines( | ||
"STOR /dlplus/nowplaying.dls", | ||
ANY, | ||
), | ||
call.close(), | ||
] | ||
) | ||
|
||
# test skipping short tracks | ||
track = track_factory(artist="Radio Bern", title="Livestream", duration=3) | ||
mock_ftp.reset_mock() | ||
mock_ftp_instance.reset_mock() | ||
smc_ftp_track_observer.track_started(track) | ||
mock_ftp_instance.storlines.assert_not_called() | ||
|
||
# test default track | ||
track = track_factory(artist="Radio Bern", title="Livestream", duration=60) | ||
track.show = show_factory() | ||
mock_ftp.reset_mock() | ||
mock_ftp_instance.reset_mock() | ||
smc_ftp_track_observer.track_started(track) | ||
mock_ftp_instance.assert_has_calls( | ||
calls=[ | ||
call.connect("hostname"), | ||
call.sendcmd("USER username"), | ||
call.sendcmd("PASS password"), | ||
call.storlines( | ||
"STOR /dls/nowplaying.dls", | ||
ANY, | ||
), | ||
call.storlines("STOR /dlplus/nowplaying.dls", ANY), | ||
call.close(), | ||
] | ||
) | ||
|
||
|
||
def test_track_finished(): | ||
"""Test class:`SmcFtpTrackObserver`'s :meth:`.track_finished` method.""" | ||
smc_ftp_track_observer = SmcFtpTrackObserver( | ||
options=SmcFtpTrackObserver.Options( | ||
hostname="hostname", | ||
username="username", | ||
password="password", | ||
) | ||
) | ||
assert smc_ftp_track_observer.track_finished(Track()) |