Skip to content

Commit

Permalink
added support for receiver to publish ack/receipt back
Browse files Browse the repository at this point in the history
  • Loading branch information
mgravitt committed Mar 15, 2020
1 parent 3dc7b2f commit 43598f8
Show file tree
Hide file tree
Showing 14 changed files with 231 additions and 140 deletions.
6 changes: 5 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ build
node_modules
package-lock.json
**.DS_Store
.DS_Store
.DS_Store
ipld
Dockerfile
demo.txt

11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ Eosio:
PublishAccount: messengerbus
PublishPrivateKey: 5KAP1zytghuvowgprSPLNasajibZcxf4KMgdgNbrNj98xhcGAUa
Dfuse:
WSEndpoint: wss://kylin.eos.dfuse.io/v1/stream
Protocol: GraphQL
GraphQLEndpoint: kylin.eos.dfuse.io:443
Origin: github.com/eosio-enterprise/chappe
ApiKey: web_*** # Replace this, get one at dfuse.io
KeyDirectory: channels/
Expand All @@ -43,6 +44,14 @@ Open New Shell, and run Publisher
```
./chappe publish --channel-name chan4242 --readable-memo "This is human-readable, unencrypted memo"
```
## Features
- Send/receive encrypted (or unencrypted) messages/documents using public or private EOSIO chains
- Messages are sent on channels, and all nodes with the channel key can read messages
- Optionally publish receipts (acknowledgements) signed with a node's key
- Support for large messages and files
- Optionally mask all metadata (publisher, type of message)
- (Coming Soon) Ability to set reveal parameters that automatically publish decrypted version after time elapses
- (Coming Soon) Hierarchies for data visibility

## Menu
Run chappe
Expand Down
7 changes: 5 additions & 2 deletions cmd/subscribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,18 @@ func MakeSubscribe() *cobra.Command {
}

var channelName string
var sendReceipts bool
command.Flags().StringVarP(&channelName, "channel-name", "n", "", "channel name")
command.Flags().BoolVarP(&sendReceipts, "send-receipts", "r", false, "send device-specific receipts back to hub")

command.Run = func(cmd *cobra.Command, args []string) {

go func() {

if viper.GetString("Dfuse.Protocol") == "WebSocket" {
pkg.StreamWS(channelName)
pkg.StreamWS(channelName, sendReceipts)
} else {
pkg.StreamMessages(context.TODO(), channelName)
pkg.StreamMessages(context.TODO(), channelName, sendReceipts)
}
}()

Expand Down
1 change: 1 addition & 0 deletions contracts/messenger/include/messenger.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ CONTRACT messenger : public contract {

ACTION pub( string ipfs_hash, string memo );

ACTION pubmap ( std::map<string, string> payload);
using pub_action = action_wrapper<"pub"_n, &messenger::pub>;
};
29 changes: 29 additions & 0 deletions contracts/messenger/messenger/messenger.abi
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
"version": "eosio::abi/1.1",
"types": [],
"structs": [
{
"name": "pair_string_string",
"base": "",
"fields": [
{
"name": "key",
"type": "string"
},
{
"name": "value",
"type": "string"
}
]
},
{
"name": "pub",
"base": "",
Expand All @@ -16,13 +30,28 @@
"type": "string"
}
]
},
{
"name": "pubmap",
"base": "",
"fields": [
{
"name": "payload",
"type": "pair_string_string[]"
}
]
}
],
"actions": [
{
"name": "pub",
"type": "pub",
"ricardian_contract": ""
},
{
"name": "pubmap",
"type": "pubmap",
"ricardian_contract": ""
}
],
"tables": [],
Expand Down
1 change: 0 additions & 1 deletion contracts/messenger/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,3 @@ find_package(eosio.cdt)

add_contract( messenger messenger messenger.cpp )
target_include_directories( messenger PUBLIC ${CMAKE_SOURCE_DIR}/../include )
target_ricardian_directory( messenger ${CMAKE_SOURCE_DIR}/../ricardian )
3 changes: 2 additions & 1 deletion contracts/messenger/src/messenger.cpp
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
#include <messenger.hpp>
ACTION messenger::pub( string ipfs_hash, string memo) {}
ACTION messenger::pub( string ipfs_hash, string memo) {}
ACTION messenger::pubmap ( std::map<string, string> payload) {}
115 changes: 59 additions & 56 deletions internal/encryption/rsa.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package encryption

import (
"crypto"
"crypto/rand"
"crypto/rsa"
"crypto/sha256"
Expand All @@ -9,62 +10,43 @@ import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"

"github.com/spf13/viper"
)

// ParseRsaPublicKeyFromPem ...
func ParseRsaPublicKeyFromPem(pubPEM []byte) (*rsa.PublicKey, error) {
block, _ := pem.Decode(pubPEM)
if block == nil {
return nil, errors.New("failed to parse PEM block containing the key")
}

pub, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
return nil, err
}
// CreateChannel ...
func CreateChannel(keyname string) (*rsa.PrivateKey, *rsa.PublicKey) {

switch pub := pub.(type) {
case *rsa.PublicKey:
return pub, nil
default:
break // fall through
}
return nil, errors.New("Key type is not RSA")
}
priv, pub := generateRsaKeyPair()

// ParseRsaPrivateKeyFromPem ...
func ParseRsaPrivateKeyFromPem(privPEM []byte) (*rsa.PrivateKey, error) {
block, _ := pem.Decode(privPEM)
if block == nil {
return nil, errors.New("failed to parse PEM block containing the key")
}
// Export the keys to pem string
privPem := exportPrivateKey(priv)
pubPem, _ := exportPublicKey(pub)

priv, err := x509.ParsePKCS1PrivateKey(block.Bytes)
if err != nil {
return nil, err
}
ioutil.WriteFile(viper.GetString("KeyDirectory")+keyname+".pub", pubPem, 0644)
ioutil.WriteFile(viper.GetString("KeyDirectory")+keyname+".pem", privPem, 0644)

return priv, nil
return priv, pub
}

func loadPublicKey(keyname string) *rsa.PublicKey {
// CalcReceipt ...
func CalcReceipt(payload []byte) ([]byte, error) {

publicKeyPemStr, err := ioutil.ReadFile(viper.GetString("KeyDirectory") + keyname + ".pub")
if err != nil {
fmt.Fprintf(os.Stderr, "Error from reading key file: %s\n", err)
return nil
}
var signature []byte
rsaPrivateKey := load(viper.GetString("DeviceRSAPrivateKey"))

// Only small messages can be signed directly; thus the hash of a
// message, rather than the message itself, is signed.
hashed := sha256.Sum256(payload)

publicKey, err := ParseRsaPublicKeyFromPem(publicKeyPemStr)
signature, err := rsa.SignPKCS1v15(rand.Reader, rsaPrivateKey, crypto.SHA256, hashed[:])
if err != nil {
fmt.Fprintf(os.Stderr, "Error from reading key file: %s\n", err)
return nil
log.Printf("Error from signing: %s\n", err)
return signature, err
}

return publicKey
return signature, nil
}

// RsaEncrypt ...
Expand Down Expand Up @@ -94,6 +76,42 @@ func RsaDecrypt(channelName string, payload []byte) ([]byte, error) {
return plaintext, nil
}

func loadPublicKey(keyname string) *rsa.PublicKey {
publicKeyPemStr, err := ioutil.ReadFile(viper.GetString("KeyDirectory") + keyname + ".pub")
if err != nil {
fmt.Fprintf(os.Stderr, "Error from reading key file: %s\n", err)
return nil
}

publicKey, err := parseRsaPublicKeyFromPem(publicKeyPemStr)
if err != nil {
fmt.Fprintf(os.Stderr, "Error from reading key file: %s\n", err)
return nil
}

return publicKey
}

func parseRsaPublicKeyFromPem(pubPEM []byte) (*rsa.PublicKey, error) {
block, _ := pem.Decode(pubPEM)
if block == nil {
return nil, errors.New("failed to parse PEM block containing the key")
}

pub, err := x509.ParsePKIXPublicKey(block.Bytes)
if err != nil {
return nil, err
}

switch pub := pub.(type) {
case *rsa.PublicKey:
return pub, nil
default:
break // fall through
}
return nil, errors.New("Key type is not RSA")
}

func parseRsaPrivateKeyFromPem(privPEM []byte) (*rsa.PrivateKey, error) {
block, _ := pem.Decode(privPEM)
if block == nil {
Expand Down Expand Up @@ -153,18 +171,3 @@ func exportPublicKey(pubkey *rsa.PublicKey) ([]byte, error) {
)
return pubkeyPem, nil
}

// CreateChannel ...
func CreateChannel(keyname string) (*rsa.PrivateKey, *rsa.PublicKey) {

priv, pub := generateRsaKeyPair()

// Export the keys to pem string
privPem := exportPrivateKey(priv)
pubPem, _ := exportPublicKey(pub)

ioutil.WriteFile(viper.GetString("KeyDirectory")+keyname+".pub", pubPem, 0644)
ioutil.WriteFile(viper.GetString("KeyDirectory")+keyname+".pem", privPem, 0644)

return priv, pub
}
13 changes: 10 additions & 3 deletions pkg/dfuse-graphql.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func getToken(apiKey string) (token string, expiration time.Time, err error) {
func createClient(endpoint string) pb.GraphQLClient {
dfuseAPIKey := viper.GetString("Dfuse.ApiKey")
if dfuseAPIKey == "" {
panic("Dfuse.ApiKey is required in configuration")
log.Fatal("Dfuse.ApiKey is required in configuration")
}

token, _, err := getToken(dfuseAPIKey)
Expand Down Expand Up @@ -86,7 +86,7 @@ type eosioDocument struct {
}

// StreamMessages ...
func StreamMessages(ctx context.Context, channelName string) {
func StreamMessages(ctx context.Context, channelName string, sendReceipts bool) {
/* The client can be re-used for all requests, cache it at the appropriate level */
client := createClient(viper.GetString("Dfuse.GraphQLEndpoint"))
executor, err := client.Execute(ctx, &pb.Request{Query: operationEOS})
Expand Down Expand Up @@ -121,7 +121,14 @@ func StreamMessages(ctx context.Context, channelName string) {
} else {
for _, action := range result.Trace.MatchingActions {
data := action.JSON
receiveGQL(channelName, data)
if data["memo"] == "receipt" {
log.Println("Received receipt, ignoring.")
} else {
message, err := receiveGQL(channelName, data)
if err == nil && sendReceipts {
SendReceipt(channelName, message)
}
}
}
}
}
Expand Down
17 changes: 10 additions & 7 deletions pkg/dfuse-websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
)

// StreamWS ...
func StreamWS(channelName string) {
client := GetClient()
err := client.Send(GetActionTraces())
func StreamWS(channelName string, sendReceipts bool) {
client := getClient()
err := client.Send(getActionTraces())
if err != nil {
log.Fatalf("Failed to send request to dfuse: %s", err)
}
Expand All @@ -25,8 +25,10 @@ func StreamWS(channelName string) {

switch m := msg.(type) {
case *eosws.ActionTrace:
// pkg.StreamMessages(context.TODO())
receiveWS(channelName, m)
message, err := receiveWS(channelName, m)
if err == nil && sendReceipts {
SendReceipt(channelName, message)
}
case *eosws.Progress:
fmt.Print(".") // poor man's progress bar, using print not log
case *eosws.Listening:
Expand All @@ -48,11 +50,12 @@ func receiveWS(channelName string, dfuseMessage *eosws.ActionTrace) (Message, er
log.Println("Error loading message: ", err)
return msg, err
}

return msg, nil
}

// GetClient ...
func GetClient() *eosws.Client {
func getClient() *eosws.Client {
apiKey := viper.GetString("Dfuse.ApiKey")
if apiKey == "" {
log.Fatalf("Missing Dfuse.ApiKey in config")
Expand All @@ -74,7 +77,7 @@ func GetClient() *eosws.Client {
}

// GetActionTraces ...
func GetActionTraces() *eosws.GetActionTraces {
func getActionTraces() *eosws.GetActionTraces {
ga := &eosws.GetActionTraces{}
ga.ReqID = "chappe"
ga.StartBlock = -300
Expand Down
18 changes: 14 additions & 4 deletions pkg/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,20 @@ import (

// Message ...
type Message struct {
EncryptedPayload []byte
EncryptedAESKey []byte
ReadableMemos []string
Payload map[string][]byte
// EncryptedPayload []byte
// EncryptedAESKey []byte
// ReadableMemos []string
Payload map[string][]byte
}

// Bytes ...
func (m Message) Bytes() []byte {
jsonMessage, err := json.Marshal(m)
if err != nil {
log.Println("Cannot convert to message to bytes", err)
}

return []byte(jsonMessage)
}

// NewMessage - creates a new Message object, provisions the Payload map
Expand Down
Loading

0 comments on commit 43598f8

Please sign in to comment.