|
10 | 10 | """
|
11 | 11 | import gzip
|
12 | 12 | import json
|
| 13 | +import base64 |
13 | 14 | import selectors
|
14 | 15 | from typing import Any, cast
|
15 | 16 | from urllib import parse as urlparse
|
|
29 | 30 | from proxy.http.parser import HttpParser, httpParserTypes
|
30 | 31 | from proxy.common.utils import bytes_, build_http_request, build_http_response
|
31 | 32 | from proxy.http.responses import (
|
32 |
| - NOT_FOUND_RESPONSE_PKT, PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT, |
| 33 | + NOT_FOUND_RESPONSE_PKT, PROXY_AUTH_FAILED_RESPONSE_PKT, |
| 34 | + PROXY_TUNNEL_ESTABLISHED_RESPONSE_PKT, |
33 | 35 | )
|
34 | 36 | from proxy.common.constants import DEFAULT_HTTP_PORT, PROXY_AGENT_HEADER_VALUE
|
35 | 37 | from .utils import get_plugin_by_test_name
|
@@ -555,3 +557,73 @@ async def test_shortlink_plugin_external(self) -> None:
|
555 | 557 | ),
|
556 | 558 | )
|
557 | 559 | self.assertFalse(self.protocol_handler.work.has_buffer())
|
| 560 | + |
| 561 | + @pytest.mark.asyncio # type: ignore[misc] |
| 562 | + @pytest.mark.parametrize( |
| 563 | + "_setUp", |
| 564 | + ( |
| 565 | + ('test_auth_plugin'), |
| 566 | + ), |
| 567 | + indirect=True, |
| 568 | + ) # type: ignore[misc] |
| 569 | + async def test_auth_plugin(self) -> None: |
| 570 | + self.flags.auth_code = base64.b64encode(bytes_("admin:123456")) |
| 571 | + |
| 572 | + request = b'\r\n'.join([ |
| 573 | + b'GET http://www.facebook.com/tr/ HTTP/1.1', |
| 574 | + b'Host: www.facebook.com', |
| 575 | + b'User-Agent: proxy.py v2.4.4rc5.dev3+g95b646a.d20230811', |
| 576 | + b'', |
| 577 | + b'', |
| 578 | + ]) |
| 579 | + |
| 580 | + self._conn.recv.return_value = request |
| 581 | + self.mock_selector.return_value.select.side_effect = [ |
| 582 | + [( |
| 583 | + selectors.SelectorKey( |
| 584 | + fileobj=self._conn.fileno(), |
| 585 | + fd=self._conn.fileno(), |
| 586 | + events=selectors.EVENT_READ, |
| 587 | + data=None, |
| 588 | + ), |
| 589 | + selectors.EVENT_READ, |
| 590 | + )], |
| 591 | + ] |
| 592 | + await self.protocol_handler._run_once() |
| 593 | + self.assertEqual( |
| 594 | + self.protocol_handler.work.buffer[0], |
| 595 | + PROXY_AUTH_FAILED_RESPONSE_PKT, |
| 596 | + ) |
| 597 | + |
| 598 | + @pytest.mark.asyncio # type: ignore[misc] |
| 599 | + @pytest.mark.parametrize( |
| 600 | + "_setUp", |
| 601 | + ( |
| 602 | + ('test_auth_plugin'), |
| 603 | + ), |
| 604 | + indirect=True, |
| 605 | + ) # type: ignore[misc] |
| 606 | + async def test_auth_plugin_bypass(self) -> None: |
| 607 | + self.flags.auth_code = base64.b64encode(bytes_("admin:123456")) |
| 608 | + |
| 609 | + # miss requests header when https and HTTP 1.0 |
| 610 | + request = b'CONNECT www.facebook.com:443 HTTP/1.0\r\n\r\n' |
| 611 | + |
| 612 | + self._conn.recv.return_value = request |
| 613 | + self.mock_selector.return_value.select.side_effect = [ |
| 614 | + [( |
| 615 | + selectors.SelectorKey( |
| 616 | + fileobj=self._conn.fileno(), |
| 617 | + fd=self._conn.fileno(), |
| 618 | + events=selectors.EVENT_READ, |
| 619 | + data=None, |
| 620 | + ), |
| 621 | + selectors.EVENT_READ, |
| 622 | + )], |
| 623 | + ] |
| 624 | + await self.protocol_handler._run_once() |
| 625 | + |
| 626 | + self.assertEqual( |
| 627 | + self.protocol_handler.work.buffer[0], |
| 628 | + PROXY_AUTH_FAILED_RESPONSE_PKT, |
| 629 | + ) |
0 commit comments