Skip to content

Commit

Permalink
utils: remove admin access token validation
Browse files Browse the repository at this point in the history
Remove admin access token validation from some users-related utilities,
as these checks are already performed by the admin CLI.
  • Loading branch information
mdonadoni committed Jun 29, 2023
1 parent 584a932 commit fd0af51
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 13 deletions.
10 changes: 5 additions & 5 deletions reana_server/reana_admin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ def users_create_default(email, password, id_):
def list_users(ctx, id, email, user_access_token, admin_access_token, output_format):
"""List users according to the search criteria."""
try:
response = _get_users(id, email, user_access_token, admin_access_token)
response = _get_users(id, email, user_access_token)
headers = ["id", "email", "access_token", "access_token_status"]
data = []
for user in response:
Expand Down Expand Up @@ -174,7 +174,7 @@ def list_users(ctx, id, email, user_access_token, admin_access_token, output_for
def create_user(ctx, email, user_access_token, admin_access_token):
"""Create a new user. Requires the token of an administrator."""
try:
response = _create_user(email, user_access_token, admin_access_token)
response = _create_user(email, user_access_token)
headers = ["id", "email", "access_token"]
data = [(str(response.id_), response.email, response.access_token)]
click.echo(click.style("User was successfully created.", fg="green"))
Expand All @@ -195,7 +195,7 @@ def create_user(ctx, email, user_access_token, admin_access_token):
def export_users(ctx, admin_access_token):
"""Export all users in current REANA cluster."""
try:
csv_file = _export_users(admin_access_token)
csv_file = _export_users()
click.echo(csv_file.getvalue(), nl=False)
except Exception as e:
click.secho(
Expand All @@ -218,7 +218,7 @@ def export_users(ctx, admin_access_token):
def import_users(ctx, admin_access_token, file_):
"""Import users from file."""
try:
_import_users(admin_access_token, file_)
_import_users(file_)
click.secho("Users successfully imported.", fg="green")
except Exception as e:
click.secho(
Expand Down Expand Up @@ -463,7 +463,7 @@ def list_quota_usage(
):
"""List quota usage of users."""
try:
response = _get_users(id, email, user_access_token, admin_access_token)
response = _get_users(id, email, user_access_token)
headers = ["id", "email", "cpu-used", "cpu-limit", "disk-used", "disk-limit"]
health_order = {
QuotaHealth.healthy.name: 0,
Expand Down
12 changes: 4 additions & 8 deletions reana_server/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,8 @@ def _validate_admin_access_token(admin_access_token: str):
raise ValueError("Admin access token invalid.")


def _get_users(_id, email, user_access_token, admin_access_token):
def _get_users(_id, email, user_access_token):
"""Return all users matching search criteria."""
_validate_admin_access_token(admin_access_token)
search_criteria = dict()
if _id:
search_criteria["id_"] = _id
Expand All @@ -311,10 +310,9 @@ def _get_users(_id, email, user_access_token, admin_access_token):
return query.all()


def _create_user(email, user_access_token, admin_access_token):
def _create_user(email, user_access_token):
"""Create user with provided credentials."""
try:
_validate_admin_access_token(admin_access_token)
if not user_access_token:
user_access_token = secrets.token_urlsafe(16)
user_parameters = dict(access_token=user_access_token)
Expand All @@ -328,13 +326,12 @@ def _create_user(email, user_access_token, admin_access_token):
return user


def _export_users(admin_access_token):
def _export_users():
"""Export all users in database as csv.
:param admin_access_token: Admin access token.
:type admin_access_token: str
"""
_validate_admin_access_token(admin_access_token)
csv_file_obj = io.StringIO()
csv_writer = csv.writer(csv_file_obj, dialect="unix")
for user in User.query.all():
Expand All @@ -344,15 +341,14 @@ def _export_users(admin_access_token):
return csv_file_obj


def _import_users(admin_access_token, users_csv_file):
def _import_users(users_csv_file):
"""Import list of users to database.
:param admin_access_token: Admin access token.
:type admin_access_token: str
:param users_csv_file: CSV file object containing a list of users.
:type users_csv_file: _io.TextIOWrapper
"""
_validate_admin_access_token(admin_access_token)
csv_reader = csv.reader(users_csv_file)
for row in csv_reader:
user = User(
Expand Down

0 comments on commit fd0af51

Please sign in to comment.