ip metadata loaded to clickhouse #2

Merged (2 commits) on Sep 4, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -4,5 +4,6 @@ peerstore.db
*.parquet
*.sqlite
*.sqlite-journal
**/._*
api_keys.txt
vendor
39 changes: 20 additions & 19 deletions Dockerfile
@@ -1,41 +1,42 @@
ARG GO_VERSION=1.22.5
FROM golang:${GO_VERSION}-bookworm as builder

# Install CA certificates
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*
# Stage 1: Dependency management and build
FROM golang:${GO_VERSION}-bookworm as builder

# Set the working directory inside the container to /usr/src/app
WORKDIR /usr/src/app
WORKDIR /app

# Copy the Go module files first to leverage Docker cache for dependency layers
# Copy go.mod and go.sum files
COPY go.mod go.sum ./

# Run module download separately to also leverage caching of downloaded modules
# Download dependencies and verify modules
RUN go mod download && go mod verify

# Copy the CSV file into a data directory within the builder stage
COPY ip_metadata.csv ./data/ip_metadata.csv

# Copy the rest of the application source code
COPY . .

# Build the application; output the binary to a known location
RUN go build -v -o /run-app .
# Run go mod tidy to ensure the go.mod file is up to date
RUN go mod tidy

# Expose port 9000 if it's being used by the application
EXPOSE 9000
# Build the application and capture the output
RUN go build -v -o /run-app .

# Final stage based on Debian Bookworm.
FROM debian:bookworm
# Stage 2: Final stage
FROM debian:bookworm-slim

# Install CA certificates in the final image to ensure they are present.
# Install CA certificates in the final image
RUN apt-get update && apt-get install -y ca-certificates && rm -rf /var/lib/apt/lists/*

# Copy the built executable from the builder stage
COPY --from=builder /run-app /usr/local/bin/run-app

# Copy the CSV and other data files from the builder stage to the runtime image
COPY --from=builder /usr/src/app/data /data
# Create necessary directory
RUN mkdir -p /app/data

# Copy the CSV file from the builder stage to /app/data
COPY --from=builder /app/data/ip_metadata.csv /app/data/ip_metadata.csv

# Set the working directory
WORKDIR /app

# Set the command to run the application
CMD ["/usr/local/bin/run-app"]
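
The runtime stage above bakes the CSV into the image at /app/data/ip_metadata.csv, which is the path the ClickHouse loader in clickhouse/client.go expects. A minimal sketch of how that path could be resolved with an optional override, assuming a hypothetical IP_METADATA_CSV environment variable that this PR does not introduce:

package main

import (
	"fmt"
	"os"
)

// csvPath resolves the IP metadata CSV location. The default mirrors the COPY
// destination in the Dockerfile above; IP_METADATA_CSV is a hypothetical
// override, not an environment variable introduced by this PR.
func csvPath() string {
	if p := os.Getenv("IP_METADATA_CSV"); p != "" {
		return p
	}
	return "/app/data/ip_metadata.csv"
}

func main() {
	fmt.Println("loading IP metadata from", csvPath())
}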
175 changes: 173 additions & 2 deletions clickhouse/client.go
@@ -11,10 +11,181 @@ import (
"github.com/chainbound/valtrack/log"
"github.com/chainbound/valtrack/types"
"github.com/rs/zerolog"

"encoding/csv"
"encoding/json"
"os"
"io"
"strconv"
"strings"
"regexp"
)

func (c *ClickhouseClient) LoadIPMetadataFromCSV() error {
// Check if the table is empty
isEmpty, err := c.isTableEmpty("ip_metadata")
if err != nil {
return fmt.Errorf("failed to check if table is empty: %w", err)
}

if !isEmpty {
c.log.Info().Msg("ip_metadata table is not empty, skipping CSV load")
return nil
}

csvPath := "/app/data/ip_metadata.csv"

file, err := os.Open(csvPath)
if err != nil {
c.log.Error().Err(err).Str("path", csvPath).Msg("Failed to open IP metadata CSV file")
return err
}
defer file.Close()

c.log.Info().Str("path", csvPath).Msg("Successfully opened IP metadata CSV file")

reader := csv.NewReader(file)
reader.FieldsPerRecord = -1 // Allow variable number of fields

batch, err := c.chConn.PrepareBatch(context.Background(), "INSERT INTO ip_metadata")
if err != nil {
return fmt.Errorf("failed to prepare batch: %w", err)
}

for {
record, err := reader.Read()
if err == io.EOF {
break
}
if err != nil {
return fmt.Errorf("error reading CSV record: %w", err)
}

// Ensure we have at least the minimum required fields
if len(record) < 9 {
c.log.Warn().Str("row", strings.Join(record, ",")).Msg("Skipping row with insufficient columns")
continue
}

// Parse latitude and longitude (guard against a malformed field such as "nan")
var lat, lon float64
latLon := strings.Split(record[5], ",")
if len(latLon) == 2 {
lat, err = parseFloat(latLon[0])
if err != nil {
c.log.Warn().Str("latitude", latLon[0]).Err(err).Msg("Invalid latitude, using 0")
lat = 0
}
lon, err = parseFloat(latLon[1])
if err != nil {
c.log.Warn().Str("longitude", latLon[1]).Err(err).Msg("Invalid longitude, using 0")
lon = 0
}
} else {
c.log.Warn().Str("loc", record[5]).Msg("Malformed lat,lon field, using 0,0")
}

// Parse ASN info
var asnInfo struct {
ASN string `json:"asn"`
Name string `json:"name"`
Type string `json:"type"`
Domain string `json:"domain"`
Route string `json:"route"`
}
// Preprocess the JSON string
asnJSONString := record[8]

// Handle "nan" case
if strings.ToLower(asnJSONString) == "nan" {
c.log.Warn().Str("original_asn_json", asnJSONString).Msg("ASN JSON is 'nan', using default values")
asnInfo = struct {
ASN string `json:"asn"`
Name string `json:"name"`
Type string `json:"type"`
Domain string `json:"domain"`
Route string `json:"route"`
}{ASN: "", Name: "", Type: "", Domain: "", Route: ""}
continue // Skip to the next record
}

// Replace single quotes with double quotes
asnJSONString = strings.ReplaceAll(asnJSONString, "'", "\"")

// Use regex to find the "name" field and properly escape its value
re := regexp.MustCompile(`"name":\s*"((?:[^"\\]|\\.)*)"`)
asnJSONString = re.ReplaceAllStringFunc(asnJSONString, func(match string) string {
parts := re.FindStringSubmatch(match)
if len(parts) < 2 {
return match
}
// Escape any double quotes within the value
escapedValue := strings.ReplaceAll(parts[1], "\"", "\\\"")
return fmt.Sprintf(`"name": "%s"`, escapedValue)
})

// Ensure the JSON string is complete
if !strings.HasSuffix(asnJSONString, "}") {
asnJSONString += "}"
}

c.log.Debug().Str("processed_asn_json", asnJSONString).Msg("Processed ASN JSON before parsing")

if err := json.Unmarshal([]byte(asnJSONString), &asnInfo); err != nil {
c.log.Warn().Str("original_asn_json", record[8]).Str("processed_asn_json", asnJSONString).Err(err).Msg("Failed to parse ASN JSON, using default values")
asnInfo = struct {
ASN string `json:"asn"`
Name string `json:"name"`
Type string `json:"type"`
Domain string `json:"domain"`
Route string `json:"route"`
}{ASN: "", Name: "", Type: "", Domain: "", Route: ""}
}

err = batch.Append(
record[0], // IP
record[1], // Hostname
record[2], // City
record[3], // Region
record[4], // Country
lat, // Latitude
lon, // Longitude
record[7], // Postal Code
strings.TrimPrefix(asnInfo.ASN, "AS"), // ASN
asnInfo.Name, // ASN Organization
asnInfo.Type, // ASN Type
)
if err != nil {
return fmt.Errorf("failed to append to batch: %w", err)
}
}

if err := batch.Send(); err != nil {
return fmt.Errorf("failed to send batch: %w", err)
}


c.log.Info().Msg("Successfully loaded IP metadata from CSV")
return nil
}

// Helper function to parse float values, treating "nan" as 0
func parseFloat(s string) (float64, error) {
s = strings.TrimSpace(s)
if s == "" || strings.ToLower(s) == "nan" {
return 0, nil
}
return strconv.ParseFloat(s, 64)
}

func (c *ClickhouseClient) isTableEmpty(tableName string) (bool, error) {
query := fmt.Sprintf("SELECT count(*) FROM %s", tableName)
var count uint64
err := c.chConn.QueryRow(context.Background(), query).Scan(&count)
if err != nil {
return false, err
}
return count == 0, nil
}

func ValidatorMetadataDDL(db string) string {
return fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.validator_metadata (
return fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s.validator_metadata (
enr String,
id String,
multiaddr String,
@@ -251,4 +422,4 @@ func sendBatch(client *ClickhouseClient, batch driver.Batch) error {
}
client.log.Info().Msg("Batch sent successfully")
return nil
}
}
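
The Append call in LoadIPMetadataFromCSV implies a fixed CSV column order (IP, hostname, city, region, country, a "lat,lon" pair, one unused column, postal code, and an ASN blob), and the ASN blob arrives as Python-style single-quoted dict text. A small, self-contained sketch of the same normalization applied to a hypothetical ASN value; the sample literal and the standalone program are illustrative only, not taken from the real dataset:

package main

import (
	"encoding/json"
	"fmt"
	"regexp"
	"strings"
)

func main() {
	// Hypothetical record[8] value: single-quoted, Python-dict-style text as it
	// might appear in the CSV.
	raw := "{'asn': 'AS15169', 'name': 'Google LLC', 'type': 'hosting', 'domain': 'google.com', 'route': '8.8.8.0/24'}"

	// Same preprocessing as LoadIPMetadataFromCSV: single quotes -> double
	// quotes, then re-escape any double quotes inside the "name" value.
	s := strings.ReplaceAll(raw, "'", "\"")
	re := regexp.MustCompile(`"name":\s*"((?:[^"\\]|\\.)*)"`)
	s = re.ReplaceAllStringFunc(s, func(match string) string {
		parts := re.FindStringSubmatch(match)
		if len(parts) < 2 {
			return match
		}
		return fmt.Sprintf(`"name": "%s"`, strings.ReplaceAll(parts[1], "\"", "\\\""))
	})

	var asnInfo struct {
		ASN  string `json:"asn"`
		Name string `json:"name"`
	}
	if err := json.Unmarshal([]byte(s), &asnInfo); err != nil {
		panic(err)
	}
	// The loader strips the "AS" prefix before inserting into ClickHouse.
	fmt.Println(strings.TrimPrefix(asnInfo.ASN, "AS"), asnInfo.Name) // 15169 Google LLC
}

Running it prints 15169 Google LLC, matching what the loader appends for the ASN and ASN-organization fields.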
21 changes: 20 additions & 1 deletion consumer/consumer.go
@@ -274,6 +274,11 @@ func (c *Consumer) Start(name string) error {
return err
}

// Load IP metadata from CSV
if err := c.chClient.LoadIPMetadataFromCSV(); err != nil {
c.log.Error().Err(err).Msg("Failed to load IP metadata from CSV")
}

go func() {
for {
batch, err := consumer.FetchNoWait(BATCH_SIZE)
@@ -303,7 +308,7 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
progress := float64(md.Sequence.Stream) / (float64(md.NumPending) + float64(md.Sequence.Stream)) * 100

switch msg.Subject() {
case "events.ip_metadata":
case "events.ip_metadata":
var ipEvent types.IPMetadataEvent
if err := json.Unmarshal(msg.Data(), &ipEvent); err != nil {
c.log.Err(err).Msg("Error unmarshaling IPMetadataEvent")
@@ -313,6 +318,11 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
c.log.Info().Str("IP", ipEvent.IP).Msg("IP metadata received")
c.ipMetadataChan <- &ipEvent

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.IPMetadataEventChan <- &ipEvent
}

case "events.peer_discovered":
var event types.PeerDiscoveredEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
@@ -324,6 +334,11 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
c.log.Info().Time("timestamp", md.Timestamp).Uint64("pending", md.NumPending).Str("progress", fmt.Sprintf("%.2f%%", progress)).Msg("peer_discovered")
c.storeDiscoveryEvent(event)

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.PeerDiscoveredEventChan <- &event
}

case "events.metadata_received":
var event types.MetadataReceivedEvent
if err := json.Unmarshal(msg.Data(), &event); err != nil {
@@ -336,6 +351,10 @@ func handleMessage(c *Consumer, msg jetstream.Msg) {
c.handleMetadataEvent(event)
c.storeMetadataEvent(event)

// Send to ClickHouse if client is initialized
if c.chClient != nil {
c.chClient.MetadataReceivedEventChan <- &event
}

default:
c.log.Warn().Str("subject", msg.Subject()).Msg("Unknown event type")
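
Each handler above now forwards the decoded event to the corresponding ClickHouse channel whenever chClient is initialized. A minimal sketch of how such a channel might be drained into batched inserts, with a hypothetical function name and a synchronous flush callback; the real batching lives in clickhouse/client.go and is not part of this diff:

package clickhouse

import (
	"log"

	"github.com/chainbound/valtrack/types"
)

// drainIPMetadata is a hypothetical sketch of the receiving side: it buffers
// events from the channel and hands them to a flush callback in groups of
// batchSize. Neither the function nor the callback exists in this PR.
func drainIPMetadata(events <-chan *types.IPMetadataEvent, flush func([]*types.IPMetadataEvent) error, batchSize int) {
	buf := make([]*types.IPMetadataEvent, 0, batchSize)
	for ev := range events {
		buf = append(buf, ev)
		if len(buf) >= batchSize {
			if err := flush(buf); err != nil {
				log.Printf("flush failed: %v", err) // the real client logs via zerolog
			}
			buf = buf[:0]
		}
	}
	if len(buf) > 0 { // flush whatever remains once the channel is closed
		_ = flush(buf)
	}
}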