diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE
index dd5fa47ee..2e0ff5445 100644
--- a/CHANGES_NEXT_RELEASE
+++ b/CHANGES_NEXT_RELEASE
@@ -6,3 +6,4 @@
 - [HARDENING] Link the Quick Start Guide from the README (#454)
 - [BUG] Fix the grouping rules validator, now empty fields and non-numeric ids are not allowed (#460)
 - [HARDENING] Add detailed explanation about the syntax of the grouping rules (#459)
+- [FEATURE] OAuth2 support for OrionHDFSSink (#483)
diff --git a/README.md b/README.md
index 24e00a70b..bfe4b3711 100644
--- a/README.md
+++ b/README.md
@@ -726,8 +726,8 @@ cygnusagent.sinks.hdfs-sink.hdfs_host = x1.y1.z1.w1,x2.y2.z2.w2
 cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
 # username allowed to write in HDFS
 cygnusagent.sinks.hdfs-sink.hdfs_username = hdfs_username
-# password for the username
-cygnusagent.sinks.hdfs-sink.hdfs_password = xxxxxxxxxxxxx
+# OAuth2 token
+cygnusagent.sinks.hdfs-sink.oauth2_token = xxxxxxxxxxxxx
 # how the attributes are stored, either per row either per column (row, column)
 cygnusagent.sinks.hdfs-sink.attr_persistence = column
 # Hive FQDN/IP address of the Hive server
@@ -903,11 +903,11 @@ Cygnus implements its own startup script, `cygnus-flume-ng` which replaces the s
 
 In foreground (with logging):
 
-    $ APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/cygnus.conf -n cygnusagent -Dflume.root.logger=INFO,console [-p ] [-t ]
+    $ APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/agent_<id>.conf -n cygnusagent -Dflume.root.logger=INFO,console [-p ] [-t ]
 
 In background:
 
-    $ nohup APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/cygnus.conf -n cygnusagent -Dflume.root.logger=INFO,LOGFILE [-p ] [-t ] &
+    $ nohup APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/agent_<id>.conf -n cygnusagent -Dflume.root.logger=INFO,LOGFILE [-p ] [-t ] &
 
 The parameters used in these commands are:
diff --git a/conf/agent.conf.template b/conf/agent.conf.template
index 2581b07ef..4311639d0 100644
--- a/conf/agent.conf.template
+++ b/conf/agent.conf.template
@@ -71,8 +71,8 @@ cygnusagent.sinks.hdfs-sink.hdfs_host = x1.y1.z1.w1,x2.y2.z2.w2
 cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
 # username allowed to write in HDFS
 cygnusagent.sinks.hdfs-sink.hdfs_username = hdfs_username
-# password for the username
-cygnusagent.sinks.hdfs-sink.hdfs_password = xxxxxxxxxxxxx
+# OAuth2 token
+cygnusagent.sinks.hdfs-sink.oauth2_token = xxxxxxxx
 # how the attributes are stored, either per row either per column (row, column)
 cygnusagent.sinks.hdfs-sink.attr_persistence = column
 # Hive FQDN/IP address of the Hive server
diff --git a/doc/design/OrionHDFSSink.md b/doc/design/OrionHDFSSink.md
index 02b44083c..3b1dd9ca7 100644
--- a/doc/design/OrionHDFSSink.md
+++ b/doc/design/OrionHDFSSink.md
@@ -8,6 +8,7 @@
 * [Implementation details](#section4)
     * [`OrionHDFSSink` class](#section4.1)
     * [`HDFSBackendImpl` class](#section4.2)
+    * [Authentication and authorization](#section4.3)
 * [Contact](#section5)
 
 ##Functionality
@@ -120,8 +121,7 @@ NOTE: `hive` is the Hive CLI for locally querying the data.
 | cosmos_port<br>(**deprecated**) | no | 14000 | 14000 if using HttpFS, 50070 if using WebHDFS.<br>Still usable; if both are configured, `hdfs_port` is preferred |
 | hdfs_username | yes | N/A | If `service_as_namespace=false` then it must be an already existent user in HDFS. If `service_as_namespace=true` then it must be a HDFS superuser |
 | cosmos\_default\_username<br>(**deprecated**) | yes | N/A | If `service_as_namespace=false` then it must be an already existent user in HDFS. If `service_as_namespace=true` then it must be a HDFS superuser.<br>Still usable; if both are configured, `hdfs_username` is preferred |
-| hdfs_password | yes | N/A |
-| cosmos\_default\_password<br>(**deprecated**) | yes | N/A | Still usable; if both are configured, `hdfs_password` is preferred |
+| oauth2_token | yes | N/A |
 | service\_as\_namespace | no | false | If configured as true then the `fiware-service` (or the default one) is used as the HDFS namespace instead of `hdfs_username`/`cosmos_default_username`, which in this case must be a HDFS superuser |
 | attr_persistence | no | row | row or column |
 | hive_host | no | localhost |
@@ -142,7 +142,7 @@ A configuration example could be:
     cygnusagent.sinks.hdfs-sink.hdfs_host = 192.168.80.34
     cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
     cygnusagent.sinks.hdfs-sink.hdfs_username = myuser
-    cygnusagent.sinks.hdfs-sink.hdfs_password = mypassword
+    cygnusagent.sinks.hdfs-sink.oauth2_token = mytoken
     cygnusagent.sinks.hdfs-sink.attr_persistence = column
     cygnusagent.sinks.hdfs-sink.hive_host = 192.168.80.35
     cygnusagent.sinks.hdfs-sink.hive_port = 10000
@@ -202,6 +202,33 @@ Provisions a Hive table with data stores in column-like mode within the given HD
 
 [Top](#top)
 
+###Authentication and authorization
+[OAuth2](http://oauth.net/2/) is the evolution of the OAuth protocol, an open standard for authorization. Using OAuth2, client applications can securely access certain server resources on behalf of the resource owner and, best of all, without sharing their credentials with the service. This works thanks to a trusted authorization service in charge of issuing pieces of security information: the access tokens. Once requested, the access token is attached to the service request so that the server may ask the authorization service about the validity of the user requesting the access (authentication) and the availability of the resource itself for that user (authorization).
+
+A detailed architecture of OAuth2 can be found [here](http://forge.fiware.org/plugins/mediawiki/wiki/fiware/index.php/PEP_Proxy_-_Wilma_-_Installation_and_Administration_Guide), but in a nutshell, FIWARE implements the above concept through the Identity Manager GE ([Keyrock](http://catalogue.fiware.org/enablers/identity-management-keyrock) implementation) and the Access Control ([AuthZForce](http://catalogue.fiware.org/enablers/authorization-pdp-authzforce) implementation); together, these two enablers make up the OAuth2-based authorization service in FIWARE:
+
+* Access tokens are requested from the Identity Manager, which the final service then asks for authentication purposes once a token is received. Please observe that, by doing this, the service not only discovers which real FIWARE user is behind the request, but also has full certainty that the user is who he/she claims to be.
+* At the same time, the Identity Manager relies on the Access Control for authorization purposes. The access token provides, in addition to the real identity of the user, his/her roles regarding the requested resource. The Access Control owns a list of policies stating who is allowed to access each resource based on the user roles.
+
+This is important for Cygnus since HDFS (big) data can be accessed through the native [WebHDFS](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) RESTful API, which may be protected with the above mentioned mechanism. If that is the case, simply ask for an access token and add it to the configuration through the `cygnusagent.sinks.hdfs-sink.oauth2_token` parameter, as in the example below.
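+
+For instance, with a made-up placeholder value in place of a real token:
+
+    cygnusagent.sinks.hdfs-sink.oauth2_token = qjHPUcnW6leYAqr3Xw34DWLQlja0Ix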
+
+In order to get an access token, do the following request to your OAuth2 tokens provider; in FIWARE Lab this is `cosmos.lab.fi-ware.org:13000`:
+
+    $ curl -X POST "http://cosmos.lab.fi-ware.org:13000/cosmos-auth/v1/token" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=password&username=frb@tid.es&password=xxxxxxxx"
+    {"access_token": "qjHPUcnW6leYAqr3Xw34DWLQlja0Ix", "token_type": "Bearer", "expires_in": 3600, "refresh_token": "V2Wlk7aFCnElKlW9BOmRzGhBtqgR2z"}
+
+As you can see, your FIWARE Lab credentials are required in the payload, in the form of a password-based grant type (this will be the only time you have to give them).
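+
+Once Cygnus is configured this way, the backend attaches the token to every WebHDFS request in an `X-Auth-Token` HTTP header. Just as an illustrative sketch (the host, the username and the token below are made-up example values), an equivalent manually authenticated WebHDFS request would look like:
+
+    $ curl -X GET "http://cosmos.lab.fi-ware.org:14000/webhdfs/v1/user/myuser?op=liststatus&user.name=myuser" -H "X-Auth-Token: qjHPUcnW6leYAqr3Xw34DWLQlja0Ix"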
+
+[Top](#top)
+
 ##Contact
 Francisco Romero Bueno (francisco.romerobueno@telefonica.com) **[Main contributor]**
diff --git a/src/main/java/com/telefonica/iot/cygnus/backends/hdfs/HDFSBackendImpl.java b/src/main/java/com/telefonica/iot/cygnus/backends/hdfs/HDFSBackendImpl.java
index bcea1308a..4554d07b5 100644
--- a/src/main/java/com/telefonica/iot/cygnus/backends/hdfs/HDFSBackendImpl.java
+++ b/src/main/java/com/telefonica/iot/cygnus/backends/hdfs/HDFSBackendImpl.java
@@ -40,12 +40,13 @@ public class HDFSBackendImpl extends HttpBackend implements HDFSBackend {
 
     private final String hdfsUser;
-    private final String hdfsPassword;
+    private final String oauth2Token;
     private final String hiveHost;
     private final String hivePort;
     private final boolean serviceAsNamespace;
     private static final CygnusLogger LOGGER = new CygnusLogger(HDFSBackendImpl.class);
     private static final String BASE_URL = "/webhdfs/v1/user/";
+    private ArrayList<Header> headers;
 
     /**
      *
@@ -53,7 +54,7 @@ public class HDFSBackendImpl extends HttpBackend implements HDFSBackend {
      * @param hdfsPort
      * @param hdfsUser
      * @param hiveHost
-     * @param hdfsPassword
+     * @param oauth2Token
      * @param hivePort
      * @param krb5
      * @param krb5User
@@ -62,22 +63,30 @@ public class HDFSBackendImpl extends HttpBackend implements HDFSBackend {
      * @param krb5ConfFile
      * @param serviceAsNamespace
      */
-    public HDFSBackendImpl(String[] hdfsHosts, String hdfsPort, String hdfsUser, String hdfsPassword, String hiveHost,
+    public HDFSBackendImpl(String[] hdfsHosts, String hdfsPort, String hdfsUser, String oauth2Token, String hiveHost,
             String hivePort, boolean krb5, String krb5User, String krb5Password, String krb5LoginConfFile,
             String krb5ConfFile, boolean serviceAsNamespace) {
         super(hdfsHosts, hdfsPort, false, krb5, krb5User, krb5Password, krb5LoginConfFile, krb5ConfFile);
         this.hdfsUser = hdfsUser;
-        this.hdfsPassword = hdfsPassword;
+        this.oauth2Token = oauth2Token;
        this.hiveHost = hiveHost;
         this.hivePort = hivePort;
         this.serviceAsNamespace = serviceAsNamespace;
+
+        // add the OAuth2 token as the unique header that will be sent
+        if (oauth2Token != null && oauth2Token.length() > 0) {
+            headers = new ArrayList<Header>();
+            headers.add(new BasicHeader("X-Auth-Token", oauth2Token));
+        } else {
+            headers = null;
+        } // if else
     } // HDFSBackendImpl
 
     @Override
     public void createDir(String dirPath) throws Exception {
         String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + dirPath
                 + "?op=mkdirs&user.name=" + hdfsUser;
-        JsonResponse response = doRequest("PUT", relativeURL, true, null, null);
+        JsonResponse response = doRequest("PUT", relativeURL, true, headers, null);
 
         // check the status
         if (response.getStatusCode() != 200) {
@@ -92,7 +101,7 @@ public void createFile(String filePath, String data) throws Exception {
         String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + filePath
                 + "?op=create&user.name=" + hdfsUser;
-        JsonResponse response = doRequest("PUT", relativeURL, true, null, null);
+        JsonResponse response = doRequest("PUT", relativeURL, true, headers, null);
 
         // check the status
         if (response.getStatusCode() != 307) {
@@ -106,7 +115,13 @@ public void createFile(String filePath, String data)
         String absoluteURL = header.getValue();
 
-        // do second step
-        ArrayList<Header> headers = new ArrayList<Header>();
-        headers.add(new BasicHeader("Content-Type", "application/octet-stream"));
-        response = doRequest("PUT", absoluteURL, false, headers, new StringEntity(data + "\n"));
+        // do second step; build a local header list so the shared OAuth2 headers field
+        // is not mutated (that would accumulate duplicated Content-Type headers call after call)
+        ArrayList<Header> secondStepHeaders = new ArrayList<Header>();
+
+        if (headers != null) {
+            secondStepHeaders.addAll(headers);
+        } // if
+
+        secondStepHeaders.add(new BasicHeader("Content-Type", "application/octet-stream"));
+        response = doRequest("PUT", absoluteURL, false, secondStepHeaders, new StringEntity(data + "\n"));
@@ -122,7 +134,7 @@ public void append(String filePath, String data) throws Exception {
         String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + filePath
                 + "?op=append&user.name=" + hdfsUser;
-        JsonResponse response = doRequest("POST", relativeURL, true, null, null);
+        JsonResponse response = doRequest("POST", relativeURL, true, headers, null);
 
         // check the status
         if (response.getStatusCode() != 307) {
@@ -136,7 +148,13 @@ public void append(String filePath, String data) throws Exception {
         String absoluteURL = header.getValue();
 
-        // do second step
-        ArrayList<Header> headers = new ArrayList<Header>();
-        headers.add(new BasicHeader("Content-Type", "application/octet-stream"));
-        response = doRequest("POST", absoluteURL, false, headers, new StringEntity(data + "\n"));
+        // do second step; as in createFile, use a local header list so the shared
+        // OAuth2 headers field is not mutated
+        ArrayList<Header> secondStepHeaders = new ArrayList<Header>();
+
+        if (headers != null) {
+            secondStepHeaders.addAll(headers);
+        } // if
+
+        secondStepHeaders.add(new BasicHeader("Content-Type", "application/octet-stream"));
+        response = doRequest("POST", absoluteURL, false, secondStepHeaders, new StringEntity(data + "\n"));
@@ -152,7 +167,7 @@ public void append(String filePath, String data) throws Exception {
     public boolean exists(String filePath) throws Exception {
         String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + filePath
                 + "?op=getfilestatus&user.name=" + hdfsUser;
-        JsonResponse response = doRequest("GET", relativeURL, true, null, null);
+        JsonResponse response = doRequest("GET", relativeURL, true, headers, null);
 
         // check the status
         return (response.getStatusCode() == 200);
@@ -170,7 +185,7 @@ public void provisionHiveTable(String dirPath) throws Exception {
         LOGGER.info("Creating Hive external table=" + tableName);
 
         // get a Hive client
-        HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, hdfsPassword);
+        HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, oauth2Token);
 
         // create the standard 8-fields
         String fields = "("
@@ -210,7 +225,7 @@ public void provisionHiveTable(String dirPath, String fields) throws Exception {
         LOGGER.info("Creating Hive external table=" + tableName);
 
         // get a Hive client
-        HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, hdfsPassword);
+        HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, oauth2Token);
 
         // create the query
         String query = "create external table " + tableName + " (" + fields + ") row format serde "
diff --git a/src/main/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSink.java b/src/main/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSink.java
index 377b60c4b..9cd77ba3f 100644
--- a/src/main/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSink.java
+++ b/src/main/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSink.java
@@ -79,7 +79,7 @@ public class OrionHDFSSink extends OrionSink {
     private String[] host;
     private String port;
     private String username;
-    private String password;
+    private String oauth2Token;
     private boolean rowAttrPersistence;
     private String hiveHost;
     private String hivePort;
@@ -123,13 +123,13 @@ protected String getCosmosDefaultUsername() {
     } // getCosmosDefaultUsername
 
     /**
-     * Gets the Cosmos password for the default username. It is protected due to it is only required for testing
-     * purposes.
-     * @return The Cosmos password for the detault Cosmos username
+     * Gets the OAuth2 token used for authentication and authorization. It is protected since it is only required
+     * for testing purposes.
+     * @return The OAuth2 token
      */
-    protected String getCosmosDefaultPassword() {
-        return password;
-    } // getCosmosDefaultPassword
+    protected String getOAuth2Token() {
+        return oauth2Token;
+    } // getOAuth2Token
 
     /**
      * Gets the Hive port. It is protected due to it is only required for testing purposes.
@@ -202,20 +202,14 @@ public void configure(Context context) {
                 + "properly work!");
         } // if else
 
-        // FIXME: cosmosPassword should be read as a SHA1 and decoded here
-        String cosmosDefaultPassword = context.getString("cosmos_default_password");
-        String hdfsPassword = context.getString("hdfs_password");
+        oauth2Token = context.getString("oauth2_token");
 
-        if (hdfsPassword != null && hdfsPassword.length() > 0) {
-            password = hdfsPassword;
-            LOGGER.debug("[" + this.getName() + "] Reading configuration (hdfs_password=" + password + ")");
-        } else if (cosmosDefaultPassword != null && cosmosDefaultPassword.length() > 0) {
-            password = cosmosDefaultPassword;
-            LOGGER.debug("[" + this.getName() + "] Reading configuration (cosmos_default_password=" + password + ")"
-                    + " -- DEPRECATED, use hdfs_password instead");
+        if (oauth2Token != null && oauth2Token.length() > 0) {
+            LOGGER.debug("[" + this.getName() + "] Reading configuration (oauth2_token=" + this.oauth2Token + ")");
         } else {
-            LOGGER.error("[" + this.getName() + "] No password provided. Cygnus can continue, but HDFS sink will not "
-                    + "properly work!");
+            LOGGER.error("[" + this.getName() + "] No OAuth2 token provided. Cygnus can continue, but the HDFS sink "
+                    + "may not work properly if the WebHDFS service is protected with such an authentication "
+                    + "and authorization mechanism!");
         } // if else
 
         rowAttrPersistence = context.getString("attr_persistence", "row").equals("row");
@@ -246,7 +240,7 @@ public void configure(Context context) {
     public void start() {
         try {
             // create the persistence backend
-            persistenceBackend = new HDFSBackendImpl(host, port, username, password, hiveHost, hivePort, krb5,
+            persistenceBackend = new HDFSBackendImpl(host, port, username, oauth2Token, hiveHost, hivePort, krb5,
                     krb5User, krb5Password, krb5LoginConfFile, krb5ConfFile, serviceAsNamespace);
             LOGGER.debug("[" + this.getName() + "] HDFS persistence backend created");
         } catch (Exception e) {
diff --git a/src/test/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSinkTest.java b/src/test/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSinkTest.java
index cb414a70b..bf19f549e 100644
--- a/src/test/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSinkTest.java
+++ b/src/test/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSinkTest.java
@@ -60,7 +60,7 @@ public class OrionHDFSSinkTest {
     private final String[] cosmosHost = {"localhost"};
     private final String cosmosPort = "14000";
     private final String cosmosDefaultUsername = "user1";
-    private final String cosmosDefaultPassword = "pass1234";
+    private final String oauth2Token = "tokenabcdefghijk";
     private final String hivePort = "10000";
     private final long recvTimeTs = 123456789;
     private final String normalServiceName = "vehicles";
@@ -162,7 +162,7 @@ public void setUp() throws Exception {
         context.put("cosmos_host", cosmosHost[0]);
         context.put("cosmos_port", cosmosPort);
         context.put("cosmos_default_username", cosmosDefaultUsername);
-        context.put("cosmos_default_password", cosmosDefaultPassword);
+        context.put("oauth2_token", oauth2Token);
         context.put("hive_port", hivePort);
         singleNotifyContextRequest = TestUtils.createJsonNotifyContextRequest(singleContextElementNotification);
         multipleNotifyContextRequest = TestUtils.createJsonNotifyContextRequest(multipleContextElementNotification);
@@ -186,7 +186,7 @@ public void testConfigure() {
         assertEquals(cosmosHost[0], sink.getCosmosHost()[0]);
         assertEquals(cosmosPort, sink.getCosmosPort());
         assertEquals(cosmosDefaultUsername, sink.getCosmosDefaultUsername());
-        assertEquals(cosmosDefaultPassword, sink.getCosmosDefaultPassword());
+        assertEquals(oauth2Token, sink.getOAuth2Token());
         assertEquals(hivePort, sink.getHivePort());
     } // testConfigure