From 07da42c07606e1c39ac2835404ba6b203a792b70 Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 01:37:16 +0200 Subject: [PATCH 1/8] focsec impl --- ory-actions/vpncheck-py/focsec.py | 96 +++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 ory-actions/vpncheck-py/focsec.py diff --git a/ory-actions/vpncheck-py/focsec.py b/ory-actions/vpncheck-py/focsec.py new file mode 100644 index 0000000..744b66d --- /dev/null +++ b/ory-actions/vpncheck-py/focsec.py @@ -0,0 +1,96 @@ +# Copyright © 2023 Ory Corp +# SPDX-License-Identifier: Apache-2.0 + +from flask import Flask, request, jsonify +import requests + +import os + +# load google cloud logging if running on GCP +if os.getenv('ENABLE_CLOUD_LOGGING', ''): + # set up the Google Cloud Logging python client library + import google.cloud.logging + client = google.cloud.logging.Client() + client.setup_logging() + +# use Python’s standard logging library to send logs to GCP +import logging + +app = Flask(__name__) + +# Define the bearer token for authentication +BEARER_TOKEN = os.environ.get("BEARER_TOKEN") +FOCSEC_API_KEY = os.environ.get("FOCSEC_API_KEY") + +if not BEARER_TOKEN or not FOCSEC_API_KEY: + raise ValueError("BEARER_TOKEN or FOCSEC_API_KEY not set in environment variables.") + +@app.route("/vpncheck", methods=["POST"]) +def handle_vpncheck(): + return vpncheck(request) + +def vpncheck(request): + # Check for bearer token authentication + auth_header = request.headers.get("Authorization") + if not auth_header or not auth_header.startswith("Bearer "): + return jsonify({"error": "Unauthorized"}), 401 + + provided_token = auth_header.split("Bearer ")[1] + if provided_token != BEARER_TOKEN: + return jsonify({"error": "Unauthorized"}), 401 + + # Parse the JSON payload and extract the IP address + data = request.get_json() + logging.info(f"request: {data}") + ip_address = data.get("ip_address") + if not ip_address: + return error_response("Cannot determine Client IP address") + + # Call vpnapi.io to check the IP address + # if the API fails, we permit by default + try: + vpn_result = query_focsec(ip_address) + except Exception as e: + return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200 + + # Check the response from focsec + if "is_vpn" in vpn_result and vpn_result["is_vpn"] == True: + logging.info(f"Blocked: VPN") + return error_response("Request blocked: VPN") + if "is_tor" in vpn_result and vpn_result["is_tor"] == True: + logging.info(f"Blocked: Tor") + return error_response("Request blocked: Tor") + + if ( + "iso_code" in vpn_result + and vpn_result["iso_code"] == "ru" + ): + logging.info(f"geoblock: {vpn_result['iso_code']}") + return error_response("Request blocked: Geolocation") + + # Return the result as success or error details + return jsonify(vpn_result), 200 + + +def error_response(msg): + return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400 + + +def query_focsec(ip_address): + # Implement the logic to call focsec.com and retrieve the result + # You can use libraries like requests or httpx for making HTTP requests + # Return the response as a dictionary + # For example: + + url = f"https://api.focsec.com/v1/ip/{ip_address}?api_key={FOCSEC_API_KEY}" + response = requests.get(url, timeout=1.5) + if response.status_code != 200: + raise Exception(f"vpnapi.io returned {response.status_code}") + + result = response.json() + + return result + + +if __name__ == "__main__": + app.run() From 5cacef17f900bc007752d394d30a1ccbf49ea529 Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 01:43:49 +0200 Subject: [PATCH 2/8] chore: cleanup to have both examples --- ory-actions/vpncheck-py/README.md | 6 +- ory-actions/vpncheck-py/main.py | 102 ------------------------------ 2 files changed, 4 insertions(+), 104 deletions(-) delete mode 100644 ory-actions/vpncheck-py/main.py diff --git a/ory-actions/vpncheck-py/README.md b/ory-actions/vpncheck-py/README.md index cf28292..3d09416 100644 --- a/ory-actions/vpncheck-py/README.md +++ b/ory-actions/vpncheck-py/README.md @@ -1,7 +1,7 @@ # Ory Action to check IP addresses against vpnapi.io This is an example Action (webhook) to check client IP addresses against -vpnapi.com and block requests +security services like focsec.com, vpnapi.com and ipqs.com and block requests - coming from TOR clients - coming from known VPNs @@ -34,6 +34,7 @@ pip3 install google-cloud-logging ```bash export BEARER_TOKEN=SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK; export VPNAPIIO_API_KEY=YOUR_VPNAPI_KEY; +export FOCSEC_API_KEY=YOUR_FOCSEC_KEY; python3 main.py ``` @@ -41,7 +42,7 @@ python3 main.py ```bash cd ory-actions/vpncheck-py -python3 main.py +python3 focsec.py # or vpnapi.py ``` #### Send a sample request @@ -69,6 +70,7 @@ After setting up your GCP project (see, for example, you can deploy the Action as a cloud function: ```bash +cp focsec.py main.py # Cloud functions like a main.py gcloud functions deploy vpncheck --runtime python39 --trigger-http --allow-unauthenticated --set-env-vars BEARER_TOKEN=$SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK,VPNAPIIO_API_KEY=$VPNAPIIO_API_KEY,ENABLE_CLOUD_LOGGING=true --source=. ``` diff --git a/ory-actions/vpncheck-py/main.py b/ory-actions/vpncheck-py/main.py deleted file mode 100644 index 5aff0b9..0000000 --- a/ory-actions/vpncheck-py/main.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright © 2023 Ory Corp -# SPDX-License-Identifier: Apache-2.0 - -from flask import Flask, request, jsonify -import requests - -import os - -# load google cloud logging if running on GCP -if os.getenv('ENABLE_CLOUD_LOGGING', ''): - # set up the Google Cloud Logging python client library - import google.cloud.logging - client = google.cloud.logging.Client() - client.setup_logging() - -# use Python’s standard logging library to send logs to GCP -import logging - -app = Flask(__name__) - -# Define the bearer token for authentication -BEARER_TOKEN = os.environ.get("BEARER_TOKEN") -VPNAPIIO_API_KEY = os.environ.get("VPNAPIIO_API_KEY") - -if not BEARER_TOKEN or not VPNAPIIO_API_KEY: - raise ValueError("BEARER_TOKEN or VPNAPIIO_API_KEY not set in environment variables.") - -@app.route("/vpncheck", methods=["POST"]) -def handle_vpncheck(): - return vpncheck(request) - -def vpncheck(request): - # Check for bearer token authentication - auth_header = request.headers.get("Authorization") - if not auth_header or not auth_header.startswith("Bearer "): - return jsonify({"error": "Unauthorized"}), 401 - - provided_token = auth_header.split("Bearer ")[1] - if provided_token != BEARER_TOKEN: - return jsonify({"error": "Unauthorized"}), 401 - - # Parse the JSON payload and extract the IP address - data = request.get_json() - logging.info(f"request: {data}") - ip_address = data.get("ip_address") - if not ip_address: - return error_response("Cannot determine Client IP address") - - # Call vpnapi.io to check the IP address - # if the API fails, we permit by default - try: - vpn_result = query_vpn_io(ip_address) - except Exception as e: - return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200 - - # Check the response from vpnapi.io - if "error" in vpn_result and vpn_result["error"] == "Blocked": - return error_response("Request blocked: Blocked by VPN API") - - if "security" in vpn_result: - security_info = vpn_result["security"] - if "vpn" in security_info and security_info["vpn"] == True: - logging.info(f"vpn block: {security_info['vpn']}") - return error_response("Request blocked: VPN") - if "tor" in security_info and security_info["tor"] == True: - logging.info(f"tor block: {security_info['tor']}") - return error_response("Request blocked: Tor") - - if ( - "location" in vpn_result - and "country_code" in vpn_result["location"] - and vpn_result["location"]["country_code"] == "RU" - ): - logging.info(f"geoblock: {vpn_result['location']['country_code']}") - return error_response("Request blocked: Geolocation") - - # Return the result as success or error details - return jsonify(vpn_result), 200 - - -def error_response(msg): - return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400 - - -def query_vpn_io(ip_address): - # Implement the logic to call vpnapi.io and retrieve the result - # You can use libraries like requests or httpx for making HTTP requests - # Return the response as a dictionary - # For example: - - url = f"https://vpnapi.io/api/{ip_address}?key={VPNAPIIO_API_KEY}" - response = requests.get(url, timeout=1.5) - if response.status_code != 200: - raise Exception(f"vpnapi.io returned {response.status_code}") - - result = response.json() - - return result - - -if __name__ == "__main__": - app.run() From fc3249a8c2b325e5f79722036520a56537d1e526 Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 02:21:53 +0200 Subject: [PATCH 3/8] feat: IPQS impl (untested) --- ory-actions/vpncheck-py/README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/ory-actions/vpncheck-py/README.md b/ory-actions/vpncheck-py/README.md index 3d09416..1c2d950 100644 --- a/ory-actions/vpncheck-py/README.md +++ b/ory-actions/vpncheck-py/README.md @@ -33,16 +33,17 @@ pip3 install google-cloud-logging ```bash export BEARER_TOKEN=SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK; -export VPNAPIIO_API_KEY=YOUR_VPNAPI_KEY; +# Set the API Key for the service you use +export VPNAPIIO_API_KEY=YOUR_VPNAPI_KEY; export FOCSEC_API_KEY=YOUR_FOCSEC_KEY; -python3 main.py +export IPQS_API_KEY=YOUR_IPQS_KEY; ``` ### Run locally ```bash cd ory-actions/vpncheck-py -python3 focsec.py # or vpnapi.py +python3 focsec.py # or vpnapi.py or ipqs.py ``` #### Send a sample request @@ -70,8 +71,8 @@ After setting up your GCP project (see, for example, you can deploy the Action as a cloud function: ```bash -cp focsec.py main.py # Cloud functions like a main.py -gcloud functions deploy vpncheck --runtime python39 --trigger-http --allow-unauthenticated --set-env-vars BEARER_TOKEN=$SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK,VPNAPIIO_API_KEY=$VPNAPIIO_API_KEY,ENABLE_CLOUD_LOGGING=true --source=. +cp focsec.py main.py # Cloud functions like a main.py, so copy the implementation you're adopting there +gcloud functions deploy vpncheck --runtime python39 --trigger-http --allow-unauthenticated --set-env-vars BEARER_TOKEN=$SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK,VPNAPIIO_API_KEY=$VPNAPIIO_API_KEY,IPQS_API_KEY=$IPQS_API_KEY,ENABLE_CLOUD_LOGGING=true --source=. ``` Note: You may need to create a `venv` for dependencies to load correctly. From 28440d8290c15341576a9bbca9ef08e6dd733af1 Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 02:28:59 +0200 Subject: [PATCH 4/8] fix: copy and paste mistakes --- ory-actions/vpncheck-py/ipqs.py | 116 ++++++++++++++++++++++++++++++ ory-actions/vpncheck-py/vpnapi.py | 102 ++++++++++++++++++++++++++ 2 files changed, 218 insertions(+) create mode 100644 ory-actions/vpncheck-py/ipqs.py create mode 100644 ory-actions/vpncheck-py/vpnapi.py diff --git a/ory-actions/vpncheck-py/ipqs.py b/ory-actions/vpncheck-py/ipqs.py new file mode 100644 index 0000000..d5663b0 --- /dev/null +++ b/ory-actions/vpncheck-py/ipqs.py @@ -0,0 +1,116 @@ +# Copyright © 2023 Ory Corp +# SPDX-License-Identifier: Apache-2.0 + +from flask import Flask, request, jsonify +import requests + +import os + +# load google cloud logging if running on GCP +if os.getenv('ENABLE_CLOUD_LOGGING', ''): + # set up the Google Cloud Logging python client library + import google.cloud.logging + client = google.cloud.logging.Client() + client.setup_logging() + +# use Python’s standard logging library to send logs to GCP +import logging + +app = Flask(__name__) + +# Define the bearer token for authentication +BEARER_TOKEN = os.environ.get("BEARER_TOKEN") +IPQS_API_KEY = os.environ.get("IPQS_API_KEY") + +if not BEARER_TOKEN or not IPQS_API_KEY: + raise ValueError("BEARER_TOKEN or IPQS_API_KEY not set in environment variables.") + +@app.route("/vpncheck", methods=["POST"]) +def handle_vpncheck(): + return vpncheck(request) + +def vpncheck(request): + # Check for bearer token authentication + auth_header = request.headers.get("Authorization") + if not auth_header or not auth_header.startswith("Bearer "): + return jsonify({"error": "Unauthorized"}), 401 + + provided_token = auth_header.split("Bearer ")[1] + if provided_token != BEARER_TOKEN: + return jsonify({"error": "Unauthorized"}), 401 + + # Parse the JSON payload and extract the IP address + data = request.get_json() + logging.info(f"request: {data}") + ip_address = data.get("ip_address") + if not ip_address: + return error_response("Cannot determine Client IP address") + + # Call vpnapi.io to check the IP address + # if the API fails, we permit by default + try: + vpn_result = query_ipqs(ip_address) + except Exception as e: + return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200 + + # Check the response from IPQS - based on example from https://www.ipqualityscore.com/documentation/proxy-detection/overview + + if not 'success' in vpn_result or vpn_result['success'] == False: + logging.info(f"IPQS failed, result: {vpn_result}") + return error_response("Can't verify IP address") + + if "fraud_score" in vpn_result and vpn_result["fraud_score"] >= 80: + logging.info(f"Blocked: Fraud score {vpn_result['fraud_score']}") + return error_response("Authentication failed: IP address cannot be verified") + if "vpn" in vpn_result and vpn_result["vpn"] == True: + logging.info(f"Blocked: VPN") + return error_response("Authentication failed: Please disable your VPN.") + if "tor" in vpn_result and vpn_result["tor"] == True: + logging.info(f"Blocked: Tor") + return error_response("Authentication failed: Please disable Tor.") + + if ( + "iso_code" in vpn_result + and vpn_result["iso_code"] == "ru" + ): + logging.info(f"geoblock: {vpn_result['iso_code']}") + return error_response("Request blocked: Geolocation") + + # Return the result as success or error details + return jsonify(vpn_result), 200 + + +def error_response(msg): + return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400 + + +def query_ipqs(ip_address): + # Implement the logic to call ipqs.com and retrieve the result + # You can use libraries like requests or httpx for making HTTP requests + # Return the response as a dictionary + # For example: + + ipqs_parameters = { + #'user_agent' : header_items['User-Agent'], + #'user_language' : header_items['Accept-Language'].split(',')[0], + 'strictness' : 1, + # You may want to allow public access points like coffee shops, schools, corporations, etc... + 'allow_public_access_points' : 'true', + # Reduce scoring penalties for mixed quality IP addresses shared by good and bad users. + 'lighter_penalties' : 'false' + } + + + url = f"https://www.ipqualityscore.com/api/json/ip/{IPQS_API_KEY}/{ip_address}" + response = requests.get(url, params = ipqs_parameters, timeout = 1.5) + + + if response.status_code != 200: + raise Exception(f"IPQS returned {response.status_code}") + + result = response.json() + return result + + +if __name__ == "__main__": + app.run() diff --git a/ory-actions/vpncheck-py/vpnapi.py b/ory-actions/vpncheck-py/vpnapi.py new file mode 100644 index 0000000..5aff0b9 --- /dev/null +++ b/ory-actions/vpncheck-py/vpnapi.py @@ -0,0 +1,102 @@ +# Copyright © 2023 Ory Corp +# SPDX-License-Identifier: Apache-2.0 + +from flask import Flask, request, jsonify +import requests + +import os + +# load google cloud logging if running on GCP +if os.getenv('ENABLE_CLOUD_LOGGING', ''): + # set up the Google Cloud Logging python client library + import google.cloud.logging + client = google.cloud.logging.Client() + client.setup_logging() + +# use Python’s standard logging library to send logs to GCP +import logging + +app = Flask(__name__) + +# Define the bearer token for authentication +BEARER_TOKEN = os.environ.get("BEARER_TOKEN") +VPNAPIIO_API_KEY = os.environ.get("VPNAPIIO_API_KEY") + +if not BEARER_TOKEN or not VPNAPIIO_API_KEY: + raise ValueError("BEARER_TOKEN or VPNAPIIO_API_KEY not set in environment variables.") + +@app.route("/vpncheck", methods=["POST"]) +def handle_vpncheck(): + return vpncheck(request) + +def vpncheck(request): + # Check for bearer token authentication + auth_header = request.headers.get("Authorization") + if not auth_header or not auth_header.startswith("Bearer "): + return jsonify({"error": "Unauthorized"}), 401 + + provided_token = auth_header.split("Bearer ")[1] + if provided_token != BEARER_TOKEN: + return jsonify({"error": "Unauthorized"}), 401 + + # Parse the JSON payload and extract the IP address + data = request.get_json() + logging.info(f"request: {data}") + ip_address = data.get("ip_address") + if not ip_address: + return error_response("Cannot determine Client IP address") + + # Call vpnapi.io to check the IP address + # if the API fails, we permit by default + try: + vpn_result = query_vpn_io(ip_address) + except Exception as e: + return jsonify({"warning": "Unable to check VPN: ", "details": str(e)}), 200 + + # Check the response from vpnapi.io + if "error" in vpn_result and vpn_result["error"] == "Blocked": + return error_response("Request blocked: Blocked by VPN API") + + if "security" in vpn_result: + security_info = vpn_result["security"] + if "vpn" in security_info and security_info["vpn"] == True: + logging.info(f"vpn block: {security_info['vpn']}") + return error_response("Request blocked: VPN") + if "tor" in security_info and security_info["tor"] == True: + logging.info(f"tor block: {security_info['tor']}") + return error_response("Request blocked: Tor") + + if ( + "location" in vpn_result + and "country_code" in vpn_result["location"] + and vpn_result["location"]["country_code"] == "RU" + ): + logging.info(f"geoblock: {vpn_result['location']['country_code']}") + return error_response("Request blocked: Geolocation") + + # Return the result as success or error details + return jsonify(vpn_result), 200 + + +def error_response(msg): + return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400 + + +def query_vpn_io(ip_address): + # Implement the logic to call vpnapi.io and retrieve the result + # You can use libraries like requests or httpx for making HTTP requests + # Return the response as a dictionary + # For example: + + url = f"https://vpnapi.io/api/{ip_address}?key={VPNAPIIO_API_KEY}" + response = requests.get(url, timeout=1.5) + if response.status_code != 200: + raise Exception(f"vpnapi.io returned {response.status_code}") + + result = response.json() + + return result + + +if __name__ == "__main__": + app.run() From 5ae4b9289730fb9afca1e3df386efc942f3e2216 Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 02:31:24 +0200 Subject: [PATCH 5/8] chore: readme update --- ory-actions/vpncheck-py/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ory-actions/vpncheck-py/README.md b/ory-actions/vpncheck-py/README.md index 1c2d950..d607a68 100644 --- a/ory-actions/vpncheck-py/README.md +++ b/ory-actions/vpncheck-py/README.md @@ -19,7 +19,7 @@ Cloud Functions, and can be adapted for different scenarios. - A Google Cloud project with Cloud Functions active (or an alternate way to deploy) -- A vpnapi.com account +- An account with focsec.com, ipqs.com or vpnapi.com - python 3.9+ with flask, requests, google cloud logging To install dependencies, run e.g. From f43416d02faf31fff563a998fecac926a865eeaf Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 11:05:23 +0200 Subject: [PATCH 6/8] fix: response format --- ory-actions/vpncheck-py/ipqs.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ory-actions/vpncheck-py/ipqs.py b/ory-actions/vpncheck-py/ipqs.py index d5663b0..c5ad96b 100644 --- a/ory-actions/vpncheck-py/ipqs.py +++ b/ory-actions/vpncheck-py/ipqs.py @@ -79,7 +79,6 @@ def vpncheck(request): # Return the result as success or error details return jsonify(vpn_result), 200 - def error_response(msg): return jsonify({"messages": [{ "messages": [{ "text": msg }] }]}), 400 From 22e4d557588270dcf04b31c2d0071cf6c2c43931 Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 11:07:44 +0200 Subject: [PATCH 7/8] fix: format --- ory-actions/vpncheck-py/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ory-actions/vpncheck-py/README.md b/ory-actions/vpncheck-py/README.md index d607a68..84f4f49 100644 --- a/ory-actions/vpncheck-py/README.md +++ b/ory-actions/vpncheck-py/README.md @@ -34,7 +34,7 @@ pip3 install google-cloud-logging ```bash export BEARER_TOKEN=SOME_SECRET_API_KEY_FOR_YOUR_WEBHOOK; # Set the API Key for the service you use -export VPNAPIIO_API_KEY=YOUR_VPNAPI_KEY; +export VPNAPIIO_API_KEY=YOUR_VPNAPI_KEY; export FOCSEC_API_KEY=YOUR_FOCSEC_KEY; export IPQS_API_KEY=YOUR_IPQS_KEY; ``` From 1da17bdd346753e72bafbf7c251fc77c8f750324 Mon Sep 17 00:00:00 2001 From: Klaus Herrmann Date: Thu, 10 Aug 2023 11:14:03 +0200 Subject: [PATCH 8/8] fix: readme changes --- ory-actions/vpncheck-py/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ory-actions/vpncheck-py/README.md b/ory-actions/vpncheck-py/README.md index 84f4f49..3ad0172 100644 --- a/ory-actions/vpncheck-py/README.md +++ b/ory-actions/vpncheck-py/README.md @@ -1,11 +1,11 @@ -# Ory Action to check IP addresses against vpnapi.io +# Ory Action to check IP addresses against VPN and Fraud detection services This is an example Action (webhook) to check client IP addresses against security services like focsec.com, vpnapi.com and ipqs.com and block requests - coming from TOR clients - coming from known VPNs -- coming from certain geographies (in this example: RU) +- coming from certain geographies It's intended for use as a post-login Action on Ory Network and returns a message that can be parsed by Ory and displayed to the user.