Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Citus #426

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
configuration. (Requires Python 3.9 or higher.)
* Add a `y` command to copy focused query to the system clipboard, using
OSC 52 escape sequence (#311).
* Support Citus query activity (`citus_stat_activity`) views, through a new
`--citus` command-line option.

### Fixed

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ ex:
--rds Enable support for AWS RDS (implies --no-tempfiles and
filters out the rdsadmin database from space
calculation).
--citus Enable support for Citus.
--output FILEPATH Store running queries as CSV.
--db-size, --no-db-size
Enable/disable total size of DB.
Expand Down
5 changes: 5 additions & 0 deletions docs/man/pg_activity.1
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ required by another session. It shows following information:
.Vb 1
\& Enable support for AWS RDS (implies \-\-no\-tempfiles and filters out the rdsadmin database from space calculation).
.Ve
.IP "\fB\-\-citus\fR" 2
.IX Item "--citus"
.Vb 1
\& Enable support for Citus.
.Ve
.IP "\fB\-\-output=FILEPATH\fR" 2
.IX Item "--output=FILEPATH"
.Vb 1
Expand Down
4 changes: 4 additions & 0 deletions docs/man/pg_activity.pod
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ required by another session. It shows following information:

Enable support for AWS RDS (implies --no-tempfiles and filters out the rdsadmin database from space calculation).

=item B<--citus>

Enable support for Citus.

=item B<--output=FILEPATH>

Store running queries as CSV.
Expand Down
7 changes: 7 additions & 0 deletions pgactivity/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,13 @@ def get_parser() -> argparse.ArgumentParser:
help="Enable support for AWS RDS (implies --no-tempfiles and filters out the rdsadmin database from space calculation).",
default=False,
)
group.add_argument(
"--citus",
dest="citus",
action="store_true",
help="Enable support for Citus.",
default=False,
)
group.add_argument(
"--output",
dest="output",
Expand Down
12 changes: 11 additions & 1 deletion pgactivity/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ class Data:
filters: Filters
dsn_parameters: dict[str, str]
failed_queries: FailedQueriesInfo
pg_stat_activity: str

@classmethod
def pg_connect(
Expand All @@ -85,6 +86,7 @@ def pg_connect(
password: str | None = None,
database: str = "postgres",
rds_mode: bool = False,
citus: bool = False,
dsn: str = "",
hide_queries_in_logs: bool = False,
filters: Filters = NO_FILTER,
Expand Down Expand Up @@ -115,6 +117,7 @@ def pg_connect(
failed_queries=FailedQueriesInfo(),
filters=filters,
dsn_parameters=pg.connection_parameters(pg_conn),
pg_stat_activity="citus_stat_activity" if citus else "pg_stat_activity",
)

def try_reconnect(self) -> Data | None:
Expand Down Expand Up @@ -320,7 +323,10 @@ def pg_get_server_information(
else:
query = queries.get("get_server_info_oldest")

qs = sql.SQL(query).format(dbname_filter=self.dbname_filter)
qs = sql.SQL(query).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
)
try:
ret = pg.fetchone(
self.pg_conn,
Expand Down Expand Up @@ -410,6 +416,7 @@ def pg_get_activities(self, duration_mode: int = 1) -> list[RunningProcess]:

duration_column = self.get_duration_column(duration_mode)
query = sql.SQL(qs).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
duration_column=sql.Identifier(duration_column),
min_duration=sql.Literal(self.min_duration),
Expand Down Expand Up @@ -437,6 +444,7 @@ def pg_get_waiting(self, duration_mode: int = 1) -> list[WaitingProcess]:

duration_column = self.get_duration_column(duration_mode)
query = sql.SQL(qs).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
duration_column=sql.Identifier(duration_column),
min_duration=sql.Literal(self.min_duration),
Expand Down Expand Up @@ -466,6 +474,7 @@ def pg_get_blocking(self, duration_mode: int = 1) -> list[BlockingProcess]:

duration_column = self.get_duration_column(duration_mode)
query = sql.SQL(qs).format(
pg_stat_activity=sql.Identifier(self.pg_stat_activity),
dbname_filter=self.dbname_filter,
duration_column=sql.Identifier(duration_column),
min_duration=sql.Literal(self.min_duration),
Expand Down Expand Up @@ -527,6 +536,7 @@ def pg_connect(
password=password,
database=options.dbname,
rds_mode=options.rds,
citus=options.citus,
min_duration=min_duration,
filters=filters,
hide_queries_in_logs=options.hide_queries_in_logs,
Expand Down
48 changes: 24 additions & 24 deletions pgactivity/queries/get_blocking_oldest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ SELECT
SELECT
blocking.pid,
'<unknown>' AS application_name,
pg_stat_activity.current_query AS query,
{pg_stat_activity}.current_query AS query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
NULL AS state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.transactionid = blocked.transactionid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -57,21 +57,21 @@ SELECT
SELECT
blocking.pid,
'<unknown>' AS application_name,
pg_stat_activity.current_query AS query,
{pg_stat_activity}.current_query AS query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
NULL AS state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.virtualxid = blocked.virtualxid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -87,21 +87,21 @@ SELECT
SELECT
blocking.pid,
'<unknown>' AS application_name,
pg_stat_activity.current_query AS query,
{pg_stat_activity}.current_query AS query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
NULL AS state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.database = blocked.database AND blocking.relation = blocked.relation AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.procpid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.procpid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand Down
60 changes: 30 additions & 30 deletions pgactivity/queries/get_blocking_post_090200.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,22 @@ SELECT
-- Transaction id lock
SELECT
blocking.pid,
pg_stat_activity.application_name,
pg_stat_activity.query,
{pg_stat_activity}.application_name,
{pg_stat_activity}.query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
pg_stat_activity.state as state,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
{pg_stat_activity}.state as state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.transactionid = blocked.transactionid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -51,22 +51,22 @@ SELECT
-- VirtualXid Lock
SELECT
blocking.pid,
pg_stat_activity.application_name,
pg_stat_activity.query,
{pg_stat_activity}.application_name,
{pg_stat_activity}.query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
pg_stat_activity.state as state,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
{pg_stat_activity}.state as state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.virtualxid = blocked.virtualxid AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand All @@ -81,22 +81,22 @@ SELECT
-- Relation or tuple Lock
SELECT
blocking.pid,
pg_stat_activity.application_name,
pg_stat_activity.query,
{pg_stat_activity}.application_name,
{pg_stat_activity}.query,
blocking.mode,
pg_stat_activity.datname,
pg_stat_activity.datid,
pg_stat_activity.usename,
pg_stat_activity.client_addr AS client,
{pg_stat_activity}.datname,
{pg_stat_activity}.datid,
{pg_stat_activity}.usename,
{pg_stat_activity}.client_addr AS client,
blocking.locktype,
EXTRACT(epoch FROM (NOW() - pg_stat_activity.{duration_column})) AS duration,
pg_stat_activity.state as state,
EXTRACT(epoch FROM (NOW() - {pg_stat_activity}.{duration_column})) AS duration,
{pg_stat_activity}.state as state,
blocking.relation::regclass AS relation,
pg_stat_activity.waiting
{pg_stat_activity}.waiting
FROM
pg_locks AS blocking
JOIN pg_locks AS blocked ON (blocking.database = blocked.database AND blocking.relation = blocked.relation AND blocking.locktype = blocked.locktype)
JOIN pg_stat_activity ON (blocking.pid = pg_stat_activity.pid)
JOIN {pg_stat_activity} ON (blocking.pid = {pg_stat_activity}.pid)
WHERE
blocking.granted
AND NOT blocked.granted
Expand Down
Loading
Loading