-
Notifications
You must be signed in to change notification settings - Fork 6
/
sync_timelogs.py
executable file
·146 lines (126 loc) · 4.44 KB
/
sync_timelogs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#!/usr/bin/env python
import argparse
import sys
import click
from datetime import datetime, timedelta, timezone
from dateutil import parser
from decouple import config
from libtoggl import TogglTimesheets, Timelog
from libtempo import JiraTempoTimelogsDriver
def distribute_incomplete(incomplete, complete, logf):
    """Distribute the time of incomplete timelogs over the complete ones.

    Every timelog in ``complete`` receives a share of each incomplete
    timelog's duration, proportional to its own duration at the moment this
    function is called. The ``time`` attribute of the entries in ``complete``
    is updated in place; nothing is returned.

    Args:
        incomplete: Iterable of timelogs whose ``time`` is redistributed.
        complete: Iterable of timelogs (with ``ticket``, ``description`` and
            ``time`` attributes) that receive the redistributed time.
        logf: Callable taking one string, used for progress output.
    """
    # Materialize once: the targets are iterated several times below.
    targets = list(complete)
    if not targets:
        logf("No complete timelogs available, nothing to distribute on.")
        return

    # total_seconds() keeps whole days; ``.seconds`` alone would drop them.
    weights = [t.time.total_seconds() for t in targets]
    total = sum(weights)
    if total > 0:
        rates = [w / total for w in weights]
    else:
        # All targets have zero duration: fall back to an even split rather
        # than dividing by zero.
        rates = [1.0 / len(targets)] * len(targets)

    logf("\n---\n")
    logf("Incomplete timelogs found, distributing with rates:")
    logf(
        "\n".join(
            "{} {} {}".format(t.ticket, t.description, r)
            for t, r in zip(targets, rates)
        )
    )
    for timelog in incomplete:
        logf(
            "Distributing {}: {}".format(
                timelog.time, " ".join(str(rate * timelog.time) for rate in rates)
            )
        )
        # Rates were fixed before the loop, so earlier additions do not
        # change how later incomplete timelogs are split.
        for target, rate in zip(targets, rates):
            target.time += timelog.time * rate
def update_tempo(timelogs, logf):
    """Upload the given timelogs to Jira Tempo.

    Creates a ``JiraTempoTimelogsDriver`` for the configured ``JIRA_URL``,
    logs in with ``JIRA_USER`` / ``JIRA_PASSWORD`` and adds every timelog in
    ``timelogs`` one by one.

    Args:
        timelogs: Iterable of timelogs (with ``ticket``, ``description``,
            ``date`` and ``time`` attributes) to upload.
        logf: Callable taking one string, used for progress output.

    Returns:
        None. NOTE(review): the script's main block treats a truthy return
        value as the point where the upload stopped early; this function
        never reports one, so an error from the driver simply propagates.
    """
    tempo_driver = JiraTempoTimelogsDriver(config("JIRA_URL"))
    tempo_driver.login(config("JIRA_USER"), config("JIRA_PASSWORD"))
    logf("\n---\n")
    # Iterate the argument; relying on a module-level ``grouped`` only works
    # when this function is called from the script's main block.
    for timelog in timelogs:
        logf(
            "Logging time for {}: {} ({}) ({})".format(
                timelog.ticket, timelog.description, timelog.date, timelog.time
            )
        )
        tempo_driver.add_timelog(timelog)
    return None
def group_timelogs(timelogs, logf):
    """Merge timelogs that share ticket, description and calendar day.

    The first timelog seen for a given (ticket, description, day) is kept
    and the ``time`` of every later match is added onto it in place. Each
    input timelog and each resulting group is reported through ``logf``.
    Returns the values view of the internal grouping dict.
    """
    line_template = "{} worklog {}: {} ({}) ({})"
    by_key = {}
    for entry in timelogs:
        group_key = "{}:{}:{}".format(
            entry.ticket, entry.description, entry.date.date()
        )
        # Report the entry before merging so the log shows its own duration.
        logf(
            line_template.format(
                "Found", entry.ticket, entry.description, entry.date, entry.time
            )
        )
        first_seen = by_key.get(group_key)
        if first_seen is None:
            by_key[group_key] = entry
            continue
        first_seen.time += entry.time

    logf("\n---\n")
    merged = by_key.values()
    for entry in merged:
        logf(
            line_template.format(
                "Grouped", entry.ticket, entry.description, entry.date, entry.time
            )
        )
    return merged
def get_timelogs(start, end):
    """Fetch the timelogs between ``start`` and ``end`` from Toggl.

    Builds a ``TogglTimesheets`` driver from the configured ``TOGGL_TOKEN``
    and returns the ``"complete"`` and ``"incomplete"`` entries of its
    result as a 2-tuple, in that order.
    """
    fetched = TogglTimesheets(config("TOGGL_TOKEN")).get_timelogs(start, end)
    complete_logs = fetched["complete"]
    incomplete_logs = fetched["incomplete"]
    return complete_logs, incomplete_logs
if __name__ == "__main__":
    # Script entry point: parse options, fetch timelogs from Toggl, group
    # them, optionally redistribute incomplete ones, then upload to Tempo
    # after confirmation and remember the end of the synced range.

    # Define arguments
    argp = argparse.ArgumentParser()
    argp.add_argument(
        "-d", action="store_true", help="Redistribute incomplete on other tickets"
    )
    argp.add_argument("-n", action="store_true", help="Make no modifications")
    argp.add_argument("-v", action="store_true", help="Be verbose")
    argp.add_argument(
        "-s", action="store", default=None, help="Starting date, e.g. 2019-01-01"
    )
    argp.add_argument(
        "-e", action="store", default=None, help="End date, e.g. 2019-01-01"
    )
    args = argp.parse_args()

    # Verbosity, needs work: print when -v is given, otherwise discard.
    logf = print if args.v else (lambda x: x)

    # Load the last saved worklog timestamp if no start date is supplied
    if args.s:
        start = datetime.strptime(args.s, r"%Y-%m-%d")
    else:
        try:
            with open(".latest") as f:
                start = datetime.fromtimestamp(float(f.read()))
        except (OSError, ValueError):
            # Without a start date nothing below can run, so stop here
            # instead of failing later on an undefined ``start``.
            print('ERROR: .latest not found, run with "-s YYYY-MM-DD"')
            sys.exit(1)

    if args.e:
        end = datetime.strptime(args.e, r"%Y-%m-%d")
    else:
        # Set end to last midnight (microseconds must be cleared as well)
        end = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

    # Fetch complete and incomplete timelogs
    complete, incomplete = get_timelogs(start, end)

    # Group completed
    grouped = group_timelogs(complete, logf)

    # Distribute incomplete
    if incomplete and args.d:
        distribute_incomplete(incomplete, grouped, logf)
    elif incomplete:
        print(
            "ERROR:",
            "cannot proceed with incomplete timelogs, verify or enable distribute (-d).",
        )
        sys.exit(1)

    # This will update Tempo
    if not args.n and click.confirm(
        "\nDo you want to upload these worklogs to Tempo?", default=False
    ):
        last = update_tempo(grouped, logf)
        if last:
            # Stopped before everything was processed
            end = last
        # Remember where this sync ended so the next run can resume from it.
        with open(".latest", "w") as f:
            f.write(str(end.timestamp()))