-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsettings.py
executable file
·250 lines (212 loc) · 9.51 KB
/
settings.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
import os
import logging
from urllib.parse import urlparse
import json
import ast
from settings_schema import SETTINGS_SCHEMA
# Get config directory from environment variable with fallback
CONFIG_DIR = os.environ.get('USER_CONFIG', '/user/config')
# Update the path to use the environment variable
CONFIG_FILE = os.path.join(CONFIG_DIR, 'config.json')
def load_config():
#logging.debug("Starting load_config()")
if os.path.exists(CONFIG_FILE):
with open(CONFIG_FILE, 'r') as config_file:
try:
config = json.load(config_file)
#logging.debug(f"Raw loaded config: {json.dumps(config, indent=2)}")
# Parse string representations in Content Sources
if 'Content Sources' in config:
#logging.debug("Content Sources before parsing: %s", json.dumps(config['Content Sources'], indent=2))
for key, value in config['Content Sources'].items():
if isinstance(value, str):
try:
parsed_value = json.loads(value)
config['Content Sources'][key] = parsed_value
#logging.debug(f"Parsed value for {key}: {parsed_value}")
except json.JSONDecodeError:
# If it's not valid JSON, keep it as is
logging.debug(f"Keeping original string value for {key}: {value}")
#logging.debug("Content Sources after parsing: %s", json.dumps(config['Content Sources'], indent=2))
#logging.debug(f"Final loaded config: {json.dumps(config, indent=2)}")
return config
except json.JSONDecodeError as e:
logging.error(f"Error decoding JSON from {CONFIG_FILE}: {str(e)}. Using empty config.")
logging.debug("Config file not found or empty, returning empty dict")
return {}
def save_config(config):
# logging.debug("Starting save_config()")
#logging.debug(f"Config before saving: {json.dumps(config, indent=2)}")
# Ensure Content Sources are saved as proper JSON
if 'Content Sources' in config:
for key, value in config['Content Sources'].items():
if isinstance(value, str):
try:
# Try to parse it as JSON
json.loads(value)
except json.JSONDecodeError:
# If it's not valid JSON, convert it to a JSON string
config['Content Sources'][key] = json.dumps(value)
with open(CONFIG_FILE, 'w') as config_file:
json.dump(config, config_file, indent=2)
#logging.debug(f"Final saved config: {json.dumps(config, indent=2)}")
# Helper function to safely parse boolean values
def parse_bool(value):
if isinstance(value, bool):
return value
if isinstance(value, str):
return value.lower() in ('true', 'yes', '1', 'on')
return bool(value)
def get_setting(section, key=None, default=None):
config = load_config()
if section == 'Content Sources':
content_sources = config.get(section, {})
if not isinstance(content_sources, dict):
logging.warning(f"'Content Sources' setting is not a dictionary. Resetting to empty dict.")
content_sources = {}
return content_sources
if key is None:
return config.get(section, {})
value = config.get(section, {}).get(key, default)
# Handle boolean values
if isinstance(value, str) and value.lower() in ('true', 'false'):
return parse_bool(value)
# Validate URL if the key ends with 'url'
if key.lower().endswith('url'):
return validate_url(value)
return value
# Update the set_setting function to handle boolean values correctly
def set_setting(section, key, value):
config = load_config()
if section not in config:
config[section] = {}
if key.lower().endswith('url'):
value = validate_url(value)
# Convert boolean strings to actual booleans
if isinstance(value, str) and value.lower() in ('true', 'false'):
value = parse_bool(value)
config[section][key] = value
save_config(config)
def parse_string_dicts(obj):
if isinstance(obj, dict):
return {k: parse_string_dicts(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [parse_string_dicts(item) for item in obj]
elif isinstance(obj, str):
try:
return parse_string_dicts(ast.literal_eval(obj))
except (ValueError, SyntaxError):
return obj
else:
return obj
def deserialize_config(config):
if isinstance(config, dict):
return {k: deserialize_config(v) for k, v in config.items() if not k.isdigit()}
elif isinstance(config, list):
if config and isinstance(config[0], list) and len(config[0]) == 2:
# This is likely a preferred filter list
return [tuple(item) for item in config]
return [deserialize_config(item) for item in config]
else:
return config
def validate_url(url):
if not url:
logging.debug(f"Empty URL provided")
return ''
if not url.startswith(('http://', 'https://')):
url = f'http://{url}'
try:
result = urlparse(url)
if all([result.scheme, result.netloc]):
return url
else:
logging.warning(f"Invalid URL structure: {url}")
return ''
except Exception as e:
logging.error(f"Error parsing URL {url}: {str(e)}")
return ''
def get_all_settings():
config = load_config()
# Ensure 'Content Sources' is a dictionary
if 'Content Sources' in config:
if not isinstance(config['Content Sources'], dict):
logging.warning("'Content Sources' setting is not a dictionary. Resetting to empty dict.")
config['Content Sources'] = {}
else:
config['Content Sources'] = {}
return config
def get_scraping_settings():
config = load_config()
scraping_settings = {}
versions = config.get('Scraping', {}).get('versions', {})
for version, settings in versions.items():
for key, value in settings.items():
label = f"{version.capitalize()} - {key.replace('_', ' ').title()}"
scraping_settings[f"{version}_{key}"] = (label, value)
return scraping_settings
def get_jackett_settings():
config = load_config()
jackett_settings = {}
instances = config.get('Jackett', {})
for instance, settings in instances.items():
jackett_settings[f"{instance}"] = (settings)
return jackett_settings
def ensure_settings_file():
if not os.path.exists(CONFIG_FILE):
os.makedirs(os.path.dirname(CONFIG_FILE), exist_ok=True)
config = {}
is_new_file = True
else:
config = load_config()
is_new_file = not config # Check if the config is empty (existing but empty file)
for section, section_data in SETTINGS_SCHEMA.items():
if section not in config:
config[section] = {}
# Skip adding defaults for Scrapers, Content Sources, and Notifications
if section in ['Scrapers', 'Content Sources', 'Notifications']:
continue
if isinstance(section_data, dict) and 'schema' in section_data:
# Handle nested schemas
for key, value in section_data['schema'].items():
if key not in config[section]:
config[section][key] = value.get('default', {})
else:
for key, value in section_data.items():
if key != 'tab' and key not in config[section]:
config[section][key] = value.get('default', '')
# Ensure default scraping version only if there are no versions or it's a new file
if 'Scraping' not in config:
config['Scraping'] = {}
if 'versions' not in config['Scraping'] or not config['Scraping']['versions'] or is_new_file:
config['Scraping']['versions'] = {
'Default': {
'enable_hdr': False,
'max_resolution': '1080p',
'resolution_wanted': '<=',
'resolution_weight': '3',
'hdr_weight': '3',
'similarity_weight': '3',
'size_weight': '3',
'bitrate_weight': '3',
'preferred_filter_in': '',
'preferred_filter_out': '',
'filter_in': '',
'filter_out': '',
'min_size_gb': '0.01',
'max_size_gb': ''
}
}
# Ensure Debrid Provider is set to Torbox if not already set
if 'Debrid Provider' not in config:
config['Debrid Provider'] = {}
if 'provider' not in config['Debrid Provider'] or not config['Debrid Provider']['provider']:
config['Debrid Provider']['provider'] = 'RealDebrid'
if 'api_key' not in config['Debrid Provider']:
config['Debrid Provider']['api_key'] = 'demo_key' # Initialize with a demo key for testing
# Migrate RealDebrid API key if it exists
if 'RealDebrid' in config and 'api_key' in config['RealDebrid']:
if 'api_key' not in config['Debrid Provider'] or not config['Debrid Provider']['api_key']:
config['Debrid Provider']['api_key'] = config['RealDebrid']['api_key']
# Optionally set provider to RealDebrid since we found a key
config['Debrid Provider']['provider'] = 'RealDebrid'
save_config(config)