Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix incorrect UDP packet truncation. Add TCP_NODELAY. #8

Merged
merged 3 commits into from Aug 26, 2024
Merged

Fix incorrect UDP packet truncation. Add TCP_NODELAY. #8

merged 3 commits into from Aug 26, 2024

Conversation

ghost
Copy link

@ghost ghost commented Aug 26, 2024

解决 UDP 包被截断的问题,可能解决了 #4 ,没有游戏无法证实,2k大小的UDP包通过ECHO SERVER测试,待确认。

TCPStream 增加 TCP_NODELAY

max_external_packet_size 服务端配置在本PR中被移除

@Itsusinn
Copy link
Owner

不太可能出现大于1.5KB的udp报文的,按理来说程序会将超过MTU的udp包切片发送

@ghost
Copy link
Author

ghost commented Aug 26, 2024

@Itsusinn 是的,但是这个位置是拼接后的

@Itsusinn
Copy link
Owner

Itsusinn commented Aug 26, 2024

啊?我再看看。
还有这是分配了一个60K的缓冲吗?

@ghost
Copy link
Author

ghost commented Aug 26, 2024

啊?我再看看。 还有这是分配了一个60K的缓冲吗?

对的,不预分配可以用peek查询,效率没直接划个大池高

@Itsusinn
Copy link
Owner

Itsusinn commented Aug 26, 2024

虽然这60k里有些稍后又释放了。
但我担心高并发时,这60k还没来得及释放就又需要分配新的60k。这可能会导致内存是这样的布局:
1k数据 + 59k free + 1k数据 + 59k free + 1k数据 + 59k free

@ghost
Copy link
Author

ghost commented Aug 26, 2024

虽然这60k里有些稍后又释放了。 但我担心高并发时,这60k还没来得及释放就又需要分配新的60k。这可能会导致内存是这样的布局: 1k数据 + 59k free + 1k数据 + 59k free + 1k数据 + 59k free

好像Rust的内存管理挺好的,我开了32个线程发UDP大包,内存稳定5MB,CPU也是0.5%,岿然不动,htop第一页都找不到

@Itsusinn
Copy link
Owner

好像Rust的内存管理挺好的,我开了32个线程发UDP大包,内存稳定5MB,CPU也是0.5%,岿然不动,htop第一页都找不到

挺好,可能这个60k整体的存活时间不长。
那我合掉吧

@Itsusinn Itsusinn merged commit e048526 into Itsusinn:dev Aug 26, 2024
17 checks passed
@ghost
Copy link
Author

ghost commented Aug 26, 2024

Benchmark 代码 (问GPT要了一份)

  • Client:
#!/usr/bin/env python3
"""
A multi-threaded client for talking to a UDP echo server, sending packets using a thread pool.
"""
import argparse
import itertools
import logging
import socket
import time
import os
import socks
from concurrent.futures import ThreadPoolExecutor
import threading

logger = logging.getLogger(__name__)

# the buffer for receiving incoming messages
BUFFER_SIZE = 84096
PACKET_SIZE = 1892

def generate_packet(size):
    """Generate a packet of the specified size."""
    return os.urandom(size)

def send_and_receive_one(sock, packet, addr):
    """Sends a single datagram over the socket, waits for the response, and validates it."""
    thread_name = threading.current_thread().name
    output_len = sock.sendto(packet, addr)
    logger.info(f"[{thread_name}] Sent packet to {addr}: {output_len} bytes")
    
    try:
        input_data, addr = sock.recvfrom(BUFFER_SIZE)
        logger.info(f"[{thread_name}] Received packet back from {addr}: {len(input_data)} bytes")
        
        if input_data == packet:
            logger.info(f"[{thread_name}] Received data matches sent data.")
        else:
            logger.warning(f"[{thread_name}] Received data does not match sent data!")
            if len(input_data) != len(packet):
                logger.warning(f"[{thread_name}] Length mismatch: sent {len(packet)} bytes, received {len(input_data)} bytes.")
            else:
                mismatch_count = sum(1 for a, b in zip(input_data, packet) if a != b)
                logger.warning(f"[{thread_name}] Data mismatch: {mismatch_count} bytes differ.")
    except socket.timeout:
        logger.warning(f"[{thread_name}] Packet never received back from {addr}")

def worker(args):
    """Worker function for each thread in the pool."""
    sock = socks.socksocket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.set_proxy(socks.SOCKS5, "127.0.0.1", 2080)
    sock.settimeout(1)
    addr = (args.host, args.port)
    
    try:
        while True:
            packet = generate_packet(PACKET_SIZE)
            send_and_receive_one(sock, packet, addr)
    finally:
        sock.close()

def start(args):
    """Starts the thread pool and assigns tasks to workers."""
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = [executor.submit(worker, args) for _ in range(args.threads)]
        
        try:
            for future in futures:
                future.result()
        except KeyboardInterrupt:
            logger.info("Keyboard interrupt received. Shutting down...")
            for future in futures:
                future.cancel()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', help='The host that the client should connect to.', default="127.0.0.1")
    parser.add_argument('--port', help='The port that the client should connect to.', type=int, default=123)
    parser.add_argument('--threads', help='Number of threads in the pool.', type=int, default=32)
    parser.add_argument('--verbose', '-v', help="Increases the logging verbosity level.", action='count', default=0)
    args = parser.parse_args()

    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    
    start(args)

  • Server
#!/usr/bin/env python3
"""
A simple UDP echo server that can handle binary data.
"""
import argparse
import itertools
import logging
import socket

logger = logging.getLogger(__name__)

# the buffer for receiving incoming messages
BUFFER_SIZE = 54096

def receive_next(sock):
    "Repeatedly tries receiving on the given socket until some data comes in."
    logger.debug("Waiting to receive data...")
    while True:
        try:
            return sock.recvfrom(BUFFER_SIZE)
        except socket.timeout:
            logger.debug("No data received yet: retrying.")
            pass

def receive_and_send_one(sock):
    "Waits for a single datagram over the socket and echoes it back."
    input_data, addr = receive_next(sock)
    logger.info("Received packet from %s: %s bytes", addr, len(input_data))
    output_len = sock.sendto(input_data, addr)
    logger.info("Echoed packet back to %s: %s bytes", addr, output_len)

def start(args):
    "Runs the server."
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(5) # seconds
    sock.bind((args.host, args.port))
    logger.info("Listening on %s:%s.", args.host, args.port)
    try:
        for i in itertools.count(1):
            receive_and_send_one(sock)
    finally:
        logger.info("Shutting down.")
        sock.close()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument('--host', help='The host that the server socket should bind to.', default="0.0.0.0")
    parser.add_argument('--port', help='The port that the server socket should bind to.', type=int, default=123)
    parser.add_argument('--verbose', '-v', help="Increases the logging verbosity level.", action='count')
    args = parser.parse_args()
    logging.basicConfig(level=logging.DEBUG if args.verbose else logging.INFO,
                        format='%(asctime)s %(levelname)s %(message)s')
    start(args)

占用绘制:
1
确定没看错,真是一条直线

Itsusinn added a commit that referenced this pull request Dec 10, 2024
Fix incorrect UDP packet truncation. Add TCP_NODELAY.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant