-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathinit_db.py
94 lines (80 loc) · 3.08 KB
/
init_db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import time
import asyncio
from aiomysql import create_pool
from sqlalchemy import select, and_
from aiomysql.sa import create_engine
from polls.models import question, choice
from sqlalchemy.schema import CreateTable, DropTable
from sqlalchemy.sql.expression import bindparam
async def init_engine(loop):
return await create_engine(host='127.0.0.1', port=3306,
autocommit=True, user='root',
password='123456', db='aioweb', loop=loop)
async def init_pool(loop):
return await create_pool(host='127.0.0.1', port=3306,
user='root', password='123456',
db='aioweb', loop=loop, autocommit=True)
def init_bind():
from sqlalchemy import create_engine as ce
from polls.settings import config
DSN = 'mysql+pymysql://{user}:{password}@{host}:{port}/{database}'
return ce(DSN.format(**config['mysql']))
async def create_tables(engine):
async with engine.acquire() as conn:
for table in [question, choice]:
create_expr = CreateTable(table)
if table.exists(init_bind()):
await conn.execute('SET FOREIGN_KEY_CHECKS = 0')
drop_expr = DropTable(table)
await conn.execute(drop_expr)
await conn.execute(create_expr)
await conn.execute('SET FOREIGN_KEY_CHECKS = 1')
else:
await conn.execute(create_expr)
async def sample_data(engine):
async with engine.acquire() as conn:
data = [
{'content': 'what the fuck hello word', 'create_at': time.time()},
{'content': 'just fuck you', 'create_at': time.time()},
]
for row in data:
return await conn.execute(question.insert().values(**row))
# data = [
# {'content': 'fuck', 'votes': 1, 'qid': 1},
# {'content': 'your', 'votes': 2, 'qid': 1},
# {'content': 'wife', 'votes': 3, 'qid': 1},
# ]
# for row in data:
# return await conn.execute(choice.insert().values(*data))
# 批量更新
async def update(engine):
async with engine.acquire() as conn:
await conn.execute(choice.update().where(
choice.c.id == bindparam('id')
).values(
{'content': bindparam('content')}
), [
{'id': 1, 'content': 'fuck'},
{'id': 2, 'content': 'bitch'},
]
)
async def _select(engine):
async with engine.acquire() as conn:
sql = select([choice.c.id]).where(
and_(choice.c.id == 3, choice.c.content == 'fuck'))
rows = await conn.execute(sql)
print(await rows.fetchall())
# return await rows.first()
async def main(loop):
engine = await init_engine(loop)
# print(getattr(choice.c, 'qid'))
# await create_tables(engine)
r = await sample_data(engine)
# print(await r.first())
# print(r.rowcount)
# print(r.lastrowid)
# await _select(engine)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
loop.close()