Skip to content

Commit

Permalink
Merge pull request #2 from ReFirmLabs/1-implement-user-admin-into-cli
Browse files Browse the repository at this point in the history
1 implement user admin into cli
  • Loading branch information
eacmen authored Oct 24, 2019
2 parents 763bdff + ec8f2c8 commit ada51f7
Show file tree
Hide file tree
Showing 4 changed files with 268 additions and 24 deletions.
2 changes: 1 addition & 1 deletion centrifuge_cli/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = '0.1.3'
__version__ = '0.2.0'
249 changes: 228 additions & 21 deletions centrifuge_cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
import uuid
import click
import requests
import dateparser
from datetime import datetime
from collections import MutableMapping
from itertools import chain, starmap
from urllib.parse import urlparse, urlunparse

import pandas as pd
import numpy as np
Expand All @@ -33,14 +36,28 @@ def flatten(d, parent_key='', sep='.'):
class Cli(object):

def __init__(self, endpoint, apikey, limit, outfmt, fields):
self.endpoint = endpoint
self.apikey = apikey
self.limit = limit
self.outfmt = outfmt
self.fields = fields

def do_GET(self, uri):
res = requests.get(f'{self.endpoint}{uri}&limit={self.limit}&authtoken={self.apikey}')
url = urlparse(endpoint)
self.endpoint_scheme = url.scheme
self.endpoint_netloc = url.netloc

def build_url(self, path, query_list):
default_query = [f'limit={self.limit}',
f'authtoken={self.apikey}']
if query_list is not None:
default_query.extend(query_list)

query = '&'.join(default_query)
return urlunparse((self.endpoint_scheme, self.endpoint_netloc, path, None, query, None))

def do_GET(self, path, query_list=None):
url = self.build_url(path, query_list)

res = requests.get(url)
res.raise_for_status()

if self.outfmt == 'json':
Expand All @@ -64,16 +81,27 @@ def do_GET(self, uri):

return df

def do_POST(self, uri, data, files=None):
res = requests.post(f'{self.endpoint}{uri}&limit={self.limit}&authtoken={self.apikey}', data=data, files=files)
def do_POST(self, path, data, files=None, query_list=None):
url = self.build_url(path, query_list)

res = requests.post(url, data=data, files=files)
res.raise_for_status()
return res

def do_DELETE(self, uri):
res = requests.delete(f'{self.endpoint}{uri}&authtoken={self.apikey}')
def do_PUT(self, path, data, query_list=None):
url = self.build_url(path, query_list)

res = requests.put(url, data=data)
res.raise_for_status()
return res

def do_DELETE(self, path, query_list=None):
url = self.build_url(path, query_list)

res = requests.delete(url)
res.raise_for_status()

if res.status_code is not 204:
if res.status_code not in (200, 204):
return('Error occurred, could not delete')
else:
return('Deleted')
Expand Down Expand Up @@ -105,14 +133,19 @@ def reports(cli):
@reports.command(name="list")
@pass_cli
def list_command(cli):
click.echo(cli.do_GET('/api/upload?sorters[0][field]=id&sorters[0][dir]=desc'))
click.echo(cli.do_GET('/api/upload', query_list=['sorters[0][field]=id',
'sorters[0][dir]=desc']))


@reports.command()
@click.argument('searchterm', required=True)
@pass_cli
def search(cli, searchterm):
click.echo(cli.do_GET(f'/api/upload?sorters[0][field]=id&sorters[0][dir]=desc&filters[0][field]=search&filters[0][type]=like&filters[0][value]={searchterm}'))
click.echo(cli.do_GET('/api/upload', query_list=['sorters[0][field]=id',
'sorters[0][dir]=desc',
'filters[0][field]=search',
'filters[0][type]=like',
f'filters[0][value]={searchterm}']))


@cli.group()
Expand All @@ -125,59 +158,66 @@ def report(cli, ufid):
@report.command()
@pass_cli
def delete(cli):
click.echo(cli.do_DELETE(f'/api/upload?ufid={cli.ufid}'))
click.echo(cli.do_DELETE('/api/upload', query_list=[f'ufid={cli.ufid}', ]))


@report.command()
@pass_cli
def info(cli):
click.echo(cli.do_GET(f'/api/upload/details/{cli.ufid}?'))
click.echo(cli.do_GET(f'/api/upload/details/{cli.ufid}'))


@report.command()
@pass_cli
def crypto(cli):
click.echo(cli.do_GET(f'/api/report/crypto/{cli.ufid}?sorters[0][field]=path&sorters[0][dir]=asc'))
click.echo(cli.do_GET(f'/api/report/crypto/{cli.ufid}', query_list=['sorters[0][field]=path',
'sorters[0][dir]=asc']))


@report.command()
@pass_cli
def passhash(cli):
click.echo(cli.do_GET(f'/api/report/passwordhash/{cli.ufid}?'))
click.echo(cli.do_GET(f'/api/report/passwordhash/{cli.ufid}'))


@report.command()
@pass_cli
def guardian(cli):
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/analyzer-results?affected=true&sorters[0][field]=name&sorters[0][dir]=asc'))
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/analyzer-results', query_list=['affected=true&sorters[0][field]=name',
'sorters[0][dir]=asc']))


@report.command()
@pass_cli
def sbom(cli):
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/components/pathmatches?'))
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/components/pathmatches'))


@report.command(name='code-summary')
@pass_cli
def code_summary(cli):
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/vulnerable-files?sorters[0][field]=totalFlaws&sorters[0][dir]=desc'))
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/vulnerable-files', query_list=['sorters[0][field]=totalFlaws',
'sorters[0][dir]=desc']))


@report.command(name='code-static')
@click.option('--exid', required=True, metavar='EXID', help='Extraction ID from code-summary output')
@click.option('--path', required=True, metavar='PATH', help='File path that you want to get analysis results for')
@pass_cli
def code_static(cli, exid, path):
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/vulnerable-files/{exid}?path={path}&sorters[0][field]=offset&sorters[0][dir]=asc'))
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/vulnerable-files/{exid}', query_list=[f'path={path}',
'sorters[0][field]=offset',
'sorters[0][dir]=asc']))


@report.command(name='code-emulated')
@click.option('--exid', required=True, metavar='EXID', help='Extraction ID from code-summary output')
@click.option('--path', required=True, metavar='PATH', help='File path that you want to get analysis results for')
@pass_cli
def code_emulated(cli, exid, path):
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/emulated-files/{exid}?path={path}&sorters[0][field]=id&sorters[0][dir]=asc'))
click.echo(cli.do_GET(f'/api/report/{cli.ufid}/emulated-files/{exid}', query_list=[f'path={path}',
'sorters[0][field]=id',
'sorters[0][dir]=asc']))


@cli.command()
Expand Down Expand Up @@ -213,9 +253,176 @@ def upload(cli, make, model, version, chunksize, filename):
'dztotalchunkcount': totalChunkCount,
'dzchunkbytesoffset': chunkOffset
}
res = cli.do_POST('/api/upload/chunky?', data, files)
res = cli.do_POST('/api/upload/chunky', data, files=files)
ufid = res.json()['ufid']
click.echo(f"Upload complete. When report is complete you may view results at {cli.endpoint}/report/{ufid}")
click.echo(f"Upload complete. Report id is {ufid}")


@cli.group()
@pass_cli
def users(cli):
pass


@users.command(name="list")
@pass_cli
def user_list(cli):
click.echo(cli.do_GET('/api/user'))


@users.command()
@click.option('--email', metavar='EMAIL', help='Email address of new user', required=True)
@click.option('--password', metavar='PASSWORD', help='Password for new user, if none supplied it will be auto-generated')
@click.option('--orgid', metavar='ID', help='Organization ID for the new user.', type=int)
@click.option('--admin', help='If set user will have administrative privileges', is_flag=True)
@click.option('--expires', help='Specify a date or time interval. For example "2019-07-04" or "in 2 weeks".')
@click.option('--no-expire', help='If set user will never expire.', is_flag=True)
@pass_cli
def new(cli, email, password, orgid, admin, expires, no_expire):
if not no_expire and expires is None:
raise RuntimeError('Must specify expiry date or --no-expire')

if no_expire:
isPermanent = True
expiresAt = "-"
else:
isPermanent = False
dt = dateparser.parse(expires)
if dt < datetime.now():
raise RuntimeError('Expiry date is in the past, be sure to use "in" if specifying a time interval i.e. "in 2 weeks"')

expiresAt = dt.strftime("%Y-%m-%d")
post_data = {
'username': email,
'password': password,
'organizationId': orgid,
'isAdmin': admin,
'isTrial': False,
'isPermanent': isPermanent,
'expiresAt': expiresAt}

click.echo(cli.do_POST('/api/user', post_data))


@cli.group()
@click.option('--userid', metavar='ID', help='User ID of the user being modified', required=True)
@pass_cli
def user(cli, userid):
cli.userid = userid


@user.command()
@pass_cli
def delete(cli):
click.echo(cli.do_DELETE(f'/api/user/{cli.userid}'))


@user.command(name='set-expiration')
@click.argument('expires', metavar='DATE')
@pass_cli
def set_expiration(cli, expires):
dt = dateparser.parse(expires)
if dt < datetime.now():
raise RuntimeError('Expiry date is in the past')

expiresAt = dt.strftime("%Y-%m-%d")

put_data = {
'isPermanent': False,
'expiresAt': expiresAt}

click.echo(cli.do_PUT(f'/api/user/{cli.userid}', put_data))


@user.command(name='set-password')
@click.argument('password', metavar='PASSWORD')
@pass_cli
def set_password(cli, password):
put_data = {
'password': password}

click.echo(cli.do_PUT(f'/api/user/{cli.userid}', put_data))


@user.command(name='set-organization-id')
@click.argument('orgid', metavar='ID')
@pass_cli
def set_organization_id(cli, orgid):
put_data = {
'organizationId': int(orgid)}

click.echo(cli.do_PUT(f'/api/user/{cli.userid}', put_data))


@user.command(name='set-email')
@click.argument('email', metavar='EMAIL')
@pass_cli
def set_email(cli, email):
put_data = {
'username': email}

click.echo(cli.do_PUT(f'/api/user/{cli.userid}', put_data))


@user.command(name='make-permanent')
@pass_cli
def make_permanent(cli):
put_data = {
'isPermanent': True,
'expiresAt': "-"}

click.echo(cli.do_PUT(f'/api/user/{cli.userid}', put_data))


@user.command(name='make-admin')
@pass_cli
def make_admin(cli):
put_data = {
'isAdmin': True}

click.echo(cli.do_PUT(f'/api/user/{cli.userid}', put_data))


@cli.group()
@pass_cli
def orgs(cli):
pass


@orgs.command(name="list")
@pass_cli
def orgs_list(cli):
click.echo(cli.do_GET('/api/organization'))


@orgs.command()
@click.option('--ownerid', metavar='ID', help='User id of the owner of this organization', required=True)
@click.argument('name', metavar='ORG_NAME')
@pass_cli
def new(cli, ownerid, name):
post_data = {
'ownerId': ownerid,
'name': name}
click.echo(cli.do_POST('/api/organization', post_data))


@cli.group()
@click.option('--orgid', metavar='ID', help='Organization id', required=True)
@pass_cli
def org(cli, orgid):
cli.orgid = orgid


@org.command()
@click.option('--ownerid', metavar='OWNERID', help='User ID of the owner of this organization', required=True)
@click.option('--name', metavar='NAME', help='Name of this organization', required=True)
@pass_cli
def change(cli, ownerid, name):
put_data = {
'name': name,
'ownerId': int(ownerid)}

click.echo(cli.do_PUT(f'/api/organization/{cli.orgid}', put_data))


if __name__ == '__main__':
Expand Down
Loading

0 comments on commit ada51f7

Please sign in to comment.