forked from douban/pyquicklz
-
Notifications
You must be signed in to change notification settings - Fork 0
/
quicklz.pyx
68 lines (60 loc) · 2.06 KB
/
quicklz.pyx
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
__author__ = "davies <[email protected]>"
__version__ = "1.4.1"
__copyright__ = "Copyright (C) 2010 douban.com"
__license__ = "Apache License 2.0"
from libc.stdint cimport uint16_t, uint32_t, uint16_t
from libc.stdlib cimport malloc, free
from cpython cimport PyBytes_AsStringAndSize, PyBytes_FromStringAndSize
cdef extern from "src/quicklz.h":
cdef enum:
QLZ_SCRATCH_COMPRESS
QLZ_SCRATCH_DECOMPRESS
size_t qlz_size_decompressed(char *source) nogil
size_t qlz_size_compressed(char *source) nogil
size_t qlz_decompress(char *source, void *destination, char *scratch_decompress) nogil
size_t qlz_compress(void *source, char *destination, size_t size, char *scratch_compress) nogil
def compress(bytes val):
cdef char *wbuf
cdef char *src
cdef char *dst
cdef Py_ssize_t vlen
cdef int csize
if not val:
return b""
PyBytes_AsStringAndSize(val, &src, &vlen)
dst = <char *>malloc(vlen + 400)
with nogil:
wbuf = <char *>malloc(QLZ_SCRATCH_COMPRESS)
csize = qlz_compress(src, dst, vlen, wbuf)
free(wbuf)
val = PyBytes_FromStringAndSize(dst, csize)
free(dst)
return val
def decompress(bytes val):
cdef char wbuf[QLZ_SCRATCH_DECOMPRESS]
cdef char *src
cdef char *dst
cdef Py_ssize_t slen, dlen
if not val:
return b""
PyBytes_AsStringAndSize(val, &src, &slen)
if qlz_size_compressed(src) != slen:
raise ValueError('compressed length not match %d!=%d' % (slen, qlz_size_compressed(src)))
dst = <char*> malloc(qlz_size_decompressed(src))
with nogil:
dlen = qlz_decompress(src, dst, wbuf)
val = PyBytes_FromStringAndSize(dst, dlen);
free(dst)
return val
TRY_LENGTH = 1024 * 10
def try_compress(bytes val, minlength=256, ratio=0.8):
if len(val) < minlength:
return False, val
if len(val) > TRY_LENGTH:
v = compress(val[:TRY_LENGTH])
if len(v) > len(val) * ratio:
return False, val
v = compress(val)
if len(v) > len(val) * ratio:
return False, val
return True, v