Skip to content

Commit

Permalink
gRPC: update protobuf message definitions
Browse files Browse the repository at this point in the history
match to update messages:
  - file info includes file's uid, gid and path
  - CancelRetrieveRequest uses archiveId to identify tape file
  - add request arguments validation
  • Loading branch information
kofemann committed Jun 8, 2022
1 parent 03089a3 commit fb00ba0
Show file tree
Hide file tree
Showing 5 changed files with 180 additions and 22 deletions.
8 changes: 4 additions & 4 deletions src/main/java/org/dcache/nearline/cta/CtaNearlineStorage.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import org.dcache.cta.rpc.ArchiveResponse;
import org.dcache.cta.rpc.CtaRpcGrpc;
import org.dcache.cta.rpc.CtaRpcGrpc.CtaRpcStub;
import org.dcache.cta.rpc.RetrieveResponse;
import ch.cern.cta.rpc.ArchiveResponse;
import ch.cern.cta.rpc.CtaRpcGrpc;
import ch.cern.cta.rpc.CtaRpcGrpc.CtaRpcStub;
import ch.cern.cta.rpc.RetrieveResponse;
import org.dcache.nearline.cta.xrootd.DataMover;
import org.dcache.pool.nearline.spi.FlushRequest;
import org.dcache.pool.nearline.spi.NearlineStorage;
Expand Down
25 changes: 17 additions & 8 deletions src/main/java/org/dcache/nearline/cta/RequestsFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@
import cta.eos.CtaEos.Transport;
import java.io.File;
import java.util.Objects;
import org.dcache.cta.rpc.ArchiveResponse;
import org.dcache.cta.rpc.CancelRetrieveRequest;
import org.dcache.cta.rpc.DeleteRequest;
import org.dcache.cta.rpc.FileInfo;
import org.dcache.cta.rpc.RetrieveRequest;
import org.dcache.cta.rpc.RetrieveResponse;
import ch.cern.cta.rpc.ArchiveResponse;
import ch.cern.cta.rpc.CancelRetrieveRequest;
import ch.cern.cta.rpc.DeleteRequest;
import ch.cern.cta.rpc.FileInfo;
import ch.cern.cta.rpc.RetrieveRequest;
import ch.cern.cta.rpc.RetrieveResponse;
import org.dcache.namespace.FileAttribute;
import org.dcache.pool.nearline.spi.FlushRequest;
import org.dcache.cta.rpc.ArchiveRequest;
import ch.cern.cta.rpc.ArchiveRequest;
import org.dcache.pool.nearline.spi.RemoveRequest;
import org.dcache.pool.nearline.spi.StageRequest;
import org.dcache.util.ChecksumType;
Expand Down Expand Up @@ -105,6 +105,9 @@ public ArchiveRequest valueOf(FlushRequest request) {
.setFid(dcacheFileAttrs.getPnfsId().toString())
.setStorageClass(dcacheFileAttrs.getStorageClass() + "@" + dcacheFileAttrs.getHsm())
.setCsb(checksumBuilder.build())
.setGid(1)
.setUid(1)
.setPath("/" + id)
.build();

return ArchiveRequest.newBuilder()
Expand All @@ -125,6 +128,9 @@ public DeleteRequest valueOf(RemoveRequest request) {

var ctaFileInfo = FileInfo.newBuilder()
.setFid(id)
.setGid(1)
.setUid(1)
.setPath("/" + id)
.build();

return DeleteRequest.newBuilder()
Expand All @@ -151,6 +157,9 @@ public RetrieveRequest valueOf(StageRequest request) {
.setSize(dcacheFileAttrs.getSize())
.setFid(dcacheFileAttrs.getPnfsId().toString())
.setStorageClass(dcacheFileAttrs.getStorageClass() + "@" + dcacheFileAttrs.getHsm())
.setGid(1)
.setUid(1)
.setPath("/" + id)
.build();

return RetrieveRequest.newBuilder()
Expand All @@ -167,7 +176,7 @@ public CancelRetrieveRequest cancelValueOf(RetrieveRequest request, RetrieveResp
return CancelRetrieveRequest.newBuilder()
.setInstance(instance)
.setCli(client)
.setFid(request.getArchiveId())
.setArchiveId(request.getArchiveId())
.setReqId(response.getReqId())
.build();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.dcache.cta.rpc";
option java_package = "ch.cern.cta.rpc";
option optimize_for = CODE_SIZE;
package cta.dcache.rpc;
package cta.frontend.rpc;

import "google/protobuf/empty.proto";

Expand All @@ -23,6 +23,9 @@ message FileInfo {
uint64 size = 2; // file size
string storageClass = 3; // tape system related storage class (file family)
cta.common.ChecksumBlob csb = 4; // set of knows checksums for the given file
uint32 uid = 5; // files owner user id
uint32 gid = 6; // files owner group id
string path = 7; // files path at creation time
}

/*
Expand Down Expand Up @@ -79,7 +82,7 @@ message DeleteRequest {
message CancelRetrieveRequest {
cta.common.Service instance = 1; // client instance ID
cta.eos.Client cli = 2; // requester information
uint64 fid = 3; // tape system unique file ID
uint64 archiveId = 3; // tape system unique file ID
string reqId = 4; // tape request scheduler ID, used to cancel the request
}

Expand Down
136 changes: 129 additions & 7 deletions src/test/java/org/dcache/nearline/cta/DummyCta.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.dcache.cta.rpc.ArchiveRequest;
import org.dcache.cta.rpc.ArchiveResponse;
import org.dcache.cta.rpc.CancelRetrieveRequest;
import org.dcache.cta.rpc.CtaRpcGrpc;
import org.dcache.cta.rpc.DeleteRequest;
import org.dcache.cta.rpc.RetrieveRequest;
import org.dcache.cta.rpc.RetrieveResponse;
import ch.cern.cta.rpc.ArchiveRequest;
import ch.cern.cta.rpc.ArchiveResponse;
import ch.cern.cta.rpc.CancelRetrieveRequest;
import ch.cern.cta.rpc.CtaRpcGrpc;
import ch.cern.cta.rpc.DeleteRequest;
import ch.cern.cta.rpc.RetrieveRequest;
import ch.cern.cta.rpc.RetrieveResponse;

public class DummyCta {

Expand Down Expand Up @@ -84,6 +84,36 @@ public void version(Empty request, StreamObserver<Version> responseObserver) {
public void archive(ArchiveRequest request,
StreamObserver<ArchiveResponse> responseObserver) {

if (request.getInstance().getName().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getUsername().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getGroupname().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getUid() == 0) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getGid() == 0) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getPath().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (!fail) {
var response = ArchiveResponse.newBuilder()
.setFid(ThreadLocalRandom.current().nextLong())
Expand All @@ -101,6 +131,41 @@ public void archive(ArchiveRequest request,
public void retrieve(RetrieveRequest request,
StreamObserver<RetrieveResponse> responseObserver) {

if (request.getInstance().getName().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getUsername().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getGroupname().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getUid() == 0) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getGid() == 0) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getPath().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getArchiveId() == 0L) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (!fail) {
var response = RetrieveResponse.newBuilder()
.setReqId("RetrieveRequest-" + ThreadLocalRandom.current().nextInt())
Expand All @@ -116,6 +181,27 @@ public void retrieve(RetrieveRequest request,
@Override
public void cancelRetrieve(CancelRetrieveRequest request,
StreamObserver<Empty> responseObserver) {

if (request.getInstance().getName().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getUsername().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getGroupname().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getArchiveId() == 0L) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (!fail) {
var response = Empty.newBuilder()
.build();
Expand All @@ -128,6 +214,42 @@ public void cancelRetrieve(CancelRetrieveRequest request,

@Override
public void delete(DeleteRequest request, StreamObserver<Empty> responseObserver) {

if (request.getInstance().getName().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getUsername().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getCli().getUser().getGroupname().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getUid() == 0) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getGid() == 0) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getFile().getPath().isEmpty()) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (request.getArchiveId() == 0L) {
responseObserver.onError(new StatusException(Status.INVALID_ARGUMENT));
return;
}

if (!fail) {
var response = Empty.newBuilder()
.build();
Expand Down
24 changes: 24 additions & 0 deletions src/test/java/org/dcache/nearline/cta/RequestsFactoryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ public void testArchive() {
achriveRequest.getFile().getCsb().getCs(0).getValue());
assertEquals(fileAttrs.getSize(), achriveRequest.getFile().getSize());
assertEquals(fileAttrs.getPnfsId().toString(), achriveRequest.getFile().getFid());

assertFalse(achriveRequest.getInstance().getName().isEmpty());
assertFalse(achriveRequest.getCli().getUser().getUsername().isEmpty());
assertFalse(achriveRequest.getCli().getUser().getGroupname().isEmpty());
assertFalse(achriveRequest.getFile().getUid() == 0);
assertFalse(achriveRequest.getFile().getGid() == 0);
assertFalse(achriveRequest.getFile().getPath().isEmpty());
}

@Test
Expand All @@ -80,6 +87,15 @@ public void testDelete() {

assertEquals(pnfsid, deleteRequest.getFile().getFid());
assertEquals(archiveId, deleteRequest.getArchiveId());

assertFalse(deleteRequest.getInstance().getName().isEmpty());
assertFalse(deleteRequest.getCli().getUser().getUsername().isEmpty());
assertFalse(deleteRequest.getCli().getUser().getGroupname().isEmpty());
assertFalse(deleteRequest.getFile().getUid() == 0);
assertFalse(deleteRequest.getFile().getGid() == 0);
assertFalse(deleteRequest.getFile().getPath().isEmpty());
assertFalse(deleteRequest.getArchiveId() == 0L);

}

@Test
Expand Down Expand Up @@ -112,6 +128,14 @@ public void testRetrieve() {
assertEquals(fileAttrs.getSize(), retrieveRequest.getFile().getSize());
assertEquals(fileAttrs.getPnfsId().toString(), retrieveRequest.getFile().getFid());
assertEquals(archiveId, retrieveRequest.getArchiveId());

assertFalse(retrieveRequest.getInstance().getName().isEmpty());
assertFalse(retrieveRequest.getCli().getUser().getUsername().isEmpty());
assertFalse(retrieveRequest.getCli().getUser().getGroupname().isEmpty());
assertFalse(retrieveRequest.getFile().getUid() == 0);
assertFalse(retrieveRequest.getFile().getGid() == 0);
assertFalse(retrieveRequest.getFile().getPath().isEmpty());
assertFalse(retrieveRequest.getArchiveId() == 0L);
}

}

0 comments on commit fb00ba0

Please sign in to comment.