Skip to content

Commit

Permalink
Moved SPL query generation to the group scan
Browse files Browse the repository at this point in the history
  • Loading branch information
cgivre committed Aug 5, 2024
1 parent 7600cb1 commit c959d76
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@ public class SplunkBatchReader implements ManagedReader<SchemaNegotiator> {
private static final Logger logger = LoggerFactory.getLogger(SplunkBatchReader.class);
private static final List<String> INT_COLS = new ArrayList<>(Arrays.asList("date_hour", "date_mday", "date_minute", "date_second", "date_year", "linecount"));
private static final List<String> TS_COLS = new ArrayList<>(Arrays.asList("_indextime", "_time"));
private static final String EARLIEST_TIME_COLUMN = "earliestTime";
private static final String LATEST_TIME_COLUMN = "latestTime";
private final SplunkPluginConfig config;
private final SplunkSubScan subScan;
private final List<SchemaPath> projectedColumns;
Expand Down Expand Up @@ -242,18 +240,18 @@ private String buildQueryString () {

// Splunk searches perform best when they are time bound. This allows the user to set
// default time boundaries in the config. These will be overwritten in filter pushdowns
if (filters != null && filters.containsKey(EARLIEST_TIME_COLUMN)) {
earliestTime = filters.get(EARLIEST_TIME_COLUMN).value.value.toString();
if (filters != null && filters.containsKey(SplunkUtils.EARLIEST_TIME_COLUMN)) {
earliestTime = filters.get(SplunkUtils.EARLIEST_TIME_COLUMN).value.value.toString();

// Remove from map
filters.remove(EARLIEST_TIME_COLUMN);
filters.remove(SplunkUtils.EARLIEST_TIME_COLUMN);
}

if (filters != null && filters.containsKey(LATEST_TIME_COLUMN)) {
latestTime = filters.get(LATEST_TIME_COLUMN).value.value.toString();
if (filters != null && filters.containsKey(SplunkUtils.LATEST_TIME_COLUMN)) {
latestTime = filters.get(SplunkUtils.LATEST_TIME_COLUMN).value.value.toString();

// Remove from map so they are not pushed down into the query
filters.remove(LATEST_TIME_COLUMN);
filters.remove(SplunkUtils.LATEST_TIME_COLUMN);
}

if (earliestTime == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,70 @@ public GroupScan clone(List<SchemaPath> columns) {
return new SplunkGroupScan(this, columns);
}

/**
 * Generates the SPL query which will be sent to Splunk. This method exists for debugging
 * purposes so that the actual SPL will be recorded in the query plan.
 *
 * @return the SPL for this scan, or the raw user-supplied SPL when the special "spl"
 *         index is used with an "spl" filter
 */
private String generateQuery() {
  // Work on a defensive copy of the filter map.  This method is invoked from toString(),
  // and removing entries from the shared map there would silently discard pushed-down
  // filters as a side effect of merely printing the plan.  (Fully qualified to avoid
  // depending on a java.util.HashMap import.)
  Map<String, ExprNode.ColRelOpConstNode> localFilters =
      filters == null ? null : new java.util.HashMap<>(filters);

  // Splunk searches perform best when they are time bound.  The time bounds are applied
  // to the Splunk search job itself rather than the SPL string, so here we only strip the
  // pseudo-columns from the filter map so they are not rendered into the query.  (The
  // previous version also resolved the config-default time bounds here, but the values
  // were never used in this method.)
  if (localFilters != null) {
    localFilters.remove(SplunkUtils.EARLIEST_TIME_COLUMN);
    localFilters.remove(SplunkUtils.LATEST_TIME_COLUMN);
  }

  // Special case: if the user wishes to send arbitrary SPL to Splunk, the user can use
  // the "spl" index with an "spl" filter; the query is passed through verbatim.
  if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) {
    if (localFilters != null && localFilters.containsKey("spl")) {
      return localFilters.get("spl").value.value.toString();
    }
  }

  SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName());

  // Set the sourcetype, and remove it so it is not also rendered as an ordinary filter.
  if (localFilters != null && localFilters.containsKey("sourcetype")) {
    String sourcetype = localFilters.get("sourcetype").value.value.toString();
    builder.addSourceType(sourcetype);
    localFilters.remove("sourcetype");
  }

  // Add projected columns, skipping star and specials.
  for (SchemaPath projectedColumn : columns) {
    builder.addField(projectedColumn.getAsUnescapedPath());
  }

  // Apply the remaining filters.
  builder.addFilters(localFilters);

  // Apply limits.
  if (maxRecords > 0) {
    builder.addLimit(maxRecords);
  }
  return builder.build();
}

@Override
public int hashCode() {

Expand Down Expand Up @@ -344,6 +408,7 @@ public String toString() {
.field("scan spec", splunkScanSpec)
.field("columns", columns)
.field("maxRecords", maxRecords)
.field("spl", generateQuery())
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,6 @@

@JsonTypeName("splunk-sub-scan")
public class SplunkSubScan extends AbstractBase implements SubScan {
private static final String EARLIEST_TIME_COLUMN = "earliestTime";
private static final String LATEST_TIME_COLUMN = "latestTime";

private final SplunkPluginConfig config;
private final SplunkScanSpec splunkScanSpec;
private final List<SchemaPath> columns;
Expand Down Expand Up @@ -117,73 +114,9 @@ public String toString() {
.field("columns", columns)
.field("filters", filters)
.field("maxRecords", maxRecords)
.field("spl", generateQuery())
.toString();
}

/**
 * Generates the SPL query which will be sent to Splunk. This method exists for debugging
 * purposes so that the actual SPL will be recorded in the query plan.
 *
 * @return the SPL for this sub scan, or the raw user-supplied SPL when the special "spl"
 *         index is used with an "spl" filter
 */
private String generateQuery() {
  // Copy the filter map so that printing the plan (toString() calls this method) does not
  // mutate the sub scan's pushed-down filters.  (Fully qualified to avoid depending on a
  // java.util.HashMap import.)
  Map<String, ExprNode.ColRelOpConstNode> filters =
      getFilters() == null ? null : new java.util.HashMap<>(getFilters());

  // Splunk searches perform best when they are time bound.  The time bounds are applied
  // to the Splunk search job itself rather than the SPL string, so here we only strip the
  // pseudo-columns from the filter map so they are not rendered into the query.  (The
  // previous version also resolved the config-default time bounds here, but the values
  // were never used in this method.)
  if (filters != null) {
    filters.remove(EARLIEST_TIME_COLUMN);
    filters.remove(LATEST_TIME_COLUMN);
  }

  // Special case: if the user wishes to send arbitrary SPL to Splunk, the user can use
  // the "spl" index with an "spl" filter; the query is passed through verbatim.  Guard
  // against a null map / missing key: the previous version dereferenced
  // filters.get("spl") unconditionally and would throw an NPE when the "spl" index was
  // queried without an spl filter.
  if (splunkScanSpec.getIndexName().equalsIgnoreCase("spl")) {
    if (filters != null && filters.containsKey("spl")) {
      return filters.get("spl").value.value.toString();
    }
  }

  SplunkQueryBuilder builder = new SplunkQueryBuilder(splunkScanSpec.getIndexName());

  // Set the sourcetype, and remove it so it is not also rendered as an ordinary filter.
  if (filters != null && filters.containsKey("sourcetype")) {
    String sourcetype = filters.get("sourcetype").value.value.toString();
    builder.addSourceType(sourcetype);
    filters.remove("sourcetype");
  }

  // Add projected columns, skipping star and specials.
  for (SchemaPath projectedColumn : columns) {
    builder.addField(projectedColumn.getAsUnescapedPath());
  }

  // Apply the remaining filters.
  builder.addFilters(filters);

  // Apply limits.
  if (getMaxRecords() > 0) {
    builder.addLimit(getMaxRecords());
  }
  return builder.build();
}

@Override
public int hashCode() {
return Objects.hash(config, splunkScanSpec, columns, filters, maxRecords);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
package org.apache.drill.exec.store.splunk;

public class SplunkUtils {
public static final String EARLIEST_TIME_COLUMN = "earliestTime";
public static final String LATEST_TIME_COLUMN = "latestTime";

/**
* These are special fields that alter the queries sent to Splunk.
*/
Expand Down

0 comments on commit c959d76

Please sign in to comment.