-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexample.py
122 lines (96 loc) · 3.2 KB
/
example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import psycopg2
import string
import random
import time
DSN = 'dbname=postgis_scratch user=postgres host=localhost password=pg'
schema = 'example'
codes_table = 'codes'
used_table = 'used'
CHARS = string.digits + string.ascii_uppercase
def next_code(code):
d = 1
next = ''
for n in (2,1,0):
d, m = divmod(CHARS.index(code[n]) + d, 36)
next = CHARS[m] + next
return None if d == 1 else next
with psycopg2.connect(DSN) as conn, conn.cursor() as curs:
qstr = """
DROP TABLE IF EXISTS {schema}.{codes_table};
CREATE TABLE {schema}.{codes_table}
(
gid serial NOT NULL,
code character(3),
CONSTRAINT {codes_table}_pkey PRIMARY KEY (gid)
)
WITH (OIDS=FALSE);
DROP TABLE IF EXISTS {schema}.{used_table};
CREATE TABLE {schema}.{used_table}
(
gid serial NOT NULL,
code character(3),
CONSTRAINT {used_table}_pkey PRIMARY KEY (gid)
)
WITH (OIDS=FALSE);
""".format(schema=schema, codes_table=codes_table, used_table=used_table)
curs.execute(qstr)
qstr = """
INSERT INTO {schema}.{codes_table} (code) VALUES
""".format(schema=schema, codes_table=codes_table)
startTime = time.time()
for c in [c2+c1 for c2 in CHARS for c1 in CHARS]:
codes = ",".join(["('{0}')".format(c+c0) for c0 in CHARS])
curs.execute(
'INSERT INTO {schema}.{codes_table} (code) VALUES {codes}'.format(
schema=schema, codes_table=codes_table, codes=codes
)
)
endTime = time.time()
print('\ntime: {0:.3f} sec'.format(endTime - startTime))
maxgid = 36**3 - 1
qstr = """
INSERT INTO {schema}.{used_table} (code)
SELECT code FROM {schema}.{codes_table};
DELETE FROM {schema}.{used_table} WHERE gid IN ({gid_list});
""".format(
schema=schema, codes_table=codes_table, used_table=used_table,
gid_list=','.join([str(random.randint(0, maxgid)) for a in range(5)])
)
curs.execute(qstr)
qstr = """
SELECT code
FROM {schema}.{codes_table} c
LEFT JOIN {schema}.{used_table} u USING (code)
WHERE u.code IS NULL;
""".format(schema=schema, codes_table=codes_table, used_table=used_table)
curs.execute(qstr)
n = curs.rowcount
print('missing codes: {0}'.format(
'none found.' if n == 0 else str(n)
))
rows = curs.fetchall()
for rec in rows:
print(' {0}'.format(rec[0]))
qstr = """
DROP FUNCTION IF EXISTS example.next_code(char(3));
CREATE OR REPLACE FUNCTION example.next_code(code char(3))
RETURNS char(3) AS
$$
CHARS = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
d = 1
next = ''
for n in (2,1,0):
d, m = divmod(CHARS.index(code[n]) + d, 36)
next = CHARS[m] + next
return None if d == 1 else next
$$ LANGUAGE plpython3u IMMUTABLE;
""".format(schema=schema, codes_table=codes_table, used_table=used_table)
curs.execute(qstr)
# Cleanup.
qstr = """
DROP TABLE IF EXISTS {schema}.{codes_table};
DROP TABLE IF EXISTS {schema}.{used_table};
DROP FUNCTION IF EXISTS {schema}.next_code(char(3));
""".format(schema=schema, codes_table=codes_table, used_table=used_table)
curs.execute(qstr)
conn.close()