Skip to content

Commit

Permalink
Merge branch 'main' into rafal/transcode-quality-params-phase3
Browse files Browse the repository at this point in the history
  • Loading branch information
thomshutt authored Oct 26, 2023
2 parents a82a817 + 910910d commit a8c240f
Show file tree
Hide file tree
Showing 16 changed files with 268 additions and 13 deletions.
80 changes: 80 additions & 0 deletions c2pa/c2pa.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package c2pa

import (
"bytes"
"context"
"fmt"
"os/exec"
)

const c2paManifestTemplate = `
{
"alg": "%s",
"private_key": "%s",
"sign_cert": "%s",
"ta_url": "http://timestamp.digicert.com",
"claim_generator": "LivepeerStudio",
"title": "%s",
"assertions": [
{
"label": "c2pa.actions",
"data": {
"actions": [
{
"action": "c2pa.published"
}
]
}
}
]
}
`

type C2PA struct {
alg string
privateKeyPath string
signCertPath string
}

func NewC2PA(alg, privateKeyPath, signCertPath string) C2PA {
return C2PA{
alg: alg,
privateKeyPath: privateKeyPath,
signCertPath: signCertPath,
}
}

func (c C2PA) c2paManifest(title string) string {
return fmt.Sprintf(c2paManifestTemplate, c.alg, c.privateKeyPath, c.signCertPath, title)
}

func (c C2PA) SignFile(inFile, outFile, title, parent string) error {
args := []string{
inFile,
"--config",
c.c2paManifest(title),
"--force",
"--output",
outFile,
}
if parent != "" {
args = append(args, "--parent", parent)
}
_, err := runCmd(exec.CommandContext(context.TODO(), "c2patool", args...))
return err
}

func runCmd(cmd *exec.Cmd) (string, error) {
var stdOut bytes.Buffer
var stdErr bytes.Buffer
cmd.Stdout = &stdOut
cmd.Stderr = &stdErr
err := cmd.Run()

if len(stdErr.Bytes()) > 0 {
return "", fmt.Errorf("failed creating C2PA Manifest: %s", stdErr.String())
}

return stdOut.String(), err
}
74 changes: 74 additions & 0 deletions c2pa/c2pa_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package c2pa

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"os"
"os/exec"
"testing"
)

func TestSign(t *testing.T) {
_, err := exec.LookPath("c2patool")
if err != nil {
fmt.Println("No c2patool installed, test skipped")
return
}

outFile := "test/tiny_output_signed.mp4"
c := NewC2PA("es256", "test/es256_private.key", "test/es256_certs.pem")
defer os.Remove(outFile)

err = c.SignFile("test/tiny.mp4", outFile, "Tiny", "")

require.Nil(t, err)
out, err := runCmd(exec.CommandContext(context.TODO(), "c2patool", outFile))
require.Nil(t, err)
assert.Contains(t, out, "\"action\": \"c2pa.published\"")
}

func TestSignWithParent(t *testing.T) {
_, err := exec.LookPath("c2patool")
if err != nil {
fmt.Println("No c2patool installed, test skipped")
return
}

outFile := "test/tiny_cut_output_signed.mp4"
c := NewC2PA("es256", "test/es256_private.key", "test/es256_certs.pem")
defer os.Remove(outFile)

err = c.SignFile("test/tiny_cut.mp4", outFile, "Tiny", "test/tiny_signed.mp4")

require.Nil(t, err)
out, err := runCmd(exec.CommandContext(context.TODO(), "c2patool", outFile))
require.Nil(t, err)
assert.Contains(t, out, "\"action\": \"c2pa.published\"")
assert.Contains(t, out, "\"relationship\": \"parentOf\"")
}

func TestSign_NotExistingPrivateKey(t *testing.T) {
_, err := exec.LookPath("c2patool")
if err != nil {
fmt.Println("No c2patool installed, test skipped")
return
}

c := NewC2PA("es256", "some/path/notexisting", "test/es256_certs.pem")
err = c.SignFile("test/tiny.mp4", "test/tiny_signed.mp4", "Tiny", "")
require.ErrorContains(t, err, "No such file or directory")
}

func TestSign_NotExistingSigningCert(t *testing.T) {
_, err := exec.LookPath("c2patool")
if err != nil {
fmt.Println("No c2patool installed, test skipped")
return
}

c := NewC2PA("es256", "test/es256_private.key", "some/path/notexisting")
err = c.SignFile("test/tiny.mp4", "test/tiny_signed.mp4", "Tiny", "")
require.ErrorContains(t, err, "No such file or directory")
}
32 changes: 32 additions & 0 deletions c2pa/test/es256_certs.pem
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
-----BEGIN CERTIFICATE-----
MIIChzCCAi6gAwIBAgIUcCTmJHYF8dZfG0d1UdT6/LXtkeYwCgYIKoZIzj0EAwIw
gYwxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTESMBAGA1UEBwwJU29tZXdoZXJl
MScwJQYDVQQKDB5DMlBBIFRlc3QgSW50ZXJtZWRpYXRlIFJvb3QgQ0ExGTAXBgNV
BAsMEEZPUiBURVNUSU5HX09OTFkxGDAWBgNVBAMMD0ludGVybWVkaWF0ZSBDQTAe
Fw0yMjA2MTAxODQ2NDBaFw0zMDA4MjYxODQ2NDBaMIGAMQswCQYDVQQGEwJVUzEL
MAkGA1UECAwCQ0ExEjAQBgNVBAcMCVNvbWV3aGVyZTEfMB0GA1UECgwWQzJQQSBU
ZXN0IFNpZ25pbmcgQ2VydDEZMBcGA1UECwwQRk9SIFRFU1RJTkdfT05MWTEUMBIG
A1UEAwwLQzJQQSBTaWduZXIwWTATBgcqhkjOPQIBBggqhkjOPQMBBwNCAAQPaL6R
kAkYkKU4+IryBSYxJM3h77sFiMrbvbI8fG7w2Bbl9otNG/cch3DAw5rGAPV7NWky
l3QGuV/wt0MrAPDoo3gwdjAMBgNVHRMBAf8EAjAAMBYGA1UdJQEB/wQMMAoGCCsG
AQUFBwMEMA4GA1UdDwEB/wQEAwIGwDAdBgNVHQ4EFgQUFznP0y83joiNOCedQkxT
tAMyNcowHwYDVR0jBBgwFoAUDnyNcma/osnlAJTvtW6A4rYOL2swCgYIKoZIzj0E
AwIDRwAwRAIgOY/2szXjslg/MyJFZ2y7OH8giPYTsvS7UPRP9GI9NgICIDQPMKrE
LQUJEtipZ0TqvI/4mieoyRCeIiQtyuS0LACz
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIICajCCAg+gAwIBAgIUfXDXHH+6GtA2QEBX2IvJ2YnGMnUwCgYIKoZIzj0EAwIw
dzELMAkGA1UEBhMCVVMxCzAJBgNVBAgMAkNBMRIwEAYDVQQHDAlTb21ld2hlcmUx
GjAYBgNVBAoMEUMyUEEgVGVzdCBSb290IENBMRkwFwYDVQQLDBBGT1IgVEVTVElO
R19PTkxZMRAwDgYDVQQDDAdSb290IENBMB4XDTIyMDYxMDE4NDY0MFoXDTMwMDgy
NzE4NDY0MFowgYwxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTESMBAGA1UEBwwJ
U29tZXdoZXJlMScwJQYDVQQKDB5DMlBBIFRlc3QgSW50ZXJtZWRpYXRlIFJvb3Qg
Q0ExGTAXBgNVBAsMEEZPUiBURVNUSU5HX09OTFkxGDAWBgNVBAMMD0ludGVybWVk
aWF0ZSBDQTBZMBMGByqGSM49AgEGCCqGSM49AwEHA0IABHllI4O7a0EkpTYAWfPM
D6Rnfk9iqhEmCQKMOR6J47Rvh2GGjUw4CS+aLT89ySukPTnzGsMQ4jK9d3V4Aq4Q
LsOjYzBhMA8GA1UdEwEB/wQFMAMBAf8wDgYDVR0PAQH/BAQDAgGGMB0GA1UdDgQW
BBQOfI1yZr+iyeUAlO+1boDitg4vazAfBgNVHSMEGDAWgBRembiG4Xgb2VcVWnUA
UrYpDsuojDAKBggqhkjOPQQDAgNJADBGAiEAtdZ3+05CzFo90fWeZ4woeJcNQC4B
84Ill3YeZVvR8ZECIQDVRdha1xEDKuNTAManY0zthSosfXcvLnZui1A/y/DYeg==
-----END CERTIFICATE-----

5 changes: 5 additions & 0 deletions c2pa/test/es256_private.key
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
-----BEGIN PRIVATE KEY-----
MIGHAgEAMBMGByqGSM49AgEGCCqGSM49AwEHBG0wawIBAQQgfNJBsaRLSeHizv0m
GL+gcn78QmtfLSm+n+qG9veC2W2hRANCAAQPaL6RkAkYkKU4+IryBSYxJM3h77sF
iMrbvbI8fG7w2Bbl9otNG/cch3DAw5rGAPV7NWkyl3QGuV/wt0MrAPDo
-----END PRIVATE KEY-----
Binary file added c2pa/test/tiny.mp4
Binary file not shown.
Binary file added c2pa/test/tiny_cut.mp4
Binary file not shown.
Binary file added c2pa/test/tiny_signed.mp4
Binary file not shown.
2 changes: 2 additions & 0 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ type Cli struct {
MaxBitrateFactor float64
CdnRedirectPrefix *url.URL
CdnRedirectPlaybackIDs []string
C2PAPrivateKeyPath string
C2PACertsPath string
}

// Return our own URL for callback trigger purposes
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/hashicorp/serf v0.10.1
github.com/julienschmidt/httprouter v1.3.0
github.com/lib/pq v1.10.9
github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1
github.com/livepeer/go-api-client v0.4.10
github.com/livepeer/go-tools v0.3.2
github.com/livepeer/joy4 v0.1.1
github.com/livepeer/livepeer-data v0.7.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -415,8 +415,8 @@ github.com/libp2p/go-msgio v0.2.0 h1:W6shmB+FeynDrUVl2dgFQvzfBZcXiyqY4VmpQLu9FqU
github.com/libp2p/go-nat v0.1.0 h1:MfVsH6DLcpa04Xr+p8hmVRG4juse0s3J8HyNWYHffXg=
github.com/libp2p/go-netroute v0.2.0 h1:0FpsbsvuSnAhXFnCY0VLFbJOzaK0VnP0r1QT/o4nWRE=
github.com/libp2p/go-openssl v0.1.0 h1:LBkKEcUv6vtZIQLVTegAil8jbNpJErQ9AnT+bWV+Ooo=
github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1 h1:37vDY3affAfGfWc25IiNij7TVbRRnqVC53w1qQ0wkII=
github.com/livepeer/go-api-client v0.4.10-0.20231016161852-adc198420ea1/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-api-client v0.4.10 h1:WMWJ2guElf000TBcqQpbrC2zGGRlVlin/7w6lMvSGkY=
github.com/livepeer/go-api-client v0.4.10/go.mod h1:Jdb+RI7JyzEZOHd1GUuKofwFDKMO/btTa80SdpUpYQw=
github.com/livepeer/go-tools v0.3.2 h1:5pOUrOmkkGbbcWnpCt2yrSD6cD85G4GcpO4B25NpMJM=
github.com/livepeer/go-tools v0.3.2/go.mod h1:qs31y68b3PQPmSr8nR8l5WQiIWI623z6pqOccqebjos=
github.com/livepeer/joy4 v0.1.1 h1:Tz7gVcmvpG/nfUKHU+XJn6Qke/k32mTWMiH9qB0bhnM=
Expand Down
2 changes: 2 additions & 0 deletions handlers/schemas/UploadVOD.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ properties:
type: "string"
target_segment_size_secs:
type: "integer"
c2pa:
type: "boolean"
encryption:
type: "object"
properties:
Expand Down
2 changes: 2 additions & 0 deletions handlers/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type UploadVODRequest struct {
AccessToken string `json:"accessToken"`
TranscodeAPIUrl string `json:"transcodeAPIUrl"`
Encryption *pipeline.EncryptionPayload `json:"encryption,omitempty"`
C2PA bool `json:"c2pa,omitempty"`

// Forwarded to transcoding stage:
TargetSegmentSizeSecs int64 `json:"target_segment_size_secs"`
Expand Down Expand Up @@ -277,6 +278,7 @@ func (d *CatalystAPIHandlersCollection) handleUploadVOD(w http.ResponseWriter, r
Encryption: uploadVODRequest.Encryption,
SourceCopy: uploadVODRequest.getSourceCopyEnabled(),
ClipStrategy: uploadVODRequest.ClipStrategy,
C2PA: uploadVODRequest.C2PA,
})

respBytes, err := json.Marshal(UploadVODResponse{RequestID: requestID})
Expand Down
28 changes: 27 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
_ "github.com/lib/pq"
"github.com/livepeer/catalyst-api/api"
"github.com/livepeer/catalyst-api/balancer"
"github.com/livepeer/catalyst-api/c2pa"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/cluster"
"github.com/livepeer/catalyst-api/config"
Expand Down Expand Up @@ -68,6 +69,8 @@ func main() {
config.CommaMapFlag(fs, &cli.SourcePlaybackHosts, "source-playback-hosts", map[string]string{}, "Hostname to prefix mappings for source playback URLs")
fs.UintVar(&video.DefaultQuality, "default-quality", 27, "Default transcoded video quality")
fs.Float64Var(&video.MaxBitrateFactor, "max-bitrate-factor", 1.2, "Factor to limit the max video bitrate with relation to the source average bitrate")
fs.StringVar(&cli.C2PAPrivateKeyPath, "c2pa-private-key", "", "Path to the private key used to sign C2PA manifest")
fs.StringVar(&cli.C2PACertsPath, "c2pa-certs", "", "Path to the certs used to sign C2PA manifest")

// mist-api-connector parameters
fs.IntVar(&cli.MistPort, "mist-port", 4242, "Port to connect to Mist")
Expand Down Expand Up @@ -194,9 +197,14 @@ func main() {
}
}

c2, err := createC2PA(&cli)
if err != nil {
// Log warning, but still start without C2PA signing
glog.Warning(err)
}
// Start the "co-ordinator" that determines whether to send jobs to the Catalyst transcoding pipeline
// or an external one
vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts)
vodEngine, err := pipeline.NewCoordinator(pipeline.Strategy(cli.VodPipelineStrategy), cli.SourceOutput, cli.ExternalTranscoder, statusClient, metricsDB, vodDecryptPrivateKey, cli.BroadcasterURL, cli.SourcePlaybackHosts, c2)
if err != nil {
glog.Fatalf("Error creating VOD pipeline coordinator: %v", err)
}
Expand Down Expand Up @@ -366,3 +374,21 @@ func handleSignals(ctx context.Context) error {
}
}
}

func createC2PA(cli *config.Cli) (*c2pa.C2PA, error) {
if cli == nil {
return nil, nil
}
if cli.C2PAPrivateKeyPath == "" || cli.C2PACertsPath == "" {
glog.Infof("C2PA private key and/or C2PA certs are not set, will not use C2PA signing")
return nil, nil
}
if _, err := os.Stat(cli.C2PAPrivateKeyPath); err != nil {
return nil, fmt.Errorf("C2PA private key file not found: %s", cli.C2PAPrivateKeyPath)
}
if _, err := os.Stat(cli.C2PACertsPath); err != nil {
return nil, fmt.Errorf("C2PA certs file not found: %s", cli.C2PACertsPath)
}
c := c2pa.NewC2PA("ps256", cli.C2PAPrivateKeyPath, cli.C2PACertsPath)
return &c, nil
}
10 changes: 9 additions & 1 deletion pipeline/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/cenkalti/backoff/v4"

_ "github.com/lib/pq"
"github.com/livepeer/catalyst-api/c2pa"
"github.com/livepeer/catalyst-api/cache"
"github.com/livepeer/catalyst-api/clients"
"github.com/livepeer/catalyst-api/config"
Expand Down Expand Up @@ -78,6 +79,7 @@ type UploadJobPayload struct {
InputFileInfo video.InputVideo
SourceCopy bool
ClipStrategy video.ClipStrategy
C2PA bool
}

type EncryptionPayload struct {
Expand Down Expand Up @@ -127,6 +129,7 @@ type JobInfo struct {
inFallbackMode bool
SignedSourceURL string
LivepeerSupported bool
C2PA *c2pa.C2PA
}

// PipelineInfo represents the state of an individual pipeline, i.e. ffmpeg or mediaconvert
Expand Down Expand Up @@ -165,9 +168,10 @@ type Coordinator struct {
InputCopy clients.InputCopier
VodDecryptPrivateKey *rsa.PrivateKey
SourceOutputURL *url.URL
C2PA *c2pa.C2PA
}

func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string, statusClient clients.TranscodeStatusClient, metricsDB *sql.DB, VodDecryptPrivateKey *rsa.PrivateKey, broadcasterURL string, sourcePlaybackHosts map[string]string) (*Coordinator, error) {
func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string, statusClient clients.TranscodeStatusClient, metricsDB *sql.DB, VodDecryptPrivateKey *rsa.PrivateKey, broadcasterURL string, sourcePlaybackHosts map[string]string, c2pa *c2pa.C2PA) (*Coordinator, error) {
if !strategy.IsValid() {
return nil, fmt.Errorf("invalid strategy: %s", strategy)
}
Expand Down Expand Up @@ -207,6 +211,7 @@ func NewCoordinator(strategy Strategy, sourceOutputURL, extTranscoderURL string,
InputCopy: clients.NewInputCopy(),
VodDecryptPrivateKey: VodDecryptPrivateKey,
SourceOutputURL: sourceOutput,
C2PA: c2pa,
}, nil
}

Expand Down Expand Up @@ -311,6 +316,9 @@ func (c *Coordinator) StartUploadJob(p UploadJobPayload) {
return nil, fmt.Errorf("error copying input to storage: %w", err)
}

if p.C2PA {
si.C2PA = c.C2PA
}
si.SourceFile = osTransferURL.String() // OS URL used by mist
si.SignedSourceURL = signedNewSourceURL // http(s) URL used by mediaconvert
si.InputFileInfo = inputVideoProbe
Expand Down
Loading

0 comments on commit a8c240f

Please sign in to comment.