diff --git a/clients/input_copy.go b/clients/input_copy.go index 4c6ba375..76956c18 100644 --- a/clients/input_copy.go +++ b/clients/input_copy.go @@ -287,6 +287,30 @@ func GetFile(ctx context.Context, requestID, url string, dStorage *DStorageDownl } } +func GetFileWithBackup(ctx context.Context, requestID, url string, dStorage *DStorageDownload) (io.ReadCloser, string, error) { + rc, err := GetFile(ctx, requestID, url, dStorage) + if err == nil { + return rc, url, nil + } + + backupURL := config.GetStorageBackupURL(url) + if backupURL == "" { + return nil, url, err + } + rc, backupErr := GetFile(ctx, requestID, backupURL, dStorage) + if backupErr == nil { + return rc, backupURL, nil + } + + // prioritize retriable errors in the response so we don't skip retries + if !catErrs.IsUnretriable(err) { + return nil, url, err + } else if !catErrs.IsUnretriable(backupErr) { + return nil, backupURL, backupErr + } + return nil, url, err +} + var retryableHttpClient = newRetryableHttpClient() func newRetryableHttpClient() *http.Client { diff --git a/clients/manifest.go b/clients/manifest.go index 99fa35ca..8337d804 100644 --- a/clients/manifest.go +++ b/clients/manifest.go @@ -1,6 +1,7 @@ package clients import ( + "bytes" "context" "fmt" "io" @@ -11,19 +12,23 @@ import ( "sort" "strconv" "strings" + "sync" "time" "github.com/cenkalti/backoff/v4" "github.com/grafov/m3u8" + "github.com/livepeer/catalyst-api/config" + "github.com/livepeer/catalyst-api/errors" "github.com/livepeer/catalyst-api/video" ) const ( - MasterManifestFilename = "index.m3u8" - DashManifestFilename = "index.mpd" - ClipManifestFilename = "clip.m3u8" - ManifestUploadTimeout = 5 * time.Minute - Fmp4PostfixDir = "fmp4" + MasterManifestFilename = "index.m3u8" + DashManifestFilename = "index.mpd" + ClipManifestFilename = "clip.m3u8" + ManifestUploadTimeout = 5 * time.Minute + Fmp4PostfixDir = "fmp4" + manifestNotFoundTolerance = 10 * time.Second ) func DownloadRetryBackoffLong() backoff.BackOff { @@ -33,27 +38,73 @@ func DownloadRetryBackoffLong() backoff.BackOff { var DownloadRetryBackoff = DownloadRetryBackoffLong func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.MediaPlaylist, error) { - var playlist m3u8.Playlist - var playlistType m3u8.ListType + playlist, playlistType, _, err := downloadManifest(requestID, sourceManifestOSURL) + if err != nil { + return m3u8.MediaPlaylist{}, err + } + return convertToMediaPlaylist(playlist, playlistType) +} + +// RecordingBackupCheck checks whether manifests and segments are available on the primary or +// the backup store and returns a URL to new manifest with absolute segment URLs pointing to either primary or +// backup locations depending on where the segments are available. +func RecordingBackupCheck(requestID string, primaryManifestURL, osTransferURL *url.URL) (*url.URL, error) { + if config.GetStorageBackupURL(primaryManifestURL.String()) == "" { + return primaryManifestURL, nil + } + playlist, playlistType, err := downloadManifestWithBackup(requestID, primaryManifestURL.String()) + if err != nil { + return nil, fmt.Errorf("error downloading manifest: %w", err) + } + mediaPlaylist, err := convertToMediaPlaylist(playlist, playlistType) + if err != nil { + return nil, err + } + + // Check whether segments are available from primary or backup storage dStorage := NewDStorageDownload() - err := backoff.Retry(func() error { - rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage) + for _, segment := range mediaPlaylist.GetAllSegments() { + segURL, err := ManifestURLToSegmentURL(primaryManifestURL.String(), segment.URI) if err != nil { - return fmt.Errorf("error downloading manifest: %s", err) + return nil, fmt.Errorf("error getting segment URL: %w", err) } - defer rc.Close() - - playlist, playlistType, err = m3u8.DecodeFrom(rc, true) + var actualSegURL string + err = backoff.Retry(func() error { + var rc io.ReadCloser + rc, actualSegURL, err = GetFileWithBackup(context.Background(), requestID, segURL.String(), dStorage) + if rc != nil { + rc.Close() + } + return err + }, DownloadRetryBackoff()) if err != nil { - return fmt.Errorf("error decoding manifest: %s", err) + return nil, fmt.Errorf("failed to find segment file %s: %w", segURL.Redacted(), err) } - return nil - }, DownloadRetryBackoff()) + segment.URI = actualSegURL + } + + // write the manifest to storage and update the manifestURL variable + outputStorageURL := osTransferURL.JoinPath("input.m3u8") + err = backoff.Retry(func() error { + return UploadToOSURL(outputStorageURL.String(), "", strings.NewReader(mediaPlaylist.String()), ManifestUploadTimeout) + }, UploadRetryBackoff()) if err != nil { - return m3u8.MediaPlaylist{}, err + return nil, fmt.Errorf("failed to upload rendition playlist: %w", err) + } + manifestURL, err := SignURL(outputStorageURL) + if err != nil { + return nil, fmt.Errorf("failed to sign manifest url: %w", err) + } + + newURL, err := url.Parse(manifestURL) + if err != nil { + return nil, fmt.Errorf("failed to parse new manifest URL: %w", err) } + return newURL, nil +} +func convertToMediaPlaylist(playlist m3u8.Playlist, playlistType m3u8.ListType) (m3u8.MediaPlaylist, error) { // We shouldn't ever receive Master playlists from the previous section if playlistType != m3u8.MEDIA { return m3u8.MediaPlaylist{}, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported") @@ -64,10 +115,86 @@ func DownloadRenditionManifest(requestID, sourceManifestOSURL string) (m3u8.Medi if !ok || mediaPlaylist == nil { return m3u8.MediaPlaylist{}, fmt.Errorf("failed to parse playlist as MediaPlaylist") } - return *mediaPlaylist, nil } +func downloadManifestWithBackup(requestID, sourceManifestOSURL string) (m3u8.Playlist, m3u8.ListType, error) { + var playlist, playlistBackup m3u8.Playlist + var playlistType, playlistTypeBackup m3u8.ListType + var size, sizeBackup int + var errPrimary, errBackup error + + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + playlist, playlistType, size, errPrimary = downloadManifest(requestID, sourceManifestOSURL) + }() + + backupManifestURL := config.GetStorageBackupURL(sourceManifestOSURL) + if backupManifestURL != "" { + wg.Add(1) + go func() { + defer wg.Done() + playlistBackup, playlistTypeBackup, sizeBackup, errBackup = downloadManifest(requestID, backupManifestURL) + }() + } + wg.Wait() + + // If the file is not found in either storage, return the not found err from + // the primary. Otherwise, return any error that is not a simple not found + // (only not found errors passthrough below) + primaryNotFound, backupNotFound := errors.IsObjectNotFound(errPrimary), errors.IsObjectNotFound(errBackup) + if primaryNotFound && backupNotFound { + return nil, 0, errPrimary + } + if errPrimary != nil && !primaryNotFound { + return nil, 0, errPrimary + } + if errBackup != nil && !backupNotFound { + return nil, 0, errBackup + } + + // Return the largest manifest as the most recent version + hasBackup := backupManifestURL != "" && errBackup == nil + if hasBackup && (errPrimary != nil || sizeBackup > size) { + return playlistBackup, playlistTypeBackup, nil + } + return playlist, playlistType, errPrimary +} + +func downloadManifest(requestID, sourceManifestOSURL string) (playlist m3u8.Playlist, playlistType m3u8.ListType, size int, err error) { + dStorage := NewDStorageDownload() + start := time.Now() + err = backoff.Retry(func() error { + rc, err := GetFile(context.Background(), requestID, sourceManifestOSURL, dStorage) + if err != nil { + if time.Since(start) > manifestNotFoundTolerance && errors.IsObjectNotFound(err) { + // bail out of the retries earlier for not found errors because it will be quite a common scenario + // where the backup manifest does not exist and we don't want to wait the whole 50s of retries for + // every recording job + return backoff.Permanent(err) + } + return err + } + defer rc.Close() + + data := new(bytes.Buffer) + _, err = data.ReadFrom(rc) + if err != nil { + return fmt.Errorf("error reading manifest: %s", err) + } + + size = data.Len() + playlist, playlistType, err = m3u8.Decode(*data, true) + if err != nil { + return fmt.Errorf("error decoding manifest: %s", err) + } + return nil + }, DownloadRetryBackoff()) + return +} + type SourceSegment struct { URL *url.URL DurationMillis int64 @@ -76,13 +203,7 @@ type SourceSegment struct { // Loop over each segment in a given manifest and convert it from a relative path to a full ObjectStore-compatible URL func GetSourceSegmentURLs(sourceManifestURL string, manifest m3u8.MediaPlaylist) ([]SourceSegment, error) { var urls []SourceSegment - for _, segment := range manifest.Segments { - // The segments list is a ring buffer - see https://github.com/grafov/m3u8/issues/140 - // and so we only know we've hit the end of the list when we find a nil element - if segment == nil { - break - } - + for _, segment := range manifest.GetAllSegments() { u, err := ManifestURLToSegmentURL(sourceManifestURL, segment.URI) if err != nil { return nil, err diff --git a/clients/manifest_test.go b/clients/manifest_test.go index 4808e328..ddda63ef 100644 --- a/clients/manifest_test.go +++ b/clients/manifest_test.go @@ -1,6 +1,8 @@ package clients import ( + "fmt" + "net/url" "os" "path/filepath" "strings" @@ -9,6 +11,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/grafov/m3u8" + "github.com/livepeer/catalyst-api/config" "github.com/livepeer/catalyst-api/video" "github.com/stretchr/testify/require" ) @@ -39,7 +42,8 @@ func TestDownloadRenditionManifestFailsWhenItCantFindTheManifest(t *testing.T) { defer func() { DownloadRetryBackoff = DownloadRetryBackoffLong }() _, err := DownloadRenditionManifest("blah", "/tmp/something/x.m3u8") require.Error(t, err) - require.Contains(t, err.Error(), "error downloading manifest") + require.Contains(t, err.Error(), "the specified file does not exist") + require.Contains(t, err.Error(), "ObjectNotFoundError") } func TestDownloadRenditionManifestFailsWhenItCantParseTheManifest(t *testing.T) { @@ -333,3 +337,131 @@ func createDummyMediaSegments() []*m3u8.MediaSegment { }, } } + +func TestDownloadRenditionManifestWithBackup(t *testing.T) { + completeManifest := `#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-TARGETDURATION:10 +#EXTINF:10.000000, +seg-0.ts +#EXTINF:10.000000, +seg-1.ts +#EXTINF:10.000000, +seg-2.ts +#EXTINF:10.000000, +seg-3.ts +#EXT-X-ENDLIST +` + inCompleteManifest := `#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-TARGETDURATION:10 +#EXTINF:10.000000, +seg-0.ts +#EXTINF:10.000000, +seg-1.ts +#EXT-X-ENDLIST +` + + tests := []struct { + name string + primaryManifest string + backupManifest string + primarySegments []string + backupSegments []string + }{ + { + name: "happy. all segments and manifest available on primary", + primaryManifest: completeManifest, + backupManifest: "", + primarySegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"}, + }, + { + name: "all segments and manifest available on backup", + primaryManifest: inCompleteManifest, + backupManifest: completeManifest, + backupSegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"}, + }, + { + name: "all segments on backup and newest manifest on primary", + primaryManifest: completeManifest, + backupManifest: inCompleteManifest, + backupSegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"}, + }, + { + name: "all segments on primary and newest manifest on backup", + primaryManifest: inCompleteManifest, + backupManifest: completeManifest, + primarySegments: []string{"seg-0.ts", "seg-1.ts", "seg-2.ts", "seg-3.ts"}, + }, + { + name: "segments split between primary and backup, newest manifest on primary", + primaryManifest: completeManifest, + backupManifest: inCompleteManifest, + primarySegments: []string{"seg-0.ts", "seg-2.ts"}, + backupSegments: []string{"seg-1.ts", "seg-3.ts"}, + }, + { + name: "segments split between primary and backup, newest manifest on backup", + primaryManifest: inCompleteManifest, + backupManifest: completeManifest, + primarySegments: []string{"seg-0.ts", "seg-2.ts"}, + backupSegments: []string{"seg-1.ts", "seg-3.ts"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + dir, err := os.MkdirTemp(os.TempDir(), "manifest-test-*") + require.NoError(t, err) + defer os.RemoveAll(dir) + + err = os.Mkdir(filepath.Join(dir, "primary"), 0755) + require.NoError(t, err) + err = os.Mkdir(filepath.Join(dir, "backup"), 0755) + require.NoError(t, err) + config.StorageFallbackURLs = map[string]string{filepath.Join(dir, "primary"): filepath.Join(dir, "backup")} + + err = os.WriteFile(filepath.Join(dir, "primary", "index.m3u8"), []byte(tt.primaryManifest), 0644) + require.NoError(t, err) + if tt.backupManifest != "" { + err = os.WriteFile(filepath.Join(dir, "backup", "index.m3u8"), []byte(tt.backupManifest), 0644) + require.NoError(t, err) + } + + for _, segment := range tt.primarySegments { + err = os.WriteFile(filepath.Join(dir, "primary", segment), []byte{}, 0644) + require.NoError(t, err) + } + for _, segment := range tt.backupSegments { + err = os.WriteFile(filepath.Join(dir, "backup", segment), []byte{}, 0644) + require.NoError(t, err) + } + + renditionUrl, err := RecordingBackupCheck("requestID", toUrl(t, filepath.Join(dir, "primary", "index.m3u8")), toUrl(t, filepath.Join(dir, "transfer"))) + require.NoError(t, err) + + file, err := os.Open(renditionUrl.String()) + require.NoError(t, err) + + // read resulting playlist + playlist, playlistType, err := m3u8.DecodeFrom(file, true) + require.NoError(t, err) + require.Equal(t, m3u8.MEDIA, playlistType) + mediaPlaylist, ok := playlist.(*m3u8.MediaPlaylist) + require.True(t, ok) + + require.Len(t, mediaPlaylist.GetAllSegments(), 4) + for i, segment := range mediaPlaylist.GetAllSegments() { + require.True(t, filepath.IsAbs(segment.URI)) + require.True(t, true, strings.HasSuffix(segment.URI, fmt.Sprintf("seg-%d.ts", i))) + } + }) + } +} + +func toUrl(t *testing.T, in string) *url.URL { + u, err := url.Parse(in) + require.NoError(t, err) + return u +} diff --git a/config/cli.go b/config/cli.go index 4243282e..f99634a7 100644 --- a/config/cli.go +++ b/config/cli.go @@ -59,6 +59,7 @@ type Cli struct { EncryptKey string VodDecryptPublicKey string VodDecryptPrivateKey string + StorageFallbackURLs map[string]string GateURL string DataURL string StreamHealthHookURL string diff --git a/config/config.go b/config/config.go index 487a1188..80b68e79 100644 --- a/config/config.go +++ b/config/config.go @@ -52,4 +52,6 @@ var ImportIPFSGatewayURLs []*url.URL var ImportArweaveGatewayURLs []*url.URL +var StorageFallbackURLs map[string]string + var HTTPInternalAddress string diff --git a/config/storage_backup_url.go b/config/storage_backup_url.go new file mode 100644 index 00000000..e197cd18 --- /dev/null +++ b/config/storage_backup_url.go @@ -0,0 +1,15 @@ +package config + +import "strings" + +// GetStorageBackupURL returns the backup URL for the given URL or an empty string if it doesn't exist. The backup URL +// is found by checking the `StorageFallbackURLs` global config map. If any of the primary URL prefixes (keys in map) +// are in `urlStr`, it is replaced with the backup URL prefix (associated value of the key in the map). +func GetStorageBackupURL(urlStr string) string { + for primary, backup := range StorageFallbackURLs { + if strings.HasPrefix(urlStr, primary) { + return strings.Replace(urlStr, primary, backup, 1) + } + } + return "" +} diff --git a/config/storage_backup_url_test.go b/config/storage_backup_url_test.go new file mode 100644 index 00000000..2ad4b369 --- /dev/null +++ b/config/storage_backup_url_test.go @@ -0,0 +1,31 @@ +package config + +import "testing" + +func TestGetStorageBackupURL(t *testing.T) { + StorageFallbackURLs = map[string]string{"https://storj.livepeer.com/catalyst-recordings-com/hls": "https://google.livepeer.com/catalyst-recordings-com/hls"} + defer func() { StorageFallbackURLs = nil }() + tests := []struct { + name string + urlStr string + want string + }{ + { + name: "should replace", + urlStr: "https://storj.livepeer.com/catalyst-recordings-com/hls/foo", + want: "https://google.livepeer.com/catalyst-recordings-com/hls/foo", + }, + { + name: "should not replace", + urlStr: "https://blah.livepeer.com/catalyst-recordings-com/hls/foo", + want: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := GetStorageBackupURL(tt.urlStr); got != tt.want { + t.Errorf("GetStorageBackupURL() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/go.mod b/go.mod index f264033d..e6585061 100644 --- a/go.mod +++ b/go.mod @@ -24,6 +24,7 @@ require ( github.com/livepeer/m3u8 v0.11.1 github.com/mileusna/useragent v1.3.4 github.com/minio/madmin-go v1.7.5 + github.com/minio/minio-go/v7 v7.0.45 github.com/mmcloughlin/geohash v0.10.0 github.com/peterbourgon/ff/v3 v3.4.0 github.com/pquerna/cachecontrol v0.2.0 @@ -52,6 +53,7 @@ require ( github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cucumber/gherkin/go/v26 v26.2.0 // indirect github.com/cucumber/messages/go/v21 v21.0.1 // indirect + github.com/dustin/go-humanize v1.0.0 // indirect github.com/eventials/go-tus v0.0.0-20220610120217-05d0564bb571 // indirect github.com/fatih/color v1.13.0 // indirect github.com/go-logr/logr v1.2.4 // indirect @@ -111,7 +113,7 @@ require ( github.com/mattn/go-isatty v0.0.19 // indirect github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect github.com/miekg/dns v1.1.50 // indirect - github.com/minio/minio-go/v7 v7.0.45 // indirect + github.com/minio/md5-simd v1.1.2 // indirect github.com/minio/sha256-simd v1.0.1 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect @@ -138,6 +140,7 @@ require ( github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect github.com/secure-io/sio-go v0.3.1 // indirect github.com/shoenig/go-m1cpu v0.1.6 // indirect + github.com/sirupsen/logrus v1.9.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.0 // indirect diff --git a/go.sum b/go.sum index c4ab2af4..a198dd7e 100644 --- a/go.sum +++ b/go.sum @@ -110,6 +110,8 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc= github.com/disintegration/imaging v1.6.2/go.mod h1:44/5580QXChDfwIclfc/PCwrr44amcmDAg8hxG0Ewe4= +github.com/dustin/go-humanize v1.0.0 h1:VSnTsYCnlFHaM2/igO1h6X3HA71jcobQuxemgkq4zYo= +github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= @@ -408,6 +410,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/cpuid/v2 v2.0.1/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.4 h1:acbojRNwl3o09bUq+yDCtZFc1aiwaAAxtcn8YkZXnvk= github.com/klauspost/cpuid/v2 v2.2.4/go.mod h1:RVVoqg1df56z8g3pUjL/3lE5UfnlrJX8tyFgg4nqhuY= @@ -489,6 +492,8 @@ github.com/mileusna/useragent v1.3.4/go.mod h1:3d8TOmwL/5I8pJjyVDteHtgDGcefrFUX4 github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/madmin-go v1.7.5 h1:IF8j2HR0jWc7msiOcy0KJ8EyY7Q3z+j+lsmSDksQm+I= github.com/minio/madmin-go v1.7.5/go.mod h1:3SO8SROxHN++tF6QxdTii2SSUaYSrr8lnE9EJWjvz0k= +github.com/minio/md5-simd v1.1.2 h1:Gdi1DZK69+ZVMoNHRXJyNcxrMA4dSxoYHZSQbirFg34= +github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.45 h1:g4IeM9M9pW/Lo8AGGNOjBZYlvmtlE1N5TQEYWXRWzIs= github.com/minio/minio-go/v7 v7.0.45/go.mod h1:nCrRzjoSUQh8hgKKtu3Y708OLvRLtuASMg2/nvmbarw= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= @@ -651,6 +656,8 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= +github.com/sirupsen/logrus v1.9.0 h1:trlNQbNUG3OdDrDil03MCb1H2o9nJ1x4/5LYw7byDE0= +github.com/sirupsen/logrus v1.9.0/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= github.com/smartystreets/assertions v0.0.0-20180927180507-b2de0cb4f26d/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/assertions v1.2.0 h1:42S6lae5dvLc7BrLu/0ugRtcFVjoJNMC/N3yZFZkDFs= github.com/smartystreets/assertions v1.2.0/go.mod h1:tcbTF8ujkAEcZ8TElKY+i30BzYlVhC/LOxJk7iOWnoo= @@ -948,6 +955,7 @@ golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220708085239-5a0f0661e09d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/main.go b/main.go index be55d6cf..7d66a885 100644 --- a/main.go +++ b/main.go @@ -119,6 +119,7 @@ func main() { fs.StringVar(&cli.EncryptKey, "encrypt", "", "Key for encrypting network traffic within Serf. Must be a base64-encoded 32-byte key.") fs.StringVar(&cli.VodDecryptPublicKey, "catalyst-public-key", "", "Public key of the catalyst node for encryption") fs.StringVar(&cli.VodDecryptPrivateKey, "catalyst-private-key", "", "Private key of the catalyst node for encryption") + config.CommaMapFlag(fs, &cli.StorageFallbackURLs, "storage-fallback-urls", map[string]string{}, `Comma-separated map of primary to backup storage URLs. If a file fails downloading from one of the primary storages (detected by prefix), it will fallback to the corresponding backup URL after having the prefix replaced. E.g. https://storj.livepeer.com/catalyst-recordings-com/hls=https://google.livepeer.com/catalyst-recordings-com/hls`) fs.StringVar(&cli.GateURL, "gate-url", "http://localhost:3004/api/access-control/gate", "Address to contact playback gating API for access control verification") fs.StringVar(&cli.DataURL, "data-url", "http://localhost:3004/api/data", "Address of the Livepeer Data Endpoint") config.InvertedBoolFlag(fs, &cli.MistTriggerSetup, "mist-trigger-setup", true, "Overwrite Mist triggers with the ones built into catalyst-api") @@ -179,6 +180,8 @@ func main() { return } + config.StorageFallbackURLs = cli.StorageFallbackURLs + var ( metricsDB *sql.DB vodEngine *pipeline.Coordinator diff --git a/pipeline/coordinator.go b/pipeline/coordinator.go index 0e8c0d2f..3f625732 100644 --- a/pipeline/coordinator.go +++ b/pipeline/coordinator.go @@ -290,6 +290,12 @@ func (c *Coordinator) StartUploadJob(p UploadJobPayload) { // Update osTransferURL if needed if clients.IsHLSInput(sourceURL) { + // Handle falling back to backup bucket for manifest and segments + sourceURL, err = clients.RecordingBackupCheck(p.RequestID, sourceURL, osTransferURL.JoinPath("..")) + if err != nil { + return nil, err + } + // Currently we only clip an HLS source (e.g recordings or transcoded asset) if p.ClipStrategy.Enabled { err := backoff.Retry(func() error { @@ -323,7 +329,7 @@ func (c *Coordinator) StartUploadJob(p UploadJobPayload) { if p.C2PA { si.C2PA = c.C2PA } - si.SourceFile = osTransferURL.String() // OS URL used by mist + si.SourceFile = osTransferURL.String() // OS URL used by ffmpeg pipeline log.AddContext(p.RequestID, "new_source_url", si.SourceFile) si.SignedSourceURL = signedNewSourceURL // http(s) URL used by mediaconvert diff --git a/pipeline/ffmpeg.go b/pipeline/ffmpeg.go index 760951ba..a0b8bf64 100644 --- a/pipeline/ffmpeg.go +++ b/pipeline/ffmpeg.go @@ -152,7 +152,7 @@ func (f *ffmpeg) HandleStartUploadJob(job *JobInfo) (*HandlerOutput, error) { sourceManifest, err := clients.DownloadRenditionManifest(transcodeRequest.RequestID, transcodeRequest.SourceManifestURL) if err != nil { - return nil, fmt.Errorf("error downloading source manifest: %s", err) + return nil, fmt.Errorf("error downloading source manifest %s: %w", log.RedactURL(transcodeRequest.SourceManifestURL), err) } sourceSegments := sourceManifest.GetAllSegments() diff --git a/test/features/vod.feature b/test/features/vod.feature index ea66f31f..b744620a 100644 --- a/test/features/vod.feature +++ b/test/features/vod.feature @@ -7,6 +7,7 @@ Feature: VOD Streaming Given the VOD API is running And the Client app is authenticated And an object store is available +# And a fallback object And Studio API server is running at "localhost:13000" And a Broadcaster is running at "localhost:18935" And Mediaconvert is running at "localhost:11111" @@ -86,29 +87,30 @@ Feature: VOD Streaming Then I get an HTTP response with code "200" And I receive a Request ID in the response body And my "successful" vod request metrics get recorded - And the Broadcaster receives "3" segments for transcoding within "10" seconds - And "3" transcoded segments and manifests have been written to disk for profiles "270p0,low-bitrate" within "30" seconds + And the Broadcaster receives "" segments for transcoding within "10" seconds + And "" transcoded segments and manifests have been written to disk for profiles "270p0,low-bitrate" within "30" seconds And a source copy has not been written to disk And I receive a "success" callback within "30" seconds And thumbnails are written to storage within "10" seconds And a row is written to the "vod_completed" database table containing the following values - | column | value | - | in_fallback_mode | false | - | is_clip | false | - | pipeline | catalyst_ffmpeg | - | profiles_count | 2 | - | source_codec_audio | aac | - | source_codec_video | h264 | - | source_duration | 30000 | - | source_segment_count | 3 | - | state | completed | - | transcoded_segment_count | 3 | + | column | value | + | in_fallback_mode | false | + | is_clip | false | + | pipeline | catalyst_ffmpeg | + | profiles_count | 2 | + | source_codec_audio | aac | + | source_codec_video | h264 | + | source_duration | | + | source_segment_count | | + | state | completed | + | transcoded_segment_count | | Examples: - | payload | - | a valid ffmpeg upload vod request with a source manifest | - | a valid ffmpeg upload vod request with a source manifest and source copying | - | a valid ffmpeg upload vod request with a source manifest and thumbnails | + | payload | segment_count | source_duration | + | a valid ffmpeg upload vod request with a source manifest | 3 | 30000 | + | a valid ffmpeg upload vod request with a source manifest and source copying | 3 | 30000 | + | a valid ffmpeg upload vod request with a source manifest and thumbnails | 3 | 30000 | + | a valid ffmpeg upload vod request with a source manifest from object store | 4 | 40000 | Scenario Outline: Submit an audio-only asset for ingestion When I submit to the internal "/api/vod" endpoint with "" diff --git a/test/fixtures/rec-fallback-bucket/seg-3.ts b/test/fixtures/rec-fallback-bucket/seg-3.ts new file mode 100644 index 00000000..476821ce Binary files /dev/null and b/test/fixtures/rec-fallback-bucket/seg-3.ts differ diff --git a/test/fixtures/rec-fallback-bucket/tiny.m3u8 b/test/fixtures/rec-fallback-bucket/tiny.m3u8 new file mode 100644 index 00000000..950dae51 --- /dev/null +++ b/test/fixtures/rec-fallback-bucket/tiny.m3u8 @@ -0,0 +1,13 @@ +#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-MEDIA-SEQUENCE:0 +#EXT-X-TARGETDURATION:10 +#EXTINF:10.000000, +seg-0.ts +#EXTINF:10.000000, +seg-1.ts +#EXTINF:10.000000, +seg-2.ts +#EXTINF:10.000000, +seg-3.ts +#EXT-X-ENDLIST diff --git a/test/steps/ffmpeg.go b/test/steps/ffmpeg.go index aff0e836..49ef4396 100644 --- a/test/steps/ffmpeg.go +++ b/test/steps/ffmpeg.go @@ -2,14 +2,17 @@ package steps import ( "bytes" + "context" "fmt" - "os" + "io" + "log" "os/exec" "path/filepath" "strings" "time" "github.com/grafov/m3u8" + "github.com/livepeer/go-tools/drivers" ) // Confirm that we have an ffmpeg binary on the system the tests are running on @@ -21,35 +24,46 @@ func (s *StepContext) CheckFfmpeg() error { } func (s *StepContext) AllOfTheSourceSegmentsAreWrittenToStorageWithinSeconds(numSegments, secs int) error { - // Comes in looking like file:/var/folders/qr/sr8gs8916zd2wjbx50d3c3yc0000gn/T/livepeer/source - // and we want /var/folders/qr/sr8gs8916zd2wjbx50d3c3yc0000gn/T/livepeer/source/aceaegdf/source/*.ts - segmentingDir := filepath.Join(strings.TrimPrefix(s.SourceOutputDir, "file:"), s.latestRequestID, "source/*.ts") + osDriver, err := drivers.ParseOSURL(s.SourceOutputDir, true) + if err != nil { + return fmt.Errorf("could not parse object store url: %w", err) + } + session := osDriver.NewSession(filepath.Join(s.latestRequestID, "source")) var latestNumSegments int - for x := 0; x < secs; x++ { - files, err := filepath.Glob(segmentingDir) + for x := 0; x < secs; x++ { // retry loop + if x > 0 { + time.Sleep(time.Second) + } + page, err := session.ListFiles(context.Background(), "", "") if err != nil { - return err + log.Println("failed to list files: ", err) + continue } - latestNumSegments = len(files) - if latestNumSegments == numSegments { + + latestNumSegments = len(page.Files()) + if latestNumSegments == numSegments+1 { return nil } - time.Sleep(time.Second) } - return fmt.Errorf("did not find the expected number of source segments in %s (wanted %d, got %d)", segmentingDir, numSegments, latestNumSegments) + return fmt.Errorf("did not find the expected number of source segments in %s (wanted %d, got %d)", s.SourceOutputDir, numSegments, latestNumSegments) } func (s *StepContext) TheSourceManifestIsWrittenToStorageWithinSeconds(secs, numSegments int) error { - // Comes in looking like file:/var/folders/qr/sr8gs8916zd2wjbx50d3c3yc0000gn/T/livepeer/source - // and we want /var/folders/qr/sr8gs8916zd2wjbx50d3c3yc0000gn/T/livepeer/source/aceaegdf/source/index.m3u8 - sourceManifest := filepath.Join(strings.TrimPrefix(s.SourceOutputDir, "file:"), s.latestRequestID, "source/index.m3u8") + osDriver, err := drivers.ParseOSURL(s.SourceOutputDir, true) + if err != nil { + return fmt.Errorf("could not parse object store url: %w", err) + } + session := osDriver.NewSession(filepath.Join(s.latestRequestID, "source/index.m3u8")) - var manifestBytes []byte - var err error + var ( + manifestBytes []byte + fileInfoReader *drivers.FileInfoReader + ) for x := 0; x < secs; x++ { - manifestBytes, err = os.ReadFile(sourceManifest) + fileInfoReader, err = session.ReadData(context.Background(), "") if err == nil { + manifestBytes, err = io.ReadAll(fileInfoReader.Body) // Only break if the full manifest has been written if strings.HasSuffix(strings.TrimSpace(string(manifestBytes)), "#EXT-X-ENDLIST") { break diff --git a/test/steps/http.go b/test/steps/http.go index 6be3d939..e353768e 100644 --- a/test/steps/http.go +++ b/test/steps/http.go @@ -163,6 +163,10 @@ func (s *StepContext) postRequest(baseURL, endpoint, payload string, headers map } if strings.HasPrefix(payload, "a valid ffmpeg upload vod request with a source manifest") { req.URL = "file://" + filepath.Join(sourceManifestDir, "tiny.m3u8") + if strings.Contains(payload, "from object store") { + req.URL = "http://" + minioAddress + "/rec-bucket/tiny.m3u8" + } + req.PipelineStrategy = "catalyst_ffmpeg" req.OutputLocations = []OutputLocation{ { @@ -211,7 +215,7 @@ func (s *StepContext) postRequest(baseURL, endpoint, payload string, headers map } func (s *StepContext) StartApp() error { - s.SourceOutputDir = fmt.Sprintf("file://%s/%s/", os.TempDir(), "livepeer/source") + s.SourceOutputDir = fmt.Sprintf("s3+http://%s:%s@%s/source", minioKey, minioKey, minioAddress) App = exec.Command( "./app", @@ -220,12 +224,11 @@ func (s *StepContext) StartApp() error { "-cluster-addr=127.0.0.1:19935", "-broadcaster-url=http://127.0.0.1:18935", `-metrics-db-connection-string=`+DB_CONNECTION_STRING, - "-private-bucket", - "fixtures/playback-bucket", + "-private-bucket=fixtures/playback-bucket", "-gate-url=http://localhost:13000/api/access-control/gate", "-external-transcoder=mediaconverthttp://examplekey:examplepass@127.0.0.1:11111?region=us-east-1&role=arn:aws:iam::exampleaccountid:examplerole&s3_aux_bucket=s3://example-bucket", - "-source-output", - s.SourceOutputDir, + fmt.Sprintf("-storage-fallback-urls=http://%s/rec-bucket=http://%s/rec-fallback-bucket", minioAddress, minioAddress), + fmt.Sprintf("-source-output=%s", s.SourceOutputDir), "-no-mist", ) outfile, err := os.Create("logs/app.log") diff --git a/test/steps/init.go b/test/steps/init.go index 29852160..2046e21d 100644 --- a/test/steps/init.go +++ b/test/steps/init.go @@ -1,12 +1,16 @@ package steps import ( + "context" "encoding/json" "fmt" + "github.com/minio/minio-go/v7" + "github.com/minio/minio-go/v7/pkg/credentials" "io" "net/http" "os" "os/exec" + "path" "time" "github.com/cenkalti/backoff/v4" @@ -14,7 +18,10 @@ import ( "github.com/minio/madmin-go" ) -var minioAddress = "127.0.0.1:9000" +const ( + minioAddress = "127.0.0.1:9000" + minioKey = "minioadmin" +) func (s *StepContext) StartStudioAPI(listen string) error { router := httprouter.New() @@ -62,7 +69,7 @@ func WaitForStartup(url string) { } func (s *StepContext) StartObjectStore() error { - app := exec.Command("./minio", "--address "+minioAddress, "server", fmt.Sprint(os.TempDir(), "/minio")) + app := exec.Command("./minio", "server", "--address", minioAddress, path.Join(os.TempDir(), "catalyst-minio")) outfile, err := os.Create("logs/minio.log") if err != nil { return err @@ -74,12 +81,77 @@ func (s *StepContext) StartObjectStore() error { return err } - madmin, err := madmin.New(minioAddress, "minioadmin", "minioadmin", false) + admin, err := madmin.New(minioAddress, minioKey, minioKey, false) if err != nil { return err } + s.MinioAdmin = admin + + minioClient, err := minio.New(minioAddress, &minio.Options{ + Creds: credentials.NewStaticV4(minioKey, minioKey, ""), + Secure: false, + }) + if err != nil { + return err + } + + WaitForStartup("http://" + minioAddress + "/minio/health/live") - s.MinioAdmin = madmin + ctx := context.Background() + + // Create buckets if they do not exist. + buckets := []string{"rec-bucket", "rec-fallback-bucket", "source"} + for _, bucket := range buckets { + exists, err := minioClient.BucketExists(ctx, bucket) + if err != nil { + return fmt.Errorf("failed to check if bucket exists: %w", err) + } + if exists { + continue + } + err = minioClient.MakeBucket(ctx, bucket, minio.MakeBucketOptions{}) + if err != nil { + return err + } + } + + // Set bucket policy to allow anonymous download. + for _, bucket := range buckets { + policy := fmt.Sprintf(`{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"AWS": ["*"]}, + "Action": ["s3:GetObject"], + "Resource": ["arn:aws:s3:::%s/*"] + } + ] + }`, bucket) + + err = minioClient.SetBucketPolicy(ctx, bucket, policy) + if err != nil { + return err + } + } + + // populate recording bucket + files := []string{"fixtures/tiny.m3u8", "fixtures/seg-0.ts", "fixtures/seg-1.ts", "fixtures/seg-2.ts"} + for _, file := range files { + _, err := minioClient.FPutObject(ctx, "rec-bucket", path.Base(file), file, minio.PutObjectOptions{}) + if err != nil { + return err + } + } + + // populate recording fallback bucket + files = []string{"fixtures/rec-fallback-bucket/tiny.m3u8", "fixtures/rec-fallback-bucket/seg-3.ts"} + for _, file := range files { + _, err := minioClient.FPutObject(ctx, "rec-fallback-bucket", path.Base(file), file, minio.PutObjectOptions{}) + if err != nil { + return err + } + } return nil } diff --git a/thumbnails/thumbnails.go b/thumbnails/thumbnails.go index 7d138d5d..a3c6cd44 100644 --- a/thumbnails/thumbnails.go +++ b/thumbnails/thumbnails.go @@ -31,35 +31,6 @@ func thumbWaitBackoff() backoff.BackOff { return backoff.WithMaxRetries(backoff.NewConstantBackOff(30*time.Second), 10) } -func getMediaManifest(requestID string, input string) (*m3u8.MediaPlaylist, error) { - var ( - rc io.ReadCloser - err error - ) - err = backoff.Retry(func() error { - rc, err = clients.GetFile(context.Background(), requestID, input, nil) - return err - }, clients.DownloadRetryBackoff()) - if err != nil { - return nil, fmt.Errorf("error downloading manifest: %w", err) - } - defer rc.Close() - - manifest, playlistType, err := m3u8.DecodeFrom(rc, true) - if err != nil { - return nil, fmt.Errorf("failed to decode manifest: %w", err) - } - - if playlistType != m3u8.MEDIA { - return nil, fmt.Errorf("received non-Media manifest, but currently only Media playlists are supported") - } - mediaPlaylist, ok := manifest.(*m3u8.MediaPlaylist) - if !ok || mediaPlaylist == nil { - return nil, fmt.Errorf("failed to parse playlist as MediaPlaylist") - } - return mediaPlaylist, nil -} - func getSegmentOffset(mediaPlaylist *m3u8.MediaPlaylist) (int64, error) { segments := mediaPlaylist.GetAllSegments() if len(segments) < 1 { @@ -74,7 +45,7 @@ func getSegmentOffset(mediaPlaylist *m3u8.MediaPlaylist) (int64, error) { func GenerateThumbsVTT(requestID string, input string, output *url.URL) error { // download and parse the manifest - mediaPlaylist, err := getMediaManifest(requestID, input) + mediaPlaylist, err := clients.DownloadRenditionManifest(requestID, input) if err != nil { return err } @@ -86,7 +57,7 @@ func GenerateThumbsVTT(requestID string, input string, output *url.URL) error { if err != nil { return err } - segmentOffset, err := getSegmentOffset(mediaPlaylist) + segmentOffset, err := getSegmentOffset(&mediaPlaylist) if err != nil { return err } @@ -187,7 +158,7 @@ func GenerateThumbsAndVTT(requestID, input string, output *url.URL) error { func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error { // parse manifest and generate one thumbnail per segment - mediaPlaylist, err := getMediaManifest(requestID, input) + mediaPlaylist, err := clients.DownloadRenditionManifest(requestID, input) if err != nil { return err } @@ -195,7 +166,7 @@ func GenerateThumbsFromManifest(requestID, input string, output *url.URL) error if err != nil { return err } - segmentOffset, err := getSegmentOffset(mediaPlaylist) + segmentOffset, err := getSegmentOffset(&mediaPlaylist) if err != nil { return err }