diff --git a/explorer/explorer.go b/explorer/explorer.go index 0667dc29..672d2b73 100644 --- a/explorer/explorer.go +++ b/explorer/explorer.go @@ -67,7 +67,7 @@ type Store interface { SiafundElements(ids []types.SiafundOutputID) (result []SiafundOutput, err error) Hosts(pks []types.PublicKey) ([]Host, error) - HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) ([]chain.HostAnnouncement, error) + HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) ([]Host, error) } // Explorer implements a Sia explorer. diff --git a/explorer/scan.go b/explorer/scan.go index 5fe4b8f9..64a9aec8 100644 --- a/explorer/scan.go +++ b/explorer/scan.go @@ -9,7 +9,7 @@ import ( crhpv2 "go.sia.tech/core/rhp/v2" "go.sia.tech/core/types" - "go.sia.tech/coreutils/chain" + crhpv4 "go.sia.tech/coreutils/rhp/v4" "go.sia.tech/explored/internal/geoip" rhpv2 "go.sia.tech/explored/internal/rhp/v2" rhpv3 "go.sia.tech/explored/internal/rhp/v3" @@ -52,11 +52,12 @@ func (e *Explorer) waitForSync() error { return nil } -func (e *Explorer) scanHost(locator geoip.Locator, host chain.HostAnnouncement) (HostScan, error) { +func (e *Explorer) scanV1Host(locator geoip.Locator, host Host) (HostScan, error) { ctx, cancel := context.WithTimeout(e.ctx, e.scanCfg.Timeout) defer cancel() dialer := (&net.Dialer{}) + conn, err := dialer.DialContext(ctx, "tcp", host.NetAddress) if err != nil { return HostScan{}, fmt.Errorf("scanHost: failed to connect to host: %w", err) @@ -113,7 +114,49 @@ func (e *Explorer) scanHost(locator geoip.Locator, host chain.HostAnnouncement) }, nil } -func (e *Explorer) addHostScans(hosts chan chain.HostAnnouncement) { +func (e *Explorer) scanV2Host(locator geoip.Locator, host Host) (HostScan, error) { + ctx, cancel := context.WithTimeout(e.ctx, e.scanCfg.Timeout) + defer cancel() + + transport, err := crhpv4.DialSiaMux(ctx, host.V2NetAddresses[0].Address, host.PublicKey) + if err != nil { + return HostScan{}, fmt.Errorf("failed to dial host: %w", err) + } + defer transport.Close() + + settings, err := crhpv4.RPCSettings(ctx, transport) + if err != nil { + return HostScan{}, fmt.Errorf("failed to get host settings: %w", err) + } + + hostIP, _, err := net.SplitHostPort(host.NetAddress) + if err != nil { + return HostScan{}, fmt.Errorf("scanHost: failed to parse net address: %w", err) + } + + resolved, err := net.ResolveIPAddr("ip", hostIP) + // if we can resolve the address + if err != nil { + return HostScan{}, fmt.Errorf("scanHost: failed to resolve host address: %w", err) + } + + countryCode, err := locator.CountryCode(resolved) + if err != nil { + e.log.Debug("Failed to resolve IP geolocation, not setting country code", zap.String("addr", host.NetAddress)) + countryCode = "" + } + + return HostScan{ + PublicKey: host.PublicKey, + CountryCode: countryCode, + Success: true, + Timestamp: types.CurrentTimestamp(), + + RHPV4Settings: settings, + }, nil +} + +func (e *Explorer) addHostScans(hosts chan Host) { // use default included ip2location database locator, err := geoip.NewIP2LocationLocator("") if err != nil { @@ -129,7 +172,12 @@ func (e *Explorer) addHostScans(hosts chan chain.HostAnnouncement) { break } - scan, err := e.scanHost(locator, host) + var scan HostScan + if len(host.V2NetAddresses) == 0 { + scan, err = e.scanV1Host(locator, host) + } else { + scan, err = e.scanV2Host(locator, host) + } if err != nil { scans = append(scans, HostScan{ PublicKey: host.PublicKey, @@ -172,7 +220,7 @@ func (e *Explorer) isClosed() bool { } } -func (e *Explorer) fetchHosts(hosts chan chain.HostAnnouncement) { +func (e *Explorer) fetchHosts(hosts chan Host) { var exhausted bool offset := 0 @@ -210,7 +258,7 @@ func (e *Explorer) scanHosts() { for !e.isClosed() { // fetch hosts - hosts := make(chan chain.HostAnnouncement, scanBatchSize) + hosts := make(chan Host, scanBatchSize) e.wg.Add(1) go func() { defer e.wg.Done() diff --git a/explorer/types.go b/explorer/types.go index 2a59d18d..676c1645 100644 --- a/explorer/types.go +++ b/explorer/types.go @@ -290,7 +290,6 @@ type HostScan struct { PriceTable rhpv3.HostPriceTable `json:"priceTable"` RHPV4Settings rhpv4.HostSettings `json:"rhpV4Settings"` - RHPV4Prices rhpv4.HostPrices `json:"rhpV4Prices"` } // Host represents a host and the information gathered from scanning it. @@ -313,7 +312,6 @@ type Host struct { PriceTable rhpv3.HostPriceTable `json:"priceTable"` RHPV4Settings rhpv4.HostSettings `json:"rhpV4Settings"` - RHPV4Prices rhpv4.HostPrices `json:"rhpV4Prices"` } // HostMetrics represents averages of scanned information from hosts. diff --git a/persist/sqlite/addresses.go b/persist/sqlite/addresses.go index c7fd9a58..26b3a8be 100644 --- a/persist/sqlite/addresses.go +++ b/persist/sqlite/addresses.go @@ -134,7 +134,7 @@ func (st *Store) Hosts(pks []types.PublicKey) (result []explorer.Host, err error } // HostsForScanning returns hosts ordered by the transaction they were created in. -func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) (result []chain.HostAnnouncement, err error) { +func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, offset, limit uint64) (result []explorer.Host, err error) { err = s.transaction(func(tx *txn) error { rows, err := tx.Query(`SELECT public_key, net_address FROM host_info WHERE last_scan <= ? AND last_announcement >= ? ORDER BY last_scan ASC LIMIT ? OFFSET ?`, encode(maxLastScan), encode(minLastAnnouncement), limit, offset) if err != nil { @@ -142,11 +142,37 @@ func (s *Store) HostsForScanning(maxLastScan, minLastAnnouncement time.Time, off } defer rows.Close() + v2AddrStmt, err := tx.Prepare(`SELECT protocol,address FROM host_info_v2_netaddresses WHERE public_key = ? ORDER BY netaddress_order`) + if err != nil { + return err + } + defer v2AddrStmt.Close() + for rows.Next() { - var host chain.HostAnnouncement + var host explorer.Host if err := rows.Scan(decode(&host.PublicKey), &host.NetAddress); err != nil { return err } + + err := func() error { + v2AddrRows, err := v2AddrStmt.Query(encode(host.PublicKey)) + if err != nil { + return err + } + defer v2AddrRows.Close() + for v2AddrRows.Next() { + var netAddr chain.NetAddress + if err := v2AddrRows.Scan(&netAddr.Protocol, &netAddr.Address); err != nil { + return err + } + host.V2NetAddresses = append(host.V2NetAddresses, netAddr) + } + return nil + }() + if err != nil { + return err + } + result = append(result, host) } return nil diff --git a/persist/sqlite/consensus.go b/persist/sqlite/consensus.go index 2b08b5b2..3b3576bc 100644 --- a/persist/sqlite/consensus.go +++ b/persist/sqlite/consensus.go @@ -1140,7 +1140,7 @@ func addHosts(tx *txn, scans []explorer.Host) error { for _, scan := range scans { s, p := scan.Settings, scan.PriceTable - sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Prices + sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Settings.Prices if _, err := stmt.Exec(encode(scan.PublicKey), scan.NetAddress, scan.CountryCode, encode(scan.KnownSince), encode(scan.LastScan), scan.LastScanSuccessful, encode(scan.LastAnnouncement), scan.TotalScans, scan.SuccessfulInteractions, scan.FailedInteractions, s.AcceptingContracts, encode(s.MaxDownloadBatchSize), encode(s.MaxDuration), encode(s.MaxReviseBatchSize), s.NetAddress, encode(s.RemainingStorage), encode(s.SectorSize), encode(s.TotalStorage), encode(s.Address), encode(s.WindowSize), encode(s.Collateral), encode(s.MaxCollateral), encode(s.BaseRPCPrice), encode(s.ContractPrice), encode(s.DownloadBandwidthPrice), encode(s.SectorAccessPrice), encode(s.StoragePrice), encode(s.UploadBandwidthPrice), s.EphemeralAccountExpiry, encode(s.MaxEphemeralAccountBalance), encode(s.RevisionNumber), s.Version, s.Release, s.SiaMuxPort, encode(p.UID), p.Validity, encode(p.HostBlockHeight), encode(p.UpdatePriceTableCost), encode(p.AccountBalanceCost), encode(p.FundAccountCost), encode(p.LatestRevisionCost), encode(p.SubscriptionMemoryCost), encode(p.SubscriptionNotificationCost), encode(p.InitBaseCost), encode(p.MemoryTimeCost), encode(p.DownloadBandwidthCost), encode(p.UploadBandwidthCost), encode(p.DropSectorsBaseCost), encode(p.DropSectorsUnitCost), encode(p.HasSectorBaseCost), encode(p.ReadBaseCost), encode(p.ReadLengthCost), encode(p.RenewContractCost), encode(p.RevisionBaseCost), encode(p.SwapSectorBaseCost), encode(p.WriteBaseCost), encode(p.WriteLengthCost), encode(p.WriteStoreCost), encode(p.TxnFeeMinRecommended), encode(p.TxnFeeMaxRecommended), encode(p.ContractPrice), encode(p.CollateralCost), encode(p.MaxCollateral), encode(p.MaxDuration), encode(p.WindowSize), encode(p.RegistryEntriesLeft), encode(p.RegistryEntriesTotal), sV4.ProtocolVersion[:], sV4.Release, encode(sV4.WalletAddress), sV4.AcceptingContracts, encode(sV4.MaxCollateral), encode(sV4.MaxContractDuration), encode(sV4.MaxSectorDuration), encode(sV4.MaxSectorBatchSize), encode(sV4.RemainingStorage), encode(sV4.TotalStorage), encode(pV4.ContractPrice), encode(pV4.Collateral), encode(pV4.StoragePrice), encode(pV4.IngressPrice), encode(pV4.EgressPrice), encode(pV4.FreeSectorPrice), encode(pV4.TipHeight), encode(pV4.ValidUntil), encode(pV4.Signature)); err != nil { return fmt.Errorf("failed to execute host_info stmt: %w", err) } @@ -1170,7 +1170,7 @@ func addHostScans(tx *txn, scans []explorer.HostScan) error { } s, p := scan.Settings, scan.PriceTable - sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Prices + sV4, pV4 := scan.RHPV4Settings, scan.RHPV4Settings.Prices if _, err := stmt.Exec(scan.CountryCode, encode(scan.Timestamp), scan.Success, successful, failed, s.AcceptingContracts, encode(s.MaxDownloadBatchSize), encode(s.MaxDuration), encode(s.MaxReviseBatchSize), s.NetAddress, encode(s.RemainingStorage), encode(s.SectorSize), encode(s.TotalStorage), encode(s.Address), encode(s.WindowSize), encode(s.Collateral), encode(s.MaxCollateral), encode(s.BaseRPCPrice), encode(s.ContractPrice), encode(s.DownloadBandwidthPrice), encode(s.SectorAccessPrice), encode(s.StoragePrice), encode(s.UploadBandwidthPrice), s.EphemeralAccountExpiry, encode(s.MaxEphemeralAccountBalance), encode(s.RevisionNumber), s.Version, s.Release, s.SiaMuxPort, encode(p.UID), p.Validity, encode(p.HostBlockHeight), encode(p.UpdatePriceTableCost), encode(p.AccountBalanceCost), encode(p.FundAccountCost), encode(p.LatestRevisionCost), encode(p.SubscriptionMemoryCost), encode(p.SubscriptionNotificationCost), encode(p.InitBaseCost), encode(p.MemoryTimeCost), encode(p.DownloadBandwidthCost), encode(p.UploadBandwidthCost), encode(p.DropSectorsBaseCost), encode(p.DropSectorsUnitCost), encode(p.HasSectorBaseCost), encode(p.ReadBaseCost), encode(p.ReadLengthCost), encode(p.RenewContractCost), encode(p.RevisionBaseCost), encode(p.SwapSectorBaseCost), encode(p.WriteBaseCost), encode(p.WriteLengthCost), encode(p.WriteStoreCost), encode(p.TxnFeeMinRecommended), encode(p.TxnFeeMaxRecommended), encode(p.ContractPrice), encode(p.CollateralCost), encode(p.MaxCollateral), encode(p.MaxDuration), encode(p.WindowSize), encode(p.RegistryEntriesLeft), encode(p.RegistryEntriesTotal), sV4.ProtocolVersion[:], sV4.Release, encode(sV4.WalletAddress), sV4.AcceptingContracts, encode(sV4.MaxCollateral), encode(sV4.MaxContractDuration), encode(sV4.MaxSectorDuration), encode(sV4.MaxSectorBatchSize), encode(sV4.RemainingStorage), encode(sV4.TotalStorage), encode(pV4.ContractPrice), encode(pV4.Collateral), encode(pV4.StoragePrice), encode(pV4.IngressPrice), encode(pV4.EgressPrice), encode(pV4.FreeSectorPrice), encode(pV4.TipHeight), encode(pV4.ValidUntil), encode(pV4.Signature), encode(scan.PublicKey)); err != nil { return err }