-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdb.py
109 lines (87 loc) · 3.72 KB
/
db.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import sqlite3
class Database():
    """Thin wrapper around a SQLite file holding two tables.

    * ``approve``  -- rows of ``(tx_from, contract, spender)``; the triple is
      UNIQUE and ``tx_from`` is indexed.
    * ``contract`` -- rows of ``(contract, name, kind, decimals)``.

    Attributes:
        vt: the open ``sqlite3.Connection``.
        im: a single ``sqlite3.Cursor`` shared by every method.
    """

    # Location used when no explicit path is given (the original hard-coded value).
    DEFAULT_PATH = '/root/starkrekt-backend/approve.sqlite'

    # Column order used to turn `approve` rows into dicts.
    _APPROVE_COLUMNS = ("tx_from", "contract", "spender")

    # Columns of `contract` that may be read/written through the helpers below.
    # Column names are interpolated into SQL, so they must come from this list.
    _CONTRACT_FIELDS = ("name", "kind", "decimals")

    def __init__(self, path=DEFAULT_PATH):
        """Open (or create) the database and make sure the schema exists.

        Args:
            path: SQLite database location. Defaults to ``DEFAULT_PATH``;
                pass ``":memory:"`` for a throw-away in-memory database.
        """
        self.vt = sqlite3.connect(path)
        self.im = self.vt.cursor()
        self.im.execute("""CREATE TABLE IF NOT EXISTS approve (
            tx_from TEXT,
            contract TEXT,
            spender TEXT,
            UNIQUE(tx_from, contract, spender))""")
        self.im.execute(
            "CREATE INDEX IF NOT EXISTS idx_approve_tx_from ON approve(tx_from)")
        self.im.execute("""CREATE TABLE IF NOT EXISTS contract (
            contract TEXT,
            name TEXT,
            kind TEXT,
            decimals INT)""")

    def close(self):
        """Close the underlying connection. The instance is unusable afterwards."""
        self.vt.close()

    # ------------------------------------------------------------------ approve

    def approve_insert_data(self, tx_from, contract, spender):
        """Insert one ``approve`` row; duplicates are reported and ignored.

        Args:
            tx_from: value stored in the ``tx_from`` column (stored as given).
            contract: value stored in the ``contract`` column.
            spender: value stored in the ``spender`` column.
        """
        params = (tx_from, contract, spender)
        try:
            # `with connection` commits on success and rolls back on error, so
            # a rejected duplicate does not leave a transaction open.
            with self.vt:
                self.im.execute(
                    """INSERT INTO approve(tx_from, contract, spender)
                    VALUES (?, ?, ?)""", params)
        except sqlite3.IntegrityError:
            print("Record already exists and won't be added again.")

    def _approve_rows_as_dicts(self, rows):
        """Convert raw ``approve`` tuples into dicts keyed by column name."""
        return [dict(zip(self._APPROVE_COLUMNS, row)) for row in rows]

    def approve_get_data(self, tx_from):
        """Return every ``approve`` row whose ``tx_from`` matches.

        Args:
            tx_from: either an ``int`` (converted with ``hex()``, as before) or
                a ``str`` (used verbatim, with no normalisation of case or
                leading zeros).

        Returns:
            A list of dicts with keys ``tx_from``, ``contract``, ``spender``;
            empty when nothing matches.
        """
        address = tx_from if isinstance(tx_from, str) else hex(tx_from)
        self.im.execute(
            "SELECT tx_from, contract, spender FROM approve WHERE tx_from = ?",
            (address,))
        return self._approve_rows_as_dicts(self.im.fetchall())

    def approve_get_full(self):
        """Return all ``approve`` rows as a list of dicts (see ``approve_get_data``)."""
        self.im.execute("SELECT tx_from, contract, spender FROM approve")
        return self._approve_rows_as_dicts(self.im.fetchall())

    # ----------------------------------------------------------------- contract

    def contract_init(self, contract):
        """Create a ``contract`` row with NULL metadata unless one already exists."""
        self.im.execute(
            "SELECT contract FROM contract WHERE contract = ?", (contract,))
        if self.im.fetchone() is None:
            self.im.execute(
                """INSERT INTO contract(contract, name, kind, decimals)
                VALUES (?, ?, ?, ?)""", (contract, None, None, None))
            self.vt.commit()

    def _contract_get(self, field, contract):
        """Return ``field`` of the first row for ``contract``, or None if no row."""
        if field not in self._CONTRACT_FIELDS:
            raise ValueError(f"unknown contract column: {field!r}")
        self.im.execute(
            f"SELECT {field} FROM contract WHERE contract = ?", (contract,))
        row = self.im.fetchone()
        return row[0] if row else None

    def _contract_set(self, field, contract, value):
        """Set ``field`` on existing rows for ``contract`` and commit.

        This is a plain UPDATE: when no row exists for ``contract`` nothing is
        written (call ``contract_init`` first).
        """
        if field not in self._CONTRACT_FIELDS:
            raise ValueError(f"unknown contract column: {field!r}")
        self.im.execute(
            f"UPDATE contract SET {field} = ? WHERE contract = ?",
            (value, contract))
        self.vt.commit()

    def contract_update_name(self, contract, name):
        """Store ``name`` for an existing ``contract`` row."""
        self._contract_set("name", contract, name)

    def contract_get_name(self, contract):
        """Return the stored name, or None if the row or value is missing."""
        return self._contract_get("name", contract)

    def contract_update_kind(self, contract, kind):
        """Store ``kind`` for an existing ``contract`` row."""
        self._contract_set("kind", contract, kind)

    def contract_get_kind(self, contract):
        """Return the stored kind, or None if the row or value is missing."""
        return self._contract_get("kind", contract)

    def contract_update_decimals(self, contract, decimals):
        """Store ``decimals`` for an existing ``contract`` row."""
        self._contract_set("decimals", contract, decimals)

    def contract_get_decimals(self, contract):
        """Return the stored decimals, or None if the row or value is missing."""
        return self._contract_get("decimals", contract)

    # ---------------------------------------------------------------- migration

    def add_column(self):
        """Add the ``decimals`` column to ``contract`` if it is not there yet.

        Databases created by ``__init__`` already have the column, in which
        case an unconditional ALTER TABLE would fail with a duplicate-column
        error; the check makes this safe to call repeatedly.
        """
        self.im.execute("PRAGMA table_info(contract)")
        # Each table_info row is (cid, name, type, notnull, dflt_value, pk).
        existing = {row[1] for row in self.im.fetchall()}
        if "decimals" not in existing:
            self.im.execute("ALTER TABLE contract ADD COLUMN decimals INTEGER")
            self.vt.commit()
if __name__ == "__main__":
    # Manual maintenance entry point: run the `decimals` column migration.
    # The previous version called `db.add()`, which is not a method of
    # Database and raised AttributeError; `add_column` is the only method
    # with a matching name.
    db = Database()
    try:
        db.add_column()
    except sqlite3.OperationalError as exc:
        # SQLite rejects the ALTER TABLE, e.g. when the column already exists.
        print(f"add_column skipped: {exc}")
    else:
        print("add_column finished")