Skip to content

Commit

Permalink
Fixing the Get Node ID and Get Value logic
Browse files Browse the repository at this point in the history
There was an issue of a switch statement fall through in the get node ID, also now handling the case of multiple selected namespaces.

For get Value the read call was getting executed once per item. Fixed that issue to call in batch mode.
  • Loading branch information
Chris Herrera committed May 26, 2017
1 parent 9309961 commit 894ca21
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 78 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -183,15 +183,23 @@ public void process(OutputStream out) throws IOException {

case "Yes":{
String str = stringBuilder.toString();
str = str.substring(str.indexOf('\n')+1);
final String result = str;
out.write(result.getBytes());
String parts[] = str.split("\\r?\\n");
String outString = "";
for (int i = 0; i < parts.length; i++){
if (parts[i].startsWith("nsu")){
continue;
}
outString = outString + parts[i] + System.getProperty("line.separator");;
}
outString.trim();
out.write(outString.getBytes());
break;
}
case "No":{
out.write(stringBuilder.toString().getBytes());
break;
}
}

}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
final ComponentLog logger = getLogger();

// Initialize response variable
//final AtomicReference<String> requestedTagname = new AtomicReference<>();
final AtomicReference<List<String>> requestedTagname = new AtomicReference<>();
final AtomicReference<List<String>> requestedTagnames = new AtomicReference<>();


// get FlowFile
Expand All @@ -130,12 +129,10 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
public void process(InputStream in) throws IOException {

try{
//String tagname = new BufferedReader(new InputStreamReader(in))
//.lines().collect(Collectors.joining("\n"));
List<String> tagname = new BufferedReader(new InputStreamReader(in))
.lines().collect(Collectors.toList());

requestedTagname.set(tagname);
requestedTagnames.set(tagname);

}catch (Exception e) {
// TODO Auto-generated catch block
Expand Down Expand Up @@ -165,20 +162,16 @@ public void process(InputStream in) throws IOException {
}else {
logger.debug("Session update failed");
}

//byte[] value = opcUAService.getValue(requestedTagname.get());
List<byte[]> values = opcUAService.getValue(requestedTagname.get());

byte[] values = opcUAService.getValue(requestedTagnames.get());

// Write the results back out to flow file
try{
flowFile = session.write(flowFile, new OutputStreamCallback() {

@Override
public void process(OutputStream out) throws IOException {
//out.write(value);
for(byte[] value: values){
out.write(value);
}
out.write(values);
}

});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
public interface OPCUAService extends ControllerService {

//byte[] getValue(String reqTagname) throws ProcessException;
List<byte[]> getValue(List<String> reqTagname) throws ProcessException;
byte[] getValue(List<String> reqTagname) throws ProcessException;

String getNameSpace(String print_indentation, int max_recursiveDepth, List<ExpandedNodeId> expandedNodeIds)
throws ProcessException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@
import static org.opcfoundation.ua.utils.EndpointUtil.selectBySecurityPolicy;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.*;

import javafx.scene.control.IndexRange;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
Expand Down Expand Up @@ -272,7 +270,6 @@ public void onEnabled(final ConfigurationContext context) throws InitializationE
timestamp = System.currentTimeMillis();

} catch (ServiceResultException e) {
// TODO Auto-generated catch block
logger.debug("Error while creating initial SessionChannel: ");
logger.error(e.getMessage());
}
Expand Down Expand Up @@ -321,75 +318,76 @@ public boolean updateSession(){
public void shutdown() {
// Close the session
final ComponentLog logger = getLogger();
/*
* ( is this necessary or common practice.
* Timeouts clean up abandoned sessions ??? )*
* - yes, a client that is aware it will not
* communicate again should close its connection
*
*/

try {
if ( mySession != null )
mySession.close();
} catch (ServiceFaultException e) {
// TODO Auto-generated catch block
logger.debug(e.getMessage());
e.printStackTrace();
} catch (ServiceResultException e) {
// TODO Auto-generated catch block
logger.debug(e.getMessage());
e.printStackTrace();
} catch (Exception e){
// TODO Auto-generated catch block
e.printStackTrace();
}

}

@Override
//public byte[] getValue(String reqTagname) throws ProcessException {
public List<byte[]> getValue(List<String> reqTagnames) throws ProcessException {
public byte[] getValue(List<String> reqTagnames) throws ProcessException {
final ComponentLog logger = getLogger();
// TODO presently this method accepts a tag name as input and fetches a value for that tag
// A future version will need to be able to acquire a value from a specific time in the past

List<byte[]> responseList = new ArrayList<>();
for(String reqTagname: reqTagnames ){
String serverResponse = "";

ReadValueId[] NodesToRead = {
new ReadValueId(NodeId.parseNodeId(reqTagname), Attributes.Value, null, null )
};

if (NodesToRead != null)
{
// Form OPC request
ReadRequest req = new ReadRequest();
req.setMaxAge(500.00);
req.setTimestampsToReturn(TimestampsToReturn.Both);
req.setRequestHeader(null);
req.setNodesToRead(NodesToRead);

// Submit OPC Read and handle response
try{
ReadResponse readResponse = mySession.Read(req);
DataValue[] values = readResponse.getResults();
// TODO need to check the result for errors and other quality issues
serverResponse = reqTagname + "," + values[0].getValue().toString() + ","+ values[0].getServerTimestamp().toString();

}catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();

}
}

responseList.add(serverResponse.getBytes());

}
return responseList;
// return serverResponse.getBytes();

//Create the nodes to read array
ReadValueId nodesToRead[] = new ReadValueId[reqTagnames.size()];

for (int i = 0; i < reqTagnames.size(); i++){
nodesToRead[i] = (new ReadValueId(NodeId.parseNodeId(reqTagnames.get(i)), Attributes.Value, null, null));
}

String serverResponse = "";

// Form OPC request
ReadRequest req = new ReadRequest();
req.setMaxAge(500.00);
req.setTimestampsToReturn(TimestampsToReturn.Both);
req.setRequestHeader(null);
req.setNodesToRead(nodesToRead);

// Submit OPC Read and handle response
try{
ReadResponse readResponse = mySession.Read(req);
DataValue[] values = readResponse.getResults();

// Validate response
if (values != null) {
if (values.length == 0)
logger.error("OPC Server returned nothing.");
else {
// Iterate through values
for (int i = 0; i < values.length; i++) {
try {
// Build flowfile line
serverResponse = serverResponse + nodesToRead[i].getNodeId().toString() + ","
+ values[i].getValue().toString() + ","
+ values[i].getServerTimestamp().toString()
+ values[i].getStatusCode().getValue().toString()
+ System.getProperty("line.separator");
} catch (Exception ex){
logger.error("error parsing result for" + nodesToRead[i].getNodeId().toString());
}
}

//clean up response
serverResponse.trim();
}
}

}catch (Exception e) {
logger.error("Error parsing OPC Server Results: " + e.getMessage() + Arrays.toString(e.getStackTrace()));
}

return serverResponse.getBytes();
}

@Override
Expand All @@ -402,10 +400,6 @@ public String getNameSpace(String print_indentation, int max_recursiveDepth, Lis
// Set the starting node and parse the node tree
logger.debug("Parse the result list for node " + expNodeId.toString());
stringBuilder.append(parseNodeTree(print_indentation, 0, max_recursiveDepth, expNodeId));
/*String str = parseNodeTree(print_indentation, 0, max_recursiveDepth, expNodeId);
if (str != null){
stringBuilder.append(str);
}*/
}

return stringBuilder.toString();
Expand Down

0 comments on commit 894ca21

Please sign in to comment.