Skip to content

Commit

Permalink
[YouTube] Rework n-sig processing, realigning with yt-dlp
Browse files Browse the repository at this point in the history
* apply n-sig before chunked fragments, fixes ytdl-org#32692
  • Loading branch information
dirkf committed Jan 15, 2024
1 parent 5831eaa commit 3dad3c4
Showing 1 changed file with 151 additions and 129 deletions.
280 changes: 151 additions & 129 deletions youtube_dl/extractor/youtube.py
Original file line number Diff line number Diff line change
Expand Up @@ -1460,6 +1460,30 @@ def __init__(self, *args, **kwargs):
self._code_cache = {}
self._player_cache = {}

# *ytcfgs, webpage=None
def _extract_player_url(self, *ytcfgs, **kw_webpage):
    """Return the player JS URL found in the ytcfg dicts and/or a webpage

    Args:
        *ytcfgs -- ytcfg dicts to search; for backward compatibility a
                   leading non-dict positional argument is treated as
                   the webpage text
        webpage -- (keyword only) webpage text to search for a
                   "PLAYER_JS_URL"/"jsUrl" entry
    """
    # Always bind `webpage`: the keyword must also be honoured when the
    # first positional argument is a dict or when there are no positionals
    webpage = kw_webpage.get('webpage')
    if not webpage and ytcfgs and not isinstance(ytcfgs[0], dict):
        # legacy call style: _extract_player_url(webpage)
        webpage = ytcfgs[0]
    if webpage:
        player_url = self._search_regex(
            r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
            webpage, 'player URL', fatal=False)
        if player_url:
            # searched last, after any ytcfg dicts that were passed in
            ytcfgs = ytcfgs + ({'PLAYER_JS_URL': player_url},)
    # NOTE(review): presumably traverse_obj returns the first match for which
    # the expected_type callable (urljoin against the YouTube origin) yields
    # a value - confirm against traverse_obj's documentation
    return traverse_obj(
        ytcfgs, (Ellipsis, 'PLAYER_JS_URL'), (Ellipsis, 'WEB_PLAYER_CONTEXT_CONFIGS', Ellipsis, 'jsUrl'),
        get_all=False, expected_type=lambda u: urljoin('https://www.youtube.com', u))

def _download_player_url(self, video_id, fatal=False):
    """Build the player JS URL from the version named in the iframe API JS

    Downloads https://www.youtube.com/iframe_api, searches it for an
    8-hex-digit player version and formats that into a base.js URL.
    Returns None when no version was found.
    """
    iframe_api_js = self._download_webpage(
        'https://www.youtube.com/iframe_api',
        note='Downloading iframe API JS', video_id=video_id, fatal=fatal)
    # With nothing downloaded the search gets an explicit None default;
    # otherwise NO_DEFAULT is passed (presumably leaving the miss handling
    # to `fatal` - confirm against _search_regex)
    miss_default = NO_DEFAULT if iframe_api_js else None
    version = self._search_regex(
        r'player\\?/([0-9a-fA-F]{8})\\?/', iframe_api_js or '', 'player version',
        fatal=fatal, default=miss_default)
    if not version:
        return None
    return 'https://www.youtube.com/s/player/{0}/player_ias.vflset/en_US/base.js'.format(version)

def _signature_cache_id(self, example_sig):
    """ Return the lengths of the signature's dot-separated parts, joined by dots """
    part_lengths = [compat_str(len(segment)) for segment in example_sig.split('.')]
    return '.'.join(part_lengths)
Expand All @@ -1474,46 +1498,49 @@ def _extract_player_info(cls, player_url):
raise ExtractorError('Cannot identify player %r' % player_url)
return id_m.group('id')

def _get_player_code(self, video_id, player_url, player_id=None):
def _load_player(self, video_id, player_url, fatal=True, player_id=None):
if not player_id:
player_id = self._extract_player_info(player_url)

if player_id not in self._code_cache:
self._code_cache[player_id] = self._download_webpage(
player_url, video_id,
code = self._download_webpage(
player_url, video_id, fatal=fatal,
note='Downloading player ' + player_id,
errnote='Download of %s failed' % player_url)
return self._code_cache[player_id]
if code:
self._code_cache[player_id] = code
return self._code_cache[player_id] if fatal else self._code_cache.get(player_id)

def _extract_signature_function(self, video_id, player_url, example_sig):
player_id = self._extract_player_info(player_url)

# Read from filesystem cache
func_id = 'js_%s_%s' % (
func_id = 'js_{0}_{1}'.format(
player_id, self._signature_cache_id(example_sig))
assert os.path.basename(func_id) == func_id

cache_spec = self._downloader.cache.load('youtube-sigfuncs', func_id)
if cache_spec is not None:
return lambda s: ''.join(s[i] for i in cache_spec)
self.write_debug('Extracting signature function {0}'.format(func_id))
cache_spec, code = self.cache.load('youtube-sigfuncs', func_id), None

code = self._get_player_code(video_id, player_url, player_id)
res = self._parse_sig_js(code)

test_string = ''.join(map(compat_chr, range(len(example_sig))))
cache_res = res(test_string)
cache_spec = [ord(c) for c in cache_res]
if not cache_spec:
code = self._load_player(video_id, player_url, player_id)
if code:
res = self._parse_sig_js(code)
test_string = ''.join(map(compat_chr, range(len(example_sig))))
cache_spec = [ord(c) for c in res(test_string)]
self.cache.store('youtube-sigfuncs', func_id, cache_spec)

self._downloader.cache.store('youtube-sigfuncs', func_id, cache_spec)
return res
return lambda s: ''.join(s[i] for i in cache_spec)

def _print_sig_code(self, func, example_sig):
if not self.get_param('youtube_print_sig_code'):
return

def gen_sig_code(idxs):
def _genslice(start, end, step):
starts = '' if start == 0 else str(start)
ends = (':%d' % (end + step)) if end + step >= 0 else ':'
steps = '' if step == 1 else (':%d' % step)
return 's[%s%s%s]' % (starts, ends, steps)
return 's[{0}{1}{2}]'.format(starts, ends, steps)

step = None
# Squelch pyflakes warnings - start will be set when step is set
Expand Down Expand Up @@ -1564,160 +1591,155 @@ def _parse_sig_js(self, jscode):
jscode, 'Initial JS player signature function name', group='sig')

jsi = JSInterpreter(jscode)

initial_function = jsi.extract_function(funcname)

return lambda s: initial_function([s])

def _decrypt_signature(self, s, video_id, player_url):
"""Turn the encrypted s field into a working signature"""
def _cached(self, func, *cache_id):
def inner(*args, **kwargs):
if cache_id not in self._player_cache:
try:
self._player_cache[cache_id] = func(*args, **kwargs)
except ExtractorError as e:
self._player_cache[cache_id] = e
except Exception as e:
self._player_cache[cache_id] = ExtractorError(traceback.format_exc(), cause=e)

if player_url is None:
raise ExtractorError('Cannot decrypt signature without player_url')
ret = self._player_cache[cache_id]
if isinstance(ret, Exception):
raise ret
return ret
return inner

try:
player_id = (player_url, self._signature_cache_id(s))
if player_id not in self._player_cache:
func = self._extract_signature_function(
video_id, player_url, s
)
self._player_cache[player_id] = func
func = self._player_cache[player_id]
if self._downloader.params.get('youtube_print_sig_code'):
self._print_sig_code(func, s)
return func(s)
except Exception as e:
tb = traceback.format_exc()
raise ExtractorError(
'Signature extraction failed: ' + tb, cause=e)

def _extract_player_url(self, webpage):
player_url = self._search_regex(
r'"(?:PLAYER_JS_URL|jsUrl)"\s*:\s*"([^"]+)"',
webpage or '', 'player URL', fatal=False)
if not player_url:
return
if player_url.startswith('//'):
player_url = 'https:' + player_url
elif not re.match(r'https?://', player_url):
player_url = compat_urllib_parse.urljoin(
'https://www.youtube.com', player_url)
return player_url
def _decrypt_signature(self, s, video_id, player_url):
"""Turn the encrypted s field into a working signature"""
extract_sig = self._cached(
self._extract_signature_function, 'sig', player_url, self._signature_cache_id(s))
func = extract_sig(video_id, player_url, s)
self._print_sig_code(func, s)
return func(s)

# from yt-dlp
# See also:
# 1. https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-894619419
# 2. https://code.videolan.org/videolan/vlc/-/blob/4fb284e5af69aa9ac2100ccbdd3b88debec9987f/share/lua/playlist/youtube.lua#L116
# 3. https://github.com/ytdl-org/youtube-dl/issues/30097#issuecomment-950157377
def _decrypt_nsig(self, n, video_id, player_url):
"""Turn the encrypted n field into a working signature"""
if player_url is None:
raise ExtractorError('Cannot decrypt nsig without player_url')

try:
jsi, player_id, func_code = self._extract_n_function_code(video_id, player_url)
except ExtractorError as e:
raise ExtractorError('Unable to extract nsig jsi, player_id, func_codefunction code', cause=e)
if self.get_param('youtube_print_sig_code'):
self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(
player_id, func_code[1]))

try:
extract_nsig = self._cached(self._extract_n_function_from_code, 'nsig func', player_url)
ret = extract_nsig(jsi, func_code)(n)
except JSInterpreter.Exception as e:
self.report_warning(
'%s (%s %s)' % (
self.__ie_msg(
'Unable to decode n-parameter: download likely to be throttled'),
error_to_compat_str(e),
traceback.format_exc()))
return

self.write_debug('Decrypted nsig {0} => {1}'.format(n, ret))
return ret

def _extract_n_function_name(self, jscode):
target = r'(?P<nfunc>[a-zA-Z_$][\w$]*)(?:\[(?P<idx>\d+)\])?'
nfunc_and_idx = self._search_regex(
r'\.get\("n"\)\)&&\(b=(%s)\([\w$]+\)' % (target, ),
jscode, 'Initial JS player n function name')
nfunc, idx = re.match(target, nfunc_and_idx).group('nfunc', 'idx')
func_name, idx = self._search_regex(
r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z_$][\w$]*)(?:\[(?P<idx>\d+)\])?\([\w$]+\)',
jscode, 'Initial JS player n function name', group=('nfunc', 'idx'))
if not idx:
return nfunc
return func_name

VAR_RE_TMPL = r'var\s+%s\s*=\s*(?P<name>\[(?P<alias>%s)\])[;,]'
note = 'Initial JS player n function {0} (%s[%s])' % (nfunc, idx)
return self._parse_json(self._search_regex(
r'var {0}\s*=\s*(\[.+?\])\s*[,;]'.format(re.escape(func_name)), jscode,
'Initial JS player n function list ({0}.{1})'.format(func_name, idx)),
func_name, transform_source=js_to_json)[int(idx)]

def search_function_code(needle, group):
return self._search_regex(
VAR_RE_TMPL % (re.escape(nfunc), needle), jscode,
note.format(group), group=group)
def _extract_n_function_code(self, video_id, player_url):
player_id = self._extract_player_info(player_url)
func_code = self.cache.load('youtube-nsig', player_id)
jscode = func_code or self._load_player(video_id, player_url)
jsi = JSInterpreter(jscode)

if int_or_none(idx) == 0:
real_nfunc = search_function_code(r'[a-zA-Z_$][\w$]*', group='alias')
if real_nfunc:
return real_nfunc
return self._parse_json(
search_function_code('.+?', group='name'),
nfunc, transform_source=js_to_json)[int(idx)]
if func_code:
return jsi, player_id, func_code

def _extract_n_function(self, video_id, player_url):
player_id = self._extract_player_info(player_url)
func_code = self._downloader.cache.load('youtube-nsig', player_id)
func_name = self._extract_n_function_name(jscode)

# For redundancy
func_code = self._search_regex(
r'''(?xs)%s\s*=\s*function\s*\((?P<var>[\w$]+)\)\s*
# NB: The end of the regex is intentionally kept strict
{(?P<code>.+?}\s*return\ [\w$]+.join\(""\))};''' % func_name,
jscode, 'nsig function', group=('var', 'code'), default=None)
if func_code:
jsi = JSInterpreter(func_code)
func_code = ([func_code[0]], func_code[1])
else:
jscode = self._get_player_code(video_id, player_url, player_id)
funcname = self._extract_n_function_name(jscode)
jsi = JSInterpreter(jscode)
func_code = jsi.extract_function_code(funcname)
self._downloader.cache.store('youtube-nsig', player_id, func_code)

if self._downloader.params.get('youtube_print_sig_code'):
self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(player_id, func_code[1]))

return lambda s: jsi.extract_function_from_code(*func_code)([s])

def _n_descramble(self, n_param, player_url, video_id):
"""Compute the response to YT's "n" parameter challenge,
or None
Args:
n_param -- challenge string that is the value of the
URL's "n" query parameter
player_url -- URL of YT player JS
video_id
"""
self.write_debug('Extracting nsig function with jsinterp')
func_code = jsi.extract_function_code(func_name)

sig_id = ('nsig_value', n_param)
if sig_id in self._player_cache:
return self._player_cache[sig_id]
self.cache.store('youtube-nsig', player_id, func_code)
return jsi, player_id, func_code

def _extract_n_function_from_code(self, jsi, func_code):
func = jsi.extract_function_from_code(*func_code)

def extract_nsig(s):
try:
ret = func([s])
except JSInterpreter.Exception:
raise
except Exception as e:
raise JSInterpreter.Exception(traceback.format_exc(), cause=e)

try:
player_id = ('nsig', player_url)
if player_id not in self._player_cache:
self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
func = self._player_cache[player_id]
ret = func(n_param)
if ret.startswith('enhanced_except_'):
raise ExtractorError('Unhandled exception in decode')
self._player_cache[sig_id] = ret
if self._downloader.params.get('verbose', False):
self._downloader.to_screen('[debug] [%s] %s' % (self.IE_NAME, 'Decrypted nsig {0} => {1}'.format(n_param, self._player_cache[sig_id])))
return self._player_cache[sig_id]
except Exception as e:
self._downloader.report_warning(
'[%s] %s (%s %s)' % (
self.IE_NAME,
'Unable to decode n-parameter: download likely to be throttled',
error_to_compat_str(e),
traceback.format_exc()))
raise JSInterpreter.Exception('Signature function returned an exception')
return ret

return extract_nsig

def _unthrottle_format_urls(self, video_id, player_url, *formats):

def decrypt_nsig(n, p):
return self._decrypt_nsig(n, video_id, p)

def _unthrottle_format_urls(self, video_id, player_url, formats):
for fmt in formats:
parsed_fmt_url = compat_urllib_parse.urlparse(fmt['url'])
n_param = compat_parse_qs(parsed_fmt_url.query).get('n')
if not n_param:
continue
n_param = n_param[-1]
n_response = self._n_descramble(n_param, player_url, video_id)
n_response = self._cached(decrypt_nsig, n_param, player_url)
if n_response is None:
# give up if descrambling failed
break
for fmt_dct in traverse_obj(fmt, (None, (None, ('fragments', Ellipsis))), expected_type=dict):
fmt_dct['url'] = update_url(
fmt_dct['url'], query_update={'n': [n_response]})
fmt['url'] = update_url(
fmt['url'], query_update={'n': [n_response]})

# from yt-dlp, with tweaks
def _extract_signature_timestamp(self, video_id, player_url, ytcfg=None, fatal=False):
"""
Extract signatureTimestamp (sts)
Required to tell API what sig/player version is in use.
"""
sts = int_or_none(ytcfg.get('STS')) if isinstance(ytcfg, dict) else None
sts = traverse_obj(ytcfg, 'STS', expected_type=int)
if not sts:
# Attempt to extract from player
if player_url is None:
error_msg = 'Cannot extract signature timestamp without player_url.'
if fatal:
raise ExtractorError(error_msg)
self._downloader.report_warning(error_msg)
self.report_warning(error_msg)
return
code = self._get_player_code(video_id, player_url)
code = self._load_player(video_id, player_url, fatal=fatal)
sts = int_or_none(self._search_regex(
r'(?:signatureTimestamp|sts)\s*:\s*(?P<sts>[0-9]{5})', code or '',
'JS player signature timestamp', group='sts', fatal=fatal))
Expand Down Expand Up @@ -1986,6 +2008,9 @@ def feed_entry(name):
else:
self.to_screen('Downloading just video %s because of --no-playlist' % video_id)

if not player_url:
player_url = self._extract_player_url(webpage)

formats = []
itags = []
itag_qualities = {}
Expand Down Expand Up @@ -2055,6 +2080,7 @@ def build_fragments(f):
if mobj:
dct['ext'] = mimetype2ext(mobj.group(1))
dct.update(parse_codecs(mobj.group(2)))
self._unthrottle_format_urls(video_id, player_url, dct)
single_stream = 'none' in (dct.get(c) for c in ('acodec', 'vcodec'))
if single_stream and dct.get('ext'):
dct['container'] = dct['ext'] + '_dash'
Expand Down Expand Up @@ -2174,10 +2200,6 @@ def build_fragments(f):
uploader = self._extract_author_var(
webpage, 'name', videodetails=video_details, metadata=microformat)

if not player_url:
player_url = self._extract_player_url(webpage)
self._unthrottle_format_urls(video_id, player_url, formats)

info = {
'id': video_id,
'title': self._live_title(video_title) if is_live else video_title,
Expand Down

0 comments on commit 3dad3c4

Please sign in to comment.