Skip to content
This repository has been archived by the owner on Oct 20, 2024. It is now read-only.

Commit

Permalink
feature/queue-with-rpc - pre final draft #5
Browse files Browse the repository at this point in the history
  • Loading branch information
MindHunter86 committed Dec 11, 2021
1 parent c571bc1 commit 8b127b1
Show file tree
Hide file tree
Showing 8 changed files with 227 additions and 33 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ The second method is collection via Nexus RPC. As I saw, there is no documentati
By default the application is use API for all communications with Nexus. If you have some troubles with this method, try RPC with --skip-download and --skip-upload for verifing that RPC method has no crashes with your Nexus installation.


## Upload method and rebuild repository metadata
Repair - Rebuild Maven repository metadata (maven-metadata.xml)

## Testing
There is no test files, sorry =(

Expand Down
30 changes: 29 additions & 1 deletion cloner/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"crypto/tls"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
Expand Down Expand Up @@ -104,7 +105,7 @@ func (m *nexusApi) getNexusFile(url string, file *os.File) (e error) {
return
}

func (m *nexusApi) putNexusFile(url string, body *bytes.Buffer, contentType string) (e error) {
func (m *nexusApi) postNexusFile(url string, body *bytes.Buffer, contentType string) (e error) {
var req *http.Request
if req, e = http.NewRequest("POST", url, body); e != nil {
return
Expand All @@ -119,13 +120,40 @@ func (m *nexusApi) putNexusFile(url string, body *bytes.Buffer, contentType stri
if rsp, e = m.Client.Do(req); e != nil {
return
}
defer rsp.Body.Close()

if gIsDebug {
// fmt.Println(m.dumpNexusRequest(req))
// fmt.Println(m.dumpNexusResponse(rsp))
}

if rsp.StatusCode != http.StatusOK && rsp.StatusCode != http.StatusNoContent {
tmp, _ := ioutil.ReadAll(rsp.Body)
fmt.Println(string(tmp))
gLog.Warn().Int("status", rsp.StatusCode).Msg("Abnormal API response! Check it immediately!")
return nxsErrRq404
}

return
}

func (m *nexusApi) putNexusFile(url string, body io.Reader) (e error) {
var req *http.Request
if req, e = http.NewRequest("PUT", url, body); e != nil {
return
}

m.authorizeNexusRequest(req)

var rsp *http.Response
if rsp, e = m.Client.Do(req); e != nil {
return
}
defer rsp.Body.Close()

if rsp.StatusCode != http.StatusCreated {
tmp, _ := ioutil.ReadAll(rsp.Body)
fmt.Println(string(tmp))
gLog.Warn().Int("status", rsp.StatusCode).Msg("Abnormal API response! Check it immediately!")
return nxsErrRq404
}
Expand Down
6 changes: 5 additions & 1 deletion cloner/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ type NexusAsset2 interface {
// getBinaryFile() (*os.File, error)
getTemporaryFile(string) (*os.File, error)
isFileExists(string) (*os.File, error)
getTemporaryFilePath(string) (string, error)

getDownloadUrl(string, *url.URL) (string, error)
getExtension() (string, error)
Expand All @@ -23,9 +24,12 @@ type NexusAsset2 interface {
getVersion() (string, error)
getId() (string, error)
getAssetFd() *os.File
getClassifier() (string, error)
getBaseVersion() (string, error)

setDownloaded()
addAttributes(map[string]json.RawMessage) error
setDownloaded()
deleteAsset()
}

type (
Expand Down
5 changes: 3 additions & 2 deletions cloner/cloner.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,8 @@ func (m *Cloner) syncRPC(ep chan error) (e error) {
m.mainDispatcher = newDispatcher(gCli.Int("queue-buffer"), gCli.Int("queue-workers-capacity"), gCli.Int("queue-workers"))
gQueue = m.mainDispatcher

wg.Add(1)
go func() {
wg.Add(1)
defer wg.Done()

gLog.Info().Msg("Main queue spawning ...")
Expand All @@ -148,8 +148,9 @@ func (m *Cloner) syncRPC(ep chan error) (e error) {
if e = m.srcNexus.createTemporaryDirectory(); e != nil {
return
}
m.dstNexus.setTemporaryDirectory(m.srcNexus.getTemporaryDirectory())

if e = m.srcNexus.downloadMissingAssetsRPC(missAssets); e != nil {
if e = m.srcNexus.downloadMissingAssetsRPC(missAssets, m.dstNexus); e != nil {
return
}

Expand Down
110 changes: 103 additions & 7 deletions cloner/nexus.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func (m *nexus) getTemporaryDirectory() string {
return m.tempPath
}

func (m *nexus) downloadMissingAssetsRPC(assets []NexusAsset2) (e error) {
func (m *nexus) downloadMissingAssetsRPC(assets []NexusAsset2, dstNexus *nexus) (e error) {
var dwnListLen = len(assets)

gLog.Info().Msgf("There are %d marked for downloading.", dwnListLen)
Expand All @@ -387,16 +387,12 @@ func (m *nexus) downloadMissingAssetsRPC(assets []NexusAsset2) (e error) {
for _, asset := range assets {
gQueue.newJob(&job{
action: jobActDownloadAsset,
payload: []interface{}{m, asset},
payload: []interface{}{m, asset, dstNexus},
})
}
return
}

func (m *nexus) uploadAssetRPC(asset NexusAsset2) (e error) {
return
}

func (m *nexus) downloadAssetRPC(asset NexusAsset2) (e error) {
var dwnUrl string
if dwnUrl, e = asset.getDownloadUrl(m.repository, m.endpoint); e != nil {
Expand Down Expand Up @@ -465,6 +461,105 @@ func (m *nexus) downloadMissingAssets(assets []*NexusAsset) (e error) {
return nil
}

func (m *nexus) deleteAssetTemporaryFile(asset NexusAsset2) (e error) {
var filepath string
if filepath, e = asset.getTemporaryFilePath(m.tempPath); e != nil {
return
}

if e = os.Remove(filepath); e != nil {
gLog.Error().Msg("Could not delete file!")
return
}

gLog.Debug().Msg("Uploaded assets has been successfully deleted from local filesystem.")
asset.deleteAsset()
runtime.GC()
return
}

// if used it, do not forget about "Repair - Rebuild Maven repository metadata (maven-metadata.xml)" task
func (m *nexus) uploadAssetHttpFormatRPC(asset NexusAsset2) (e error) {
var fd *os.File
if fd, e = asset.isFileExists(m.tempPath); e != nil {
return
}
defer fd.Close()

var assetReqUri string
if assetReqUri, e = asset.getDownloadUrl(m.repository, m.endpoint); e != nil {
return
}

var rrl *url.URL
if rrl, e = url.Parse(assetReqUri); e != nil {
return
}

if e = m.api.putNexusFile(rrl.String(), fd); e != nil {
return
}

return
}

func (m *nexus) uploadAssetRPC(asset NexusAsset2) (e error) {
var fd *os.File
if fd, e = asset.isFileExists(m.tempPath); e != nil {
return
}
defer fd.Close()

var apiForm = make(map[string]io.Reader)
apiForm["asset0"] = fd
var classifier, extension, groupId, artifactId, version string
if classifier, e = asset.getClassifier(); e != nil {
return
}
if extension, e = asset.getExtension(); e != nil {
return
}
if groupId, e = asset.getGroupId(); e != nil {
return
}
if artifactId, e = asset.getArtifactId(); e != nil {
return
}
if version, e = asset.getVersion(); e != nil {
return
}

apiForm["asset0.classifier"] = strings.NewReader(classifier)
apiForm["asset0.extension"] = strings.NewReader(extension)
apiForm["groupId"] = strings.NewReader(groupId)
apiForm["artifactId"] = strings.NewReader(artifactId)
apiForm["version"] = strings.NewReader(version)
apiForm["generate-pom"] = strings.NewReader("on")

var buffer *bytes.Buffer
var contentType string
if buffer, contentType, e = m.getNexusFileMeta(apiForm); e != nil {
gLog.Error().Err(e).Str("filename", asset.getHumanReadbleName()).
Msg("Could not get meta data for the asset's file.")
return
}

var rrl *url.URL
if rrl, e = m.endpoint.Parse("/service/rest/v1/components"); e != nil {
return
}

var rgs = &url.Values{}
rgs.Set("repository", m.repository)
rrl.RawQuery = rgs.Encode()

if e = m.api.postNexusFile(rrl.String(), buffer, contentType); e != nil {
return
}

return
}

func (m *nexus) uploadMissingAssets(assets []*NexusAsset) (e error) {
var isErrored bool
var assetsCount = len(assets)
Expand All @@ -477,6 +572,7 @@ func (m *nexus) uploadMissingAssets(assets []*NexusAsset) (e error) {
Msg("Could not find the asset's file. Asset will be skipped!")
continue
}
defer file.Close()

gLog.Debug().Msg("asset - " + asset.getHumanReadbleName())

Expand Down Expand Up @@ -518,7 +614,7 @@ func (m *nexus) uploadMissingAssets(assets []*NexusAsset) (e error) {
rgs.Set("repository", m.repository)
rrl.RawQuery = rgs.Encode()

if e = m.api.putNexusFile(rrl.String(), body, contentType); e != nil {
if e = m.api.postNexusFile(rrl.String(), body, contentType); e != nil {
isErrored = true
continue
}
Expand Down
49 changes: 37 additions & 12 deletions cloner/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,25 +255,29 @@ func (m *worker) setStatus(status uint8) {
func (m *worker) doJob(j *job) {
switch j.action {
case jobActParseAsset:
nexus := j.payload[0].(*nexus)
if e := nexus.getRepositoryAssetsRPC(j.payload[1].(string)); e != nil {
nxs := j.payload[0].(*nexus)
if e := nxs.getRepositoryAssetsRPC(j.payload[1].(string)); e != nil {
m.errors <- j.newError(e)
return
}
case jobActGetAsset:
nexus := j.payload[0].(*nexus)
if e := nexus.getRepositoryAssetInfo(j.payload[1].(NexusAsset2)); e != nil {
nxs := j.payload[0].(*nexus)
if e := nxs.getRepositoryAssetInfo(j.payload[1].(NexusAsset2)); e != nil {
m.errors <- j.newError(e)
return
}
case jobActFindAsset:
case jobActDownloadAsset:
nexus := j.payload[0].(*nexus)
nxsFrom := j.payload[0].(*nexus)
asset := j.payload[1].(NexusAsset2)
nxsTo := j.payload[2].(*nexus)

if e := nexus.downloadAssetRPC(asset); e != nil {
if e := nxsFrom.downloadAssetRPC(asset); e != nil {
m.errors <- j.newError(e)
return
}

nexus.incDownloadedAssets()
nxsFrom.incDownloadedAssets()
gLog.Info().Msgf("Asset %s has been downloaded successfully.", asset.getHumanReadbleName())

if gCli.Bool("skip-upload") {
Expand All @@ -283,19 +287,40 @@ func (m *worker) doJob(j *job) {

gQueue.newJob(&job{
action: jobActUploadAsset,
payload: []interface{}{nexus, asset},
payload: []interface{}{nxsTo, asset},
})
case jobActUploadAsset:
nexus := j.payload[0].(*nexus)
nxs := j.payload[0].(*nexus)
asset := j.payload[1].(NexusAsset2)

if e := nexus.uploadAssetRPC(asset); e != nil {
return
if gCli.Bool("use-put-upload") {
if e := nxs.uploadAssetHttpFormatRPC(asset); e != nil {
m.errors <- j.newError(e)
return
}
} else {
if e := nxs.uploadAssetRPC(asset); e != nil {
m.errors <- j.newError(e)
return
}
}

nexus.incUploadedAssets()
nxs.incUploadedAssets()
gLog.Info().Msgf("Asset %s has been upload successfully.", asset.getHumanReadbleName())

gQueue.newJob(&job{
action: jobActDeleteAsset,
payload: []interface{}{nxs, asset},
})
case jobActDeleteAsset:
nxs := j.payload[0].(*nexus)
asset := j.payload[1].(NexusAsset2)

if e := nxs.deleteAssetTemporaryFile(asset); e != nil {
m.errors <- j.newError(e)
return
}

// case jobActCustomFunc:
// if j.payloadFunc != nil {
// if e := j.payloadFunc(j.payload); e != nil {
Expand Down
Loading

0 comments on commit 8b127b1

Please sign in to comment.