Merge pull request #485 from telefonicaid/feature/483_oauth2_hdfssink

feature/483_oauth2_hdfssink

Fermín Galán Márquez committed Jul 8, 2015
2 parents 8991896 + f0c4a1a commit d326d86

Showing 7 changed files with 73 additions and 44 deletions.
1 change: 1 addition & 0 deletions CHANGES_NEXT_RELEASE
@@ -6,3 +6,4 @@
- [HARDENING] Link the Quick Start Guide from the README (#454)
- [BUG] Fix the grouping rules validator, now empty fields and non-numeric ids are not allowed (#460)
- [HARDENING] Add detailed explanation about the syntax of the grouping rules (#459)
- [FEATURE] OAuth2 support for OrionHDFSSink (#483)
8 changes: 4 additions & 4 deletions README.md
@@ -726,8 +726,8 @@ cygnusagent.sinks.hdfs-sink.hdfs_host = x1.y1.z1.w1,x2.y2.z2.w2
cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
# username allowed to write in HDFS
cygnusagent.sinks.hdfs-sink.hdfs_username = hdfs_username
# password for the username
cygnusagent.sinks.hdfs-sink.hdfs_password = xxxxxxxxxxxxx
# OAuth2 token
cygnusagent.sinks.hdfs-sink.oauth2_token = xxxxxxxxxxxxx
# how the attributes are stored, either per row or per column (row, column)
cygnusagent.sinks.hdfs-sink.attr_persistence = column
# Hive FQDN/IP address of the Hive server
@@ -903,11 +903,11 @@ Cygnus implements its own startup script, `cygnus-flume-ng` which replaces the s

In foreground (with logging):

$ APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/cygnus.conf -n cygnusagent -Dflume.root.logger=INFO,console [-p <mgmt-if-port>] [-t <polling-interval>]
$ APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/agent_<id>.conf -n cygnusagent -Dflume.root.logger=INFO,console [-p <mgmt-if-port>] [-t <polling-interval>]

In background:

$ nohup APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/cygnus.conf -n cygnusagent -Dflume.root.logger=INFO,LOGFILE [-p <mgmt-if-port>] [-t <polling-interval>] &
$ nohup APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/agent_<id>.conf -n cygnusagent -Dflume.root.logger=INFO,LOGFILE [-p <mgmt-if-port>] [-t <polling-interval>] &
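
As an illustration only (the agent id `1`, management port `8081` and polling interval `30` are invented for this example, not values mandated by Cygnus), a background launch overriding the optional parameters could look like:

$ nohup APACHE_FLUME_HOME/bin/cygnus-flume-ng agent --conf APACHE_FLUME_HOME/conf -f APACHE_FLUME_HOME/conf/agent_1.conf -n cygnusagent -Dflume.root.logger=INFO,LOGFILE -p 8081 -t 30 &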

The parameters used in these commands are:

4 changes: 2 additions & 2 deletions conf/agent.conf.template
@@ -71,8 +71,8 @@ cygnusagent.sinks.hdfs-sink.hdfs_host = x1.y1.z1.w1,x2.y2.z2.w2
cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
# username allowed to write in HDFS
cygnusagent.sinks.hdfs-sink.hdfs_username = hdfs_username
# password for the username
cygnusagent.sinks.hdfs-sink.hdfs_password = xxxxxxxxxxxxx
# OAuth2 token
cygnusagent.sinks.hdfs-sink.oauth2_token = xxxxxxxx
# how the attributes are stored, either per row or per column (row, column)
cygnusagent.sinks.hdfs-sink.attr_persistence = column
# Hive FQDN/IP address of the Hive server
25 changes: 22 additions & 3 deletions doc/design/OrionHDFSSink.md
@@ -8,6 +8,7 @@
* [Implementation details](#section4)
* [`OrionHDFSSink` class](#section4.1)
* [`HDFSBackendImpl` class](#section4.2)
* [Authentication and authorization](#section4.3)
* [Contact](#section5)

##<a name="section1"></a>Functionality
@@ -120,8 +121,7 @@ NOTE: `hive` is the Hive CLI for locally querying the data.
| cosmos_port<br>(**deprecated**) | no | 14000 | <i>14000</i> if using HttpFS, <i>50070</i> if using WebHDFS.<br>Still usable; if both are configured, `hdfs_port` is preferred |
| hdfs_username | yes | N/A | If `service_as_namespace=false` then it must be an already existent user in HDFS. If `service_as_namespace=true` then it must be a HDFS superuser |
| cosmos\_default\_username<br>(**deprecated**) | yes | N/A | If `service_as_namespace=false` then it must be an already existent user in HDFS. If `service_as_namespace=true` then it must be a HDFS superuser.<br>Still usable; if both are configured, `hdfs_username` is preferred |
| hdfs_password | yes | N/A |
| cosmos\_default\_password<br>(**deprecated**) | yes | N/A | Still usable; if both are configured, `hdfs_password` is preferred |
| oauth2_token | yes | N/A |
| service\_as\_namespace | no | false | If configured as <i>true</i> then the `fiware-service` (or the default one) is used as the HDFS namespace instead of `hdfs_username`/`cosmos_default_username`, which in this case must be a HDFS superuser |
| attr_persistence | no | row | <i>row</i> or <i>column</i>
| hive_host | no | localhost |
@@ -142,7 +142,7 @@ A configuration example could be:
cygnusagent.sinks.hdfs-sink.hdfs_host = 192.168.80.34
cygnusagent.sinks.hdfs-sink.hdfs_port = 14000
cygnusagent.sinks.hdfs-sink.hdfs_username = myuser
cygnusagent.sinks.hdfs-sink.hdfs_password = mypassword
cygnusagent.sinks.hdfs-sink.oauth2_token = mytoken
cygnusagent.sinks.hdfs-sink.attr_persistence = column
cygnusagent.sinks.hdfs-sink.hive_host = 192.168.80.35
cygnusagent.sinks.hdfs-sink.hive_port = 10000
@@ -202,6 +202,25 @@ Provisions a Hive table with data stored in column-like mode within the given HD

[Top](#top)

###<a name="section4.3"></a>Authentication and authorization
[OAuth2](http://oauth.net/2/) is the evolution of the OAuth protocol, an open standard for authorization. Using OAuth2, client applications can access certain server resources in a secure way on behalf of the resource owner and, best of all, without sharing their credentials with the service. This works thanks to a trusted authorization service in charge of issuing certain pieces of security information: the access tokens. Once obtained, the access token is attached to the service request so that the server may ask the authorization service about the validity of the user requesting the access (authentication) and the availability of the resource itself for that user (authorization).

A detailed architecture of OAuth2 can be found [here](http://forge.fiware.org/plugins/mediawiki/wiki/fiware/index.php/PEP_Proxy_-_Wilma_-_Installation_and_Administration_Guide), but in a nutshell, FIWARE implements the above concept through the Identity Manager GE ([Keyrock](http://catalogue.fiware.org/enablers/identity-management-keyrock) implementation) and the Access Control ([AuthZForce](http://catalogue.fiware.org/enablers/authorization-pdp-authzforce) implementation); together, these two enablers make up the OAuth2-based authorization service in FIWARE:

* Access tokens are requested from the Identity Manager, which the final service in turn asks for authentication purposes once a token is received. Please observe that, by doing this, the service not only discovers which real FIWARE user is behind the request, but also gains full certainty that the user is who he/she claims to be.
* At the same time, the Identity Manager relies on the Access Control for authorization purposes. The access token conveys, in addition to the real identity of the user, his/her roles with regard to the requested resource. The Access Control owns a list of policies stating who is allowed to access each resource, based on the user roles.

This is important for Cygnus since HDFS (big) data can be accessed through the native [WebHDFS](http://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html) RESTful API, which may be protected with the above mentioned mechanism. If that is the case, simply ask for an access token and add it to the configuration through the `cygnusagent.sinks.hdfs-sink.oauth2_token` parameter.
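
For reference, this is a minimal sketch of what such a token-protected WebHDFS request looks like once the token is attached; the `X-Auth-Token` header is the one `HDFSBackendImpl` adds in this pull request, while the host, username and path below are hypothetical:

$ curl -X GET "http://cosmos.lab.fi-ware.org:14000/webhdfs/v1/user/myuser/mydir?op=getfilestatus&user.name=myuser" -H "X-Auth-Token: qjHPUcnW6leYAqr3Xw34DWLQlja0Ix"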

In order to get an access token, issue the following request to your OAuth2 token provider; in FIWARE Lab this is `cosmos.lab.fi-ware.org:13000`:

$ curl -X POST "http://cosmos.lab.fi-ware.org:13000/cosmos-auth/v1/token" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=password&[email protected]&password=xxxxxxxx"
{"access_token": "qjHPUcnW6leYAqr3Xw34DWLQlja0Ix", "token_type": "Bearer", "expires_in": 3600, "refresh_token": "V2Wlk7aFCnElKlW9BOmRzGhBtqgR2z"}

As you can see, your FIWARE Lab credentials are required in the payload, in the form of a password-based grant type; this is the only time you will have to provide them.
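
Observe the response also carries a `refresh_token` and an `expires_in` of 3600 seconds, so the access token will eventually expire. Assuming the provider implements the standard OAuth2 refresh grant (an assumption about `cosmos-auth`, not something confirmed by this pull request), a new access token could be requested without re-sending the credentials:

$ curl -X POST "http://cosmos.lab.fi-ware.org:13000/cosmos-auth/v1/token" -H "Content-Type: application/x-www-form-urlencoded" -d "grant_type=refresh_token&refresh_token=V2Wlk7aFCnElKlW9BOmRzGhBtqgR2z"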

[Top](#top)

##<a name="section5"></a>Contact
Francisco Romero Bueno ([email protected]) **[Main contributor]**
<br>
@@ -40,20 +40,21 @@
public class HDFSBackendImpl extends HttpBackend implements HDFSBackend {

private final String hdfsUser;
private final String hdfsPassword;
private final String oauth2Token;
private final String hiveHost;
private final String hivePort;
private final boolean serviceAsNamespace;
private static final CygnusLogger LOGGER = new CygnusLogger(HDFSBackendImpl.class);
private static final String BASE_URL = "/webhdfs/v1/user/";
private ArrayList<Header> headers;

/**
*
* @param hdfsHosts
* @param hdfsPort
* @param hdfsUser
* @param hiveHost
* @param hdfsPassword
* @param oauth2Token
* @param hivePort
* @param krb5
* @param krb5User
@@ -62,22 +63,30 @@ public class HDFSBackendImpl extends HttpBackend implements HDFSBackend {
* @param krb5ConfFile
* @param serviceAsNamespace
*/
public HDFSBackendImpl(String[] hdfsHosts, String hdfsPort, String hdfsUser, String hdfsPassword, String hiveHost,
public HDFSBackendImpl(String[] hdfsHosts, String hdfsPort, String hdfsUser, String oauth2Token, String hiveHost,
String hivePort, boolean krb5, String krb5User, String krb5Password, String krb5LoginConfFile,
String krb5ConfFile, boolean serviceAsNamespace) {
super(hdfsHosts, hdfsPort, false, krb5, krb5User, krb5Password, krb5LoginConfFile, krb5ConfFile);
this.hdfsUser = hdfsUser;
this.hdfsPassword = hdfsPassword;
this.oauth2Token = oauth2Token;
this.hiveHost = hiveHost;
this.hivePort = hivePort;
this.serviceAsNamespace = serviceAsNamespace;

// add the OAuth2 token as the unique header that will be sent
if (oauth2Token != null && oauth2Token.length() > 0) {
headers = new ArrayList<Header>();
headers.add(new BasicHeader("X-Auth-Token", oauth2Token));
} else {
headers = null;
} // if else
} // HDFSBackendImpl

@Override
public void createDir(String dirPath) throws Exception {
String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + dirPath
+ "?op=mkdirs&user.name=" + hdfsUser;
JsonResponse response = doRequest("PUT", relativeURL, true, null, null);
JsonResponse response = doRequest("PUT", relativeURL, true, headers, null);

// check the status
if (response.getStatusCode() != 200) {
@@ -92,7 +101,7 @@ public void createFile(String filePath, String data)
throws Exception {
String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + filePath
+ "?op=create&user.name=" + hdfsUser;
JsonResponse response = doRequest("PUT", relativeURL, true, null, null);
JsonResponse response = doRequest("PUT", relativeURL, true, headers, null);

// check the status
if (response.getStatusCode() != 307) {
@@ -106,7 +115,10 @@ public void createFile(String filePath, String data)
String absoluteURL = header.getValue();

// do second step
ArrayList<Header> headers = new ArrayList<Header>();
ArrayList<Header> putHeaders = new ArrayList<Header>();

if (headers != null) {
// copy the shared headers (e.g. X-Auth-Token) without mutating the field,
// otherwise the Content-Type header would accumulate on every call
putHeaders.addAll(headers);
} // if

putHeaders.add(new BasicHeader("Content-Type", "application/octet-stream"));
response = doRequest("PUT", absoluteURL, false, putHeaders, new StringEntity(data + "\n"));

@@ -122,7 +134,7 @@ public void createFile(String filePath, String data)
public void append(String filePath, String data) throws Exception {
String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + filePath
+ "?op=append&user.name=" + hdfsUser;
JsonResponse response = doRequest("POST", relativeURL, true, null, null);
JsonResponse response = doRequest("POST", relativeURL, true, headers, null);

// check the status
if (response.getStatusCode() != 307) {
@@ -136,7 +148,10 @@ public void append(String filePath, String data) throws Exception {
String absoluteURL = header.getValue();

// do second step
ArrayList<Header> headers = new ArrayList<Header>();
ArrayList<Header> postHeaders = new ArrayList<Header>();

if (headers != null) {
// copy the shared headers (e.g. X-Auth-Token) without mutating the field,
// otherwise the Content-Type header would accumulate on every call
postHeaders.addAll(headers);
} // if

postHeaders.add(new BasicHeader("Content-Type", "application/octet-stream"));
response = doRequest("POST", absoluteURL, false, postHeaders, new StringEntity(data + "\n"));

@@ -152,7 +167,7 @@ public void append(String filePath, String data) throws Exception {
public boolean exists(String filePath) throws Exception {
String relativeURL = BASE_URL + (serviceAsNamespace ? "" : (hdfsUser + "/")) + filePath
+ "?op=getfilestatus&user.name=" + hdfsUser;
JsonResponse response = doRequest("GET", relativeURL, true, null, null);
JsonResponse response = doRequest("GET", relativeURL, true, headers, null);

// check the status
return (response.getStatusCode() == 200);
@@ -170,7 +185,7 @@ public void provisionHiveTable(String dirPath) throws Exception {
LOGGER.info("Creating Hive external table=" + tableName);

// get a Hive client
HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, hdfsPassword);
HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, oauth2Token);

// create the standard 8-fields
String fields = "("
@@ -210,7 +225,7 @@ public void provisionHiveTable(String dirPath, String fields) throws Exception {
LOGGER.info("Creating Hive external table=" + tableName);

// get a Hive client
HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, hdfsPassword);
HiveBackend hiveClient = new HiveBackend(hiveHost, hivePort, hdfsUser, oauth2Token);

// create the query
String query = "create external table " + tableName + " (" + fields + ") row format serde "
34 changes: 14 additions & 20 deletions src/main/java/com/telefonica/iot/cygnus/sinks/OrionHDFSSink.java
@@ -79,7 +79,7 @@ public class OrionHDFSSink extends OrionSink {
private String[] host;
private String port;
private String username;
private String password;
private String oauth2Token;
private boolean rowAttrPersistence;
private String hiveHost;
private String hivePort;
@@ -123,13 +123,13 @@ protected String getCosmosDefaultUsername() {
} // getCosmosDefaultUsername

/**
* Gets the Cosmos password for the default username. It is protected due to it is only required for testing
* purposes.
* @return The Cosmos password for the detault Cosmos username
* Gets the OAuth2 token used for authentication and authorization. It is protected since it is only required
* for testing purposes.
* @return The OAuth2 token
*/
protected String getCosmosDefaultPassword() {
return password;
} // getCosmosDefaultPassword
protected String getOAuth2Token() {
return oauth2Token;
} // getOAuth2Token

/**
* Gets the Hive port. It is protected due to it is only required for testing purposes.
@@ -202,20 +202,14 @@ public void configure(Context context) {
+ "properly work!");
} // if else

// FIXME: cosmosPassword should be read as a SHA1 and decoded here
String cosmosDefaultPassword = context.getString("cosmos_default_password");
String hdfsPassword = context.getString("hdfs_password");
oauth2Token = context.getString("oauth2_token");

if (hdfsPassword != null && hdfsPassword.length() > 0) {
password = hdfsPassword;
LOGGER.debug("[" + this.getName() + "] Reading configuration (hdfs_password=" + password + ")");
} else if (cosmosDefaultPassword != null && cosmosDefaultPassword.length() > 0) {
password = cosmosDefaultPassword;
LOGGER.debug("[" + this.getName() + "] Reading configuration (cosmos_default_password=" + password + ")"
+ " -- DEPRECATED, use hdfs_password instead");
if (oauth2Token != null && oauth2Token.length() > 0) {
LOGGER.debug("[" + this.getName() + "] Reading configuration (oauth2_token=" + this.oauth2Token + ")");
} else {
LOGGER.error("[" + this.getName() + "] No password provided. Cygnus can continue, but HDFS sink will not "
+ "properly work!");
LOGGER.error("[" + this.getName() + "] No OAuth2 token provided. Cygnus can continue, but HDFS sink may "
+ "not properly work if WebHDFS service is protected with such an authentication and "
+ "authorization mechanism!");
} // if else

rowAttrPersistence = context.getString("attr_persistence", "row").equals("row");
@@ -246,7 +240,7 @@ public void configure(Context context) {
public void start() {
try {
// create the persistence backend
persistenceBackend = new HDFSBackendImpl(host, port, username, password, hiveHost, hivePort, krb5,
persistenceBackend = new HDFSBackendImpl(host, port, username, oauth2Token, hiveHost, hivePort, krb5,
krb5User, krb5Password, krb5LoginConfFile, krb5ConfFile, serviceAsNamespace);
LOGGER.debug("[" + this.getName() + "] HDFS persistence backend created");
} catch (Exception e) {
@@ -60,7 +60,7 @@ public class OrionHDFSSinkTest {
private final String[] cosmosHost = {"localhost"};
private final String cosmosPort = "14000";
private final String cosmosDefaultUsername = "user1";
private final String cosmosDefaultPassword = "pass1234";
private final String oauth2Token = "tokenabcdefghijk";
private final String hivePort = "10000";
private final long recvTimeTs = 123456789;
private final String normalServiceName = "vehicles";
@@ -162,7 +162,7 @@ public void setUp() throws Exception {
context.put("cosmos_host", cosmosHost[0]);
context.put("cosmos_port", cosmosPort);
context.put("cosmos_default_username", cosmosDefaultUsername);
context.put("cosmos_default_password", cosmosDefaultPassword);
context.put("oauth2_token", oauth2Token);
context.put("hive_port", hivePort);
singleNotifyContextRequest = TestUtils.createJsonNotifyContextRequest(singleContextElementNotification);
multipleNotifyContextRequest = TestUtils.createJsonNotifyContextRequest(multipleContextElementNotification);
@@ -186,7 +186,7 @@ public void testConfigure() {
assertEquals(cosmosHost[0], sink.getCosmosHost()[0]);
assertEquals(cosmosPort, sink.getCosmosPort());
assertEquals(cosmosDefaultUsername, sink.getCosmosDefaultUsername());
assertEquals(cosmosDefaultPassword, sink.getCosmosDefaultPassword());
assertEquals(oauth2Token, sink.getOAuth2Token());
assertEquals(hivePort, sink.getHivePort());
} // testConfigure
