Skip to content

Commit

Permalink
Implement n-param descrambling using JSInterp
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkf committed Dec 17, 2021
1 parent c3b0045 commit 905d1d2
Showing 1 changed file with 46 additions and 280 deletions.
326 changes: 46 additions & 280 deletions youtube_dl/extractor/youtube.py
Original file line number Diff line number Diff line change
Expand Up @@ -1385,304 +1385,70 @@ def _extract_player_url(self, webpage):
'https://www.youtube.com', player_url)
return player_url

# Based on an equivalent function [2] in the youtube.lua script from VLC
# Many thanks to @linkfanel [3]
# NB This code could fail if YT should revise the player code and would then have
# to be reworked (thankless task previously undertaken at [1] and [2])
# from yt-dlp
# See also:
# 1. https://github.com/ytdl-org/youtube-dl/issues/29326#issuecomment-894619419
# 2. https://code.videolan.org/videolan/vlc/-/blob/4fb284e5af69aa9ac2100ccbdd3b88debec9987f/share/lua/playlist/youtube.lua#L116
# 3. https://github.com/ytdl-org/youtube-dl/issues/30097#issuecomment-950157377
def _n_descramble(self, n_param, js):
def _extract_n_function_name(self, jscode):
return self._search_regex(
(r'\.get\("n"\)\)&&\(b=(?P<nfunc>[a-zA-Z0-9$]{3})\([a-zA-Z0-9]\)',),
jscode, 'Initial JS player n function name', group='nfunc')

def _extract_n_function(self, video_id, player_url):
player_id = self._extract_player_info(player_url)
func_code = self._downloader.cache.load('youtube-nsig', player_id)

if func_code:
jsi = JSInterpreter(func_code)
else:
player_id = self._extract_player_info(player_url)
jscode = self._get_player_code(video_id, player_url, player_id)
funcname = self._extract_n_function_name(jscode)
jsi = JSInterpreter(jscode)
func_code = jsi.extract_function_code(funcname)
self._downloader.cache.store('youtube-nsig', player_id, func_code)

if self._downloader.params.get('youtube_print_sig_code'):
self.to_screen('Extracted nsig function from {0}:\n{1}\n'.format(player_id, func_code[1]))

return lambda s: jsi.extract_function_from_code(*func_code)([s])

def _n_descramble(self, n_param, player_url, video_id):
"""Compute the response to YT's "n" parameter challenge
Args:
n_param -- challenge string that is the value of the
URL's "n" query parameter
js -- text of the JS player code that includes the
challenge response algorithm
n_param -- challenge string that is the value of the
URL's "n" query parameter
player_url -- URL of YT player JS
video_id
"""
if not js:
return

# helper functions (part 1)
def isiterable(x):
try:
return x.__getitem__ and True
except AttributeError:
return False

def find_first(pattern, string, flags=0, groups=1):
pattern = re.compile(pattern, flags)
return next(map(lambda m: m.groups() if groups is True else m.group(groups),
pattern.finditer(string)),
(None, ) * pattern.groups if groups is True else None)

# Look for the descrambler function's name
# a.D&&(b=a.get("n"))&&(b=lha(b),a.set("n",b))}};
descrambler = find_first(r'[=(,&|](\w+)\(\w+\),\w+\.set\("n",', js)
if not descrambler:
self.report_warning("Couldn't extract YouTube video throttling parameter descrambling function name")
return
# Fetch the code of the descrambler function
# lha=function(a){var b=a.split(""),c=[310282131,"KLf3",b,null,function(d,e){d.push(e)},-45817231, [data and transformations...] ,1248130556];c[3]=c;c[15]=c;c[18]=c;try{c[40](c[14],c[2]),c[25](c[48]),c[21](c[32],c[23]), [scripted calls...] ,c[25](c[33],c[3])}catch(d){return"enhanced_except_4ZMBnuz-_w8_"+a}return b.join("")};
code = find_first(r'(?s)%s=function\([^)]+\)\{(.+?)\};' % (descrambler, ), js)
if not code:
self.report_warning("Couldn't extract YouTube video throttling parameter descrambling code")
return
# Split code into two main sections: 1/ data and transformations,
# and 2/ a script of calls
datac, xc, script = find_first(r'(?s)c=\[(.+)\];(.+?);try\{(.+)\}catch\(', code, groups=True)
if not datac or not script:
self.report_warning("Couldn't extract YouTube video throttling parameter descrambling rules")
return
if xc:
# ad hoc update of c[] array after declaration (player 2dfe380c, ...)
xc = [int(x) for x in re.findall(r'c\[(\d+)\]=c\b', xc)]
if not xc:
self.report_warning("Couldn't extract YouTube video throttling parameter additional rules")
else:
xc = []

# Split "n" parameter into a table as descrambling operates on it
# as one of several arrays - in Python just copy it as a list
n = list(n_param)
# Helper: table_len = function() ... end - in Python just use len

# Common routine shared by the compound transformations,
# compounding the "n" parameter with an input string,
# character by character using a Base64 alphabet.
# d.forEach(function(l,m,n){this.push(n[m]=h[(h.indexOf(l)-h.indexOf(this[m])+m-32+f--)%h.length])},e.split(""))
def compound(ntab, strg, alphabet, charcode):
if ntab != n or type(strg) != compat_str:
return True
inp = list(strg)
llen = len(alphabet)
ntab_copy = ntab[:]
for i, c in enumerate(ntab_copy):
if type(c) != compat_str:
return True
pos1 = alphabet.find(c)
pos2 = alphabet.find(inp[i])
if pos1 < 0 or pos2 < 0:
return True
pos = (pos1 - pos2 + charcode - 32) % llen
newc = alphabet[pos]
ntab[i] = newc
inp.append(newc)

# The data section contains among others function code for a number
# of transformations, most of which are basic array operations.
# We can match these functions' code to identify them, and emulate
# the corresponding transformations.

# helper fns (in-place)
def swap(s, i, j):
x = s[i]
s[i] = s[j]
s[j] = x

def rotate(s, i):
tmp = s[:]
tmp[i:] = s
tmp[:i] = tmp[len(s):]
s[:] = tmp[:len(s)]

def remove(s, i):
del s[i]

b64_alphabets = [
"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ-_",
"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789-_",
]
sig_id = ('nsig_value', n_param)
if sig_id in self._player_cache:
return self._player_cache[sig_id]

# Compounding functions use a subfunction, so we need to be
# more specific in how much parsed data we consume.
cp_skip = r'(?s)^.*?\},e\.split\(""\)\)},\s*(.*)$'
def_skip = r"(?s)^.*?\},\s*(.*)$"

trans = (
# fn_name, fn, fn_detect_pattern, skip_re
('reverse', lambda tab, _: tab.reverse(),
# noqa: E127
# function(d){d.reverse()}
# function(d){for(var e=d.length;e;)d.push(d.splice(--e,1)[0])}
r"^function\(d\)",
def_skip),
('append', lambda tab, val: tab.append(val),
# noqa: E127
# function(d,e){d.push(e)}
r"^function\(d,e\){d\.push\(e\)\},",
def_skip),
('remove', lambda tab, i: remove(tab, i % len(tab)) if type(i) == int else True,
# noqa: E127
# function(d,e){e=(e%d.length+d.length)%d.length;d.splice(e,1)}
r"^[^}]+?;d\.splice\(e,1\)\},",
def_skip),
('swap', lambda tab, i: swap(tab, 0, i % len(tab)) if type(i) == int else True,
# noqa: E127
# function(d,e){e=(e%d.length+d.length)%d.length;var f=d[0];d[0]=d[e];d[e]=f}
# function(d,e){e=(e%d.length+d.length)%d.length;d.splice(0,1,d.splice(e,1,d[0])[0])}
r"^[^}]+?;(?:var\sf=d\[0\];d\[0\]=d\[e\];d\[e\]=f|d\.splice\(0,1,d\.splice\(e,1,d\[0\]\)\[0\]\))\},",
def_skip),
('rotate', lambda tab, shift: rotate(tab, shift % len(tab)) if type(shift) == int else True,
# noqa: E127
# function(d,e){for(e=(e%d.length+d.length)%d.length;e--;)d.unshift(d.pop())}
# function(d,e){e=(e%d.length+d.length)%d.length;d.splice(-e).reverse().forEach(function(f){d.unshift(f)})}
r"^[^}]+?d\.unshift\((?:d\.pop\(\)|f\)\})\)},",
def_skip),
('alphabet1', lambda: b64_alphabets[0],
# noqa: E127
# function(){for(var d=64,e=[];++d-e.length-32;){switch(d){case 91:d=44;continue;case 123:d=65;break;case 65:d-=18;continue;case 58:d=96;continue;case 46:d=95}e.push(String.fromCharCode(d))}return e}
r"^[^}]+?case\s58:d=96;.+?\}return",
def_skip),
('alphabet2', lambda: b64_alphabets[1],
# noqa: E127
# function(){for(var d=64,e=[];++d-e.length-32;)switch(d){case 46:d=95;default:e.push(String.fromCharCode(d));case 94:case 95:case 96:break;case 123:d-=76;case 92:case 93:continue;case 58:d=44;case 91:}return e}
# function(){for(var d=64,e=[];++d-e.length-32;){switch(d){case 58:d-=14;case 91:case 92:case 93:continue;case 123:d=47;case 94:case 95:case 96:continue;case 46:d=95}e.push(String.fromCharCode(d))}return e}
r"^[^}]+?case\s58:d(?:=44;|-=14;[^}]+\})[^}]+\}return",
def_skip),
('compound', lambda tab, s, alphabet: compound(tab, s, alphabet, 96),
# noqa: E127
# function(d,e,f){var h=f.length;d.forEach(function(l,m,n){this.push(n[m]=f[(f.indexOf(l)-f.indexOf(this[m])+m+h--)%f.length])},e.split(""))}
r"^function\(\w,\w,\w\)",
cp_skip),

# Compound transformations first build a variation of a
# Base64 alphabet, then in a common section, compound the
# "n" parameter with an input string, character by character.
('compound1', lambda tab, s: compound(tab, s, b64_alphabets[0], 96),
# noqa: E127
# function(d,e){for(var f=64,h=[];++f-h.length-32;)switch(f){case 58:f=96;continue;case 91:f=44;break;case 65:f=47;continue;case 46:f=153;case 123:f-=58;default:h.push(String.fromCharCode(f))} [ compound... ] }
r"^[^}]+?case\s58:f=96;",
cp_skip),
('compound2', lambda tab, s: compound(tab, s, b64_alphabets[1], 96),
# noqa: E127
# function(d,e){for(var f=64,h=[];++f-h.length-32;){switch(f){case 58:f-=14;case 91:case 92:case 93:continue;case 123:f=47;case 94:case 95:case 96:continue;case 46:f=95}h.push(String.fromCharCode(f))} [ compound... ] }
# function(d,e){for(var f=64,h=[];++f-h.length-32;)switch(f){case 46:f=95;default:h.push(String.fromCharCode(f));case 94:case 95:case 96:break;case 123:f-=76;case 92:case 93:continue;case 58:f=44;case 91:} [ compound... ] }
r"^[^}]+?case\s58:f(?:-=14|=44);",
cp_skip),
# Fallback
('unid', lambda _, __: self.report_warning("Couldn't apply unidentified YouTube video throttling parameter transformation, aborting descrambling") or True,
# noqa: E127
None,
def_skip),
)
# The data section actually mixes input data, reference to the
# "n" parameter array, and self-reference to its own array, with
# transformation functions used to modify itself. We parse it
# as such into a table.
data = []
datac += ","
while datac:
# Transformation functions
if re.match(r"^function\(", datac):
name, el, _, skip = next(
itertools.dropwhile(
lambda x: x[2] is not None and not re.match(x[2], datac), trans))
datac = find_first(skip, datac)
# String input data
elif re.match(r'^"[^"]*",', datac):
el, datac = find_first(r'(?s)^"([^"]*)",\s*(.*)$', datac, groups=True)
# Integer input data
elif re.match(r'^-?\d+(?:E\d+)?,', datac):
el, datac = find_first(r"(?s)^(.*?),\s*(.*)$", datac, groups=True)
el = int(float(el) if 'E' in el else el)
# Reference to "n" parameter array
elif re.match('^b,', datac):
el = n
datac = find_first(r"(?s)^b,\s*(.*)$", datac)
# Replaced by self-reference to data array after its declaration
elif re.match('^null,', datac):
el = data
datac = find_first(r"(?s)^null,\s*(.*)$", datac)
else:
self.report_warning("Couldn't parse unidentified YouTube video throttling parameter descrambling data"
'\nNear: "%s"' % datac[:64])
el = False
# Lua tables can't contain nil values: Python can, but still use False
datac = find_first(r"(?s)^[^,]*?,\s*(.*)$", datac)
data.append(el)

# Additional rules
for idx in xc:
data[idx] = data

# Debugging helper to print data array elements
def prd(el, tab=None):
if not el:
return "???"
elif el == n:
return "n"
elif el == data:
return "data"
elif type(el) == compat_str:
return '"%s"' % (el, )
elif type(el) == int:
if isiterable(tab):
return "%d -> %d" % (el, el % len(tab), )
return "%d" % (el, )
else:
for tr in trans:
if el == tr[1]:
return tr[0]
return repr(el)

# The script section contains a series of calls to elements of
# the data section array onto other elements of it: calls to
# transformations, with a reference to the data array itself or
# the "n" parameter array as first argument, and often input data
# as a second argument. We parse and emulate those calls to follow
# the descrambling script.
# c[40](c[14],c[2]),c[25](c[48]),c[21](c[32],c[23]), [...]
for ifunc, itab, iarg, iextra in map(lambda m: m.groups(),
# c[m](c[mm]{,c[mmm]{,c[mmmm]}})
re.finditer(
r"c\[(\d+)\]\(c\[(\d+)\](?:,\s*c\[(\d+)\])?(?:,\s*c\[(\d+)\]\(\))?[^)]*?\)",
script)):
tab = data[int(itab)]
arg = iarg and data[int(iarg)]
if iextra:
alphabet = data[int(iextra)]()
func = lambda t, s: data[int(ifunc)](t, s, alphabet)
else:
func = data[int(ifunc)]

# Uncomment to debug transformation chain
# nprev = ''.join(n)
# dprev = ' '.join(map(prd, data))
# print(''.join(('"n" parameter transformation: ', prd(func), "(", prd(tab), (", " + prd(arg, tab)) if arg else '', ") ", ifunc, "(", itab, (", " + iarg) if iarg else "", ")")))
if not callable(func) or not isiterable(tab) or func(tab, arg):
self.report_warning("Invalid data type encountered during YouTube video throttling parameter descrambling transformation chain, aborting"
"\nCouldn't descramble YouTube throttling URL parameter: data transfer will be throttled")
self.report_warning("Couldn't process youtube video URL, please check for updates to this script")
break
# Uncomment to debug transformation chain
# nnew = ''.join(n)
# if nprev != nnew:
# print('from: ' + nprev + "\nto: " + nnew)
# dnew = ' '.join(map(prd, data))
# if dprev != dnew:
# print('from: ' + dprev + "\nto: " + dnew)
return ''.join(n)
try:
player_id = ('nsig', player_url)
if player_id not in self._player_cache:
self._player_cache[player_id] = self._extract_n_function(video_id, player_url)
func = self._player_cache[player_id]
self._player_cache[sig_id] = func(n_param)
if self._downloader.params.get('verbose', False):
self._downloader.to_screen('[debug] [%s] %s' % (self.IE_NAME, 'Decrypted nsig {0} => {1}'.format(n_param, self._player_cache[sig_id])))
return self._player_cache[sig_id]
except Exception as e:
raise ExtractorError(traceback.format_exc(), cause=e, video_id=video_id)

def _unthrottle_format_urls(self, video_id, player_url, formats):
if not player_url:
return
player_id = self._extract_player_info(player_url)
code = self._get_player_code(video_id, player_url, player_id)
n_cache = {}
for fmt in formats:
parsed_fmt_url = compat_urlparse.urlparse(fmt['url'])
qs = compat_urlparse.parse_qs(parsed_fmt_url.query)
n_param = qs.get('n')
if not n_param:
continue
n_param = n_param[-1]
n_response = n_cache.get(n_param)
if not n_response:
n_response = self._n_descramble(n_param, code)
if n_response:
n_cache[n_param] = n_response
n_response = self._n_descramble(n_param, player_url, video_id)
if n_response:
qs['n'] = [n_response]
fmt['url'] = compat_urlparse.urlunparse(
Expand Down

0 comments on commit 905d1d2

Please sign in to comment.