Skip to content

Commit

Permalink
sqlite: remove netaddress from hosts table and use announcement table…
Browse files Browse the repository at this point in the history
… instead
  • Loading branch information
ChrisSchinnerl committed Nov 13, 2024
1 parent 0a5c01f commit d6f7b67
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 21 deletions.
12 changes: 12 additions & 0 deletions internal/bus/chainsubscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"go.sia.tech/core/types"
"go.sia.tech/coreutils/chain"
rhp4 "go.sia.tech/coreutils/rhp/v4"
"go.sia.tech/coreutils/wallet"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/internal/utils"
Expand Down Expand Up @@ -164,6 +165,17 @@ func (s *chainSubscriber) applyChainUpdate(tx sql.ChainUpdateTx, cau chain.Apply
hus[ha.PublicKey] = ha
}
})
chain.ForEachV2HostAnnouncement(b, func(hk types.PublicKey, addrs []chain.NetAddress) {
for _, addr := range addrs {
switch addr.Protocol {
case rhp4.ProtocolTCPSiaMux:
// TODO: implement
default:
// any other protocol is not supported
}
}
})
// v1 announcements
for hk, ha := range hus {
if err := tx.UpdateHost(hk, ha, cau.State.Index.Height, b.ID(), b.Timestamp); err != nil {
return fmt.Errorf("failed to update host: %w", err)
Expand Down
39 changes: 34 additions & 5 deletions stores/sql/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -838,7 +838,7 @@ func Hosts(ctx context.Context, tx sql.Tx, opts api.HostOptions) ([]api.Host, er
}

rows, err = tx.Query(ctx, fmt.Sprintf(`
SELECT h.id, h.created_at, h.last_announcement, h.public_key, h.net_address, h.price_table, h.price_table_expiry,
SELECT h.id, h.created_at, h.last_announcement, h.public_key, h.price_table, h.price_table_expiry,
h.settings, h.total_scans, h.last_scan, h.last_scan_success, h.second_to_last_scan_success,
h.uptime, h.downtime, h.successful_interactions, h.failed_interactions, COALESCE(h.lost_sectors, 0),
h.scanned, h.resolved_addresses, %s
Expand All @@ -853,16 +853,17 @@ func Hosts(ctx context.Context, tx sql.Tx, opts api.HostOptions) ([]api.Host, er
defer rows.Close()

var hosts []api.Host
var hostIDs []int64
for rows.Next() {
var h api.Host
var hostID int64
var pte dsql.NullTime
var resolvedAddresses string
err := rows.Scan(&hostID, &h.KnownSince, &h.LastAnnouncement, (*PublicKey)(&h.PublicKey),
&h.NetAddress, (*PriceTable)(&h.PriceTable.HostPriceTable), &pte,
(*HostSettings)(&h.Settings), &h.Interactions.TotalScans, (*UnixTimeMS)(&h.Interactions.LastScan), &h.Interactions.LastScanSuccess,
&h.Interactions.SecondToLastScanSuccess, (*DurationMS)(&h.Interactions.Uptime), (*DurationMS)(&h.Interactions.Downtime),
&h.Interactions.SuccessfulInteractions, &h.Interactions.FailedInteractions, &h.Interactions.LostSectors,
(*PriceTable)(&h.PriceTable.HostPriceTable), &pte, (*HostSettings)(&h.Settings), &h.Interactions.TotalScans,
(*UnixTimeMS)(&h.Interactions.LastScan), &h.Interactions.LastScanSuccess, &h.Interactions.SecondToLastScanSuccess,
(*DurationMS)(&h.Interactions.Uptime), (*DurationMS)(&h.Interactions.Downtime), &h.Interactions.SuccessfulInteractions,
&h.Interactions.FailedInteractions, &h.Interactions.LostSectors,
&h.Scanned, &resolvedAddresses, &h.Blocked,
)
if err != nil {
Expand All @@ -879,6 +880,34 @@ func Hosts(ctx context.Context, tx sql.Tx, opts api.HostOptions) ([]api.Host, er
h.PriceTable.Expiry = pte.Time
h.StoredData = storedDataMap[h.PublicKey]
hosts = append(hosts, h)
hostIDs = append(hostIDs, hostID)
}

// populate net addresses
addrStmt, err := tx.Prepare(ctx, "SELECT ha.net_address FROM host_announcements ha INNER JOIN hosts h ON ha.db_host_id = h.id WHERE h.id = ?")
if err != nil {
return nil, fmt.Errorf("failed to prepare net address statement: %w", err)
}
defer addrStmt.Close()

populateNetAddr := func(h *api.Host, hostID int64) error {
rows, err := addrStmt.Query(ctx, hostID)
if err != nil {
return fmt.Errorf("failed to fetch net address: %w", err)
}
defer rows.Close()
for rows.Next() {
// TODO: handle v2 announcement
if err := rows.Scan(h.NetAddress); err != nil {
return fmt.Errorf("failed to scan net address: %w", err)
}
}
return nil
}
for i := range hostIDs {
if err := populateNetAddr(&hosts[i], hostIDs[i]); err != nil {
return nil, fmt.Errorf("failed to populate net address for host: %w", err)
}
}

// query host checks
Expand Down
35 changes: 20 additions & 15 deletions stores/sql/sqlite/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,23 +210,11 @@ func (c chainUpdateTx) UpdateFailedContracts(blockHeight uint64) error {
func (c chainUpdateTx) UpdateHost(hk types.PublicKey, ha chain.HostAnnouncement, bh uint64, blockID types.BlockID, ts time.Time) error { //
c.l.Debugw("update host", "hk", hk, "netaddress", ha.NetAddress)

// create the announcement
if _, err := c.tx.Exec(c.ctx,
"INSERT OR IGNORE INTO host_announcements (created_at,host_key, block_height, block_id, net_address) VALUES (?, ?, ?, ?, ?)",
time.Now().UTC(),
ssql.PublicKey(hk),
bh,
blockID.String(),
ha.NetAddress,
); err != nil {
return fmt.Errorf("failed to insert host announcement: %w", err)
}

// create the host
var hostID int64
if err := c.tx.QueryRow(c.ctx, `
INSERT INTO hosts (created_at, public_key, settings, price_table, total_scans, last_scan, last_scan_success, second_to_last_scan_success, scanned, uptime, downtime, recent_downtime, recent_scan_failures, successful_interactions, failed_interactions, lost_sectors, last_announcement, net_address)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
INSERT INTO hosts (created_at, public_key, settings, price_table, total_scans, last_scan, last_scan_success, second_to_last_scan_success, scanned, uptime, downtime, recent_downtime, recent_scan_failures, successful_interactions, failed_interactions, lost_sectors, last_announcement)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(public_key) DO UPDATE SET
last_announcement = EXCLUDED.last_announcement,
net_address = EXCLUDED.net_address
Expand All @@ -248,7 +236,6 @@ func (c chainUpdateTx) UpdateHost(hk types.PublicKey, ha chain.HostAnnouncement,
0,
0,
ts.UTC(),
ha.NetAddress,
).Scan(&hostID); err != nil {
if errors.Is(err, dsql.ErrNoRows) {
err = c.tx.QueryRow(c.ctx,
Expand All @@ -265,6 +252,24 @@ func (c chainUpdateTx) UpdateHost(hk types.PublicKey, ha chain.HostAnnouncement,
}
}

// delete old announcements
if _, err := c.tx.Exec(c.ctx, "DELETE FROM host_announcements WHERE db_host_id = ?", hostID); err != nil {
return fmt.Errorf("failed to remove previous announcments: %w", err)
}

// create the announcement
if _, err := c.tx.Exec(c.ctx,
"INSERT OR IGNORE INTO host_announcements (created_at, db_host_id, block_height, block_id, net_address) VALUES (?, ?, ?, ?, ?)",
time.Now().UTC(),
hostID,
bh,
blockID.String(),
ha.NetAddress,
); err != nil {
return fmt.Errorf("failed to insert host announcement: %w", err)
}
fmt.Println("insert", ha.NetAddress, hostID)

// update allow list
rows, err := c.tx.Query(c.ctx, "SELECT id, entry FROM host_allowlist_entries")
if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion stores/sql/sqlite/migrations/main/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,16 @@ CREATE INDEX `idx_slices_db_slab_id` ON `slices`(`db_slab_id`);
CREATE INDEX `idx_slices_db_multipart_part_id` ON `slices`(`db_multipart_part_id`);

-- dbHostAnnouncement
CREATE TABLE `host_announcements` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`host_key` blob NOT NULL,`block_height` integer,`block_id` text,`net_address` text);
CREATE TABLE `host_announcements` (
`id` integer PRIMARY KEY AUTOINCREMENT,
`created_at` datetime NOT NULL,
`block_height` integer NOT NULL,
`block_id` text NOT NULL,
`db_host_id` integer NOT NULL,
`net_address` text NOT NULL,
CONSTRAINT `fk_host_announcements_db_host` FOREIGN KEY (`db_host_id`) REFERENCES `hosts`(`id`) ON DELETE CASCADE
);
CREATE INDEX `idx_host_announcements_db_host_id` ON `host_announcements`(`db_host_id`);

-- dbConsensusInfo
CREATE TABLE `consensus_infos` (`id` integer PRIMARY KEY AUTOINCREMENT,`created_at` datetime,`height` integer,`block_id` blob);
Expand Down

0 comments on commit d6f7b67

Please sign in to comment.