Skip to content

Commit

Permalink
Incomplete implementation of --prefix, refs #12
Browse files Browse the repository at this point in the history
  • Loading branch information
simonw committed Nov 30, 2021
1 parent 41f33d5 commit 56cf379
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 25 deletions.
16 changes: 12 additions & 4 deletions s3_credentials/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ def policy(buckets, read_only, write_only):
help="Create buckets if they do not already exist",
is_flag=True,
)
@click.option(
"--prefix", help="Restrict to keys starting with this prefix", default="*"
)
@click.option("--read-only", help="Only allow reading from the bucket", is_flag=True)
@click.option("--write-only", help="Only allow writing to the bucket", is_flag=True)
@click.option(
Expand All @@ -193,6 +196,7 @@ def create(
duration,
username,
create_bucket,
prefix,
read_only,
write_only,
policy,
Expand Down Expand Up @@ -270,13 +274,13 @@ def log(message):
statements = []
if permission == "read-write":
for bucket in buckets:
statements.extend(policies.read_write_statements(bucket))
statements.extend(policies.read_write_statements(bucket, prefix))
elif permission == "read-only":
for bucket in buckets:
statements.extend(policies.read_only_statements(bucket))
statements.extend(policies.read_only_statements(bucket, prefix))
elif permission == "write-only":
for bucket in buckets:
statements.extend(policies.write_only_statements(bucket))
statements.extend(policies.write_only_statements(bucket, prefix))
else:
assert False, "Unknown permission: {}".format(permission)
bucket_access_policy = policies.wrap_policy(statements)
Expand All @@ -285,7 +289,11 @@ def log(message):
# We're going to use sts.assume_role() rather than creating a user
if dry_run:
click.echo("Would ensure role: 's3-credentials.AmazonS3FullAccess'")
click.echo("Would assume role using following policy for {} seconds:".format(duration))
click.echo(
"Would assume role using following policy for {} seconds:".format(
duration
)
)
click.echo(json.dumps(bucket_access_policy, indent=4))
else:
s3_role_arn = ensure_s3_role_exists(iam, sts)
Expand Down
50 changes: 32 additions & 18 deletions s3_credentials/policies.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,42 @@
def read_write(bucket):
return wrap_policy(read_write_statements(bucket))
def read_write(bucket, prefix="*"):
return wrap_policy(read_write_statements(bucket, prefix=prefix))


def read_write_statements(bucket):
def read_write_statements(bucket, prefix="*"):
# https://github.com/simonw/s3-credentials/issues/24
return read_only_statements(bucket) + [
if not prefix.endswith("*"):
prefix += "*"
return read_only_statements(bucket, prefix) + [
{
"Effect": "Allow",
"Action": ["s3:PutObject", "s3:DeleteObject"],
"Resource": ["arn:aws:s3:::{}/*".format(bucket)],
"Resource": ["arn:aws:s3:::{}/{}".format(bucket, prefix)],
}
]


def read_only(bucket):
return wrap_policy(read_only_statements(bucket))
def read_only(bucket, prefix="*"):
return wrap_policy(read_only_statements(bucket, prefix))


def read_only_statements(bucket):
def read_only_statements(bucket, prefix="*"):
# https://github.com/simonw/s3-credentials/issues/23
if not prefix.endswith("*"):
prefix += "*"
allow_list = {
"Effect": "Allow",
"Action": ["s3:ListBucket", "s3:GetBucketLocation"],
"Resource": ["arn:aws:s3:::{}".format(bucket)],
}
if prefix != "*":
allow_list["Condition"] = {
"StringLike": {
# Note that prefix must end in / if user wants to limit to a folder
"s3:prefix": [prefix]
}
}
return [
{
"Effect": "Allow",
"Action": ["s3:ListBucket", "s3:GetBucketLocation"],
"Resource": ["arn:aws:s3:::{}".format(bucket)],
},
allow_list,
{
"Effect": "Allow",
"Action": [
Expand All @@ -34,22 +46,24 @@ def read_only_statements(bucket):
"s3:GetObjectRetention",
"s3:GetObjectTagging",
],
"Resource": ["arn:aws:s3:::{}/*".format(bucket)],
"Resource": ["arn:aws:s3:::{}/{}".format(bucket, prefix)],
},
]


def write_only(bucket):
return wrap_policy(write_only_statements(bucket))
def write_only(bucket, prefix="*"):
return wrap_policy(write_only_statements(bucket, prefix))


def write_only_statements(bucket):
def write_only_statements(bucket, prefix="*"):
# https://github.com/simonw/s3-credentials/issues/25
if not prefix.endswith("*"):
prefix += "*"
return [
{
"Effect": "Allow",
"Action": ["s3:PutObject"],
"Resource": ["arn:aws:s3:::{}/*".format(bucket)],
"Resource": ["arn:aws:s3:::{}/{}".format(bucket, prefix)],
}
]

Expand Down
48 changes: 45 additions & 3 deletions tests/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,47 @@ def test_create_bucket_read_only_duration_15():
Key="hello-read-only.txt",
)
# Client should be able to read this
credentials_response = credentials_s3.get_object(
Bucket=bucket_name, Key="hello-read-only.txt"
assert (
read_file(credentials_s3, bucket_name, "hello-read-only.txt")
== "hello read-only"
)
assert credentials_response["Body"].read() == b"hello read-only"


def test_read_write_bucket_prefix():
bucket_name = "s3-credentials-tests.read-write-prefix.{}".format(
secrets.token_hex(4)
)
s3 = boto3.client("s3")
assert not bucket_exists(s3, bucket_name)
credentials_decoded = json.loads(
get_output(
"create", bucket_name, "-c", "--duration", "15m", "--prefix", "my/prefix/"
)
)
# Wait for everything to exist
time.sleep(10)
# Create client with these credentials
credentials_s3 = boto3.session.Session(
aws_access_key_id=credentials_decoded["AccessKeyId"],
aws_secret_access_key=credentials_decoded["SecretAccessKey"],
aws_session_token=credentials_decoded["SessionToken"],
).client("s3")
# Write file with root credentials that I should not be able to see
s3.put_object(
Body="hello".encode("utf-8"),
Bucket=bucket_name,
Key="should-not-be-visible.txt",
)
# I should be able to write to and read from /my/prefix/file.txt
credentials_s3.put_object(
Body="hello".encode("utf-8"),
Bucket=bucket_name,
Key="my/prefix/file.txt",
)
assert read_file(credentials_s3, bucket_name, "my/prefix/file.txt") == "hello"
# Should NOT be able to read should-not-be-visible.txt
with pytest.raises(AssertionError):
read_file(credentials_s3, bucket_name, "should-not-be-visible.txt")


def get_output(*args, input=None):
Expand All @@ -105,6 +142,11 @@ def get_output(*args, input=None):
return result.stdout


def read_file(s3, bucket, path):
response = s3.get_object(Bucket=bucket, Key=path)
return response["Body"].read().decode("utf-8")


def cleanup_any_resources():
# Delete any users beginning s3-credentials-tests.
users = json.loads(get_output("list-users", "--array"))
Expand Down

0 comments on commit 56cf379

Please sign in to comment.