Skip to content

Commit

Permalink
Merge pull request #207 from Argonus/support-cached-endpoints
Browse files Browse the repository at this point in the history
Add resource version parameter for kubernetes strategy
  • Loading branch information
hansihe authored Jan 9, 2025
2 parents da2a8bb + 81e1710 commit ff80ad7
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 3 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## Unreleased

- Add `kubernetes_use_cached_resources` option to Kubernetes strategy

## 3.4.1

- Use new cypher names
- Allow Epmd strategy to reconnect after connection failures
- Detect Self Signed Certificate Authority for Kubernetes Strategy
Expand Down
28 changes: 25 additions & 3 deletions lib/strategy/kubernetes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ defmodule Cluster.Strategy.Kubernetes do
- `:kubernetes_selector`
- `:kubernetes_service_name`
- `:kubernetes_ip_lookup_mode`
- `:kubernetes_use_cached_resources`
- `:mode`
## Getting `<basename>`
Expand Down Expand Up @@ -70,6 +71,11 @@ defmodule Cluster.Strategy.Kubernetes do
Then, this strategy will fetch the IP of all pods with that label and attempt to connect.
### `kubernetes_use_cached_resources` option
When setting this value, this strategy will use cached resource version value to fetch k8s resources.
In k8s resources are incremented by 1 on every change, this version will set requested resourceVersion
to 0, that will use cached versions of resources, take in mind that this may be outdated or unavailable.
### `:mode` option
Expand Down Expand Up @@ -362,6 +368,9 @@ defmodule Cluster.Strategy.Kubernetes do
selector = Keyword.fetch!(config, :kubernetes_selector)
ip_lookup_mode = Keyword.get(config, :kubernetes_ip_lookup_mode, :endpoints)

use_cache = Keyword.get(config, :kubernetes_use_cached_resources, false)
resource_version = if use_cache, do: 0, else: nil

master_name = Keyword.get(config, :kubernetes_master, @kubernetes_master)
cluster_domain = System.get_env("CLUSTER_DOMAIN", "#{cluster_name}.local")

Expand All @@ -380,12 +389,19 @@ defmodule Cluster.Strategy.Kubernetes do

cond do
app_name != nil and selector != nil ->
selector = URI.encode(selector)
query_params =
[]
|> apply_param(:labelSelector, selector)
|> apply_param(:resourceVersion, resource_version)
|> URI.encode_query(:rfc3986)

path =
case ip_lookup_mode do
:endpoints -> "api/v1/namespaces/#{namespace}/endpoints?labelSelector=#{selector}"
:pods -> "api/v1/namespaces/#{namespace}/pods?labelSelector=#{selector}"
:endpoints ->
"api/v1/namespaces/#{namespace}/endpoints?#{query_params}"

:pods ->
"api/v1/namespaces/#{namespace}/pods?#{query_params}"
end

headers = [{~c"authorization", ~c"Bearer #{token}"}]
Expand Down Expand Up @@ -440,6 +456,12 @@ defmodule Cluster.Strategy.Kubernetes do
end
end

defp apply_param(params, key, value) when value != nil do
[{key, value} | params]
end

defp apply_param(params, _key, _value), do: params

defp parse_response(:endpoints, resp) do
case resp do
%{"items" => items} when is_list(items) ->
Expand Down
32 changes: 32 additions & 0 deletions test/fixtures/vcr_cassettes/kubernetes.json
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,38 @@
"type": "ok"
}
},
{
"request": {
"body": "",
"headers": {
"authorization": "***"
},
"method": "get",
"options": {
"httpc_options": [],
"http_options": {
"ssl": "[verify: :verify_none]"
}
},
"request_body": "",
"url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/endpoints?labelSelector=app=test_selector&resourceVersion=0"
},
"response": {
"binary": false,
"body": "{\"kind\":\"EndpointsList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"subsets\":[{\"addresses\":[{\"hostname\":\"my-hostname-0\",\"ip\":\"10.48.33.136\",\"nodeName\":\"gke-jshmrtn-cluster-default-pool-a61da41f-db9x\",\"targetRef\":{\"kind\":\"Pod\",\"namespace\":\"airatel-service-localization\",\"name\":\"development-4292695165-mgq9f\",\"uid\":\"eb0f3e80-0295-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037783\"}}],\"ports\":[{\"name\":\"web\",\"port\":8443,\"protocol\":\"TCP\"}]}]}]}\n",
"headers": {
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
"content-length": "877",
"content-type": "application/json"
},
"status_code": [
"HTTP/1.1",
200,
"OK"
],
"type": "ok"
}
},
{
"request": {
"body": "",
Expand Down
32 changes: 32 additions & 0 deletions test/fixtures/vcr_cassettes/kubernetes_pods.json
Original file line number Diff line number Diff line change
Expand Up @@ -30,5 +30,37 @@
],
"type": "ok"
}
},
{
"request": {
"body": "",
"headers": {
"authorization": "***"
},
"method": "get",
"options": {
"httpc_options": [],
"http_options": {
"ssl": "[verify: :verify_none]"
}
},
"request_body": "",
"url": "https://cluster.localhost./api/v1/namespaces/__libcluster_test/pods?labelSelector=app=test_selector&resourceVersion=0"
},
"response": {
"binary": false,
"body": "{\"kind\":\"PodList\",\"apiVersion\":\"v1\",\"metadata\":{\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"resourceVersion\":\"17042410\"},\"items\":[{\"metadata\":{\"name\":\"development-development\",\"namespace\":\"airatel-service-localization\",\"selfLink\":\"SELFLINK_PLACEHOLDER\",\"uid\":\"7e3faf1e-0294-11e8-bcad-42010a9c01cc\",\"resourceVersion\":\"17037787\",\"creationTimestamp\":\"2018-01-26T12:29:03Z\",\"labels\":{\"app\":\"development\",\"chart\":\"CHART_PLACEHOLDER\"}},\"spec\": { \"hostname\": \"my-hostname-0\" },\"status\":{\"podIP\": \"10.48.33.136\"}}]}\n",
"headers": {
"date": "Fri, 26 Jan 2018 13:18:46 GMT",
"content-length": "877",
"content-type": "application/json"
},
"status_code": [
"HTTP/1.1",
200,
"OK"
],
"type": "ok"
}
}
]
82 changes: 82 additions & 0 deletions test/kubernetes_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,60 @@ defmodule Cluster.Strategy.KubernetesTest do
end
end

test "works with cached resources" do
use_cassette "kubernetes", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_selector: "app=test_selector",
kubernetes_use_cached_resources: true,
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect, _}, 5_000
end)
end
end

test "works with no cached resources" do
use_cassette "kubernetes", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_selector: "app=test_selector",
kubernetes_use_cached_resources: false,
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect, _}, 5_000
end)
end
end

test "works with dns and cluster_name" do
use_cassette "kubernetes", custom: true do
capture_log(fn ->
Expand Down Expand Up @@ -201,6 +255,34 @@ defmodule Cluster.Strategy.KubernetesTest do
end
end

test "works with pods and cached resources" do
use_cassette "kubernetes_pods", custom: true do
capture_log(fn ->
start_supervised!({Kubernetes,
[
%Cluster.Strategy.State{
topology: :name,
config: [
kubernetes_node_basename: "test_basename",
kubernetes_selector: "app=test_selector",
# If you want to run the test freshly, you'll need to create a DNS Entry
kubernetes_master: "cluster.localhost.",
kubernetes_ip_lookup_mode: :pods,
kubernetes_use_cached_resources: true,
kubernetes_service_account_path:
Path.join([__DIR__, "fixtures", "kubernetes", "service_account"])
],
connect: {Nodes, :connect, [self()]},
disconnect: {Nodes, :disconnect, [self()]},
list_nodes: {Nodes, :list_nodes, [[]]}
}
]})

assert_receive {:connect, :"[email protected]"}, 5_000
end)
end
end

test "works with pods and dns" do
use_cassette "kubernetes_pods", custom: true do
capture_log(fn ->
Expand Down

0 comments on commit ff80ad7

Please sign in to comment.