Skip to content

Commit

Permalink
feat(ingest/openapi): support proxies and alternate auth schemes (dat…
Browse files Browse the repository at this point in the history
…ahub-project#9492)

Co-authored-by: Fernando Marino <[email protected]>
Co-authored-by: Harshal Sheth <[email protected]>
  • Loading branch information
3 people authored Dec 28, 2023
1 parent cfb4d2f commit b7a0bbc
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 16 deletions.
41 changes: 34 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ class OpenApiConfig(ConfigModel):
ignore_endpoints: list = Field(default=[], description="")
username: str = Field(default="", description="")
password: str = Field(default="", description="")
proxies: Optional[dict] = Field(
default=None,
description="Eg. "
"`{'http': 'http://10.10.1.10:3128', 'https': 'http://10.10.1.10:1080'}`."
"If authentication is required, add it to the proxy url directly e.g. "
"`http://user:[email protected]:3128/`.",
)
forced_examples: dict = Field(default={}, description="")
token: Optional[str] = Field(default=None, description="")
get_token: dict = Field(default={}, description="")
Expand Down Expand Up @@ -87,9 +94,13 @@ def get_swagger(self) -> Dict:
password=self.password,
tok_url=url4req,
method=self.get_token["request_type"],
proxies=self.proxies,
)
sw_dict = get_swag_json(
self.url, token=self.token, swagger_file=self.swagger_file
self.url,
token=self.token,
swagger_file=self.swagger_file,
proxies=self.proxies,
) # load the swagger file

else: # using basic auth for accessing endpoints
Expand All @@ -98,6 +109,7 @@ def get_swagger(self) -> Dict:
username=self.username,
password=self.password,
swagger_file=self.swagger_file,
proxies=self.proxies,
)
return sw_dict

Expand Down Expand Up @@ -258,10 +270,15 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901
tot_url = clean_url(config.url + self.url_basepath + endpoint_k)

if config.token:
response = request_call(tot_url, token=config.token)
response = request_call(
tot_url, token=config.token, proxies=config.proxies
)
else:
response = request_call(
tot_url, username=config.username, password=config.password
tot_url,
username=config.username,
password=config.password,
proxies=config.proxies,
)
if response.status_code == 200:
fields2add, root_dataset_samples[dataset_name] = extract_fields(
Expand All @@ -281,10 +298,15 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901
url_guess = try_guessing(endpoint_k, root_dataset_samples)
tot_url = clean_url(config.url + self.url_basepath + url_guess)
if config.token:
response = request_call(tot_url, token=config.token)
response = request_call(
tot_url, token=config.token, proxies=config.proxies
)
else:
response = request_call(
tot_url, username=config.username, password=config.password
tot_url,
username=config.username,
password=config.password,
proxies=config.proxies,
)
if response.status_code == 200:
fields2add, _ = extract_fields(response, dataset_name)
Expand All @@ -304,10 +326,15 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901
)
tot_url = clean_url(config.url + self.url_basepath + composed_url)
if config.token:
response = request_call(tot_url, token=config.token)
response = request_call(
tot_url, token=config.token, proxies=config.proxies
)
else:
response = request_call(
tot_url, username=config.username, password=config.password
tot_url,
username=config.username,
password=config.password,
proxies=config.proxies,
)
if response.status_code == 200:
fields2add, _ = extract_fields(response, dataset_name)
Expand Down
26 changes: 17 additions & 9 deletions metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ def request_call(
token: Optional[str] = None,
username: Optional[str] = None,
password: Optional[str] = None,
proxies: Optional[dict] = None,
) -> requests.Response:
headers = {"accept": "application/json"}

Expand All @@ -60,8 +61,8 @@ def request_call(
)

elif token is not None:
headers["Authorization"] = f"Bearer {token}"
return requests.get(url, headers=headers)
headers["Authorization"] = f"{token}"
return requests.get(url, proxies=proxies, headers=headers)
else:
return requests.get(url, headers=headers)

Expand All @@ -72,12 +73,15 @@ def get_swag_json(
username: Optional[str] = None,
password: Optional[str] = None,
swagger_file: str = "",
proxies: Optional[dict] = None,
) -> Dict:
tot_url = url + swagger_file
if token is not None:
response = request_call(url=tot_url, token=token)
response = request_call(url=tot_url, token=token, proxies=proxies)
else:
response = request_call(url=tot_url, username=username, password=password)
response = request_call(
url=tot_url, username=username, password=password, proxies=proxies
)

if response.status_code != 200:
raise Exception(f"Unable to retrieve {tot_url}, error {response.status_code}")
Expand Down Expand Up @@ -251,7 +255,7 @@ def compose_url_attr(raw_url: str, attr_list: list) -> str:
attr_list=["2",])
asd2 == "http://asd.com/2"
"""
splitted = re.split(r"\{[^}]+\}", raw_url)
splitted = re.split(r"\{[^}]+}", raw_url)
if splitted[-1] == "": # it can happen that the last element is empty
splitted = splitted[:-1]
composed_url = ""
Expand All @@ -265,7 +269,7 @@ def compose_url_attr(raw_url: str, attr_list: list) -> str:


def maybe_theres_simple_id(url: str) -> str:
dets = re.findall(r"(\{[^}]+\})", url) # searching the fields between parenthesis
dets = re.findall(r"(\{[^}]+})", url) # searching the fields between parenthesis
if len(dets) == 0:
return url
dets_w_id = [det for det in dets if "id" in det] # the fields containing "id"
Expand Down Expand Up @@ -349,6 +353,7 @@ def get_tok(
password: str = "",
tok_url: str = "",
method: str = "post",
proxies: Optional[dict] = None,
) -> str:
"""
Trying to post username/password to get auth.
Expand All @@ -357,12 +362,15 @@ def get_tok(
url4req = url + tok_url
if method == "post":
# this will make a POST call with username and password
data = {"username": username, "password": password}
data = {"username": username, "password": password, "maxDuration": True}
# url2post = url + "api/authenticate/"
response = requests.post(url4req, data=data)
response = requests.post(url4req, proxies=proxies, json=data)
if response.status_code == 200:
cont = json.loads(response.content)
token = cont["tokens"]["access"]
if "token" in cont: # other authentication scheme
token = cont["token"]
else: # works only for bearer authentication scheme
token = f"Bearer {cont['tokens']['access']}"
elif method == "get":
# this will make a GET call with username and password
response = requests.get(url4req)
Expand Down

0 comments on commit b7a0bbc

Please sign in to comment.