diff --git a/pkg/ingress/kube/common/metrics.go b/pkg/ingress/kube/common/metrics.go index a8795796d6..757dc0fe01 100644 --- a/pkg/ingress/kube/common/metrics.go +++ b/pkg/ingress/kube/common/metrics.go @@ -52,11 +52,16 @@ var ( "Total invalid ingresses known to pilot.", monitoring.WithLabels(clusterTag, invalidType), ) + + queryK8sVersionFail = monitoring.NewSum( + "pilot_query_k8s_version_fail", + "query k8s version of remote cluster fail number") ) func init() { monitoring.MustRegister(totalIngresses) monitoring.MustRegister(totalInvalidIngress) + monitoring.MustRegister(queryK8sVersionFail) } func RecordIngressNumber(cluster string, number int) { diff --git a/pkg/ingress/kube/common/tool.go b/pkg/ingress/kube/common/tool.go index 9f39665e18..4e8cfe4375 100644 --- a/pkg/ingress/kube/common/tool.go +++ b/pkg/ingress/kube/common/tool.go @@ -20,6 +20,7 @@ import ( "net" "sort" "strings" + "time" networking "istio.io/api/networking/v1alpha3" "istio.io/istio/pilot/pkg/model" @@ -27,11 +28,36 @@ import ( "istio.io/istio/pkg/kube" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/version" + "k8s.io/apimachinery/pkg/util/wait" netv1 "github.com/alibaba/higress/client/pkg/apis/networking/v1" . "github.com/alibaba/higress/pkg/ingress/log" ) +const ( + defaultInterval = 3 * time.Second + defaultTimeout = 1 * time.Minute +) + +type retry struct { + interval time.Duration + timeout time.Duration +} + +type RetryOption func(o *retry) + +func WithInterval(interval time.Duration) RetryOption { + return func(r *retry) { + r.interval = interval + } +} + +func WithTimeout(timeout time.Duration) RetryOption { + return func(r *retry) { + r.timeout = timeout + } +} + func ValidateBackendResource(resource *v1.TypedLocalObjectReference) bool { if resource == nil || resource.APIGroup == nil || *resource.APIGroup != netv1.SchemeGroupVersion.Group || @@ -42,43 +68,99 @@ func ValidateBackendResource(resource *v1.TypedLocalObjectReference) bool { } // V1Available check if the "networking/v1" Ingress is available. -func V1Available(client kube.Client) bool { +func V1Available(client kube.Client, retryOptions ...RetryOption) bool { + retry := &retry{ + interval: defaultInterval, + timeout: defaultTimeout, + } + + for _, option := range retryOptions { + option(retry) + } + + // most case is greater than 1.18 + supportV1 := true + err := wait.PollImmediate(retry.interval, retry.timeout, func() (done bool, err error) { + available, err := v1Available(client) + if err != nil { + IngressLog.Errorf("check v1 available error: %v", err) + // retry + return false, nil + } + supportV1 = available + // we have done. + return true, nil + }) + + if err != nil { + IngressLog.Errorf("check v1 available finally error: %v", err) + } + + return supportV1 +} + +// v1Available check if the "networking/v1" Ingress is available. +func v1Available(client kube.Client) (bool, error) { // check kubernetes version to use new ingress package or not - version119, _ := version.ParseGeneric("v1.19.0") + return IsRunningVersionAtLeast("v1.19.0", client) +} + +// IsRunningVersionAtLeast check if the running version is greater than or equal to the atLeastVersion. +func IsRunningVersionAtLeast(atLeastVersionStr string, client kube.Client) (bool, error) { + atLeastVersion, _ := version.ParseGeneric(atLeastVersionStr) serverVersion, err := client.GetKubernetesVersion() if err != nil { - // Consider the new ingress package is available as default - return true + queryK8sVersionFail.Increment() + return false, err } runningVersion, err := version.ParseGeneric(serverVersion.String()) if err != nil { - // Consider the new ingress package is available as default - IngressLog.Errorf("unexpected error parsing running Kubernetes version: %v", err) - return true + queryK8sVersionFail.Increment() + return false, err } - return runningVersion.AtLeast(version119) + return runningVersion.AtLeast(atLeastVersion), nil } // NetworkingIngressAvailable check if the "networking" group Ingress is available. -func NetworkingIngressAvailable(client kube.Client) bool { - // check kubernetes version to use new ingress package or not - version118, _ := version.ParseGeneric("v1.18.0") +func NetworkingIngressAvailable(client kube.Client, retryOptions ...RetryOption) bool { + retry := &retry{ + interval: defaultInterval, + timeout: defaultTimeout, + } - serverVersion, err := client.GetKubernetesVersion() - if err != nil { - return false + for _, option := range retryOptions { + option(retry) } - runningVersion, err := version.ParseGeneric(serverVersion.String()) + // most case is greater than or equal 1.18. + supportNetworking := true + + err := wait.PollImmediate(retry.interval, retry.timeout, func() (done bool, err error) { + available, err := networkingIngressAvailable(client) + if err != nil { + IngressLog.Errorf("check networking available error: %v", err) + // retry + return false, nil + } + supportNetworking = available + // we have done. + return true, nil + }) + if err != nil { - IngressLog.Errorf("unexpected error parsing running Kubernetes version: %v", err) - return false + IngressLog.Errorf("check networking available finally error: %v", err) } - return runningVersion.AtLeast(version118) + return supportNetworking +} + +// networkingIngressAvailable check if the "networking" group Ingress is available. +func networkingIngressAvailable(client kube.Client) (bool, error) { + // check kubernetes version to use new ingress package or not + return IsRunningVersionAtLeast("v1.18.0", client) } // SortIngressByCreationTime sorts the list of config objects in ascending order by their creation time (if available). diff --git a/pkg/ingress/kube/common/tool_test.go b/pkg/ingress/kube/common/tool_test.go index d2f92009e9..c22ac1c827 100644 --- a/pkg/ingress/kube/common/tool_test.go +++ b/pkg/ingress/kube/common/tool_test.go @@ -15,7 +15,10 @@ package common import ( + "istio.io/istio/pkg/kube" + kubeVersion "k8s.io/apimachinery/pkg/version" "testing" + "time" networking "istio.io/api/networking/v1alpha3" "istio.io/istio/pkg/config" @@ -556,3 +559,107 @@ func TestSortHTTPRoutesWithMoreRules(t *testing.T) { } } } + +type supportV1Client struct { + kube.Client +} + +func (s *supportV1Client) GetKubernetesVersion() (*kubeVersion.Info, error) { + return &kubeVersion.Info{ + GitVersion: "v1.28.3-aliyun.1", + }, nil +} + +type unSupportV1Client struct { + kube.Client +} + +func (u *unSupportV1Client) GetKubernetesVersion() (*kubeVersion.Info, error) { + return &kubeVersion.Info{ + GitVersion: "v1.18.0", + }, nil +} + +type supportNetworkingClient struct { + kube.Client +} + +func (s *supportNetworkingClient) GetKubernetesVersion() (*kubeVersion.Info, error) { + return &kubeVersion.Info{ + GitVersion: "v1.18.0-aliyun.1", + }, nil +} + +type unSupportNetworkingClient struct { + kube.Client +} + +func (u *unSupportNetworkingClient) GetKubernetesVersion() (*kubeVersion.Info, error) { + return &kubeVersion.Info{ + GitVersion: "v1.17.0-aliyun.1", + }, nil +} + +type errorClient struct { + kube.Client +} + +func (e *errorClient) GetKubernetesVersion() (*kubeVersion.Info, error) { + return &kubeVersion.Info{ + GitVersion: "error", + }, nil +} + +func TestV1Available(t *testing.T) { + fakeClient := kube.NewFakeClient() + + v1Client := &supportV1Client{ + fakeClient, + } + + if !V1Available(v1Client) { + t.Fatal("should support v1") + } + + v1Beta1Client := &unSupportV1Client{ + fakeClient, + } + if V1Available(v1Beta1Client) { + t.Fatal("should not support v1") + } + + errorClient := &errorClient{ + fakeClient, + } + // will fallback to v1 + if !V1Available(errorClient, WithInterval(1*time.Second), WithTimeout(3*time.Second)) { + t.Fatal("should fallback to v1") + } +} + +func TestNetworkingIngressAvailable(t *testing.T) { + fakeClient := kube.NewFakeClient() + + networkingClient := &supportNetworkingClient{ + fakeClient, + } + + if !NetworkingIngressAvailable(networkingClient) { + t.Fatal("should support networking") + } + + notNetworkingClient := &unSupportNetworkingClient{ + fakeClient, + } + if NetworkingIngressAvailable(notNetworkingClient) { + t.Fatal("should not support networking") + } + + errorClient := &errorClient{ + fakeClient, + } + // will fallback to networking + if !NetworkingIngressAvailable(errorClient, WithInterval(1*time.Second), WithTimeout(3*time.Second)) { + t.Fatal("should fallback to networking") + } +}