Skip to content

Commit

Permalink
fix(cluster): pg_repack does reindex and analyze automatically
Browse files Browse the repository at this point in the history
  • Loading branch information
jdobes authored and psegedy committed Nov 27, 2024
1 parent 06a51bc commit 0a6f26c
Showing 1 changed file with 7 additions and 32 deletions.
39 changes: 7 additions & 32 deletions cluster/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from typing import List

from psycopg2 import connect
from psycopg2 import sql

from common.config import Config
from common.logging import get_logger
Expand All @@ -33,16 +32,9 @@ def get_conn():
)


def launch_reindex(table: str):
    """Launch pg_repack to reindex a table and log its output.

    Runs the command ``PG_REPACK_ARGS + ["-t", table, "-x"]`` and forwards
    every line of the combined stdout/stderr stream to ``LOGGER.info``.

    Args:
        table: Name of the table passed to pg_repack's ``-t`` option.
    """
    command = PG_REPACK_ARGS + ["-t", table, "-x"]
    # Popen as a context manager closes the pipe and waits for the child on
    # exit, so the process is always reaped and its return code is available.
    with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) as process:
        for line in process.stdout:
            # Strip only the line terminator: a final line without a trailing
            # newline must not lose its last character.
            LOGGER.info(line.rstrip(b"\r\n").decode("utf-8", errors="replace"))

    # Report failures instead of ignoring them; do not raise, so callers
    # that treat this step as best-effort keep working.
    if process.returncode != 0:
        LOGGER.error("pg_repack reindex of %s table exited with code %s", table, process.returncode)


def launch_cluster(table: str, fields: List[str]):
"""Launches pg_repack to cluster table by given fields"""
LOGGER.info("Clustering %s table by fields: %s", table, fields)
fields_str = ",".join(fields)
process = subprocess.Popen(PG_REPACK_ARGS + ["-t", table, "-o", fields_str], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
with process.stdout:
Expand All @@ -68,27 +60,10 @@ def search_table_partitions(conn, table_like_name: str) -> List[str]:
return partitions


def cluster_table(conn, table: str, cluster_fields: List[str]):
    """Reindex, cluster and analyze a single table.

    The steps run in order: ``launch_reindex(table)``, then
    ``launch_cluster(table, cluster_fields)``, then an ``ANALYZE`` of the
    table executed through ``conn`` and committed.

    Args:
        conn: Database connection; ``cursor()`` and ``commit()`` are called
            on it (presumably the psycopg2 connection from ``get_conn`` --
            verify against caller).
        table: Name of the table to process.
        cluster_fields: Fields passed on to ``launch_cluster``.
    """
    LOGGER.info("Reindexing %s table", table)
    launch_reindex(table)

    LOGGER.info("Clustering %s table by fields: %s", table, cluster_fields)
    launch_cluster(table, cluster_fields)

    LOGGER.info("Analyzing %s table", table)
    # sql.Identifier quotes the table name safely instead of string-building SQL.
    query = sql.SQL("ANALYZE {}").format(sql.Identifier(table))
    cur = conn.cursor()
    try:
        cur.execute(query)
        conn.commit()
    finally:
        # Close the cursor even when execute/commit raises, so it never leaks.
        cur.close()


def cluster_partitioned_table(conn, partitions: List[str], cluster_fields: List[str]):
def cluster_partitioned_table(partitions: List[str], cluster_fields: List[str]):
"""Cluster partitioned table by given fields"""
for partition in partitions:
cluster_table(conn, partition, cluster_fields)
launch_cluster(partition, cluster_fields)


def cluster_system_vulnerable_package(conn):
Expand All @@ -98,7 +73,7 @@ def cluster_system_vulnerable_package(conn):
LOGGER.info("Clearing %s system_vulnerable_package partitions", len(partitions))

cluster_fields = ["rh_account_id", "system_id"]
cluster_partitioned_table(conn, partitions, cluster_fields)
cluster_partitioned_table(partitions, cluster_fields)


def cluster_system_vulnerabilities(conn):
Expand All @@ -108,7 +83,7 @@ def cluster_system_vulnerabilities(conn):
LOGGER.info("Clearing %s system_vulnerabilities partitions", len(partitions))

cluster_fields = ["rh_account_id", "cve_id"]
cluster_partitioned_table(conn, partitions, cluster_fields)
cluster_partitioned_table(partitions, cluster_fields)


def cluster() -> None:
Expand Down Expand Up @@ -138,9 +113,9 @@ def cluster() -> None:
LOGGER.error("pg_repack extension is not available")
sys.exit(1)

cluster_table(conn, "system_platform", ["rh_account_id"])
launch_cluster("system_platform", ["rh_account_id"])

cluster_table(conn, "cve_metadata", ["impact_id"])
launch_cluster("cve_metadata", ["impact_id"])

if CFG.cluster_system_vulnerabilities:
cluster_system_vulnerabilities(conn)
Expand Down

0 comments on commit 0a6f26c

Please sign in to comment.