Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove mesos code from get_running_task_allocation #3772

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 7 additions & 118 deletions paasta_tools/contrib/get_running_task_allocation.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#!/opt/venvs/paasta-tools/bin/python
import argparse
import json
import time
from typing import Any
from typing import Dict
Expand All @@ -11,16 +12,11 @@
from typing import Optional
from typing import Tuple

import a_sync
import simplejson as json
from kubernetes.client import V1Pod
from kubernetes.client import V1ResourceRequirements

from paasta_tools import kubernetes_tools
from paasta_tools import mesos_tools
from paasta_tools.kubernetes_tools import KubeClient
from paasta_tools.mesos.exceptions import SlaveDoesNotExist
from paasta_tools.mesos.task import Task
from paasta_tools.utils import load_system_paasta_config


Expand All @@ -40,81 +36,12 @@ class TaskAllocationInfo(NamedTuple):
host_ip: str
git_sha: str
config_sha: str
mesos_container_id: str # Because Mesos task info does not have docker id
mesos_container_id: Optional[
str
] # XXX(luisp): can we delete this now or do we need to cleanup splunk usages first?
namespace: Optional[str]


def get_container_info_from_mesos_task(
task: Task,
) -> Tuple[Optional[str], Optional[float]]:
for status in task["statuses"]:
if status["state"] != "TASK_RUNNING":
continue
container_id = (
status.get("container_status", {}).get("container_id", {}).get("value")
)
time_start = status.get("timestamp")
return container_id, time_start
return None, None


def get_paasta_service_instance_from_mesos_task(
task: Task,
) -> Tuple[Optional[str], Optional[str]]:
try:
docker_params = task["container"].get("docker", {}).get("parameters", [])
except KeyError:
return None, None
service, instance = None, None
for param in docker_params:
if param["key"] == "label":
label = param["value"]
if label.startswith("paasta_service="):
service = label.split("=")[1]
if label.startswith("paasta_instance="):
instance = label.split("=")[1]
return service, instance


async def get_pool_from_mesos_task(task: Task) -> Optional[str]:
try:
attributes = (await task.slave())["attributes"]
return attributes.get("pool", "default")
except SlaveDoesNotExist:
return None


@a_sync.to_blocking
async def get_mesos_task_allocation_info() -> Iterable[TaskAllocationInfo]:
tasks = await mesos_tools.get_cached_list_of_running_tasks_from_frameworks()
info_list = []
for task in tasks:
mesos_container_id, start_time = get_container_info_from_mesos_task(task)
paasta_service, paasta_instance = get_paasta_service_instance_from_mesos_task(
task
)
paasta_pool = await get_pool_from_mesos_task(task)
info_list.append(
TaskAllocationInfo(
paasta_service=paasta_service,
paasta_instance=paasta_instance,
container_type=MAIN_CONTAINER_TYPE,
paasta_pool=paasta_pool,
resources=task["resources"],
start_time=start_time,
docker_id=None,
pod_name=None,
pod_ip=None,
host_ip=None,
git_sha=None,
config_sha=None,
mesos_container_id=mesos_container_id,
namespace=None,
)
)
return info_list


def get_all_running_kubernetes_pods(
kube_client: KubeClient, namespace: str
) -> Iterable[V1Pod]:
Expand Down Expand Up @@ -256,50 +183,15 @@ def get_kubernetes_task_allocation_info(
return info_list


def get_task_allocation_info(
scheduler: str,
namespace: str,
kube_client: Optional[KubeClient],
) -> Iterable[TaskAllocationInfo]:
if scheduler == "mesos":
return get_mesos_task_allocation_info()
elif scheduler == "kubernetes":
return get_kubernetes_task_allocation_info(namespace, kube_client)
else:
return []


def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="")
parser.add_argument(
"--scheduler",
help="Scheduler to get task info from",
dest="scheduler",
default="kubernetes",
choices=["mesos", "kubernetes"],
)
parser.add_argument(
"--additional-namespaces-exclude",
help="full names of namespaces to not fetch allocation info for those that don't match --namespace-prefix-exlude",
dest="additional_namespaces_exclude",
nargs="+",
default=[],
)
parser.add_argument(
"--namespace-prefix",
help=argparse.SUPPRESS,
dest="namespace_prefix",
default="paasta",
)
parser.add_argument(
"--additional-namespaces",
help=argparse.SUPPRESS,
dest="additional_namespaces",
nargs="+",
# we default this to tron since this is really the only non-paasta-prefix namespaced that is part of paasta
# and we'd like to not run two cronjobs to get this information :p
default=["tron"],
)
args = parser.parse_args()

args.additional_namespaces_exclude = set(args.additional_namespaces_exclude)
Expand All @@ -321,18 +213,15 @@ def main(args: argparse.Namespace) -> None:
all_namespaces,
args.additional_namespaces_exclude,
):
display_task_allocation_info(
cluster, args.scheduler, matching_namespace, kube_client
)
display_task_allocation_info(cluster, matching_namespace, kube_client)


def display_task_allocation_info(
cluster: str,
scheduler: str,
namespace: str,
kube_client: Optional[KubeClient],
kube_client: KubeClient,
) -> None:
info_list = get_task_allocation_info(scheduler, namespace, kube_client)
info_list = get_kubernetes_task_allocation_info(namespace, kube_client)
timestamp = time.time()
for info in info_list:
info_dict = info._asdict()
Expand Down
Loading