From 15a93d9ffcc8885af2bda5788933751d0e36648e Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Mon, 17 Oct 2022 22:02:38 -0300 Subject: [PATCH 01/11] Add logging wrapper (from config import logging) + remove logging handler --- megalista_dataflow/config/logging.py | 136 ++++++++++++++++-- .../data_sources/base_data_source.py | 9 +- .../big_query/big_query_data_source.py | 2 +- .../data_sources/data_schemas.py | 1 - .../data_sources/file/file_data_source.py | 2 +- .../data_sources/file/file_provider.py | 2 +- megalista_dataflow/error/error_handling.py | 2 +- megalista_dataflow/main.py | 2 +- .../abstract_list_pii_hashing_mapper.py | 2 +- .../ads_user_list_pii_hashing_mapper.py | 2 +- .../batches_grouped_by_source_mapper.py | 2 +- .../dv_user_list_pii_hashing_mapper.py | 2 +- .../executions_grouped_by_source_mapper.py | 2 +- megalista_dataflow/models/execution.py | 1 - .../sources/batches_from_executions.py | 1 - .../sources/firestore_execution_source.py | 2 +- .../sources/json_execution_source.py | 2 +- .../sources/primary_execution_source.py | 2 +- .../sources/spreadsheet_execution_source.py | 2 +- megalista_dataflow/steps/last_step.py | 4 +- .../appsflyer/appsflyer_s2s_uploader_async.py | 2 +- .../campaign_manager_conversion_uploader.py | 2 +- .../customer_match/abstract_uploader.py | 2 +- .../customer_match/contact_info_uploader.py | 2 +- .../customer_match/mobile_uploader.py | 1 - ...ads_enhanced_conversions_leads_uploader.py | 2 +- ..._ads_offline_conversions_calls_uploader.py | 2 +- ...google_ads_offline_conversions_uploader.py | 2 +- .../conversions/google_ads_ssd_uploader.py | 2 +- .../customer_match/abstract_uploader.py | 2 +- .../customer_match/contact_info_uploader.py | 1 - .../customer_match/mobile_uploader.py | 1 - ...google_analytics_4_measurement_protocol.py | 2 +- .../google_analytics_data_import_eraser.py | 2 +- .../google_analytics_data_import_uploader.py | 2 +- .../google_analytics_measurement_protocol.py | 2 +- 
.../google_analytics_user_list_uploader.py | 2 +- .../transactional_events_results_writer.py | 2 +- megalista_dataflow/uploaders/utils.py | 2 +- megalista_dataflow/uploaders/utils_test.py | 2 +- 40 files changed, 167 insertions(+), 50 deletions(-) diff --git a/megalista_dataflow/config/logging.py b/megalista_dataflow/config/logging.py index ac2bda2c..202082d7 100644 --- a/megalista_dataflow/config/logging.py +++ b/megalista_dataflow/config/logging.py @@ -13,16 +13,18 @@ # limitations under the License. import logging -import sys -from error.logging_handler import LoggingHandler +from optparse import Option +import sys, io, os, traceback +from types import FrameType +from typing import Optional, Tuple, List, Any + +from models.execution import Execution class LoggingConfig: @staticmethod def config_logging(show_lines: bool = False): # If there is a FileHandler, the execution is running on Dataflow # In this scenario, we shouldn't change the formatter - logging_handler = LoggingHandler() - logging.getLogger().addHandler(logging_handler) file_handler = LoggingConfig.get_file_handler() if file_handler is None: log_format = "[%(levelname)s] %(name)s: %(message)s" @@ -36,9 +38,7 @@ def config_logging(show_lines: bool = False): stream_handler = logging.StreamHandler(stream=sys.stderr) logging.getLogger().addHandler(stream_handler) stream_handler.setFormatter(formatter) - - logging_handler.setFormatter(formatter) - + logging.getLogger().setLevel(logging.ERROR) logging.getLogger("megalista").setLevel(logging.INFO) @@ -50,10 +50,9 @@ def get_stream_handler(): def get_file_handler(): return LoggingConfig.get_handler(logging.FileHandler) - @staticmethod def get_logging_handler(): - return LoggingConfig.get_handler(LoggingHandler) + return None @staticmethod def get_handler(type: type): @@ -63,4 +62,121 @@ def get_handler(type: type): result_handler = handler break - return result_handler \ No newline at end of file + return result_handler + +class _LogWrapper: + def 
__init__(self, name: Optional[str]): + self._name = str(name) + self._logger = logging.getLogger(name) + + def debug(self, msg: str, *args, **kwargs): + self.log(msg, logging.DEBUG, *args, **kwargs) + + def info(self, msg: str, *args, **kwargs): + self.log(msg, logging.INFO, *args, **kwargs) + + def warning(self, msg: str, *args, **kwargs): + self.log(msg, logging.WARNING, *args, **kwargs) + + def error(self, msg: str, *args, **kwargs): + self.log(msg, logging.ERROR, *args, **kwargs) + + def critical(self, msg: str, *args, **kwargs): + self.log(msg, logging.CRITICAL, *args, **kwargs) + + def exception(self, msg: str, *args, **kwargs): + self.log(msg, logging.CRITICAL, *args, **kwargs) + + def log(self, msg: str, level: int, *args, **kwargs): + stacklevel = self._get_stacklevel(**kwargs) + msg = self._get_msg_execution(msg, **kwargs) + msg = self._get_msg_context(msg, **kwargs) + if level >= logging.ERROR: + _add_error(self._name, msg, stacklevel, level, args) + keys_to_remove = ['execution', 'context'] + for key in keys_to_remove: + if key in kwargs: + del kwargs[key] + self._logger.log(level, msg, *args, **self._change_stacklevel(**kwargs)) + + def _change_stacklevel(self, **kwargs): + stacklevel = self._get_stacklevel(**kwargs) + return dict(kwargs, stacklevel = stacklevel) + + def _get_stacklevel(self, **kwargs): + dict_kwargs = dict(kwargs) + stacklevel = 3 + if 'stacklevel' in dict_kwargs: + stacklevel = 2 + dict_kwargs['stacklevel'] + return stacklevel + + def _get_msg_context(self, msg: str, **kwargs): + if 'context' in kwargs: + context = kwargs['context'] + msg = f'[Context: {context}] {msg}' + return msg + + def _get_msg_execution(self, msg: str, **kwargs): + if 'execution' in kwargs: + execution: Execution = kwargs['execution'] + msg = f'[Execution: {execution.source.source_name} -> {execution.destination.destination_name}] {msg}' + return msg + +def getLogger(name: Optional[str] = None): + return get_logger(name) + +def get_logger(name: Optional[str] = 
None): + return _LogWrapper(name) + +_error_list: List[logging.LogRecord] = [] + +def _add_error(name: str, msg: str, stacklevel: int, level: int, args): + fn, lno, func, sinfo = _get_stack_trace(stacklevel) + _error_list.append(logging.LogRecord(name, level, fn, lno, msg, args, None, func, sinfo)) + +def _get_stack_trace(stacklevel: int, stack_info: bool = True): + # from python logging module + f: Optional[FrameType] = sys._getframe(3) + if f is not None: + f = f.f_back + orig_f = f + while f and stacklevel > 1: + f = f.f_back + stacklevel -= 1 + if not f: + f = orig_f + rv: Tuple[str, int, str, Optional[str]]= ("(unknown file)", 0, "(unknown function)", None) + if f is not None and hasattr(f, "f_code"): + co = f.f_code + sinfo = None + if stack_info: + sio = io.StringIO() + sio.write('Stack (most recent call last):\n') + traceback.print_stack(f, file=sio) + sinfo = sio.getvalue() + if sinfo[-1] == '\n': + sinfo = sinfo[:-1] + sio.close() + rv = (co.co_filename, f.f_lineno, co.co_name, sinfo) + return rv + +def has_errors() -> bool: + return len(_error_list) > 0 + +def error_list() -> List[logging.LogRecord]: + return _error_list + +def get_formatted_error_list() -> Optional[str]: + records = _error_list + if records is not None and len(records) > 0: + message = '' + for i in range(len(records)): + rec = records[i] + message += f'{i+1}. {rec.msg}\n... 
in {rec.pathname}:{rec.lineno}\n' + return message + else: + return None + +def null_filter(el: Any) -> Any: + get_logger('megalista.LOG').info(f'Logging: {el}') + return el \ No newline at end of file diff --git a/megalista_dataflow/data_sources/base_data_source.py b/megalista_dataflow/data_sources/base_data_source.py index 382fa24e..dbe05397 100644 --- a/megalista_dataflow/data_sources/base_data_source.py +++ b/megalista_dataflow/data_sources/base_data_source.py @@ -30,4 +30,11 @@ def retrieve_data(self, executions: ExecutionsGroupedBySource) -> List[DataRowsG raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).") def write_transactional_info(self, rows, execution): - raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).") \ No newline at end of file + raise NotImplementedError("Source Type not implemented. Please check your configuration (sheet / json / firestore).") + + @staticmethod + def _convert_row_to_dict(row): + dict = {} + for key, value in row.items(): + dict[key] = value + return dict \ No newline at end of file diff --git a/megalista_dataflow/data_sources/big_query/big_query_data_source.py b/megalista_dataflow/data_sources/big_query/big_query_data_source.py index 8ba20cd7..aecac254 100644 --- a/megalista_dataflow/data_sources/big_query/big_query_data_source.py +++ b/megalista_dataflow/data_sources/big_query/big_query_data_source.py @@ -16,7 +16,7 @@ from string import Template import apache_beam as beam -import logging +from config import logging from google.cloud import bigquery from google.cloud.bigquery import SchemaField, Client from apache_beam.io.gcp.bigquery import ReadFromBigQueryRequest diff --git a/megalista_dataflow/data_sources/data_schemas.py b/megalista_dataflow/data_sources/data_schemas.py index a96a21a5..6f7bf04e 100644 --- a/megalista_dataflow/data_sources/data_schemas.py +++ 
b/megalista_dataflow/data_sources/data_schemas.py @@ -16,7 +16,6 @@ from configparser import MissingSectionHeaderError from typing import List, Dict, Any from models.execution import Destination, DestinationType, Execution, Batch -import logging import functools import pandas as pd import ast diff --git a/megalista_dataflow/data_sources/file/file_data_source.py b/megalista_dataflow/data_sources/file/file_data_source.py index cdc465d7..931a4224 100644 --- a/megalista_dataflow/data_sources/file/file_data_source.py +++ b/megalista_dataflow/data_sources/file/file_data_source.py @@ -25,7 +25,7 @@ from apache_beam.typehints.decorators import with_output_types import numpy as np -import logging +from config import logging from models.execution import SourceType, DestinationType, Execution, Batch, TransactionalType, ExecutionsGroupedBySource, DataRowsGroupedBySource from models.options import DataflowOptions diff --git a/megalista_dataflow/data_sources/file/file_provider.py b/megalista_dataflow/data_sources/file/file_provider.py index 2934bb35..f903ac53 100644 --- a/megalista_dataflow/data_sources/file/file_provider.py +++ b/megalista_dataflow/data_sources/file/file_provider.py @@ -20,7 +20,7 @@ """ import io -import logging +from config import logging from os.path import exists from urllib.parse import ParseResultBytes diff --git a/megalista_dataflow/error/error_handling.py b/megalista_dataflow/error/error_handling.py index 8e962c19..a26bbb93 100644 --- a/megalista_dataflow/error/error_handling.py +++ b/megalista_dataflow/error/error_handling.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import base64 -import logging +from config import logging from email.mime.text import MIMEText from typing import Iterable, Optional, Dict diff --git a/megalista_dataflow/main.py b/megalista_dataflow/main.py index ceaf4e7c..493a9ef4 100644 --- a/megalista_dataflow/main.py +++ b/megalista_dataflow/main.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import warnings import apache_beam as beam @@ -23,6 +22,7 @@ from error.error_handling import GmailNotifier from config.logging import LoggingConfig +from config import logging from models.execution import DataRowsGroupedBySource, Execution, ExecutionsGroupedBySource from sources.batches_from_executions import ExecutionsGroupedBySourceCoder, DataRowsGroupedBySourceCoder, ExecutionCoder diff --git a/megalista_dataflow/mappers/abstract_list_pii_hashing_mapper.py b/megalista_dataflow/mappers/abstract_list_pii_hashing_mapper.py index 9701ae61..50810e21 100644 --- a/megalista_dataflow/mappers/abstract_list_pii_hashing_mapper.py +++ b/megalista_dataflow/mappers/abstract_list_pii_hashing_mapper.py @@ -13,7 +13,7 @@ # limitations under the License. import hashlib -import logging +from config import logging import re from models.execution import Batch diff --git a/megalista_dataflow/mappers/ads_user_list_pii_hashing_mapper.py b/megalista_dataflow/mappers/ads_user_list_pii_hashing_mapper.py index 0e7375ac..50100d7a 100644 --- a/megalista_dataflow/mappers/ads_user_list_pii_hashing_mapper.py +++ b/megalista_dataflow/mappers/ads_user_list_pii_hashing_mapper.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging +from config import logging from models.execution import Batch from mappers.abstract_list_pii_hashing_mapper import ListPIIHashingMapper diff --git a/megalista_dataflow/mappers/batches_grouped_by_source_mapper.py b/megalista_dataflow/mappers/batches_grouped_by_source_mapper.py index 05551df1..95f3625e 100644 --- a/megalista_dataflow/mappers/batches_grouped_by_source_mapper.py +++ b/megalista_dataflow/mappers/batches_grouped_by_source_mapper.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from models.execution import Batch from mappers.abstract_list_pii_hashing_mapper import ListPIIHashingMapper diff --git a/megalista_dataflow/mappers/dv_user_list_pii_hashing_mapper.py b/megalista_dataflow/mappers/dv_user_list_pii_hashing_mapper.py index 0de6266e..0aa8ee95 100644 --- a/megalista_dataflow/mappers/dv_user_list_pii_hashing_mapper.py +++ b/megalista_dataflow/mappers/dv_user_list_pii_hashing_mapper.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from models.execution import Batch from mappers.abstract_list_pii_hashing_mapper import ListPIIHashingMapper diff --git a/megalista_dataflow/mappers/executions_grouped_by_source_mapper.py b/megalista_dataflow/mappers/executions_grouped_by_source_mapper.py index c86cb994..2ea2a79f 100644 --- a/megalista_dataflow/mappers/executions_grouped_by_source_mapper.py +++ b/megalista_dataflow/mappers/executions_grouped_by_source_mapper.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging +from config import logging from models.execution import Batch from mappers.abstract_list_pii_hashing_mapper import ListPIIHashingMapper diff --git a/megalista_dataflow/models/execution.py b/megalista_dataflow/models/execution.py index acdc986e..9672df31 100644 --- a/megalista_dataflow/models/execution.py +++ b/megalista_dataflow/models/execution.py @@ -14,7 +14,6 @@ from enum import Enum from typing import Dict, List, Union, Any -import logging from apache_beam.typehints.decorators import with_output_types diff --git a/megalista_dataflow/sources/batches_from_executions.py b/megalista_dataflow/sources/batches_from_executions.py index 983777f5..b9f4c889 100644 --- a/megalista_dataflow/sources/batches_from_executions.py +++ b/megalista_dataflow/sources/batches_from_executions.py @@ -14,7 +14,6 @@ from enum import Enum import apache_beam as beam -import logging import json import functools diff --git a/megalista_dataflow/sources/firestore_execution_source.py b/megalista_dataflow/sources/firestore_execution_source.py index 068e3949..4ddb6ef4 100644 --- a/megalista_dataflow/sources/firestore_execution_source.py +++ b/megalista_dataflow/sources/firestore_execution_source.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import distutils.util -import logging +from config import logging from apache_beam.options.value_provider import ValueProvider diff --git a/megalista_dataflow/sources/json_execution_source.py b/megalista_dataflow/sources/json_execution_source.py index e322c97b..13aab5f9 100644 --- a/megalista_dataflow/sources/json_execution_source.py +++ b/megalista_dataflow/sources/json_execution_source.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging +from config import logging from apache_beam.options.value_provider import ValueProvider diff --git a/megalista_dataflow/sources/primary_execution_source.py b/megalista_dataflow/sources/primary_execution_source.py index ef140324..6443193b 100644 --- a/megalista_dataflow/sources/primary_execution_source.py +++ b/megalista_dataflow/sources/primary_execution_source.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from apache_beam.options.value_provider import ValueProvider diff --git a/megalista_dataflow/sources/spreadsheet_execution_source.py b/megalista_dataflow/sources/spreadsheet_execution_source.py index 5b485955..2ee7fb6c 100644 --- a/megalista_dataflow/sources/spreadsheet_execution_source.py +++ b/megalista_dataflow/sources/spreadsheet_execution_source.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import distutils.util -import logging +from config import logging from apache_beam.options.value_provider import ValueProvider diff --git a/megalista_dataflow/steps/last_step.py b/megalista_dataflow/steps/last_step.py index c572d1b5..8f20efb8 100644 --- a/megalista_dataflow/steps/last_step.py +++ b/megalista_dataflow/steps/last_step.py @@ -14,7 +14,7 @@ from distutils.log import Log import apache_beam as beam -import logging +from config import logging from error.logging_handler import LoggingHandler from models.execution import Execution from .megalista_step import MegalistaStep @@ -56,7 +56,7 @@ def process(self, executions): logging_handler = LoggingConfig.get_logging_handler() if logging_handler is None: - logging.getLogger("megalista").info(f"Clould not find error interception handler. Skipping error intereception.") + logging.getLogger("megalista").info(f"Could not find error interception handler. 
Skipping error interception.") +        else: +            if logging_handler.has_errors: +                logging.getLogger("megalista.LOG").error(f"SUMMARY OF ERRORS:\n{LoggingHandler.format_records(logging_handler.error_records)}") diff --git a/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py b/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py index 5bab489f..a954fa3f 100644 --- a/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py +++ b/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py @@ -13,7 +13,7 @@ # limitations under the License. import asyncio -import logging +from config import logging import time from datetime import datetime from typing import Any, List, Optional diff --git a/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py b/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py index a1527c5c..24f2e5aa 100644 --- a/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py +++ b/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py @@ -12,7 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging import math import time diff --git a/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py b/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py index 22e7b376..1a401dd3 100644 --- a/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py +++ b/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging +from config import logging from typing import Dict, Any, List, Optional from apache_beam.options.value_provider import StaticValueProvider from google.oauth2.credentials import Credentials diff --git a/megalista_dataflow/uploaders/display_video/customer_match/contact_info_uploader.py b/megalista_dataflow/uploaders/display_video/customer_match/contact_info_uploader.py index 1f76638d..c81b1059 100644 --- a/megalista_dataflow/uploaders/display_video/customer_match/contact_info_uploader.py +++ b/megalista_dataflow/uploaders/display_video/customer_match/contact_info_uploader.py @@ -13,7 +13,7 @@ # limitations under the License. import apache_beam as beam -import logging +from config import logging from typing import Dict, Any, List diff --git a/megalista_dataflow/uploaders/display_video/customer_match/mobile_uploader.py b/megalista_dataflow/uploaders/display_video/customer_match/mobile_uploader.py index e42dd44e..6a555d29 100644 --- a/megalista_dataflow/uploaders/display_video/customer_match/mobile_uploader.py +++ b/megalista_dataflow/uploaders/display_video/customer_match/mobile_uploader.py @@ -14,7 +14,6 @@ from functools import reduce import apache_beam as beam -import logging from typing import List, Dict, Any diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py index 0d6492ff..10c7ac17 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging +from config import logging from apache_beam.options.value_provider import ValueProvider diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py index 0c8dd8fe..3a131a23 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from apache_beam.options.value_provider import ValueProvider diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py index c05e509e..5001ddf1 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from apache_beam.options.value_provider import ValueProvider diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py index 66251c8f..9dc6d666 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py @@ -13,7 +13,7 @@ # limitations under the License. 
import datetime -import logging +from config import logging from error.error_handling import ErrorHandler from models.execution import AccountConfig, Batch, Destination, Execution diff --git a/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py b/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py index d8cf080d..beca8711 100644 --- a/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from typing import Dict, Any, List, Optional, Union from apache_beam.options.value_provider import StaticValueProvider diff --git a/megalista_dataflow/uploaders/google_ads/customer_match/contact_info_uploader.py b/megalista_dataflow/uploaders/google_ads/customer_match/contact_info_uploader.py index 939033ed..11c3589b 100644 --- a/megalista_dataflow/uploaders/google_ads/customer_match/contact_info_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/customer_match/contact_info_uploader.py @@ -13,7 +13,6 @@ # limitations under the License. import apache_beam as beam -import logging from typing import Dict, Any, List diff --git a/megalista_dataflow/uploaders/google_ads/customer_match/mobile_uploader.py b/megalista_dataflow/uploaders/google_ads/customer_match/mobile_uploader.py index e1600fc1..4b5ee9ef 100644 --- a/megalista_dataflow/uploaders/google_ads/customer_match/mobile_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/customer_match/mobile_uploader.py @@ -13,7 +13,6 @@ # limitations under the License. 
import apache_beam as beam -import logging from typing import List, Dict, Any diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py index 2b120bf4..3cead7d1 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py @@ -14,7 +14,7 @@ import json -import logging +from config import logging from typing import Dict, Any, Sequence import requests diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py index 288f636d..5d0fce69 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from google.oauth2.credentials import Credentials from googleapiclient.discovery import build diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py index ea32fbcd..953d143e 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py @@ -13,7 +13,7 @@ # limitations under the License. 
# -import logging +from config import logging from typing import List, Dict from google.oauth2.credentials import Credentials diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py index d4320413..e2e4d031 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py @@ -13,7 +13,7 @@ # limitations under the License. -import logging +from config import logging import re from typing import Dict, Any from urllib.parse import quote diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py index d5cd52a1..d476b69a 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py @@ -13,7 +13,7 @@ # limitations under the License. -import logging +from config import logging from google.oauth2.credentials import Credentials from googleapiclient.discovery import build diff --git a/megalista_dataflow/uploaders/support/transactional_events_results_writer.py b/megalista_dataflow/uploaders/support/transactional_events_results_writer.py index 02fc3aa1..b934f09d 100644 --- a/megalista_dataflow/uploaders/support/transactional_events_results_writer.py +++ b/megalista_dataflow/uploaders/support/transactional_events_results_writer.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-import logging +from config import logging from datetime import datetime import apache_beam as beam diff --git a/megalista_dataflow/uploaders/utils.py b/megalista_dataflow/uploaders/utils.py index 34c35d6c..3dfa3366 100644 --- a/megalista_dataflow/uploaders/utils.py +++ b/megalista_dataflow/uploaders/utils.py @@ -13,7 +13,7 @@ # limitations under the License. import datetime -import logging +from config import logging import pytz import math diff --git a/megalista_dataflow/uploaders/utils_test.py b/megalista_dataflow/uploaders/utils_test.py index cf935e85..a3738e94 100644 --- a/megalista_dataflow/uploaders/utils_test.py +++ b/megalista_dataflow/uploaders/utils_test.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging +from config import logging from error.error_handling import ErrorHandler from error.error_handling_test import MockErrorNotifier From f2df96da9ce984c85def77ba07cf27deea5c6eec Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Wed, 19 Oct 2022 13:50:00 -0300 Subject: [PATCH 02/11] Execution success counters --- megalista_dataflow/config/logging.py | 21 ++++---- .../big_query/big_query_data_source_test.py | 2 - megalista_dataflow/error/error_handling.py | 2 +- megalista_dataflow/error/logging_handler.py | 54 ------------------- .../error/logging_handler_test.py | 39 -------------- megalista_dataflow/main.py | 17 ++---- megalista_dataflow/models/execution.py | 41 ++++++++++++++ megalista_dataflow/steps/last_step.py | 25 ++++++--- .../customer_match/abstract_uploader.py | 10 ++-- ...ads_enhanced_conversions_leads_uploader.py | 2 + ..._ads_offline_conversions_calls_uploader.py | 2 + ...google_ads_offline_conversions_uploader.py | 4 +- .../conversions/google_ads_ssd_uploader.py | 2 + .../customer_match/abstract_uploader.py | 9 ++-- ...google_analytics_4_measurement_protocol.py | 7 +-- 
.../google_analytics_data_import_eraser.py | 15 +++--- .../google_analytics_data_import_uploader.py | 13 +++-- .../google_analytics_measurement_protocol.py | 6 ++- .../google_analytics_user_list_uploader.py | 27 ++++++---- megalista_dataflow/uploaders/utils.py | 20 +++++-- 20 files changed, 155 insertions(+), 163 deletions(-) delete mode 100644 megalista_dataflow/error/logging_handler.py delete mode 100644 megalista_dataflow/error/logging_handler_test.py diff --git a/megalista_dataflow/config/logging.py b/megalista_dataflow/config/logging.py index 202082d7..83a50752 100644 --- a/megalista_dataflow/config/logging.py +++ b/megalista_dataflow/config/logging.py @@ -13,8 +13,9 @@ # limitations under the License. import logging -from optparse import Option -import sys, io, os, traceback +import sys +import io +import traceback from types import FrameType from typing import Optional, Tuple, List, Any @@ -70,29 +71,31 @@ def __init__(self, name: Optional[str]): self._logger = logging.getLogger(name) def debug(self, msg: str, *args, **kwargs): - self.log(msg, logging.DEBUG, *args, **kwargs) + self._log(msg, logging.DEBUG, *args, **kwargs) def info(self, msg: str, *args, **kwargs): - self.log(msg, logging.INFO, *args, **kwargs) + self._log(msg, logging.INFO, *args, **kwargs) def warning(self, msg: str, *args, **kwargs): - self.log(msg, logging.WARNING, *args, **kwargs) + self._log(msg, logging.WARNING, *args, **kwargs) def error(self, msg: str, *args, **kwargs): - self.log(msg, logging.ERROR, *args, **kwargs) + self._log(msg, logging.ERROR, *args, **kwargs) def critical(self, msg: str, *args, **kwargs): - self.log(msg, logging.CRITICAL, *args, **kwargs) + self._log(msg, logging.CRITICAL, *args, **kwargs) def exception(self, msg: str, *args, **kwargs): - self.log(msg, logging.CRITICAL, *args, **kwargs) + self._log(msg, logging.CRITICAL, *args, **kwargs) - def log(self, msg: str, level: int, *args, **kwargs): + def _log(self, msg: str, level: int, *args, **kwargs): stacklevel 
= self._get_stacklevel(**kwargs) msg = self._get_msg_execution(msg, **kwargs) msg = self._get_msg_context(msg, **kwargs) if level >= logging.ERROR: _add_error(self._name, msg, stacklevel, level, args) + if level == logging.ERROR: + level = logging.WARNING keys_to_remove = ['execution', 'context'] for key in keys_to_remove: if key in kwargs: diff --git a/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py b/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py index 4cede48f..4e0ec36b 100644 --- a/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py +++ b/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py @@ -20,8 +20,6 @@ from models.execution import Execution from models.execution import Source from models.execution import SourceType -from models.execution import Batch -import pytest from models.execution import TransactionalType diff --git a/megalista_dataflow/error/error_handling.py b/megalista_dataflow/error/error_handling.py index a26bbb93..267e5768 100644 --- a/megalista_dataflow/error/error_handling.py +++ b/megalista_dataflow/error/error_handling.py @@ -163,7 +163,7 @@ def add_error(self, execution: Execution, error_message: str): if execution.destination.destination_type != self._destination_type: raise ValueError( - f'Received a error of destination type: {execution.destination.destination_type}' + f'Received an error of destination type: {execution.destination.destination_type}' f' but this error handler is initialized with {self._destination_type} destination type') error = Error(execution, error_message) diff --git a/megalista_dataflow/error/logging_handler.py b/megalista_dataflow/error/logging_handler.py deleted file mode 100644 index 79ab20c1..00000000 --- a/megalista_dataflow/error/logging_handler.py +++ /dev/null @@ -1,54 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in 
compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -import re -from typing import Optional, List -from .error_handling import ErrorNotifier - -class LoggingHandler(logging.Handler): - def __init__(self, level=logging.INFO): - self.level = level - self.filters = [] - self.lock = None - self._has_errors:bool = False - self._records: List[logging.LogRecord] = [] - - def emit(self, record: logging.LogRecord): - if record.levelno >= logging.ERROR: - self._has_errors = True - self._records.append(record) - - @property - def has_errors(self) -> bool: - return self._has_errors - - @property - def all_records(self) -> List[logging.LogRecord]: - return self._records - - @property - def error_records(self) -> List[logging.LogRecord]: - return list(filter(lambda rec: rec.levelno >= logging.ERROR, self._records)) - - @staticmethod - def format_records(records: List[logging.LogRecord]) -> Optional[str]: - if records is not None and len(records) > 0: - message = '' - for i in range(len(records)): - rec = records[i] - message += f'{i+1}. {rec.msg}\n in {rec.pathname}:{rec.lineno}\n' - return message - else: - return None \ No newline at end of file diff --git a/megalista_dataflow/error/logging_handler_test.py b/megalista_dataflow/error/logging_handler_test.py deleted file mode 100644 index 41d38ea8..00000000 --- a/megalista_dataflow/error/logging_handler_test.py +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright 2022 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -from typing import Iterable -from error.logging_handler import LoggingHandler -import logging - -import pytest -from apache_beam.options.value_provider import StaticValueProvider - -from error.error_handling import ErrorHandler, Error, GmailNotifier, ErrorNotifier -from models.execution import DestinationType, Execution, AccountConfig, Source, SourceType, Destination -from models.oauth_credentials import OAuthCredentials - -def get_log_record_info(): - return logging.LogRecord("unit_test", logging.INFO, '', 1, 'Message Info', None, None) - -def get_log_record_error(): - return logging.LogRecord("unit_test", logging.ERROR, '', 1, 'Message Error', None, None) - -# ErrorHandler tests -def test_has_errors(): - handler = LoggingHandler() - assert handler.has_errors == False - handler.emit(get_log_record_info()) - assert handler.has_errors == False - handler.emit(get_log_record_error()) - assert handler.has_errors == True - diff --git a/megalista_dataflow/main.py b/megalista_dataflow/main.py index 493a9ef4..c85f6305 100644 --- a/megalista_dataflow/main.py +++ b/megalista_dataflow/main.py @@ -90,16 +90,9 @@ def run(argv=None): if __name__ == "__main__": run() - logging_handler = LoggingConfig.get_logging_handler() - if logging_handler is None: - logging.getLogger("megalista").info( - f"MEGALISTA build {MEGALISTA_VERSION}: Clould not find error interception handler. 
Skipping error intereception.") - else: - if logging_handler.has_errors: - logging.getLogger("megalista").critical( - f'MEGALISTA build {MEGALISTA_VERSION}: Completed with errors') - raise SystemExit(1) - else: - logging.getLogger("megalista").info( - f"MEGALISTA build {MEGALISTA_VERSION}: Completed successfully!") + if logging.has_errors(): + logging.get_logger("megalista").critical(f'MEGALISTA build {MEGALISTA_VERSION}: Completed with errors') + raise SystemExit(1) + + logging.get_logger("megalista").info(f"MEGALISTA build {MEGALISTA_VERSION}: Completed successfully!") exit(0) diff --git a/megalista_dataflow/models/execution.py b/megalista_dataflow/models/execution.py index 9672df31..9440742a 100644 --- a/megalista_dataflow/models/execution.py +++ b/megalista_dataflow/models/execution.py @@ -260,6 +260,9 @@ def __init__( self._account_config = account_config self._source = source self._destination = destination + self._total_records = 0 + self._failed_records = 0 + self._successful_records = 0 @property def source(self) -> Source: @@ -273,6 +276,44 @@ def destination(self) -> Destination: def account_config(self) -> AccountConfig: return self._account_config + @property + def summary_of_records(self) -> dict: + return { + 'total': self.total_records, + 'successful': self.successful_records, + 'failed': self._failed_records + } + + @property + def total_records(self): + return self._total_records + + @total_records.setter + def total_records(self, total_records): + self._total_records = total_records + + @property + def successful_records(self): + return self._successful_records + + @successful_records.setter + def successful_records(self, value): + self._successful_records = value + + def add_successful_record(self, _): + self._successful_records = self._successful_records + 1 + + @property + def failed_records(self): + return self._failed_records + + @failed_records.setter + def failed_records(self, value): + self._failed_records = value + + def 
add_failed_record(self, _): + self._failed_records = self._failed_records + 1 + def to_dict(self): return { 'account_config': self.account_config.to_dict(), diff --git a/megalista_dataflow/steps/last_step.py b/megalista_dataflow/steps/last_step.py index 8f20efb8..dfdc11c0 100644 --- a/megalista_dataflow/steps/last_step.py +++ b/megalista_dataflow/steps/last_step.py @@ -52,11 +52,20 @@ def extract_output(self, accumulator): return accumulator class PrintResultsDoFn(beam.DoFn): - def process(self, executions): - logging_handler = LoggingConfig.get_logging_handler() - - if logging_handler is None: - logging.getLogger("megalista").info(f"Could not find error interception handler. Skipping error intereception.") - else: - if logging_handler.has_errors: - logging.getLogger("megalista.LOG").error(f"SUMMARY OF ERRORS:\n{LoggingHandler.format_records(logging_handler.error_records)}") + def process(self, executions): + executions_results = [] + execution_counter = 1 + for key in executions: + execution = executions[key] + summary_of_records = execution.summary_of_records + msg = f"{execution_counter}. 
{key}:\n \ + - Type: {str(execution.destination.destination_type)[16:]}\n \ + - Total records: {summary_of_records['total']}\n \ + - Successful: {summary_of_records['successful']}\n \ + - Failed: {summary_of_records['failed']}\n" + executions_results.append(msg) + execution_counter = execution_counter + 1 + summary_msg = '\n'.join(executions_results) + logging.get_logger("megalista.LOG").info(f"SUMMARY OF RESULTS:\n{summary_msg}") + if logging.has_errors(): + logging.get_logger("megalista.LOG").error(f"SUMMARY OF ERRORS:\n{logging.get_formatted_error_list()}") \ No newline at end of file diff --git a/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py b/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py index 1a401dd3..d768ce38 100644 --- a/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py +++ b/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py @@ -156,13 +156,13 @@ def get_filtered_rows(self, rows: List[Any], keys: List[str]) -> List[Dict[str, @utils.safe_process(logger=logging.getLogger(_DEFAULT_LOGGER)) def process(self, batch: Batch, **kwargs) -> None: + execution = batch.execution + if not self.active: - logging.getLogger(_DEFAULT_LOGGER).warning( - 'Skipping upload to DV, parameters not configured.') + logging.getLogger(_DEFAULT_LOGGER).error( + 'Skipping upload to DV, parameters not configured.', execution=execution) return - execution = batch.execution - self._assert_execution_is_valid(execution) # Gets advertiser_id from Metadata 0 @@ -205,6 +205,8 @@ def process(self, batch: Batch, **kwargs) -> None: body=updated_list_definition ).execute() + execution.successful_records = execution.successful_records + len(rows) + return [execution] def get_list_definition(self, account_config: AccountConfig, diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py 
b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py index 10c7ac17..6227e25a 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py @@ -135,6 +135,8 @@ def _do_upload(self, oc_service, execution, conversion_resource_name, customer_i _DEFAULT_LOGGER, 'uploading enhanced conversions for leads', response) if error_message: self._add_error(execution, error_message) + + utils.update_execution_counters_ads(execution, rows, response) return response diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py index 3a131a23..87e8f3d6 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py @@ -115,6 +115,8 @@ def _do_upload(self, oc_service: Any, execution: Execution, conversion_resource_ if error_message: self._add_error(execution, error_message) + utils.update_execution_counters_ads(execution, rows, response) + return response def _get_resource_name(self, customer_id: str, name: str): diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py index 5001ddf1..6dc5346f 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_uploader.py @@ -104,7 +104,7 @@ def process(self, batch: Batch, **kwargs): return [batch_with_successful_gclids] def _do_upload(self, 
oc_service, execution, conversion_resource_name, customer_id, rows): - logging.getLogger(_DEFAULT_LOGGER).info(f'Uploading {len(rows)} offline conversions on {conversion_resource_name} to Google Ads.') + logging.getLogger(_DEFAULT_LOGGER).info(f'Uploading {len(rows)} offline conversions on {conversion_resource_name} to Google Ads.', execution=execution) conversions = [{ 'conversion_action': conversion_resource_name, 'conversion_date_time': utils.format_date(conversion['time']), @@ -125,6 +125,8 @@ def _do_upload(self, oc_service, execution, conversion_resource_name, customer_i if error_message: self._add_error(execution, error_message) + utils.update_execution_counters_ads(execution, rows, response) + return response def _get_resource_name(self, ads_service, customer_id: str, name: str): diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py index 9dc6d666..3b678cd7 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_ssd_uploader.py @@ -114,6 +114,8 @@ def _do_upload(self, execution, offline_user_data_job_service, customer_id, curr if error_message: self._add_error(execution, error_message) + utils.update_execution_counters_ads(execution, rows, data_insertion_response) + # 3. 
Runs the Job offline_user_data_job_service.run_offline_user_data_job(resource_name = job_resource_name) diff --git a/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py b/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py index beca8711..876a3e22 100644 --- a/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/customer_match/abstract_uploader.py @@ -193,12 +193,13 @@ def get_filtered_rows(self, rows: List[Any], keys: List[str]) -> List[Dict[str, @utils.safe_process(logger=logging.getLogger(_DEFAULT_LOGGER)) def process(self, batch: Batch, **kwargs): + execution = batch.execution + if not self.active: - logging.getLogger(_DEFAULT_LOGGER).warning( - 'Skipping upload to ads, parameters not configured.') + logging.getLogger(_DEFAULT_LOGGER).error( + 'Skipping upload to ads, parameters not configured.', execution=execution) return - execution = batch.execution self._assert_execution_is_valid(execution) @@ -247,6 +248,8 @@ def process(self, batch: Batch, **kwargs): if error_message: self._add_error(execution, error_message) + utils.update_execution_counters_ads(execution, batch.elements, data_insertion_response) + return [execution] def get_list_definition(self, account_config: AccountConfig, diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py index 3cead7d1..b2410414 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py @@ -24,6 +24,7 @@ from uploaders import utils from uploaders.uploaders import MegalistaUploader +_LOGGER_NAME = 'megalista.GoogleAnalytics4MeasurementProtocolUploader' class GoogleAnalytics4MeasurementProtocolUploaderDoFn(MegalistaUploader): def 
__init__(self, error_handler: ErrorHandler): @@ -46,7 +47,7 @@ def _exactly_one_of(a: Any, b: Any) -> bool: def _validate_param(key: str, value: Any, reserved_keys: Sequence[str]) -> bool: return key not in reserved_keys and value is not None and value != '' - @utils.safe_process(logger=logging.getLogger('megalista.GoogleAnalytics4MeasurementProtocolUploader')) + @utils.safe_process(logger=logging.getLogger(_LOGGER_NAME)) def process(self, batch: Batch, **kwargs): return self.do_process(batch) @@ -129,11 +130,11 @@ def do_process(self, batch: Batch): response = requests.post(url,data=json.dumps(payload)) if response.status_code != 204: error_message = f'Error calling GA4 MP {response.status_code}: {str(response.content)}' - logging.getLogger('megalista.GoogleAnalytics4MeasurementProtocolUploader').error(error_message) + logging.getLogger(_LOGGER_NAME).error(error_message, execution=execution) self._add_error(execution, error_message) else: accepted_elements.append(row) - logging.getLogger('megalista.GoogleAnalytics4MeasurementProtocolUploader').info( + logging.getLogger(_LOGGER_NAME).info( f'Successfully uploaded {len(accepted_elements)}/{len(batch.elements)} events.') return [Batch(execution, accepted_elements)] diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py index 5d0fce69..14b5a50b 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser.py @@ -22,6 +22,7 @@ from uploaders import utils from uploaders.uploaders import MegalistaUploader +_LOGGER_NAME = 'megalista.GoogleAnalyticsDataImportUploader' class GoogleAnalyticsDataImportEraser(MegalistaUploader): """ @@ -60,7 +61,7 @@ def _assert_all_list_names_are_present(any_execution): raise ValueError('Missing destination information. 
Received {}'.format(str(destination))) @utils.safe_process( - logger=logging.getLogger('megalista.GoogleAnalyticsDataImportUploader')) + logger=logging.getLogger(_LOGGER_NAME)) def process(self, batch: Batch, **kwargs): execution = batch.execution self._assert_all_list_names_are_present(execution) @@ -86,16 +87,16 @@ def process(self, batch: Batch, **kwargs): return [batch] except Exception as e: error_message = f'Error while delete GA Data Import files: {e}' - logging.getLogger("megalista.GoogleAnalyticsDataImportUploader").error(error_message) + logging.getLogger(_LOGGER_NAME).error(error_message, execution=execution) self._add_error(execution, error_message) else: error_message = f"{data_import_name} - data import not found, please configure it in Google Analytics" - logging.getLogger("megalista.GoogleAnalyticsDataImportUploader").error(error_message) + logging.getLogger(_LOGGER_NAME).error(error_message, execution=execution) self._add_error(execution, error_message) @staticmethod def _call_delete_api(analytics, data_import_name, ga_account_id, data_source_id, web_property_id): - logging.getLogger("megalista.GoogleAnalyticsDataImportUploader").info( + logging.getLogger(_LOGGER_NAME).info( "Listing files from %s - %s" % (data_import_name, data_source_id)) uploads = analytics.management().uploads().list( @@ -106,14 +107,14 @@ def _call_delete_api(analytics, data_import_name, ga_account_id, data_source_id, file_ids = [upload.get('id') for upload in uploads.get('items', [])] if len(file_ids) == 0: - logging.getLogger("megalista.GoogleAnalyticsDataImportUploader").error( + logging.getLogger(_LOGGER_NAME).error( "Data Source %s had no files to delete" % data_import_name) else: - logging.getLogger("megalista.GoogleAnalyticsDataImportUploader").info( + logging.getLogger(_LOGGER_NAME).info( "File Ids: %s" % file_ids) - logging.getLogger("megalista.GoogleAnalyticsDataImportUploader").info( + logging.getLogger(_LOGGER_NAME).info( "Deleting %s files from %s - %s" % 
(len(file_ids), data_import_name, data_source_id)) analytics.management().uploads().deleteUploadData( accountId=ga_account_id, diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py index 953d143e..30630074 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py @@ -25,6 +25,7 @@ from uploaders import utils from uploaders.uploaders import MegalistaUploader +_LOGGER_NAME = 'megalista.GoogleAnalyticsDataImportUploader' class GoogleAnalyticsDataImportUploaderDoFn(MegalistaUploader): """ @@ -69,7 +70,7 @@ def _assert_all_list_names_are_present(any_execution): str(destination))) @utils.safe_process( - logger=logging.getLogger('megalista.GoogleAnalyticsDataImportUploader')) + logger=logging.getLogger(_LOGGER_NAME)) def process(self, batch: Batch, **kwargs): execution = batch.execution self._assert_all_list_names_are_present(execution) @@ -106,12 +107,16 @@ def _do_upload_data(self, execution, web_property_id, data_import_name, ga_accou data_source_id, rows, web_property_id) except Exception as e: error_message = f'Error while uploading GA Data: {e}' - logging.getLogger('megalista.GoogleAnalyticsDataImportUploader').error(error_message) + logging.getLogger(_LOGGER_NAME).error(error_message, execution=execution) self._add_error(execution, error_message) + execution.failed_records = execution.failed_records + len(rows) + else: + execution.successful_records = execution.successful_records + len(rows) else: error_message = f'{data_import_name} - data import not found, please configure it in Google Analytics' - logging.getLogger('megalista.GoogleAnalyticsDataImportUploader').error(error_message) + logging.getLogger(_LOGGER_NAME).error(error_message, execution=execution) self._add_error(execution, error_message) 
+ execution.failed_records = execution.failed_records + len(rows) @staticmethod def prepare_csv(rows): @@ -138,7 +143,7 @@ def prepare_csv(rows): def _call_upload_api(self, analytics, data_import_name, ga_account_id, data_source_id, rows, web_property_id): - logging.getLogger('megalista.GoogleAnalyticsDataImportUploader').info( + logging.getLogger(_LOGGER_NAME).info( 'Adding data to %s - %s' % (data_import_name, data_source_id)) csv = self.prepare_csv(rows) diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py index e2e4d031..e9cb0e6a 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_measurement_protocol.py @@ -90,7 +90,7 @@ def build_hit(self, batch: Batch, row: Dict[str, Any]) -> Dict[str, Any]: payload["cu"] = row.get('currency_code') # Currency code. else: error_message = f"Hit type {hit_type} is not supported." 
- logging.getLogger("megalista.GoogleAnalyticsMeasurementProtocolUploader").error(error_message) + logging.getLogger("megalista.GoogleAnalyticsMeasurementProtocolUploader").error(error_message, execution=batch.execution) self._add_error(batch.execution, error_message) return payload @@ -108,7 +108,9 @@ def process(self, batch: Batch, **kwargs): response = requests.post(url=self.API_URL, data=payload) if response.status_code != 200: error_message = f"Error uploading to Analytics HTTP {response.status_code}: {response.raw}" - logging.getLogger("megalista.GoogleAnalyticsMeasurementProtocolUploader").error(error_message) + logging.getLogger("megalista.GoogleAnalyticsMeasurementProtocolUploader").error(error_message, execution=batch.execution) self._add_error(batch.execution, error_message) + batch.execution.failed_records = batch.execution.failed_records + len(rows) else: + batch.execution.successful_records = batch.execution.successful_records + len(rows) return [batch] diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py index d476b69a..2519b941 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader.py @@ -24,6 +24,8 @@ from uploaders import utils from uploaders.uploaders import MegalistaUploader +_LOGGER_NAME = 'megalista.GoogleAnalyticsUserListUploader' + class GoogleAnalyticsUserListUploaderDoFn(MegalistaUploader): @@ -50,7 +52,7 @@ def _create_list_if_doesnt_exist(self, analytics, web_property_id, view_ids, lis results = list( filter(lambda x: x['name'] == list_name, lists)) if len(results) == 0: - logging.getLogger().info('%s list does not exist, creating...' % list_name) + logging.getLogger(_LOGGER_NAME).info('%s list does not exist, creating...' 
% list_name) response = analytics.management().remarketingAudience().insert( accountId=ga_account_id, @@ -65,10 +67,10 @@ def _create_list_if_doesnt_exist(self, analytics, web_property_id, view_ids, lis **list_definition }).execute() id = response['id'] - logging.getLogger().info('%s created with id: %s' % (list_name, id)) + logging.getLogger(_LOGGER_NAME).info('%s created with id: %s' % (list_name, id)) else: id = results[0]['id'] - logging.getLogger().info('%s found with id: %s' % (list_name, id)) + logging.getLogger(_LOGGER_NAME).info('%s found with id: %s' % (list_name, id)) return id def start_bundle(self): @@ -104,7 +106,7 @@ def _assert_all_list_names_are_present(any_execution): or not destination[5]: raise ValueError('Missing destination information. Received {}'.format(str(destination))) - @utils.safe_process(logger=logging.getLogger("megalista.GoogleAnalyticsUserListUploader")) + @utils.safe_process(logger=logging.getLogger(_LOGGER_NAME)) def process(self, batch: Batch, **kwargs): execution = batch.execution self._assert_all_list_names_are_present(execution) @@ -149,7 +151,7 @@ def _do_upload_data(self, execution, web_property_id, view_id, data_import_name, id = results[0]['id'] - logging.getLogger().info("Adding data to %s - %s" % (data_import_name, id)) + logging.getLogger(_LOGGER_NAME).info("Adding data to %s - %s" % (data_import_name, id)) body = '\n'.join([ '%s,%s' % (user_id_custom_dim, buyer_custom_dim), *['%s,%s' % (row['user_id'], row[custom_dim_field] if custom_dim_field else 'buyer') for row in rows] @@ -165,10 +167,17 @@ def _do_upload_data(self, execution, web_property_id, view_id, data_import_name, customDataSourceId=id, media_body=media).execute() except Exception as e: - error_message = f'Error while uploading GA Data: {e}' - logging.getLogger().error(error_message) - self._add_error(execution, error_message) + self._add_error(execution, f'Error uploading data: {e}') + logging.getLogger(_LOGGER_NAME).error(f'Error uploading data for 
:{batch.elements}', execution=execution) + logging.getLogger(_LOGGER_NAME).error(f'Exception type: {type(e).__name__}', execution=execution) + logging.getLogger(_LOGGER_NAME).error(e, exc_info=True, execution=execution) + execution.failed_records = execution.failed_records + len(rows) + else: + execution.successful_records = execution.successful_records + len(rows) else: error_message = f"{data_import_name} - data import not found, please configure it in Google Analytics" - logging.getLogger().error(error_message) self._add_error(execution, error_message) + logging.getLogger(_LOGGER_NAME).error(f'Error uploading data for :{batch.elements}', execution=execution) + logging.getLogger(_LOGGER_NAME).error(f'Exception type: {type(e).__name__}', execution=execution) + logging.getLogger(_LOGGER_NAME).error(e, exc_info=True, execution=execution) + execution.failed_records = execution.failed_records + len(rows) diff --git a/megalista_dataflow/uploaders/utils.py b/megalista_dataflow/uploaders/utils.py index 3dfa3366..9c331a4d 100644 --- a/megalista_dataflow/uploaders/utils.py +++ b/megalista_dataflow/uploaders/utils.py @@ -69,17 +69,16 @@ def inner(*args, **kwargs): self_ = args[0] batch = args[1] if not batch: - logger.warning('Skipping upload, received no elements.') + logger.error('Skipping upload, received no elements.') return logger.info(f'Uploading {len(batch.elements)} rows...') try: return func(*args, **kwargs) except BaseException as e: self_._add_error(batch.execution, f'Error uploading data: {e}') - logger.error(f'Error uploading data for :{batch.elements}') - logger.error(e, exc_info=True) - logger.exception('Error uploading data.') - + logger.error(f'Error uploading data for :{batch.elements}', execution=batch.execution) + logger.error(f'Exception type: {type(e).__name__}', execution=batch.execution) + logger.error(e, exc_info=True, execution=batch.execution) return inner return deco @@ -134,3 +133,14 @@ def print_partial_error_messages(logger_name, action, 
response) -> Optional[str] logging.getLogger(logger_name).debug(message) return error_message + + +def update_execution_counters_ads(execution, elements, response): + failed_records = 0 + partial_failure = getattr(response, 'partial_failure_error', None) + if partial_failure is not None: + details = getattr(partial_failure, 'details', []) + failed_records = len(details) + successful_records = len(elements) - failed_records + execution.successful_records = execution.successful_records + successful_records + execution.failed_records = execution.failed_records + failed_records \ No newline at end of file From a42ee49f92292a841ce177855bb0b209cf1c91ce Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Wed, 19 Oct 2022 14:09:11 -0300 Subject: [PATCH 03/11] Add execution info to log messages --- .../uploaders/appsflyer/appsflyer_s2s_uploader_async.py | 2 +- .../campaign_manager_conversion_uploader.py | 6 +++--- .../google_ads_enhanced_conversions_leads_uploader.py | 4 ++-- .../google_ads_offline_conversions_calls_uploader.py | 2 +- .../google_analytics_4_measurement_protocol.py | 2 +- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py b/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py index a954fa3f..30ae7a93 100644 --- a/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py +++ b/megalista_dataflow/third_party/uploaders/appsflyer/appsflyer_s2s_uploader_async.py @@ -138,6 +138,6 @@ def process(self, batch: Batch, **kwargs): if delta_sec < min_duration_sec: time.sleep(min_duration_sec - delta_sec) logging.getLogger("megalista.AppsFlyerS2SUploader").info( - f"Successfully uploaded {len(success_elements)}/{len(batch.elements)} events.") + f"Successfully uploaded {len(success_elements)}/{len(batch.elements)} events.", execution=execution) yield Batch(execution, success_elements) diff --git 
a/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py b/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py index 24f2e5aa..ecd28d14 100644 --- a/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py +++ b/megalista_dataflow/uploaders/campaign_manager/campaign_manager_conversion_uploader.py @@ -132,14 +132,14 @@ def _do_upload_data( 'conversions': conversions, } - logger.info(f'Conversions: \n{conversions}') + logger.info(f'Conversions: \n{conversions}', execution=execution) request = service.conversions().batchinsert( profileId=campaign_manager_profile_id, body=request_body) response = request.execute() if response['hasFailures']: - logger.error(f'Error(s) inserting conversions:\n{response}') + logger.error(f'Error(s) inserting conversions:\n{response}', execution=execution) conversions_status = response['status'] error_messages = [] @@ -149,5 +149,5 @@ def _do_upload_data( error_messages.append('[{}]: {}'.format(error['code'], error['message'])) final_error_message = 'Errors from API:\n{}'.format('\n'.join(error_messages)) - logger.error(final_error_message) + logger.error(final_error_message, execution=execution) self._add_error(execution, final_error_message) diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py index 6227e25a..cec97957 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py @@ -106,14 +106,14 @@ def process(self, batch: Batch, **kwargs): successful_users = [ user for user in response.results if user.ListFields()] logging.getLogger(_DEFAULT_LOGGER).info( - f'Sucessfully uploaded {len(successful_users)} conversions') + 
f'Sucessfully uploaded {len(successful_users)} conversions', execution=execution) # all uploaded results do not need to be sent again return [batch] def _do_upload(self, oc_service, execution, conversion_resource_name, customer_id, rows): logging.getLogger(_DEFAULT_LOGGER).info( - f'Uploading {len(rows)} offline conversions on {conversion_resource_name} to Google Ads.') + f'Uploading {len(rows)} offline conversions on {conversion_resource_name} to Google Ads.', execution=execution) conversions = [{ 'conversion_action': conversion_resource_name, 'conversion_date_time': utils.format_date(conversion['time']), diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py index 87e8f3d6..5514bbc9 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_offline_conversions_calls_uploader.py @@ -93,7 +93,7 @@ def process(self, batch: Batch, **kwargs): batch.elements) def _do_upload(self, oc_service: Any, execution: Execution, conversion_resource_name: str, customer_id: str, rows: List[Dict[str, Union[str, Dict[str, str]]]]): - logging.getLogger(_DEFAULT_LOGGER).info(f'Uploading {len(rows)} offline conversions (calls) on {conversion_resource_name} to Google Ads.') + logging.getLogger(_DEFAULT_LOGGER).info(f'Uploading {len(rows)} offline conversions (calls) on {conversion_resource_name} to Google Ads.', execution=execution) conversions = [{ 'conversion_action': conversion_resource_name, 'caller_id': conversion['caller_id'], diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py index b2410414..51b713db 100644 --- 
a/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_4_measurement_protocol.py @@ -136,5 +136,5 @@ def do_process(self, batch: Batch): accepted_elements.append(row) logging.getLogger(_LOGGER_NAME).info( - f'Successfully uploaded {len(accepted_elements)}/{len(batch.elements)} events.') + f'Successfully uploaded {len(accepted_elements)}/{len(batch.elements)} events.', execution=execution) return [Batch(execution, accepted_elements)] From d379bee7da74b800da5932171cfdf83b7edb7a1b Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Wed, 19 Oct 2022 14:44:41 -0300 Subject: [PATCH 04/11] Fix typos --- .../data_sources/big_query/big_query_data_source_test.py | 1 + .../google_analytics/google_analytics_data_import_uploader.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py b/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py index 4e0ec36b..01d97be5 100644 --- a/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py +++ b/megalista_dataflow/data_sources/big_query/big_query_data_source_test.py @@ -13,6 +13,7 @@ # limitations under the License. 
import datetime +import pytest from models.execution import AccountConfig, ExecutionsGroupedBySource from models.execution import Destination diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py index 30630074..ff0e3b30 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_uploader.py @@ -109,7 +109,7 @@ def _do_upload_data(self, execution, web_property_id, data_import_name, ga_accou error_message = f'Error while uploading GA Data: {e}' logging.getLogger(_LOGGER_NAME).error(error_message, execution=execution) self._add_error(execution, error_message) - execution.failed_records = execution.failed_records + len(rows) + execution.failed_records = execution.failed_records + len(rows) else: execution.successful_records = execution.successful_records + len(rows) else: From 5a0df4f6bd6ce91b191e7f15d5bfcf29e7740310 Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Tue, 18 Oct 2022 06:59:02 -0300 Subject: [PATCH 05/11] Sample Service Account file for GCP auth inside docker (#109) Sample Service Account file for GCP auth inside docker --- deployment/docker/entrypoint.sh | 2 +- deployment/docker/service-account-file.json | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) create mode 100644 deployment/docker/service-account-file.json diff --git a/deployment/docker/entrypoint.sh b/deployment/docker/entrypoint.sh index 93c4f3ba..1b19a14c 100644 --- a/deployment/docker/entrypoint.sh +++ b/deployment/docker/entrypoint.sh @@ -30,7 +30,7 @@ do IFS=' ' done -export GOOGLE_APPLICATION_CREDENTIALS=/app/megalista_dataflow/service-account-file.json +export GOOGLE_APPLICATION_CREDENTIALS=/app/service-account-file.json echo "Activating virual environment (python)" source virtual_env/bin/activate diff --git 
a/deployment/docker/service-account-file.json b/deployment/docker/service-account-file.json new file mode 100644 index 00000000..a5ea1a88 --- /dev/null +++ b/deployment/docker/service-account-file.json @@ -0,0 +1,12 @@ +{ + "type": "service_account", + "project_id": "", + "private_key_id": "", + "private_key": "", + "client_email": "", + "client_id": "", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "" + } From 5647b43eb3f765a8e121eee61dfa07c1a26b6d76 Mon Sep 17 00:00:00 2001 From: mr-lopes <6962758+Mr-Lopes@users.noreply.github.com> Date: Tue, 18 Oct 2022 16:23:59 -0300 Subject: [PATCH 06/11] Updating the terrform deployment script --- .../google_ads_enhanced_conversions_leads_uploader.py | 2 +- terraform/scripts/deploy_cloud.sh | 11 ++++++++--- 2 files changed, 9 insertions(+), 4 deletions(-) diff --git a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py index cec97957..80f09df8 100644 --- a/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py +++ b/megalista_dataflow/uploaders/google_ads/conversions/google_ads_enhanced_conversions_leads_uploader.py @@ -117,7 +117,7 @@ def _do_upload(self, oc_service, execution, conversion_resource_name, customer_i conversions = [{ 'conversion_action': conversion_resource_name, 'conversion_date_time': utils.format_date(conversion['time']), - 'conversion_value': int(conversion['amount']), + 'conversion_value': float(str(conversion['amount'])), 'user_identifiers': [{k: v} for (k, v) in conversion.items() if k in ('hashed_email', 'hashed_phone_number')] } for conversion in rows] diff --git a/terraform/scripts/deploy_cloud.sh b/terraform/scripts/deploy_cloud.sh 
index a1d18ae7..f1613c13 100755 --- a/terraform/scripts/deploy_cloud.sh +++ b/terraform/scripts/deploy_cloud.sh @@ -14,8 +14,8 @@ # limitations under the License. -if [ $# != 3 ]; then - echo "Usage: $0 gcp_project_id bucket_name region" +if [ $# != 4 ]; then + echo "Usage: $0 gcp_project_id bucket_name region service_account_email" exit 1 fi @@ -25,7 +25,12 @@ echo "Configuration GCP project in gcloud" gcloud config set project "$1" echo "Build Dataflow metadata" python3 -m pip install --user -q -r requirements.txt -python3 -m main --runner DataflowRunner --project "$1" --gcp_project_id "$1" --temp_location "gs://$2/tmp/" --region "$3" --setup_file ./setup.py --template_location "gs://$2/templates/megalista" --num_workers 1 --autoscaling_algorithm=NONE +echo "Update commit info inside code" +sed -i "s/MEGALISTA_VERSION\s*=.*/MEGALISTA_VERSION = '$(git rev-parse HEAD)'/" ./config/version.py +python3 -m main --runner DataflowRunner --project "$1" --gcp_project_id "$1" --temp_location "gs://$2/tmp/" --region "$3" --setup_file ./setup.py --template_location "gs://$2/templates/megalista" --num_workers 1 --autoscaling_algorithm=NONE --service_account_email "$4" echo "Copy megalista_medata to bucket $2" gsutil cp megalista_metadata "gs://$2/templates/megalista_metadata" +echo "Cleanup" +sed -i "s/MEGALISTA_VERSION\s*=.*/MEGALISTA_VERSION = '\[megalista_version\]'/" ./config/version.py cd .. 
+echo "Finished" From 07e0334cd71e89b7b0d2b592071bde7e9edde218 Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Thu, 20 Oct 2022 10:37:36 -0300 Subject: [PATCH 07/11] Unit tests (#112) --- megalista_dataflow/models/json_config.py | 6 +- .../sources/json_execution_source.py | 2 +- .../sources/json_execution_source_test.py | 69 +++++++++++++++++++ .../steps/processing_steps_test.py | 37 ++++++++++ 4 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 megalista_dataflow/sources/json_execution_source_test.py create mode 100644 megalista_dataflow/steps/processing_steps_test.py diff --git a/megalista_dataflow/models/json_config.py b/megalista_dataflow/models/json_config.py index a24defb0..bc6396bd 100644 --- a/megalista_dataflow/models/json_config.py +++ b/megalista_dataflow/models/json_config.py @@ -22,9 +22,13 @@ class JsonConfig: def __init__(self, dataflow_options: DataflowOptions): self._dataflow_options = dataflow_options + def _get_json(self, url): + file_provider = FileProvider(url, self._dataflow_options, SourceType.FILE, "File Config (JSON)", False) + return file_provider.read().decode('utf-8') + def parse_json_from_url(self, url): fileProvider = FileProvider(url, self._dataflow_options, SourceType.FILE, "File Config (JSON)", False) - data = json.loads(fileProvider.read().decode('utf-8')) + data = json.loads(self._get_json(url)) return data def get_value(self, config_json, key): diff --git a/megalista_dataflow/sources/json_execution_source.py b/megalista_dataflow/sources/json_execution_source.py index 13aab5f9..1f94fe76 100644 --- a/megalista_dataflow/sources/json_execution_source.py +++ b/megalista_dataflow/sources/json_execution_source.py @@ -36,7 +36,7 @@ def __init__(self, json_config: JsonConfig, setup_json_url: ValueProvider): def _do_count(self): json_url = self._setup_json_url.get() json_data = self._json_config.parse_json_from_url(json_url) - return len(json_data) + return len(json_data['Connections']) def read(self, range_tracker): 
json_url = self._setup_json_url.get() diff --git a/megalista_dataflow/sources/json_execution_source_test.py b/megalista_dataflow/sources/json_execution_source_test.py new file mode 100644 index 00000000..c3823db0 --- /dev/null +++ b/megalista_dataflow/sources/json_execution_source_test.py @@ -0,0 +1,69 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import MagicMock +from models.options import DataflowOptions +from models.json_config import JsonConfig +from sources.json_execution_source import JsonExecutionSource +from models.execution import AccountConfig, DataRow, DataRowsGroupedBySource, SourceType, DestinationType, TransactionalType, Execution, Source, Destination, ExecutionsGroupedBySource +from apache_beam.options.value_provider import StaticValueProvider + + +from typing import List, Dict +import pytest +from pytest_mock import MockFixture + +_JSON = '{"GoogleAdsAccountId":"","GoogleAdsMCC":false,"AppId":"","GoogleAnalyticsAccountId":"","CampaignManagerAccountId":"","Sources":[{"Name":"[BQ] Contact Info","Type":"BIG_QUERY","Dataset":"megalista_data","Table":"customer_match_contact_info"},{"Name":"[BQ] Contact Info - Email","Type":"BIG_QUERY","Dataset":"megalista_data","Table":"customer_match_contact_info_email"},{"Name":"[BQ] Contact Info - Phone","Type":"BIG_QUERY","Dataset":"megalista_data","Table":"customer_match_contact_info_phone"},{"Name":"[BQ] Contact Info - Mailing 
Address","Type":"BIG_QUERY","Dataset":"megalista_data","Table":"customer_match_contact_info_mailing_address"},{"Name":"[BQ] Ads - Offline Conversion (click)","Type":"BIG_QUERY","Dataset":"megalista_data","Table":"ads_offline_conversion"},{"Name":"[BQ - Carga] Contact Info 10m","Type":"BIG_QUERY","Dataset":"megalista_data","Table":"customer_match_contact_info_teste_carga_1"}],"Destinations":[{"Name":"[BQ] Contact Info","Type":"ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD","Metadata":["Megalista - Testing - Contact Info","ADD","10"]},{"Name":"[BQ] Contact Info - Email","Type":"ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD","Metadata":["Megalista - Testing - Contact Info (email)","ADD","10"]},{"Name":"[BQ] Contact Info - Phone","Type":"ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD","Metadata":["Megalista - Testing - Contact Info (phone)","ADD","10"]},{"Name":"[BQ] Contact Info - Mailing Address","Type":"ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD","Metadata":["Megalista - Testing - Contact Info (mailing address)","ADD","10"]},{"Name":"[BQ] Ads - Offline Conversion (click)","Type":"ADS_OFFLINE_CONVERSION","Metadata":["Megalista - testing - Offline Conversions (click)"]},{"Name":"[BQ] DV360 Contact Info","Type":"DV_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD","Metadata":["633801967","Megalista - Testing - DV360 - Contact Info"]}],"Connections":[{"Enabled":false,"Source":"[BQ] Contact Info","Destination":"[BQ] Contact Info"},{"Enabled":false,"Source":"[BQ] Contact Info - Email","Destination":"[BQ] Contact Info - Email"},{"Enabled":false,"Source":"[BQ] Contact Info - Phone","Destination":"[BQ] Contact Info - Phone"},{"Enabled":false,"Source":"[BQ] Contact Info - Mailing Address","Destination":"[BQ] Contact Info - Mailing Address"},{"Enabled":false,"Source":"[BQ] Ads - Offline Conversion (click)","Destination":"[BQ] Ads - Offline Conversion (click)"},{"Enabled":false,"Source":"[BQ] Contact Info","Destination":"[BQ] DV360 Contact Info"}]}' + +@pytest.fixture +def json_config(mocker: MockFixture): + 
json_config = JsonConfig(DataflowOptions()) + return json_config + +def patch_json_config(mocker, json_config): + mocker.patch.object(json_config, '_get_json') + json_config._get_json.return_value = _JSON + + +def test_read_destinations(mocker, json_config): + patch_json_config(mocker, json_config) + json_data = json_config.parse_json_from_url('') + + destinations: Dict[str, Destination] = JsonExecutionSource._read_destination(json_config, json_data) + + key = '[BQ] Contact Info' + assert len(destinations) == 6 + assert destinations[key].destination_name == key + assert destinations[key].destination_type == DestinationType.ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD + assert destinations[key].destination_metadata == ['Megalista - Testing - Contact Info', 'ADD', '10'] + +def test_read_sources(mocker, json_config): + patch_json_config(mocker, json_config) + + json_data = json_config.parse_json_from_url('') + + sources: Dict[str, Source] = JsonExecutionSource._read_sources(json_config, json_data) + + key = '[BQ] Contact Info' + assert len(sources) == 6 + assert sources[key].source_name == key + assert sources[key].source_type == SourceType.BIG_QUERY + assert sources[key].source_metadata == ['megalista_data', 'customer_match_contact_info'] + +def test_count(mocker, json_config): + patch_json_config(mocker, json_config) + + execution_source: JsonExecutionSource = JsonExecutionSource(json_config, StaticValueProvider(str, '')) + + assert execution_source.count() == 6 \ No newline at end of file diff --git a/megalista_dataflow/steps/processing_steps_test.py b/megalista_dataflow/steps/processing_steps_test.py new file mode 100644 index 00000000..de6ec75e --- /dev/null +++ b/megalista_dataflow/steps/processing_steps_test.py @@ -0,0 +1,37 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from unittest.mock import MagicMock +from steps.megalista_step import MegalistaStepParams +from steps.processing_steps import PROCESSING_STEPS, ProcessingStep +from third_party import THIRD_PARTY_STEPS + +from models.options import DataflowOptions +from models.json_config import JsonConfig +from sources.json_execution_source import JsonExecutionSource +from models.execution import AccountConfig, DataRow, DataRowsGroupedBySource, SourceType, DestinationType, TransactionalType, Execution, Source, Destination, ExecutionsGroupedBySource +from apache_beam.options.value_provider import StaticValueProvider + + +from typing import List, Dict +import pytest +from pytest_mock import MockFixture + +def processing_step_expand_test(): + params: MegalistaStepParams = MegalistaStepParams('', DataflowOptions(), None) + step: ProcessingStep = ProcessingStep(params) + executions: List[Execution] = list() + processing_results = step.expand(executions) + + assert len(processing_results) == len(PROCESSING_STEPS) + len(THIRD_PARTY_STEPS) \ No newline at end of file From c99400044493c0cce3df16b30716a0448cd78731 Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Thu, 20 Oct 2022 11:52:14 -0300 Subject: [PATCH 08/11] Fix BatchesFromExecutions (#111) * Fix BatchesFromExecutions: size estimation overflow + filter by destination type in process func * Explanation about estimate_size --- .../sources/batches_from_executions.py | 34 +++++++-- .../sources/batches_from_executions_test.py | 75 +++++++++++++++++++ 2 files changed, 101 insertions(+), 8 deletions(-) create mode 100644 
megalista_dataflow/sources/batches_from_executions_test.py diff --git a/megalista_dataflow/sources/batches_from_executions.py b/megalista_dataflow/sources/batches_from_executions.py index b9f4c889..308f6600 100644 --- a/megalista_dataflow/sources/batches_from_executions.py +++ b/megalista_dataflow/sources/batches_from_executions.py @@ -33,12 +33,9 @@ _LOGGER_NAME = 'megalista.BatchesFromExecutions' - -def _convert_row_to_dict(row): - dict = {} - for key, value in row.items(): - dict[key] = value - return dict +# max int size. +# used for avoiding overflow when casting from str to int (underlying C code) +_INT_MAX = 2147483647 class ExecutionCoder(coders.Coder): """A custom coder for the Execution class.""" @@ -77,6 +74,23 @@ def decode(self, s): def is_deterministic(self): return True + def estimate_size(self, o): + """Estimation of P-Collection size (in bytes). + - Called from Dataflow / Apache Beam + - Estimated size had to be truncated into _INT_MAX for + avoiding overflow when casting from str to int + (in C underlying code).""" + amount_of_rows = len(o.rows) + row_size = 0 + if amount_of_rows > 0: + row_size = len(json.dumps(o.rows[0]).encode('utf-8')) + estimate = amount_of_rows * row_size + # there is an overflow error if estimated size > _INT_MAX + if estimate > _INT_MAX: + estimate = _INT_MAX + return estimate + + class BatchesFromExecutions(beam.PTransform): """ @@ -116,9 +130,13 @@ def process(self, grouped_elements): yield Batch(execution, batch, iteration) class _BreakIntoExecutions(beam.DoFn): + def __init__(self, destination_type: DestinationType): + self._destination_type = destination_type + def process(self, el): for item in el: - yield item + if item[0].destination.destination_type == self._destination_type: + yield item def __init__( self, @@ -148,6 +166,6 @@ def expand(self, executions): ) | beam.ParDo(self._ReadDataSource(self._transactional_type, self._dataflow_options, self._error_handler)) | beam.Map(lambda el: [(execution, el.rows) for 
execution in iter(el.executions.executions)]) - | beam.ParDo(self._BreakIntoExecutions()) + | beam.ParDo(self._BreakIntoExecutions(self._destination_type)) | beam.ParDo(self._BatchElements(self._batch_size, self._error_handler)) ) diff --git a/megalista_dataflow/sources/batches_from_executions_test.py b/megalista_dataflow/sources/batches_from_executions_test.py new file mode 100644 index 00000000..262ba58e --- /dev/null +++ b/megalista_dataflow/sources/batches_from_executions_test.py @@ -0,0 +1,75 @@ +# Copyright 2022 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from sources.batches_from_executions import BatchesFromExecutions, DataRowsGroupedBySourceCoder, _INT_MAX +from models.execution import AccountConfig, DataRow, DataRowsGroupedBySource, SourceType, DestinationType, TransactionalType, Execution, Source, Destination, ExecutionsGroupedBySource + +from typing import List +import pytest + +def _get_execution() -> Execution: + return Execution( + AccountConfig('', False, '', '', ''), + Source( + 'source_name', + SourceType.BIG_QUERY, + [] + ), + Destination( + 'destination_name', + DestinationType.ADS_CUSTOMER_MATCH_CONTACT_INFO_UPLOAD, + [] + ) + ) + +@pytest.fixture +def execution() -> Execution: + return _get_execution() + +@pytest.fixture +def executions_grouped_by_source() -> ExecutionsGroupedBySource: + return ExecutionsGroupedBySource( + 'source_name', + [_get_execution()] + ) + +@pytest.fixture +def data_rows_grouped_by_source_coder() -> DataRowsGroupedBySourceCoder: + return DataRowsGroupedBySourceCoder() + +def test_data_rows_grouped_by_source_estimate_size_zero(mocker, data_rows_grouped_by_source_coder: DataRowsGroupedBySourceCoder, executions_grouped_by_source: ExecutionsGroupedBySource): + data_rows: List[DataRow] = [] + o = DataRowsGroupedBySource(executions_grouped_by_source, data_rows) + assert data_rows_grouped_by_source_coder.estimate_size(o) == 0 + +def test_data_rows_grouped_by_source_estimate_size_overflow(mocker, data_rows_grouped_by_source_coder: DataRowsGroupedBySourceCoder, executions_grouped_by_source: ExecutionsGroupedBySource): + item: DataRow = DataRow({ + 'phone': '5ecdb1fcdba73c56fc682fceb87166537e7d3990cbefcadb31ee23fe0add6322' + }) + data_rows: List[DataRow] = [item for _ in range(100000000)] + + o = DataRowsGroupedBySource(executions_grouped_by_source, data_rows) + assert data_rows_grouped_by_source_coder.estimate_size(o) == _INT_MAX + +def test_batch_elements(mocker, execution): + item: DataRow = DataRow({ + 'phone': '5ecdb1fcdba73c56fc682fceb87166537e7d3990cbefcadb31ee23fe0add6322' + 
}) + data_rows: List[DataRow] = [item for _ in range(11)] + batch_elements = BatchesFromExecutions._BatchElements(2, None) + grouped_elements = (execution, data_rows) + amount_of_batches = 0 + for _ in batch_elements.process(grouped_elements): + amount_of_batches = amount_of_batches + 1 + assert amount_of_batches == 6 \ No newline at end of file From 559a7075a2631056f1dd4fe0769253b26b0f8531 Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Thu, 20 Oct 2022 16:10:17 -0300 Subject: [PATCH 09/11] Fix data schema (Ads Customer Match Contact Info) --- megalista_dataflow/data_sources/data_schemas.py | 4 ++-- megalista_dataflow/steps/last_step.py | 3 --- megalista_dataflow/steps/processing_steps.py | 2 ++ megalista_dataflow/uploaders/utils.py | 6 +++--- 4 files changed, 7 insertions(+), 8 deletions(-) diff --git a/megalista_dataflow/data_sources/data_schemas.py b/megalista_dataflow/data_sources/data_schemas.py index 6f7bf04e..c9f5cd29 100644 --- a/megalista_dataflow/data_sources/data_schemas.py +++ b/megalista_dataflow/data_sources/data_schemas.py @@ -103,9 +103,9 @@ 'required': False, 'data_type': 'string'}, {'name': 'mailing_address_last_name', 'required': False, 'data_type': 'string'}, - {'name': 'mailing_address_country_name', + {'name': 'mailing_address_country', 'required': False, 'data_type': 'string'}, - {'name': 'mailing_address_zip_name', + {'name': 'mailing_address_zip', 'required': False, 'data_type': 'string'} ], 'groups': [] diff --git a/megalista_dataflow/steps/last_step.py b/megalista_dataflow/steps/last_step.py index dfdc11c0..9198bd05 100644 --- a/megalista_dataflow/steps/last_step.py +++ b/megalista_dataflow/steps/last_step.py @@ -12,13 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-from distutils.log import Log import apache_beam as beam from config import logging -from error.logging_handler import LoggingHandler from models.execution import Execution from .megalista_step import MegalistaStep -from config.logging import LoggingConfig class LastStep(MegalistaStep): def expand(self, executions): diff --git a/megalista_dataflow/steps/processing_steps.py b/megalista_dataflow/steps/processing_steps.py index 4264097e..3fedf72a 100644 --- a/megalista_dataflow/steps/processing_steps.py +++ b/megalista_dataflow/steps/processing_steps.py @@ -45,6 +45,8 @@ from third_party import THIRD_PARTY_STEPS +from config import logging + ADS_CM_HASHER = AdsUserListPIIHashingMapper() DV_CM_HASHER = DVUserListPIIHashingMapper() diff --git a/megalista_dataflow/uploaders/utils.py b/megalista_dataflow/uploaders/utils.py index 9c331a4d..75270089 100644 --- a/megalista_dataflow/uploaders/utils.py +++ b/megalista_dataflow/uploaders/utils.py @@ -68,10 +68,10 @@ def deco(func): def inner(*args, **kwargs): self_ = args[0] batch = args[1] - if not batch: - logger.error('Skipping upload, received no elements.') + if not batch or len(batch.elements) == 0: + logger.error('Skipping upload, received no elements.', execution=batch.execution) return - logger.info(f'Uploading {len(batch.elements)} rows...') + logger.info(f'Uploading {len(batch.elements)} rows...', execution=batch.execution) try: return func(*args, **kwargs) except BaseException as e: From d2d6222585e59d47c486704e0d557353eaba1659 Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Thu, 20 Oct 2022 17:00:01 -0300 Subject: [PATCH 10/11] Fix unit tests --- .../google_analytics_data_import_eraser_test.py | 10 +++++----- .../google_analytics_user_list_uploader_test.py | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser_test.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser_test.py index 
84daf2f1..8aa21828 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser_test.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_data_import_eraser_test.py @@ -52,11 +52,11 @@ def test_analytics_has_not_data_sources(mocker, eraser, caplog, error_notifier): Source('orig1', SourceType.BIG_QUERY, ['dt1', 'buyers']), Destination('dest1', DestinationType.GA_DATA_IMPORT, ['web_property', 'data_import_name'])) # Act - eraser.process(Batch(execution, [])) + eraser.process(Batch(execution, [{}])) eraser.finish_bundle() - assert 'data_import_name - data import not found, please configure it in Google Analytics' in caplog.text + assert '[Execution: orig1 -> dest1] data_import_name - data import not found, please configure it in Google Analytics' in caplog.text assert error_notifier.were_errors_sent @@ -75,12 +75,12 @@ def test_data_source_not_found(mocker, eraser, caplog, error_notifier): Source('orig1', SourceType.BIG_QUERY, ['dt1', 'buyers']), Destination('dest1', DestinationType.GA_DATA_IMPORT, ['web_property', 'data_import_name'])) - eraser.process(Batch(execution, [])) + eraser.process(Batch(execution, [{}])) # Act eraser.finish_bundle() - assert 'data_import_name - data import not found, please configure it in Google Analytics' in caplog.text + assert '[Execution: orig1 -> dest1] data_import_name - data import not found, please configure it in Google Analytics' in caplog.text assert error_notifier.were_errors_sent @@ -136,7 +136,7 @@ def test_files_deleted_with_success(mocker, eraser): service.management().uploads().deleteUploadData.side_effect = delete_call_mock # Act - eraser.process(Batch(execution, [])) + eraser.process(Batch(execution, [{}])) # Called once delete_call_mock.assert_called_once() diff --git a/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader_test.py b/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader_test.py index 
cd904590..f1fc7ea6 100644 --- a/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader_test.py +++ b/megalista_dataflow/uploaders/google_analytics/google_analytics_user_list_uploader_test.py @@ -54,7 +54,7 @@ def test_list_already_exists(mocker, uploader): Destination('dest1', DestinationType.GA_USER_LIST_UPLOAD, ['a', 'b', 'c', 'list', 'd', 'e'])) - uploader.process(Batch(execution, [])) + uploader.process(Batch(execution, [{}])) uploader._get_analytics_service().management().remarketingAudience( ).insert.assert_not_called() @@ -79,7 +79,7 @@ def test_list_creation_not_mcc(mocker, uploader): Destination( 'dest1', DestinationType.GA_USER_LIST_UPLOAD, ['web_property', 'view', 'c', 'list', 'd', 'buyers_custom_dim'])) - uploader.process(Batch(execution, [])) + uploader.process(Batch(execution, [{}])) service.management().remarketingAudience().insert.assert_any_call( accountId=ga_account_id, @@ -126,7 +126,7 @@ def test_list_creation_mcc(mocker, uploader): Destination( 'dest1', DestinationType.GA_USER_LIST_UPLOAD, ['web_property', 'view', 'c', 'list', 'd', 'buyers_custom_dim'])) - uploader.process(Batch(execution, [])) + uploader.process(Batch(execution, [{}])) service.management().remarketingAudience().insert.assert_any_call( accountId=ga_account_id, From 836635b5b2966cb6f802169b53e7f5874a97ef81 Mon Sep 17 00:00:00 2001 From: Diogo Aihara Date: Tue, 1 Nov 2022 14:13:37 -0300 Subject: [PATCH 11/11] fix --- .../uploaders/display_video/customer_match/abstract_uploader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py b/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py index e8a31b64..c631ff40 100644 --- a/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py +++ b/megalista_dataflow/uploaders/display_video/customer_match/abstract_uploader.py @@ -161,7 +161,7 @@ def process(self, batch: Batch, 
**kwargs) -> List[Execution]: if not self.active: logging.getLogger(_DEFAULT_LOGGER).info( 'Skipping upload to DV, parameters not configured.', execution=execution) - return + return [execution] self._assert_execution_is_valid(execution)