-
Notifications
You must be signed in to change notification settings - Fork 354
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
259 changed files
with
26,868 additions
and
664 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,201 @@ | ||
#!/usr/bin/env python | ||
from gevent import monkey | ||
monkey.patch_all() | ||
|
||
import click | ||
import json | ||
import operator | ||
import random | ||
import time | ||
|
||
from munkres import Munkres, make_cost_matrix | ||
from nylas.logging import get_logger, configure_logging | ||
configure_logging() | ||
log = get_logger() | ||
|
||
from inbox.config import config | ||
from inbox.scheduling.deferred_migration import (DeferredAccountMigration, | ||
DeferredAccountMigrationExecutor) | ||
from inbox.models.session import global_session_scope | ||
from inbox.models.account import Account | ||
from inbox.util import fleet | ||
from inbox.models.session import session_scope | ||
|
||
# How long we should take to migrate all accounts (in seconds). | ||
ACCOUNT_MIGRATION_TIMESPAN = 15 * 60 # 15 minutes | ||
|
||
|
||
def actual_hostname(hostname): | ||
# A little hack for running the rebalance script locally in a dev VM | ||
if hostname == 'localhost': | ||
return 'precise64' | ||
return hostname | ||
|
||
|
||
def jitter_for_deadline(timespan): | ||
min_delay = 10 | ||
max_delay = timespan | ||
return (random.random() * (max_delay - min_delay)) + min_delay | ||
|
||
|
||
def is_account_on_debug_host(account_id, debug_hosts): | ||
with session_scope(account_id) as db_session: | ||
sync_host = db_session.query(Account.sync_host).get(account_id) | ||
return sync_host in debug_hosts | ||
|
||
|
||
def partition_accounts(load_per_account, num_buckets): | ||
# Partition equitably in n-buckets. | ||
# http://stackoverflow.com/a/6670011 | ||
sorted_loads = sorted(load_per_account.items(), key=operator.itemgetter(1), reverse=True) | ||
buckets = [[] for i in range(num_buckets)] | ||
bucket_totals = [0.0 for i in range(num_buckets)] | ||
|
||
i = 0 | ||
for account_id, load in sorted_loads[0:num_buckets]: | ||
buckets[i].append(account_id) | ||
bucket_totals[i] += load | ||
i += 1 | ||
|
||
for account, load in sorted_loads[num_buckets:]: | ||
# Find the less loaded bucket: | ||
i = bucket_totals.index(min(bucket_totals)) | ||
buckets[i].append(account) | ||
bucket_totals[i] += load | ||
return buckets | ||
|
||
|
||
def get_account_hosts(): | ||
with global_session_scope() as db_session: | ||
return dict((str(id_), host) for id_, host in | ||
db_session.query(Account.id, Account.sync_host). | ||
filter(Account.sync_should_run)) | ||
|
||
|
||
def do_minimize_migrations(hosts, buckets, should_optimize=True): | ||
# Our task is to find a bipartite matching between buckets and hosts that | ||
# maximizes the number of Accounts that are already assigned to the correct | ||
# sync host. To do this we use the Hungarian algorithm which computes a | ||
# bipartite matching between n workers and n tasks such that the overall | ||
# cost is minimized (see https://en.wikipedia.org/wiki/Hungarian_algorithm). | ||
# Luckily there's a python library (munkres) that implements this algorithm | ||
# for us :-) Since this algorithm minimizes cost we must first build our | ||
# profit matrix and then convert it into a cost matrix. | ||
account_hosts = get_account_hosts() | ||
profit_matrix = [] | ||
max_num_present = 0 | ||
sync_procs = [] | ||
for host in hosts: | ||
for i in range(host['num_procs']): | ||
sync_procs.append('{}:{}'.format(actual_hostname(host['name']), i)) | ||
|
||
# Construct the profit matrix. Each row corresponds to a bucket and each | ||
# column within that row corresponds to the number of items in that bucket | ||
# that are currently assigned to the corresponding sync host. | ||
for bucket in buckets: | ||
row = [] | ||
for proc_id in sync_procs: | ||
num_present = 0 | ||
for account_id in bucket: | ||
if account_hosts.get(account_id) == proc_id: | ||
num_present += 1 | ||
# We add 1 because the munkres library can't really handle matrices | ||
# with 0 values :-/ This won't change the ultimate answer, however. | ||
num_present += 1 | ||
row.append(num_present) | ||
max_num_present = max(num_present, max_num_present) | ||
profit_matrix.append(row) | ||
|
||
indexes = None | ||
if should_optimize: | ||
# We add 1 because the munkres library can't really handle matrices | ||
# with 0 values :-/ This won't change the ultimate answer, however. | ||
max_num_present += 1 | ||
cost_matrix = make_cost_matrix(profit_matrix, lambda cost: max_num_present - cost) | ||
|
||
m = Munkres() | ||
indexes = m.compute(cost_matrix) | ||
else: | ||
indexes = [(i, i) for i in range(len(sync_procs))] | ||
|
||
# Now that we have the optimal solution we need to reorder the original | ||
# buckets to match to their corresponding hosts based on the results. | ||
result_buckets = [None for _ in indexes] | ||
total_profit = 0 | ||
total_accounts = 0 | ||
for row, column in indexes: | ||
total_profit += profit_matrix[row][column] - 1 | ||
result_buckets[column] = buckets[row] | ||
total_accounts += len(buckets[row]) | ||
log.info("Accounts already on the correct hosts:", | ||
correct_accounts=total_profit, | ||
total_accounts=total_accounts, | ||
correct_percent=float(total_profit) / float(total_accounts) * 100.0) | ||
return result_buckets | ||
|
||
|
||
def migrate_accounts(zone, hosts, buckets, timespan): | ||
start_time = time.time() | ||
executor = DeferredAccountMigrationExecutor() # Just for its Redis thingy | ||
|
||
bucket_idx = 0 | ||
for host_idx, host in enumerate(hosts): | ||
host['name'] = actual_hostname(host['name']) | ||
|
||
for process_idx in range(host['num_procs']): | ||
instance = '{}:{}'.format(host['name'], process_idx) | ||
bucket = buckets[bucket_idx] | ||
bucket_idx += 1 | ||
|
||
for account_id in bucket: | ||
delay = jitter_for_deadline(timespan) | ||
deadline = start_time + delay | ||
log.info("Sync load balancer migrating Account", | ||
zone=zone, | ||
account_id=account_id, | ||
host=instance, | ||
delay=delay) | ||
dam = DeferredAccountMigration(deadline, account_id, instance) | ||
dam.save(executor.redis) | ||
|
||
|
||
def balance_zone(zone, normal_hosts, debug_hosts, account_loads, timespan, minimize_migrations, dry_run): | ||
num_buckets = sum([host['num_procs'] for host in normal_hosts]) | ||
account_loads = {account_id: load for account_id, load in account_loads.items() | ||
if not is_account_on_debug_host(account_id, debug_hosts)} | ||
buckets = partition_accounts(account_loads, num_buckets) | ||
buckets = do_minimize_migrations(normal_hosts, buckets, minimize_migrations) | ||
if dry_run: | ||
print "Would reassign accounts in zone {} like this:".format(zone) | ||
for bucket in buckets: | ||
bucket_load = 0 | ||
for account_id in bucket: | ||
bucket_load += account_loads[account_id] | ||
print "\t{}: {}".format(bucket_load, bucket) | ||
return | ||
migrate_accounts(zone, normal_hosts, buckets, timespan) | ||
|
||
|
||
@click.command() | ||
@click.option('--level', default='staging') | ||
@click.option('--dry-run', is_flag=True, default=False) | ||
@click.option('--timespan', default=ACCOUNT_MIGRATION_TIMESPAN) | ||
@click.option('--minimize-migrations/--no-minimize-migrations', default=True) | ||
@click.argument('account-loads') | ||
def main(dry_run, level, timespan, minimize_migrations, account_loads): | ||
zones = {h.get('ZONE') for h in config['DATABASE_HOSTS']} | ||
load_per_account = {} | ||
with open(account_loads) as f: | ||
load_per_account = json.load(f) | ||
for zone in zones: | ||
loads = load_per_account.get(zone) | ||
if loads is None: | ||
loads = load_per_account['null'] | ||
hosts = fleet.get_sync_hosts_in_zone(zone, level) | ||
normal_hosts = [h for h in hosts if not h['debug']] | ||
debug_hosts = set(h for h in hosts if h['debug']) | ||
balance_zone(zone, normal_hosts, debug_hosts, loads, timespan, minimize_migrations, dry_run) | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
#!/usr/bin/env python | ||
# Check that we can fetch attachments for 99.9% of our syncing accounts. | ||
from gevent import monkey | ||
monkey.patch_all() | ||
|
||
import datetime | ||
import click | ||
import gevent | ||
from gevent.pool import Pool | ||
import traceback | ||
|
||
from collections import defaultdict | ||
from inbox.models import Account, Block | ||
from inbox.crispin import connection_pool | ||
from inbox.s3.base import get_raw_from_provider | ||
from inbox.s3.exc import EmailFetchException, TemporaryEmailFetchException | ||
from sqlalchemy.sql.expression import func | ||
|
||
from nylas.logging import get_logger, configure_logging | ||
from inbox.models.backends.generic import GenericAccount | ||
from inbox.models.session import (session_scope, global_session_scope, | ||
session_scope_by_shard_id) | ||
|
||
configure_logging() | ||
log = get_logger(purpose='separator-backfix') | ||
|
||
NUM_MESSAGES = 10 | ||
|
||
|
||
def process_account(account_id): | ||
ret = defaultdict(int) | ||
|
||
try: | ||
with session_scope(account_id) as db_session: | ||
acc = db_session.query(Account).get(account_id) | ||
db_session.expunge(acc) | ||
|
||
one_month_ago = datetime.datetime.utcnow() - datetime.timedelta(days=30) | ||
|
||
for i in range(NUM_MESSAGES): | ||
with session_scope(account_id) as db_session: | ||
block = db_session.query(Block).filter( | ||
Block.namespace_id == acc.namespace.id, | ||
Block.created_at < one_month_ago).order_by( | ||
func.rand()).limit(1).first() | ||
|
||
if block is None: | ||
continue | ||
|
||
if len(block.parts) == 0: | ||
ret['null_failures'] += 1 | ||
continue | ||
|
||
message = block.parts[0].message | ||
raw_mime = get_raw_from_provider(message) | ||
|
||
if raw_mime != '': | ||
ret['successes'] += 1 | ||
else: | ||
ret['null_failures'] += 1 | ||
except Exception as e: | ||
ret[type(e).__name__] += 1 | ||
|
||
return ret | ||
|
||
|
||
@click.command() | ||
@click.option('--num-accounts', type=int, default=1500) | ||
def main(num_accounts): | ||
with global_session_scope() as db_session: | ||
accounts = db_session.query(Account).filter( | ||
Account.sync_should_run == True).order_by(func.rand()).limit( | ||
num_accounts).all() | ||
|
||
accounts = [acc.id for acc in accounts][:num_accounts] | ||
db_session.expunge_all() | ||
|
||
pool = Pool(size=100) | ||
results = pool.map(process_account, accounts) | ||
|
||
global_results = dict() | ||
for ret in results: | ||
for key in ret: | ||
if key not in global_results: | ||
global_results[key] = 0 | ||
|
||
global_results[key] += ret[key] | ||
|
||
print global_results | ||
|
||
if __name__ == '__main__': | ||
main() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
#!/bin/bash | ||
set -e | ||
|
||
mysql -uroot -proot -e "GRANT ALL PRIVILEGES ON test.* TO inboxtest@localhost IDENTIFIED BY 'inboxtest'" | ||
mysql -uroot -proot -e "GRANT ALL PRIVILEGES ON test_1.* TO inboxtest@localhost IDENTIFIED BY 'inboxtest'" | ||
mysql -uroot -proot -e "GRANT ALL PRIVILEGES ON synctest.* TO inboxtest@localhost IDENTIFIED BY 'inboxtest'" | ||
mysql -uroot -proot -e "GRANT ALL PRIVILEGES ON synctest_1.* TO inboxtest@localhost IDENTIFIED BY 'inboxtest'" | ||
mysql -uroot -proot -e 'GRANT ALL PRIVILEGES ON `test%`.* TO inboxtest@localhost IDENTIFIED BY "inboxtest"' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
#!/usr/bin/env python | ||
"""Watches a redis priority queue for deferred account migrations to execute.""" | ||
|
||
import gevent.monkey | ||
gevent.monkey.patch_all() | ||
|
||
from setproctitle import setproctitle | ||
from inbox.scheduling.deferred_migration import DeferredAccountMigrationExecutor | ||
from nylas.logging import configure_logging | ||
configure_logging() | ||
|
||
|
||
def main(): | ||
setproctitle('deferred-migration-service') | ||
print "Starting DeferredAccountMigrationExecutor..." | ||
dame = DeferredAccountMigrationExecutor() | ||
dame.start() | ||
print "Done" | ||
dame.join() | ||
|
||
|
||
if __name__ == '__main__': | ||
main() |
Oops, something went wrong.