Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add windows support #5

Merged
merged 1 commit into from
Apr 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/python-publish-macos-and-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ jobs:
CIBW_ARCHS: ${{ matrix.arch }}
run: |
cibuildwheel --output-dir dist
twine upload dist/* --skip-existing
twine upload dist/* --skip-existing
36 changes: 36 additions & 0 deletions .github/workflows/python-publish-windows.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# This workflows will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries

name: Upload python package for windows

on:
release:
types: [ created ]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
if: '! github.event.pull_request.draft'
runs-on: windows-latest

steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -U build setuptools wheel twine
- name: Build and publish
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
CIBW_ARCHS: ${{ matrix.arch }}
run: |
python3 -m build
twine upload dist/* --skip-existing
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[project]
name = "pytun-pmd3"
version = "1.0.0"
description = "python-pytun fork with Darwin support (IPv6-ONLY)"
description = "python-pytun fork with darwin and windows support (IPv6-ONLY)"
readme = "README.md"
requires-python = ">=3.8"
license = { file = "LICENSE" }
keywords = ["tun", "tuntap", "darwin", "pymobiledevice3", "macos"]
keywords = ["tun", "tuntap", "darwin", "pymobiledevice3", "macos", "windows"]
authors = [
{ name = "doronz88", email = "[email protected]" }
]
Expand All @@ -20,6 +20,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3 :: Only",
]

Expand All @@ -30,6 +31,9 @@ test = ["pytest"]
"Homepage" = "https://github.com/doronz88/pytun-pmd3"
"Bug Reports" = "https://github.com/doronz88/pytun-pmd3/issues"

[tool.setuptools]
package-data = { "pytun_pmd3" = ["wintun/**/*"] }

[tool.setuptools.packages.find]
exclude = ["docs*", "tests*"]

Expand Down
1 change: 1 addition & 0 deletions pytun_pmd3/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from pytun_pmd3.wintun import TunTapDevice
2 changes: 2 additions & 0 deletions pytun_pmd3/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class PyWinTunException(Exception):
pass
303 changes: 303 additions & 0 deletions pytun_pmd3/wintun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
import ctypes
import platform
import subprocess
from ctypes import POINTER, Structure, WinDLL, byref, c_ubyte, c_ulonglong, c_void_p, create_unicode_buffer, \
get_last_error, string_at
from ctypes.wintypes import BOOL, BOOLEAN, BYTE, DWORD, HANDLE, LARGE_INTEGER, LPCWSTR, ULARGE_INTEGER, ULONG, USHORT
from pathlib import Path
from socket import AF_INET6
from uuid import uuid4

from pytun_pmd3.exceptions import PyWinTunException

DEFAULT_ADAPTER_NAME = 'pywintun'
DEFAULT_RING_CAPCITY = 0x400000

# Load the Wintun library
wintun = WinDLL(str(Path(__file__).parent / f'wintun/bin/{platform.uname().machine}/wintun.dll'))
iphlpapi = WinDLL('Iphlpapi.dll')
kernel32 = WinDLL('kernel32.dll')

FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200

# Define the return type and argument types of the methods

UCHAR = c_ubyte


class ULARGE_INTEGER(Structure):
_fields_ = [("QuadPart", c_ulonglong)]


class MIB_IPINTERFACE_ROW(Structure):
_fields_ = [
("Family", ULONG),
("InterfaceLuid", ULARGE_INTEGER),
("InterfaceIndex", ULONG),
("MaxReassemblySize", ULONG),
("InterfaceIdentifier", c_ulonglong),
("MinRouterAdvertisementInterval", ULONG),
("MaxRouterAdvertisementInterval", ULONG),
("AdvertisingEnabled", BOOLEAN),
("ForwardingEnabled", BOOLEAN),
("WeakHostSend", BOOLEAN),
("WeakHostReceive", BOOLEAN),
("UseAutomaticMetric", BOOLEAN),
("UseNeighborUnreachabilityDetection", BOOLEAN),
("ManagedAddressConfigurationSupported", BOOLEAN),
("OtherStatefulConfigurationSupported", BOOLEAN),
("AdvertiseDefaultRoute", BOOLEAN),
("RouterDiscoveryBehavior", ULONG),
("DadTransmits", ULONG),
("BaseReachableTime", ULONG),
("RetransmitTime", ULONG),
("PathMtuDiscoveryTimeout", ULONG),
("LinkLocalAddressBehavior", ULONG),
("LinkLocalAddressTimeout", ULONG),
("ZoneIndices", ULONG * 16),
("SitePrefixLength", ULONG),
("Metric", ULONG),
("NlMtu", ULONG),
("Connected", BOOLEAN),
("SupportsWakeUpPatterns", BOOLEAN),
("SupportsNeighborDiscovery", BOOLEAN),
("SupportsRouterDiscovery", BOOLEAN),
("ReachableTime", ULONG),
("TransmitOffload", ULONG),
("ReceiveOffload", ULONG),
("DisableDefaultRoutes", BOOLEAN),
]


class SOCKADDR_INET(Structure):
_fields_ = [
("Ipv4", ULONG), # Placeholder for union
("Ipv6", BYTE * 16),
("si_family", USHORT),
]


class MIB_UNICASTIPADDRESS_ROW(Structure):
_fields_ = [
("Address", SOCKADDR_INET),
("InterfaceLuid", ULARGE_INTEGER),
("InterfaceIndex", DWORD),
("PrefixOrigin", DWORD),
("SuffixOrigin", DWORD),
("ValidLifetime", DWORD),
("PreferredLifetime", DWORD),
("OnLinkPrefixLength", UCHAR),
("SkipAsSource", BOOLEAN),
("DadState", DWORD),
("ScopeId", ULONG),
("CreationTimeStamp", LARGE_INTEGER),
]


# WintunCreateAdapter(const WCHAR *AdapterName, const GUID *TunnelType, const GUID *RequestedGUID, GUID *AllocatedGUID,
# DWORD *LastError)
wintun.WintunCreateAdapter.restype = HANDLE
wintun.WintunCreateAdapter.argtypes = [LPCWSTR, POINTER(c_ubyte * 16),
POINTER(c_ubyte * 16), POINTER(c_ubyte * 16),
POINTER(DWORD)]

wintun.WintunCloseAdapter.restype = BOOL
wintun.WintunCloseAdapter.argtypes = [HANDLE]

wintun.WintunGetAdapterLUID.restype = None
wintun.WintunGetAdapterLUID.argtypes = [HANDLE, POINTER(ULARGE_INTEGER)]

wintun.WintunStartSession.restype = HANDLE
wintun.WintunStartSession.argtypes = [HANDLE, ULONG]

wintun.WintunEndSession.restype = None
wintun.WintunEndSession.argtypes = [HANDLE]

wintun.WintunAllocateSendPacket.restype = POINTER(c_ubyte)
wintun.WintunAllocateSendPacket.argtypes = [HANDLE, DWORD]

wintun.WintunReceivePacket.restype = POINTER(c_ubyte)
wintun.WintunReceivePacket.argtypes = [HANDLE, POINTER(DWORD)]

wintun.WintunReleaseReceivePacket.restype = None
wintun.WintunReleaseReceivePacket.argtypes = [HANDLE, POINTER(c_ubyte)]

wintun.WintunSendPacket.restype = None
wintun.WintunSendPacket.argtypes = [HANDLE, c_void_p]

iphlpapi.InitializeIpInterfaceEntry.restype = None
iphlpapi.InitializeIpInterfaceEntry.argtypes = [c_void_p]

iphlpapi.SetIpInterfaceEntry.restype = DWORD
iphlpapi.SetIpInterfaceEntry.argtypes = [c_void_p]

iphlpapi.GetIpInterfaceEntry.restype = DWORD
iphlpapi.GetIpInterfaceEntry.argtypes = [POINTER(MIB_IPINTERFACE_ROW)]

iphlpapi.CreateUnicastIpAddressEntry.argtypes = [POINTER(MIB_UNICASTIPADDRESS_ROW)]
iphlpapi.CreateUnicastIpAddressEntry.restype = DWORD


def get_error_message(error_code: int) -> str:
"""
Uses FormatMessage to retrieve the system error message for the given error_code.

:param error_code: The error code for which to get the error message.
:return: The formatted error message string.
"""
buffer = create_unicode_buffer(256) # Adjust the size as necessary
kernel32.FormatMessageW(
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
None,
error_code,
0,
buffer,
len(buffer),
None
)
return buffer.value.strip()


def raise_last_error() -> None:
raise Exception(get_error_message(get_last_error()))


def set_adapter_mtu(adapter_handle: HANDLE, mtu: int) -> None:
luid = ULARGE_INTEGER()
wintun.WintunGetAdapterLUID(adapter_handle, byref(luid))

print('luid', luid, hex(adapter_handle))
row = MIB_IPINTERFACE_ROW()
iphlpapi.InitializeIpInterfaceEntry(byref(row))

row.InterfaceLuid.QuadPart = luid.QuadPart # Ensure correct assignment
row.Family = 2 # Assuming IPv4; for IPv6, use AF_INET6 or 23

row.NlMtu = mtu # Set the new MTU value

# Attempt to set the modified interface entry
result = iphlpapi.SetIpInterfaceEntry(byref(row))
if result != 0:
raise PyWinTunException(f"Failed to set adapter MTU, error code: {result} ({get_error_message(result)})")

# Attempt to get the current interface entry to ensure all other fields are correctly populated
result = iphlpapi.GetIpInterfaceEntry(byref(row))
if result != 0:
raise PyWinTunException(f"Failed to get IP interface entry, error code: {result}")


class TunTapDevice:
def __init__(self, name: str = DEFAULT_ADAPTER_NAME) -> None:
tunnel_type_guid = (c_ubyte * 16)(*uuid4().bytes)
requested_guid = (c_ubyte * 16)(*uuid4().bytes)
allocated_guid = (c_ubyte * 16)() # Empty GUID, to be filled by the function
last_error = DWORD()
self._name = name

self.session = None

# Create an adapter
self.handle = wintun.WintunCreateAdapter(name, byref(tunnel_type_guid),
byref(requested_guid),
byref(allocated_guid), byref(last_error))

if not self.handle:
raise PyWinTunException(f"Failed to create adapter. Last error: {last_error.value}")

@property
def luid(self) -> c_ulonglong:
luid = ULARGE_INTEGER()
wintun.WintunGetAdapterLUID(self.handle, byref(luid))
row = MIB_IPINTERFACE_ROW()
iphlpapi.InitializeIpInterfaceEntry(byref(row))
return luid.QuadPart

@property
def ip_interface_entry(self) -> MIB_IPINTERFACE_ROW:
row = MIB_IPINTERFACE_ROW()
iphlpapi.InitializeIpInterfaceEntry(byref(row))

row.InterfaceLuid.QuadPart = self.luid
row.Family = AF_INET6

result = iphlpapi.GetIpInterfaceEntry(byref(row))
if result != 0:
raise PyWinTunException(f"Failed to get IP interface entry, error code: {result}")
return row

@ip_interface_entry.setter
def ip_interface_entry(self, value: MIB_IPINTERFACE_ROW) -> None:
result = iphlpapi.SetIpInterfaceEntry(byref(value))
if result != 0:
raise PyWinTunException(f"Failed to set adapter MTU, error code: {result} ({get_error_message(result)})")

@property
def mtu(self) -> int:
return self.ip_interface_entry.NlMtu

@mtu.setter
def mtu(self, value: int) -> None:
row = self.ip_interface_entry
row.NlMtu = value
self.ip_interface_entry = row

@property
def interface_index(self) -> int:
return self.ip_interface_entry.InterfaceIndex

def close(self) -> None:
wintun.WintunCloseAdapter(self.handle)

@property
def addr(self) -> str:
return ''

@addr.setter
def addr(self, value: str) -> None:
result = subprocess.run(f"netsh interface ipv6 set address interface={self.interface_index} address={value}/64",
shell=True, capture_output=True, text=True)

# Check result
if result.returncode != 0:
raise PyWinTunException(f"Failed to set IPv6 address. Error: {result.stderr}")

@property
def name(self) -> str:
return self._name

def up(self, capacity: int = DEFAULT_RING_CAPCITY) -> None:
self.session = wintun.WintunStartSession(self.handle, capacity)

def down(self) -> None:
wintun.WintunEndSession(self.session)
self.session = None

def read(self) -> bytes:
size = DWORD()
packet_ptr = wintun.WintunReceivePacket(self.session, byref(size))
if not packet_ptr:
# No packet was received
return b''

# Create a bytes object from the packet data
packet_data = string_at(packet_ptr, size.value)

wintun.WintunReleaseReceivePacket(self.session, packet_ptr)

if (packet_data[0] >> 4) != 6:
# make sure to output only IPv6 packets
return b''

return packet_data

def write(self, payload: bytes) -> None:
if payload.startswith(b'\x00\x00\x86\xdd'):
payload = payload[4:]

packet_ptr = wintun.WintunAllocateSendPacket(self.session, len(payload))
if packet_ptr == 0:
raise PyWinTunException('failed to allocate packet')

ctypes.memmove(packet_ptr, payload, len(payload))
wintun.WintunSendPacket(self.session, packet_ptr)
Loading