redshift_deduper.py
import logging

log = logging.getLogger('Redshift Deduper')


class redshiftDeduper(object):
    """
    Utility for deduping records in a Redshift table.

    Params:
        rs_conn: Redshift connection object.
        table_name: name of the table to dedupe (include the schema name if needed).
        dedupe_key: the column name to apply deduping to. Usually the primary key,
            but a concatenation of columns is also accepted
            (e.g. dedupe_key="concat(user_id, event_id)").
    """

    def __init__(self, rs_conn, table_name, dedupe_key):
        self.rs_conn = rs_conn
        self.table_name = table_name
        self.dedupe_key = dedupe_key
        self.n_duplicates = self.get_number_duplicates()

    def run(self):
        if self.n_duplicates == 0:
            log.info("No records to dedupe")
        else:
            log.info("Deduping {} record(s)".format(self.n_duplicates))
            query = self.get_deduping_query()
            self.rs_conn.execute(query)
            log.info("Done deduping {} record(s) from {}".format(self.n_duplicates, self.table_name))

    def get_deduping_query(self):
        unique_dupes_temp_table_query = self.get_unique_dupes_temp_table_query()
        delete_dupes_query = self.get_delete_query()
        insert_uniques_query = self.get_insert_unique_dupes_query()
        query = """
            create temp table unique_dupes_temp as (
                {}
            );
            {};
            {};
            drop table unique_dupes_temp;
        """.format(unique_dupes_temp_table_query, delete_dupes_query, insert_uniques_query)
        return query
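
    # Roughly, the batch assembled above renders to the sketch below. The column and
    # table names are illustrative only, assuming dedupe_key="id" and table_name="events":
    #
    #   create temp table unique_dupes_temp as (
    #       with
    #       duplicates as (
    #           select id as dedupe_id from events group by 1 having count(*) > 1
    #       ),
    #       dupes_row_numbered as (
    #           with table_with_dedupe_id as (select id as key, * from events)
    #           select row_number() over (partition by duplicates.dedupe_id) as rn,
    #                  table_with_dedupe_id.*
    #           from table_with_dedupe_id
    #           left join duplicates on table_with_dedupe_id.key = duplicates.dedupe_id
    #           where duplicates.dedupe_id is not null
    #       )
    #       select * from dupes_row_numbered where rn = 1
    #   );
    #   delete from events where id in (select key from unique_dupes_temp);
    #   alter table unique_dupes_temp drop column rn;
    #   alter table unique_dupes_temp drop column key;
    #   insert into events select * from unique_dupes_temp;
    #   drop table unique_dupes_temp;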

    def get_insert_unique_dupes_query(self):
        """
        First drop the helper columns created for deduping (rn and key),
        then insert the unique rows back into the target table.
        """
        query = """
            alter table unique_dupes_temp
            drop column rn;
            alter table unique_dupes_temp
            drop column key;
            insert into {}
            select * from unique_dupes_temp;""".format(self.table_name)
        return query

    def get_delete_query(self):
        query = """
            delete from {}
            where {} in (
                select
                    key
                from unique_dupes_temp
            )""".format(self.table_name, self.dedupe_key)
        return query

    def get_unique_dupes_temp_table_query(self):
        duplicates_query = self.get_duplicates_query()
        row_numbered_dupes_query = self.get_row_number_dupes_query()
        final_uniques_query = self.get_uniques_query()
        query = """
            with
            duplicates as (
                {}
            ),
            dupes_row_numbered as (
                {}
            )
            {}""".format(duplicates_query, row_numbered_dupes_query, final_uniques_query)
        return query

    def get_number_duplicates(self):
        """Return the number of distinct dedupe_key values that appear more than once."""
        duplicates_query = self.get_duplicates_query()
        return len(self.rs_conn.execute_and_fetch(duplicates_query, return_json=True))

    def get_duplicates_query(self):
        dupe_query = """
            select
                {} as dedupe_id
            from {}
            group by 1
            having count(*) > 1""".format(self.dedupe_key, self.table_name)
        return dupe_query

    def get_row_number_dupes_query(self):
        # Note: the over() clause has no order by, so which duplicate row
        # survives as rn = 1 is arbitrary.
        dupes_row_numbered_query = """
            with
            table_with_dedupe_id as (
                select
                    {} as key
                    , *
                from {}
            )
            select
                row_number() over (partition by duplicates.dedupe_id) as rn
                , table_with_dedupe_id.*
            from table_with_dedupe_id
            left join duplicates on table_with_dedupe_id.key = duplicates.dedupe_id
            where duplicates.dedupe_id is not null""".format(self.dedupe_key, self.table_name)
        return dupes_row_numbered_query

    def get_uniques_query(self):
        uniques_query = """
            select
                *
            from dupes_row_numbered
            where rn = 1"""
        return uniques_query
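

# Minimal usage sketch. `get_connection` is a hypothetical stand-in for however the
# calling code obtains its Redshift connection; the only requirements assumed here are
# the `execute` and `execute_and_fetch(..., return_json=True)` methods used above.
#
#     rs_conn = get_connection()
#     deduper = redshiftDeduper(rs_conn, table_name="analytics.events", dedupe_key="event_id")
#     deduper.run()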