-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathuploaders.py
144 lines (110 loc) · 4.27 KB
/
uploaders.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import os
import posixpath
import string
import re
from shutil import copyfileobj
import boto3
from flask import current_app, redirect, send_file
from flask.helpers import safe_join
from werkzeug.utils import secure_filename
from CTFd.utils import get_app_config
from CTFd.utils.encoding import hexencode
class BaseUploader(object):
def __init__(self):
raise NotImplementedError
def store(self, fileobj, filename):
raise NotImplementedError
def upload(self, file_obj, filename):
raise NotImplementedError
def download(self, filename):
raise NotImplementedError
def delete(self, filename):
raise NotImplementedError
def sync(self):
raise NotImplementedError
class FilesystemUploader(BaseUploader):
def __init__(self, base_path=None):
super(BaseUploader, self).__init__()
self.base_path = base_path or current_app.config.get("UPLOAD_FOLDER")
def store(self, fileobj, filename):
location = os.path.join(self.base_path, filename)
directory = os.path.dirname(location)
if not os.path.exists(directory):
os.makedirs(directory)
with open(location, "wb") as dst:
copyfileobj(fileobj, dst, 16384)
return filename
def upload(self, file_obj, filename):
if len(filename) == 0:
raise Exception("Empty filenames cannot be used")
filename = secure_filename(filename)
md5hash = hexencode(os.urandom(16))
file_path = posixpath.join(md5hash, filename)
return self.store(file_obj, file_path)
def download(self, filename):
return send_file(safe_join(self.base_path, filename), as_attachment=True)
def delete(self, filename):
if os.path.exists(os.path.join(self.base_path, filename)):
os.unlink(os.path.join(self.base_path, filename))
return True
return False
def sync(self):
pass
class S3Uploader(BaseUploader):
def __init__(self):
super(BaseUploader, self).__init__()
self.endpoint = get_app_config("AWS_S3_ENDPOINT_URL")
self.s3 = self._get_s3_connection()
self.bucket = get_app_config("AWS_S3_BUCKET")
def _get_s3_connection(self):
access_key = get_app_config("AWS_ACCESS_KEY_ID")
secret_key = get_app_config("AWS_SECRET_ACCESS_KEY")
client = boto3.client(
"s3",
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
endpoint_url=self.endpoint,
)
return client
def _clean_filename(self, c):
if c in string.ascii_letters + string.digits + "-" + "_" + ".":
return True
def store(self, fileobj, filename):
self.s3.upload_fileobj(fileobj, self.bucket, filename)
return filename
def upload(self, file_obj, filename):
filename = filter(
self._clean_filename, secure_filename(filename).replace(" ", "_")
)
filename = "".join(filename)
if len(filename) <= 0:
return False
md5hash = hexencode(os.urandom(16))
dst = md5hash + "/" + filename
self.s3.upload_fileobj(file_obj, self.bucket, dst)
return dst
def download(self, filename):
key = filename
# filename = filename.split("/").pop()
url = re.sub(
r"([^:])(/{2,})",
r"\1/",
"{}/{}/{}".format(self.endpoint, self.bucket, filename)
)
return redirect(url)
def delete(self, filename):
self.s3.delete_object(Bucket=self.bucket, Key=filename)
return True
def sync(self):
local_folder = current_app.config.get("UPLOAD_FOLDER")
# If the bucket is empty then Contents will not be in the response
bucket_list = self.s3.list_objects(Bucket=self.bucket).get("Contents", [])
for s3_key in bucket_list:
s3_object = s3_key["Key"]
# We don't want to download any directories
if s3_object.endswith("/") is False:
local_path = os.path.join(local_folder, s3_object)
directory = os.path.dirname(local_path)
if not os.path.exists(directory):
os.makedirs(directory)
self.s3.download_file(self.bucket, s3_object, local_path)