Skip to content

Commit

Permalink
Fix IDENTITY_INSERT to work cross-db (#3402) (#3424)
Browse files Browse the repository at this point in the history
Previously, passing three part object names to SET IDENTITY_INSERT was
throwing an error. In this commit, if the relation name is a three part
name, we will parse the catalog name and set the IDENTITY_INSERT
property for the relation in that catalog.

Also with this commit, we will check if current user has permission to SET
IDENTITY_INSERT for the relation.

Task: BABEL-3212

Signed-off-by: Sharu Goel <[email protected]>
  • Loading branch information
thephantomthief authored Jan 22, 2025
1 parent 08e8898 commit 31c6889
Show file tree
Hide file tree
Showing 3 changed files with 689 additions and 45 deletions.
126 changes: 100 additions & 26 deletions contrib/babelfishpg_tsql/src/pl_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,19 @@ check_identity_insert(char** newval, void **extra, GucSource source)
return true;
}

static void
throw_error_for_identity_insert(char *database, char *schema, char* object)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("Cannot find the object \"%s%s%s%s%s\" because it does not exist or you do not have permissions.",
database ? database : "",
database ? "." : "",
schema ? schema : "",
schema ? "." : "",
object)));
}

static void
assign_identity_insert(const char *newval, void *extra)
{
Expand All @@ -336,6 +349,10 @@ assign_identity_insert(const char *newval, void *extra)
char *id_insert_rel_name = NULL;
char *id_insert_schema_name = NULL;
char *cur_db_name;
char *catalog_name = NULL;
char *logical_schema_name = NULL;
char *curr_user_if_cross_db = NULL;
Oid curr_user_id = InvalidOid;

cur_db_name = get_cur_db_name();

Expand Down Expand Up @@ -363,40 +380,61 @@ assign_identity_insert(const char *newval, void *extra)
option_flag = (char *) linitial(elemlist);
rel_name = (char *) lsecond(elemlist);

/* Use catalog name if provided */
if (list_length(elemlist) == 4)
{
catalog_name = (char *) lfourth(elemlist);

if(!DbidIsValid(get_db_id(catalog_name)))
{
/* Get schema name for error message */
logical_schema_name = (char *) lthird(elemlist);
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);
}

cur_db_name = catalog_name;
curr_user_if_cross_db = get_user_for_database(cur_db_name);

/*
* User in cross-db case can be NULL if login has no user
* that it can use to connect to that database. We should
* throw permission denied error in that case
*/
if (!curr_user_if_cross_db)
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);
}

/* Check the user provided schema value */
if (list_length(elemlist) >= 3)
{
schema_name = (char *) lthird(elemlist);
logical_schema_name = (char *) lthird(elemlist);

if (cur_db_name)
schema_name = get_physical_schema_name(cur_db_name,
schema_name);
logical_schema_name);

/* If no schema name is provided, we should use default schema */
if (!schema_name)
{
char *user_name = NULL;

/* If catalog name is not NULL, it is cross-db */
if (catalog_name)
user_name = curr_user_if_cross_db;
else
user_name = GetUserNameFromId(GetUserId(), false);

schema_name = get_physical_schema_name(cur_db_name,
get_authid_user_ext_schema_name(cur_db_name, user_name));
}

schema_oid = LookupExplicitNamespace(schema_name, true);
if (!OidIsValid(schema_oid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_SCHEMA),
errmsg("schema \"%s\" does not exist",
schema_name)));
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);

rel_oid = get_relname_relid(rel_name, schema_oid);
if (!OidIsValid(rel_oid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
rel_name)));
}

/* Check the catalog name then ignore it */
if (list_length(elemlist) == 4)
{
char *catalog_name = (char *) lfourth(elemlist);

if (strcmp(catalog_name, get_database_name(MyDatabaseId)) != 0)
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cross-database references are not implemented: \"%s.%s.%s\"",
catalog_name, schema_name, rel_name)));
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);
}

/* If schema is not provided, find it from the search path. */
Expand All @@ -408,15 +446,51 @@ assign_identity_insert(const char *newval, void *extra)
*/
rel_oid = RelnameGetRelid(rel_name);
if (!OidIsValid(rel_oid))
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_TABLE),
errmsg("relation \"%s\" does not exist",
rel_name)));
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);

schema_oid = get_rel_namespace(rel_oid);
schema_name = get_namespace_name(schema_oid);
}

/* If catalog name is not NULL, it is cross-db */
if (catalog_name)
curr_user_id = get_role_oid(curr_user_if_cross_db, false);
else
curr_user_id = GetUserId();

/* Check if the physical schema is actually associated with current logical database */
/* Ignore for temporary tables */
if (schema_name && !isTempNamespace(get_namespace_oid(schema_name, false)))
{
Datum datum;
int16 db_id;
bool isnull;
HeapTuple tuple = SearchSysCache1(SYSNAMESPACENAME, CStringGetDatum(schema_name));

if (!HeapTupleIsValid(tuple))
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);

datum = SysCacheGetAttr(SYSNAMESPACENAME, tuple, Anum_namespace_ext_dbid, &isnull);
db_id = DatumGetInt16(datum);

if (!DbidIsValid(db_id) || db_id != get_db_id(cur_db_name))
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);

ReleaseSysCache(tuple);
}

/*
* For SET IDENTITY_INSERT, ALTER permission on relation is needed. In
* Babelfish, current user has ALTER permission on relation if either:
*
* 1. User is owner of object
* 2. User is member of db_ddladmin or db_owner database role
*/
if (!(object_ownercheck(RelationRelationId, rel_oid, curr_user_id) ||
has_privs_of_role(curr_user_id, get_db_ddladmin_oid(cur_db_name, false)) ||
has_privs_of_role(curr_user_id, get_db_owner_oid(cur_db_name, false))))
throw_error_for_identity_insert(catalog_name, logical_schema_name, rel_name);

/* Process assignment logic */
if (strcmp(option_flag, "on") == 0)
{
Expand Down
Loading

0 comments on commit 31c6889

Please sign in to comment.