diff --git a/mongo_orchestration/launch.py b/mongo_orchestration/launch.py index 2b9630b..c9af53a 100644 --- a/mongo_orchestration/launch.py +++ b/mongo_orchestration/launch.py @@ -12,322 +12,50 @@ # See the License for the specific language governing permissions and # limitations under the License. -import atexit -import copy -import itertools -import time +import argparse import os -import sys -import pymongo -import requests - -# Configurable hosts and ports used in the tests -db_user = str(os.environ.get("DB_USER", "")) -db_password = str(os.environ.get("DB_PASSWORD", "")) - -# Document count for stress tests -STRESS_COUNT = 100 - -# Test namespace, timestamp arguments -TESTARGS = ('test.test', 1) - -_mo_address = os.environ.get("MO_ADDRESS", "localhost:8889") -_mongo_start_port = int(os.environ.get("MONGO_PORT", 27017)) -_free_port = itertools.count(_mongo_start_port) - -DEFAULT_OPTIONS = { - 'logappend': True, - 'ipv6': True, - 'bind_ip': '127.0.0.1,::1', - # 'storageEngine': 'mmapv1', - # 'networkMessageCompressors': 'disabled', - # 'vvvvv': '', - 'setParameter': {'enableTestCommands': 1}, # 'logicalSessionRefreshMillis': 1000000}, -} - - -_post_request_template = {} -if db_user and db_password: - _post_request_template = {'login': db_user, 'password': db_password} - - -def _mo_url(resource, *args): - return 'http://' + '/'.join([_mo_address, resource] + list(args)) - - -@atexit.register -def kill_all(): - try: - clusters = requests.get(_mo_url('sharded_clusters')).json() - except requests.ConnectionError as e: - return - repl_sets = requests.get(_mo_url('replica_sets')).json() - servers = requests.get(_mo_url('servers')).json() - for cluster in clusters['sharded_clusters']: - requests.delete(_mo_url('sharded_clusters', cluster['id'])) - for rs in repl_sets['replica_sets']: - requests.delete(_mo_url('relica_sets', rs['id'])) - for server in servers['servers']: - requests.delete(_mo_url('servers', server['id'])) - - -class MCTestObject(object): - - def proc_params(self): - params = copy.deepcopy(DEFAULT_OPTIONS) - params.update(self._proc_params) - params["port"] = next(_free_port) - return params - - def get_config(self): - raise NotImplementedError - - def _make_post_request(self): - config = _post_request_template.copy() - config.update(self.get_config()) - import pprint - pprint.pprint(config) - ret = requests.post( - _mo_url(self._resource), timeout=None, json=config)#.json() - - if not ret.ok: - raise RuntimeError( - "Error sending POST to cluster: %s" % (ret.text,)) - - ret = ret.json() - if type(ret) == list: # Will return a list if an error occurred. - raise RuntimeError("Error sending POST to cluster: %s" % (ret,)) - pprint.pprint(ret) - return ret - - def _make_get_request(self): - ret = requests.get(_mo_url(self._resource, self.id), timeout=None) - - if not ret.ok: - raise RuntimeError( - "Error sending GET to cluster: %s" % (ret.text,)) - - ret = ret.json() - if type(ret) == list: # Will return a list if an error occurred. - raise RuntimeError("Error sending GET to cluster: %s" % (ret,)) - return ret - - def client(self, **kwargs): - kwargs = kwargs.copy() - if db_user: - kwargs['username'] = db_user - kwargs['password'] = db_password - client = pymongo.MongoClient(self.uri, **kwargs) - return client - - def stop(self): - requests.delete(_mo_url(self._resource, self.id)) - - -class Server(MCTestObject): - - _resource = 'servers' - - def __init__(self, id=None, uri=None, **kwargs): - self.id = id - self.uri = uri - self._proc_params = kwargs - - def get_config(self): - return { - 'name': 'mongod', - 'procParams': self.proc_params()} - - def start(self): - if self.id is None: - try: - response = self._make_post_request() - except requests.ConnectionError as e: - print('Please start mongo-orchestration!') - sys.exit(1) - self.id = response['id'] - self.uri = response.get('mongodb_auth_uri', - response['mongodb_uri']) - else: - requests.post( - _mo_url('servers', self.id), timeout=None, - json={'action': 'start'} - ) - return self - - def stop(self, destroy=True): - if destroy: - super(Server, self).stop() - else: - requests.post(_mo_url('servers', self.id), timeout=None, - json={'action': 'stop'}) - - -class ReplicaSet(MCTestObject): - - _resource = 'replica_sets' - - def __init__(self, id=None, uri=None, primary=None, secondary=None, - single=False, **kwargs): - self.single = single - self.id = id - self.uri = uri - self.primary = primary - self.secondary = secondary - self._proc_params = kwargs - self.members = [] - - def proc_params(self): - params = super(ReplicaSet, self).proc_params() - # params.setdefault('setParameter', {}).setdefault('transactionLifetimeLimitSeconds', 3) - # params.setdefault('setParameter', {}).setdefault('periodicNoopIntervalSecs', 1) - return params - - def get_config(self): - members = [{'procParams': self.proc_params()}] - if not self.single: - members.extend([ - {'procParams': self.proc_params()}, - {#'rsParams': {'arbiterOnly': True}, - 'procParams': self.proc_params()} - ]) - return {'members': members} - - def _init_from_response(self, response): - self.id = response['id'] - self.uri = response.get('mongodb_auth_uri', response['mongodb_uri']) - for member in response['members']: - m = Server(member['server_id'], member['host']) - self.members.append(m) - if member['state'] == 1: - self.primary = m - elif member['state'] == 2: - self.secondary = m - return self - - def start(self): - # We never need to restart a replica set, only start new ones. - return self._init_from_response(self._make_post_request()) - - def restart_primary(self): - self.primary.stop(destroy=False) - time.sleep(5) - self.primary.start() - time.sleep(1) - self._init_from_response(self._make_get_request()) - print('New primary: %s' % self.primary.uri) - - -class ReplicaSetSingle(ReplicaSet): - - def get_config(self): - return { - 'members': [ - {'procParams': self.proc_params()} - ] - } - - -class ShardedCluster(MCTestObject): - - _resource = 'sharded_clusters' - _shard_type = ReplicaSet - - def __init__(self, **kwargs): - self.id = None - self.uri = None - self.shards = [] - self._proc_params = kwargs - - def get_config(self): - return { - # 'configsvrs': [{'members': [DEFAULT_OPTIONS.copy()]}], - 'routers': [self.proc_params(), self.proc_params()], - 'shards': [ - {'id': 'demo-set-0', 'shardParams': - self._shard_type().get_config()}, - # {'id': 'demo-set-1', 'shardParams': - # self._shard_type().get_config()} - ] - } - - def start(self): - # We never need to restart a sharded cluster, only start new ones. - response = self._make_post_request() - for shard in response['shards']: - shard_resp = requests.get(_mo_url('replica_sets', shard['_id'])) - shard_json = shard_resp.json() - self.shards.append(self._shard_type()._init_from_response(shard_json)) - self.id = response['id'] - self.uri = response.get('mongodb_auth_uri', response['mongodb_uri']) - return self - - -class ShardedClusterSingle(ShardedCluster): - _shard_type = ReplicaSetSingle - - -def argv_has(string): - return any(string in arg for arg in sys.argv[1:]) - - -DEFAULT_CERTS = os.path.join( - os.environ.get( - 'MONGO_ORCHESTRATION_HOME', os.path.dirname(__file__)), - 'lib' +from launch_utils import ( + SERVER_TYPES, + SERVER_VERSION, + POST_REQUEST_TEMPLATE, + CERTS, + DB_PASSWORD, + DB_USER, ) -CERTS = os.environ.get('MONGO_ORCHESTRATION_CERTS', DEFAULT_CERTS) def main(): - for arg in sys.argv[1:]: - try: - port = int(arg) - _free_port = itertools.count(port) - except: - pass - for version in ['3.6', '4.0', '4.2', '4.4', '5.0', '6.0', '7.0', 'latest']: - if argv_has(version): - _post_request_template['version'] = version - break - - if argv_has('ssl') or argv_has('tls'): - _post_request_template['sslParams'] = { + parser = argparse.ArgumentParser(description="mongo-launch script") + parser.add_argument("server_type", action="store", choices=SERVER_TYPES.keys()) + parser.add_argument( + "-v", + "--server-version", + action="store", + type=str, + choices=SERVER_VERSION, + default=None, + ) + parser.add_argument("-t", "--use-tls", action="store_true") + parser.add_argument("-a", "--auth", action="store_true") + + cli_args = parser.parse_args() + + if cli_args.server_version: + POST_REQUEST_TEMPLATE["version"] = cli_args.server_version + if cli_args.use_tls: + POST_REQUEST_TEMPLATE["sslParams"] = { "sslOnNormalPorts": True, "sslPEMKeyFile": os.path.join(CERTS, "server.pem"), "sslCAFile": os.path.join(CERTS, "ca.pem"), - "sslWeakCertificateValidation" : True + "sslWeakCertificateValidation": True, } - if argv_has('auth'): - _post_request_template['login'] = db_user or 'user' - _post_request_template['password'] = db_password or 'password' - _post_request_template['auth_key'] = 'secret' - - single = argv_has('single') or argv_has('standalone') or argv_has('mongod') - msg = 'Type "q" to quit: ' - if argv_has('repl'): - # DEFAULT_OPTIONS['enableMajorityReadConcern'] = '' - cluster = ReplicaSet(single=single) - msg = 'Type "q" to quit, "r" to shutdown and restart the primary": ' - elif argv_has('shard') or argv_has('mongos'): - cluster = ShardedClusterSingle() - elif single: - cluster = Server() - else: - exit('Usage: %s [single|replica|shard] [ssl] [auth]' % (__file__,)) - - cluster.start() + if cli_args.auth: + POST_REQUEST_TEMPLATE["login"] = DB_USER or "user" + POST_REQUEST_TEMPLATE["password"] = DB_PASSWORD or "password" + POST_REQUEST_TEMPLATE["auth_key"] = "secret" - try: - while True: - data = input(msg) - if data == 'q': - break - if data == 'r' and argv_has('repl'): - cluster.restart_primary() - finally: - cluster.stop() + SERVER_TYPES[cli_args.server_type].run() # Requires mongo-orchestration running on port 8889. @@ -337,17 +65,17 @@ def main(): # # Examples (standalone node): # mongo-launch single -# mongo-launch single auth -# mongo-launch single auth ssl +# mongo-launch single --auth +# mongo-launch single --auth --use-tls # # Sharded clusters: # mongo-launch shard -# mongo-launch shard auth -# mongo-launch shard auth ssl +# mongo-launch shard-single --auth +# mongo-launch shard-single --auth --use-tls # # Replica sets: -# mongo-launch repl -# mongo-launch repl single -# mongo-launch repl single auth -if __name__ == '__main__': - main() \ No newline at end of file +# mongo-launch replica +# mongo-launch replica-single +# mongo-launch replica-single --auth +if __name__ == "__main__": + main() diff --git a/mongo_orchestration/launch_utils.py b/mongo_orchestration/launch_utils.py new file mode 100644 index 0000000..95b767c --- /dev/null +++ b/mongo_orchestration/launch_utils.py @@ -0,0 +1,265 @@ +import atexit +import copy +import itertools +import time +import os + +import pymongo +import requests + +# Configurable hosts and ports used in the tests +DB_USER = str(os.environ.get("DB_USER", "")) +DB_PASSWORD = str(os.environ.get("DB_PASSWORD", "")) + +_MO_ADDRESS = os.environ.get("MO_ADDRESS", "localhost:8889") +_MONGO_START_PORT = int(os.environ.get("MONGO_PORT", 27017)) +_FREE_PORT = itertools.count(_MONGO_START_PORT) + +DEFAULT_OPTIONS = { + "logappend": True, + "ipv6": True, + "bind_ip": "127.0.0.1,::1", + "setParameter": {"enableTestCommands": 1}, +} + +DEFAULT_CERTS = os.path.join( + os.environ.get("MONGO_ORCHESTRATION_HOME", os.path.dirname(__file__)), "lib" +) +CERTS = os.environ.get("MONGO_ORCHESTRATION_CERTS", DEFAULT_CERTS) + +SERVER_VERSION = ["3.6", "4.0", "4.2", "4.4", "5.0", "6.0", "7.0", "latest"] + + +# TODO: Figure out a better way to pass around template. Needs to be fixed +POST_REQUEST_TEMPLATE = {} +if DB_USER and DB_PASSWORD: + POST_REQUEST_TEMPLATE = {"login": DB_USER, "password": DB_PASSWORD} + + +def _mo_url(resource, *args): + return "http://" + "/".join([_MO_ADDRESS, resource] + list(args)) + + +@atexit.register +def kill_all(): + try: + clusters = requests.get(_mo_url("sharded_clusters")).json() + except requests.ConnectionError: + return + repl_sets = requests.get(_mo_url("replica_sets")).json() + servers = requests.get(_mo_url("servers")).json() + for cluster in clusters["sharded_clusters"]: + requests.delete(_mo_url("sharded_clusters", cluster["id"])) + for rs in repl_sets["replica_sets"]: + requests.delete(_mo_url("replica_sets", rs["id"])) + for server in servers["servers"]: + requests.delete(_mo_url("servers", server["id"])) + + +class MCTestObject(object): + def proc_params(self): + params = copy.deepcopy(DEFAULT_OPTIONS) + params.update(self._proc_params) + params["port"] = next(_FREE_PORT) + return params + + def get_config(self): + raise NotImplementedError + + def _make_post_request(self): + config = POST_REQUEST_TEMPLATE.copy() + config.update(self.get_config()) + import pprint + + pprint.pprint(config) + ret = requests.post(_mo_url(self._resource), timeout=None, json=config) + + if not ret.ok: + raise RuntimeError("Error sending POST to cluster: %s" % (ret.text,)) + + ret = ret.json() + if isinstance(ret, list): # Will return a list if an error occurred. + raise RuntimeError("Error sending POST to cluster: %s" % (ret,)) + pprint.pprint(ret) + return ret + + def _make_get_request(self): + ret = requests.get(_mo_url(self._resource, self.id), timeout=None) + + if not ret.ok: + raise RuntimeError("Error sending GET to cluster: %s" % (ret.text,)) + + ret = ret.json() + if isinstance(ret, list): # Will return a list if an error occurred. + raise RuntimeError("Error sending GET to cluster: %s" % (ret,)) + return ret + + def client(self, **kwargs): + kwargs = kwargs.copy() + if DB_USER: + kwargs["username"] = DB_USER + kwargs["password"] = DB_PASSWORD + client = pymongo.MongoClient(self.uri, **kwargs) + return client + + def start(self): + raise NotImplementedError + + def stop(self): + requests.delete(_mo_url(self._resource, self.id)) + + def __enter__(self): + try: + self.start() + except requests.ConnectionError as e: + raise ConnectionError( + "Please check if mongo-orchestration is running" + ) from e + return self + + def __exit__(self, *args, **kwargs): + self.stop() + + @classmethod + def run(cls): + with cls() as cluster: + while True: + data = input(cluster.cli_msg) + if data == "q": + break + if data == "r" and isinstance(cluster, ReplicaSet): + cluster.restart_primary() + + +class Server(MCTestObject): + _resource = "servers" + cli_msg = 'Type "q" to quit: ' + + def __init__(self, id=None, uri=None, **kwargs): + self.id = id + self.uri = uri + self._proc_params = kwargs + + def get_config(self): + return {"name": "mongod", "procParams": self.proc_params()} + + def start(self): + if self.id is None: + response = self._make_post_request() + self.id = response["id"] + self.uri = response.get("mongodb_auth_uri", response["mongodb_uri"]) + else: + requests.post( + _mo_url("servers", self.id), timeout=None, json={"action": "start"} + ) + return self + + def stop(self, destroy=True): + if destroy: + super(Server, self).stop() + else: + requests.post( + _mo_url("servers", self.id), timeout=None, json={"action": "stop"} + ) + + +class ReplicaSet(MCTestObject): + _resource = "replica_sets" + cli_msg = 'Type "q" to quit, "r" to shutdown and restart the primary": ' + + def __init__( + self, id=None, uri=None, primary=None, secondary=None, single=False, **kwargs + ): + self.single = single + self.id = id + self.uri = uri + self.primary = primary + self.secondary = secondary + self._proc_params = kwargs + self.members = [] + + def proc_params(self): + return super(ReplicaSet, self).proc_params() + + def get_config(self): + members = [{"procParams": self.proc_params()}] + if not self.single: + members.extend( + [ + {"procParams": self.proc_params()}, + {"procParams": self.proc_params()}, + ] + ) + return {"members": members} + + def _init_from_response(self, response): + self.id = response["id"] + self.uri = response.get("mongodb_auth_uri", response["mongodb_uri"]) + for member in response["members"]: + m = Server(member["server_id"], member["host"]) + self.members.append(m) + if member["state"] == 1: + self.primary = m + elif member["state"] == 2: + self.secondary = m + return self + + def start(self): + # We never need to restart a replica set, only start new ones. + return self._init_from_response(self._make_post_request()) + + def restart_primary(self): + self.primary.stop(destroy=False) + time.sleep(5) + self.primary.start() + time.sleep(1) + self._init_from_response(self._make_get_request()) + print("New primary: %s" % self.primary.uri) + + +class ReplicaSetSingle(ReplicaSet): + def __init__(self): + super(ReplicaSetSingle, self).__init__(single=True) + + +class ShardedCluster(MCTestObject): + _resource = "sharded_clusters" + _shard_type = ReplicaSet + + def __init__(self, **kwargs): + self.id = None + self.uri = None + self.shards = [] + self._proc_params = kwargs + + def get_config(self): + return { + "routers": [self.proc_params(), self.proc_params()], + "shards": [ + {"id": "demo-set-0", "shardParams": self._shard_type().get_config()}, + ], + } + + def start(self): + # We never need to restart a sharded cluster, only start new ones. + response = self._make_post_request() + for shard in response["shards"]: + shard_resp = requests.get(_mo_url("replica_sets", shard["_id"])) + shard_json = shard_resp.json() + self.shards.append(self._shard_type()._init_from_response(shard_json)) + self.id = response["id"] + self.uri = response.get("mongodb_auth_uri", response["mongodb_uri"]) + return self + + +class ShardedClusterSingle(ShardedCluster): + _shard_type = ReplicaSetSingle + + +SERVER_TYPES = { + "single": Server, + "replica-single": ReplicaSetSingle, + "replica": ReplicaSet, + "shard": ShardedCluster, + "shard-single": ShardedClusterSingle, +}