Skip to content

Commit

Permalink
fix: Parallelize user fetching to make it faster
Browse files Browse the repository at this point in the history
  • Loading branch information
davidgubler committed Jan 29, 2024
1 parent 2f09ad6 commit be1d497
Showing 1 changed file with 90 additions and 6 deletions.
96 changes: 90 additions & 6 deletions pkg/keycloakClient.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,36 @@ func (this *KeycloakClient) GetToken() (string, error) {
return fmt.Sprintf("%s", accessToken), nil
}

func (this *KeycloakClient) GetUsers(token string) ([]*KeycloakUser, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users?max=100000", this.baseURL.String(), this.realm), nil)
func (this *KeycloakClient) getUsersCount(token string) (uint32, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users/count", this.baseURL.String(), this.realm), nil)
if err != nil {
return 0, err
}

req.Header["Authorization"] = []string{"Bearer " + token}
req.Header["cache-control"] = []string{"no-cache"}

r, err := this.client.Do(req)
if err != nil {
return 0, err
}
defer r.Body.Close()

body, err := io.ReadAll(r.Body)
if err != nil {
return 0, err
}

var count uint32 = 0
err = json.Unmarshal(body, &count)
if err != nil {
return 0, err
}
return count, nil
}

func (this *KeycloakClient) getUsers(token string, batchSize uint32, first uint32) ([]*KeycloakUser, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users?max=%d&first=%d", this.baseURL.String(), this.realm, batchSize, first), nil)
if err != nil {
return nil, err
}
Expand All @@ -177,6 +205,62 @@ func (this *KeycloakClient) GetUsers(token string) ([]*KeycloakUser, error) {
return keycloakUsers, nil
}

func (this *KeycloakClient) usersWorker(token string, batchSize uint32, firstChan chan uint32, results *sync.Map, errorCount *uint64, wg *sync.WaitGroup) {
defer wg.Done()

for first := range firstChan {
groups, err := this.getUsers(token, batchSize, first)
if err != nil {
atomic.AddUint64(errorCount, 1)
klog.Error(err)
}
results.Store(first, groups)
}
}

func (this *KeycloakClient) GetUsers(token string) ([]*KeycloakUser, error) {
// This could be a simple straight fetch of all users, but because of https://github.com/keycloak/keycloak/issues/10005
// we need to do parallel fetches to keep the load times reasonable
count, err := this.getUsersCount(token)
if err != nil {
return nil, err
}

results := sync.Map{}
var errorCount uint64
var batchSize uint32 = 50

firstChan := make(chan uint32)
wg := new(sync.WaitGroup)

// creating workers
for i := 0; i < 10; i++ {
wg.Add(1)
go this.usersWorker(token, batchSize, firstChan, &results, &errorCount, wg)
}

// sending batches to workers
var i uint32
for i = 0; i <= count/batchSize; i++ {
firstChan <- i * batchSize
}

close(firstChan)
wg.Wait()

if errorCount > 0 {
return nil, errors.New("Could not fetch all users")
}

users := make([]*KeycloakUser, 0)
results.Range(func(k, v interface{}) bool {
users = append(users, v.([]*KeycloakUser)...)
return true
})

return users, nil
}

func (this *KeycloakClient) GetGroups(token string) ([]*KeycloakGroup, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/groups?max=100000&briefRepresentation=false", this.baseURL.String(), this.realm), nil)
if err != nil {
Expand Down Expand Up @@ -235,7 +319,7 @@ func (this *KeycloakClient) findSubgroup(groups []*KeycloakGroup) {
}
}

func (this *KeycloakClient) GetGroupMembership(token string, user *KeycloakUser) ([]*KeycloakGroup, error) {
func (this *KeycloakClient) getGroupMembership(token string, user *KeycloakUser) ([]*KeycloakGroup, error) {
req, err := http.NewRequest("GET", fmt.Sprintf("%s/auth/admin/realms/%s/users/%s/groups", this.baseURL.String(), this.realm, user.Id), nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -263,11 +347,11 @@ func (this *KeycloakClient) GetGroupMembership(token string, user *KeycloakUser)
return groups, nil
}

func (this *KeycloakClient) worker(token string, userChan chan *KeycloakUser, results *sync.Map, errorCount *uint64, wg *sync.WaitGroup) {
func (this *KeycloakClient) groupMembershipWorker(token string, userChan chan *KeycloakUser, results *sync.Map, errorCount *uint64, wg *sync.WaitGroup) {
defer wg.Done()

for user := range userChan {
groups, err := this.GetGroupMembership(token, user)
groups, err := this.getGroupMembership(token, user)
if err != nil {
atomic.AddUint64(errorCount, 1)
klog.Error(err)
Expand All @@ -286,7 +370,7 @@ func (this *KeycloakClient) GetGroupMemberships(token string, users []*KeycloakU
// creating workers
for i := 0; i < 10; i++ {
wg.Add(1)
go this.worker(token, userChan, &results, &errorCount, wg)
go this.groupMembershipWorker(token, userChan, &results, &errorCount, wg)
}

// sending users to workers
Expand Down

0 comments on commit be1d497

Please sign in to comment.