Skip to content

Commit

Permalink
Merge pull request #5 from doronz88/feature/windows
Browse files Browse the repository at this point in the history
add windows support
  • Loading branch information
doronz88 committed Apr 10, 2024
2 parents c1ed5c7 + f61274c commit 16818ed
Show file tree
Hide file tree
Showing 16 changed files with 1,065 additions and 17 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/python-publish-macos-and-linux.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,4 @@ jobs:
CIBW_ARCHS: ${{ matrix.arch }}
run: |
cibuildwheel --output-dir dist
twine upload dist/* --skip-existing
twine upload dist/* --skip-existing
36 changes: 36 additions & 0 deletions .github/workflows/python-publish-windows.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# This workflows will upload a Python Package using Twine when a release is created
# For more information see: https://help.github.com/en/actions/language-and-framework-guides/using-python-with-github-actions#publishing-to-package-registries

name: Upload python package for windows

on:
release:
types: [ created ]

concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
build:
if: '! github.event.pull_request.draft'
runs-on: windows-latest

steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.x'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -U build setuptools wheel twine
- name: Build and publish
env:
TWINE_USERNAME: ${{ secrets.PYPI_USERNAME }}
TWINE_PASSWORD: ${{ secrets.PYPI_PASSWORD }}
CIBW_ARCHS: ${{ matrix.arch }}
run: |
python3 -m build
twine upload dist/* --skip-existing
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
[project]
name = "pytun-pmd3"
version = "1.0.0"
description = "python-pytun fork with Darwin support (IPv6-ONLY)"
description = "python-pytun fork with darwin and windows support (IPv6-ONLY)"
readme = "README.md"
requires-python = ">=3.8"
license = { file = "LICENSE" }
keywords = ["tun", "tuntap", "darwin", "pymobiledevice3", "macos"]
keywords = ["tun", "tuntap", "darwin", "pymobiledevice3", "macos", "windows"]
authors = [
{ name = "doronz88", email = "[email protected]" }
]
Expand All @@ -20,6 +20,7 @@ classifiers = [
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3 :: Only",
]

Expand All @@ -30,6 +31,9 @@ test = ["pytest"]
"Homepage" = "https://github.com/doronz88/pytun-pmd3"
"Bug Reports" = "https://github.com/doronz88/pytun-pmd3/issues"

[tool.setuptools]
package-data = { "pytun_pmd3" = ["wintun/**/*"] }

[tool.setuptools.packages.find]
exclude = ["docs*", "tests*"]

Expand Down
1 change: 1 addition & 0 deletions pytun_pmd3/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from pytun_pmd3.wintun import TunTapDevice
2 changes: 2 additions & 0 deletions pytun_pmd3/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class PyWinTunException(Exception):
pass
303 changes: 303 additions & 0 deletions pytun_pmd3/wintun.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,303 @@
import ctypes
import platform
import subprocess
from ctypes import POINTER, Structure, WinDLL, byref, c_ubyte, c_ulonglong, c_void_p, create_unicode_buffer, \
get_last_error, string_at
from ctypes.wintypes import BOOL, BOOLEAN, BYTE, DWORD, HANDLE, LARGE_INTEGER, LPCWSTR, ULARGE_INTEGER, ULONG, USHORT
from pathlib import Path
from socket import AF_INET6
from uuid import uuid4

from pytun_pmd3.exceptions import PyWinTunException

DEFAULT_ADAPTER_NAME = 'pywintun'
DEFAULT_RING_CAPCITY = 0x400000

# Load the Wintun library
wintun = WinDLL(str(Path(__file__).parent / f'wintun/bin/{platform.uname().machine}/wintun.dll'))
iphlpapi = WinDLL('Iphlpapi.dll')
kernel32 = WinDLL('kernel32.dll')

FORMAT_MESSAGE_FROM_SYSTEM = 0x00001000
FORMAT_MESSAGE_IGNORE_INSERTS = 0x00000200

# Define the return type and argument types of the methods

UCHAR = c_ubyte


class ULARGE_INTEGER(Structure):
_fields_ = [("QuadPart", c_ulonglong)]


class MIB_IPINTERFACE_ROW(Structure):
_fields_ = [
("Family", ULONG),
("InterfaceLuid", ULARGE_INTEGER),
("InterfaceIndex", ULONG),
("MaxReassemblySize", ULONG),
("InterfaceIdentifier", c_ulonglong),
("MinRouterAdvertisementInterval", ULONG),
("MaxRouterAdvertisementInterval", ULONG),
("AdvertisingEnabled", BOOLEAN),
("ForwardingEnabled", BOOLEAN),
("WeakHostSend", BOOLEAN),
("WeakHostReceive", BOOLEAN),
("UseAutomaticMetric", BOOLEAN),
("UseNeighborUnreachabilityDetection", BOOLEAN),
("ManagedAddressConfigurationSupported", BOOLEAN),
("OtherStatefulConfigurationSupported", BOOLEAN),
("AdvertiseDefaultRoute", BOOLEAN),
("RouterDiscoveryBehavior", ULONG),
("DadTransmits", ULONG),
("BaseReachableTime", ULONG),
("RetransmitTime", ULONG),
("PathMtuDiscoveryTimeout", ULONG),
("LinkLocalAddressBehavior", ULONG),
("LinkLocalAddressTimeout", ULONG),
("ZoneIndices", ULONG * 16),
("SitePrefixLength", ULONG),
("Metric", ULONG),
("NlMtu", ULONG),
("Connected", BOOLEAN),
("SupportsWakeUpPatterns", BOOLEAN),
("SupportsNeighborDiscovery", BOOLEAN),
("SupportsRouterDiscovery", BOOLEAN),
("ReachableTime", ULONG),
("TransmitOffload", ULONG),
("ReceiveOffload", ULONG),
("DisableDefaultRoutes", BOOLEAN),
]


class SOCKADDR_INET(Structure):
_fields_ = [
("Ipv4", ULONG), # Placeholder for union
("Ipv6", BYTE * 16),
("si_family", USHORT),
]


class MIB_UNICASTIPADDRESS_ROW(Structure):
_fields_ = [
("Address", SOCKADDR_INET),
("InterfaceLuid", ULARGE_INTEGER),
("InterfaceIndex", DWORD),
("PrefixOrigin", DWORD),
("SuffixOrigin", DWORD),
("ValidLifetime", DWORD),
("PreferredLifetime", DWORD),
("OnLinkPrefixLength", UCHAR),
("SkipAsSource", BOOLEAN),
("DadState", DWORD),
("ScopeId", ULONG),
("CreationTimeStamp", LARGE_INTEGER),
]


# WintunCreateAdapter(const WCHAR *AdapterName, const GUID *TunnelType, const GUID *RequestedGUID, GUID *AllocatedGUID,
# DWORD *LastError)
wintun.WintunCreateAdapter.restype = HANDLE
wintun.WintunCreateAdapter.argtypes = [LPCWSTR, POINTER(c_ubyte * 16),
POINTER(c_ubyte * 16), POINTER(c_ubyte * 16),
POINTER(DWORD)]

wintun.WintunCloseAdapter.restype = BOOL
wintun.WintunCloseAdapter.argtypes = [HANDLE]

wintun.WintunGetAdapterLUID.restype = None
wintun.WintunGetAdapterLUID.argtypes = [HANDLE, POINTER(ULARGE_INTEGER)]

wintun.WintunStartSession.restype = HANDLE
wintun.WintunStartSession.argtypes = [HANDLE, ULONG]

wintun.WintunEndSession.restype = None
wintun.WintunEndSession.argtypes = [HANDLE]

wintun.WintunAllocateSendPacket.restype = POINTER(c_ubyte)
wintun.WintunAllocateSendPacket.argtypes = [HANDLE, DWORD]

wintun.WintunReceivePacket.restype = POINTER(c_ubyte)
wintun.WintunReceivePacket.argtypes = [HANDLE, POINTER(DWORD)]

wintun.WintunReleaseReceivePacket.restype = None
wintun.WintunReleaseReceivePacket.argtypes = [HANDLE, POINTER(c_ubyte)]

wintun.WintunSendPacket.restype = None
wintun.WintunSendPacket.argtypes = [HANDLE, c_void_p]

iphlpapi.InitializeIpInterfaceEntry.restype = None
iphlpapi.InitializeIpInterfaceEntry.argtypes = [c_void_p]

iphlpapi.SetIpInterfaceEntry.restype = DWORD
iphlpapi.SetIpInterfaceEntry.argtypes = [c_void_p]

iphlpapi.GetIpInterfaceEntry.restype = DWORD
iphlpapi.GetIpInterfaceEntry.argtypes = [POINTER(MIB_IPINTERFACE_ROW)]

iphlpapi.CreateUnicastIpAddressEntry.argtypes = [POINTER(MIB_UNICASTIPADDRESS_ROW)]
iphlpapi.CreateUnicastIpAddressEntry.restype = DWORD


def get_error_message(error_code: int) -> str:
"""
Uses FormatMessage to retrieve the system error message for the given error_code.
:param error_code: The error code for which to get the error message.
:return: The formatted error message string.
"""
buffer = create_unicode_buffer(256) # Adjust the size as necessary
kernel32.FormatMessageW(
FORMAT_MESSAGE_FROM_SYSTEM | FORMAT_MESSAGE_IGNORE_INSERTS,
None,
error_code,
0,
buffer,
len(buffer),
None
)
return buffer.value.strip()


def raise_last_error() -> None:
raise Exception(get_error_message(get_last_error()))


def set_adapter_mtu(adapter_handle: HANDLE, mtu: int) -> None:
luid = ULARGE_INTEGER()
wintun.WintunGetAdapterLUID(adapter_handle, byref(luid))

print('luid', luid, hex(adapter_handle))
row = MIB_IPINTERFACE_ROW()
iphlpapi.InitializeIpInterfaceEntry(byref(row))

row.InterfaceLuid.QuadPart = luid.QuadPart # Ensure correct assignment
row.Family = 2 # Assuming IPv4; for IPv6, use AF_INET6 or 23

row.NlMtu = mtu # Set the new MTU value

# Attempt to set the modified interface entry
result = iphlpapi.SetIpInterfaceEntry(byref(row))
if result != 0:
raise PyWinTunException(f"Failed to set adapter MTU, error code: {result} ({get_error_message(result)})")

# Attempt to get the current interface entry to ensure all other fields are correctly populated
result = iphlpapi.GetIpInterfaceEntry(byref(row))
if result != 0:
raise PyWinTunException(f"Failed to get IP interface entry, error code: {result}")


class TunTapDevice:
def __init__(self, name: str = DEFAULT_ADAPTER_NAME) -> None:
tunnel_type_guid = (c_ubyte * 16)(*uuid4().bytes)
requested_guid = (c_ubyte * 16)(*uuid4().bytes)
allocated_guid = (c_ubyte * 16)() # Empty GUID, to be filled by the function
last_error = DWORD()
self._name = name

self.session = None

# Create an adapter
self.handle = wintun.WintunCreateAdapter(name, byref(tunnel_type_guid),
byref(requested_guid),
byref(allocated_guid), byref(last_error))

if not self.handle:
raise PyWinTunException(f"Failed to create adapter. Last error: {last_error.value}")

@property
def luid(self) -> c_ulonglong:
luid = ULARGE_INTEGER()
wintun.WintunGetAdapterLUID(self.handle, byref(luid))
row = MIB_IPINTERFACE_ROW()
iphlpapi.InitializeIpInterfaceEntry(byref(row))
return luid.QuadPart

@property
def ip_interface_entry(self) -> MIB_IPINTERFACE_ROW:
row = MIB_IPINTERFACE_ROW()
iphlpapi.InitializeIpInterfaceEntry(byref(row))

row.InterfaceLuid.QuadPart = self.luid
row.Family = AF_INET6

result = iphlpapi.GetIpInterfaceEntry(byref(row))
if result != 0:
raise PyWinTunException(f"Failed to get IP interface entry, error code: {result}")
return row

@ip_interface_entry.setter
def ip_interface_entry(self, value: MIB_IPINTERFACE_ROW) -> None:
result = iphlpapi.SetIpInterfaceEntry(byref(value))
if result != 0:
raise PyWinTunException(f"Failed to set adapter MTU, error code: {result} ({get_error_message(result)})")

@property
def mtu(self) -> int:
return self.ip_interface_entry.NlMtu

@mtu.setter
def mtu(self, value: int) -> None:
row = self.ip_interface_entry
row.NlMtu = value
self.ip_interface_entry = row

@property
def interface_index(self) -> int:
return self.ip_interface_entry.InterfaceIndex

def close(self) -> None:
wintun.WintunCloseAdapter(self.handle)

@property
def addr(self) -> str:
return ''

@addr.setter
def addr(self, value: str) -> None:
result = subprocess.run(f"netsh interface ipv6 set address interface={self.interface_index} address={value}/64",
shell=True, capture_output=True, text=True)

# Check result
if result.returncode != 0:
raise PyWinTunException(f"Failed to set IPv6 address. Error: {result.stderr}")

@property
def name(self) -> str:
return self._name

def up(self, capacity: int = DEFAULT_RING_CAPCITY) -> None:
self.session = wintun.WintunStartSession(self.handle, capacity)

def down(self) -> None:
wintun.WintunEndSession(self.session)
self.session = None

def read(self) -> bytes:
size = DWORD()
packet_ptr = wintun.WintunReceivePacket(self.session, byref(size))
if not packet_ptr:
# No packet was received
return b''

# Create a bytes object from the packet data
packet_data = string_at(packet_ptr, size.value)

wintun.WintunReleaseReceivePacket(self.session, packet_ptr)

if (packet_data[0] >> 4) != 6:
# make sure to output only IPv6 packets
return b''

return packet_data

def write(self, payload: bytes) -> None:
if payload.startswith(b'\x00\x00\x86\xdd'):
payload = payload[4:]

packet_ptr = wintun.WintunAllocateSendPacket(self.session, len(payload))
if packet_ptr == 0:
raise PyWinTunException('failed to allocate packet')

ctypes.memmove(packet_ptr, payload, len(payload))
wintun.WintunSendPacket(self.session, packet_ptr)
Loading

0 comments on commit 16818ed

Please sign in to comment.