Skip to content

Commit

Permalink
Use EventStreams instead of RCStream to consume RecentChange events
Browse files Browse the repository at this point in the history
  • Loading branch information
ottomata committed Mar 28, 2017
1 parent c38ac73 commit a1ac695
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 22 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This is the Cocytus project for tracking citations on Wikipedia.

We are changing a __diff stream__ into a __citation delta__ stream.

+ we use the [recent changes stream](https://wikitech.wikimedia.org/wiki/RCStream)
+ we use the [RecentChange stream](https://www.mediawiki.org/wiki/Manual:RCFeed) published by [EventStreams](https://wikitech.wikimedia.org/wiki/EventStreams)
+ to make queue of diffs to be inspected
+ Keep a database table of the latest version we have seen so far
+ call the wikimedia api to fetch the diff text
Expand Down
35 changes: 14 additions & 21 deletions cocytus-input.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@
from redis import Redis
import compare_change
import crossref_push
import socketIO_client
from sseclient import SSEClient as EventSource
import json
import time
import signal
import logging
Expand All @@ -27,23 +28,15 @@ def alarm_handle(signal_number, current_stack_frame):
signal.siginterrupt(signal.SIGALRM, False)
signal.alarm(alarm_interval)

class WikiNamespace(socketIO_client.BaseNamespace):

def on_change(self, change):
logging.info(u"enqueing "+str(change))
while True:
try:
queue.enqueue(compare_change.get_changes, change)
break
except Exception as e:
logging.error(e.message)
time.sleep(1.0)

def on_connect(self):
self.emit(u"subscribe", u"*")


while True:
socketIO = socketIO_client.SocketIO(u'stream.wikimedia.org', 80)
socketIO.define(WikiNamespace, u'/rc')
socketIO.wait(HEARTBEAT_INTERVAL + 2) # 10 minutes, in prime seconds
for event in EventSource('https://stream.wikimedia.org/v2/stream/recentchange'):
try:
if event.event == 'message' and event.data:
change = json.loads(event.data)
logging.info(u"enqueing " + str(change))
queue.enqueue(compare_change.get_changes, change)
elif event.event == 'error':
logging.error(event.data)
time.sleep(1.0)
except Exception as e:
logging.error(e.message)
time.sleep(1.0)

0 comments on commit a1ac695

Please sign in to comment.