diff --git a/airbyte-integrations/connectors/source-hubspot/integration_tests/incremental_catalog.json b/airbyte-integrations/connectors/source-hubspot/integration_tests/incremental_catalog.json index ccab47f3a4f0..01ff994a6336 100644 --- a/airbyte-integrations/connectors/source-hubspot/integration_tests/incremental_catalog.json +++ b/airbyte-integrations/connectors/source-hubspot/integration_tests/incremental_catalog.json @@ -1,316 +1,1004 @@ { "streams": [ - { - "stream": { - "name": "campaigns", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["lastUpdatedTime"] - }, - "sync_mode": "incremental", - "cursor_field": ["lastUpdatedTime"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "companies", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "contact_lists", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, { "stream": { "name": "contacts", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "deal_pipelines", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "deal_splits", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "deals", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "deals_archived", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["archivedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["archivedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "email_events", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["created"] - }, - "sync_mode": "incremental", - "cursor_field": ["created"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "engagements", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["lastUpdated"] - }, - "sync_mode": "incremental", - "cursor_field": ["lastUpdated"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "engagements_calls", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "engagements_emails", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "engagements_meetings", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "engagements_notes", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "engagements_tasks", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "forms", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "goals", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "line_items", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "owners", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "products", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "subscription_changes", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["timestamp"] - }, - "sync_mode": "incremental", - "cursor_field": ["timestamp"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "tickets", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "workflows", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["updatedAt"] - }, - "sync_mode": "incremental", - "cursor_field": ["updatedAt"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "contacts_property_history", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["timestamp"] - }, - "sync_mode": "incremental", - "cursor_field": ["timestamp"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "companies_property_history", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], - "source_defined_cursor": true, - "default_cursor_field": ["timestamp"] - }, - "sync_mode": "incremental", - "cursor_field": ["timestamp"], - "destination_sync_mode": "append" - }, - { - "stream": { - "name": "deals_property_history", - "json_schema": {}, - "supported_sync_modes": ["full_refresh", "incremental"], + "json_schema": { + "$schema": "http://json-schema.org/draft-07/schema#", + "properties": { + "archived": { + "description": "Indicates if the contact is archived or not.", + "type": [ + "null", + "boolean" + ] + }, + "companies": { + "description": "List of companies associated with the contact.", + "items": { + "description": "Details of a company associated with the contact.", + "type": [ + "null", + "string" + ] + }, + "type": [ + "null", + "array" + ] + }, + "createdAt": { + "description": "Date and time when the contact was created.", + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "id": { + "description": "Unique identifier for the contact.", + "type": [ + "null", + "string" + ] + }, + "properties": { + "properties": { + "address": { + "type": [ + "null", + "string" + ] + }, + "annualrevenue": { + "type": [ + "null", + "string" + ] + }, + "associatedcompanyid": { + "type": [ + "null", + "number" + ] + }, + "associatedcompanylastupdated": { + "type": [ + "null", + "number" + ] + }, + "city": { + "type": [ + "null", + "string" + ] + }, + "closedate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "company": { + "type": [ + "null", + "string" + ] + }, + "company_size": { + "type": [ + "null", + "string" + ] + }, + "country": { + "type": [ + "null", + "string" + ] + }, + "createdate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "currentlyinworkflow": { + "type": [ + "null", + "string" + ] + }, + "date_of_birth": { + "type": [ + "null", + "string" + ] + }, + "days_to_close": { + "type": [ + "null", + "number" + ] + }, + "degree": { + "type": [ + "null", + "string" + ] + }, + "email": { + "type": [ + "null", + "string" + ] + }, + "engagements_last_meeting_booked": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "engagements_last_meeting_booked_campaign": { + "type": [ + "null", + "string" + ] + }, + "engagements_last_meeting_booked_medium": { + "type": [ + "null", + "string" + ] + }, + "engagements_last_meeting_booked_source": { + "type": [ + "null", + "string" + ] + }, + "fax": { + "type": [ + "null", + "string" + ] + }, + "field_of_study": { + "type": [ + "null", + "string" + ] + }, + "firstname": { + "type": [ + "null", + "string" + ] + }, + "first_conversion_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "first_conversion_event_name": { + "type": [ + "null", + "string" + ] + }, + "first_deal_created_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "gender": { + "type": [ + "null", + "string" + ] + }, + "graduation_date": { + "type": [ + "null", + "string" + ] + }, + "hubspotscore": { + "type": [ + "null", + "number" + ] + }, + "hubspot_owner_assigneddate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "hubspot_owner_id": { + "type": [ + "null", + "string" + ] + }, + "hubspot_team_id": { + "type": [ + "null", + "string" + ] + }, + "industry": { + "type": [ + "null", + "string" + ] + }, + "ip_city": { + "type": [ + "null", + "string" + ] + }, + "ip_country": { + "type": [ + "null", + "string" + ] + }, + "ip_country_code": { + "type": [ + "null", + "string" + ] + }, + "ip_latlon": { + "type": [ + "null", + "string" + ] + }, + "ip_state": { + "type": [ + "null", + "string" + ] + }, + "ip_state_code": { + "type": [ + "null", + "string" + ] + }, + "ip_zipcode": { + "type": [ + "null", + "string" + ] + }, + "jobtitle": { + "type": [ + "null", + "string" + ] + }, + "job_function": { + "type": [ + "null", + "string" + ] + }, + "lastmodifieddate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "lastname": { + "type": [ + "null", + "string" + ] + }, + "lifecyclestage": { + "type": [ + "null", + "string" + ] + }, + "marital_status": { + "type": [ + "null", + "string" + ] + }, + "message": { + "type": [ + "null", + "string" + ] + }, + "military_status": { + "type": [ + "null", + "string" + ] + }, + "mobilephone": { + "type": [ + "null", + "string" + ] + }, + "notes_last_contacted": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "notes_last_updated": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "notes_next_activity_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "numemployees": { + "type": [ + "null", + "string" + ] + }, + "num_associated_deals": { + "type": [ + "null", + "number" + ] + }, + "num_contacted_notes": { + "type": [ + "null", + "number" + ] + }, + "num_conversion_events": { + "type": [ + "null", + "number" + ] + }, + "num_notes": { + "type": [ + "null", + "number" + ] + }, + "num_unique_conversion_events": { + "type": [ + "null", + "number" + ] + }, + "phone": { + "type": [ + "null", + "string" + ] + }, + "recent_conversion_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "recent_conversion_event_name": { + "type": [ + "null", + "string" + ] + }, + "recent_deal_amount": { + "type": [ + "null", + "number" + ] + }, + "recent_deal_close_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "relationship_status": { + "type": [ + "null", + "string" + ] + }, + "salutation": { + "type": [ + "null", + "string" + ] + }, + "school": { + "type": [ + "null", + "string" + ] + }, + "seniority": { + "type": [ + "null", + "string" + ] + }, + "start_date": { + "type": [ + "null", + "string" + ] + }, + "state": { + "type": [ + "null", + "string" + ] + }, + "surveymonkeyeventlastupdated": { + "type": [ + "null", + "number" + ] + }, + "total_revenue": { + "type": [ + "null", + "number" + ] + }, + "twitterhandle": { + "type": [ + "null", + "string" + ] + }, + "webinareventlastupdated": { + "type": [ + "null", + "number" + ] + }, + "website": { + "type": [ + "null", + "string" + ] + }, + "work_email": { + "type": [ + "null", + "string" + ] + }, + "zip": { + "type": [ + "null", + "string" + ] + } + }, + "type": "object" + }, + "properties_address": { + "type": [ + "null", + "string" + ] + }, + "properties_annualrevenue": { + "type": [ + "null", + "string" + ] + }, + "properties_associatedcompanyid": { + "type": [ + "null", + "number" + ] + }, + "properties_associatedcompanylastupdated": { + "type": [ + "null", + "number" + ] + }, + "properties_city": { + "type": [ + "null", + "string" + ] + }, + "properties_closedate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_company": { + "type": [ + "null", + "string" + ] + }, + "properties_company_size": { + "type": [ + "null", + "string" + ] + }, + "properties_country": { + "type": [ + "null", + "string" + ] + }, + "properties_createdate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_currentlyinworkflow": { + "type": [ + "null", + "string" + ] + }, + "properties_date_of_birth": { + "type": [ + "null", + "string" + ] + }, + "properties_days_to_close": { + "type": [ + "null", + "number" + ] + }, + "properties_degree": { + "type": [ + "null", + "string" + ] + }, + "properties_email": { + "type": [ + "null", + "string" + ] + }, + "properties_engagements_last_meeting_booked": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_engagements_last_meeting_booked_campaign": { + "type": [ + "null", + "string" + ] + }, + "properties_engagements_last_meeting_booked_medium": { + "type": [ + "null", + "string" + ] + }, + "properties_engagements_last_meeting_booked_source": { + "type": [ + "null", + "string" + ] + }, + "properties_fax": { + "type": [ + "null", + "string" + ] + }, + "properties_field_of_study": { + "type": [ + "null", + "string" + ] + }, + "properties_firstname": { + "type": [ + "null", + "string" + ] + }, + "properties_first_conversion_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_first_conversion_event_name": { + "type": [ + "null", + "string" + ] + }, + "properties_first_deal_created_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_gender": { + "type": [ + "null", + "string" + ] + }, + "properties_graduation_date": { + "type": [ + "null", + "string" + ] + }, + "properties_hubspotscore": { + "type": [ + "null", + "number" + ] + }, + "properties_hubspot_owner_assigneddate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_hubspot_owner_id": { + "type": [ + "null", + "string" + ] + }, + "properties_hubspot_team_id": { + "type": [ + "null", + "string" + ] + }, + "properties_industry": { + "type": [ + "null", + "string" + ] + }, + "properties_ip_city": { + "type": [ + "null", + "string" + ] + }, + "properties_ip_country": { + "type": [ + "null", + "string" + ] + }, + "properties_ip_country_code": { + "type": [ + "null", + "string" + ] + }, + "properties_ip_latlon": { + "type": [ + "null", + "string" + ] + }, + "properties_ip_state": { + "type": [ + "null", + "string" + ] + }, + "properties_ip_state_code": { + "type": [ + "null", + "string" + ] + }, + "properties_ip_zipcode": { + "type": [ + "null", + "string" + ] + }, + "properties_jobtitle": { + "type": [ + "null", + "string" + ] + }, + "properties_job_function": { + "type": [ + "null", + "string" + ] + }, + "properties_lastmodifieddate": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_lastname": { + "type": [ + "null", + "string" + ] + }, + "properties_lifecyclestage": { + "type": [ + "null", + "string" + ] + }, + "properties_marital_status": { + "type": [ + "null", + "string" + ] + }, + "properties_message": { + "type": [ + "null", + "string" + ] + }, + "properties_military_status": { + "type": [ + "null", + "string" + ] + }, + "properties_mobilephone": { + "type": [ + "null", + "string" + ] + }, + "properties_notes_last_contacted": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_notes_last_updated": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_notes_next_activity_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_numemployees": { + "type": [ + "null", + "string" + ] + }, + "properties_num_associated_deals": { + "type": [ + "null", + "number" + ] + }, + "properties_num_contacted_notes": { + "type": [ + "null", + "number" + ] + }, + "properties_num_conversion_events": { + "type": [ + "null", + "number" + ] + }, + "properties_num_notes": { + "type": [ + "null", + "number" + ] + }, + "properties_num_unique_conversion_events": { + "type": [ + "null", + "number" + ] + }, + "properties_phone": { + "type": [ + "null", + "string" + ] + }, + "properties_recent_conversion_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_recent_conversion_event_name": { + "type": [ + "null", + "string" + ] + }, + "properties_recent_deal_amount": { + "type": [ + "null", + "number" + ] + }, + "properties_recent_deal_close_date": { + "format": "date-time", + "type": [ + "null", + "string" + ] + }, + "properties_relationship_status": { + "type": [ + "null", + "string" + ] + }, + "properties_salutation": { + "type": [ + "null", + "string" + ] + }, + "properties_school": { + "type": [ + "null", + "string" + ] + }, + "properties_seniority": { + "type": [ + "null", + "string" + ] + }, + "properties_start_date": { + "type": [ + "null", + "string" + ] + }, + "properties_state": { + "type": [ + "null", + "string" + ] + }, + "properties_surveymonkeyeventlastupdated": { + "type": [ + "null", + "number" + ] + }, + "properties_total_revenue": { + "type": [ + "null", + "number" + ] + }, + "properties_twitterhandle": { + "type": [ + "null", + "string" + ] + }, + "properties_webinareventlastupdated": { + "type": [ + "null", + "number" + ] + }, + "properties_website": { + "type": [ + "null", + "string" + ] + }, + "properties_work_email": { + "type": [ + "null", + "string" + ] + }, + "properties_zip": { + "type": [ + "null", + "string" + ] + }, + "updatedAt": { + "description": "Date and time when the contact was last updated.", + "format": "date-time", + "type": [ + "null", + "string" + ] + } + }, + "type": [ + "null", + "object" + ] + }, + "supported_sync_modes": [ + "full_refresh", + "incremental" + ], "source_defined_cursor": true, - "default_cursor_field": ["timestamp"] + "default_cursor_field": [ + "updatedAt" + ] }, "sync_mode": "incremental", - "cursor_field": ["timestamp"], + "cursor_field": [ + "updatedAt" + ], "destination_sync_mode": "append" } ] -} +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hubspot/sample_files/sample_config_oauth.json b/airbyte-integrations/connectors/source-hubspot/sample_files/sample_config_oauth.json index f3b7f165c557..cc9512489c80 100644 --- a/airbyte-integrations/connectors/source-hubspot/sample_files/sample_config_oauth.json +++ b/airbyte-integrations/connectors/source-hubspot/sample_files/sample_config_oauth.json @@ -5,5 +5,15 @@ "client_id": "123456789_client_id_hubspot", "client_secret": "123456789_client_secret_hubspot", "refresh_token": "123456789_some_refresh_token" - } -} + }, + "stream_filters": [ + { + "stream_name": "contacts", + "filter_value": { + "propertyName": "city", + "operator": "EQ", + "value": "Madrid" + } + } + ] +} \ No newline at end of file diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/run.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/run.py index 26f4d0abef15..6c4506405c44 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/run.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/run.py @@ -1,14 +1,45 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # - - import sys +import traceback +from datetime import datetime +from typing import List -from airbyte_cdk.entrypoint import launch +from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch +from airbyte_cdk.models import AirbyteErrorTraceMessage, AirbyteMessage, AirbyteTraceMessage, TraceType, Type from source_hubspot import SourceHubspot +def _get_source(args: List[str]): + catalog_path = AirbyteEntrypoint.extract_catalog(args) + config_path = AirbyteEntrypoint.extract_config(args) + state_path = AirbyteEntrypoint.extract_state(args) + try: + return SourceHubspot( + SourceHubspot.read_catalog(catalog_path) if catalog_path else None, + SourceHubspot.read_config(config_path) if config_path else None, + SourceHubspot.read_state(state_path) if state_path else None, + ) + except Exception as error: + print( + AirbyteMessage( + type=Type.TRACE, + trace=AirbyteTraceMessage( + type=TraceType.ERROR, + emitted_at=int(datetime.now().timestamp() * 1000), + error=AirbyteErrorTraceMessage( + message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}", + stack_trace=traceback.format_exc(), + ), + ), + ).json() + ) + return None + def run(): - source = SourceHubspot() - launch(source, sys.argv[1:]) + _args = sys.argv[1:] + source = _get_source(_args) + if source: + launch(source, _args) + diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py index 2de34fd18639..25c4053b1f42 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/source.py @@ -8,8 +8,9 @@ from itertools import chain from typing import Any, Generator, List, Mapping, Optional, Tuple, Union -from airbyte_cdk.models import FailureType +from airbyte_cdk.models import FailureType, ConfiguredAirbyteCatalog from airbyte_cdk.sources import AbstractSource +from airbyte_cdk.sources.source import TState from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.http import HttpClient from airbyte_cdk.sources.streams.http.error_handlers import ErrorResolution, HttpStatusErrorHandler, ResponseAction @@ -78,6 +79,11 @@ class SourceHubspot(AbstractSource): logger = logging.getLogger("airbyte") + def __init__(self, catalog: Optional[ConfiguredAirbyteCatalog], config: Optional[Mapping[str, Any]], state: Optional[TState], **kwargs): + self.catalog = catalog + self.state = state + self.config = config + def check_connection(self, logger: logging.Logger, config: Mapping[str, Any]) -> Tuple[bool, Optional[Any]]: """Check connection""" common_params = self.get_common_params(config=config) @@ -129,11 +135,21 @@ def get_common_params(self, config) -> Mapping[str, Any]: api = self.get_api(config=config) # Additional configuration is necessary for testing certain streams due to their specific restrictions. acceptance_test_config = config.get("acceptance_test_config", {}) - return dict(api=api, start_date=start_date, credentials=credentials, acceptance_test_config=acceptance_test_config) + + common_param = dict(api=api, start_date=start_date, credentials=credentials, acceptance_test_config=acceptance_test_config) + + stream_filters = "stream_filters" in config and config["stream_filters"] + if stream_filters: + common_param.update(stream_filters=stream_filters) + + if self.catalog: + common_param.update(catalog=self.catalog) + + return common_param def streams(self, config: Mapping[str, Any]) -> List[Stream]: - credentials = config.get("credentials", {}) - common_params = self.get_common_params(config=config) + credentials = self.config.get("credentials", {}) + common_params = self.get_common_params(config=self.config) streams = [ Campaigns(**common_params), Companies(**common_params), @@ -171,7 +187,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]: Workflows(**common_params), ] - enable_experimental_streams = "enable_experimental_streams" in config and config["enable_experimental_streams"] + enable_experimental_streams = "enable_experimental_streams" in self.config and self.config["enable_experimental_streams"] if enable_experimental_streams: streams.extend( diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml b/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml index 6f105f4d05a5..dac2d6c31167 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/spec.yaml @@ -91,6 +91,29 @@ connectionSpecification: description: If enabled then experimental streams become available for sync. type: boolean default: false + stream_filters: + title: Filters + description: Filters to apply to the data streams + type: array + items: + type: object + properties: + stream_name: + type: string + title: Stream Name + filter_value: + type: object + title: Filter expression + properties: + propertyName: + type: string + title: Property Name + operator: + type: string + title: Operator + value: + type: string + title: Value advanced_auth: auth_flow_type: oauth2.0 predicate_key: diff --git a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py index 5b7772d88346..64d010f2f794 100644 --- a/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py +++ b/airbyte-integrations/connectors/source-hubspot/source_hubspot/streams.py @@ -386,12 +386,16 @@ def __init__( api: API, start_date: Union[str, pendulum.datetime], credentials: Mapping[str, Any] = None, + stream_filters: Mapping[str, Any] = None, + catalog: Mapping[str, Any] = None, acceptance_test_config: Mapping[str, Any] = None, **kwargs, ): super().__init__(**kwargs) self._api: API = api self._credentials = credentials + self._stream_filter = None + self.catalog = None self._start_date = start_date if isinstance(self._start_date, str): @@ -409,6 +413,14 @@ def __init__( self._is_test = self.name in acceptance_test_config self._acceptance_test_config = acceptance_test_config.get(self.name, {}) + # Filter for records + if stream_filters: + for filter in stream_filters: + if filter["stream_name"] == self.name: + self._stream_filter = filter["filter_value"] + if catalog: + self.catalog = catalog + def should_retry(self, response: requests.Response) -> bool: if response.status_code == HTTPStatus.UNAUTHORIZED: message = response.json().get("message") @@ -818,10 +830,16 @@ def properties(self) -> Mapping[str, Any]: f"to be able to fetch all properties available." ) return props - data, response = self._api.get(f"/properties/v2/{self.entity}/properties") - for row in data: - props[row["name"]] = self._get_field_props(row["type"]) + if self.catalog: + for catalog_stream in self.catalog.streams: + if self.name == catalog_stream.stream.name and catalog_stream.stream.json_schema.get("properties", {}): + #properties are nested field + props=catalog_stream.stream.json_schema.get("properties").get("properties").get('properties') + else: + data, response = self._api.get(f"/properties/v2/{self.entity}/properties") + for row in data: + props[row["name"]] = self._get_field_props(row["type"]) return props def properties_scope_is_granted(self): @@ -1101,7 +1119,7 @@ class CRMSearchStream(IncrementalStream, ABC): @property def url(self): object_type_id = self.fully_qualified_name or self.entity - return f"/crm/v3/objects/{object_type_id}/search" if self.state else f"/crm/v3/objects/{object_type_id}" + return f"/crm/v3/objects/{object_type_id}/search" def __init__( self, @@ -1136,11 +1154,24 @@ def _process_search( "filters": [{"value": int(self._state.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}], "sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}], "properties": properties_list, - "limit": 100, + "limit": 200, } if self.state - else {} + else { + "filters": [{"value": int(self._start_date.timestamp() * 1000), "propertyName": self.last_modified_field, "operator": "GTE"}], + "sorts": [{"propertyName": self.last_modified_field, "direction": "ASCENDING"}], + "properties": properties_list, + "limit": 200, + } ) + if self._stream_filter: + if "propertyName" in self._stream_filter and "operator" in self._stream_filter and "value" in self._stream_filter: + payload["filters"].append({ + "propertyName": self._stream_filter["propertyName"], + "operator": self._stream_filter["operator"], + "value": self._stream_filter["value"], + }) + if next_page_token: payload.update(next_page_token["payload"]) @@ -1181,21 +1212,13 @@ def read_records( latest_cursor = None while not pagination_complete: - if self.state: - records, raw_response = self._process_search( - next_page_token=next_page_token, - stream_state=stream_state, - stream_slice=stream_slice, - ) - if self.associations: - records = self._read_associations(records) - else: - records, raw_response = self._read_stream_records( - stream_slice=stream_slice, - stream_state=stream_state, - next_page_token=next_page_token, + records, raw_response = self._process_search( + next_page_token=next_page_token, + stream_state=stream_state, + stream_slice=stream_slice, ) - records = self._flat_associations(records) + if self.associations: + records = self._read_associations(records) records = self._filter_old_records(records) records = self.record_unnester.unnest(records) diff --git a/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py b/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py index c64d689bde9e..00e184784d63 100644 --- a/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py +++ b/airbyte-integrations/connectors/source-hubspot/unit_tests/conftest.py @@ -64,6 +64,16 @@ def config_eperimantal_fixture(): } +@pytest.fixture(name="config_with_filters") +def config_with_filters_fixture(): + return { + "start_date": "2021-01-10T00:00:00Z", + "credentials": {"credentials_title": "Private App Credentials", "access_token": "test_access_token"}, + "enable_experimental_streams": True, + "stream_filters": [{"stream_name": "contacts", "filter_value": {"propertyName": "city","operator": "EQ","value": "Madrid"}}], + } + + @pytest.fixture(name="config_invalid_date") def config_invalid_date_fixture(): return {