Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Add mutex to injection resolution #4206

Merged
merged 3 commits into from
Dec 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion app/dispatcher/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func init() {
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
d := new(DefaultDispatcher)
if err := core.RequireFeatures(ctx, func(om outbound.Manager, router routing.Router, pm policy.Manager, sm stats.Manager, dc dns.Client) error {
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional
core.OptionalFeatures(ctx, func(fdns dns.FakeDNSEngine) {
d.fdns = fdns
})
return d.Init(config.(*Config), om, router, pm, sm, dc)
Expand Down
2 changes: 1 addition & 1 deletion app/dns/nameserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func NewServer(ctx context.Context, dest net.Destination, dispatcher routing.Dis
return NewTCPLocalNameServer(u, queryStrategy)
case strings.EqualFold(u.String(), "fakedns"):
var fd dns.FakeDNSEngine
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) { // FakeDNSEngine is optional
core.RequireFeatures(ctx, func(fdns dns.FakeDNSEngine) {
fd = fdns
})
return NewFakeDNSServer(fd), nil
Expand Down
2 changes: 1 addition & 1 deletion app/observatory/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func init() {
sv := &service{v: s}
err := s.RequireFeatures(func(Observatory extension.Observatory) {
sv.observatory = Observatory
})
}, false)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion app/proxyman/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ func (s *service) Register(server *grpc.Server) {
common.Must(s.v.RequireFeatures(func(im inbound.Manager, om outbound.Manager) {
hs.ihm = im
hs.ohm = om
}))
}, false))
RegisterHandlerServiceServer(server, hs)

// For compatibility purposes
Expand Down
6 changes: 4 additions & 2 deletions app/router/balancing.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
sync "sync"

"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension"
Expand All @@ -31,9 +32,10 @@ type RoundRobinStrategy struct {
func (s *RoundRobinStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx
if len(s.FallbackTag) > 0 {
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) {
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observatory = observatory
})
return nil
}))
}
}

Expand Down
2 changes: 1 addition & 1 deletion app/router/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (s *service) Register(server *grpc.Server) {
vCoreDesc := RoutingService_ServiceDesc
vCoreDesc.ServiceName = "v2ray.core.app.router.command.RoutingService"
server.RegisterService(&vCoreDesc, rs)
}))
}, false))
}

func init() {
Expand Down
6 changes: 4 additions & 2 deletions app/router/strategy_leastload.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"time"

"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/dice"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/core"
Expand Down Expand Up @@ -59,9 +60,10 @@ type node struct {

func (s *LeastLoadStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) {
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observer = observatory
})
return nil
}))
}

func (s *LeastLoadStrategy) PickOutbound(candidates []string) string {
Expand Down
6 changes: 4 additions & 2 deletions app/router/strategy_leastping.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension"
Expand All @@ -20,9 +21,10 @@ func (l *LeastPingStrategy) GetPrincipleTarget(strings []string) []string {

func (l *LeastPingStrategy) InjectContext(ctx context.Context) {
l.ctx = ctx
core.RequireFeaturesAsync(l.ctx, func(observatory extension.Observatory) {
common.Must(core.RequireFeatures(l.ctx, func(observatory extension.Observatory) error {
l.observatory = observatory
})
return nil
}))
}

func (l *LeastPingStrategy) PickOutbound(strings []string) string {
Expand Down
6 changes: 4 additions & 2 deletions app/router/strategy_random.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"

"github.com/xtls/xray-core/app/observatory"
"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/dice"
"github.com/xtls/xray-core/core"
"github.com/xtls/xray-core/features/extension"
Expand All @@ -20,9 +21,10 @@ type RandomStrategy struct {
func (s *RandomStrategy) InjectContext(ctx context.Context) {
s.ctx = ctx
if len(s.FallbackTag) > 0 {
core.RequireFeaturesAsync(s.ctx, func(observatory extension.Observatory) {
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error {
s.observatory = observatory
})
return nil
}))
}
}

Expand Down
158 changes: 82 additions & 76 deletions core/xray.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"reflect"
"sync"
"time"

"github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/errors"
Expand Down Expand Up @@ -45,22 +44,13 @@ func getFeature(allFeatures []features.Feature, t reflect.Type) features.Feature
return nil
}

func (r *resolution) resolve(allFeatures []features.Feature) (bool, error) {
var fs []features.Feature
for _, d := range r.deps {
f := getFeature(allFeatures, d)
if f == nil {
return false, nil
}
fs = append(fs, f)
}

func (r *resolution) callbackResolution(allFeatures []features.Feature) error {
callback := reflect.ValueOf(r.callback)
var input []reflect.Value
callbackType := callback.Type()
for i := 0; i < callbackType.NumIn(); i++ {
pt := callbackType.In(i)
for _, f := range fs {
for _, f := range allFeatures {
if reflect.TypeOf(f).AssignableTo(pt) {
input = append(input, reflect.ValueOf(f))
break
Expand All @@ -85,15 +75,17 @@ func (r *resolution) resolve(allFeatures []features.Feature) (bool, error) {
}
}

return true, err
return err
}

// Instance combines all Xray features.
type Instance struct {
access sync.Mutex
features []features.Feature
featureResolutions []resolution
running bool
statusLock sync.Mutex
features []features.Feature
pendingResolutions []resolution
pendingOptionalResolutions []resolution
running bool
resolveLock sync.Mutex

ctx context.Context
}
Expand Down Expand Up @@ -154,13 +146,14 @@ func addOutboundHandlers(server *Instance, configs []*OutboundHandlerConfig) err
// See Instance.RequireFeatures for more information.
func RequireFeatures(ctx context.Context, callback interface{}) error {
v := MustFromContext(ctx)
return v.RequireFeatures(callback)
return v.RequireFeatures(callback, false)
}

// RequireFeaturesAsync registers a callback, which will be called when all dependent features are registered. The order of app init doesn't matter
func RequireFeaturesAsync(ctx context.Context, callback interface{}) {
// OptionalFeatures is a helper function to aquire features from Instance in context.
// See Instance.RequireFeatures for more information.
func OptionalFeatures(ctx context.Context, callback interface{}) error {
v := MustFromContext(ctx)
v.RequireFeaturesAsync(callback)
return v.RequireFeatures(callback, true)
}

// New returns a new Xray instance based on given configuration.
Expand Down Expand Up @@ -234,9 +227,12 @@ func initInstanceWithConfig(config *Config, server *Instance) (bool, error) {
}(),
)

if server.featureResolutions != nil {
server.resolveLock.Lock()
if server.pendingResolutions != nil {
server.resolveLock.Unlock()
return true, errors.New("not all dependencies are resolved.")
}
server.resolveLock.Unlock()

if err := addInboundHandlers(server, config.Inbound); err != nil {
return true, err
Expand All @@ -255,8 +251,8 @@ func (s *Instance) Type() interface{} {

// Close shutdown the Xray instance.
func (s *Instance) Close() error {
s.access.Lock()
defer s.access.Unlock()
s.statusLock.Lock()
defer s.statusLock.Unlock()

s.running = false

Expand All @@ -275,7 +271,7 @@ func (s *Instance) Close() error {

// RequireFeatures registers a callback, which will be called when all dependent features are registered.
// The callback must be a func(). All its parameters must be features.Feature.
func (s *Instance) RequireFeatures(callback interface{}) error {
func (s *Instance) RequireFeatures(callback interface{}, optional bool) error {
callbackType := reflect.TypeOf(callback)
if callbackType.Kind() != reflect.Func {
panic("not a function")
Expand All @@ -290,75 +286,85 @@ func (s *Instance) RequireFeatures(callback interface{}) error {
deps: featureTypes,
callback: callback,
}
if finished, err := r.resolve(s.features); finished {
return err
}
s.featureResolutions = append(s.featureResolutions, r)
return nil
}

// RequireFeaturesAsync registers a callback, which will be called when all dependent features are registered. The order of app init doesn't matter
func (s *Instance) RequireFeaturesAsync(callback interface{}) {
callbackType := reflect.TypeOf(callback)
if callbackType.Kind() != reflect.Func {
panic("not a function")
}

var featureTypes []reflect.Type
for i := 0; i < callbackType.NumIn(); i++ {
featureTypes = append(featureTypes, reflect.PtrTo(callbackType.In(i)))
}

r := resolution{
deps: featureTypes,
callback: callback,
s.resolveLock.Lock()
foundAll := true
for _, d := range r.deps {
f := getFeature(s.features, d)
if f == nil {
foundAll = false
break
}
}
go func() {
var finished = false
for i := 0; !finished; i++ {
if i > 100000 {
errors.LogError(s.ctx, "RequireFeaturesAsync failed after count ", i)
break;
}
finished, _ = r.resolve(s.features)
time.Sleep(time.Millisecond)
if foundAll {
s.resolveLock.Unlock()
return r.callbackResolution(s.features)
} else {
if optional {
s.pendingOptionalResolutions = append(s.pendingOptionalResolutions, r)
} else {
s.pendingResolutions = append(s.pendingResolutions, r)
}
s.featureResolutions = append(s.featureResolutions, r)
}()
s.resolveLock.Unlock()
return nil
}
}

// AddFeature registers a feature into current Instance.
func (s *Instance) AddFeature(feature features.Feature) error {
s.features = append(s.features, feature)

if s.running {
if err := feature.Start(); err != nil {
errors.LogInfoInner(s.ctx, err, "failed to start feature")
}
return nil
}

if s.featureResolutions == nil {
return nil
s.resolveLock.Lock()
s.features = append(s.features, feature)

var availableResolution []resolution
var pending []resolution
for _, r := range s.pendingResolutions {
foundAll := true
for _, d := range r.deps {
f := getFeature(s.features, d)
if f == nil {
foundAll = false
break
}
}
if foundAll {
availableResolution = append(availableResolution, r)
} else {
pending = append(pending, r)
}
}
s.pendingResolutions = pending

var pendingResolutions []resolution
for _, r := range s.featureResolutions {
finished, err := r.resolve(s.features)
if finished && err != nil {
return err
var pendingOptional []resolution
for _, r := range s.pendingOptionalResolutions {
foundAll := true
for _, d := range r.deps {
f := getFeature(s.features, d)
if f == nil {
foundAll = false
break
}
}
if !finished {
pendingResolutions = append(pendingResolutions, r)
if foundAll {
availableResolution = append(availableResolution, r)
} else {
pendingOptional = append(pendingOptional, r)
}
}
if len(pendingResolutions) == 0 {
s.featureResolutions = nil
} else if len(pendingResolutions) < len(s.featureResolutions) {
s.featureResolutions = pendingResolutions
s.pendingOptionalResolutions = pendingOptional
s.resolveLock.Unlock()

var err error
for _, r := range availableResolution {
err = r.callbackResolution(s.features) // only return the last error for now
}

return nil
return err
}

// GetFeature returns a feature of the given type, or nil if such feature is not registered.
Expand All @@ -371,8 +377,8 @@ func (s *Instance) GetFeature(featureType interface{}) features.Feature {
//
// xray:api:stable
func (s *Instance) Start() error {
s.access.Lock()
defer s.access.Unlock()
s.statusLock.Lock()
defer s.statusLock.Unlock()

s.running = true
for _, f := range s.features {
Expand Down
2 changes: 1 addition & 1 deletion core/xray_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestXrayDependency(t *testing.T) {
t.Error("expected dns client fulfilled, but actually nil")
}
wait <- true
})
}, false)
instance.AddFeature(localdns.New())
<-wait
}
Expand Down
Loading
Loading