-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathdisplay_reply_to_status.py
178 lines (151 loc) · 5.36 KB
/
display_reply_to_status.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys
from System import String, DateTime
from System.IO import StringReader
from Misuzilla.Applications.TwitterIrcGateway.AddIns.DLRIntegration import DLRIntegrationAddIn
from Misuzilla.Applications.TwitterIrcGateway import Status, Utility
from Newtonsoft.Json import JsonConvert
# settings {{{
RES_PREFIX = ' '
RES_FORMAT = '>> %(screen_name)s: %(text)s'
RES_COLOR = None # see: http://www.mirc.co.uk/help/colors.html
LRU_TIMEOUT = 10 * 60
LRU_INTERVAL = LRU_TIMEOUT
# }}}
# helpers {{{
def urlencode(params):
def escape(x):
return Utility.UrlEncode(unicode(x))
return '&'.join(['%s=%s' % (escape(k), escape(v)) for (k, v) in params.items()])
def request(method, path, fmt='json', **params):
query = urlencode(params)
url = '%s.%s' % (path, fmt)
if method == 'GET':
if query:
url += '?' + query
return CurrentSession.TwitterService.GETv1_1(url, path)
else:
return CurrentSession.TwitterService.POSTv1_1(url, query, path)
def deserialize(type, data):
return JsonConvert.DeserializeObject[type].Overloads[String](data)
#}}}
class Cache(object): # {{{
DEFAULT_LRU_INTERVAL = 10 * 60
def __init__(self, lru_interval=DEFAULT_LRU_INTERVAL):
self._cache = {}
self._lru_interval = lru_interval
self._lru_time = DateTime.Now.AddSeconds(self._lru_interval)
@classmethod
def is_expired(cls, dt, now=None):
if now is None:
now = DateTime.Now
if dt:
return DateTime.Now >= dt
else:
return False
@classmethod
def _expire(cls, timeout=None, now=None):
if now is None:
now = DateTime.Now
if timeout is None:
return None
else:
return now.AddSeconds(timeout)
def _lru(self):
now = DateTime.Now
if self.is_expired(self._lru_time, now):
for (k, v) in self._cache.items():
if self.is_expired(v['expire'], now):
self._del(k)
self._lru_time = self._expire(self._lru_interval, now=now)
def _del(self, key):
return self._cache.pop(key, None)
def set(self, key, value, timeout=None):
now = DateTime.Now
self._cache[key] = {
'value': value,
'timeout': timeout,
'expire': self._expire(timeout),
}
def get(self, key):
entry = self._cache.get(key, None)
if entry is None:
return None
elif self.is_expired(entry['expire']):
self._del(key)
return None
else:
entry['expire'] = self._expire(entry['timeout'])
return entry['value']
# }}}
class StatusCache(Cache): # {{{
@classmethod
def _get_status(cls, id):
return deserialize(Status, request('GET', '/statuses/show', id=id))
def set(self, status, timeout=None):
Cache.set(self, status.Id, status, timeout=timeout)
def get(self, id, timeout=None):
self._lru()
if id is None:
return None
status = Cache.get(self, id)
if status is None:
try:
status = self._get_status(id)
self.set(status, timeout=timeout)
except:
pass
return status
# }}}
class DisplayReplyToStatus(object): # {{{
def __init__(self, cache, timeout, prefix, fmt, color):
CurrentSession.PreSendMessageTimelineStatus += self.on_pre_send_message_timeline_status
CurrentSession.AddInManager.GetAddIn(DLRIntegrationAddIn).BeforeUnload += self.on_before_unload
self.cache = cache
self.timeout = timeout
self.prefix = prefix
self.fmt = fmt
self.color = color
@classmethod
def colored(cls, text, color):
if color is not None:
return '%c%d%s%c' % (chr(0x03), color, text, chr(0x03))
else:
return '%c%s' % (chr(0x03), text)
@classmethod
def get_res_id(cls, status):
res_id = status.InReplyToStatusId
if not res_id:
return None
return int(res_id)
@classmethod
def status_to_dict(cls, status):
return {
'id': status.Id,
'created_at': status.CreatedAt,
'text': status.Text,
'user_id': status.User.Id,
'name': status.User.Name,
'screen_name': status.User.ScreenName,
}
def on_pre_send_message_timeline_status(self, sender, e):
try:
status = e.Status
self.cache.set(status, self.timeout)
if CurrentSession.TwitterUser.ScreenName == status.User.ScreenName:
return # 自分の発言は除外
res_id = self.get_res_id(status)
res_status = self.cache.get(res_id, self.timeout)
if res_status:
colored_text = self.colored(self.fmt % self.status_to_dict(res_status), self.color)
e.Text = '%s%s%s' % (e.Text, self.prefix, colored_text)
except Exception, e:
e.Text += ' (%s)' % unicode(e)
def on_before_unload(self, sender, e):
CurrentSession.PreSendMessageTimelineStatus -= self.on_pre_send_message_timeline_status
# }}}
# instantiate {{{
cache = StatusCache(lru_interval=LRU_INTERVAL)
instance = DisplayReplyToStatus(cache, LRU_TIMEOUT, RES_PREFIX, RES_FORMAT, RES_COLOR)
# }}}