-
-
Notifications
You must be signed in to change notification settings - Fork 1.5k
/
Copy pathlogutils.py
120 lines (99 loc) · 3.67 KB
/
logutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
# Copyright 2016-2017 Versada <https://versada.eu/>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import os.path
import urllib.parse
from sentry_sdk._compat import text_type
from werkzeug import datastructures
from .generalutils import get_environ
from .processor import SanitizePasswordsProcessor
def get_request_info(request):
"""
Returns context data extracted from :param:`request`.
Heavily based on flask integration for Sentry: https://git.io/vP4i9.
"""
urlparts = urllib.parse.urlsplit(request.url)
return {
"url": "{}://{}{}".format(urlparts.scheme, urlparts.netloc, urlparts.path),
"query_string": urlparts.query,
"method": request.method,
"headers": dict(datastructures.EnvironHeaders(request.environ)),
"env": dict(get_environ(request.environ)),
}
def get_extra_context(request):
"""
Extracts additional context from the current request (if such is set).
"""
try:
session = getattr(request, "session", {})
except RuntimeError:
ctx = {}
else:
ctx = {
"tags": {
"database": session.get("db", None),
},
"user": {
"email": session.get("login", None),
"id": session.get("uid", None),
},
"extra": {
"context": session.get("context", {}),
},
}
if request.httprequest:
ctx.update({"request": get_request_info(request.httprequest)})
return ctx
class SanitizeOdooCookiesProcessor(SanitizePasswordsProcessor):
"""Custom :class:`raven.processors.Processor`.
Allows to sanitize sensitive Odoo cookies, namely the "session_id" cookie.
"""
KEYS = frozenset(
[
"session_id",
]
)
class InvalidGitRepository(Exception):
pass
def fetch_git_sha(path, head=None):
""">>> fetch_git_sha(os.path.dirname(__file__))
Taken from https://git.io/JITmC
"""
if not head:
head_path = os.path.join(path, ".git", "HEAD")
if not os.path.exists(head_path):
raise InvalidGitRepository(
"Cannot identify HEAD for git repository at %s" % (path,)
)
with open(head_path, "r") as fp:
head = text_type(fp.read()).strip()
if head.startswith("ref: "):
head = head[5:]
revision_file = os.path.join(path, ".git", *head.split("/"))
else:
return head
else:
revision_file = os.path.join(path, ".git", "refs", "heads", head)
if not os.path.exists(revision_file):
if not os.path.exists(os.path.join(path, ".git")):
raise InvalidGitRepository(
"%s does not seem to be the root of a git repository" % (path,)
)
# Check for our .git/packed-refs' file since a `git gc` may have run
# https://git-scm.com/book/en/v2/Git-Internals-Maintenance-and-Data-Recovery
packed_file = os.path.join(path, ".git", "packed-refs")
if os.path.exists(packed_file):
with open(packed_file) as fh:
for line in fh:
line = line.rstrip()
if line and line[:1] not in ("#", "^"):
try:
revision, ref = line.split(" ", 1)
except ValueError:
continue
if ref == head:
return text_type(revision)
raise InvalidGitRepository(
'Unable to find ref to head "%s" in repository' % (head,)
)
with open(revision_file) as fh:
return text_type(fh.read()).strip()