Skip to content

Revert "Remove the private API for the hosted explorer" #6028

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions graph/src/components/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
/// Component for running GraphQL queries over HTTP.
pub mod query;

/// Component for the index node server.
pub mod index_node;

pub mod server;
20 changes: 20 additions & 0 deletions graph/src/components/store/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use super::*;
use crate::blockchain::block_stream::{EntitySourceOperation, FirehoseCursor};
use crate::blockchain::{BlockTime, ChainIdentifier, ExtendedBlockPtr};
use crate::components::metrics::stopwatch::StopwatchMetrics;
use crate::components::server::index_node::VersionInfo;
use crate::components::subgraph::SubgraphVersionSwitchingMode;
use crate::components::transaction_receipt;
use crate::components::versions::ApiVersion;
Expand Down Expand Up @@ -686,6 +687,25 @@ pub trait StatusStore: Send + Sync + 'static {

fn status(&self, filter: status::Filter) -> Result<Vec<status::Info>, StoreError>;

/// Support for the explorer-specific API
fn version_info(&self, version_id: &str) -> Result<VersionInfo, StoreError>;

/// Support for the explorer-specific API; note that `subgraph_id` must be
/// the id of an entry in `subgraphs.subgraph`, not that of a deployment.
/// The return values are the ids of the `subgraphs.subgraph_version` for
/// the current and pending versions of the subgraph
fn versions_for_subgraph_id(
&self,
subgraph_id: &str,
) -> Result<(Option<String>, Option<String>), StoreError>;

/// Support for the explorer-specific API. Returns a vector of (name, version) of all
/// subgraphs for a given deployment hash.
fn subgraphs_for_deployment_hash(
&self,
deployment_hash: &str,
) -> Result<Vec<(String, String)>, StoreError>;

/// A value of None indicates that the table is not available. Re-deploying
/// the subgraph fixes this. It is undesirable to force everything to
/// re-sync from scratch, so existing deployments will continue without a
Expand Down
18 changes: 18 additions & 0 deletions graph/src/env/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,15 @@ pub struct EnvVars {
/// Set by the flag `GRAPH_LOG_TRIGGER_DATA`. Off by
/// default.
pub log_trigger_data: bool,
/// Set by the environment variable `GRAPH_EXPLORER_TTL`
/// (expressed in seconds). The default value is 10s.
pub explorer_ttl: Duration,
/// Set by the environment variable `GRAPH_EXPLORER_LOCK_THRESHOLD`
/// (expressed in milliseconds). The default value is 100ms.
pub explorer_lock_threshold: Duration,
/// Set by the environment variable `GRAPH_EXPLORER_QUERY_THRESHOLD`
/// (expressed in milliseconds). The default value is 500ms.
pub explorer_query_threshold: Duration,
/// Set by the environment variable `EXTERNAL_HTTP_BASE_URL`. No default
/// value is provided.
pub external_http_base_url: Option<String>,
Expand Down Expand Up @@ -319,6 +328,9 @@ impl EnvVars {
postpone_attribute_index_creation: inner.postpone_attribute_index_creation.0
|| cfg!(debug_assertions),
log_trigger_data: inner.log_trigger_data.0,
explorer_ttl: Duration::from_secs(inner.explorer_ttl_in_secs),
explorer_lock_threshold: Duration::from_millis(inner.explorer_lock_threshold_in_msec),
explorer_query_threshold: Duration::from_millis(inner.explorer_query_threshold_in_msec),
external_http_base_url: inner.external_http_base_url,
external_ws_base_url: inner.external_ws_base_url,
static_filters_threshold: inner.static_filters_threshold,
Expand Down Expand Up @@ -480,6 +492,12 @@ struct Inner {
postpone_attribute_index_creation: EnvVarBoolean,
#[envconfig(from = "GRAPH_LOG_TRIGGER_DATA", default = "false")]
log_trigger_data: EnvVarBoolean,
#[envconfig(from = "GRAPH_EXPLORER_TTL", default = "10")]
explorer_ttl_in_secs: u64,
#[envconfig(from = "GRAPH_EXPLORER_LOCK_THRESHOLD", default = "100")]
explorer_lock_threshold_in_msec: u64,
#[envconfig(from = "GRAPH_EXPLORER_QUERY_THRESHOLD", default = "500")]
explorer_query_threshold_in_msec: u64,
#[envconfig(from = "EXTERNAL_HTTP_BASE_URL")]
external_http_base_url: Option<String>,
#[envconfig(from = "EXTERNAL_WS_BASE_URL")]
Expand Down
218 changes: 218 additions & 0 deletions server/index-node/src/explorer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
//! Functionality to support the explorer in the hosted service. Everything
//! in this file is private API and experimental and subject to change at
//! any time
use graph::components::server::query::{ServerResponse, ServerResult};
use graph::http_body_util::Full;
use graph::hyper::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
CONTENT_TYPE,
};
use graph::hyper::{Response, StatusCode};
use graph::prelude::r;
use std::{sync::Arc, time::Instant};

use graph::{
components::{
server::{index_node::VersionInfo, query::ServerError},
store::StatusStore,
},
data::subgraph::status,
object,
prelude::{serde_json, warn, Logger, ENV_VARS},
util::timed_cache::TimedCache,
};

// Do not implement `Clone` for this; the IndexNode service puts the `Explorer`
// behind an `Arc` so we don't have to put each `Cache` into an `Arc`
//
// We cache responses for a fixed amount of time with the time given by
// `GRAPH_EXPLORER_TTL`
#[derive(Debug)]
pub struct Explorer<S> {
store: Arc<S>,
versions: TimedCache<String, r::Value>,
version_infos: TimedCache<String, VersionInfo>,
entity_counts: TimedCache<String, r::Value>,
}

impl<S> Explorer<S>
where
S: StatusStore,
{
pub fn new(store: Arc<S>) -> Self {
Self {
store,
versions: TimedCache::new(ENV_VARS.explorer_ttl),
version_infos: TimedCache::new(ENV_VARS.explorer_ttl),
entity_counts: TimedCache::new(ENV_VARS.explorer_ttl),
}
}

pub fn handle(&self, logger: &Logger, req: &[&str]) -> ServerResult {
match req {
["subgraph-versions", subgraph_id] => self.handle_subgraph_versions(subgraph_id),
["subgraph-version", version] => self.handle_subgraph_version(version),
["subgraph-repo", version] => self.handle_subgraph_repo(version),
["entity-count", deployment] => self.handle_entity_count(logger, deployment),
["subgraphs-for-deployment", deployment_hash] => {
self.handle_subgraphs_for_deployment(deployment_hash)
}
_ => handle_not_found(),
}
}

fn handle_subgraph_versions(&self, subgraph_id: &str) -> ServerResult {
if let Some(value) = self.versions.get(subgraph_id) {
return Ok(as_http_response(value.as_ref()));
}

let (current, pending) = self.store.versions_for_subgraph_id(subgraph_id)?;

let value = object! {
currentVersion: current,
pendingVersion: pending
};

let resp = as_http_response(&value);
self.versions.set(subgraph_id.to_string(), Arc::new(value));
Ok(resp)
}

fn handle_subgraph_version(&self, version: &str) -> ServerResult {
let vi = self.version_info(version)?;

let latest_ethereum_block_number = vi.latest_ethereum_block_number;
let total_ethereum_blocks_count = vi.total_ethereum_blocks_count;
let value = object! {
createdAt: vi.created_at.as_str(),
deploymentId: vi.deployment_id.as_str(),
latestEthereumBlockNumber: latest_ethereum_block_number,
totalEthereumBlocksCount: total_ethereum_blocks_count,
synced: vi.synced,
failed: vi.failed,
description: vi.description.as_deref(),
repository: vi.repository.as_deref(),
schema: vi.schema.document_string(),
network: vi.network.as_str()
};
Ok(as_http_response(&value))
}

fn handle_subgraph_repo(&self, version: &str) -> ServerResult {
let vi = self.version_info(version)?;

let value = object! {
createdAt: vi.created_at.as_str(),
deploymentId: vi.deployment_id.as_str(),
repository: vi.repository.as_deref()
};
Ok(as_http_response(&value))
}

fn handle_entity_count(&self, logger: &Logger, deployment: &str) -> ServerResult {
let start = Instant::now();
let count = self.entity_counts.get(deployment);
if start.elapsed() > ENV_VARS.explorer_lock_threshold {
let action = match count {
Some(_) => "cache_hit",
None => "cache_miss",
};
warn!(logger, "Getting entity_count takes too long";
"action" => action,
"deployment" => deployment,
"time_ms" => start.elapsed().as_millis());
}

if let Some(value) = count {
return Ok(as_http_response(value.as_ref()));
}

let start = Instant::now();
let infos = self
.store
.status(status::Filter::Deployments(vec![deployment.to_string()]))?;
if start.elapsed() > ENV_VARS.explorer_query_threshold {
warn!(logger, "Getting entity_count takes too long";
"action" => "query_status",
"deployment" => deployment,
"time_ms" => start.elapsed().as_millis());
}
let info = match infos.first() {
Some(info) => info,
None => {
return handle_not_found();
}
};

let value = object! {
entityCount: info.entity_count as i32
};
let start = Instant::now();
let resp = as_http_response(&value);
if start.elapsed() > ENV_VARS.explorer_lock_threshold {
warn!(logger, "Getting entity_count takes too long";
"action" => "as_http_response",
"deployment" => deployment,
"time_ms" => start.elapsed().as_millis());
}
let start = Instant::now();
self.entity_counts
.set(deployment.to_string(), Arc::new(value));
if start.elapsed() > ENV_VARS.explorer_lock_threshold {
warn!(logger, "Getting entity_count takes too long";
"action" => "cache_set",
"deployment" => deployment,
"time_ms" => start.elapsed().as_millis());
}
Ok(resp)
}

fn version_info(&self, version: &str) -> Result<Arc<VersionInfo>, ServerError> {
match self.version_infos.get(version) {
Some(vi) => Ok(vi),
None => {
let vi = Arc::new(self.store.version_info(version)?);
self.version_infos.set(version.to_string(), vi.clone());
Ok(vi)
}
}
}

fn handle_subgraphs_for_deployment(&self, deployment_hash: &str) -> ServerResult {
let name_version_pairs: Vec<r::Value> = self
.store
.subgraphs_for_deployment_hash(deployment_hash)?
.into_iter()
.map(|(name, version)| {
object! {
name: name,
version: version
}
})
.collect();
let payload = r::Value::List(name_version_pairs);
Ok(as_http_response(&payload))
}
}

fn handle_not_found() -> ServerResult {
Ok(Response::builder()
.status(StatusCode::NOT_FOUND)
.header(CONTENT_TYPE, "text/plain")
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.body(Full::from("Not found\n"))
.unwrap())
}

fn as_http_response(value: &r::Value) -> ServerResponse {
let status_code = StatusCode::OK;
let json = serde_json::to_string(&value).expect("Failed to serialize response to JSON");
Response::builder()
.status(status_code)
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type, User-Agent")
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS, POST")
.header(CONTENT_TYPE, "application/json")
.body(Full::from(json))
.unwrap()
}
1 change: 1 addition & 0 deletions server/index-node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod auth;
mod explorer;
mod resolver;
mod schema;
mod server;
Expand Down
7 changes: 7 additions & 0 deletions server/index-node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use graph_graphql::prelude::{execute_query, Query as PreparedQuery, QueryExecuti

use crate::auth::bearer_token;

use crate::explorer::Explorer;
use crate::resolver::IndexNodeResolver;
use crate::schema::SCHEMA;

Expand All @@ -42,6 +43,7 @@ pub struct IndexNodeService<S> {
logger: Logger,
blockchain_map: Arc<BlockchainMap>,
store: Arc<S>,
explorer: Arc<Explorer<S>>,
link_resolver: Arc<dyn LinkResolver>,
}

Expand All @@ -55,10 +57,13 @@ where
store: Arc<S>,
link_resolver: Arc<dyn LinkResolver>,
) -> Self {
let explorer = Arc::new(Explorer::new(store.clone()));

IndexNodeService {
logger,
blockchain_map,
store,
explorer,
link_resolver,
}
}
Expand Down Expand Up @@ -224,6 +229,8 @@ where
}
(Method::OPTIONS, ["graphql"]) => Ok(Self::handle_graphql_options(req)),

(Method::GET, ["explorer", rest @ ..]) => self.explorer.handle(&self.logger, rest),

_ => Ok(Self::handle_not_found()),
}
}
Expand Down
18 changes: 16 additions & 2 deletions store/postgres/src/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,15 +318,27 @@ pub fn schema(conn: &mut PgConnection, site: &Site) -> Result<(InputSchema, bool
}

pub struct ManifestInfo {
pub description: Option<String>,
pub repository: Option<String>,
pub spec_version: String,
pub instrument: bool,
}

impl ManifestInfo {
pub fn load(conn: &mut PgConnection, site: &Site) -> Result<ManifestInfo, StoreError> {
use subgraph_manifest as sm;
let (spec_version, features): (String, Vec<String>) = sm::table
.select((sm::spec_version, sm::features))
let (description, repository, spec_version, features): (
Option<String>,
Option<String>,
String,
Vec<String>,
) = sm::table
.select((
sm::description,
sm::repository,
sm::spec_version,
sm::features,
))
.filter(sm::id.eq(site.id))
.first(conn)?;

Expand All @@ -336,6 +348,8 @@ impl ManifestInfo {
let instrument = features.iter().any(|s| s == "instrument");

Ok(ManifestInfo {
description,
repository,
spec_version,
instrument,
})
Expand Down
Loading