Skip to content

Commit

Permalink
initial commit towards resolving issue #6722
Browse files Browse the repository at this point in the history
Signed-off-by: Hemant <[email protected]>
  • Loading branch information
hkiiita committed Oct 10, 2024
1 parent e2e5466 commit f380aa5
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 65 deletions.
84 changes: 55 additions & 29 deletions pkg/agent/controller/networkpolicy/fqdn.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,16 @@ func (fs *fqdnSelectorItem) matches(fqdn string) bool {
// expirationTime of the records, which is the DNS response
// receiving time plus lowest applicable TTL.
type dnsMeta struct {
expirationTime time.Time
//expirationTime time.Time
// Key for responseIPs is the string representation of the IP.
// It helps to quickly identify IP address updates when a
// new DNS response is received.
responseIPs map[string]net.IP
responseIPs map[string]ipWithTTL
}

type ipWithTTL struct {
ip net.IP
ttl time.Time
}

// subscriber is a entity that subsribes for datapath rule realization
Expand Down Expand Up @@ -253,8 +258,8 @@ func (f *fqdnController) getIPsForFQDNSelectors(fqdns []string) []net.IP {
}
for fqdn := range fqdnsMatched {
if dnsMeta, ok := f.dnsEntryCache[fqdn]; ok {
for _, ip := range dnsMeta.responseIPs {
matchedIPs = append(matchedIPs, ip)
for _, ipWithMetaData := range dnsMeta.responseIPs {
matchedIPs = append(matchedIPs, ipWithMetaData.ip)
}
}
}
Expand Down Expand Up @@ -416,54 +421,75 @@ func (f *fqdnController) onDNSResponse(
}
return
}
// mustCacheResponse is only true if the FQDN is already tracked by this
// controller, or it matches at least one fqdnSelectorItem from the policy rules.
// addressUpdate is only true if there has been an update in IP addresses
// corresponded with the FQDN.
mustCacheResponse, addressUpdate := false, false
recordTTL := time.Now().Add(time.Duration(lowestTTL) * time.Second)

addressUpdate := false
f.fqdnSelectorMutex.Lock()
defer f.fqdnSelectorMutex.Unlock()
oldDNSMeta, exist := f.dnsEntryCache[fqdn]
ipMetaDataHolder := make(map[string]ipWithTTL)

if exist {
mustCacheResponse = true
for ipStr := range responseIPs {
if _, ok := oldDNSMeta.responseIPs[ipStr]; !ok {
addressUpdate = true
break
// Data related to this domain exists in the cache.
// Besides presence of existing IPs, two scenarios : 1) May get New IPs 2) Some old IPs may be absent.

// check for new IPs and these new IPs need to be added with its new TTL value as received in response.
for ipStr, ip := range responseIPs {
if _, exist := oldDNSMeta.responseIPs[ipStr]; !exist {
ipMetaDataHolder[ipStr] = ipWithTTL{
ip: ip,
ttl: time.Now().Add(time.Duration(lowestTTL) * time.Second),
}
}
}
for oldIPStr, oldIP := range oldDNSMeta.responseIPs {
if _, ok := responseIPs[oldIPStr]; !ok {
if oldDNSMeta.expirationTime.Before(time.Now()) {
// This IP entry has already expired and not seen in the latest DNS response.
// It should be removed from the cache.

for ipStr, ipMetaData := range oldDNSMeta.responseIPs {
if _, exist := responseIPs[ipStr]; !exist {
if ipMetaData.ttl.Before(time.Now()) {
// this ip is expired and stale, remove it by not including it.
addressUpdate = true
} else {
// Add the unexpired IP entry to responseIP and update the lowest applicable TTL if needed.
responseIPs[oldIPStr] = oldIP
if oldDNSMeta.expirationTime.Before(recordTTL) {
recordTTL = oldDNSMeta.expirationTime
// It hasn't expired yet, so just retain it with its existing ttl.
ipMetaDataHolder[ipStr] = ipWithTTL{
ip: ipMetaData.ip,
ttl: ipMetaData.ttl,
}
}
} else {
// This old IP also exists in current response, so update it with new received TTl.
ipMetaDataHolder[ipStr] = ipWithTTL{
ip: ipMetaData.ip,
ttl: time.Now().Add(time.Duration(lowestTTL) * time.Second),
}
}
}

} else {
// First time seeing this domain.
// check if this needs to be tracked, by checking its presence in the rules.
// If a FQDN policy had been applied then there must be rule records but because it's
// not in cache hence its FQDN:SelectorItem mapping may not be present.
for selectorItem := range f.selectorItemToRuleIDs {
// Only track the FQDN if there is at least one fqdnSelectorItem matching it.
if selectorItem.matches(fqdn) {
mustCacheResponse, addressUpdate = true, true
f.setFQDNMatchSelector(fqdn, selectorItem)
for ipStr, ip := range responseIPs {
ipMetaDataHolder[ipStr] = ipWithTTL{
ip: ip,
ttl: time.Now().Add(time.Duration(lowestTTL) * time.Second),
}
}
}
}
}
if mustCacheResponse {

if len(ipMetaDataHolder) != 0 {
addressUpdate = true
f.dnsEntryCache[fqdn] = dnsMeta{
expirationTime: recordTTL,
responseIPs: responseIPs,
responseIPs: ipMetaDataHolder,
}
f.dnsQueryQueue.AddAfter(fqdn, recordTTL.Sub(time.Now()))
// The FQDN will be added to the queue only after `lowestTTL` value which
// would already have been derived using the minTTL logic.
f.dnsQueryQueue.AddAfter(fqdn, time.Duration(lowestTTL)*time.Second)
}
f.syncDirtyRules(fqdn, waitCh, addressUpdate)
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/agent/controller/networkpolicy/fqdn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -402,10 +402,10 @@ func TestGetIPsForFQDNSelectors(t *testing.T) {
},
existingDNSCache: map[string]dnsMeta{
"test.antrea.io": {
responseIPs: map[string]net.IP{
"127.0.0.1": net.ParseIP("127.0.0.1"),
"192.155.12.1": net.ParseIP("192.155.12.1"),
"192.158.1.38": net.ParseIP("192.158.1.38"),
responseIPs: map[string]ipWithTTL{
"127.0.0.1": {net.ParseIP("127.0.0.1"), time.Now()},
"192.155.12.1": {net.ParseIP("192.155.12.1"), time.Now()},
"192.158.1.38": {net.ParseIP("192.158.1.38"), time.Now()},
},
},
},
Expand Down
32 changes: 0 additions & 32 deletions test/e2e/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -3250,38 +3250,6 @@ func (data *TestData) runDNSQuery(
}
}

// patchPodAnnotation Patches a pod with given map of keys and values.
func (data *TestData) patchPodAnnotation(namespace, podName string, annotation map[string]string) error {
annotationPatch := map[string]interface{}{
"metadata": map[string]interface{}{
"annotations": make(map[string]string),
},
}

if annotation == nil {
return fmt.Errorf("no annotations were provided")
} else {
for key, value := range annotation {
annotationPatch["metadata"].(map[string]interface{})["annotations"].(map[string]string)[key] = value
}
}

patchData, err := json.Marshal(annotationPatch)
if err != nil {
log.Infof("Error marshalling annotation: %+v", err)
return err
}

_, err = data.clientset.CoreV1().Pods(namespace).Patch(context.TODO(), podName, types.MergePatchType, patchData, metav1.PatchOptions{})
if err != nil {
log.Infof("Error patching pod %s in namespace %s with annotations. Error: %+v", podName, namespace, err)
return err
}

log.Infof("Successfully patched pod %s in namespace %s with provided annotation", podName, namespace)
return nil
}

// setPodAnnotationToRandomValue Patches a pod by adding an annotation with a specified key and a randomly generated string as the value.
func (data *TestData) setPodAnnotationToRandomValue(namespace, podName, annotationKey string) error {
annotationPatch := map[string]interface{}{
Expand Down

0 comments on commit f380aa5

Please sign in to comment.