From 8b127b1025d9d79e6015902a2de05e8324701af6 Mon Sep 17 00:00:00 2001 From: Vadimka Komissarov Date: Sat, 11 Dec 2021 19:11:55 +0000 Subject: [PATCH] feature/queue-with-rpc - pre final draft #5 --- README.md | 3 ++ cloner/api.go | 30 ++++++++++++- cloner/asset.go | 6 ++- cloner/cloner.go | 5 ++- cloner/nexus.go | 110 ++++++++++++++++++++++++++++++++++++++++++++--- cloner/queue.go | 49 +++++++++++++++------ cloner/rpc.go | 53 ++++++++++++++++++----- main.go | 4 ++ 8 files changed, 227 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 48fd929..49c0427 100644 --- a/README.md +++ b/README.md @@ -104,6 +104,9 @@ The second method is collection via Nexus RPC. As I saw, there is no documentati By default the application is use API for all communications with Nexus. If you have some troubles with this method, try RPC with --skip-download and --skip-upload for verifing that RPC method has no crashes with your Nexus installation. +## Upload method and rebuild repository metadata +Repair - Rebuild Maven repository metadata (maven-metadata.xml) + ## Testing There is no test files, sorry =( diff --git a/cloner/api.go b/cloner/api.go index b36f68c..262d880 100644 --- a/cloner/api.go +++ b/cloner/api.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "encoding/json" "errors" + "fmt" "io" "io/ioutil" "net/http" @@ -104,7 +105,7 @@ func (m *nexusApi) getNexusFile(url string, file *os.File) (e error) { return } -func (m *nexusApi) putNexusFile(url string, body *bytes.Buffer, contentType string) (e error) { +func (m *nexusApi) postNexusFile(url string, body *bytes.Buffer, contentType string) (e error) { var req *http.Request if req, e = http.NewRequest("POST", url, body); e != nil { return @@ -119,6 +120,7 @@ func (m *nexusApi) putNexusFile(url string, body *bytes.Buffer, contentType stri if rsp, e = m.Client.Do(req); e != nil { return } + defer rsp.Body.Close() if gIsDebug { // fmt.Println(m.dumpNexusRequest(req)) @@ -126,6 +128,32 @@ func (m *nexusApi) putNexusFile(url string, body *bytes.Buffer, contentType stri } if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusNoContent { + tmp, _ := ioutil.ReadAll(rsp.Body) + fmt.Println(string(tmp)) + gLog.Warn().Int("status", rsp.StatusCode).Msg("Abnormal API response! Check it immediately!") + return nxsErrRq404 + } + + return +} + +func (m *nexusApi) putNexusFile(url string, body io.Reader) (e error) { + var req *http.Request + if req, e = http.NewRequest("PUT", url, body); e != nil { + return + } + + m.authorizeNexusRequest(req) + + var rsp *http.Response + if rsp, e = m.Client.Do(req); e != nil { + return + } + defer rsp.Body.Close() + + if rsp.StatusCode != http.StatusCreated { + tmp, _ := ioutil.ReadAll(rsp.Body) + fmt.Println(string(tmp)) gLog.Warn().Int("status", rsp.StatusCode).Msg("Abnormal API response! Check it immediately!") return nxsErrRq404 } diff --git a/cloner/asset.go b/cloner/asset.go index 9b51383..5370a56 100644 --- a/cloner/asset.go +++ b/cloner/asset.go @@ -15,6 +15,7 @@ type NexusAsset2 interface { // getBinaryFile() (*os.File, error) getTemporaryFile(string) (*os.File, error) isFileExists(string) (*os.File, error) + getTemporaryFilePath(string) (string, error) getDownloadUrl(string, *url.URL) (string, error) getExtension() (string, error) @@ -23,9 +24,12 @@ type NexusAsset2 interface { getVersion() (string, error) getId() (string, error) getAssetFd() *os.File + getClassifier() (string, error) + getBaseVersion() (string, error) - setDownloaded() addAttributes(map[string]json.RawMessage) error + setDownloaded() + deleteAsset() } type ( diff --git a/cloner/cloner.go b/cloner/cloner.go index 0bfc693..1576092 100644 --- a/cloner/cloner.go +++ b/cloner/cloner.go @@ -137,8 +137,8 @@ func (m *Cloner) syncRPC(ep chan error) (e error) { m.mainDispatcher = newDispatcher(gCli.Int("queue-buffer"), gCli.Int("queue-workers-capacity"), gCli.Int("queue-workers")) gQueue = m.mainDispatcher + wg.Add(1) go func() { - wg.Add(1) defer wg.Done() gLog.Info().Msg("Main queue spawning ...") @@ -148,8 +148,9 @@ func (m *Cloner) syncRPC(ep chan error) (e error) { if e = m.srcNexus.createTemporaryDirectory(); e != nil { return } + m.dstNexus.setTemporaryDirectory(m.srcNexus.getTemporaryDirectory()) - if e = m.srcNexus.downloadMissingAssetsRPC(missAssets); e != nil { + if e = m.srcNexus.downloadMissingAssetsRPC(missAssets, m.dstNexus); e != nil { return } diff --git a/cloner/nexus.go b/cloner/nexus.go index 93c97b6..368d1f0 100644 --- a/cloner/nexus.go +++ b/cloner/nexus.go @@ -374,7 +374,7 @@ func (m *nexus) getTemporaryDirectory() string { return m.tempPath } -func (m *nexus) downloadMissingAssetsRPC(assets []NexusAsset2) (e error) { +func (m *nexus) downloadMissingAssetsRPC(assets []NexusAsset2, dstNexus *nexus) (e error) { var dwnListLen = len(assets) gLog.Info().Msgf("There are %d marked for downloading.", dwnListLen) @@ -387,16 +387,12 @@ func (m *nexus) downloadMissingAssetsRPC(assets []NexusAsset2) (e error) { for _, asset := range assets { gQueue.newJob(&job{ action: jobActDownloadAsset, - payload: []interface{}{m, asset}, + payload: []interface{}{m, asset, dstNexus}, }) } return } -func (m *nexus) uploadAssetRPC(asset NexusAsset2) (e error) { - return -} - func (m *nexus) downloadAssetRPC(asset NexusAsset2) (e error) { var dwnUrl string if dwnUrl, e = asset.getDownloadUrl(m.repository, m.endpoint); e != nil { @@ -465,6 +461,105 @@ func (m *nexus) downloadMissingAssets(assets []*NexusAsset) (e error) { return nil } +func (m *nexus) deleteAssetTemporaryFile(asset NexusAsset2) (e error) { + var filepath string + if filepath, e = asset.getTemporaryFilePath(m.tempPath); e != nil { + return + } + + if e = os.Remove(filepath); e != nil { + gLog.Error().Msg("Could not delete file!") + return + } + + gLog.Debug().Msg("Uploaded assets has been successfully deleted from local filesystem.") + asset.deleteAsset() + runtime.GC() + return +} + +// if used it, do not forget about "Repair - Rebuild Maven repository metadata (maven-metadata.xml)" task +func (m *nexus) uploadAssetHttpFormatRPC(asset NexusAsset2) (e error) { + var fd *os.File + if fd, e = asset.isFileExists(m.tempPath); e != nil { + return + } + defer fd.Close() + + var assetReqUri string + if assetReqUri, e = asset.getDownloadUrl(m.repository, m.endpoint); e != nil { + return + } + + var rrl *url.URL + if rrl, e = url.Parse(assetReqUri); e != nil { + return + } + + if e = m.api.putNexusFile(rrl.String(), fd); e != nil { + return + } + + return +} + +func (m *nexus) uploadAssetRPC(asset NexusAsset2) (e error) { + var fd *os.File + if fd, e = asset.isFileExists(m.tempPath); e != nil { + return + } + defer fd.Close() + + var apiForm = make(map[string]io.Reader) + apiForm["asset0"] = fd + var classifier, extension, groupId, artifactId, version string + if classifier, e = asset.getClassifier(); e != nil { + return + } + if extension, e = asset.getExtension(); e != nil { + return + } + if groupId, e = asset.getGroupId(); e != nil { + return + } + if artifactId, e = asset.getArtifactId(); e != nil { + return + } + if version, e = asset.getVersion(); e != nil { + return + } + + apiForm["asset0.classifier"] = strings.NewReader(classifier) + apiForm["asset0.extension"] = strings.NewReader(extension) + apiForm["groupId"] = strings.NewReader(groupId) + apiForm["artifactId"] = strings.NewReader(artifactId) + apiForm["version"] = strings.NewReader(version) + apiForm["generate-pom"] = strings.NewReader("on") + + var buffer *bytes.Buffer + var contentType string + if buffer, contentType, e = m.getNexusFileMeta(apiForm); e != nil { + gLog.Error().Err(e).Str("filename", asset.getHumanReadbleName()). + Msg("Could not get meta data for the asset's file.") + return + } + + var rrl *url.URL + if rrl, e = m.endpoint.Parse("/service/rest/v1/components"); e != nil { + return + } + + var rgs = &url.Values{} + rgs.Set("repository", m.repository) + rrl.RawQuery = rgs.Encode() + + if e = m.api.postNexusFile(rrl.String(), buffer, contentType); e != nil { + return + } + + return +} + func (m *nexus) uploadMissingAssets(assets []*NexusAsset) (e error) { var isErrored bool var assetsCount = len(assets) @@ -477,6 +572,7 @@ func (m *nexus) uploadMissingAssets(assets []*NexusAsset) (e error) { Msg("Could not find the asset's file. Asset will be skipped!") continue } + defer file.Close() gLog.Debug().Msg("asset - " + asset.getHumanReadbleName()) @@ -518,7 +614,7 @@ func (m *nexus) uploadMissingAssets(assets []*NexusAsset) (e error) { rgs.Set("repository", m.repository) rrl.RawQuery = rgs.Encode() - if e = m.api.putNexusFile(rrl.String(), body, contentType); e != nil { + if e = m.api.postNexusFile(rrl.String(), body, contentType); e != nil { isErrored = true continue } diff --git a/cloner/queue.go b/cloner/queue.go index eb6cccd..8e91ae5 100644 --- a/cloner/queue.go +++ b/cloner/queue.go @@ -255,25 +255,29 @@ func (m *worker) setStatus(status uint8) { func (m *worker) doJob(j *job) { switch j.action { case jobActParseAsset: - nexus := j.payload[0].(*nexus) - if e := nexus.getRepositoryAssetsRPC(j.payload[1].(string)); e != nil { + nxs := j.payload[0].(*nexus) + if e := nxs.getRepositoryAssetsRPC(j.payload[1].(string)); e != nil { m.errors <- j.newError(e) + return } case jobActGetAsset: - nexus := j.payload[0].(*nexus) - if e := nexus.getRepositoryAssetInfo(j.payload[1].(NexusAsset2)); e != nil { + nxs := j.payload[0].(*nexus) + if e := nxs.getRepositoryAssetInfo(j.payload[1].(NexusAsset2)); e != nil { m.errors <- j.newError(e) + return } case jobActFindAsset: case jobActDownloadAsset: - nexus := j.payload[0].(*nexus) + nxsFrom := j.payload[0].(*nexus) asset := j.payload[1].(NexusAsset2) + nxsTo := j.payload[2].(*nexus) - if e := nexus.downloadAssetRPC(asset); e != nil { + if e := nxsFrom.downloadAssetRPC(asset); e != nil { m.errors <- j.newError(e) + return } - nexus.incDownloadedAssets() + nxsFrom.incDownloadedAssets() gLog.Info().Msgf("Asset %s has been downloaded successfully.", asset.getHumanReadbleName()) if gCli.Bool("skip-upload") { @@ -283,19 +287,40 @@ func (m *worker) doJob(j *job) { gQueue.newJob(&job{ action: jobActUploadAsset, - payload: []interface{}{nexus, asset}, + payload: []interface{}{nxsTo, asset}, }) case jobActUploadAsset: - nexus := j.payload[0].(*nexus) + nxs := j.payload[0].(*nexus) asset := j.payload[1].(NexusAsset2) - if e := nexus.uploadAssetRPC(asset); e != nil { - return + if gCli.Bool("use-put-upload") { + if e := nxs.uploadAssetHttpFormatRPC(asset); e != nil { + m.errors <- j.newError(e) + return + } + } else { + if e := nxs.uploadAssetRPC(asset); e != nil { + m.errors <- j.newError(e) + return + } } - nexus.incUploadedAssets() + nxs.incUploadedAssets() gLog.Info().Msgf("Asset %s has been upload successfully.", asset.getHumanReadbleName()) + gQueue.newJob(&job{ + action: jobActDeleteAsset, + payload: []interface{}{nxs, asset}, + }) + case jobActDeleteAsset: + nxs := j.payload[0].(*nexus) + asset := j.payload[1].(NexusAsset2) + + if e := nxs.deleteAssetTemporaryFile(asset); e != nil { + m.errors <- j.newError(e) + return + } + // case jobActCustomFunc: // if j.payloadFunc != nil { // if e := j.payloadFunc(j.payload); e != nil { diff --git a/cloner/rpc.go b/cloner/rpc.go index 791faab..6da2f3a 100644 --- a/cloner/rpc.go +++ b/cloner/rpc.go @@ -66,11 +66,12 @@ type ( Sha512 string `json:"sha512,omitempty"` } rpcAssetAttrsMaven2 struct { - Extension string `json:"extension,omitempty"` - GroupId string `json:"groupId,omitempty"` - ArtifactId string `json:"artifactId,omitempty"` - Version string `json:"version,omitempty"` - Classifier string `json:"classifier,omitempty"` + Extension string `json:"extension,omitempty"` + GroupId string `json:"groupId,omitempty"` + ArtifactId string `json:"artifactId,omitempty"` + Version string `json:"version,omitempty"` + Classifier string `json:"classifier,omitempty"` + BaseVersion string `json:"baseVersion,omitempty"` } ) @@ -188,6 +189,26 @@ func (m *rpcAsset) getId() (data string, e error) { return } +func (m *rpcAsset) getClassifier() (data string, e error) { + defer m.catchPanic(&e) + + if len(m.Attributes.Maven2.Classifier) != 0 { + data = m.Attributes.Maven2.Classifier + } + + return +} + +func (m *rpcAsset) getBaseVersion() (data string, e error) { + defer m.catchPanic(&e) + + if len(m.Attributes.Maven2.BaseVersion) != 0 { + data = m.Attributes.Maven2.BaseVersion + } + + return +} + func (m *rpcAsset) getHumanReadbleName() string { return strings.ReplaceAll(m.Name, "/", "_") } @@ -207,6 +228,19 @@ func (m *rpcAsset) isFileExists(tmpdir string) (file *os.File, e error) { return } +func (m *rpcAsset) getTemporaryFilePath(tmpdir string) (filepath string, e error) { + filename := path.Base(m.Name) + filepath = tmpdir + "/" + filename + if _, e = os.Stat(filepath); e != nil { + if errors.Is(e, os.ErrNotExist) { + gLog.Error().Err(e).Msg("There is internal error in asset temporary file processing! Given asset was not found!") + return + } + return + } + return +} + // TODO // OPTIMIZE - https://pkg.go.dev/os@go1.17.2#OpenFile // !! Note - returned FD must be closed!! @@ -304,10 +338,9 @@ func (m *rpcAsset) getAssetFd() *os.File { return m.dwnedFd } func (m *rpcAsset) isDownloaded() bool { return m.dwnedSuccess } func (m *rpcAsset) setDownloaded() { m.dwnedSuccess = true } -func (m *rpcAsset) uploadAsset() (e error) { - return -} - -func (m *rpcAsset) deleteAsset() (e error) { +func (m *rpcAsset) deleteAsset() { + // rewrite current struct with empty object for memory free + // it will be freed by the GC in him next iteration + *m = rpcAsset{} return } diff --git a/main.go b/main.go index 0be5765..c83df87 100644 --- a/main.go +++ b/main.go @@ -89,6 +89,10 @@ func main() { Usage: "Regexp value with `path` for syncing.", Value: ".*", }, + cli.BoolFlag{ + Name: "use-put-upload", + Usage: "Use PUT asset upload instead of POST via API.", + }, cli.BoolFlag{ Name: "use-rpc", Usage: "Use RPC protocol for assets collection on source nexus server. Unstable, testing feature.",