-
Notifications
You must be signed in to change notification settings - Fork 12
/
Copy pathprogress_211.py
149 lines (115 loc) · 4.59 KB
/
progress_211.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
import sys
from collections import deque
from shutil import get_terminal_size
from time import time
from streamlink_cli.compat import is_win32
PROGRESS_FORMATS = (
"[download] => {prefix} ({written} @ {speed}/s) ",
)
# widths generated from
# http://www.unicode.org/Public/4.0-Update/EastAsianWidth-4.0.0.txt
widths = [
(13, 1), (15, 0), (126, 1), (159, 0), (687, 1), (710, 0), # noqa: E241
(711, 1), (727, 0), (733, 1), (879, 0), (1154, 1), (1161, 0), # noqa: E241
(4347, 1), (4447, 2), (7467, 1), (7521, 0), (8369, 1), (8426, 0), # noqa: E241
(9000, 1), (9002, 2), (11021, 1), (12350, 2), (12351, 1), (12438, 2), # noqa: E241
(12442, 0), (19893, 2), (19967, 1), (55203, 2), (63743, 1), (64106, 2), # noqa: E241
(65039, 1), (65059, 0), (65131, 2), (65279, 1), (65376, 2), (65500, 1), # noqa: E241
(65510, 2), (120831, 1), (262141, 2), (1114109, 1) # noqa: E241
]
def get_width(o):
"""Returns the screen column width for unicode ordinal."""
for num, wid in widths:
if o <= num:
return wid
return 1
def terminal_width(value):
"""Returns the width of the string it would be when displayed."""
if isinstance(value, bytes):
value = value.decode("utf8", "ignore")
return sum(map(get_width, map(ord, value)))
def get_cut_prefix(value, max_len):
"""Drops Characters by unicode not by bytes."""
should_convert = isinstance(value, bytes)
if should_convert:
value = value.decode("utf8", "ignore")
for i in range(len(value)):
if terminal_width(value[i:]) <= max_len:
break
return value[i:].encode("utf8", "ignore") if should_convert else value[i:]
def print_inplace(msg):
"""Clears out the previous line and prints a new one."""
term_width = get_terminal_size().columns
spacing = term_width - terminal_width(msg)
# On windows we need one less space or we overflow the line for some reason.
if is_win32:
spacing -= 1
sys.stderr.write("\r{0}".format(msg))
sys.stderr.write(" " * max(0, spacing))
sys.stderr.flush()
def format_filesize(size):
"""Formats the file size into a human readable format."""
for suffix in ("bytes", "KB", "MB", "GB", "TB"):
if size < 1024.0:
if suffix in ("GB", "TB"):
return "{0:3.2f} {1}".format(size, suffix)
else:
return "{0:3.1f} {1}".format(size, suffix)
size /= 1024.0
def format_time(elapsed):
"""Formats elapsed seconds into a human readable format."""
hours = int(elapsed / (60 * 60))
minutes = int((elapsed % (60 * 60)) / 60)
seconds = int(elapsed % 60)
rval = ""
if hours:
rval += "{0}h".format(hours)
if elapsed > 60:
rval += "{0}m".format(minutes)
rval += "{0}s".format(seconds)
return rval
def create_status_line(**params):
"""Creates a status line with appropriate size."""
max_size = get_terminal_size().columns - 1
for fmt in PROGRESS_FORMATS:
status = fmt.format(**params)
if len(status) <= max_size:
break
return status
def progress(iterator, prefix):
"""Progress an iterator and updates a pretty status line to the terminal.
The status line contains:
- Amount of data read from the iterator
- Time elapsed
- Average speed, based on the last few seconds.
"""
if terminal_width(prefix) > 52:
prefix = (get_cut_prefix(prefix, 50))
speed_updated = start = time()
speed_written = written = 0
speed_history = deque(maxlen=5)
for data in iterator:
yield data
now = time()
elapsed = now - start
written += len(data)
speed_elapsed = now - speed_updated
if speed_elapsed >= 0.5:
speed_history.appendleft((
written - speed_written,
speed_updated,
))
speed_updated = now
speed_written = written
speed_history_written = sum(h[0] for h in speed_history)
speed_history_elapsed = now - speed_history[-1][1]
speed = speed_history_written / speed_history_elapsed
status = create_status_line(
prefix=prefix,
written=format_filesize(written),
elapsed=format_time(elapsed),
speed=format_filesize(speed)
)
print_inplace(status)
sys.stderr.write("\n")
sys.stderr.flush()