Skip to content

Commit

Permalink
Added hook for treating all fields as nullable
Browse files Browse the repository at this point in the history
  • Loading branch information
MrRahulSharma committed Aug 22, 2024
1 parent 735cdf6 commit f1d5b11
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 17 deletions.
20 changes: 10 additions & 10 deletions src/main/java/io/cdap/plugin/salesforce/SalesforceSchemaUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,18 +107,18 @@ public static Schema getSchema(AuthenticatorCredentials credentials, SObjectDesc
*
* @param credentials connection credentials
* @param sObjectDescriptor sObject descriptor
* @param setAllCustomFieldsNullable set all custom fields nullable by default
* @param setAllFieldsNullable set all fields nullable by default
* @return CDAP schema
* @throws ConnectionException if unable to connect to Salesforce
*/
public static Schema getSchema(AuthenticatorCredentials credentials, SObjectDescriptor sObjectDescriptor,
boolean setAllCustomFieldsNullable)
boolean setAllFieldsNullable)
throws ConnectionException {
PartnerConnection partnerConnection = SalesforceConnectionUtil.getPartnerConnection(credentials);
SObjectsDescribeResult describeResult = SObjectsDescribeResult.of(partnerConnection,
sObjectDescriptor.getName(), sObjectDescriptor.getFeaturedSObjects());

return getSchemaWithFields(sObjectDescriptor, describeResult, setAllCustomFieldsNullable);
return getSchemaWithFields(sObjectDescriptor, describeResult, setAllFieldsNullable);
}

/**
Expand Down Expand Up @@ -211,15 +211,15 @@ public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,

public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
SObjectsDescribeResult describeResult,
boolean setAllCustomFieldsNullable) {
boolean setAllFieldsNullable) {
return getSchemaWithFields(sObjectDescriptor, describeResult,
Collections.emptyList(), setAllCustomFieldsNullable);
Collections.emptyList(), setAllFieldsNullable);
}

public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
SObjectsDescribeResult describeResult,
List<String> topLevelParents,
boolean setAllCustomFieldsNullable) {
boolean setAllFieldsNullable) {
List<Schema.Field> schemaFields = new ArrayList<>();

for (SObjectDescriptor.FieldDescriptor fieldDescriptor : sObjectDescriptor.getFields()) {
Expand All @@ -238,7 +238,7 @@ public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
fieldDescriptor.getFullName(), parentsPath));
}

fieldSchema = createFieldSchema(field, fieldDescriptor.hasParents(), setAllCustomFieldsNullable);
fieldSchema = createFieldSchema(field, fieldDescriptor.hasParents(), setAllFieldsNullable);
}

Schema queryFieldSchema = functionType.getSchema(fieldSchema);
Expand Down Expand Up @@ -274,7 +274,7 @@ public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,
*/
for (SObjectDescriptor childSObject : sObjectDescriptor.getChildSObjects()) {
Schema childSchema = getSchemaWithFields(childSObject, describeResult,
Collections.singletonList(sObjectDescriptor.getName()), setAllCustomFieldsNullable);
Collections.singletonList(sObjectDescriptor.getName()), setAllFieldsNullable);

String childName = normalizeAvroName(childSObject.getName());
Schema.Field childField = Schema.Field.of(childName,
Expand All @@ -288,9 +288,9 @@ public static Schema getSchemaWithFields(SObjectDescriptor sObjectDescriptor,

// Setting all the child columns as Nullable as in child object these fields can be mandatory but its reference
// object in parent class can be null.
private static Schema createFieldSchema(Field field, boolean isChild, boolean setAllCustomFieldsNullable) {
private static Schema createFieldSchema(Field field, boolean isChild, boolean setAllFieldsNullable) {
Schema fieldSchema = SALESFORCE_TYPE_TO_CDAP_SCHEMA.getOrDefault(field.getType(), DEFAULT_SCHEMA);
return field.isNillable() || isChild || (setAllCustomFieldsNullable && field.isCustom()) ?
return field.isNillable() || isChild || setAllFieldsNullable ?
Schema.nullableOf(fieldSchema) : fieldSchema;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -203,19 +203,19 @@ public static Schema getSchema(SalesforceSourceConfig config, OAuthInfo oAuthInf
* Get Salesforce schema by query, with the option to allow null values in non-nullable custom fields
*
* @param config Salesforce Source Batch config
* @param setAllCustomFieldsNullable set all custom fields nullable by default
* @param setAllFieldsNullable set all custom fields nullable by default
* @return schema calculated from query
*/
public static Schema getSchema(SalesforceSourceConfig config, OAuthInfo oAuthInfo,
boolean setAllCustomFieldsNullable) {
boolean setAllFieldsNullable) {
String query = config.getQuery(System.currentTimeMillis(), oAuthInfo);
SObjectDescriptor sObjectDescriptor = SObjectDescriptor.fromQuery(query);
try {
AuthenticatorCredentials credentials = AuthenticatorCredentials.fromParameters(oAuthInfo,
config.getConnection().getConnectTimeout(),
config.getConnection().getReadTimeout(),
config.getConnection().getProxyUrl());
return SalesforceSchemaUtil.getSchema(credentials, sObjectDescriptor, setAllCustomFieldsNullable);
return SalesforceSchemaUtil.getSchema(credentials, sObjectDescriptor, setAllFieldsNullable);
} catch (ConnectionException e) {
String errorMessage = SalesforceConnectionUtil.getSalesforceErrorMessageFromException(e);
throw new RuntimeException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,11 +169,11 @@ public void testGetSchemaWithFields() {
}

@Test
public void testSchemaWithSetAllCustomFieldsNullable() {
public void testSchemaWithSetAllFieldsNullable() {
String objectName = "CustomTable";

List<SObjectDescriptor.FieldDescriptor> fieldDescriptors = Stream
.of("Name", "Value", "CreatedDate")
.of("Name", "Value", "CreatedDate", "UpdatedDate")
.map(name -> getFieldWithType(name, FieldType.anyType, false))
.map(SObjectDescriptor.FieldDescriptor::new)
.collect(Collectors.toList());
Expand All @@ -183,19 +183,23 @@ public void testSchemaWithSetAllCustomFieldsNullable() {
Collections.singletonList("Value"), null, SalesforceFunctionType.NONE));
fieldDescriptors.add(new SObjectDescriptor.FieldDescriptor(
Collections.singletonList("CreatedDate"), null, SalesforceFunctionType.NONE));
fieldDescriptors.add(new SObjectDescriptor.FieldDescriptor(
Collections.singletonList("UpdatedDate"), null, SalesforceFunctionType.NONE));
SObjectDescriptor sObjectDescriptor = new SObjectDescriptor(objectName, fieldDescriptors, ImmutableList.of());

Map<String, Field> objectFields = new LinkedHashMap<>();
objectFields.put("Name", getCustomFieldWithType("Name", FieldType.string, false));
objectFields.put("Value", getCustomFieldWithType("Value", FieldType.currency, false));
objectFields.put("CreatedDate", getCustomFieldWithType("CreatedDate", FieldType.date, false));
objectFields.put("UpdatedDate", getCustomFieldWithType("UpdatedDate", FieldType.date, false));
SObjectsDescribeResult describeResult = SObjectsDescribeResult.of(ImmutableMap.of(objectName, objectFields));

// Testing case with flag setAllCustomFieldsNullable = true
Schema expectedSchema = Schema.recordOf("output",
Schema.Field.of("Name", Schema.nullableOf(Schema.of(Schema.Type.STRING))),
Schema.Field.of("Value", Schema.nullableOf(Schema.of(Schema.Type.DOUBLE))),
Schema.Field.of("CreatedDate", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE)))
Schema.Field.of("CreatedDate", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE))),
Schema.Field.of("UpdatedDate", Schema.nullableOf(Schema.of(Schema.LogicalType.DATE)))
);

Schema actualSchema = SalesforceSchemaUtil.getSchemaWithFields(sObjectDescriptor, describeResult, true);
Expand All @@ -206,7 +210,8 @@ public void testSchemaWithSetAllCustomFieldsNullable() {
expectedSchema = Schema.recordOf("output",
Schema.Field.of("Name", Schema.of(Schema.Type.STRING)),
Schema.Field.of("Value", Schema.of(Schema.Type.DOUBLE)),
Schema.Field.of("CreatedDate", Schema.of(Schema.LogicalType.DATE))
Schema.Field.of("CreatedDate", Schema.of(Schema.LogicalType.DATE)),
Schema.Field.of("UpdatedDate", Schema.of(Schema.LogicalType.DATE))
);

actualSchema = SalesforceSchemaUtil.getSchemaWithFields(sObjectDescriptor, describeResult, false);
Expand Down

0 comments on commit f1d5b11

Please sign in to comment.