From f5522ce3cd91750a2f15786df269c008ed63edc1 Mon Sep 17 00:00:00 2001 From: 0x00badcode Date: Fri, 26 Jan 2024 04:13:54 +0000 Subject: [PATCH 1/3] add new version of the IP location script --- .env_template | 1 + go.mod | 3 + go.sum | 4 + pkg/crawler/ethereum.go | 12 +- pkg/db/models/ip.go | 38 ++- pkg/utils/apis/ip_api.go | 570 ++++++++++++++++++++++++++------------- 6 files changed, 424 insertions(+), 204 deletions(-) diff --git a/.env_template b/.env_template index 249cde5..c658d01 100644 --- a/.env_template +++ b/.env_template @@ -8,3 +8,4 @@ CRAWLER_FORK_DIGEST="0x4a26c58b" CRAWLER_GOSSIP_TOPIC="beacon_block" CRAWLER_SUBNET="all" CRAWLER_PERSIST_CONNEVENTS="false" +IP2LOCATION_TOKEN="" diff --git a/go.mod b/go.mod index 0e6d588..f164e83 100644 --- a/go.mod +++ b/go.mod @@ -56,6 +56,8 @@ require ( github.com/hashicorp/golang-lru v0.5.5-0.20210104140557-80c98217689d // indirect github.com/holiman/uint256 v1.2.0 // indirect github.com/huin/goupnp v1.0.2 // indirect + github.com/ip2location/ip2location-go/v9 v9.6.1 // indirect + github.com/ip2location/ip2proxy-go/v4 v4.0.1 // indirect github.com/ipfs/go-cid v0.0.7 // indirect github.com/ipfs/go-datastore v0.5.0 // indirect github.com/ipfs/go-ipfs-util v0.0.2 // indirect @@ -159,6 +161,7 @@ require ( google.golang.org/protobuf v1.27.1 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect + lukechampine.com/uint128 v1.2.0 // indirect ) replace github.com/libp2p/go-libp2p-pubsub v0.5.5 => ./go-libp2p-pubsub diff --git a/go.sum b/go.sum index 0d4bffe..31af7ee 100644 --- a/go.sum +++ b/go.sum @@ -427,6 +427,8 @@ github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19y github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368/go.mod h1:Wbbw6tYNvwa5dlB6304Sd+82Z3f7PmVZHVKU637d4po= +github.com/ip2location/ip2proxy-go/v4 v4.0.1 h1:n63WK4EYsXqt5hXHvHABknRYZEnVFqF/KX3xx84Zw8I= +github.com/ip2location/ip2proxy-go/v4 v4.0.1/go.mod h1:knSLTGvow2tCTxGuZNACMiqRW7h9u/F7KFrPa8HBJ8U= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.2/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= github.com/ipfs/go-cid v0.0.3/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= @@ -1813,6 +1815,8 @@ honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k= honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las= +lukechampine.com/uint128 v1.2.0 h1:mBi/5l91vocEN8otkC5bDLhi2KdCticRiwbdB0O+rjI= +lukechampine.com/uint128 v1.2.0/go.mod h1:c4eWIwlEGaxC/+H1VguhU4PHXNWDCDMUlWdIWl2j1gk= rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8= rsc.io/pdf v0.1.1/go.mod h1:n8OzWcQ6Sp37PL01nO98y4iUCRdTGarVfzxY20ICaU4= rsc.io/quote/v3 v3.1.0/go.mod h1:yEA65RcK8LyAZtP9Kv3t0HmxON59tX3rD+tICJqUlj0= diff --git a/pkg/crawler/ethereum.go b/pkg/crawler/ethereum.go index aed2fcf..67c180f 100644 --- a/pkg/crawler/ethereum.go +++ b/pkg/crawler/ethereum.go @@ -91,12 +91,12 @@ func NewEthereumCrawler(mainCtx *cli.Context, conf 
config.EthereumCrawlerConfig) return nil, err } dbClient, err := psql.NewDBClient( - ctx, - ethNode.Network(), - conf.PsqlEndpoint, - backupInterval, - psql.InitializeTables(true), - psql.WithConnectionEventsPersist(conf.PersistConnEvents), + ctx, + ethNode.Network(), + conf.PsqlEndpoint, + backupInterval, + psql.InitializeTables(true), + psql.WithConnectionEventsPersist(conf.PersistConnEvents), ) if err != nil { cancel() diff --git a/pkg/db/models/ip.go b/pkg/db/models/ip.go index 8ac5a75..cb38e05 100644 --- a/pkg/db/models/ip.go +++ b/pkg/db/models/ip.go @@ -1,6 +1,10 @@ package models -import "time" +import ( + "time" + + "github.com/ip2location/ip2proxy-go/v4" +) const ( IpInfoTTL = 30 * 24 * time.Hour // 30 days @@ -34,11 +38,35 @@ func (m *IpApiMsg) IsEmpty() bool { return m.Country == "" && m.City == "" } +// some fields are commented because we don't have data for them +func mapTempIpInfoToApiMsg(data ip2proxy.IP2ProxyRecord, ip string) IpApiMsg { + return IpApiMsg{ + IP: ip, + Status: "success", + // Continent: "", + // ContinentCode: "", + Country: data.CountryLong, + CountryCode: data.CountryShort, + Region: data.Region, + // RegionName: "", + City: data.City, + // Zip: "", + // Lat: "", + // Lon: "", + Isp: data.Isp, + // Org: "", + // As: "", + // AsName: "", + Mobile: data.UsageType == "MOB", + Proxy: data.ProxyType != "", + // Hosting: false, + } +} + type ApiResp struct { - IpInfo IpInfo - DelayTime time.Duration - AttemptsLeft int - Err error + IpInfo IpInfo + DelayTime time.Duration + Err error } type IpInfo struct { diff --git a/pkg/utils/apis/ip_api.go b/pkg/utils/apis/ip_api.go index 531f10a..32b7add 100644 --- a/pkg/utils/apis/ip_api.go +++ b/pkg/utils/apis/ip_api.go @@ -1,32 +1,38 @@ package apis import ( + "archive/zip" "context" - "encoding/json" + "errors" "fmt" + "io" "io/ioutil" "net/http" - "strconv" + "os" + "path/filepath" + "regexp" "strings" "sync" - "sync/atomic" "time" + "github.com/ip2location/ip2proxy-go/v4" "github.com/migalabs/armiarma/pkg/db/models" - "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) +var ( + ErrorQueueFull = errors.New("queue is full") + ErrorQueueEmpty = errors.New("queue is emtpy") +) + const ( defaultIpTTL = 30 * 24 * time.Hour // 30 days ipChanBuffSize = 45 // number of ips that can be buffered unto the channel ipBuffSize = 8192 // number of ip queries that can be queued in the ipQueue - ipApiEndpoint = "http://ip-api.com/json/{__ip__}?fields=status,continent,continentCode,country,countryCode,region,regionName,city,zip,lat,lon,isp,org,as,asname,mobile,proxy,hosting,query" + inApiEndpoint = "https://www.ip2location.com/download/?token=%s&file=%s" minIterTime = 100 * time.Millisecond ) -var TooManyRequestError error = fmt.Errorf("error HTTP 429") - // DB Interface for DBWriter type DBWriter interface { PersistToDB(interface{}) @@ -35,13 +41,18 @@ type DBWriter interface { GetExpiredIpInfo() ([]string, error) } +type ipQueue struct { + sync.RWMutex + queueSize int + ipList []string +} + // PEER LOCALIZER type IpLocator struct { ctx context.Context - // Request channels + // Request channel s locationRequest chan string - // dbClient dbClient DBWriter ipQueue *ipQueue @@ -61,113 +72,280 @@ func NewIpLocator(ctx context.Context, dbCli DBWriter) *IpLocator { } } -// Run the necessary routines to locate the IPs -func (c *IpLocator) Run() { - //l.SetLevel(Logrus.TraceLevel) - c.locatorRoutine() +func newIpQueue(queueSize int) *ipQueue { + return &ipQueue{ + queueSize: queueSize, + ipList: make([]string, 0, queueSize), + } } -// 
locatorRoutine is the main routine that will wait until an request to identify an IP arrives -// or if the routine gets canceled -func (c *IpLocator) locatorRoutine() { - log.Info("IP locator routine started") - // ip queue reading routine - go func() { - ticker := time.NewTicker(minIterTime) - for { - ip, err := c.ipQueue.readItem() - if err == nil { - // put the request in the Queue - c.locationRequest <- ip +// ----------------------------------------------------------- // +// ------------------ DB UPDATE UTILITIES -------------------- // +// ----------------------------------------------------------- // + +const ( + DatabaseDir = "./database/" + IP2LocationToken = "IP2LOCATION_TOKEN" + InApiEndpoint = "https://www.ip2location.com/download/?token=%s&file=%s" + UpdateThreshold = 24 * time.Hour + IPv4DbName = "PX11LITEBIN" + IPv6DbName = "PX11LITEBINIPV6" + UncompressedFileName = "IP2LOCATION-LITE-DB11.BIN" +) + +func unzip(zipFile, destDir string) error { + r, err := zip.OpenReader(zipFile) + if err != nil { + return err + } + defer r.Close() + + for _, f := range r.File { + fpath := filepath.Join(destDir, f.Name) + if f.FileInfo().IsDir() { + os.MkdirAll(fpath, os.ModePerm) + continue + } + + outFile, err := os.OpenFile(fpath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, f.Mode()) + if err != nil { + return err + } + + rc, err := f.Open() + if err != nil { + outFile.Close() + return err + } + + _, err = io.Copy(outFile, rc) + outFile.Close() + rc.Close() + if err != nil { + return err + } + } + return nil +} + +func downloadAndSave(url, baseFilename string) error { + version := func() string { + if strings.Contains(baseFilename, "IPV6") { + return "IPv6" + } + return "IPv4" + }() + + timestamp := time.Now().Format("20060102-150405") // Format: YYYYMMDD-HHMMSS + + filename := fmt.Sprintf("%s-%s.BIN", baseFilename, timestamp) + + resp, err := http.Get(url) + if err != nil { + return err + } + defer resp.Body.Close() + + file, err := os.Create(filename) + if err != nil { + return err + } + defer file.Close() + + fmt.Println("Starting download of IP2Location DB for " + version) + _, err = io.Copy(file, resp.Body) + if err != nil { + fmt.Println("Error while downloading IP2Location DB for " + version) + return err + } + fmt.Println("Download completed for IP2Location DB for " + version) + + return nil +} + +func updateSpecificDb(dbName, dbToken string) { + dbLink := fmt.Sprintf(InApiEndpoint, dbToken, dbName) + if needsUpdate(dbName) { + if err := downloadAndSave(dbLink, dbName); err != nil { + log.Printf("Failed to update database %s: %v\n", dbName, err) + } + } + cleanupOldDatabases(dbName) +} + +// checks if the database needs to be updated +func needsUpdate(baseFilename string) bool { + fmt.Println("Ip2Location DB: checking time since last update...") + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + log.Fatal(err) + } + fmt.Println("Finished checking directory...") + + latest := time.Time{} + for _, f := range files { + if strings.HasPrefix(f.Name(), baseFilename) && strings.HasSuffix(f.Name(), ".BIN") { + nameParts := strings.Split(f.Name(), "-") + if len(nameParts) >= 2 { + timestampPart := nameParts[len(nameParts)-1] + timestampPart = strings.TrimSuffix(timestampPart, ".BIN") + fileTime, err := time.Parse("20060102-150405", timestampPart) + if err == nil && fileTime.After(latest) { + latest = fileTime + } } - select { - case <-ticker.C: - ticker.Reset(minIterTime) + } + } - case <-c.ctx.Done(): - return + return time.Since(latest) > UpdateThreshold +} + +// finds the latest 
database file in the directory to see if it's older than 24 hours +func findLatestDbFile(baseFilename string) string { + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + log.Fatal(err) + } + + latestFile := "" + latest := time.Time{} + for _, f := range files { + if strings.HasPrefix(f.Name(), baseFilename) && strings.HasSuffix(f.Name(), ".BIN") { + nameParts := strings.Split(f.Name(), "-") + if len(nameParts) >= 2 { + timestampPart := nameParts[len(nameParts)-1] + timestampPart = strings.TrimSuffix(timestampPart, ".BIN") + fileTime, err := time.Parse("20060102-150405", timestampPart) + if err == nil && fileTime.After(latest) { + latest = fileTime + latestFile = f.Name() + } } } - }() + } - // ip locating routien - go func() { - var nextDelayRequest time.Duration - for { - select { - // New request to identify an IP - case reqIp := <-c.locationRequest: - log.Trace("new request has been received for ip:", reqIp) - reqLoop: - for { - // since it didn't exist or did expire, request the ip - // new API call needs to be done - log.Tracef(" making API call for %s", reqIp) - atomic.AddInt32(c.apiCalls, 1) - respC := c.locateIp(reqIp) - select { - case apiResp := <-respC: - nextDelayRequest = apiResp.DelayTime - log.WithFields(log.Fields{ - "delay": nextDelayRequest, - "attempts left": apiResp.AttemptsLeft, - }).Debug("got response from IP-API request ") - // check if there is an error - switch apiResp.Err { - case TooManyRequestError: - // if the error reports that we tried too many calls on the API, sleep given time and try again - log.Debug("call ", reqIp, " -> error received: ", apiResp.Err.Error(), "\nwaiting ", nextDelayRequest+(5*time.Second)) - ticker := time.NewTicker(nextDelayRequest + (5 * time.Second)) - select { - case <-ticker.C: - continue - case <-c.ctx.Done(): - log.Info("context closure has been detecting, closing IpApi caller") - return - } - case nil: - // if the error is different from TooManyRequestError break loop and store the request - log.Debugf("call %s-> api req success", reqIp) - // Upsert the IP into the db - c.dbClient.PersistToDB(apiResp.IpInfo) - break reqLoop - - default: - log.Debug("call ", reqIp, " -> diff error received: ", apiResp.Err.Error()) - break reqLoop - - } - - case <-c.ctx.Done(): - log.Info("context closure has been detecting, closing IpApi caller") - return - } + return latestFile +} + +func cleanupDatabases(dbNames ...string) { + for _, dbName := range dbNames { + cleanupOldDatabases(dbName) + } +} + +func cleanupOldDatabases(baseFilename string) { + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + log.Fatal(err) + } + + timestampToFile := make(map[time.Time]string) + + var latest time.Time + + for _, f := range files { + if strings.HasPrefix(f.Name(), baseFilename) && strings.HasSuffix(f.Name(), ".BIN") { + nameParts := strings.Split(f.Name(), "-") + if len(nameParts) >= 2 { + timestampPart := nameParts[len(nameParts)-1] + timestampPart = strings.TrimSuffix(timestampPart, ".BIN") + fileTime, err := time.Parse("20060102-150405", timestampPart) + if err != nil { + log.Printf("Failed to parse time from filename '%s': %s\n", f.Name(), err) + continue } - // check if there is any waiting time that we have to respect before next connection - if nextDelayRequest != time.Duration(0) { - log.Debug("number of allowed requests has been exceed, waiting ", nextDelayRequest+(2*time.Second)) - // set req delay to true, noone can make requests - ticker := time.NewTicker(nextDelayRequest + (2 * time.Second)) - select { - case <-ticker.C: 
- continue - case <-c.ctx.Done(): - log.Info("context closure has been detecting, closing IpApi caller") - return - } + + timestampToFile[fileTime] = f.Name() + + if fileTime.After(latest) { + latest = fileTime } + } + } + } - // the context has been deleted, end go routine - case <-c.ctx.Done(): - // close the channels - close(c.locationRequest) - return + for t, name := range timestampToFile { + if t.Before(latest) { + err := os.Remove(filepath.Join(DatabaseDir, name)) + if err != nil { + log.Printf("Failed to remove old database file: %s\n", name) + } else { + log.Printf("Removed old database file: %s\n", name) + } + } + } +} + +func getDatabaseFile(ip string) string { + var baseFilename string + if isIPv4(ip) { + baseFilename = "PX11LITEBIN" + } else { + baseFilename = "PX11LITEBINIPV6" + } + + latestFile := findLatestDbFile(baseFilename) + if latestFile == "" || needsUpdate(latestFile) { + updateDb() + latestFile = findLatestDbFile(baseFilename) + } + + return latestFile +} + +func updateDb() error { + dbToken := os.Getenv(IP2LocationToken) + if dbToken == "" { + return errors.New("IP2LOCATION_TOKEN environment variable not set") + } + + IPv4DbLink := fmt.Sprintf(inApiEndpoint, dbToken, IPv4DbName) + IPv6DbLink := fmt.Sprintf(inApiEndpoint, dbToken, IPv6DbName) + + var wg sync.WaitGroup + wg.Add(2) + + go func() { + defer wg.Done() + if needsUpdate(IPv4DbName) { + if err := downloadAndSave(IPv4DbLink, IPv4DbName); err != nil { + log.Println(err) } } }() + + go func() { + defer wg.Done() + if needsUpdate(IPv6DbName) { + if err := downloadAndSave(IPv6DbLink, IPv6DbName); err != nil { + log.Println(err) + } + } + }() + + wg.Wait() + + cleanupDatabases(IPv4DbName, IPv6DbName) + + return nil } -// LocateIP is an externa request that any module could do to identify an IP +// ------------------------------------------------- // + +func isIPv4(ip string) bool { + ipv4Pattern := `^(\d{1,3}\.){3}\d{1,3}$` + match, _ := regexp.MatchString(ipv4Pattern, ip) + return match +} + +func isIPv6(ip string) bool { + ipv6Pattern := `^([0-9a-fA-F]{1,4}:){7}[0-9a-fA-F]{1,4}$` + match, _ := regexp.MatchString(ipv6Pattern, ip) + return match +} + +// ------------------------------------------------- // + func (c *IpLocator) LocateIP(ip string) { // check first if IP is already in queue (to queue same ip) if c.ipQueue.ipExists(ip) { @@ -199,98 +377,7 @@ func (c *IpLocator) LocateIP(ip string) { ticker.Stop() } -func (c *IpLocator) Close() { - log.Info("closing IP-API service") - // close the context for ending up the routine - -} - -func (c *IpLocator) locateIp(ip string) chan models.ApiResp { - respC := make(chan models.ApiResp) - go callIpApi(ip, respC) - return respC -} - -// get location country and City from the multiaddress of the peer on the peerstore -func callIpApi(ip string, respC chan models.ApiResp) { - var apiResponse models.ApiResp - apiResponse.IpInfo, apiResponse.DelayTime, apiResponse.AttemptsLeft, apiResponse.Err = CallIpApi(ip) - respC <- apiResponse - // defer ^ -} - -func CallIpApi(ip string) (ipInfo models.IpInfo, delay time.Duration, attemptsLeft int, err error) { - - url := strings.Replace(ipApiEndpoint, "{__ip__}", ip, 1) - - // Make the IP-APi request - resp, err := http.Get(url) - if err != nil { - err = errors.Wrap(err, "unable to locate IP"+ip) - return - } - timeLeft, _ := strconv.Atoi(resp.Header["X-Ttl"][0]) - // check if the error that we are receiving means that we exeeded the request limit - if resp.StatusCode == 429 { - log.Debugf("limit of requests per minute has been 
exeeded, wait for next call %s secs", resp.Header["X-Ttl"][0]) - err = TooManyRequestError - delay = time.Duration(timeLeft) * time.Second - return - } - - // Check the attempts left that we have to call the api - attemptsLeft, _ = strconv.Atoi(resp.Header["X-Rl"][0]) - if attemptsLeft <= 0 { - // if there are no more attempts left against the api, check how much time do we have to wait - // until we can call it again - // set the delayTime that we return to the given seconds to wait - delay = time.Duration(timeLeft) * time.Second - } - - // check if the response was success or not - defer resp.Body.Close() - bodyBytes, err := ioutil.ReadAll(resp.Body) - if err != nil { - err = errors.Wrap(err, "could not read response body") - return - } - - var apiMsg models.IpApiMsg - // Convert response body to struct - err = json.Unmarshal(bodyBytes, &apiMsg) - if err != nil { - err = errors.Wrap(err, "could not unmarshall response") - return - } - // Check if the status of the request has been succesful - if apiMsg.Status != "success" { - err = errors.New(fmt.Sprintf("status from ip different than success, resp header:\n %#v \n %+v", resp, apiMsg)) - return - } - - ipInfo.ExpirationTime = time.Now().UTC().Add(defaultIpTTL) - ipInfo.IpApiMsg = apiMsg - return -} - -func newIpQueue(queueSize int) *ipQueue { - return &ipQueue{ - queueSize: queueSize, - ipList: make([]string, 0, queueSize), - } -} - -var ( - ErrorQueueFull = errors.New("queue is full") - ErrorQueueEmpty = errors.New("queue is emtpy") -) - -type ipQueue struct { - sync.RWMutex - queueSize int - ipList []string -} - +// add an item to the IP queue func (q *ipQueue) addItem(newItem string) error { q.Lock() defer q.Unlock() @@ -311,6 +398,7 @@ func (q *ipQueue) addItem(newItem string) error { return nil } +// reads items from the IP queue func (q *ipQueue) readItem() (string, error) { q.Lock() defer q.Unlock() @@ -328,6 +416,102 @@ func (q *ipQueue) readItem() (string, error) { return item, nil } +// this function replaces the API call in the old version of the script +func locate(ip string) (ip2proxy.IP2ProxyRecord, error) { + if !isIPv4(ip) && !isIPv6(ip) { + return ip2proxy.IP2ProxyRecord{}, fmt.Errorf("invalid IP address") + } + + dbFile := getDatabaseFile(ip) + //todo: unzip db file and clean up and use the new name of the db + db, err := ip2proxy.OpenDB(DatabaseDir + dbFile) + if err != nil { + return ip2proxy.IP2ProxyRecord{}, err + } + defer db.Close() + + results, err := db.GetAll(ip) + if err != nil { + return ip2proxy.IP2ProxyRecord{}, err + } + + return results, err +} + +func (c *IpLocator) locatorRoutine() { + go func() { + ticker := time.NewTicker(minIterTime) + for { + ip, err := c.ipQueue.readItem() + if err == nil { + c.locationRequest <- ip + } + select { + case <-ticker.C: + ticker.Reset(minIterTime) + + case <-c.ctx.Done(): + return + } + } + }() + + go func() { + for { + select { + case ip := <-c.locationRequest: + respC := c.locateIp(ip) + case <-c.ctx.Done(): + return + } + } + }() +} + +func (c *IpLocator) locateIp(ip string) chan models.ApiResp { + respC := make(chan models.ApiResp) + go callIpApi(ip, respC) + return respC +} + +// get location country and City from the multiaddress of the peer on the peerstore +func callIpApi(ip string, respC chan models.ApiResp) { + var apiResponse models.ApiResp + apiResponse.IpInfo, apiResponse.DelayTime, apiResponse.Err = CallIpApi(ip) + respC <- apiResponse + // defer ^ +} + +func CallIpApi(ip string) (ipInfo models.IpInfo, delay time.Duration, err error) { + + var tempInfo 
ip2proxy.IP2ProxyRecord + tempInfo, err = locate(ip) + if err != nil { + return + } + + var apiMsg models.IpApiMsg + apiMsg = models.mapTempIpInfoToApiMsg(tempInfo, ip) + + ipInfo.ExpirationTime = time.Now().UTC().Add(defaultIpTTL) + ipInfo.IpApiMsg = apiMsg + return +} + +// ------------------------------------------------- // + +func (c *IpLocator) Run() { + //l.SetLevel(Logrus.TraceLevel) + c.locatorRoutine() +} + +func (c *IpLocator) Close() { + log.Info("closing IP-API service") + // close the context for ending up the routine + c.ctx.Done() + +} + func (q *ipQueue) ipExists(target string) bool { for _, ip := range q.ipList { if ip == target { From ac44c434b7ca52ae5166afa419e518d840fbbb27 Mon Sep 17 00:00:00 2001 From: 0x00badcode Date: Fri, 26 Jan 2024 05:53:33 +0000 Subject: [PATCH 2/3] refactored for new DB --- pkg/db/models/ip.go | 27 ---- pkg/utils/apis/ip_api.go | 332 ++++++++++++++++++++++++++------------- 2 files changed, 221 insertions(+), 138 deletions(-) diff --git a/pkg/db/models/ip.go b/pkg/db/models/ip.go index cb38e05..8374d04 100644 --- a/pkg/db/models/ip.go +++ b/pkg/db/models/ip.go @@ -2,8 +2,6 @@ package models import ( "time" - - "github.com/ip2location/ip2proxy-go/v4" ) const ( @@ -38,31 +36,6 @@ func (m *IpApiMsg) IsEmpty() bool { return m.Country == "" && m.City == "" } -// some fields are commented because we don't have data for them -func mapTempIpInfoToApiMsg(data ip2proxy.IP2ProxyRecord, ip string) IpApiMsg { - return IpApiMsg{ - IP: ip, - Status: "success", - // Continent: "", - // ContinentCode: "", - Country: data.CountryLong, - CountryCode: data.CountryShort, - Region: data.Region, - // RegionName: "", - City: data.City, - // Zip: "", - // Lat: "", - // Lon: "", - Isp: data.Isp, - // Org: "", - // As: "", - // AsName: "", - Mobile: data.UsageType == "MOB", - Proxy: data.ProxyType != "", - // Hosting: false, - } -} - type ApiResp struct { IpInfo IpInfo DelayTime time.Duration diff --git a/pkg/utils/apis/ip_api.go b/pkg/utils/apis/ip_api.go index 32b7add..307f69a 100644 --- a/pkg/utils/apis/ip_api.go +++ b/pkg/utils/apis/ip_api.go @@ -7,10 +7,13 @@ import ( "fmt" "io" "io/ioutil" + "math" + "mime" "net/http" "os" "path/filepath" "regexp" + "strconv" "strings" "sync" "time" @@ -84,13 +87,13 @@ func newIpQueue(queueSize int) *ipQueue { // ----------------------------------------------------------- // const ( - DatabaseDir = "./database/" - IP2LocationToken = "IP2LOCATION_TOKEN" - InApiEndpoint = "https://www.ip2location.com/download/?token=%s&file=%s" - UpdateThreshold = 24 * time.Hour - IPv4DbName = "PX11LITEBIN" - IPv6DbName = "PX11LITEBINIPV6" - UncompressedFileName = "IP2LOCATION-LITE-DB11.BIN" + DatabaseDir = "./database/" + IP2LocationToken = "IP2LOCATION_TOKEN" + DBDownloadApiEndpoint = "https://www.ip2location.com/download/?token=%s&file=%s" + UpdateThreshold = 24 * time.Hour + IPDbName = "PX11LITEBIN" + UncompressedFileName = "IP2PROXY-LITE-PX11.BIN" + TimestampFormat = "20060102-150405" ) func unzip(zipFile, destDir string) error { @@ -128,80 +131,73 @@ func unzip(zipFile, destDir string) error { return nil } -func downloadAndSave(url, baseFilename string) error { - version := func() string { - if strings.Contains(baseFilename, "IPV6") { - return "IPv6" - } - return "IPv4" - }() - - timestamp := time.Now().Format("20060102-150405") // Format: YYYYMMDD-HHMMSS - - filename := fmt.Sprintf("%s-%s.BIN", baseFilename, timestamp) +func downloadAndSaveZippedDB() error { + dbToken := os.Getenv(IP2LocationToken) + if dbToken == "" { + return 
errors.New("IP2LOCATION_TOKEN environment variable not set") + } + url := fmt.Sprintf(DBDownloadApiEndpoint, dbToken, IPDbName) resp, err := http.Get(url) if err != nil { return err } defer resp.Body.Close() + contentDisposition := resp.Header.Get("Content-Disposition") + _, params, err := mime.ParseMediaType(contentDisposition) + if err != nil { + return err + } + + filename := params["filename"] + if filename == "" { + filename = "PX11LITEBIN.zip" + } + file, err := os.Create(filename) if err != nil { return err } defer file.Close() - fmt.Println("Starting download of IP2Location DB for " + version) + fmt.Printf("Starting download of IP2Proxy DB to %s\n", filename) _, err = io.Copy(file, resp.Body) if err != nil { - fmt.Println("Error while downloading IP2Location DB for " + version) + fmt.Printf("Error while downloading IP2Proxy DB to %s\n", filename) return err } - fmt.Println("Download completed for IP2Location DB for " + version) + fmt.Printf("Download completed for IP2Proxy DB to %s\n", filename) return nil } -func updateSpecificDb(dbName, dbToken string) { - dbLink := fmt.Sprintf(InApiEndpoint, dbToken, dbName) - if needsUpdate(dbName) { - if err := downloadAndSave(dbLink, dbName); err != nil { - log.Printf("Failed to update database %s: %v\n", dbName, err) - } - } - cleanupOldDatabases(dbName) -} - // checks if the database needs to be updated -func needsUpdate(baseFilename string) bool { +func needsUpdate() bool { fmt.Println("Ip2Location DB: checking time since last update...") files, err := ioutil.ReadDir(DatabaseDir) if err != nil { log.Fatal(err) } - fmt.Println("Finished checking directory...") latest := time.Time{} for _, f := range files { - if strings.HasPrefix(f.Name(), baseFilename) && strings.HasSuffix(f.Name(), ".BIN") { - nameParts := strings.Split(f.Name(), "-") - if len(nameParts) >= 2 { - timestampPart := nameParts[len(nameParts)-1] - timestampPart = strings.TrimSuffix(timestampPart, ".BIN") - fileTime, err := time.Parse("20060102-150405", timestampPart) - if err == nil && fileTime.After(latest) { - latest = fileTime - } + if strings.HasPrefix(f.Name(), "IP2PROXY-LITE-PX11") && strings.HasSuffix(f.Name(), ".BIN") { + timestampPart := strings.TrimSuffix(f.Name(), ".BIN") + timestampPart = strings.TrimPrefix(timestampPart, "IP2PROXY-LITE-PX11") + fileTime, err := time.Parse(TimestampFormat, timestampPart) + if err == nil && fileTime.After(latest) { + latest = fileTime } } } + fmt.Println("Finished checking directory...") - return time.Since(latest) > UpdateThreshold + return latest.IsZero() || time.Since(latest) > UpdateThreshold } // finds the latest database file in the directory to see if it's older than 24 hours -func findLatestDbFile(baseFilename string) string { +func findLatestDbFile() string { files, err := ioutil.ReadDir(DatabaseDir) if err != nil { log.Fatal(err) @@ -210,7 +206,7 @@ func findLatestDbFile(baseFilename string) string { latestFile := "" latest := time.Time{} for _, f := range files { - if strings.HasPrefix(f.Name(), baseFilename) && strings.HasSuffix(f.Name(), ".BIN") { + if strings.HasPrefix(f.Name(), UncompressedFileName) && strings.HasSuffix(f.Name(), ".BIN") { nameParts := strings.Split(f.Name(), "-") if len(nameParts) >= 2 { timestampPart := nameParts[len(nameParts)-1] @@ -227,109 +223,189 @@ func findLatestDbFile(baseFilename string) string { return latestFile } -func cleanupDatabases(dbNames ...string) { - for _, dbName := range dbNames { - cleanupOldDatabases(dbName) - } -} - -func cleanupOldDatabases(baseFilename string) { +func 
cleanupOldDatabases() error { files, err := ioutil.ReadDir(DatabaseDir) if err != nil { - log.Fatal(err) + return err } - timestampToFile := make(map[time.Time]string) - - var latest time.Time + if len(files) <= 1 { + return nil + } - for _, f := range files { - if strings.HasPrefix(f.Name(), baseFilename) && strings.HasSuffix(f.Name(), ".BIN") { - nameParts := strings.Split(f.Name(), "-") - if len(nameParts) >= 2 { - timestampPart := nameParts[len(nameParts)-1] - timestampPart = strings.TrimSuffix(timestampPart, ".BIN") - fileTime, err := time.Parse("20060102-150405", timestampPart) + timestampToFile := make(map[time.Time]string) + var latestTime time.Time + + for _, file := range files { + if strings.HasPrefix(file.Name(), "IP2PROXY-LITE-PX11-") && strings.HasSuffix(file.Name(), ".BIN") { + nameParts := strings.Split(file.Name(), "-") + if len(nameParts) >= 3 { + timestampPart := strings.TrimSuffix(nameParts[len(nameParts)-1], ".BIN") + fileTime, err := time.Parse("20060102150405", timestampPart) if err != nil { - log.Printf("Failed to parse time from filename '%s': %s\n", f.Name(), err) + log.Printf("Failed to parse time from filename '%s': %s\n", file.Name(), err) continue } - timestampToFile[fileTime] = f.Name() - - if fileTime.After(latest) { - latest = fileTime + timestampToFile[fileTime] = file.Name() + if fileTime.After(latestTime) { + latestTime = fileTime } } } } - for t, name := range timestampToFile { - if t.Before(latest) { - err := os.Remove(filepath.Join(DatabaseDir, name)) + for fileTime, fileName := range timestampToFile { + if fileTime.Before(latestTime) { + err := os.Remove(filepath.Join(DatabaseDir, fileName)) if err != nil { - log.Printf("Failed to remove old database file: %s\n", name) + log.Printf("Failed to remove old database file: %s\n", fileName) } else { - log.Printf("Removed old database file: %s\n", name) + log.Printf("Removed old database file: %s\n", fileName) } } } + + return nil } -func getDatabaseFile(ip string) string { - var baseFilename string - if isIPv4(ip) { - baseFilename = "PX11LITEBIN" - } else { - baseFilename = "PX11LITEBINIPV6" +func cleanupFolder() error { + targetFiles := []string{"LICENSE", "README", ".zip"} + + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + return err } - latestFile := findLatestDbFile(baseFilename) - if latestFile == "" || needsUpdate(latestFile) { - updateDb() - latestFile = findLatestDbFile(baseFilename) + for _, file := range files { + shouldRemove := false + for _, target := range targetFiles { + if strings.Contains(file.Name(), target) { + if file.Name() == "IP2PROXY-LITE-PX11.BIN" || strings.Contains(file.Name(), "IP2PROXY") { + shouldRemove = false + } else { + shouldRemove = true + } + break + } + } + + if shouldRemove { + err := os.Remove(filepath.Join(DatabaseDir, file.Name())) + if err != nil { + log.Printf("Failed to remove file: %s\n", file.Name()) + } else { + log.Printf("Removed file: %s\n", file.Name()) + } + } } - return latestFile + return nil } -func updateDb() error { - dbToken := os.Getenv(IP2LocationToken) - if dbToken == "" { - return errors.New("IP2LOCATION_TOKEN environment variable not set") +func isNumeric(s string) bool { + f, err := strconv.ParseFloat(s, 64) + if err != nil { + return false } + return !math.IsNaN(f) && !math.IsInf(f, 0) +} - IPv4DbLink := fmt.Sprintf(inApiEndpoint, dbToken, IPv4DbName) - IPv6DbLink := fmt.Sprintf(inApiEndpoint, dbToken, IPv6DbName) - - var wg sync.WaitGroup - wg.Add(2) +func verifyDbFile() (bool, error) { + files, err := 
ioutil.ReadDir(DatabaseDir) + if err != nil { + return false, err + } + if len(files) == 0 { + return false, errors.New("No database file found") + } - go func() { - defer wg.Done() - if needsUpdate(IPv4DbName) { - if err := downloadAndSave(IPv4DbLink, IPv4DbName); err != nil { - log.Println(err) - } + // Check if "PX11LITEBIN.zip" file exists + found := false + for _, file := range files { + if file.Name() == "PX11LITEBIN.zip" { + found = true + break } - }() + } - go func() { - defer wg.Done() - if needsUpdate(IPv6DbName) { - if err := downloadAndSave(IPv6DbLink, IPv6DbName); err != nil { - log.Println(err) + if !found { + return false, errors.New("File PX11LITEBIN.zip not found") + } + + // unzip file and check validity of db + err = unzip("PX11LITEBIN.zip", DatabaseDir) + if err != nil { + return false, err + } + dbPath := filepath.Join(DatabaseDir, "IP2PROXY-LITE-PX11.BIN") // name of the file after unzipping + db, err := ip2proxy.OpenDB(dbPath) + version := db.DatabaseVersion() + if version == "" || !isNumeric(version) { + return false, errors.New("Invalid database version") + } + return true, nil +} + +func renameDbFile() error { + files, err := ioutil.ReadDir(DatabaseDir) + if err != nil { + return err + } + if len(files) == 0 { + return errors.New("No database file found") + } + timeStamp := time.Now().Format(TimestampFormat) + for _, file := range files { + if file.Name() == "IP2PROXY-LITE-PX11.BIN" { + err := os.Rename(filepath.Join(DatabaseDir, file.Name()), filepath.Join(DatabaseDir, "IP2PROXY-LITE-PX11-"+timeStamp+".BIN")) + if err != nil { + return err } } - }() + } + return nil +} - wg.Wait() +func updateDb() error { + dbToken := os.Getenv(IP2LocationToken) + if dbToken == "" { + return errors.New("IP2LOCATION_TOKEN environment variable not set") + } + if err := downloadAndSaveZippedDB(); err != nil { + log.Printf("Failed to download DB: %v\n", err) + return err + } + valid, err := verifyDbFile() + if err != nil { + return err + } + if !valid { + return errors.New("Invalid database file") + } - cleanupDatabases(IPv4DbName, IPv6DbName) + err = renameDbFile() + if err != nil { + return err + } + + cleanupOldDatabases() // removes all the old versions of the db file and leaves the one with the latest timestamp + cleanupFolder() // cleans the folder from all the other files that are not needed (zip, readme, license) return nil } +// NEW VERSION +func getDatabaseFile() string { + latestFile := findLatestDbFile() + + if latestFile == "" || needsUpdate() { + updateDb() + } + return findLatestDbFile() + +} + // ------------------------------------------------- // func isIPv4(ip string) bool { @@ -344,6 +420,31 @@ func isIPv6(ip string) bool { return match } +// some fields are commented because we don't have data for them +func mapTempIpInfoToApiMsg(data ip2proxy.IP2ProxyRecord, ip string) models.IpApiMsg { + return models.IpApiMsg{ + IP: ip, + Status: "success", + // Continent: "", + // ContinentCode: "", + Country: data.CountryLong, + CountryCode: data.CountryShort, + Region: data.Region, + // RegionName: "", + City: data.City, + // Zip: "", + // Lat: "", + // Lon: "", + Isp: data.Isp, + // Org: "", + // As: "", + // AsName: "", + Mobile: data.UsageType == "MOB", + Proxy: data.ProxyType != "", + // Hosting: false, + } +} + // ------------------------------------------------- // func (c *IpLocator) LocateIP(ip string) { @@ -422,8 +523,7 @@ func locate(ip string) (ip2proxy.IP2ProxyRecord, error) { return ip2proxy.IP2ProxyRecord{}, fmt.Errorf("invalid IP address") } - dbFile := 
getDatabaseFile(ip)
-	//todo: unzip db file and clean up and use the new name of the db
+	dbFile := getDatabaseFile()
 	db, err := ip2proxy.OpenDB(DatabaseDir + dbFile)
 	if err != nil {
 		return ip2proxy.IP2ProxyRecord{}, err
 	}
 	defer db.Close()
@@ -461,6 +561,16 @@ func (c *IpLocator) locatorRoutine() {
 		select {
 		case ip := <-c.locationRequest:
 			respC := c.locateIp(ip)
+			resp := <-respC
+			if resp.Err != nil {
+				log.Error("error while locating IP -", resp.Err.Error())
+				continue
+			}
+			if resp.IpInfo.IsEmpty() {
+				log.Error("empty response from IP-API")
+				continue
+			}
+			c.dbClient.PersistToDB(resp.IpInfo)
 		case <-c.ctx.Done():
 			return
 		}
@@ -491,7 +601,7 @@ func CallIpApi(ip string) (ipInfo models.IpInfo, delay time.Duration, err error)
 	}

 	var apiMsg models.IpApiMsg
-	apiMsg = models.mapTempIpInfoToApiMsg(tempInfo, ip)
+	apiMsg = mapTempIpInfoToApiMsg(tempInfo, ip)

 	ipInfo.ExpirationTime = time.Now().UTC().Add(defaultIpTTL)
 	ipInfo.IpApiMsg = apiMsg

From a4d83a03c2f1da63749e193d93664e1644c59674 Mon Sep 17 00:00:00 2001
From: 0x00badcode
Date: Fri, 26 Jan 2024 20:58:33 +0000
Subject: [PATCH 3/3] updated docker-compose to load the .env file and mount the IP database volume

---
 docker-compose.yaml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/docker-compose.yaml b/docker-compose.yaml
index 80ceefa..f05e510 100644
--- a/docker-compose.yaml
+++ b/docker-compose.yaml
@@ -55,6 +55,7 @@ services:
    build:
      context: .
      dockerfile: Dockerfile
+   env_file: .env
    command: |
      eth2
      --log-level=${CRAWLER_LOG_LEVEL}
@@ -72,3 +73,5 @@
    ports:
      - "${CRAWLER_PORT}:9020"
      - "127.0.0.1:${CRAWLER_METRICS_PORT}:9080"
+   volumes:
+     - ./pkg/utils/apis/database:/app/database
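
Usage sketch for the new lookup path: the standalone program below exercises the IP2Proxy database the same way the patched pkg/utils/apis/ip_api.go does. It assumes an unpacked ./database/IP2PROXY-LITE-PX11.BIN is already in place (updateDb normally downloads, verifies and renames it) and uses 8.8.8.8 only as a sample address; the GetAll call and the record field names are carried over from locate() and mapTempIpInfoToApiMsg() in the patch rather than taken from a verified API reference. With the PATCH 3/3 compose change, the same .BIN files live under /app/database inside the container, and IP2LOCATION_TOKEN is read from the .env file described in .env_template.

// Minimal standalone check of the IP2Proxy lookup path introduced above.
// Assumes ./database/IP2PROXY-LITE-PX11.BIN already exists and mirrors the
// calls used by locate() and mapTempIpInfoToApiMsg() in ip_api.go.
package main

import (
	"fmt"
	"log"

	"github.com/ip2location/ip2proxy-go/v4"
)

func main() {
	db, err := ip2proxy.OpenDB("./database/IP2PROXY-LITE-PX11.BIN")
	if err != nil {
		log.Fatalf("could not open IP2Proxy database: %v", err)
	}
	defer db.Close()

	// GetAll is the same lookup locate() performs; the fields printed below
	// are the ones mapTempIpInfoToApiMsg() copies into models.IpApiMsg.
	record, err := db.GetAll("8.8.8.8")
	if err != nil {
		log.Fatalf("lookup failed: %v", err)
	}

	fmt.Println("country:    ", record.CountryLong, record.CountryShort)
	fmt.Println("region/city:", record.Region, record.City)
	fmt.Println("isp:        ", record.Isp)
	fmt.Println("mobile:     ", record.UsageType == "MOB")
	fmt.Println("proxy:      ", record.ProxyType != "")
}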
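
The refresh logic in PATCH 2/3 (renameDbFile, findLatestDbFile, needsUpdate, cleanupOldDatabases) hinges on one naming convention: the unzipped IP2PROXY-LITE-PX11.BIN is renamed with a TimestampFormat ("20060102-150405") suffix, and the age of the newest such file decides whether a fresh download is needed (UpdateThreshold, 24 hours). The sketch below shows that convention in isolation; the local constants timestampFormat, baseName and updateThreshold are stand-ins for the patch's TimestampFormat, the IP2PROXY-LITE-PX11 prefix and UpdateThreshold. Note that the embedded timestamp itself contains a dash, so it is recovered here by trimming the fixed prefix and the .BIN suffix rather than by splitting the file name on dashes.

// Illustration of the timestamped database naming used by renameDbFile,
// findLatestDbFile and needsUpdate in pkg/utils/apis/ip_api.go.
package main

import (
	"fmt"
	"strings"
	"time"
)

const (
	timestampFormat = "20060102-150405" // same layout as TimestampFormat in the patch
	baseName        = "IP2PROXY-LITE-PX11"
	updateThreshold = 24 * time.Hour
)

func main() {
	// renameDbFile produces names of this shape after a successful download.
	name := fmt.Sprintf("%s-%s.BIN", baseName, time.Now().UTC().Format(timestampFormat))
	fmt.Println("stored as:", name)

	// The timestamp contains a '-', so recover it by trimming the fixed prefix
	// and suffix instead of splitting the name on dashes.
	trimmed := strings.TrimSuffix(strings.TrimPrefix(name, baseName+"-"), ".BIN")
	createdAt, err := time.Parse(timestampFormat, trimmed)
	if err != nil {
		fmt.Println("could not parse timestamp:", err)
		return
	}
	fmt.Println("needs update:", time.Since(createdAt) > updateThreshold)
}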