forked from dell/dtias
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathget_resources.py
138 lines (118 loc) · 5.06 KB
/
get_resources.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""
#### Synopsis
Script to retrieve resources from DTIAS
#### Description
This script uses the DTIAS REST API to retrieve resources based on the specified filters. It authenticates using a
token obtained via the API. The script supports optional filters and pagination for resource queries.
#### Python Example
```bash
python get_resources.py --server_ip <ip addr> --tenant_id <tenant> --username <username> \
--password <password> --filters '{"Key": "Value"}' --pagination '{"offset": 0, "limit": 10}'
```
where:
- `server_ip` is the IP address of the DTIAS server.
- `tenant_id` is the tenant name (default: Fulcrum).
- `username` and `password` are the credentials for authentication.
- `filters` specifies query filters as a JSON string (optional).
- `pagination` specifies pagination parameters as a JSON string (optional).
"""
import argparse
import ast
import json
import pprint
import sys

import requests
import urllib3
# Silence urllib3 warnings for the whole process. The HTTPS calls in this script
# pass verify=False (self-signed certificates), which would otherwise emit an
# InsecureRequestWarning on every request.
urllib3.disable_warnings()
def create_token(server_ip, tenant_id, username, password, timeout=30):
    """
    Creates a token for the DTIAS server.

    Parameters:
    server_ip (str): The IP address of the DTIAS server.
    tenant_id (str): The tenant ID (e.g., "Fulcrum").
    username (str): The username to authenticate with.
    password (str): The password for the username.
    timeout (float or tuple): Seconds to wait for the server before giving up
        (default: 30). Passed unchanged to ``requests.post``.

    Returns:
    dict: The decoded JSON response, expected to contain the access token,
        id token, and refresh token. None if the request failed, timed out,
        returned an HTTP 4xx/5xx status, or the body was not valid JSON
        (the error is printed).
    """
    url = f"https://{server_ip}/identity/v1/tenant/{tenant_id}/token/create"
    headers = {"Content-Type": "application/json"}
    data = {
        "grant_type": "password",
        "client_id": "ccpapi",
        "username": username,
        "password": password,
    }
    try:
        # verify=False skips certificate verification (self-signed certs).
        # The timeout keeps the script from hanging forever on a dead server.
        response = requests.post(
            url, headers=headers, json=data, verify=False, timeout=timeout
        )
        response.raise_for_status()  # Raise an error for HTTP codes 4xx/5xx.
        return response.json()  # Return the JSON response with tokens.
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests releases whose
        # decode error is not a RequestException subclass.
        print(f"Error: {e}")
        return None
def get_resources(server_ip, tenant_id, id_token, filters=None, pagination=None, timeout=30):
    """
    Retrieves resources from the DTIAS server.

    Parameters:
    server_ip (str): The IP address of the DTIAS server.
    tenant_id (str): The tenant ID (e.g., "Fulcrum").
    id_token (str): The ID token for authentication (sent as a Bearer token).
    filters (list or dict): Filter specification sent as the "Filters" field
        of the request body (optional).
    pagination (dict): Pagination parameters such as offset and limit; its
        keys are merged into the top level of the request body (optional).
    timeout (float or tuple): Seconds to wait for the server before giving up
        (default: 30). Passed unchanged to ``requests.post``.

    Returns:
    dict: The decoded JSON response (resources data). None if the request
        failed, timed out, returned an HTTP 4xx/5xx status, or the body was
        not valid JSON (the error is printed).
    """
    url = f"https://{server_ip}/v1/tenants/{tenant_id}/search/resources"
    headers = {
        "accept": "application/json, text/plain, */*",
        "authorization": f"Bearer {id_token}",
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
    }
    # Build the request body from the optional pieces only; an empty body
    # ({}) is sent when neither filters nor pagination is given.
    data = {}
    if filters:
        data["Filters"] = filters
    if pagination:
        data.update(pagination)
    try:
        # verify=False skips certificate verification (self-signed certs).
        # The timeout keeps the script from hanging forever on a dead server.
        response = requests.post(
            url, headers=headers, json=data, verify=False, timeout=timeout
        )
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests releases whose
        # decode error is not a RequestException subclass.
        print(f"Error retrieving resources: {e}")
        return None
def parse_json_arg(raw):
    """
    Parse a command-line value given as a JSON string.

    JSON is tried first, so ``true``/``false``/``null`` work. For backward
    compatibility with the earlier eval()-based parsing, a Python literal
    (e.g. single-quoted dict) is accepted as a fallback via
    ``ast.literal_eval``, which evaluates literals only and never runs code.

    Parameters:
    raw (str): The raw argument text.

    Returns:
    The decoded value (dict, list, str, number, bool or None).

    Raises:
    ValueError: If the text is neither valid JSON nor a Python literal.
    """
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        pass  # Not JSON; fall through to the Python-literal fallback.
    try:
        return ast.literal_eval(raw)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError) as e:
        raise ValueError(f"not valid JSON or a Python literal: {e}") from e


def main(argv=None):
    """
    Command-line entry point: authenticate, query resources, print them.

    Parameters:
    argv (list): Argument list to parse; None means use sys.argv[1:].

    Exits with status 1 if an argument cannot be parsed, the token cannot be
    obtained, or the resource query fails.
    """
    # Set up argparse
    parser = argparse.ArgumentParser(description="Retrieve resources from the DTIAS server.")
    parser.add_argument("--server_ip", required=True, help="The IP address of the DTIAS server.")
    parser.add_argument("--tenant_id", default="Fulcrum", help="The tenant ID (default: Fulcrum).")
    parser.add_argument("--username", required=True, help="The username to authenticate with.")
    parser.add_argument("--password", required=True, help="The password for the username.")
    parser.add_argument("--filters", help="Filters for querying resources as JSON string (optional).", default=None)
    parser.add_argument("--pagination", help="Pagination parameters as JSON string (optional).", default=None)
    args = parser.parse_args(argv)

    # Validate the optional JSON arguments before touching the network so a
    # typo fails fast. NOTE: this replaces eval(), which would execute
    # arbitrary code from the command line and rejected JSON true/false/null.
    filters = None
    if args.filters:
        try:
            filters = parse_json_arg(args.filters)
        except ValueError as e:
            print(f"Error parsing filters: {e}")
            sys.exit(1)

    pagination = None
    if args.pagination:
        try:
            pagination = parse_json_arg(args.pagination)
            # get_resources merges pagination into the request body with
            # dict.update(), so anything but a JSON object cannot be used.
            if not isinstance(pagination, dict):
                raise ValueError("expected a JSON object such as {\"offset\": 0, \"limit\": 10}")
        except ValueError as e:
            print(f"Error parsing pagination: {e}")
            sys.exit(1)

    # Step 1: Generate the token
    tokens = create_token(args.server_ip, args.tenant_id, args.username, args.password)
    if not tokens:
        print("Failed to generate token. Exiting...")
        sys.exit(1)
    id_token = tokens.get("id_token") if isinstance(tokens, dict) else None
    if not id_token:
        # Without this check the request would be sent with "Bearer None".
        print("Token response did not contain an id_token. Exiting...")
        sys.exit(1)

    # Step 2: Retrieve resources
    resources_data = get_resources(args.server_ip, args.tenant_id, id_token, filters, pagination)
    # get_resources returns None on failure; an empty result is still a success.
    if resources_data is None:
        print("Failed to retrieve resources.")
        sys.exit(1)
    print("Resources Retrieved Successfully:")
    pprint.pprint(resources_data)


if __name__ == "__main__":
    main()