-
Notifications
You must be signed in to change notification settings - Fork 0
/
cronjobs.py
107 lines (92 loc) · 4.77 KB
/
cronjobs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_script import Manager,Command
from datetime import date, datetime, timedelta
from app import Condate,Tag,Phrase
import config
# import twitter
import random
from mastodon import Mastodon
app = Flask(__name__)
app.config.from_object('config')
manager = Manager(app)
db = SQLAlchemy(app)
class Social(Command):
"""
Social alerts!
Weekly and monthly notices of approaching convention dates
Weekly notices of approaching registration deadlines
"""
# def post_to_twitter(self, twitter_api, message):
# try:
# twitter_api.PostUpdate(message)
# return ''
# except:
# return "There was an error posting to Twitter.\n"
# return ''
def post_to_mastodon(self, mastodon_api, message):
try:
mastodon_api.toot(message)
return ''
except:
return "There was an error posting to Mastodon.\n"
def run(self):
# twitter_api = twitter.Api(consumer_key=config.TWITTER_CONSUMER_KEY, consumer_secret=config.TWITTER_CONSUMER_SECRET, access_token_key=config.TWITTER_ACCESS_TOKEN_KEY, access_token_secret=config.TWITTER_ACCESS_TOKEN_SECRET)
mastodon_api = Mastodon(access_token=config.MASTODON_ACCESS_TOKEN, api_base_url=config.MASTODON_BASE_URL)
# get indie_tag for filtering
indie_tag = Tag.query.filter(Tag.title == "Indie").first()
# get least used phrases pool
min_uses = db.session.query(db.func.min(Phrase.num_uses)).scalar()
phrases = Phrase.query.filter(Phrase.num_uses == min_uses).all()
output = "-----------------\nSocial cronjob: %s\n\n" % str(datetime.now())
# cons that are a week away
daily_notices = Condate.query.filter(Condate.start_date == date.today() + timedelta(days=7), Condate.published == True, Condate.cancelled == False).all()
output = output + "Condates happening in a week: \n"
for c in daily_notices:
if indie_tag in c.convention.tags:
phrase = random.choice(phrases)
# update phrase num_uses count & remove from pool
phrase.num_uses += 1
phrases.remove(phrase)
message = "%s %s is a week away. %s" % (phrase, c.convention.title, c.convention.url)
output = output + self.post_to_mastodon(mastodon_api, message)
# if c.convention.twitter:
# message = message + " @%s" % (c.convention.twitter)
# output = output + self.post_to_twitter(twitter_api, message)
output = output + "\n" + message
# cons that are a month away
monthly_notices = Condate.query.filter(Condate.start_date == date.today() + timedelta(days=30), Condate.published == True, Condate.cancelled == False).all()
output = output + "\nCondates happening in a month: \n"
for c in monthly_notices:
if indie_tag in c.convention.tags:
phrase = random.choice(phrases)
# update phrase num_uses count & remove from pool
phrase.num_uses += 1
phrases.remove(phrase)
message = "%s %s is a month away. %s" % (phrase, c.convention.title, c.convention.url)
output = output + self.post_to_mastodon(mastodon_api, message)
# if c.convention.twitter:
# message = message + " @%s" % (c.convention.twitter)
# output = output + self.post_to_twitter(twitter_api, message)
output = output + "\n" + message
# cons that have registrations closing in a week
weekly_registration_notices = Condate.query.filter(Condate.registration_closes == date.today() + timedelta(days=7), Condate.published == True, Condate.cancelled == False).all()
output = output + "\nCondates with registration closing in a week: \n"
for c in weekly_registration_notices:
if indie_tag in c.convention.tags:
phrase = random.choice(phrases)
# update phrase num_uses count & remove from pool
phrase.num_uses += 1
phrases.remove(phrase)
message = "%s %s registration closes in a week. %s" % (phrase, c.convention.title, c.convention.url)
output = output + self.post_to_mastodon(mastodon_api, message)
# if c.convention.twitter:
# message = message + " @%s" % (c.convention.twitter)
# output = output + self.post_to_twitter(twitter_api, message)
output = output + "\n" + message
# commit updated phrase num_uses counts
db.session.commit()
print(output)
manager.add_command('social', Social())
if __name__ == "__main__":
manager.run()