Skip to content

Commit

Permalink
Merge pull request #6 from hashmapinc/add_property
Browse files Browse the repository at this point in the history
added property to get_timestamp and exclude_null_value
  • Loading branch information
cherrera2001 authored Jun 2, 2017
2 parents 1fba2b8 + e4747fe commit e6baacc
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;

import com.kentender.nifi.nifi_opcua_services.OPCUAService;

Expand All @@ -54,16 +55,43 @@
@InputRequirement(Requirement.INPUT_REQUIRED)

public class GetValue extends AbstractProcessor {

static boolean error = false;


private final AtomicReference<String> timestamp = new AtomicReference<>();
private final AtomicReference<String> excludeNullValue = new AtomicReference<>();
private final AtomicReference<String> nullValueString = new AtomicReference<>();

public static final PropertyDescriptor OPCUA_SERVICE = new PropertyDescriptor.Builder()
.name("OPC UA Service")
.description("Specifies the OPC UA Service that can be used to access data")
.required(true)
.identifiesControllerService(OPCUAService.class)
.build();

public static final PropertyDescriptor RETURN_TIMESTAMP = new PropertyDescriptor
.Builder().name("Return Timestamp")
.description("Allows to select the source, server, or both timestamps")
.required(true)
.allowableValues("SourceTimestamp", "ServerTimestamp","Both")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final PropertyDescriptor EXCLUDE_NULL_VALUE = new PropertyDescriptor
.Builder().name("Exclude Null Value")
.description("Return data only for non null values")
.required(true)
.allowableValues("No", "Yes")
.defaultValue("No")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();

public static final PropertyDescriptor NULL_VALUE_STRING = new PropertyDescriptor
.Builder().name("Null Value String")
.description("If removing null values, what string is used for null")
.required(false)
.defaultValue("")
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
.build();

public static final Relationship SUCCESS = new Relationship.Builder()
.name("Success")
.description("Successful OPC read")
Expand All @@ -82,7 +110,10 @@ public class GetValue extends AbstractProcessor {
protected void init(final ProcessorInitializationContext context) {
final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
descriptors.add(OPCUA_SERVICE);

descriptors.add(RETURN_TIMESTAMP);
descriptors.add(EXCLUDE_NULL_VALUE);
descriptors.add(NULL_VALUE_STRING);

this.descriptors = Collections.unmodifiableList(descriptors);

final Set<Relationship> relationships = new HashSet<Relationship>();
Expand All @@ -103,7 +134,9 @@ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {

@OnScheduled
public void onScheduled(final ProcessContext context) {

timestamp.set(context.getProperty(RETURN_TIMESTAMP).getValue());
excludeNullValue.set(context.getProperty(EXCLUDE_NULL_VALUE).getValue());
nullValueString.set(context.getProperty(NULL_VALUE_STRING).getValue());
}

/* (non-Javadoc)
Expand All @@ -115,9 +148,8 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final ComponentLog logger = getLogger();

// Initialize response variable
final AtomicReference<List<String>> requestedTagnames = new AtomicReference<>();


final AtomicReference<List<String>> requestedTagnames = new AtomicReference<>();

// get FlowFile
FlowFile flowFile = session.get();
if ( flowFile == null ) {
Expand Down Expand Up @@ -163,7 +195,7 @@ public void process(InputStream in) throws IOException {
logger.debug("Session update failed");
}

byte[] values = opcUAService.getValue(requestedTagnames.get());
byte[] values = opcUAService.getValue(requestedTagnames.get(),timestamp.get(),excludeNullValue.get(),nullValueString.get());

// Write the results back out to flow file
try{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
@CapabilityDescription("Provides client API for working with OPC servers")
public interface OPCUAService extends ControllerService {

byte[] getValue(List<String> reqTagname) throws ProcessException;
byte[] getValue(List<String> reqTagname, String returnTimestamp, String excludeNullValue, String nullValueString) throws ProcessException;

String getNameSpace(String print_indentation, int max_recursiveDepth, List<ExpandedNodeId> expandedNodeIds, UnsignedInteger max_reference_per_node)
throws ProcessException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public void shutdown() {
}

@Override
public byte[] getValue(List<String> reqTagnames) throws ProcessException {
public byte[] getValue(List<String> reqTagnames, String returnTimestamp, String excludeNullValue, String nullValueString) throws ProcessException {
final ComponentLog logger = getLogger();

//Create the nodes to read array
Expand Down Expand Up @@ -365,21 +365,42 @@ public byte[] getValue(List<String> reqTagnames) throws ProcessException {

// Validate response
if (values != null) {
if (values.length == 0)
logger.error("OPC Server returned nothing.");
if (values.length == 0) {
logger.error("OPC Server returned nothing.");
}

else {
// Iterate through values
for (int i = 0; i < values.length; i++) {
try {
String valueLine = "";
try {
// Build flowfile line
serverResponse = serverResponse + nodesToRead[i].getNodeId().toString() + ","
+ values[i].getValue().toString() + ","
+ values[i].getServerTimestamp().toString()
+ values[i].getStatusCode().getValue().toString()
+ System.getProperty("line.separator");
if(excludeNullValue.equals("Yes") && values[i].getValue().toString().equals(nullValueString)){
logger.debug("Null value returned for " + values[i].getValue().toString() + " -- Skipping because property is set");
continue;
}

valueLine += nodesToRead[i].getNodeId().toString() + ",";

if(returnTimestamp.equals("ServerTimestamp") || returnTimestamp.equals("Both")){
valueLine += values[i].getServerTimestamp().toString() + ",";
}
if(returnTimestamp.equals("SourceTimestamp") || returnTimestamp.equals("Both")){
valueLine += values[i].getSourceTimestamp().toString() + ",";
}

valueLine += values[i].getValue().toString() + ","
+ values[i].getStatusCode().getValue().toString()
+ System.getProperty("line.separator");

} catch (Exception ex){
logger.error("error parsing result for" + nodesToRead[i].getNodeId().toString());
logger.error("Error parsing result for" + nodesToRead[i].getNodeId().toString());
valueLine = "";
}
if (valueLine.isEmpty())
continue;

serverResponse += valueLine;
}

//clean up response
Expand Down

0 comments on commit e6baacc

Please sign in to comment.