diff --git a/dbt/adapters/spark/connections.py b/dbt/adapters/spark/connections.py
index df0dcb5ee..a307f57c1 100644
--- a/dbt/adapters/spark/connections.py
+++ b/dbt/adapters/spark/connections.py
@@ -448,8 +448,8 @@ def open(cls, connection):
                         Connection,
                         SessionConnectionWrapper,
                     )
-                    handle = SessionConnectionWrapper(Connection())
+                    # Pass server_side_parameters through so the session connection
+                    # can apply user-configured Spark settings (see session.py).
+                    handle = SessionConnectionWrapper(Connection(creds.server_side_parameters))
                 else:
                     raise dbt.exceptions.DbtProfileError(
                         f"invalid credential method: {creds.method}"
diff --git a/dbt/adapters/spark/session.py b/dbt/adapters/spark/session.py
index d275c73c5..14dada715 100644
--- a/dbt/adapters/spark/session.py
+++ b/dbt/adapters/spark/session.py
@@ -9,6 +9,7 @@
 from dbt.events import AdapterLogger
 from dbt.utils import DECIMALS
 from pyspark.sql import DataFrame, Row, SparkSession
+from pyspark.sql.utils import AnalysisException
 
 logger = AdapterLogger("Spark")
 
@@ -106,8 +107,19 @@ def execute(self, sql: str, *parameters: Any) -> None:
         """
         if len(parameters) > 0:
             sql = sql % parameters
-        spark_session = SparkSession.builder.enableHiveSupport().getOrCreate()
-        self._df = spark_session.sql(sql)
+        builder = SparkSession.builder.enableHiveSupport()
+
+        # Apply any user-supplied server-side parameters before the
+        # session is (lazily) created.
+        for key, value in self.server_side_parameters.items():
+            builder = builder.config(key, value)
+
+        spark_session = builder.getOrCreate()
+        try:
+            self._df = spark_session.sql(sql)
+        except AnalysisException as exc:
+            # Surface Spark analysis failures as dbt runtime errors.
+            raise dbt.exceptions.DbtRuntimeError(str(exc)) from exc
 
     def fetchall(self) -> Optional[List[Row]]:
         """