diff --git a/consumer/consumer.go b/consumer/consumer.go index e59cc27..90461ab 100644 --- a/consumer/consumer.go +++ b/consumer/consumer.go @@ -25,8 +25,6 @@ import ( "github.com/nats-io/nats.go" "github.com/nats-io/nats.go/jetstream" "github.com/rs/zerolog" - "github.com/xitongsys/parquet-go-source/local" - "github.com/xitongsys/parquet-go/writer" ) const basePath = "/data" @@ -43,10 +41,6 @@ type ConsumerConfig struct { type Consumer struct { log zerolog.Logger - discoveryWriter *writer.ParquetWriter - metadataWriter *writer.ParquetWriter - validatorWriter *writer.ParquetWriter - ipMetadataWriter *writer.ParquetWriter js jetstream.JetStream peerDiscoveredChan chan *types.PeerDiscoveredEvent @@ -65,56 +59,8 @@ type Consumer struct { } func NewConsumer(cfg *ConsumerConfig, log zerolog.Logger, js jetstream.JetStream, chClient *ch.ClickhouseClient, db *sql.DB, dune *Dune) (*Consumer, error) { - discoveryFilePath := fmt.Sprintf("%s/discovery_events.parquet", basePath) - w_discovery, err := local.NewLocalFileWriter(discoveryFilePath) - if err != nil { - return nil, fmt.Errorf("error creating discovery events parquet file: %w", err) - } - - metadataFilePath := fmt.Sprintf("%s/metadata_events.parquet", basePath) - w_metadata, err := local.NewLocalFileWriter(metadataFilePath) - if err != nil { - return nil, fmt.Errorf("error creating metadata events parquet file: %w", err) - } - - validatorFilePath := fmt.Sprintf("%s/validator_metadata_events.parquet", basePath) - w_validator, err := local.NewLocalFileWriter(validatorFilePath) - if err != nil { - return nil, fmt.Errorf("error creating validator parquet file: %w", err) - } - - ipMetadataFilePath := fmt.Sprintf("%s/ip_metadata_events.parquet", basePath) - w_ipMetadata, err := local.NewLocalFileWriter(ipMetadataFilePath) - if err != nil { - return nil, fmt.Errorf("error creating IP metadata events parquet file: %w", err) - } - - discoveryWriter, err := writer.NewParquetWriter(w_discovery, new(types.PeerDiscoveredEvent), 4) - if err != nil { - return nil, fmt.Errorf("error creating Peer discovered Parquet writer: %w", err) - } - - metadataWriter, err := writer.NewParquetWriter(w_metadata, new(types.MetadataReceivedEvent), 4) - if err != nil { - return nil, fmt.Errorf("error creating Metadata Parquet writer: %w", err) - } - - validatorWriter, err := writer.NewParquetWriter(w_validator, new(types.ValidatorEvent), 4) - if err != nil { - return nil, fmt.Errorf("error creating Validator Parquet writer: %w", err) - } - - ipMetadataWriter, err := writer.NewParquetWriter(w_ipMetadata, new(types.IPMetadataEvent), 4) - if err != nil { - return nil, fmt.Errorf("error creating IP Metadata Parquet writer: %w", err) - } - return &Consumer{ log: log, - discoveryWriter: discoveryWriter, - metadataWriter: metadataWriter, - validatorWriter: validatorWriter, - ipMetadataWriter: ipMetadataWriter, js: js, peerDiscoveredChan: make(chan *types.PeerDiscoveredEvent, 16384), @@ -592,12 +538,6 @@ func (c *Consumer) handleMetadataEvent(event types.MetadataReceivedEvent) { c.log.Info().Any("validator_event", validatorEvent).Msg("Inserted validator event") } - if err := c.validatorWriter.Write(validatorEvent); err != nil { - c.log.Err(err).Msg("Failed to write validator event to Parquet file") - } else { - c.log.Trace().Msg("Wrote validator event to Parquet file") - } - maAddr, err := ma.NewMultiaddr(event.Multiaddr) if err != nil { c.log.Error().Err(err).Msg("Invalid multiaddr") @@ -626,12 +566,6 @@ func (c *Consumer) storeDiscoveryEvent(event types.PeerDiscoveredEvent) { return } - if err := c.discoveryWriter.Write(event); err != nil { - c.log.Err(err).Msg("Failed to write discovery event to Parquet file") - } else { - c.log.Trace().Msg("Wrote discovery event to Parquet file") - } - if c.chClient != nil && c.chClient.PeerDiscoveredEventChan != nil { c.chClient.PeerDiscoveredEventChan <- &event c.log.Info().Str("ID", event.ID).Msg("Inserted peer discovered event into ClickHouse channel") @@ -641,12 +575,6 @@ func (c *Consumer) storeDiscoveryEvent(event types.PeerDiscoveredEvent) { } func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { - if err := c.metadataWriter.Write(event); err != nil { - c.log.Err(err).Msg("Failed to write metadata event to Parquet file") - } else { - c.log.Trace().Msg("Wrote metadata event to Parquet file") - } - if c.chClient != nil { c.chClient.MetadataReceivedEventChan <- &event c.log.Info().Str("ID", event.ID).Msg("Inserted metadata received event into ClickHouse channel") @@ -656,11 +584,6 @@ func (c *Consumer) storeMetadataEvent(event types.MetadataReceivedEvent) { func (c *Consumer) processIPMetadataEvents() { for ipEvent := range c.ipMetadataChan { c.log.Info().Msgf("Received IP metadata event for processing: %s", ipEvent.IP) - if err := c.ipMetadataWriter.Write(ipEvent); err != nil { - c.log.Err(err).Msg("Failed to write IP metadata event to Parquet file") - continue - } - c.log.Trace().Msg("Wrote IP metadata event to Parquet file") } }