Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for proxy (box/box-codegen#559) #302

Merged
merged 2 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .codegen.json
Original file line number Diff line number Diff line change
@@ -1 +1 @@
{ "engineHash": "edc49a6", "specHash": "6ca858e", "version": "1.4.1" }
{ "engineHash": "5d1b305", "specHash": "6ca858e", "version": "1.4.1" }
10 changes: 10 additions & 0 deletions box_sdk_gen/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,8 @@

from box_sdk_gen.networking.base_urls import BaseUrls

from box_sdk_gen.networking.proxy_config import ProxyConfig


class BoxClient:
def __init__(self, auth: Authentication, *, network_session: NetworkSession = None):
Expand Down Expand Up @@ -439,3 +441,11 @@ def with_custom_base_urls(self, base_urls: BaseUrls) -> 'BoxClient':
auth=self.auth,
network_session=self.network_session.with_custom_base_urls(base_urls),
)

def with_proxy(self, config: ProxyConfig) -> 'BoxClient':
"""
Create a new client with a custom proxy that will be used for every API call
"""
return BoxClient(
auth=self.auth, network_session=self.network_session.with_proxy(config)
)
210 changes: 105 additions & 105 deletions box_sdk_gen/managers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,20 @@

from typing import Optional

from typing import List

from typing import Dict

from box_sdk_gen.internal.utils import to_string

from box_sdk_gen.serialization.json.serializer import deserialize

from box_sdk_gen.schemas.events import Events
from typing import List

from box_sdk_gen.schemas.client_error import ClientError
from box_sdk_gen.internal.utils import to_string

from box_sdk_gen.schemas.realtime_servers import RealtimeServers

from box_sdk_gen.schemas.client_error import ClientError

from box_sdk_gen.schemas.events import Events

from box_sdk_gen.networking.auth import Authentication

from box_sdk_gen.networking.network import NetworkSession
Expand All @@ -26,10 +26,6 @@

from box_sdk_gen.internal.utils import ByteStream

from box_sdk_gen.internal.utils import DateTime

from box_sdk_gen.serialization.json.json_data import sd_to_json

from box_sdk_gen.networking.fetch import FetchOptions

from box_sdk_gen.networking.fetch import FetchResponse
Expand All @@ -38,6 +34,10 @@

from box_sdk_gen.serialization.json.json_data import SerializedData

from box_sdk_gen.internal.utils import DateTime

from box_sdk_gen.serialization.json.json_data import sd_to_json


class GetEventsStreamType(str, Enum):
ALL = 'all'
Expand Down Expand Up @@ -191,6 +191,101 @@ def __init__(
self.auth = auth
self.network_session = network_session

def get_events_with_long_polling(
self, *, extra_headers: Optional[Dict[str, Optional[str]]] = None
) -> RealtimeServers:
"""
Returns a list of real-time servers that can be used for long-polling updates

to the [event stream](#get-events).


Long polling is the concept where a HTTP request is kept open until the


server sends a response, then repeating the process over and over to receive


updated responses.


Long polling the event stream can only be used for user events, not for


enterprise events.


To use long polling, first use this endpoint to retrieve a list of long poll


URLs. Next, make a long poll request to any of the provided URLs.


When an event occurs in monitored account a response with the value


`new_change` will be sent. The response contains no other details as


it only serves as a prompt to take further action such as sending a


request to the [events endpoint](#get-events) with the last known


`stream_position`.


After the server sends this response it closes the connection. You must now


repeat the long poll process to begin listening for events again.


If no events occur for a while and the connection times out you will


receive a response with the value `reconnect`. When you receive this response


you’ll make another call to this endpoint to restart the process.


If you receive no events in `retry_timeout` seconds then you will need to


make another request to the real-time server (one of the URLs in the response


for this endpoint). This might be necessary due to network errors.


Finally, if you receive a `max_retries` error when making a request to the


real-time server, you should start over by making a call to this endpoint


first.

:param extra_headers: Extra headers that will be included in the HTTP request., defaults to None
:type extra_headers: Optional[Dict[str, Optional[str]]], optional
"""
if extra_headers is None:
extra_headers = {}
headers_map: Dict[str, str] = prepare_params({**extra_headers})
response: FetchResponse = fetch(
FetchOptions(
url=''.join([self.network_session.base_urls.base_url, '/2.0/events']),
method='OPTIONS',
headers=headers_map,
response_format='json',
auth=self.auth,
network_session=self.network_session,
)
)
return deserialize(response.data, RealtimeServers)

def get_events(
self,
*,
Expand Down Expand Up @@ -300,98 +395,3 @@ def get_events(
)
)
return deserialize(response.data, Events)

def get_events_with_long_polling(
self, *, extra_headers: Optional[Dict[str, Optional[str]]] = None
) -> RealtimeServers:
"""
Returns a list of real-time servers that can be used for long-polling updates

to the [event stream](#get-events).


Long polling is the concept where a HTTP request is kept open until the


server sends a response, then repeating the process over and over to receive


updated responses.


Long polling the event stream can only be used for user events, not for


enterprise events.


To use long polling, first use this endpoint to retrieve a list of long poll


URLs. Next, make a long poll request to any of the provided URLs.


When an event occurs in monitored account a response with the value


`new_change` will be sent. The response contains no other details as


it only serves as a prompt to take further action such as sending a


request to the [events endpoint](#get-events) with the last known


`stream_position`.


After the server sends this response it closes the connection. You must now


repeat the long poll process to begin listening for events again.


If no events occur for a while and the connection times out you will


receive a response with the value `reconnect`. When you receive this response


you’ll make another call to this endpoint to restart the process.


If you receive no events in `retry_timeout` seconds then you will need to


make another request to the real-time server (one of the URLs in the response


for this endpoint). This might be necessary due to network errors.


Finally, if you receive a `max_retries` error when making a request to the


real-time server, you should start over by making a call to this endpoint


first.

:param extra_headers: Extra headers that will be included in the HTTP request., defaults to None
:type extra_headers: Optional[Dict[str, Optional[str]]], optional
"""
if extra_headers is None:
extra_headers = {}
headers_map: Dict[str, str] = prepare_params({**extra_headers})
response: FetchResponse = fetch(
FetchOptions(
url=''.join([self.network_session.base_urls.base_url, '/2.0/events']),
method='OPTIONS',
headers=headers_map,
response_format='json',
auth=self.auth,
network_session=self.network_session,
)
)
return deserialize(response.data, RealtimeServers)
Loading
Loading