diff --git a/config.example.json b/config.example.json index 10ec87faf..f4f254df5 100644 --- a/config.example.json +++ b/config.example.json @@ -15,6 +15,7 @@ "hashrateExpiration": "3h", "healthCheck": true, + "debug": false, "maxFails": 100, "stratum": { diff --git a/proxy/config.go b/proxy/config.go index d7fab37ba..2840215dc 100644 --- a/proxy/config.go +++ b/proxy/config.go @@ -43,6 +43,7 @@ type Proxy struct { MaxFails int64 `json:"maxFails"` HealthCheck bool `json:"healthCheck"` + Debug bool `json:"debug"` Stratum Stratum `json:"stratum"` } diff --git a/proxy/handlers.go b/proxy/handlers.go index 6f1fa902c..74596cbb1 100644 --- a/proxy/handlers.go +++ b/proxy/handlers.go @@ -38,7 +38,7 @@ func (s *ProxyServer) handleGetWorkRPC(cs *Session) ([]string, *ErrorReply) { if t == nil || len(t.Header) == 0 || s.isSick() { return nil, &ErrorReply{Code: 0, Message: "Work not ready"} } - return []string{t.Header, t.Seed, s.diff}, nil + return []string{t.Header, t.Seed, s.diff, util.ToHex(int64(t.Height))}, nil } // Stratum @@ -63,18 +63,31 @@ func (s *ProxyServer) handleSubmitRPC(cs *Session, login, id string, params []st return false, &ErrorReply{Code: -1, Message: "Invalid params"} } + stratumMode := cs.stratumMode() + if stratumMode == NiceHash { + for i := 0; i <= 2; i++ { + if params[i][0:2] != "0x" { + params[i] = "0x" + params[i] + } + } + } + if !noncePattern.MatchString(params[0]) || !hashPattern.MatchString(params[1]) || !hashPattern.MatchString(params[2]) { s.policy.ApplyMalformedPolicy(cs.ip) log.Printf("Malformed PoW result from %s@%s %v", login, cs.ip, params) return false, &ErrorReply{Code: -1, Message: "Malformed PoW result"} } t := s.currentBlockTemplate() - exist, validShare := s.processShare(login, id, cs.ip, t, params) + exist, validShare := s.processShare(login, id, cs.ip, t, params, stratumMode != EthProxy) ok := s.policy.ApplySharePolicy(cs.ip, !exist && validShare) if exist { log.Printf("Duplicate share from %s@%s %v", login, cs.ip, params) - return false, &ErrorReply{Code: 22, Message: "Duplicate share"} + // see https://github.com/sammy007/open-ethereum-pool/compare/master...nicehashdev:patch-1 + if !ok { + return false, &ErrorReply{Code: 23, Message: "Invalid share"} + } + return false, nil } if !validShare { @@ -85,7 +98,9 @@ func (s *ProxyServer) handleSubmitRPC(cs *Session, login, id string, params []st } return false, nil } - log.Printf("Valid share from %s@%s", login, cs.ip) + if s.config.Proxy.Debug { + log.Printf("Valid share from %s@%s", login, cs.ip) + } if !ok { return true, &ErrorReply{Code: -1, Message: "High rate of invalid shares"} diff --git a/proxy/miner.go b/proxy/miner.go index 8d312f9d8..39e83b5f3 100644 --- a/proxy/miner.go +++ b/proxy/miner.go @@ -6,46 +6,74 @@ import ( "strconv" "strings" - "github.com/ethereum/ethash" "github.com/ethereum/go-ethereum/common" + "github.com/hackmod/ethereum-ethash" + + "github.com/sammy007/open-ethereum-pool/util" ) var hasher = ethash.New() -func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, params []string) (bool, bool) { +var ( + maxUint256 = new(big.Int).Exp(big.NewInt(2), big.NewInt(256), big.NewInt(0)) +) + +func (s *ProxyServer) processShare(login, id, ip string, t *BlockTemplate, params []string, stratum bool) (bool, bool) { nonceHex := params[0] hashNoNonce := params[1] mixDigest := params[2] nonce, _ := strconv.ParseUint(strings.Replace(nonceHex, "0x", "", -1), 16, 64) shareDiff := s.config.Proxy.Difficulty + var result common.Hash + if stratum { + hashNoNonceTmp := common.HexToHash(params[2]) + + _, mixDigestTmp, hashTmp := hasher.Compute(t.Height, hashNoNonceTmp, nonce) + params[1] = hashNoNonceTmp.Hex() + params[2] = mixDigestTmp.Hex() + hashNoNonce = params[1] + result = hashTmp + } else { + hashNoNonceTmp := common.HexToHash(hashNoNonce) + _, mixDigestTmp, hashTmp := hasher.Compute(t.Height, hashNoNonceTmp, nonce) + + // check mixDigest + if (mixDigestTmp.Hex() != mixDigest) { + return false, false + } + result = hashTmp + } + + // Block "difficulty" is BigInt + // NiceHash "difficulty" is float64 ... + // diffFloat => target; then: diffInt = 2^256 / target + shareDiffCalc := util.TargetHexToDiff(result.Hex()).Int64() + shareDiffFloat := util.DiffIntToFloat(shareDiffCalc) + if shareDiffFloat < 0.0001 { + log.Printf("share difficulty too low, %f < %d, from %v@%v", shareDiffFloat, t.Difficulty, login, ip) + return false, false + } + h, ok := t.headers[hashNoNonce] if !ok { log.Printf("Stale share from %v@%v", login, ip) return false, false } - share := Block{ - number: h.height, - hashNoNonce: common.HexToHash(hashNoNonce), - difficulty: big.NewInt(shareDiff), - nonce: nonce, - mixDigest: common.HexToHash(mixDigest), - } - - block := Block{ - number: h.height, - hashNoNonce: common.HexToHash(hashNoNonce), - difficulty: h.diff, - nonce: nonce, - mixDigest: common.HexToHash(mixDigest), + if s.config.Proxy.Debug { + log.Printf("Difficulty pool/block/share = %d / %d / %d(%f) from %v@%v", shareDiff, t.Difficulty, shareDiffCalc, shareDiffFloat, login, ip) } - if !hasher.Verify(share) { + // check share difficulty + shareTarget := new(big.Int).Div(maxUint256, big.NewInt(shareDiff)) + if (result.Big().Cmp(shareTarget) > 0) { return false, false } - if hasher.Verify(block) { + // check target difficulty + target := new(big.Int).Div(maxUint256, big.NewInt(h.diff.Int64())) + if result.Big().Cmp(target) <= 0 { ok, err := s.rpc().SubmitBlock(params) if err != nil { log.Printf("Block submission failure at height %v for %v: %v", h.height, t.Header, err) diff --git a/proxy/proto.go b/proxy/proto.go index 16de94340..dab1b2a55 100644 --- a/proxy/proto.go +++ b/proxy/proto.go @@ -8,6 +8,12 @@ type JSONRpcReq struct { Params json.RawMessage `json:"params"` } +type JSONStratumReq struct { + Method string `json:"method"` + Params interface{} `json:"params"` + Id interface{} `json:"id"` +} + type StratumReq struct { JSONRpcReq Worker string `json:"worker"` @@ -23,9 +29,9 @@ type JSONPushMessage struct { type JSONRpcResp struct { Id json.RawMessage `json:"id"` - Version string `json:"jsonrpc"` + Version string `json:"jsonrpc,omitempty"` Result interface{} `json:"result"` - Error interface{} `json:"error,omitempty"` + Error interface{} `json:"error"` } type SubmitReply struct { diff --git a/proxy/proxy.go b/proxy/proxy.go index 7135f2ab7..5cc591991 100644 --- a/proxy/proxy.go +++ b/proxy/proxy.go @@ -34,6 +34,20 @@ type ProxyServer struct { sessionsMu sync.RWMutex sessions map[*Session]struct{} timeout time.Duration + // Extranonce + Extranonces map[string]bool +} + +type jobDetails struct { + JobID string + SeedHash string + HeaderHash string + Height string +} + +type staleJob struct { + SeedHash string + HeaderHash string } type Session struct { @@ -44,6 +58,14 @@ type Session struct { sync.Mutex conn net.Conn login string + + stratum int + subscriptionID string + Extranonce string + ExtranonceSub bool + JobDetails jobDetails + staleJobs map[string]staleJob + staleJobIDs []string } func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer { @@ -64,6 +86,7 @@ func NewProxy(cfg *Config, backend *storage.RedisClient) *ProxyServer { if cfg.Proxy.Stratum.Enabled { proxy.sessions = make(map[*Session]struct{}) + proxy.Extranonces = make(map[string]bool) go proxy.ListenTCP() } diff --git a/proxy/stratum.go b/proxy/stratum.go index 837928937..fda127c14 100644 --- a/proxy/stratum.go +++ b/proxy/stratum.go @@ -7,7 +7,10 @@ import ( "errors" "io" "log" + "math/rand" "net" + "strconv" + "strings" "time" "github.com/sammy007/open-ethereum-pool/util" @@ -17,6 +20,11 @@ const ( MaxReqSize = 1024 ) +const ( + EthProxy int = iota + NiceHash +) + func (s *ProxyServer) ListenTCP() { s.timeout = util.MustParseDuration(s.config.Proxy.Stratum.Timeout) @@ -54,7 +62,11 @@ func (s *ProxyServer) ListenTCP() { continue } n += 1 - cs := &Session{conn: conn, ip: ip} + // make unique extranonce + extranonce := s.uniqExtranonce() + cs := &Session{conn: conn, ip: ip, Extranonce: extranonce, ExtranonceSub: false, stratum: -1} + // allocate stales cache + cs.staleJobs = make(map[string]staleJob) accept <- n go func(cs *Session) { @@ -106,13 +118,29 @@ func (s *ProxyServer) handleTCPClient(cs *Session) error { return nil } +func (cs *Session) setStratumMode(str string) error { + switch str { + case "EthereumStratum/1.0.0": + cs.stratum = NiceHash + break + default: + cs.stratum = EthProxy + break + } + return nil +} + +func (cs *Session) stratumMode() int { + return cs.stratum +} + func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { - // Handle RPC methods + // Handle RPC/Stratum methods switch req.Method { case "eth_submitLogin": var params []string err := json.Unmarshal(req.Params, ¶ms) - if err != nil { + if err != nil || len(params) < 1 { log.Println("Malformed stratum request params from", cs.ip) return err } @@ -120,7 +148,173 @@ func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { if errReply != nil { return cs.sendTCPError(req.Id, errReply) } + cs.setStratumMode("EthProxy") + log.Println("EthProxy login", cs.ip) return cs.sendTCPResult(req.Id, reply) + + case "mining.subscribe": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil || len(params) < 2 { + log.Println("Malformed stratum request params from", cs.ip) + return err + } + + if params[1] != "EthereumStratum/1.0.0" { + log.Println("Unsupported stratum version from ", cs.ip) + return cs.sendStratumError(req.Id, "unsupported stratum version") + } + + cs.setStratumMode("EthereumStratum/1.0.0") + log.Println("Nicehash subscribe", cs.ip) + result := cs.getNotificationResponse(s) + return cs.sendStratumResult(req.Id, result) + + default: + switch cs.stratumMode() { + case 0: + break + case 1: + break + default: + errReply := s.handleUnknownRPC(cs, req.Method) + return cs.sendTCPError(req.Id, errReply) + } + } + + if cs.stratumMode() == NiceHash { + switch req.Method { + case "mining.authorize": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil || len(params) < 1 { + return errors.New("invalid params") + } + splitData := strings.Split(params[0], ".") + params[0] = splitData[0] + reply, errReply := s.handleLoginRPC(cs, params, req.Worker) + if errReply != nil { + return cs.sendStratumError(req.Id, []string{ + string(errReply.Code), + errReply.Message, + }) + } + + if err := cs.sendStratumResult(req.Id, reply); err != nil { + return err + } + + paramsDiff := []float64{ + util.DiffIntToFloat(s.config.Proxy.Difficulty), + } + respReq := JSONStratumReq{Method: "mining.set_difficulty", Params: paramsDiff} + if err := cs.sendTCPReq(respReq); err != nil { + return err + } + + return cs.sendJob(s, req.Id, true) + + case "mining.extranonce.subscribe": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil { + return errors.New("invalid params") + } + if len(params) == 0 { + if err := cs.sendStratumResult(req.Id, true); err != nil { + return err + } + cs.ExtranonceSub = true + req := JSONStratumReq{ + Id: nil, + Method: "mining.set_extranonce", + Params: []interface{}{ + cs.Extranonce, + }, + } + return cs.sendTCPReq(req) + } + return cs.sendStratumError(req.Id, []string{ + "20", + "Not supported.", + }) + case "mining.submit": + var params []string + err := json.Unmarshal(req.Params, ¶ms) + if err != nil || len(params) < 3 { + log.Println("mining.submit: json.Unmarshal fail") + return err + } + + // params[0] = Username + // params[1] = Job ID + // params[2] = Minernonce + // Reference: + // https://github.com/nicehash/nhethpool/blob/060817a9e646cd9f1092647b870ed625ee138ab4/nhethpool/EthereumInstance.cs#L369 + + // WORKER NAME MANDATORY 0x1234.WORKERNAME + splitData := strings.Split(params[0], ".") + id := "0" + if len(splitData) > 1 { + id = splitData[1] + } + + // check Extranonce subscription. + extranonce := cs.Extranonce + if !cs.ExtranonceSub { extranonce = "" } + nonce := extranonce + params[2] + + if cs.JobDetails.JobID != params[1] { + stale, ok := cs.staleJobs[params[1]] + if ok { + log.Printf("Cached stale JobID %s", params[1]) + params = []string{ + nonce, + stale.SeedHash, + stale.HeaderHash, + } + } else { + log.Printf("Stale share (mining.submit JobID received %s != current %s)", params[1], cs.JobDetails.JobID) + if err := cs.sendStratumError(req.Id, []string{"21", "Stale share."}); err != nil { + return err + } + return cs.sendJob(s, req.Id, false) + } + } else { + params = []string{ + nonce, + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + } + } + + reply, errReply := s.handleTCPSubmitRPC(cs, id, params) + if errReply != nil { + log.Println("mining.submit: handleTCPSubmitRPC failed") + return cs.sendStratumError(req.Id, []string{ + strconv.Itoa(errReply.Code), + errReply.Message, + }) + } + + // TEST, ein notify zu viel + //if err := cs.sendTCPResult(resp); err != nil { + // return err + //} + + //return cs.sendJob(s, req.Id) + return cs.sendStratumResult(req.Id, reply) + + default: + errReply := s.handleUnknownRPC(cs, req.Method) + return cs.sendStratumError(req.Id, []string{ + strconv.Itoa(errReply.Code), + errReply.Message, + }) + } + } + + switch req.Method { case "eth_getWork": reply, errReply := s.handleGetWorkRPC(cs) if errReply != nil { @@ -130,7 +324,7 @@ func (cs *Session) handleTCPMessage(s *ProxyServer, req *StratumReq) error { case "eth_submitWork": var params []string err := json.Unmarshal(req.Params, ¶ms) - if err != nil { + if err != nil || len(params) < 3 { log.Println("Malformed stratum request params from", cs.ip) return err } @@ -155,9 +349,66 @@ func (cs *Session) sendTCPResult(id json.RawMessage, result interface{}) error { return cs.enc.Encode(&message) } -func (cs *Session) pushNewJob(result interface{}) error { +// cache stale jobs +func (cs *Session) cacheStales(max, n int) { + l := len(cs.staleJobIDs) + // remove outdated stales except last n caches if l > max + if l > max { + save := cs.staleJobIDs[l-n : l] + del := cs.staleJobIDs[0 : l-n] + for _, v := range del { + delete(cs.staleJobs, v) + } + cs.staleJobIDs = save + } + // save stales cache + cs.staleJobs[cs.JobDetails.JobID] = staleJob{ + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + } + cs.staleJobIDs = append(cs.staleJobIDs, cs.JobDetails.JobID) +} + +func (cs *Session) pushNewJob(s *ProxyServer, result interface{}) error { cs.Lock() defer cs.Unlock() + + if cs.stratumMode() == NiceHash { + cs.cacheStales(10, 3) + + t := result.(*[]string) + cs.JobDetails = jobDetails{ + JobID: randomHex(8), + SeedHash: (*t)[1], + HeaderHash: (*t)[0], + Height: (*t)[3], + } + + // strip 0x prefix + if cs.JobDetails.SeedHash[0:2] == "0x" { + cs.JobDetails.SeedHash = cs.JobDetails.SeedHash[2:] + cs.JobDetails.HeaderHash = cs.JobDetails.HeaderHash[2:] + } + + resp := JSONStratumReq{ + Method: "mining.notify", + Params: []interface{}{ + cs.JobDetails.JobID, + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + // If set to true, then miner needs to clear queue of jobs and immediatelly + // start working on new provided job, because all old jobs shares will + // result with stale share error. + // + // if true, NiceHash charges "Extra Rewards" for frequent job changes + // if false, the stale rate might be higher because miners take too long to switch jobs + // + // It's undetermined what's more cost-effective + true, + }, + } + return cs.enc.Encode(&resp) + } // FIXME: Temporarily add ID for Claymore compliance message := JSONPushMessage{Version: "2.0", Result: result, Id: 0} return cs.enc.Encode(&message) @@ -175,6 +426,30 @@ func (cs *Session) sendTCPError(id json.RawMessage, reply *ErrorReply) error { return errors.New(reply.Message) } +func (cs *Session) sendStratumResult(id json.RawMessage, result interface{}) error { + cs.Lock() + defer cs.Unlock() + + resp := JSONRpcResp{Id: id, Error: nil, Result: result} + return cs.enc.Encode(&resp) +} + +func (cs *Session) sendStratumError(id json.RawMessage, message interface{}) error { + cs.Lock() + defer cs.Unlock() + + resp := JSONRpcResp{Id: id, Error: message} + + return cs.enc.Encode(&resp) +} + +func (cs *Session) sendTCPReq(resp JSONStratumReq) error { + cs.Lock() + defer cs.Unlock() + + return cs.enc.Encode(&resp) +} + func (self *ProxyServer) setDeadline(conn net.Conn) { conn.SetDeadline(time.Now().Add(self.timeout)) } @@ -188,15 +463,55 @@ func (s *ProxyServer) registerSession(cs *Session) { func (s *ProxyServer) removeSession(cs *Session) { s.sessionsMu.Lock() defer s.sessionsMu.Unlock() + delete(s.Extranonces, cs.Extranonce) delete(s.sessions, cs) } +// nicehash +func (cs *Session) sendJob(s *ProxyServer, id json.RawMessage, newjob bool) error { + if newjob { + reply, errReply := s.handleGetWorkRPC(cs) + if errReply != nil { + return cs.sendStratumError(id, []string{ + string(errReply.Code), + errReply.Message, + }) + } + + cs.JobDetails = jobDetails{ + JobID: randomHex(8), + SeedHash: reply[1], + HeaderHash: reply[0], + Height: reply[3], + } + + // The NiceHash official .NET pool omits 0x... + // TO DO: clean up once everything works + if cs.JobDetails.SeedHash[0:2] == "0x" { + cs.JobDetails.SeedHash = cs.JobDetails.SeedHash[2:] + cs.JobDetails.HeaderHash = cs.JobDetails.HeaderHash[2:] + } + } + + resp := JSONStratumReq{ + Method: "mining.notify", + Params: []interface{}{ + cs.JobDetails.JobID, + cs.JobDetails.SeedHash, + cs.JobDetails.HeaderHash, + true, + }, + } + + return cs.sendTCPReq(resp) +} + func (s *ProxyServer) broadcastNewJobs() { t := s.currentBlockTemplate() if t == nil || len(t.Header) == 0 || s.isSick() { return } - reply := []string{t.Header, t.Seed, s.diff} + reply := []string{t.Header, t.Seed, s.diff, util.ToHex(int64(t.Height))} s.sessionsMu.RLock() defer s.sessionsMu.RUnlock() @@ -213,7 +528,7 @@ func (s *ProxyServer) broadcastNewJobs() { bcast <- n go func(cs *Session) { - err := cs.pushNewJob(&reply) + err := cs.pushNewJob(s, &reply) <-bcast if err != nil { log.Printf("Job transmit error to %v@%v: %v", cs.login, cs.ip, err) @@ -225,3 +540,37 @@ func (s *ProxyServer) broadcastNewJobs() { } log.Printf("Jobs broadcast finished %s", time.Since(start)) } + +func (s *ProxyServer) uniqExtranonce() string { + s.sessionsMu.RLock() + defer s.sessionsMu.RUnlock() + + extranonce := randomHex(4) + for { + if _, ok := s.Extranonces[extranonce]; ok { + extranonce = randomHex(4) + } else { + break + } + } + s.Extranonces[extranonce] = true + return extranonce +} + +func randomHex(strlen int) string { + rand.Seed(time.Now().UTC().UnixNano()) + const chars = "0123456789abcdef" + result := make([]byte, strlen) + for i := 0; i < strlen; i++ { + result[i] = chars[rand.Intn(len(chars))] + } + return string(result) +} + +func (cs *Session) getNotificationResponse(s *ProxyServer) interface{} { + result := make([]interface{}, 2) + result[0] = []string{"mining.notify", randomHex(16), "EthereumStratum/1.0.0"} + result[1] = cs.Extranonce + + return result +} diff --git a/util/util.go b/util/util.go index b4219c52e..df282f508 100644 --- a/util/util.go +++ b/util/util.go @@ -79,3 +79,13 @@ func String2Big(num string) *big.Int { n.SetString(num, 0) return n } + +func DiffFloatToInt(diffFloat float64) (diffInt int64) { + diffInt = int64(diffFloat * float64(1<<48) / float64(0xffff)) // 48 = 256 - 26*8 + return +} + +func DiffIntToFloat(diffInt int64) (diffFloat float64) { + diffFloat = float64(diffInt*0xffff) / float64(1<<48) // 48 = 256 - 26*8 + return +}