diff --git a/src/yb/tserver/tablet_service.cc b/src/yb/tserver/tablet_service.cc index ce6a748537a5..7574eab74802 100644 --- a/src/yb/tserver/tablet_service.cc +++ b/src/yb/tserver/tablet_service.cc @@ -117,6 +117,7 @@ #include "yb/util/mem_tracker.h" #include "yb/util/metrics.h" #include "yb/util/monotime.h" +#include "yb/util/pb_util.h" #include "yb/util/pg_util.h" #include "yb/util/random_util.h" #include "yb/util/scope_exit.h" @@ -281,6 +282,12 @@ DECLARE_bool(TEST_enable_object_locking_for_table_locks); METRIC_DEFINE_gauge_uint64(server, ts_split_op_added, "Split OPs Added to Leader", yb::MetricUnit::kOperations, "Number of split operations added to the leader's Raft log."); +DEFINE_RUNTIME_bool(ysql_debug_log_write_requests, false, + "Print all YSQL write requests received by this process to the log." + "Note: Enabling this flag might log sensitive information."); +TAG_FLAG(ysql_debug_log_write_requests, advanced); +TAG_FLAG(ysql_debug_log_write_requests, unsafe); + double TEST_delay_create_transaction_probability = 0; namespace yb::tserver { @@ -393,6 +400,58 @@ Result GetLocalPgTxnSnapshotImpl( FullyDecodePgTxnSnapshotLocalId(snapshot_id_uuid))); } +Status PrintYSQLWriteRequest( + const WriteRequestPB& req, const RpcContext& context, const Tablet& tablet) { + if (req.pgsql_write_batch_size() == 0) { + return Status::OK(); + } + const auto& metadata = *tablet.metadata(); + + std::stringstream ss; + ss << "YSQL Write Request: " << std::endl << "From: " << context.requestor_string() << std::endl; + if (req.has_write_batch() && req.write_batch().has_transaction()) { + ss << "TransactionId: " + << VERIFY_RESULT(FullyDecodeTransactionId(req.write_batch().transaction().transaction_id())) + << std::endl; + } + for (const auto& entry : req.pgsql_write_batch()) { + ss << std::endl; + + auto stmt_type = PgsqlWriteRequestPB::PgsqlStmtType_Name(entry.stmt_type()); + boost::replace_all(stmt_type, "PGSQL_", ""); + ss << stmt_type << ": " << std::endl; + + if (entry.force_catalog_modifications()) { + ss << "\tforce_catalog_modifications: true" << std::endl; + } + const auto table_info = VERIFY_RESULT(metadata.GetTableInfo(entry.table_id())); + ss << "\tTable: " << table_info->table_name << " [" << entry.table_id() << "]" << std::endl; + + ss << "\tKey: "; + if (entry.has_ybctid_column_value()) { + ss << "ybctid [" << entry.ybctid_column_value().ShortDebugString() << "]"; + } + if (entry.has_ybctid_column_value() && !entry.range_column_values().empty()) { + ss << ", "; + } + if (!entry.range_column_values().empty()) { + ss << "range [" << pb_util::JoinRepeatedPBs(entry.range_column_values()) << "]"; + } + ss << std::endl; + if (!entry.column_new_values().empty() || !entry.column_values().empty()) { + ss << "\tColumns: "; + ss << pb_util::JoinRepeatedPBs(entry.column_new_values()); + if (!entry.column_new_values().empty() && !entry.column_values().empty()) { + ss << ", "; + } + ss << pb_util::JoinRepeatedPBs(entry.column_values()); + ss << std::endl; + } + } + LOG(INFO) << ss.str(); + return Status::OK(); +} + } // namespace typedef ListTabletsResponsePB::StatusAndSchemaPB StatusAndSchemaPB; @@ -2231,44 +2290,25 @@ Status TabletServiceImpl::PerformWrite( TRACE_EVENT1("tserver", "TabletServiceImpl::Write", "tablet_id", req->tablet_id()); VLOG(2) << "Received Write RPC: " << req->DebugString(); + UpdateClock(*req, server_->Clock()); auto tablet = VERIFY_RESULT(LookupLeaderTablet(server_->tablet_peer_lookup(), req->tablet_id(), resp)); RETURN_NOT_OK(CheckWriteThrottling(req->rejection_score(), tablet.peer.get())); + if (FLAGS_ysql_debug_log_write_requests) { + WARN_NOT_OK( + PrintYSQLWriteRequest(*req, *context, *tablet.tablet), + "Failed to print YSQL write request"); + } + if (tablet.tablet->metadata()->hidden()) { return STATUS( NotFound, "Tablet not found", req->tablet_id(), TabletServerError(TabletServerErrorPB::TABLET_NOT_FOUND)); } -#if defined(DUMP_WRITE) - if (req->has_write_batch() && req->write_batch().has_transaction()) { - VLOG(1) << "Write with transaction: " << req->write_batch().transaction().ShortDebugString(); - if (req->pgsql_write_batch_size() != 0) { - auto txn_id = CHECK_RESULT(FullyDecodeTransactionId( - req->write_batch().transaction().transaction_id())); - for (const auto& entry : req->pgsql_write_batch()) { - if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPDATE) { - auto key = entry.column_new_values(0).expr().value().int32_value(); - LOG(INFO) << txn_id << " UPDATE: " << key << " = " - << entry.column_new_values(1).expr().value().string_value(); - } else if ( - entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_INSERT || - entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_UPSERT) { - dockv::DocKey doc_key; - CHECK_OK(doc_key.FullyDecodeFrom(entry.ybctid_column_value().value().binary_value())); - LOG(INFO) << txn_id << " INSERT: " << doc_key.hashed_group()[0].GetInt32() << " = " - << entry.column_values(0).expr().value().string_value(); - } else if (entry.stmt_type() == PgsqlWriteRequestPB::PGSQL_DELETE) { - LOG(INFO) << txn_id << " DELETE: " << entry.ShortDebugString(); - } - } - } - } -#endif - if (PREDICT_FALSE(req->has_write_batch() && !req->has_external_hybrid_time() && (!req->write_batch().write_pairs().empty() || !req->write_batch().read_pairs().empty()))) { return STATUS( diff --git a/src/yb/util/pb_util.h b/src/yb/util/pb_util.h index be25d6df8842..efbe41c2ef4c 100644 --- a/src/yb/util/pb_util.h +++ b/src/yb/util/pb_util.h @@ -184,6 +184,20 @@ Result ParseFromSlice(const Slice& slice) { return result; } +template +std::string JoinRepeatedPBs(const google::protobuf::RepeatedPtrField& pbs) { + std::string result; + bool first = true; + for (const auto& pb : pbs) { + if (!first) { + result += ", "; + } + result += pb.ShortDebugString(); + first = false; + } + return result; +} + // Load a protobuf from the given path. Status ReadPBFromPath(Env* env, const std::string& path, MessageLite* msg);