Skip to content

Merge Kubernetes.DNS implementations #115

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 129 additions & 13 deletions lib/strategy/kubernetes_dns.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,24 @@
defmodule Cluster.Strategy.Kubernetes.DNS do
@moduledoc """
This clustering strategy works by loading all your Erlang nodes (within Pods) in the current [Kubernetes
namespace](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/).
It will fetch the addresses of all pods under a shared headless service and attempt to connect.
It will continually monitor and update its connections every 5s.
This clustering strategy works by loading all your Erlang nodes (within Pods) in the current
[Kubernetes namespace](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/).
For more information, see the kubernetes stateful-application [documentation](https://kubernetes.io/docs/tutorials/stateful-application/basic-stateful-set/#using-stable-network-identities)

* It will fetch the FQDN of all pods under the headless service and attempt to connect.
* It will continually monitor and update its connections according to the polling_interval (default 5s)

The `application_name`, `namespace` and `method` is configurable (you may have launched erlang with a different configured name),
but will in most cases be the name of your application
It assumes that all Erlang nodes were launched under a base name, are using longnames, and are unique
based on their FQDN, rather than the base hostname. In other words, in the following
longname, `<basename>@<ip>`, `basename` would be the value configured through
`application_name`.

An example configuration is below:
It uses one of two methods for the lookups:
* `:srv`
* `:a` (default)

An example for using A query configuration is below:

config :libcluster,
topologies: [
Expand All @@ -20,8 +27,110 @@ defmodule Cluster.Strategy.Kubernetes.DNS do
config: [
service: "myapp-headless",
application_name: "myapp",
polling_interval: 10_000]]]
polling_interval: 10_000,
method: :a
]
]
]




An example for SRV query configuration is below:

config :libcluster,
topologies: [
k8s_example: [
strategy: #{__MODULE__},
config: [
service: "elixir-plug-poc",
application_name: "elixir_plug_poc",
polling_interval: 10_000,
namespace: "default",
method: :srv
]
]
]


An example of how this strategy extracts topology information from DNS follows:

```
bash-5.0# hostname -f
elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local
bash-5.0# dig SRV elixir-plug-poc.default.svc.cluster.local

; <<>> DiG 9.14.3 <<>> SRV elixir-plug-poc.default.svc.cluster.local
;; global options: +cmd
;; Got answer:
;; WARNING: .local is reserved for Multicast DNS
;; You are currently testing what happens when an mDNS query is leaked to DNS
;; ->>HEADER<<- opcode: QUERY, status: NOERROR, id: 7169
;; flags: qr aa rd ra; QUERY: 1, ANSWER: 2, AUTHORITY: 0, ADDITIONAL: 2

;; QUESTION SECTION:
;elixir-plug-poc.default.svc.cluster.local. IN SRV

;; ANSWER SECTION:
elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local.
elixir-plug-poc.default.svc.cluster.local. 30 IN SRV 10 50 0 elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local.

;; ADDITIONAL SECTION:
elixir-plug-poc-0.elixir-plug-poc.default.svc.cluster.local. 30 IN A 10.1.0.95
elixir-plug-poc-1.elixir-plug-poc.default.svc.cluster.local. 30 IN A 10.1.0.96

;; Query time: 0 msec
;; SERVER: 10.96.0.10#53(10.96.0.10)
;; WHEN: Wed Jul 03 11:55:27 UTC 2019
;; MSG SIZE rcvd: 167
```

And here is an example of a corresponding kubernetes statefulset/service definition:

```yaml
apiVersion: v1
kind: Service
metadata:
name: elixir-plug-poc
labels:
app: elixir-plug-poc
spec:
ports:
- port: 4000
name: web
clusterIP: None
selector:
app: elixir-plug-poc
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: elixir-plug-poc
spec:
serviceName: "elixir-plug-poc"
replicas: 2
selector:
matchLabels:
app: elixir-plug-poc
template:
metadata:
labels:
app: elixir-plug-poc
spec:
containers:
- name: elixir-plug-poc
image: binarytemple/elixir_plug_poc
args:
- foreground
env:
- name: ERLANG_COOKIE
value: "cookie"
imagePullPolicy: Always
ports:
- containerPort: 4000
name: http
protocol: TCP
```
"""
use GenServer
use Cluster.Strategy
Expand Down Expand Up @@ -106,17 +215,21 @@ defmodule Cluster.Strategy.Kubernetes.DNS do

@spec get_nodes(State.t()) :: [atom()]
defp get_nodes(%State{topology: topology, config: config}) do
app_name = Keyword.fetch!(config, :application_name)
service = Keyword.fetch!(config, :service)
resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, :a))
method = Keyword.get(config, :method, :a)
app_name = Keyword.get(config, :application_name)
service_name = Keyword.get(config, :service)
namespace = Keyword.get(config, :namespace, "default")
service_k8s_path = "#{service_name}.#{namespace}.svc.cluster.local."
service = if method == :a, do: service_name, else: service_k8s_path
resolver = Keyword.get(config, :resolver, &:inet_res.getbyname(&1, method))

cond do
app_name != nil and service != nil ->
headless_service = to_charlist(service)

case resolver.(headless_service) do
{:ok, {:hostent, _fqdn, [], :inet, _value, addresses}} ->
parse_response(addresses, app_name)
{:ok, {:hostent, _, _, addrtype, _count, addresses}} when addrtype in [:srv, :inet] ->
parse_response(addresses, {app_name, method})

{:error, reason} ->
error(topology, "lookup against #{service} failed: #{inspect(reason)}")
Expand Down Expand Up @@ -145,9 +258,12 @@ defmodule Cluster.Strategy.Kubernetes.DNS do
Keyword.get(config, :polling_interval, @default_polling_interval)
end

defp parse_response(addresses, app_name) do
defp parse_response(addresses, {app_name, method}) do
parser =
if method == :a, do: &:inet_parse.ntoa(&1), else: &:erlang.list_to_binary(elem(&1, 3))

addresses
|> Enum.map(&:inet_parse.ntoa(&1))
|> Enum.map(parser)
|> Enum.map(&"#{app_name}@#{&1}")
|> Enum.map(&String.to_atom(&1))
end
Expand Down
Loading