Skip to content

Commit

Permalink
changes to the key management
Browse files Browse the repository at this point in the history
  • Loading branch information
tiagoncl committed Feb 13, 2024
1 parent 934e6d8 commit eefcb30
Showing 1 changed file with 46 additions and 18 deletions.
64 changes: 46 additions & 18 deletions encryption/encryption_manager.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import base64
import datetime
import json
import pickle
import time
from base64 import b64encode, b64decode
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding as sym_padding, padding
Expand All @@ -14,21 +16,58 @@ def __init__(self, encryption_status, config_manager):
self.config_manager = config_manager

def _get_current_key_and_iv(self, key_name):
current_timestamp = time.time()
if key_name in self.key_cache:
key, iv, timestamp = self.key_cache[key_name]
if datetime.datetime.now() - timestamp < datetime.timedelta(hours=1): # 1 hour validity
key, iv, cached_timestamp = self.key_cache[key_name]
if current_timestamp - cached_timestamp < 3600: # 1 hour validity in seconds
return key, iv
else:
self.config_manager.destroy_secret_version(key_name, version="latest")

# Fetch from storage if not in cache or cache is expired
try:
key, iv, stored_timestamp = self._retrieve_key_and_iv(key_name)
if current_timestamp - stored_timestamp < 3600: # Check validity
self.key_cache[key_name] = (key, iv, stored_timestamp)
return key, iv
except Exception as e:
# If retrieval fails, log the exception and proceed to generate a new key
print(f"Error retrieving key: {e}")

# Generate new key and IV
key = self._generate_symmetric_key()
iv = urandom(16)

self._save_key_and_iv(key_name, key, iv)
# Update cache
self.key_cache[key_name] = (key, iv, datetime.datetime.now())
self.key_cache[key_name] = (key, iv, current_timestamp)
return key, iv

def _save_key_and_iv(self, key_name, key, iv):
data = {
"key": b64encode(key).decode('utf-8'),
"iv": b64encode(iv).decode('utf-8'),
"timestamp": time.time() # Current timestamp
}
encoded_data = b64encode(json.dumps(data).encode('utf-8'))
self.config_manager.save_secret_to_gcloud(key_name, encoded_data)

def _retrieve_key_and_iv(self, key_name):
# Retrieve the base64-encoded data, which is a string
base64_encoded_json_str = self.config_manager.get_key_iv(key_name)

# First, decode the base64 string to get the actual JSON string in bytes
json_bytes = b64decode(base64_encoded_json_str)

# Then, decode the bytes to a string using UTF-8
json_str = json_bytes.decode('utf-8')

# Now, you can use json.loads() to deserialize the string into a Python object
data = json.loads(json_str)

# Extract 'key', 'iv', and 'timestamp' from the data, assuming they are base64 encoded
key = b64decode(data["key"])
iv = b64decode(data["iv"])
timestamp = data["timestamp"] # Assuming timestamp is in the expected format

return key, iv, timestamp

def _generate_symmetric_key(self):
# Generate a random symmetric key for AES (256-bit key)
return urandom(32)
Expand All @@ -51,17 +90,6 @@ def _encrypt_data(self, data, key_name):

return encrypted_data

def _save_key_and_iv(self, key_name, key, iv):
# Convert the key and iv from bytes to a string for storage
encoded_key_iv = b64encode(key + iv).decode('utf-8')
self.config_manager.save_secret_to_gcloud(key_name, encoded_key_iv)

def _retrieve_key_and_iv(self, key_name):
# Fetch the combined key and IV
encoded_key_iv = self.config_manager.get_key_iv(key_name)
key_iv = b64decode(encoded_key_iv)
key, iv = key_iv[:32], key_iv[32:]
return key, iv

def _decrypt_data(self, encrypted_data, key_name):
key, iv = self._get_current_key_and_iv(key_name)
Expand Down

0 comments on commit eefcb30

Please sign in to comment.