Skip to content

Commit

Permalink
scan v2
Browse files Browse the repository at this point in the history
  • Loading branch information
chris124567 committed Nov 14, 2024
1 parent 63e0063 commit 3b1061f
Show file tree
Hide file tree
Showing 5 changed files with 85 additions and 13 deletions.
2 changes: 1 addition & 1 deletion explorer/explorer.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ type Store interface {
SiafundElements(ids []types.SiafundOutputID) (result []SiafundOutput, err error)

Hosts(pks []types.PublicKey) ([]Host, error)
HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) ([]chain.HostAnnouncement, error)
HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) ([]Host, error)
}

// Explorer implements a Sia explorer.
Expand Down
60 changes: 54 additions & 6 deletions explorer/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

crhpv2 "go.sia.tech/core/rhp/v2"
"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
crhpv4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/explored/internal/geoip"
rhpv2 "go.sia.tech/explored/internal/rhp/v2"
rhpv3 "go.sia.tech/explored/internal/rhp/v3"
Expand Down Expand Up @@ -52,11 +52,12 @@ func (e *Explorer) waitForSync() error {
return nil
}

func (e *Explorer) scanHost(locator geoip.Locator, host chain.HostAnnouncement) (HostScan, error) {
func (e *Explorer) scanV1Host(locator geoip.Locator, host Host) (HostScan, error) {
ctx, cancel := context.WithTimeout(e.ctx, e.scanCfg.Timeout)
defer cancel()

dialer := (&net.Dialer{})

conn, err := dialer.DialContext(ctx, "tcp", host.NetAddress)
if err != nil {
return HostScan{}, fmt.Errorf("scanHost: failed to connect to host: %w", err)
Expand Down Expand Up @@ -113,7 +114,49 @@ func (e *Explorer) scanHost(locator geoip.Locator, host chain.HostAnnouncement)
}, nil
}

func (e *Explorer) addHostScans(hosts chan chain.HostAnnouncement) {
func (e *Explorer) scanV2Host(locator geoip.Locator, host Host) (HostScan, error) {
ctx, cancel := context.WithTimeout(e.ctx, e.scanCfg.Timeout)
defer cancel()

transport, err := crhpv4.DialSiaMux(ctx, host.V2NetAddresses[0].Address, host.PublicKey)
if err != nil {
return HostScan{}, fmt.Errorf("failed to dial host: %w", err)
}
defer transport.Close()

settings, err := crhpv4.RPCSettings(ctx, transport)
if err != nil {
return HostScan{}, fmt.Errorf("failed to get host settings: %w", err)
}

hostIP, _, err := net.SplitHostPort(host.NetAddress)
if err != nil {
return HostScan{}, fmt.Errorf("scanHost: failed to parse net address: %w", err)
}

resolved, err := net.ResolveIPAddr("ip", hostIP)
// if we can resolve the address
if err != nil {
return HostScan{}, fmt.Errorf("scanHost: failed to resolve host address: %w", err)
}

countryCode, err := locator.CountryCode(resolved)
if err != nil {
e.log.Debug("Failed to resolve IP geolocation, not setting country code", zap.String("addr", host.NetAddress))
countryCode = ""
}

return HostScan{
PublicKey: host.PublicKey,
CountryCode: countryCode,
Success: true,
Timestamp: types.CurrentTimestamp(),

RHPV4Settings: settings,
}, nil
}

func (e *Explorer) addHostScans(hosts chan Host) {
// use default included ip2location database
locator, err := geoip.NewIP2LocationLocator("")
if err != nil {
Expand All @@ -129,7 +172,12 @@ func (e *Explorer) addHostScans(hosts chan chain.HostAnnouncement) {
break
}

scan, err := e.scanHost(locator, host)
var scan HostScan
if len(host.V2NetAddresses) == 0 {
scan, err = e.scanV1Host(locator, host)
} else {
scan, err = e.scanV2Host(locator, host)
}
if err != nil {
scans = append(scans, HostScan{
PublicKey: host.PublicKey,
Expand Down Expand Up @@ -172,7 +220,7 @@ func (e *Explorer) isClosed() bool {
}
}

func (e *Explorer) fetchHosts(hosts chan chain.HostAnnouncement) {
func (e *Explorer) fetchHosts(hosts chan Host) {
var exhausted bool
offset := 0

Expand Down Expand Up @@ -210,7 +258,7 @@ func (e *Explorer) scanHosts() {

for !e.isClosed() {
// fetch hosts
hosts := make(chan chain.HostAnnouncement, scanBatchSize)
hosts := make(chan Host, scanBatchSize)
e.wg.Add(1)
go func() {
defer e.wg.Done()
Expand Down
2 changes: 0 additions & 2 deletions explorer/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,6 @@ type HostScan struct {
PriceTable rhpv3.HostPriceTable `json:"priceTable"`

RHPV4Settings rhpv4.HostSettings `json:"rhpV4Settings"`
RHPV4Prices rhpv4.HostPrices `json:"rhpV4Prices"`
}

// Host represents a host and the information gathered from scanning it.
Expand All @@ -313,7 +312,6 @@ type Host struct {
PriceTable rhpv3.HostPriceTable `json:"priceTable"`

RHPV4Settings rhpv4.HostSettings `json:"rhpV4Settings"`
RHPV4Prices rhpv4.HostPrices `json:"rhpV4Prices"`
}

// HostMetrics represents averages of scanned information from hosts.
Expand Down
30 changes: 28 additions & 2 deletions persist/sqlite/addresses.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,19 +134,45 @@ func (st *Store) Hosts(pks []types.PublicKey) (result []explorer.Host, err error
}

// HostsForScanning returns hosts ordered by the transaction they were created in.
func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) (result []chain.HostAnnouncement, err error) {
func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) (result []explorer.Host, err error) {
err = s.transaction(func(tx *txn) error {
rows, err := tx.Query(`SELECT public_key, net_address FROM host_info WHERE last_scan <= ? AND last_announcement >= ? ORDER BY last_scan ASC LIMIT ? OFFSET ?`, encode(maxLastScan), encode(minLastAnnouncement), limit, offset)
if err != nil {
return err
}
defer rows.Close()

v2AddrStmt, err := tx.Prepare(`SELECT protocol,address FROM host_info_v2_netaddresses WHERE public_key = ? ORDER BY netaddress_order`)
if err != nil {
return err
}
defer v2AddrStmt.Close()

for rows.Next() {
var host chain.HostAnnouncement
var host explorer.Host
if err := rows.Scan(decode(&host.PublicKey), &host.NetAddress); err != nil {
return err
}

err := func() error {
v2AddrRows, err := v2AddrStmt.Query(encode(host.PublicKey))
if err != nil {
return err
}
defer v2AddrRows.Close()
for v2AddrRows.Next() {
var netAddr chain.NetAddress
if err := v2AddrRows.Scan(&netAddr.Protocol, &netAddr.Address); err != nil {
return err
}
host.V2NetAddresses = append(host.V2NetAddresses, netAddr)
}
return nil
}()
if err != nil {
return err
}

result = append(result, host)
}
return nil
Expand Down
4 changes: 2 additions & 2 deletions persist/sqlite/consensus.go
Original file line number Diff line number Diff line change
Expand Up @@ -1140,7 +1140,7 @@ func addHosts(tx *txn, scans []explorer.Host) error {

for _, scan := range scans {
s, p := scan.Settings, scan.PriceTable
sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Prices
sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Settings.Prices
if _, err := stmt.Exec(encode(scan.PublicKey), scan.NetAddress, scan.CountryCode, encode(scan.KnownSince), encode(scan.LastScan), scan.LastScanSuccessful, encode(scan.LastAnnouncement), scan.TotalScans, scan.SuccessfulInteractions, scan.FailedInteractions, s.AcceptingContracts, encode(s.MaxDownloadBatchSize), encode(s.MaxDuration), encode(s.MaxReviseBatchSize), s.NetAddress, encode(s.RemainingStorage), encode(s.SectorSize), encode(s.TotalStorage), encode(s.Address), encode(s.WindowSize), encode(s.Collateral), encode(s.MaxCollateral), encode(s.BaseRPCPrice), encode(s.ContractPrice), encode(s.DownloadBandwidthPrice), encode(s.SectorAccessPrice), encode(s.StoragePrice), encode(s.UploadBandwidthPrice), s.EphemeralAccountExpiry, encode(s.MaxEphemeralAccountBalance), encode(s.RevisionNumber), s.Version, s.Release, s.SiaMuxPort, encode(p.UID), p.Validity, encode(p.HostBlockHeight), encode(p.UpdatePriceTableCost), encode(p.AccountBalanceCost), encode(p.FundAccountCost), encode(p.LatestRevisionCost), encode(p.SubscriptionMemoryCost), encode(p.SubscriptionNotificationCost), encode(p.InitBaseCost), encode(p.MemoryTimeCost), encode(p.DownloadBandwidthCost), encode(p.UploadBandwidthCost), encode(p.DropSectorsBaseCost), encode(p.DropSectorsUnitCost), encode(p.HasSectorBaseCost), encode(p.ReadBaseCost), encode(p.ReadLengthCost), encode(p.RenewContractCost), encode(p.RevisionBaseCost), encode(p.SwapSectorBaseCost), encode(p.WriteBaseCost), encode(p.WriteLengthCost), encode(p.WriteStoreCost), encode(p.TxnFeeMinRecommended), encode(p.TxnFeeMaxRecommended), encode(p.ContractPrice), encode(p.CollateralCost), encode(p.MaxCollateral), encode(p.MaxDuration), encode(p.WindowSize), encode(p.RegistryEntriesLeft), encode(p.RegistryEntriesTotal), sV4.ProtocolVersion[:], sV4.Release, encode(sV4.WalletAddress), sV4.AcceptingContracts, encode(sV4.MaxCollateral), encode(sV4.MaxContractDuration), encode(sV4.MaxSectorDuration), encode(sV4.MaxSectorBatchSize), encode(sV4.RemainingStorage), encode(sV4.TotalStorage), encode(pV4.ContractPrice), encode(pV4.Collateral), encode(pV4.StoragePrice), encode(pV4.IngressPrice), encode(pV4.EgressPrice), encode(pV4.FreeSectorPrice), encode(pV4.TipHeight), encode(pV4.ValidUntil), encode(pV4.Signature)); err != nil {
return fmt.Errorf("failed to execute host_info stmt: %w", err)
}
Expand Down Expand Up @@ -1170,7 +1170,7 @@ func addHostScans(tx *txn, scans []explorer.HostScan) error {
}

s, p := scan.Settings, scan.PriceTable
sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Prices
sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Settings.Prices
if _, err := stmt.Exec(scan.CountryCode, encode(scan.Timestamp), scan.Success, successful, failed, s.AcceptingContracts, encode(s.MaxDownloadBatchSize), encode(s.MaxDuration), encode(s.MaxReviseBatchSize), s.NetAddress, encode(s.RemainingStorage), encode(s.SectorSize), encode(s.TotalStorage), encode(s.Address), encode(s.WindowSize), encode(s.Collateral), encode(s.MaxCollateral), encode(s.BaseRPCPrice), encode(s.ContractPrice), encode(s.DownloadBandwidthPrice), encode(s.SectorAccessPrice), encode(s.StoragePrice), encode(s.UploadBandwidthPrice), s.EphemeralAccountExpiry, encode(s.MaxEphemeralAccountBalance), encode(s.RevisionNumber), s.Version, s.Release, s.SiaMuxPort, encode(p.UID), p.Validity, encode(p.HostBlockHeight), encode(p.UpdatePriceTableCost), encode(p.AccountBalanceCost), encode(p.FundAccountCost), encode(p.LatestRevisionCost), encode(p.SubscriptionMemoryCost), encode(p.SubscriptionNotificationCost), encode(p.InitBaseCost), encode(p.MemoryTimeCost), encode(p.DownloadBandwidthCost), encode(p.UploadBandwidthCost), encode(p.DropSectorsBaseCost), encode(p.DropSectorsUnitCost), encode(p.HasSectorBaseCost), encode(p.ReadBaseCost), encode(p.ReadLengthCost), encode(p.RenewContractCost), encode(p.RevisionBaseCost), encode(p.SwapSectorBaseCost), encode(p.WriteBaseCost), encode(p.WriteLengthCost), encode(p.WriteStoreCost), encode(p.TxnFeeMinRecommended), encode(p.TxnFeeMaxRecommended), encode(p.ContractPrice), encode(p.CollateralCost), encode(p.MaxCollateral), encode(p.MaxDuration), encode(p.WindowSize), encode(p.RegistryEntriesLeft), encode(p.RegistryEntriesTotal), sV4.ProtocolVersion[:], sV4.Release, encode(sV4.WalletAddress), sV4.AcceptingContracts, encode(sV4.MaxCollateral), encode(sV4.MaxContractDuration), encode(sV4.MaxSectorDuration), encode(sV4.MaxSectorBatchSize), encode(sV4.RemainingStorage), encode(sV4.TotalStorage), encode(pV4.ContractPrice), encode(pV4.Collateral), encode(pV4.StoragePrice), encode(pV4.IngressPrice), encode(pV4.EgressPrice), encode(pV4.FreeSectorPrice), encode(pV4.TipHeight), encode(pV4.ValidUntil), encode(pV4.Signature), encode(scan.PublicKey)); err != nil {
return err
}
Expand Down

0 comments on commit 3b1061f

Please sign in to comment.