Skip to content

Commit

Permalink
Merge pull request data-integrations#239 from cloudsufi/oAuthInfoIssue
Browse files Browse the repository at this point in the history
Fix for oAuthInfo issue while using oAuth macro.
  • Loading branch information
vikasrathee-cs authored Feb 22, 2024
2 parents 3daf5fd + 715b68f commit 695cfb4
Show file tree
Hide file tree
Showing 8 changed files with 41 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
*/
package io.cdap.plugin.salesforce;

import io.cdap.cdap.api.plugin.PluginConfig;

import java.util.function.Function;

/**
* Constants related to Salesforce and configuration
*/
Expand Down Expand Up @@ -56,4 +60,7 @@ public class SalesforceConstants {

public static final String PROPERTY_MAX_RETRY_TIME_IN_MINS = "cdap.streaming.maxRetryTimeInMins";
public static final long DEFAULT_MAX_RETRY_TIME_IN_MINS = 360L;

public static Function<PluginConfig, Boolean> isOAuthMacroFunction = config -> config.containsMacro(
PROPERTY_OAUTH_INFO);
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@ public class SalesforceConnectorInfo {
private final OAuthInfo oAuthInfo;
private final SalesforceConnectorBaseConfig config;

public SalesforceConnectorInfo(@Nullable OAuthInfo oAuthInfo, SalesforceConnectorBaseConfig config) {
private final boolean isOAuthInfoMacro;

public SalesforceConnectorInfo(@Nullable OAuthInfo oAuthInfo, SalesforceConnectorBaseConfig config,
boolean isOAuthInfoMacro) {
this.oAuthInfo = oAuthInfo;
this.config = config;
this.isOAuthInfoMacro = isOAuthInfoMacro;
}

@Nullable
Expand Down Expand Up @@ -118,7 +122,7 @@ public boolean canAttemptToEstablishConnection() {
}

// At configurePipeline time, macro is not resolved, hence the OAuth field will be null.
if (config.containsMacro(SalesforceConstants.PROPERTY_OAUTH_INFO)) {
if (isOAuthInfoMacro) {
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,10 @@ public SalesforceConnector(SalesforceConnectorConfig config) {
@Override
public void test(ConnectorContext connectorContext) throws ValidationException {
FailureCollector collector = connectorContext.getFailureCollector();
SalesforceConnectorInfo connectorInfo = new SalesforceConnectorInfo(config.getOAuthInfo(), config);
SalesforceConnectorInfo connectorInfo =
new SalesforceConnectorInfo(config.getOAuthInfo(), config,
SalesforceConstants.isOAuthMacroFunction.apply(
config));
OAuthInfo oAuthInfo = SalesforceConnectionUtil.getOAuthInfo(connectorInfo, collector);
connectorInfo.validate(collector, oAuthInfo);
}
Expand Down Expand Up @@ -148,7 +151,10 @@ public ConnectorSpec generateSpec(ConnectorContext connectorContext, ConnectorSp
properties.put(SalesforceSourceConstants.PROPERTY_SOBJECT_NAME, tableName);
properties.put(SalesforceSinkConfig.PROPERTY_SOBJECT, tableName);
}
SalesforceConnectorInfo connectorInfo = new SalesforceConnectorInfo(config.getOAuthInfo(), config);
SalesforceConnectorInfo connectorInfo =
new SalesforceConnectorInfo(config.getOAuthInfo(), config,
SalesforceConstants.isOAuthMacroFunction.apply(
config));
AuthenticatorCredentials authenticatorCredentials = connectorInfo.getAuthenticatorCredentials();
try {
String fields = getObjectFields(tableName, authenticatorCredentials);
Expand Down Expand Up @@ -183,7 +189,10 @@ public List<StructuredRecord> sample(ConnectorContext connectorContext, SampleRe
private List<StructuredRecord> listObjectDetails(String object, int limit) throws AsyncApiException,
ConnectionException {
List<StructuredRecord> samples = new ArrayList<>();
SalesforceConnectorInfo connectorInfo = new SalesforceConnectorInfo(config.getOAuthInfo(), config);
SalesforceConnectorInfo connectorInfo =
new SalesforceConnectorInfo(config.getOAuthInfo(), config,
SalesforceConstants.isOAuthMacroFunction.apply(
config));
AuthenticatorCredentials credentials = connectorInfo.getAuthenticatorCredentials();
String fields = getObjectFields(object, credentials);
String query = String.format("SELECT %s FROM %s LIMIT %d", fields, object, limit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ public SalesforceSinkConfig(String referenceName,

@Nullable
public SalesforceConnectorInfo getConnection() {
return connection == null ? null : new SalesforceConnectorInfo(oAuthInfo, connection);
return connection == null ? null : new SalesforceConnectorInfo(oAuthInfo, connection,
SalesforceConstants.isOAuthMacroFunction.apply(
this));
}

public String getSObject() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ public Map<ChronoUnit, Integer> getOffset() {

@Nullable
public SalesforceConnectorInfo getConnection() {
return connection == null ? null : new SalesforceConnectorInfo(oAuthInfo, connection);
return connection == null ? null : new SalesforceConnectorInfo(oAuthInfo, connection,
SalesforceConstants.isOAuthMacroFunction.apply(
this));
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ public SalesforceBatchSource(SalesforceSourceConfig config) {
public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();

OAuthInfo oAuthInfo = SalesforceConnectionUtil.getOAuthInfo(config.getConnection(), collector);
OAuthInfo oAuthInfo = config.containsMacro(SalesforceConstants.PROPERTY_OAUTH_INFO)
? null : SalesforceConnectionUtil.getOAuthInfo(config.getConnection(), collector);
config.validate(collector, oAuthInfo);

if (config.containsMacro(SalesforceSourceConstants.PROPERTY_SCHEMA)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,9 @@ public SalesforceStreamingSourceConfig(String referenceName,

@Nullable
public SalesforceConnectorInfo getConnection() {
return connection == null ? null : new SalesforceConnectorInfo(oAuthInfo, connection);
return connection == null ? null : new SalesforceConnectorInfo(oAuthInfo, connection,
SalesforceConstants.isOAuthMacroFunction.apply(
this));
}

public String getPushTopicName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.cdap.etl.mock.validation.MockFailureCollector;
import io.cdap.plugin.salesforce.InvalidConfigException;
import io.cdap.plugin.salesforce.SalesforceConstants;
import io.cdap.plugin.salesforce.plugin.SalesforceConnectorInfo;
import io.cdap.plugin.salesforce.plugin.connector.SalesforceConnectorConfig;
import io.cdap.plugin.salesforce.plugin.source.batch.util.SalesforceSourceConstants;
Expand Down Expand Up @@ -277,7 +278,10 @@ private void testPKChunkInvalidConfig(SalesforceSourceConfig config, String stag
Mockito.any(), Mockito.any(), Mockito.any(),
Mockito.anyString())
.thenReturn(connectorConfig);
SalesforceConnectorInfo salesforceConnectorInfo = new SalesforceConnectorInfo(null, connectorConfig);
SalesforceConnectorInfo salesforceConnectorInfo =
new SalesforceConnectorInfo(null, connectorConfig,
SalesforceConstants.isOAuthMacroFunction.apply(
connectorConfig));
Mockito.when(mock.getConnection()).thenReturn(salesforceConnectorInfo);
PowerMockito.when(salesforceConnectorInfo.canAttemptToEstablishConnection()).thenReturn(false);
ValidationFailure failure;
Expand Down

0 comments on commit 695cfb4

Please sign in to comment.