-
Notifications
You must be signed in to change notification settings - Fork 6
/
dbti.py
2094 lines (2094 loc) · 166 KB
/
dbti.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python
'The interface for interacting with dbt-core.\n\nWe package the interface as a single python module that can be imported\nand used in other python projects. You do not need to include dbt-core-interface\nas a dependency in your project if you do not want to. You can simply copy\nthe dbt_core_interface folder into your project and import it from there.\n'
_AV='primary_key'
_AU='Running diff'
_AT='Project header `X-dbt-Project` was not supplied but is required for this endpoint'
_AS='dbt-interface-server'
_AR='bind_addr'
_AQ='Key has type %r (not a string)'
_AP='%s is read-only.'
_AO='Last-Modified'
_AN='Content-Range'
_AM='This request is not connected to a route.'
_AL='text/html; charset=UTF-8'
_AK='route.url_args'
_AJ='bottle.route'
_AI='bottle.app'
_AH='app_reset'
_AG='before_request'
_AF='Read-Only property.'
_AE='single_threaded'
_AD='AdapterResponse'
_AC='compiled_code'
_AB='timestamp'
_AA='localhost'
_A9='keyfile'
_A8='certfile'
_A7='Do not use this API directly.'
_A6='bottle.request.ext.%s'
_A5='HTTP_'
_A4='127.0.0.1'
_A3='CONTENT_LENGTH'
_A2='application/json'
_A1='bottle.exc_info'
_A0='ignore'
_z='after_request'
_y='config'
_x='PROXY'
_w='strict'
_v='wsgiref'
_u='temporary'
_t='sql'
_s='create_table_as'
_r='create_schema'
_q='anonymous_node'
_p='threads'
_o='_rebase'
_n='error'
_m='rb'
_l='\r'
_k='CONTENT_TYPE'
_j='wsgi.errors'
_i='target'
_h='HEAD'
_g='REQUEST_METHOD'
_f='default'
_e='{}'
_d='ManifestNode'
_c='registered_projects'
_b='Project is not registered. Make a POST request to the /register endpoint first to register a runner'
_a='BOTTLE_CHILD'
_Z='wsgi.input'
_Y='QUERY_STRING'
_X='SCRIPT_NAME'
_W='catchall'
_V='utf-8'
_U='__main__'
_T='POST'
_S='GET'
_R='relation'
_Q='X-dbt-Project'
_P='close'
_O=':'
_N='Content-Type'
_M='PATH_INFO'
_L='?'
_K='latin1'
_J='Content-Length'
_I='name'
_H='.'
_G='\n'
_F='environ'
_E='utf8'
_D='/'
_C=False
_B=True
_A=None
if 1:import dbt.adapters.factory;dbt.adapters.factory.get_adapter=lambda config:config.adapter
import _thread as thread,base64,calendar,cgi,configparser,decimal,email.utils,functools,hashlib,hmac,http.client as httplib,itertools,json,logging,mimetypes,os,pickle,re,sys,tempfile,threading,time,uuid,warnings,weakref
from collections import OrderedDict,UserDict
from collections.abc import MutableMapping as DictMixin
from contextlib import contextmanager,redirect_stdout
from copy import copy
from dataclasses import asdict,dataclass,field
from datetime import date as datedate
from datetime import datetime,timedelta
from enum import Enum
from functools import lru_cache,wraps
from http.cookies import CookieError,Morsel,SimpleCookie
from inspect import getfullargspec
from io import BytesIO
from json import dumps as json_dumps
from json import loads as json_lds
from pathlib import Path
from tempfile import NamedTemporaryFile
from traceback import format_exc,print_exc
from types import FunctionType
from types import ModuleType as new_module
from typing import TYPE_CHECKING,Any,Callable,Dict,Generator,List,Optional,Tuple,Type,TypeVar,Union,cast
from unicodedata import normalize
from urllib.parse import SplitResult as UrlSplitResult
from urllib.parse import quote as urlquote
from urllib.parse import unquote as urlunquote
from urllib.parse import urlencode,urljoin
import dbt.version
from dbt.adapters.factory import get_adapter_class_by_name
from dbt.clients.system import make_directory
from dbt.config.runtime import RuntimeConfig
from dbt.node_types import NodeType
from dbt.parser.manifest import PARTIAL_PARSE_FILE_NAME,ManifestLoader,process_node
from dbt.parser.sql import SqlBlockParser,SqlMacroParser
from dbt.task.sql import SqlCompileRunner
from dbt.tracking import disable_tracking
from dbt.flags import set_from_args
try:from dbt.contracts.graph.parsed import ColumnInfo;from dbt.contracts.graph.compiled import ManifestNode
except Exception:from dbt.contracts.graph.nodes import ColumnInfo,ManifestNode
if TYPE_CHECKING:from agate import Table;from dbt.adapters.base import BaseAdapter,BaseRelation;from dbt.contracts.connection import AdapterResponse;from dbt.contracts.results import ExecutionResult,RunExecutionResult;from dbt.semver import VersionSpecifier;from dbt.task.runnable import ManifestTask
disable_tracking()
urlunquote=functools.partial(urlunquote,encoding=_K)
__dbt_major_version__=int(dbt.version.installed.major or 0)
__dbt_minor_version__=int(dbt.version.installed.minor or 0)
__dbt_patch_version__=int(dbt.version.installed.patch or 0)
if(__dbt_major_version__,__dbt_minor_version__,__dbt_patch_version__)>(1,3,0):RAW_CODE='raw_code';COMPILED_CODE=_AC
else:RAW_CODE='raw_code';COMPILED_CODE=_AC
if(__dbt_major_version__,__dbt_minor_version__,__dbt_patch_version__)<(1,5,0):import dbt.events.functions;dbt.events.functions.fire_event=lambda *args,**kwargs:_A
def default_project_dir():
'Get the default project directory.';A='DBT_PROJECT_DIR'
if A in os.environ:return Path(os.environ[A]).resolve()
paths=list(Path.cwd().parents);paths.insert(0,Path.cwd());return next((x for x in paths if (x/'dbt_project.yml').exists()),Path.cwd())
def default_profiles_dir():
'Get the default profiles directory.';A='DBT_PROFILES_DIR'
if A in os.environ:return Path(os.environ[A]).resolve()
return Path.cwd()if (Path.cwd()/'profiles.yml').exists()else Path.home()/'.dbt'
DEFAULT_PROFILES_DIR=str(default_profiles_dir())
DEFAULT_PROJECT_DIR=str(default_project_dir())
def write_manifest_for_partial_parse(self):
'Monkey patch for dbt manifest loader.';path=os.path.join(self.root_project.project_root,self.root_project.target_path,PARTIAL_PARSE_FILE_NAME)
try:
if self.manifest.metadata.dbt_version!=dbt.version.__version__:self.manifest.metadata.dbt_version=dbt.version.__version__
manifest_msgpack=self.manifest.to_msgpack();make_directory(os.path.dirname(path))
with open(path,'wb')as fp:fp.write(manifest_msgpack)
except Exception:raise
if(__dbt_major_version__,__dbt_minor_version__)<(1,4):ManifestLoader.write_manifest_for_partial_parse=write_manifest_for_partial_parse
__all__=['DbtProject','DbtProjectContainer','DbtAdapterExecutionResult','DbtAdapterCompilationResult','DbtManifestProxy','DbtConfiguration','__dbt_major_version__','__dbt_minor_version__','__dbt_patch_version__','DEFAULT_PROFILES_DIR','DEFAULT_PROJECT_DIR','ServerRunResult','ServerCompileResult','ServerResetResult','ServerRegisterResult','ServerUnregisterResult','ServerErrorCode','ServerError','ServerErrorContainer','ServerPlugin','run_server','default_project_dir','default_profiles_dir','ColumnInfo',_d]
T=TypeVar('T')
JINJA_CONTROL_SEQUENCES=['{{','}}','{%','%}','{#','#}']
LOGGER=logging.getLogger(__name__)
__version__=dbt.version.__version__
class DbtCommand(str,Enum):'The dbt commands we support.';RUN='run';BUILD='build';TEST='test';SEED='seed';RUN_OPERATION='run-operation';LIST='list';SNAPSHOT='snapshot'
@dataclass
class DbtConfiguration:
'The configuration for dbt-core.';project_dir:str=DEFAULT_PROJECT_DIR;profiles_dir:str=DEFAULT_PROFILES_DIR;target:Optional[str]=_A;threads:int=1;single_threaded:bool=_B;_vars:str=_e;quiet:bool=_B;use_experimental_parser:bool=_C;static_parser:bool=_C;partial_parse:bool=_C;dependencies:List[str]=field(default_factory=list)
def __post_init__(self):
'Post init hook to set single_threaded and remove target if not provided.'
if self.target is _A:del self.target
self.single_threaded=self.threads==1
@property
def profile(self):'Access the profiles_dir attribute as a string.';return _A
@property
def vars(self):'Access the vars attribute as a string.';return self._vars
@vars.setter
def vars(self,v):
'Set the vars attribute as a string or dict.\n\n If dict then it will be converted to a string which is what dbt expects.\n '
if(__dbt_major_version__,__dbt_minor_version__)>=(1,5):
if isinstance(v,str):v=json.loads(v)
elif isinstance(v,dict):v=json.dumps(v)
self._vars=v
class DbtManifestProxy(UserDict):
'Proxy for manifest dictionary object.\n\n If we need mutation then we should create a copy of the dict or interface with the dbt-core manifest object instead.\n '
def _readonly(self,*args,**kwargs):raise RuntimeError('Cannot modify DbtManifestProxy')
__setitem__=_readonly;__delitem__=_readonly;pop=_readonly;popitem=_readonly;clear=_readonly;update=_readonly;setdefault=_readonly
@dataclass
class DbtAdapterExecutionResult:'Interface for execution results.\n\n This keeps us 1 layer removed from dbt interfaces which may change.\n ';adapter_response:_AD;table:'Table';raw_code:str;compiled_code:str
@dataclass
class DbtAdapterCompilationResult:'Interface for compilation results.\n\n This keeps us 1 layer removed from dbt interfaces which may change.\n ';raw_code:str;compiled_code:str;node:_d;injected_code:Optional[str]=_A
class DbtTaskConfiguration:
'A container for task configuration with sane defaults.\n\n Users should enforce an interface for their tasks via a factory method that returns an instance of this class.\n '
def __init__(self,profile,target,**kwargs):'Initialize the task configuration.';self.profile=profile;self.target=target;self.kwargs=kwargs or{};self.threads=kwargs.get(_p,1);self.single_threaded=kwargs.get(_AE,self.threads==1);self.state_id=kwargs.get('state_id');self.version_check=kwargs.get('version_check',_C);self.resource_types=kwargs.get('resource_types');self.models=kwargs.get('models');self.select=kwargs.get('select');self.exclude=kwargs.get('exclude');self.selector_name=kwargs.get('selector_name');self.state=kwargs.get('state');self.defer=kwargs.get('defer',_C);self.fail_fast=kwargs.get('fail_fast',_C);self.full_refresh=kwargs.get('full_refresh',_C);self.store_failures=kwargs.get('store_failures',_C);self.indirect_selection=kwargs.get('indirect_selection','eager');self.data=kwargs.get('data',_C);self.schema=kwargs.get('schema',_C);self.show=kwargs.get('show',_C);self.output=kwargs.get('output',_I);self.output_keys=kwargs.get('output_keys');self.macro=kwargs.get('macro');self.args=kwargs.get('args',_e);self.quiet=kwargs.get('quiet',_B)
def __getattribute__(self,__name):'"Force all attribute access to be lower case.';return object.__getattribute__(self,__name.lower())
def __getattr__(self,name):'Get an attribute from the kwargs if it does not exist on the class.\n\n This is useful for passing through arbitrary arguments to dbt while still\n being able to manage some semblance of a sane interface with defaults.\n ';return self.kwargs.get(name)
@classmethod
def from_runtime_config(cls,config,**kwargs):"Create a task configuration container from a DbtProject's runtime config.\n\n This is a good example of where static typing is not necessary. Developers can just\n pass in whatever they want and it will be passed through to the task configuration container.\n Users of the library are free to pass in any mapping derived from their own implementation for\n their own custom task.\n ";threads=kwargs.pop(_p,config.threads);kwargs.pop(_AE,_A);return cls(config.profile_name,config.target_name,threads=threads,single_threaded=threads==1,**kwargs)
class DbtProject:
'Container for a dbt project.\n\n The dbt attribute is the primary interface for dbt-core. The adapter attribute is the primary interface for the dbt adapter.\n ';ADAPTER_TTL=3600
def __init__(self,target=_A,profiles_dir=DEFAULT_PROFILES_DIR,project_dir=DEFAULT_PROJECT_DIR,threads=1,vars=_e):'Initialize the DbtProject.';self.base_config=DbtConfiguration(threads=threads,target=target,profiles_dir=profiles_dir or DEFAULT_PROFILES_DIR,project_dir=project_dir or DEFAULT_PROJECT_DIR);self.base_config.vars=vars;self.adapter_mutex=threading.Lock();self.parsing_mutex=threading.Lock();self.manifest_mutation_mutex=threading.Lock();self.parse_project(init=_B);self._sql_parser=_A;self._macro_parser=_A
@classmethod
def from_config(cls,config):'Instatiate the DbtProject directly from a DbtConfiguration instance.';return cls(target=config.target,profiles_dir=config.profiles_dir,project_dir=config.project_dir,threads=config.threads)
def get_adapter_cls(self):'Get the adapter class associated with the dbt profile.';return get_adapter_class_by_name(self.config.credentials.type)
def initialize_adapter(self):
'Initialize a dbt adapter.'
if hasattr(self,'_adapter'):
try:self._adapter.connections.cleanup_all()
except Exception as e:LOGGER.debug(f"Failed to cleanup adapter connections: {e}")
self.adapter=self.get_adapter_cls()(self.config)
@property
def adapter(self):
'dbt-core adapter with TTL and automatic reinstantiation.\n\n This supports long running processes that may have their connection to the database terminated by\n the database server. It is transparent to the user.\n '
if time.time()-self._adapter_created_at>self.ADAPTER_TTL:self.initialize_adapter()
return self._adapter
@adapter.setter
def adapter(self,adapter):
'Verify connection and reset TTL on adapter set, update adapter prop ref on config.'
if self.adapter_mutex.acquire(blocking=_C):
try:self._adapter=adapter;self._adapter.connections.set_connection_name();self._adapter_created_at=time.time();self.config.adapter=self.adapter
finally:self.adapter_mutex.release()
def parse_project(self,init=_C):
'Parse project on disk.\n\n Uses the config from `DbtConfiguration` in args attribute, verifies connection to adapters database,\n mutates config, adapter, and dbt attributes. Thread-safe. From an efficiency perspective, this is a\n relatively expensive operation, so we want to avoid doing it more than necessary.\n '
with self.parsing_mutex:
if init:set_from_args(self.base_config,self.base_config);self.config=RuntimeConfig.from_args(self.base_config);self.initialize_adapter()
_project_parser=ManifestLoader(self.config,self.config.load_dependencies(),self.adapter.connections.set_query_header);self.manifest=_project_parser.load();self.manifest.build_flat_graph();_project_parser.save_macros_to_adapter(self.adapter);self._sql_parser=_A;self._macro_parser=_A
def safe_parse_project(self,reinit=_C):
'Safe version of parse_project that will not mutate the config if parsing fails.'
if reinit:self.clear_internal_caches()
_config_pointer=copy(self.config)
try:self.parse_project(init=reinit)
except Exception as parse_error:self.config=_config_pointer;raise parse_error
self.write_manifest_artifact()
def _verify_connection(self,adapter):
'Verification for adapter + profile.\n\n Used as a passthrough, this also seeds the master connection.\n '
try:adapter.connections.set_connection_name();adapter.debug_query()
except Exception as query_exc:raise RuntimeError('Could not connect to Database') from query_exc
else:return adapter
def adapter_probe(self):
'Check adapter connection, useful for long running procsesses.'
if not hasattr(self,'adapter')or self.adapter is _A:return _C
try:
with self.adapter.connection_named('osmosis-heartbeat'):self.adapter.debug_query()
except Exception:return _C
return _B
def fn_threaded_conn(self,fn,*args,**kwargs):
'For jobs which are intended to be submitted to a thread pool.'
@wraps(fn)
def _with_conn():self.adapter.connections.set_connection_name();return fn(*(args),**kwargs)
return _with_conn
def generate_runtime_model_context(self,node):'Wrap dbt context provider.';from dbt.context.providers import generate_runtime_model_context;return generate_runtime_model_context(node,self.config,self.manifest)
@property
def project_name(self):'dbt project name.';return self.config.project_name
@property
def project_root(self):'dbt project root.';return self.config.project_root
@property
def manifest_dict(self):'dbt manifest dict.';return DbtManifestProxy(self.manifest.flat_graph)
def write_manifest_artifact(self):'Write a manifest.json to disk.\n\n Because our project is in memory, this is useful for integrating with other tools that\n expect a manifest.json to be present in the target directory.\n ';artifact_path=os.path.join(self.config.project_root,self.config.target_path,'manifest.json');self.manifest.write(artifact_path)
def clear_internal_caches(self):'Clear least recently used caches and reinstantiable container objects.';self.compile_code.cache_clear();self.unsafe_compile_code.cache_clear()
def get_ref_node(self,target_model_name):'Get a `ManifestNode` from a dbt project model name.\n\n This is the same as one would in a {{ ref(...) }} macro call.\n ';return cast(_d,self.manifest.resolve_ref(target_model_name=target_model_name,target_model_package=_A,current_project=self.config.project_name,node_package=self.config.project_name))
def get_source_node(self,target_source_name,target_table_name):'Get a `ManifestNode` from a dbt project source name and table name.\n\n This is the same as one would in a {{ source(...) }} macro call.\n ';return cast(_d,self.manifest.resolve_source(target_source_name=target_source_name,target_table_name=target_table_name,current_project=self.config.project_name,node_package=self.config.project_name))
def get_node_by_path(self,path):
'Find an existing node given relative file path.\n\n TODO: We can include Path obj support and make this more robust.\n '
for node in self.manifest.nodes.values():
if node.original_file_path==path:return node
return _A
@contextmanager
def generate_server_node(self,sql,node_name=_q):
'Get a transient node for SQL execution against adapter.\n\n This is a context manager that will clear the node after execution and leverages a mutex during manifest mutation.\n '
with self.manifest_mutation_mutex:self._clear_node(node_name);sql_node=self.sql_parser.parse_remote(sql,node_name);process_node(self.config,self.manifest,sql_node);yield sql_node;self._clear_node(node_name)
def unsafe_generate_server_node(self,sql,node_name=_q):'Get a transient node for SQL execution against adapter.\n\n This is faster than `generate_server_node` but does not clear the node after execution.\n That is left to the caller. It is also not thread safe in and of itself and requires the caller to\n manage jitter or mutexes.\n ';self._clear_node(node_name);sql_node=self.sql_parser.parse_remote(sql,node_name);process_node(self.config,self.manifest,sql_node);return sql_node
def inject_macro(self,macro_contents):
'Inject a macro into the project.\n\n This is useful for testing macros in isolation. It offers unique ways to integrate with dbt.\n ';macro_overrides={}
for node in self.macro_parser.parse_remote(macro_contents):macro_overrides[node.unique_id]=node
self.manifest.macros.update(macro_overrides)
def get_macro_function(self,macro_name):
'Get macro as a function which behaves like a Python function.'
def _macro_fn(**kwargs):return self.adapter.execute_macro(macro_name,self.manifest,kwargs=kwargs)
return _macro_fn
def execute_macro(self,macro,**kwargs):'Wrap adapter execute_macro. Execute a macro like a python function.';return self.get_macro_function(macro)(**kwargs)
def adapter_execute(self,sql,auto_begin=_C,fetch=_C):'Wrap adapter.execute. Execute SQL against database.\n\n This is more on-the-rails than `execute_code` which intelligently handles jinja compilation provides a proxy result.\n ';return cast(Tuple[_AD,'Table'],self.adapter.execute(sql,auto_begin,fetch))
def execute_code(self,raw_code):
'Execute dbt SQL statement against database.\n\n This is a proxy for `adapter_execute` and the the recommended method for executing SQL against the database.\n ';compiled_code=str(raw_code)
if has_jinja(raw_code):compiled_code=self.compile_code(raw_code).compiled_code
return DbtAdapterExecutionResult(*self.adapter_execute(compiled_code,fetch=_B),raw_code,compiled_code)
def execute_from_node(self,node):
'Execute dbt SQL statement against database from a "ManifestNode".';raw_code=getattr(node,RAW_CODE);compiled_code=getattr(node,COMPILED_CODE,_A)
if compiled_code:return self.execute_code(compiled_code)
if has_jinja(raw_code):compiled_code=self.compile_from_node(node).compiled_code
return self.execute_code(compiled_code or raw_code)
@lru_cache(maxsize=100)
def compile_code(self,raw_code):
'Create a node with `generate_server_node` method. Compile generated node.\n\n Has a retry built in because even uuidv4 cannot gaurantee uniqueness at the speed\n in which we can call this function concurrently. A retry significantly increases the stability.\n ';temp_node_id=str(uuid.uuid4())
with self.generate_server_node(raw_code,temp_node_id)as node:return self.compile_from_node(node)
@lru_cache(maxsize=100)
def unsafe_compile_code(self,raw_code,retry=3):
'Create a node with `unsafe_generate_server_node` method. Compiles the generated node.\n\n Has a retry built in because even uuid4 cannot gaurantee uniqueness at the speed\n in which we can call this function concurrently. A retry significantly increases the\n stability. This is certainly the fastest way to compile SQL but it is yet to be benchmarked.\n ';temp_node_id=str(uuid.uuid4())
try:node=self.compile_from_node(self.unsafe_generate_server_node(raw_code,temp_node_id))
except Exception as compilation_error:
if retry>0:return self.compile_code(raw_code,retry-1)
raise compilation_error
else:return node
finally:self._clear_node(temp_node_id)
def compile_from_node(self,node):'Compiles existing node. ALL compilation passes through this code path.\n\n Raw SQL is marshalled by the caller into a mock node before being passed into this method.\n Existing nodes can be passed in here directly.\n ';compiled_node=SqlCompileRunner(self.config,self.adapter,node=node,node_index=1,num_nodes=1).compile(self.manifest);return DbtAdapterCompilationResult(getattr(compiled_node,RAW_CODE),getattr(compiled_node,COMPILED_CODE),compiled_node)
def _clear_node(self,name=_q):'Clear remote node from dbt project.';self.manifest.nodes.pop(f"{NodeType.SqlOperation}.{self.project_name}.{name}",_A)
def get_relation(self,database,schema,name):'Wrap for `adapter.get_relation`.';return self.adapter.get_relation(database,schema,name)
def relation_exists(self,database,schema,name):'Interface for checking if a relation exists in the database.';return self.adapter.get_relation(database,schema,name)is not _A
def node_exists(self,node):'Interface for checking if a node exists in the database.';return self.adapter.get_relation(self.create_relation_from_node(node))is not _A
def create_relation(self,database,schema,name):'Wrap `adapter.Relation.create`.';return self.adapter.Relation.create(database,schema,name)
def create_relation_from_node(self,node):'Wrap `adapter.Relation.create_from`.';return self.adapter.Relation.create_from(self.config,node)
def get_or_create_relation(self,database,schema,name):'Get relation or create if not exists.\n\n Returns tuple of relation and boolean result of whether it existed ie: (relation, did_exist).\n ';ref=self.get_relation(database,schema,name);return(ref,_B)if ref is not _A else(self.create_relation(database,schema,name),_C)
def create_schema(self,node):"Create a schema in the database leveraging dbt-core's builtin macro.";self.execute_macro(_r,kwargs={_R:self.create_relation_from_node(node)})
def get_columns_in_node(self,node):'Wrap `adapter.get_columns_in_relation`.';return self.adapter.get_columns_in_relation(self.create_relation_from_node(node)),
def get_columns(self,node):
'Get a list of columns from a compiled node.';columns=[]
try:columns.extend([c.name for c in self.get_columns_in_node(node)])
except Exception:original_sql=str(getattr(node,RAW_CODE));setattr(node,RAW_CODE,f"SELECT * FROM ({original_sql}) WHERE 1=0");result=self.execute_from_node(node);setattr(node,RAW_CODE,original_sql),delattr(node,COMPILED_CODE);node.compiled=_C;columns.extend(result.table.column_names)
return columns
def materialize(self,node,temporary=_B):'Materialize a table in the database.\n\n TODO: This is not fully baked. The API is stable but the implementation is not.\n ';return self.adapter_execute(self.execute_macro(_s,kwargs={_t:getattr(node,COMPILED_CODE),_R:self.create_relation_from_node(node),_u:temporary}),auto_begin=_B)
@property
def sql_parser(self):
'A dbt-core SQL parser capable of parsing and adding nodes to the manifest via `parse_remote` which will also return the added node to the caller.\n\n Note that post-parsing this still typically requires calls to `_process_nodes_for_ref`\n and `_process_sources_for_ref` from the `dbt.parser.manifest` module in order to compile.\n We have higher level methods that handle this for you.\n '
if self._sql_parser is _A:self._sql_parser=SqlBlockParser(self.config,self.manifest,self.config)
return self._sql_parser
@property
def macro_parser(self):
'A dbt-core macro parser. Parse macros with `parse_remote` and add them to the manifest.\n\n We have a higher level method `inject_macro` that handles this for you.\n '
if self._macro_parser is _A:self._macro_parser=SqlMacroParser(self.config,self.manifest)
return self._macro_parser
def get_task_config(self,**kwargs):'Get a dbt-core task configuration.';threads=kwargs.pop(_p,self.config.threads);return DbtTaskConfiguration.from_runtime_config(config=self.config,threads=threads,**kwargs)
def get_task_cls(self,typ):'Get a dbt-core task class by type.\n\n This could be overridden to add custom tasks such as linting, etc.\n so long as they are subclasses of `GraphRunnableTask`.\n ';from dbt.task.build import BuildTask;from dbt.task.list import ListTask;from dbt.task.run import RunTask;from dbt.task.run_operation import RunOperationTask;from dbt.task.seed import SeedTask;from dbt.task.snapshot import SnapshotTask;from dbt.task.test import TestTask;return{DbtCommand.RUN:RunTask,DbtCommand.BUILD:BuildTask,DbtCommand.TEST:TestTask,DbtCommand.SEED:SeedTask,DbtCommand.LIST:ListTask,DbtCommand.SNAPSHOT:SnapshotTask,DbtCommand.RUN_OPERATION:RunOperationTask}[typ]
def get_task(self,typ,args):
'Get a dbt-core task by type.'
if(__dbt_major_version__,__dbt_minor_version__)<(1,5):task=self.get_task_cls(typ)(args,self.config);task.load_manifest=lambda *args,**kwargs:_A;task.manifest=self.manifest
else:task=self.get_task_cls(typ)(args,self.config,self.manifest)
return task
def list(self,select=_A,exclude=_A,**kwargs):
'List resources in the dbt project.';select,exclude=marshall_selection_args(select,exclude)
with redirect_stdout(_A):return self.get_task(DbtCommand.LIST,self.get_task_config(select=select,exclude=exclude,**kwargs)).run()
def run(self,select=_A,exclude=_A,**kwargs):
'Run models in the dbt project.';select,exclude=marshall_selection_args(select,exclude)
with redirect_stdout(_A):return cast('RunExecutionResult',self.get_task(DbtCommand.RUN,self.get_task_config(select=select,exclude=exclude,**kwargs)).run())
def test(self,select=_A,exclude=_A,**kwargs):
'Test models in the dbt project.';select,exclude=marshall_selection_args(select,exclude)
with redirect_stdout(_A):return self.get_task(DbtCommand.TEST,self.get_task_config(select=select,exclude=exclude,**kwargs)).run()
def build(self,select=_A,exclude=_A,**kwargs):
'Build resources in the dbt project.';select,exclude=marshall_selection_args(select,exclude)
with redirect_stdout(_A):return self.get_task(DbtCommand.BUILD,self.get_task_config(select=select,exclude=exclude,**kwargs)).run()
def marshall_selection_args(select=_A,exclude=_A):
'Marshall selection arguments to a list of strings.'
if select is _A:select=[]
if exclude is _A:exclude=[]
if isinstance(select,(tuple,set,frozenset)):select=list(select)
if isinstance(exclude,(tuple,set,frozenset)):exclude=list(exclude)
if not isinstance(select,list):select=[select]
if not isinstance(exclude,list):exclude=[exclude]
return select,exclude
class DbtProjectContainer:
'Manages multiple DbtProjects.\n\n A DbtProject corresponds to a single project. This interface is used\n dbt projects in a single process. It enables basic multitenant servers.\n '
def __init__(self):'Initialize the container.';self._projects=OrderedDict();self._default_project=_A
def get_project(self,project_name):'Primary interface to get a project and execute code.';return self._projects.get(project_name)
def get_project_by_root_dir(self,root_dir):
'Get a project by its root directory.';root_dir=os.path.abspath(os.path.normpath(root_dir))
for project in self._projects.values():
if os.path.abspath(project.project_root)==root_dir:return project
return _A
def get_default_project(self):
'Get the default project which is the earliest project inserted into the container.';default_project=self._default_project
if not default_project:return _A
return self._projects.get(default_project)
def add_project(self,target=_A,profiles_dir=DEFAULT_PROFILES_DIR,project_dir=_A,threads=1,vars=_e,name_override=''):
'Add a DbtProject with arguments.';project=DbtProject(target,profiles_dir,project_dir,threads,vars);project_name=name_override or project.config.project_name
if self._default_project is _A:self._default_project=project_name
self._projects[project_name]=project;return project
def add_parsed_project(self,project):
'Add an already instantiated DbtProject.';self._projects.setdefault(project.config.project_name,project)
if self._default_project is _A:self._default_project=project.config.project_name
return project
def add_project_from_args(self,config):'Add a DbtProject from a DbtConfiguration.';project=DbtProject.from_config(config);self._projects.setdefault(project.config.project_name,project);return project
def drop_project(self,project_name):
'Drop a DbtProject.';project=self.get_project(project_name)
if project is _A:return
project.clear_internal_caches();project.adapter.connections.cleanup_all();self._projects.pop(project_name)
if self._default_project==project_name:
if len(self)>0:self._default_project=list(self._projects.keys())[0]
else:self._default_project=_A
def drop_all_projects(self):
"Drop all DbtProject's in the container.";self._default_project=_A
for project in self._projects:self.drop_project(project)
def reparse_all_projects(self):
'Reparse all projects.'
for project in self:project.safe_parse_project()
def registered_projects(self):'Grab all registered project names.';return list(self._projects.keys())
def __len__(self):'Allow len(DbtProjectContainer).';return len(self._projects)
def __getitem__(self,project):
"Allow DbtProjectContainer['jaffle_shop'].";maybe_project=self.get_project(project)
if maybe_project is _A:raise KeyError(project)
return maybe_project
def __setitem__(self,name,project):
"Allow DbtProjectContainer['jaffle_shop'] = DbtProject."
if self._default_project is _A:self._default_project=name
self._projects[name]=project
def __delitem__(self,project):"Allow del DbtProjectContainer['jaffle_shop'].";self.drop_project(project)
def __iter__(self):
'Allow project for project in DbtProjectContainer.'
for project in self._projects:
maybe_project=self.get_project(project)
if maybe_project is _A:continue
yield maybe_project
def __contains__(self,project):"Allow 'jaffle_shop' in DbtProjectContainer.";return project in self._projects
def __repr__(self):'Canonical string representation of DbtProjectContainer instance.';return _G.join(f"Project: {project.project_name}, Dir: {project.project_root}"for project in self)
def has_jinja(query):'Check if a query contains any Jinja control sequences.';return any(seq in query for seq in JINJA_CONTROL_SEQUENCES)
def semvar_to_tuple(semvar):'Convert a semvar to a tuple of ints.';return int(semvar.major or 0),int(semvar.minor or 0),int(semvar.patch or 0)
def _cli_parse(args):B='append';A='store_true';from argparse import ArgumentParser;parser=ArgumentParser(prog=args[0],usage='%(prog)s [options] package.module:app');opt=parser.add_argument;opt('--version',action=A,help='show version number.');opt('-b','--bind',metavar='ADDRESS',help='bind socket to ADDRESS.');opt('-s','--server',default=_v,help='use SERVER as backend.');opt('-p','--plugin',action=B,help='install additional plugin/s.');opt('-c','--conf',action=B,metavar='FILE',help='load config values from FILE.');opt('-C','--param',action=B,metavar='NAME=VALUE',help='override config values.');opt('--debug',action=A,help='start server in debug mode.');opt('--reload',action=A,help='auto-reload on file changes.');opt('app',help='WSGI app entry point.',nargs=_L);cli_args=parser.parse_args(args[1:]);return cli_args,parser
def _cli_patch(cli_args):
parsed_args,_=_cli_parse(cli_args);opts=parsed_args
if opts.server:
if opts.server.startswith('gevent'):import gevent.monkey;gevent.monkey.patch_all()
elif opts.server.startswith('eventlet'):import eventlet;eventlet.monkey_patch()
py=sys.version_info
py3k=py.major>2
def getargspec(func):spec=getfullargspec(func);kwargs=makelist(spec[0])+makelist(spec.kwonlyargs);return kwargs,spec[1],spec[2],spec[3]
basestring=str
unicode=str
json_loads=lambda s:json_lds(touni(s))
callable=lambda x:hasattr(x,'__call__')
imap=map
def _raise(*a):raise a[0](a[1]).with_traceback(a[2])
def tob(s,enc=_E):
if isinstance(s,unicode):return s.encode(enc)
return b''if s is _A else bytes(s)
def touni(s,enc=_E,err=_w):
if isinstance(s,bytes):return s.decode(enc,err)
return unicode(''if s is _A else s)
tonat=touni if py3k else tob
def _stderr(*args):
try:print(*(args),file=sys.stderr)
except (OSError,AttributeError):pass
def update_wrapper(wrapper,wrapped,*a,**ka):
try:functools.update_wrapper(wrapper,wrapped,*(a),**ka)
except AttributeError:pass
def depr(major,minor,cause,fix):
text='Warning: Use of deprecated feature or API. (Deprecated in Bottle-%d.%d)\nCause: %s\nFix: %s\n'%(major,minor,cause,fix)
if DEBUG==_w:raise DeprecationWarning(text)
warnings.warn(text,DeprecationWarning,stacklevel=3);return DeprecationWarning(text)
def makelist(data):
if isinstance(data,(tuple,list,set,dict)):return list(data)
elif data:return[data]
else:return[]
class DictProperty:
'Property that maps to a key in a local dict-like attribute.'
def __init__(self,attr,key=_A,read_only=_C):self.attr,self.key,self.read_only=attr,key,read_only
def __call__(self,func):functools.update_wrapper(self,func,updated=[]);self.getter,self.key=func,self.key or func.__name__;return self
def __get__(self,obj,cls):
if obj is _A:return self
key,storage=self.key,getattr(obj,self.attr)
if key not in storage:storage[key]=self.getter(obj)
return storage[key]
def __set__(self,obj,value):
if self.read_only:raise AttributeError(_AF)
getattr(obj,self.attr)[self.key]=value
def __delete__(self,obj):
if self.read_only:raise AttributeError(_AF)
del getattr(obj,self.attr)[self.key]
class cached_property:
'A property that is only computed once per instance and then replaces\n itself with an ordinary attribute. Deleting the attribute resets the\n property.'
def __init__(self,func):update_wrapper(self,func);self.func=func
def __get__(self,obj,cls):
if obj is _A:return self
value=obj.__dict__[self.func.__name__]=self.func(obj);return value
class lazy_attribute:
'A property that caches itself to the class object.'
def __init__(self,func):functools.update_wrapper(self,func,updated=[]);self.getter=func
def __get__(self,obj,cls):value=self.getter(cls);setattr(cls,self.__name__,value);return value
class BottleException(Exception):'A base class for exceptions used by bottle.'
class RouteError(BottleException):'This is a base class for all routing related exceptions'
class RouteReset(BottleException):'If raised by a plugin or request handler, the route is reset and all\n plugins are re-applied.'
class RouterUnknownModeError(RouteError):0
class RouteSyntaxError(RouteError):'The route parser found something not supported by this router.'
class RouteBuildError(RouteError):'The route could not be built.'
def _re_flatten(p):
'Turn all capturing groups in a regular expression pattern into\n non-capturing groups.'
if'('not in p:return p
return re.sub('(\\\\*)(\\(\\?P<[^>]+>|\\((?!\\?))',lambda m:m.group(0)if len(m.group(1))%2 else m.group(1)+'(?:',p)
class Router:
'A Router is an ordered collection of route->target pairs. It is used to\n efficiently match WSGI requests against a number of routes and return\n the first target that satisfies the request. The target may be anything,\n usually a string, ID or callable object. A route consists of a path-rule\n and a HTTP method.\n\n The path-rule is either a static path (e.g. `/contact`) or a dynamic\n path that contains wildcards (e.g. `/wiki/<page>`). The wildcard syntax\n and details on the matching order are described in docs:`routing`.\n ';default_pattern='[^/]+';default_filter='re';_MAX_GROUPS_PER_PATTERN=99
def __init__(self,strict=_C):self.rules=[];self._groups={};self.builder={};self.static={};self.dyna_routes={};self.dyna_regexes={};self.strict_order=strict;self.filters={'re':lambda conf:(_re_flatten(conf or self.default_pattern),_A,_A),'int':lambda conf:('-?\\d+',int,lambda x:str(int(x))),'float':lambda conf:('-?[\\d.]+',float,lambda x:str(float(x))),'path':lambda conf:('.+?',_A,_A)}
def add_filter(self,name,func):'Add a filter. The provided function is called with the configuration\n string as parameter and must return a (regexp, to_python, to_url) tuple.\n The first element is a string, the last two are callables or None.';self.filters[name]=func
rule_syntax=re.compile('(\\\\*)(?:(?::([a-zA-Z_][a-zA-Z_0-9]*)?()(?:#(.*?)#)?)|(?:<([a-zA-Z_][a-zA-Z_0-9]*)?(?::([a-zA-Z_]*)(?::((?:\\\\.|[^\\\\>])+)?)?)?>))')
def _itertokens(self,rule):
offset,prefix=0,''
for match in self.rule_syntax.finditer(rule):
prefix+=rule[offset:match.start()];g=match.groups()
if g[2]is not _A:depr(0,13,'Use of old route syntax.','Use <name> instead of :name in routes.')
if len(g[0])%2:prefix+=match.group(0)[len(g[0]):];offset=match.end();continue
if prefix:yield(prefix,_A,_A)
name,filtr,conf=g[4:7]if g[2]is _A else g[1:4];yield(name,filtr or _f,conf or _A);offset,prefix=match.end(),''
if offset<=len(rule)or prefix:yield(prefix+rule[offset:],_A,_A)
def add(self,rule,method,target,name=_A):
'Add a new rule or replace the target for an existing rule.';anons=0;keys=[];pattern='';filters=[];builder=[];is_static=_B
for (key,mode,conf) in self._itertokens(rule):
if mode:
is_static=_C
if mode==_f:mode=self.default_filter
mask,in_filter,out_filter=self.filters[mode](conf)
if not key:pattern+='(?:%s)'%mask;key='anon%d'%anons;anons+=1
else:pattern+=f"(?P<{key}>{mask})";keys.append(key)
if in_filter:filters.append((key,in_filter))
builder.append((key,out_filter or str))
elif key:pattern+=re.escape(key);builder.append((_A,key))
self.builder[rule]=builder
if name:self.builder[name]=builder
if is_static and not self.strict_order:self.static.setdefault(method,{});self.static[method][self.build(rule)]=target,_A;return
try:re_pattern=re.compile('^(%s)$'%pattern);re_match=re_pattern.match
except re.error as e:raise RouteSyntaxError(f"Could not add Route: {rule} ({e})")
if filters:
def getargs(path):
url_args=re_match(path).groupdict()
for (name,wildcard_filter) in filters:
try:url_args[name]=wildcard_filter(url_args[name])
except ValueError:raise HTTPError(400,'Path has wrong format.')
return url_args
elif re_pattern.groupindex:
def getargs(path):return re_match(path).groupdict()
else:getargs=_A
flatpat=_re_flatten(pattern);whole_rule=rule,flatpat,target,getargs
if(flatpat,method)in self._groups:
if DEBUG:msg='Route <%s %s> overwrites a previously defined route';warnings.warn(msg%(method,rule),RuntimeWarning)
self.dyna_routes[method][self._groups[flatpat,method]]=whole_rule
else:self.dyna_routes.setdefault(method,[]).append(whole_rule);self._groups[flatpat,method]=len(self.dyna_routes[method])-1
self._compile(method)
def _compile(self,method):
all_rules=self.dyna_routes[method];comborules=self.dyna_regexes[method]=[];maxgroups=self._MAX_GROUPS_PER_PATTERN
for x in range(0,len(all_rules),maxgroups):some=all_rules[x:x+maxgroups];combined=(flatpat for(_,flatpat,_,_)in some);combined='|'.join('(^%s$)'%flatpat for flatpat in combined);combined=re.compile(combined).match;rules=[(target,getargs)for(_,_,target,getargs)in some];comborules.append((combined,rules))
def build(self,_name,*anons,**query):
'Build an URL by filling the wildcards in a rule.';builder=self.builder.get(_name)
if not builder:raise RouteBuildError('No route with that name.',_name)
try:
for (i,value) in enumerate(anons):query['anon%d'%i]=value
url=''.join([f(query.pop(n))if n else f for(n,f)in builder]);return url if not query else url+_L+urlencode(query)
except KeyError as E:raise RouteBuildError('Missing URL argument: %r'%E.args[0])
def match(self,environ):
'Return a (target, url_args) tuple or raise HTTPError(400/404/405).';A='ANY';verb=environ[_g].upper();path=environ[_M]or _D;methods=(_x,_h,_S,A)if verb==_h else(_x,verb,A)
for method in methods:
if method in self.static and path in self.static[method]:target,getargs=self.static[method][path];return target,getargs(path)if getargs else{}
elif method in self.dyna_regexes:
for (combined,rules) in self.dyna_regexes[method]:
match=combined(path)
if match:target,getargs=rules[match.lastindex-1];return target,getargs(path)if getargs else{}
allowed=set();nocheck=set(methods)
for method in set(self.static)-nocheck:
if path in self.static[method]:allowed.add(method)
for method in set(self.dyna_regexes)-allowed-nocheck:
for (combined,rules) in self.dyna_regexes[method]:
match=combined(path)
if match:allowed.add(method)
if allowed:allow_header=','.join(sorted(allowed));raise HTTPError(405,'Method not allowed.',Allow=allow_header)
raise HTTPError(404,'Not found: '+repr(path))
class Route:
'This class wraps a route callback along with route specific metadata and\n configuration and applies Plugins on demand. It is also responsible for\n turning an URL path rule into a regular expression usable by the Router.\n '
def __init__(self,app,rule,method,callback,name=_A,plugins=_A,skiplist=_A,**config):self.app=app;self.rule=rule;self.method=method;self.callback=callback;self.name=name or _A;self.plugins=plugins or[];self.skiplist=skiplist or[];self.config=app.config._make_overlay();self.config.load_dict(config)
@cached_property
def call(self):'The route callback with all plugins applied. This property is\n created on demand and then cached to speed up subsequent requests.';return self._make_callback()
def reset(self):'Forget any cached values. The next time :attr:`call` is accessed,\n all plugins are re-applied.';self.__dict__.pop('call',_A)
def prepare(self):'Do all on-demand work immediately (useful for debugging).';self.call
def all_plugins(self):
'Yield all Plugins affecting this route.';unique=set()
for p in reversed(self.app.plugins+self.plugins):
if _B in self.skiplist:break
name=getattr(p,_I,_C)
if name and(name in self.skiplist or name in unique):continue
if p in self.skiplist or type(p)in self.skiplist:continue
if name:unique.add(name)
yield p
def _make_callback(self):
callback=self.callback
for plugin in self.all_plugins():
try:
if hasattr(plugin,'apply'):callback=plugin.apply(callback,self)
else:callback=plugin(callback)
except RouteReset:return self._make_callback()
if callback is not self.callback:update_wrapper(callback,self.callback)
return callback
def get_undecorated_callback(self):
'Return the callback. If the callback is a decorated function, try to\n recover the original function.';func=self.callback;func=getattr(func,'__func__'if py3k else'im_func',func);closure_attr='__closure__'if py3k else'func_closure'
while hasattr(func,closure_attr)and getattr(func,closure_attr):
attributes=getattr(func,closure_attr);func=attributes[0].cell_contents
if not isinstance(func,FunctionType):func=filter(lambda x:isinstance(x,FunctionType),map(lambda x:x.cell_contents,attributes));func=list(func)[0]
return func
def get_callback_args(self):'Return a list of argument names the callback (most likely) accepts\n as keyword arguments. If the callback is a decorated function, try\n to recover the original function before inspection.';return getargspec(self.get_undecorated_callback())[0]
def get_config(self,key,default=_A):'Lookup a config field and return its value, first checking the\n route.config, then route.app.config.';depr(0,13,'Route.get_config() is deprecated.','The Route.config property already includes values from the application config for missing keys. Access it directly.');return self.config.get(key,default)
def __repr__(self):cb=self.get_undecorated_callback();return f"<{self.method} {self.rule} -> {cb.__module__}:{cb.__name__}>"
class Bottle:
'Each Bottle object represents a single, distinct web application and\n consists of routes, callbacks, plugins, resources and configuration.\n Instances are callable WSGI applications.\n\n :param catchall: If true (default), handle all exceptions. Turn off to\n let debugging middleware handle exceptions.\n '
@lazy_attribute
def _global_config(cls):cfg=ConfigDict();cfg.meta_set(_W,'validate',bool);return cfg
def __init__(self,**kwargs):
self.config=self._global_config._make_overlay();self.config._add_change_listener(functools.partial(self.trigger_hook,_y));self.config.update({_W:_B})
if kwargs.get(_W)is _C:depr(0,13,'Bottle(catchall) keyword argument.',"The 'catchall' setting is now part of the app configuration. Fix: `app.config['catchall'] = False`");self.config[_W]=_C
if kwargs.get('autojson')is _C:depr(0,13,'Bottle(autojson) keyword argument.',"The 'autojson' setting is now part of the app configuration. Fix: `app.config['json.enable'] = False`");self.config['json.disable']=_B
self._mounts=[];self.resources=ResourceManager();self.routes=[];self.router=Router();self.error_handler={};self.plugins=[];self.install(JSONPlugin());self.install(TemplatePlugin())
catchall=DictProperty(_y,_W);__hook_names=_AG,_z,_AH,_y;__hook_reversed={_z}
@cached_property
def _hooks(self):return{name:[]for name in self.__hook_names}
def add_hook(self,name,func):
'Attach a callback to a hook. Three hooks are currently implemented:\n\n before_request\n Executed once before each request. The request context is\n available, but no routing has happened yet.\n after_request\n Executed once after each request regardless of its outcome.\n app_reset\n Called whenever :meth:`Bottle.reset` is called.\n '
if name in self.__hook_reversed:self._hooks[name].insert(0,func)
else:self._hooks[name].append(func)
def remove_hook(self,name,func):
'Remove a callback from a hook.'
if name in self._hooks and func in self._hooks[name]:self._hooks[name].remove(func);return _B
def trigger_hook(self,__name,*args,**kwargs):'Trigger a hook and return a list of results.';return[hook(*(args),**kwargs)for hook in self._hooks[__name][:]]
def hook(self,name):
'Return a decorator that attaches a callback to a hook. See\n :meth:`add_hook` for details.'
def decorator(func):self.add_hook(name,func);return func
return decorator
def _mount_wsgi(self,prefix,app,**options):
segments=[p for p in prefix.split(_D)if p]
if not segments:raise ValueError('WSGI applications cannot be mounted to "/".')
path_depth=len(segments)
def mountpoint_wrapper():
try:
request.path_shift(path_depth);rs=HTTPResponse([])
def start_response(status,headerlist,exc_info=_A):
if exc_info:_raise(*(exc_info))
if py3k:status=status.encode(_K).decode(_E);headerlist=[(k,v.encode(_K).decode(_E))for(k,v)in headerlist]
rs.status=status
for (name,value) in headerlist:rs.add_header(name,value)
return rs.body.append
body=app(request.environ,start_response);rs.body=itertools.chain(rs.body,body)if rs.body else body;return rs
finally:request.path_shift(-path_depth)
options.setdefault('skip',_B);options.setdefault('method',_x);options.setdefault('mountpoint',{'prefix':prefix,_i:app});options['callback']=mountpoint_wrapper;self.route('/%s/<:re:.*>'%_D.join(segments),**options)
if not prefix.endswith(_D):self.route(_D+_D.join(segments),**options)
def _mount_app(self,prefix,app,**options):
A='_mount.app'
if app in self._mounts or A in app.config:depr(0,13,'Application mounted multiple times. Falling back to WSGI mount.','Clone application before mounting to a different location.');return self._mount_wsgi(prefix,app,**options)
if options:depr(0,13,'Unsupported mount options. Falling back to WSGI mount.','Do not specify any route options when mounting bottle application.');return self._mount_wsgi(prefix,app,**options)
if not prefix.endswith(_D):depr(0,13,"Prefix must end in '/'. Falling back to WSGI mount.","Consider adding an explicit redirect from '/prefix' to '/prefix/' in the parent application.");return self._mount_wsgi(prefix,app,**options)
self._mounts.append(app);app.config['_mount.prefix']=prefix;app.config[A]=self
for route in app.routes:route.rule=prefix+route.rule.lstrip(_D);self.add_route(route)
def mount(self,prefix,app,**options):
"Mount an application (:class:`Bottle` or plain WSGI) to a specific\n URL prefix. Example::\n\n parent_app.mount('/prefix/', child_app)\n\n :param prefix: path prefix or `mount-point`.\n :param app: an instance of :class:`Bottle` or a WSGI application.\n\n Plugins from the parent application are not applied to the routes\n of the mounted child application. If you need plugins in the child\n application, install them separately.\n\n While it is possible to use path wildcards within the prefix path\n (:class:`Bottle` childs only), it is highly discouraged.\n\n The prefix path must end with a slash. If you want to access the\n root of the child application via `/prefix` in addition to\n `/prefix/`, consider adding a route with a 307 redirect to the\n parent application.\n "
if not prefix.startswith(_D):raise ValueError("Prefix must start with '/'")
if isinstance(app,Bottle):return self._mount_app(prefix,app,**options)
else:return self._mount_wsgi(prefix,app,**options)
def merge(self,routes):
"Merge the routes of another :class:`Bottle` application or a list of\n :class:`Route` objects into this application. The routes keep their\n 'owner', meaning that the :data:`Route.app` attribute is not\n changed."
if isinstance(routes,Bottle):routes=routes.routes
for route in routes:self.add_route(route)
def install(self,plugin):
'Add a plugin to the list of plugins and prepare it for being\n applied to all routes of this application. A plugin may be a simple\n decorator or an object that implements the :class:`Plugin` API.\n '
if hasattr(plugin,'setup'):plugin.setup(self)
if not callable(plugin)and not hasattr(plugin,'apply'):raise TypeError('Plugins must be callable or implement .apply()')
self.plugins.append(plugin);self.reset();return plugin
def uninstall(self,plugin):
'Uninstall plugins. Pass an instance to remove a specific plugin, a type\n object to remove all plugins that match that type, a string to remove\n all plugins with a matching ``name`` attribute or ``True`` to remove all\n plugins. Return the list of removed plugins.';removed,remove=[],plugin
for (i,plugin) in list(enumerate(self.plugins))[::-1]:
if remove is _B or remove is plugin or remove is type(plugin)or getattr(plugin,_I,_B)==remove:
removed.append(plugin);del self.plugins[i]
if hasattr(plugin,_P):plugin.close()
if removed:self.reset()
return removed
def reset(self,route=_A):
'Reset all routes (force plugins to be re-applied) and clear all\n caches. If an ID or route object is given, only that specific route\n is affected.'
if route is _A:routes=self.routes
elif isinstance(route,Route):routes=[route]
else:routes=[self.routes[route]]
for route in routes:route.reset()
if DEBUG:
for route in routes:route.prepare()
self.trigger_hook(_AH)
def close(self):
'Close the application and all installed plugins.'
for plugin in self.plugins:
if hasattr(plugin,_P):plugin.close()
def run(self,**kwargs):'Calls :func:`run` with the same parameters.';run(self,**kwargs)
def match(self,environ):'Search for a matching route and return a (:class:`Route`, urlargs)\n tuple. The second value is a dictionary with parameters extracted\n from the URL. Raise :exc:`HTTPError` (404/405) on a non-match.';return self.router.match(environ)
def get_url(self,routename,**kargs):'Return a string that matches a named route';scriptname=request.environ.get(_X,'').strip(_D)+_D;location=self.router.build(routename,**kargs).lstrip(_D);return urljoin(urljoin(_D,scriptname),location)
def add_route(self,route):
'Add a route object, but do not change the :data:`Route.app`\n attribute.';self.routes.append(route);self.router.add(route.rule,route.method,route,name=route.name)
if DEBUG:route.prepare()
def route(self,path=_A,method=_S,callback=_A,name=_A,apply=_A,skip=_A,**config):
"A decorator to bind a function to a request URL. Example::\n\n @app.route('/hello/<name>')\n def hello(name):\n return 'Hello %s' % name\n\n The ``<name>`` part is a wildcard. See :class:`Router` for syntax\n details.\n\n :param path: Request path or a list of paths to listen to. If no\n path is specified, it is automatically generated from the\n signature of the function.\n :param method: HTTP method (`GET`, `POST`, `PUT`, ...) or a list of\n methods to listen to. (default: `GET`)\n :param callback: An optional shortcut to avoid the decorator\n syntax. ``route(..., callback=func)`` equals ``route(...)(func)``\n :param name: The name for this route. (default: None)\n :param apply: A decorator or plugin or a list of plugins. These are\n applied to the route callback in addition to installed plugins.\n :param skip: A list of plugins, plugin classes or names. Matching\n plugins are not installed to this route. ``True`` skips all.\n\n Any additional keyword arguments are stored as route-specific\n configuration and passed to plugins (see :meth:`Plugin.apply`).\n "
if callable(path):path,callback=_A,path
plugins=makelist(apply);skiplist=makelist(skip)
def decorator(callback):
if isinstance(callback,basestring):callback=load(callback)
for rule in makelist(path)or yieldroutes(callback):
for verb in makelist(method):verb=verb.upper();route=Route(self,rule,verb,callback,name=name,plugins=plugins,skiplist=skiplist,**config);self.add_route(route)
return callback
return decorator(callback)if callback else decorator
def get(self,path=_A,method=_S,**options):'Equals :meth:`route`.';return self.route(path,method,**options)
def post(self,path=_A,method=_T,**options):'Equals :meth:`route` with a ``POST`` method parameter.';return self.route(path,method,**options)
def put(self,path=_A,method='PUT',**options):'Equals :meth:`route` with a ``PUT`` method parameter.';return self.route(path,method,**options)
def delete(self,path=_A,method='DELETE',**options):'Equals :meth:`route` with a ``DELETE`` method parameter.';return self.route(path,method,**options)
def patch(self,path=_A,method='PATCH',**options):'Equals :meth:`route` with a ``PATCH`` method parameter.';return self.route(path,method,**options)
def error(self,code=500,callback=_A):
"Register an output handler for a HTTP error code. Can\n be used as a decorator or called directly ::\n\n def error_handler_500(error):\n return 'error_handler_500'\n\n app.error(code=500, callback=error_handler_500)\n\n @app.error(404)\n def error_handler_404(error):\n return 'error_handler_404'\n\n "
def decorator(callback):
if isinstance(callback,basestring):callback=load(callback)
self.error_handler[int(code)]=callback;return callback
return decorator(callback)if callback else decorator
def default_error_handler(self,res):return tob(template(ERROR_PAGE_TEMPLATE,e=res,template_settings=dict(name='__ERROR_PAGE_TEMPLATE')))
def _handle(self,environ):
path=environ['bottle.raw_path']=environ[_M]
if py3k:environ[_M]=path.encode(_K).decode(_E,_A0)
environ[_AI]=self;request.bind(environ);response.bind()
try:
while _B:
out=_A
try:self.trigger_hook(_AG);route,args=self.router.match(environ);environ['route.handle']=route;environ[_AJ]=route;environ[_AK]=args;out=route.call(**args);break
except HTTPResponse as E:out=E;break
except RouteReset:depr(0,13,'RouteReset exception deprecated','Call route.call() after route.reset() and return the result.');route.reset();continue
finally:
if isinstance(out,HTTPResponse):out.apply(response)
try:self.trigger_hook(_z)
except HTTPResponse as E:out=E;out.apply(response)
except (KeyboardInterrupt,SystemExit,MemoryError):raise
except Exception as E:
if not self.catchall:raise
stacktrace=format_exc();environ[_j].write(stacktrace);environ[_j].flush();environ[_A1]=sys.exc_info();out=HTTPError(500,'Internal Server Error',E,stacktrace);out.apply(response)
return out
def _cast(self,out,peek=_A):
'Try to convert the parameter into something WSGI compatible and set\n correct HTTP headers when possible.\n Support: False, str, unicode, dict, HTTPResponse, HTTPError, file-like,\n iterable of strings and iterable of unicodes\n ';A='wsgi.file_wrapper'
if not out:
if _J not in response:response[_J]=0
return[]
if isinstance(out,(tuple,list))and isinstance(out[0],(bytes,unicode)):out=out[0][0:0].join(out)
if isinstance(out,unicode):out=out.encode(response.charset)
if isinstance(out,bytes):
if _J not in response:response[_J]=len(out)
return[out]
if isinstance(out,HTTPError):out.apply(response);out=self.error_handler.get(out.status_code,self.default_error_handler)(out);return self._cast(out)
if isinstance(out,HTTPResponse):out.apply(response);return self._cast(out.body)
if hasattr(out,'read'):
if A in request.environ:return request.environ[A](out)
elif hasattr(out,_P)or not hasattr(out,'__iter__'):return WSGIFileWrapper(out)
try:
iout=iter(out);first=next(iout)
while not first:first=next(iout)
except StopIteration:return self._cast('')
except HTTPResponse as E:first=E
except (KeyboardInterrupt,SystemExit,MemoryError):raise
except Exception as error:
if not self.catchall:raise
first=HTTPError(500,'Unhandled exception',error,format_exc())
if isinstance(first,HTTPResponse):return self._cast(first)
elif isinstance(first,bytes):new_iter=itertools.chain([first],iout)
elif isinstance(first,unicode):encoder=lambda x:x.encode(response.charset);new_iter=imap(encoder,itertools.chain([first],iout))
else:msg='Unsupported response type: %s'%type(first);return self._cast(HTTPError(500,msg))
if hasattr(out,_P):new_iter=_closeiter(new_iter,out.close)
return new_iter
def wsgi(self,environ,start_response):
'The bottle WSGI-interface.'
try:
out=self._cast(self._handle(environ))
if response._status_code in(100,101,204,304)or environ[_g]==_h:
if hasattr(out,_P):out.close()
out=[]
exc_info=environ.get(_A1)
if exc_info is not _A:del environ[_A1]
start_response(response._wsgi_status_line(),response.headerlist,exc_info);return out
except (KeyboardInterrupt,SystemExit,MemoryError):raise
except Exception as E:
if not self.catchall:raise
err='<h1>Critical error while processing request: %s</h1>'%html_escape(environ.get(_M,_D))
if DEBUG:err+='<h2>Error:</h2>\n<pre>\n%s\n</pre>\n<h2>Traceback:</h2>\n<pre>\n%s\n</pre>\n'%(html_escape(repr(E)),html_escape(format_exc()))
environ[_j].write(err);environ[_j].flush();headers=[(_N,_AL)];start_response('500 INTERNAL SERVER ERROR',headers,sys.exc_info());return[tob(err)]
def __call__(self,environ,start_response):"Each instance of :class:'Bottle' is a WSGI application.";return self.wsgi(environ,start_response)
def __enter__(self):'Use this application as default for all module-level shortcuts.';default_app.push(self);return self
def __exit__(self,exc_type,exc_value,traceback):default_app.pop()
def __setattr__(self,name,value):
if name in self.__dict__:raise AttributeError('Attribute %s already defined. Plugin conflict?'%name)
self.__dict__[name]=value
class BaseRequest:
"A wrapper for WSGI environment dictionaries that adds a lot of\n convenient access methods and properties. Most of them are read-only.\n\n Adding new attributes to a request actually adds them to the environ\n dictionary (as 'bottle.request.ext.<name>'). This is the recommended\n way to store and access request-specific data.\n ";__slots__=_F,;MEMFILE_MAX=102400
def __init__(self,environ=_A):'Wrap a WSGI environ dictionary.';self.environ={}if environ is _A else environ;self.environ['bottle.request']=self
@DictProperty(_F,_AI,read_only=_B)
def app(self):'Bottle application handling this request.';raise RuntimeError('This request is not connected to an application.')
@DictProperty(_F,_AJ,read_only=_B)
def route(self):'The bottle :class:`Route` object that matches this request.';raise RuntimeError(_AM)
@DictProperty(_F,_AK,read_only=_B)
def url_args(self):'The arguments extracted from the URL.';raise RuntimeError(_AM)
@property
def path(self):'The value of ``PATH_INFO`` with exactly one prefixed slash (to fix\n broken clients and avoid the "empty path" edge case).';return _D+self.environ.get(_M,'').lstrip(_D)
@property
def method(self):'The ``REQUEST_METHOD`` value as an uppercase string.';return self.environ.get(_g,_S).upper()
@DictProperty(_F,'bottle.request.headers',read_only=_B)
def headers(self):'A :class:`WSGIHeaderDict` that provides case-insensitive access to\n HTTP request headers.';return WSGIHeaderDict(self.environ)
def get_header(self,name,default=_A):'Return the value of a request header, or a given default value.';return self.headers.get(name,default)
@DictProperty(_F,'bottle.request.cookies',read_only=_B)
def cookies(self):'Cookies parsed into a :class:`FormsDict`. Signed cookies are NOT\n decoded. Use :meth:`get_cookie` if you expect signed cookies.';cookies=SimpleCookie(self.environ.get('HTTP_COOKIE','')).values();return FormsDict((c.key,c.value)for c in cookies)
def get_cookie(self,key,default=_A,secret=_A,digestmod=hashlib.sha256):
'Return the content of a cookie. To read a `Signed Cookie`, the\n `secret` must match the one used to create the cookie (see\n :meth:`BaseResponse.set_cookie`). If anything goes wrong (missing\n cookie or wrong signature), return a default value.';value=self.cookies.get(key)
if secret:
if value and value.startswith('!')and _L in value:
sig,msg=map(tob,value[1:].split(_L,1));hash=hmac.new(tob(secret),msg,digestmod=digestmod).digest()
if _lscmp(sig,base64.b64encode(hash)):
dst=pickle.loads(base64.b64decode(msg))
if dst and dst[0]==key:return dst[1]
return default
return value or default
@DictProperty(_F,'bottle.request.query',read_only=_B)
def query(self):
'The :attr:`query_string` parsed into a :class:`FormsDict`. These\n values are sometimes called "URL arguments" or "GET parameters", but\n not to be confused with "URL wildcards" as they are provided by the\n :class:`Router`.';get=self.environ['bottle.get']=FormsDict();pairs=_parse_qsl(self.environ.get(_Y,''))
for (key,value) in pairs:get[key]=value
return get
@DictProperty(_F,'bottle.request.forms',read_only=_B)
def forms(self):
'Form values parsed from an `url-encoded` or `multipart/form-data`\n encoded POST or PUT request body. The result is returned as a\n :class:`FormsDict`. All keys and values are strings. File uploads\n are stored separately in :attr:`files`.';forms=FormsDict();forms.recode_unicode=self.POST.recode_unicode
for (name,item) in self.POST.allitems():
if not isinstance(item,FileUpload):forms[name]=item
return forms
@DictProperty(_F,'bottle.request.params',read_only=_B)
def params(self):
'A :class:`FormsDict` with the combined values of :attr:`query` and\n :attr:`forms`. File uploads are stored in :attr:`files`.';params=FormsDict()
for (key,value) in self.query.allitems():params[key]=value
for (key,value) in self.forms.allitems():params[key]=value
return params
@DictProperty(_F,'bottle.request.files',read_only=_B)
def files(self):
'File uploads parsed from `multipart/form-data` encoded POST or PUT\n request body. The values are instances of :class:`FileUpload`.\n\n ';files=FormsDict();files.recode_unicode=self.POST.recode_unicode
for (name,item) in self.POST.allitems():
if isinstance(item,FileUpload):files[name]=item
return files
@DictProperty(_F,'bottle.request.json',read_only=_B)
def json(self):
'If the ``Content-Type`` header is ``application/json`` or\n ``application/json-rpc``, this property holds the parsed content\n of the request body. Only requests smaller than :attr:`MEMFILE_MAX`\n are processed to avoid memory exhaustion.\n Invalid JSON raises a 400 error response.\n ';ctype=self.environ.get(_k,'').lower().split(';')[0]
if ctype in(_A2,'application/json-rpc'):
b=self._get_body_string(self.MEMFILE_MAX)
if not b:return _A
try:return json_loads(b)
except (ValueError,TypeError):raise HTTPError(400,'Invalid JSON')
return _A
def _iter_body(self,read,bufsize):
maxread=max(0,self.content_length)
while maxread:
part=read(min(maxread,bufsize))
if not part:break
yield part;maxread-=len(part)
@staticmethod
def _iter_chunked(read,bufsize):
err=HTTPError(400,'Error while parsing chunked transfer body.');rn,sem,bs=tob('\r\n'),tob(';'),tob('')
while _B:
header=read(1)
while header[-2:]!=rn:
c=read(1);header+=c
if not c:raise err
if len(header)>bufsize:raise err
size,_,_=header.partition(sem)
try:maxread=int(tonat(size.strip()),16)
except ValueError:raise err
if maxread==0:break
buff=bs
while maxread>0:
if not buff:buff=read(min(maxread,bufsize))
part,buff=buff[:maxread],buff[maxread:]
if not part:raise err
yield part;maxread-=len(part)
if read(2)!=rn:raise err
@DictProperty(_F,'bottle.request.body',read_only=_B)
def _body(self):
try:read_func=self.environ[_Z].read
except KeyError:self.environ[_Z]=BytesIO();return self.environ[_Z]
body_iter=self._iter_chunked if self.chunked else self._iter_body;body,body_size,is_temp_file=BytesIO(),0,_C
for part in body_iter(read_func,self.MEMFILE_MAX):
body.write(part);body_size+=len(part)
if not is_temp_file and body_size>self.MEMFILE_MAX:body,tmp=NamedTemporaryFile(mode='w+b'),body;body.write(tmp.getvalue());del tmp;is_temp_file=_B
self.environ[_Z]=body;body.seek(0);return body
def _get_body_string(self,maxread):
'Read body into a string. Raise HTTPError(413) on requests that are\n too large.';A='Request entity too large'
if self.content_length>maxread:raise HTTPError(413,A)
data=self.body.read(maxread+1)
if len(data)>maxread:raise HTTPError(413,A)
return data
@property
def body(self):'The HTTP request body as a seek-able file-like object. Depending on\n :attr:`MEMFILE_MAX`, this is either a temporary file or a\n :class:`io.BytesIO` instance. Accessing this property for the first\n time reads and replaces the ``wsgi.input`` environ variable.\n Subsequent accesses just do a `seek(0)` on the file object.';self._body.seek(0);return self._body
@property
def chunked(self):'True if Chunked transfer encoding was.';return'chunked'in self.environ.get('HTTP_TRANSFER_ENCODING','').lower()
GET=query
@DictProperty(_F,'bottle.request.post',read_only=_B)
def POST(self):
'The values of :attr:`forms` and :attr:`files` combined into a single\n :class:`FormsDict`. Values are either strings (form values) or\n instances of :class:`cgi.FieldStorage` (file uploads).\n ';post=FormsDict()
if not self.content_type.startswith('multipart/'):
body=tonat(self._get_body_string(self.MEMFILE_MAX),_K)
for (key,value) in _parse_qsl(body):post[key]=value
return post
safe_env={_Y:''}
for key in (_g,_k,_A3):
if key in self.environ:safe_env[key]=self.environ[key]
args=dict(fp=self.body,environ=safe_env,keep_blank_values=_B)
if py3k:args['encoding']=_E;post.recode_unicode=_C
data=cgi.FieldStorage(**args);self['_cgi.FieldStorage']=data;data=data.list or[]
for item in data:
if item.filename is _A:post[item.name]=item.value
else:post[item.name]=FileUpload(item.file,item.name,item.filename,item.headers)
return post
@property
def url(self):'The full request URI including hostname and scheme. If your app\n lives behind a reverse proxy or load balancer and you get confusing\n results, make sure that the ``X-Forwarded-Host`` header is set\n correctly.';return self.urlparts.geturl()
@DictProperty(_F,'bottle.request.urlparts',read_only=_B)
def urlparts(self):
'The :attr:`url` string as an :class:`urlparse.SplitResult` tuple.\n The tuple contains (scheme, host, path, query_string and fragment),\n but the fragment is always empty because it is not visible to the\n server.';A='http';env=self.environ;http=env.get('HTTP_X_FORWARDED_PROTO')or env.get('wsgi.url_scheme',A);host=env.get('HTTP_X_FORWARDED_HOST')or env.get('HTTP_HOST')
if not host:
host=env.get('SERVER_NAME',_A4);port=env.get('SERVER_PORT')
if port and port!=('80'if http==A else'443'):host+=_O+port
path=urlquote(self.fullpath);return UrlSplitResult(http,host,path,env.get(_Y),'')
@property
def fullpath(self):'Request path including :attr:`script_name` (if present).';return urljoin(self.script_name,self.path.lstrip(_D))
@property
def query_string(self):'The raw :attr:`query` part of the URL (everything in between ``?``\n and ``#``) as a string.';return self.environ.get(_Y,'')
@property
def script_name(self):"The initial portion of the URL's `path` that was removed by a higher\n level (server or routing middleware) before the application was\n called. This script path is returned with leading and tailing\n slashes.";script_name=self.environ.get(_X,'').strip(_D);return _D+script_name+_D if script_name else _D
def path_shift(self,shift=1):'Shift path segments from :attr:`path` to :attr:`script_name` and\n vice versa.\n\n :param shift: The number of path segments to shift. May be negative\n to change the shift direction. (default: 1)\n ';script,path=path_shift(self.environ.get(_X,_D),self.path,shift);self[_X],self[_M]=script,path
@property
def content_length(self):'The request body length as an integer. The client is responsible to\n set this header. Otherwise, the real length of the body is unknown\n and -1 is returned. In this case, :attr:`body` will be empty.';return int(self.environ.get(_A3)or-1)
@property
def content_type(self):'The Content-Type header as a lowercase-string (default: empty).';return self.environ.get(_k,'').lower()
@property
def is_xhr(self):'True if the request was triggered by a XMLHttpRequest. This only\n works with JavaScript libraries that support the `X-Requested-With`\n header (most of the popular libraries do).';requested_with=self.environ.get('HTTP_X_REQUESTED_WITH','');return requested_with.lower()=='xmlhttprequest'
@property
def is_ajax(self):'Alias for :attr:`is_xhr`. "Ajax" is not the right term.';return self.is_xhr
@property
def auth(self):
'HTTP authentication data as a (user, password) tuple. This\n implementation currently supports basic (not digest) authentication\n only. If the authentication happened at a higher level (e.g. in the\n front web-server or a middleware), the password field is None, but\n the user field is looked up from the ``REMOTE_USER`` environ\n variable. On any errors, None is returned.';basic=parse_auth(self.environ.get('HTTP_AUTHORIZATION',''))
if basic:return basic
ruser=self.environ.get('REMOTE_USER')
if ruser:return ruser,_A
return _A
@property
def remote_route(self):
'A list of all IPs that were involved in this request, starting with\n the client IP and followed by zero or more proxies. This does only\n work if all proxies support the ```X-Forwarded-For`` header. Note\n that this information can be forged by malicious clients.';proxy=self.environ.get('HTTP_X_FORWARDED_FOR')
if proxy:return[ip.strip()for ip in proxy.split(',')]
remote=self.environ.get('REMOTE_ADDR');return[remote]if remote else[]
@property
def remote_addr(self):'The client IP as a string. Note that this information can be forged\n by malicious clients.';route=self.remote_route;return route[0]if route else _A
def copy(self):'Return a new :class:`Request` with a shallow :attr:`environ` copy.';return Request(self.environ.copy())
def get(self,value,default=_A):return self.environ.get(value,default)
def __getitem__(self,key):return self.environ[key]
def __delitem__(self,key):self[key]='';del self.environ[key]
def __iter__(self):return iter(self.environ)
def __len__(self):return len(self.environ)
def keys(self):return self.environ.keys()
def __setitem__(self,key,value):
'Change an environ value and clear all caches that depend on it.';A='params'
if self.environ.get('bottle.request.readonly'):raise KeyError('The environ dictionary is read-only.')
self.environ[key]=value;todelete=()
if key==_Z:todelete='body','forms','files',A,'post','json'
elif key==_Y:todelete='query',A
elif key.startswith(_A5):todelete='headers','cookies'
for key in todelete:self.environ.pop('bottle.request.'+key,_A)
def __repr__(self):return f"<{self.__class__.__name__}: {self.method} {self.url}>"
def __getattr__(self,name):
'Search in self.environ for additional user defined attributes.'
try:var=self.environ[_A6%name];return var.__get__(self)if hasattr(var,'__get__')else var
except KeyError:raise AttributeError('Attribute %r not defined.'%name)
def __setattr__(self,name,value):
if name==_F:return object.__setattr__(self,name,value)
key=_A6%name
if hasattr(self,name):raise AttributeError('Attribute already defined: %s'%name)
self.environ[key]=value
def __delattr__(self,name):
try:del self.environ[_A6%name]
except KeyError:raise AttributeError('Attribute not defined: %s'%name)
def _hkey(key):
if _G in key or _l in key or'\x00'in key:raise ValueError('Header names must not contain control characters: %r'%key)
return key.title().replace('_','-')
def _hval(value):
value=tonat(value)
if _G in value or _l in value or'\x00'in value:raise ValueError('Header value must not contain control characters: %r'%value)
return value
class HeaderProperty:
def __init__(self,name,reader=_A,writer=_A,default=''):self.name,self.default=name,default;self.reader,self.writer=reader,writer;self.__doc__='Current value of the %r header.'%name.title()
def __get__(self,obj,_):
if obj is _A:return self
value=obj.get_header(self.name,self.default);return self.reader(value)if self.reader else value
def __set__(self,obj,value):obj[self.name]=self.writer(value)if self.writer else value
def __delete__(self,obj):del obj[self.name]
class BaseResponse:
"Storage class for a response body as well as headers and cookies.\n\n This class does support dict-like case-insensitive item-access to\n headers, but is NOT a dict. Most notably, iterating over a response\n yields parts of the body and not the headers.\n\n :param body: The response body as one of the supported types.\n :param status: Either an HTTP status code (e.g. 200) or a status line\n including the reason phrase (e.g. '200 OK').\n :param headers: A dictionary or a list of name-value pairs.\n\n Additional keyword arguments are added to the list of headers.\n Underscores in the header name are replaced with dashes.\n ";default_status=200;default_content_type=_AL;bad_headers={204:frozenset((_N,_J)),304:frozenset(('Allow','Content-Encoding','Content-Language',_J,_AN,_N,'Content-Md5',_AO))}
def __init__(self,body='',status=_A,headers=_A,**more_headers):
self._cookies=_A;self._headers={};self.body=body;self.status=status or self.default_status
if headers:
if isinstance(headers,dict):headers=headers.items()
for (name,value) in headers:self.add_header(name,value)
if more_headers:
for (name,value) in more_headers.items():self.add_header(name,value)
def copy(self,cls=_A):
'Returns a copy of self.';cls=cls or BaseResponse;copy=cls();copy.status=self.status;copy._headers={k:v[:]for(k,v)in self._headers.items()}
if self._cookies:
cookies=copy._cookies=SimpleCookie()
for (k,v) in self._cookies.items():cookies[k]=v.value;cookies[k].update(v)
return copy
def __iter__(self):return iter(self.body)
def close(self):
if hasattr(self.body,_P):self.body.close()
@property
def status_line(self):'The HTTP status line as a string (e.g. ``404 Not Found``).';return self._status_line
@property
def status_code(self):'The HTTP status code as an integer (e.g. 404).';return self._status_code
def _set_status(self,status):
if isinstance(status,int):code,status=status,_HTTP_STATUS_LINES.get(status)
elif' 'in status:
if _G in status or _l in status or'\x00'in status:raise ValueError('Status line must not include control chars.')
status=status.strip();code=int(status.split()[0])
else:raise ValueError('String status line without a reason phrase.')
if not 100<=code<=999:raise ValueError('Status code out of range.')
self._status_code=code;self._status_line=str(status or'%d Unknown'%code)