From 01c574e306b71049b5d1d46c18bc82dbf0ca5eee Mon Sep 17 00:00:00 2001 From: Hadley Wickham Date: Mon, 30 Sep 2024 07:25:46 -0500 Subject: [PATCH] Use bigrquerystorage for downloads (#604) --- DESCRIPTION | 15 ++-- NAMESPACE | 1 + NEWS.md | 6 ++ R/bq-download.R | 109 +++++++++++++++++------ R/bq-perform.R | 72 ++++++++++++--- R/dbi-connection.R | 2 +- R/dbi-result.R | 31 +++++-- R/dplyr.R | 71 +++++++++++---- R/utils.R | 4 + _pkgdown.yml | 1 + man/api-perform.Rd | 11 ++- man/api-table.Rd | 2 +- man/bq_table_download.Rd | 71 +++++++++------ man/collect.tbl_BigQueryConnection.Rd | 54 ++++++++++++ tests/testthat/_snaps/bq-download.md | 16 +++- tests/testthat/test-bq-download.R | 121 +++++++++++++++++++++++--- tests/testthat/test-bq-parse.R | 4 +- tests/testthat/test-bq-perform.R | 12 ++- tests/testthat/test-bq-table.R | 16 ++-- tests/testthat/test-dplyr.R | 21 +++-- 20 files changed, 508 insertions(+), 132 deletions(-) create mode 100644 man/collect.tbl_BigQueryConnection.Rd diff --git a/DESCRIPTION b/DESCRIPTION index 64d05146..34bfa4ba 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -12,9 +12,9 @@ Description: Easily talk to Google's 'BigQuery' database from R. License: MIT + file LICENSE URL: https://bigrquery.r-dbi.org, https://github.com/r-dbi/bigrquery BugReports: https://github.com/r-dbi/bigrquery/issues -Depends: +Depends: R (>= 4.0) -Imports: +Imports: bit64, brio, cli, @@ -29,8 +29,9 @@ Imports: prettyunits, rlang (>= 1.1.0), tibble, - nanoparquet (> 0.3.1) + nanoparquet (>= 0.3.1) Suggests: + bigrquerystorage (>= 1.1.0.9000), blob, covr, dbplyr (>= 2.4.0), @@ -41,9 +42,7 @@ Suggests: testthat (>= 3.1.5), wk (>= 0.3.2), withr -Remotes: - r-lib/nanoparquet -LinkingTo: +LinkingTo: cli, cpp11, rapidjsonr @@ -54,7 +53,7 @@ Config/testthat/start-first: bq-table, dplyr Encoding: UTF-8 Roxygen: list(markdown = TRUE) RoxygenNote: 7.3.2 -Collate: +Collate: 'bigrquery-package.R' 'bq-auth.R' 'bq-dataset.R' @@ -84,3 +83,5 @@ Collate: 'import-standalone-types-check.R' 'utils.R' 'zzz.R' +Remotes: + meztez/bigrquerystorage diff --git a/NAMESPACE b/NAMESPACE index 15cf7aae..8dfaf332 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -88,6 +88,7 @@ export(bq_perform_extract) export(bq_perform_load) export(bq_perform_query) export(bq_perform_query_dry_run) +export(bq_perform_query_schema) export(bq_perform_upload) export(bq_project_datasets) export(bq_project_jobs) diff --git a/NEWS.md b/NEWS.md index 6063a301..8ecfd063 100644 --- a/NEWS.md +++ b/NEWS.md @@ -1,6 +1,12 @@ # bigrquery (development version) +* If the bigrquerystorage package is installed, `bq_table_download()` (and + hence `collect()`, `dbGetQuery()` and `dbFetch()` will use it. This will + drastically improve the speed of downloading large datasets. A big thanks + to @meztez for creating the bigrquerystorage package! + * The `bq_perform_upload()` function now allows users to choose the transmission format (JSON or PARQUET) for data sent to BigQuery (@apalacio9502, #608). + * bigrquery now requires R 4.0, in line with our version support principles. # bigrquery 1.5.1 diff --git a/R/bq-download.R b/R/bq-download.R index 126d1dcf..14d12a66 100644 --- a/R/bq-download.R +++ b/R/bq-download.R @@ -1,30 +1,36 @@ #' Download table data #' -#' This retrieves rows in chunks of `page_size`. It is most suitable for results -#' of smaller queries (<100 MB, say). For larger queries, it is better to -#' export the results to a CSV file stored on google cloud and use the -#' bq command line tool to download locally. +#' @description +#' This function provides two ways to download data from BigQuery, transfering +#' data using either JSON or arrow, depending on the `api` argument. If +#' bigrquerystorage is installed, `api = "arrow"` will be used (because it's +#' so much faster, but see the limitions below), otherwise you can select +#' deliberately by using `api = "json"` or `api = "arrow"`. #' -#' @section Complex data: -#' bigrquery will retrieve nested and repeated columns in to list-columns +#' ## Arrow API +#' +#' The arrow API is much faster, but has heavier dependencies: bigrquerystorage +#' requires the arrow package, which can be tricky to compile on Linux (but you +#' usually should be able to get a binary from +#' [Posit Public Package Manager](https://posit.co/products/cloud/public-package-manager/). +#' +#' There's one known limitation of `api = "arrow"`: when querying public data, +#' you'll now need to provide a `billing` project. +#' +#' ## JSON API +#' +#' The JSON API retrieves rows in chunks of `page_size`. It is most suitable +#' for results of smaller queries (<100 MB, say). Unfortunately due to +#' limitations in the BigQuery API, you may need to vary this parameter +#' depending on the complexity of the underlying data. +#' +#' The JSON API will convert nested and repeated columns in to list-columns #' as follows: #' #' * Repeated values (arrays) will become a list-column of vectors. #' * Records will become list-columns of named lists. #' * Repeated records will become list-columns of data frames. #' -#' @section Larger datasets: -#' In my timings, this code takes around 1 minute per 100 MB of data. -#' If you need to download considerably more than this, I recommend: -#' -#' * Export a `.csv` file to Cloud Storage using [bq_table_save()]. -#' * Use the `gsutil` command line utility to download it. -#' * Read the csv file into R with `readr::read_csv()` or `data.table::fread()`. -#' -#' Unfortunately you can not export nested or repeated formats into CSV, and -#' the formats that BigQuery supports (arvn and ndjson) that allow for -#' nested/repeated values, are not well supported in R. -#' #' @return Because data retrieval may generate list-columns and the `data.frame` #' print method can have problems with list-columns, this method returns #' a tibble. If you need a `data.frame`, coerce the results with @@ -32,30 +38,40 @@ #' @param x A [bq_table] #' @param n_max Maximum number of results to retrieve. Use `Inf` to retrieve all #' rows. -#' @param page_size The number of rows requested per chunk. It is recommended to -#' leave this unspecified until you have evidence that the `page_size` -#' selected automatically by `bq_table_download()` is problematic. +#' @param page_size (JSON only) The number of rows requested per chunk. It is +#' recommended to leave this unspecified until you have evidence that the +#' `page_size` selected automatically by `bq_table_download()` is problematic. #' #' When `page_size = NULL` bigrquery determines a conservative, natural chunk #' size empirically. If you specify the `page_size`, it is important that each #' chunk fits on one page, i.e. that the requested row limit is low enough to #' prevent the API from paginating based on response size. -#' @param start_index Starting row index (zero-based). -#' @param max_connections Number of maximum simultaneous connections to -#' BigQuery servers. +#' @param start_index (JSON only) Starting row index (zero-based). +#' @param max_connections (JSON only) Number of maximum simultaneous +#' connections to BigQuery servers. +#' @param api Which API to use? The `"json"` API works where ever bigrquery +#' does, but is slow and can require fiddling with the `page_size` parameter. +#' The `"arrow"` API is faster and more reliable, but only works if you +#' have also installed the bigrquerystorage package. +#' +#' Because the `"arrow"` API is so much faster, it will be used automatically +#' if the bigrquerystorage package is installed. #' @inheritParams api-job #' @param bigint The R type that BigQuery's 64-bit integer types should be #' mapped to. The default is `"integer"`, which returns R's `integer` type, #' but results in `NA` for values above/below +/- 2147483647. `"integer64"` #' returns a [bit64::integer64], which allows the full range of 64 bit #' integers. +#' @param billing (Arrow only) Project to bill; defaults to the project of `x`, +#' and typically only needs to be specified if you're working with public +#' datasets. #' @param max_results `r lifecycle::badge("deprecated")` Deprecated. Please use #' `n_max` instead. #' @section Google BigQuery API documentation: #' * [list](https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/list) #' @export #' @examplesIf bq_testable() -#' df <- bq_table_download("publicdata.samples.natality", n_max = 35000) +#' df <- bq_table_download("publicdata.samples.natality", n_max = 35000, billing = bq_test_project()) bq_table_download <- function(x, n_max = Inf, @@ -64,6 +80,8 @@ bq_table_download <- max_connections = 6L, quiet = NA, bigint = c("integer", "integer64", "numeric", "character"), + api = c("json", "arrow"), + billing = x$project, max_results = deprecated()) { x <- as_bq_table(x) check_number_whole(n_max, min = 0, allow_infinite = TRUE) @@ -71,6 +89,8 @@ bq_table_download <- check_number_whole(max_connections, min = 1) quiet <- check_quiet(quiet) bigint <- arg_match(bigint) + api <- check_api(api) + if (lifecycle::is_present(max_results)) { lifecycle::deprecate_warn( "1.4.0", "bq_table_download(max_results)", "bq_table_download(n_max)" @@ -78,6 +98,37 @@ bq_table_download <- n_max <- max_results } + if (api == "arrow") { + check_installed("bigrquerystorage", "required to download using arrow API") + if (!missing(page_size)) { + cli::cli_warn( + '{.arg page_size} is ignored when {.code api == "arrow"}', + call = environment() + ) + } + if (!missing(start_index)) { + cli::cli_warn( + '{.arg start_index} is ignored when {.code api == "arrow"}', + call = environment() + ) + } + if (!missing(max_connections)) { + cli::cli_warn( + '{.arg max_connections} is ignored when {.code api == "arrow"}', + call = environment() + ) + } + + return(bigrquerystorage::bqs_table_download( + x = toString(x), + parent = billing, + n_max = n_max, + quiet = quiet, + bigint = bigint, + as_tibble = TRUE + )) + } + params <- set_row_params( nrow = bq_table_nrow(x), n_max = n_max, @@ -202,6 +253,14 @@ bq_table_download <- parse_postprocess(table_data, bigint = bigint) } +check_api <- function(api = c("json", "arrow"), error_call = caller_env()) { + if (identical(api, c("json", "arrow"))) { + if (has_bigrquerystorage()) "arrow" else "json" + } else { + arg_match(api, error_call = error_call) + } +} + # This function is a modified version of # https://github.com/r-dbi/RPostgres/blob/master/R/PqResult.R parse_postprocess <- function(df, bigint) { diff --git a/R/bq-perform.R b/R/bq-perform.R index cd7a1e41..203d97a6 100644 --- a/R/bq-perform.R +++ b/R/bq-perform.R @@ -210,7 +210,7 @@ export_json <- function(values) { #' Google Cloud. #' #' For Google Cloud Storage URIs: Each URI can contain one -#' `'*'`` wildcard character and it must come after the 'bucket' name. +#' `'*'` wildcard character and it must come after the 'bucket' name. #' Size limits related to load jobs apply to external data sources. #' #' For Google Cloud Bigtable URIs: Exactly one URI can be specified and @@ -358,21 +358,13 @@ bq_perform_query_dry_run <- function(query, billing, parameters = NULL, use_legacy_sql = FALSE) { - check_string(query) - check_string(billing) - check_bool(use_legacy_sql) - query <- list( - query = unbox(query), - useLegacySql = unbox(use_legacy_sql) + query <- bq_perform_query_data( + query = query, + default_dataset = default_dataset, + parameters = parameters, + use_legacy_sql = use_legacy_sql ) - if (!is.null(parameters)) { - parameters <- as_bq_params(parameters) - query$queryParameters <- as_json(parameters) - } - if (!is.null(default_dataset)) { - query$defaultDataset <- datasetReference(default_dataset) - } url <- bq_path(billing, jobs = "") body <- list(configuration = list(query = query, dryRun = unbox(TRUE))) @@ -386,6 +378,58 @@ bq_perform_query_dry_run <- function(query, billing, structure(bytes, class = "bq_bytes") } +#' @export +#' @rdname api-perform +bq_perform_query_schema <- function(query, billing, + ..., + default_dataset = NULL, + parameters = NULL) { + + query <- bq_perform_query_data( + query = query, + default_dataset = default_dataset, + parameters = parameters, + use_legacy_sql = FALSE + ) + + url <- bq_path(billing, jobs = "") + body <- list(configuration = list(query = query, dryRun = unbox(TRUE))) + + res <- bq_post( + url, + body = bq_body(body, ...), + query = list(fields = "statistics") + ) + # https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableSchema + res$statistics$query$schema$fields +} + +bq_perform_query_data <- function(query, + ..., + default_dataset = NULL, + parameters = NULL, + use_legacy_sql = FALSE, + call = caller_env()) { + check_string(query, error_call = call) + check_bool(use_legacy_sql, error_call = call) + + query <- list( + query = unbox(query), + useLegacySql = unbox(use_legacy_sql) + ) + if (!is.null(parameters)) { + parameters <- as_bq_params(parameters) + query$queryParameters <- as_json(parameters) + } + if (!is.null(default_dataset)) { + query$defaultDataset <- datasetReference(default_dataset) + } + + query +} + + + #' @export #' @rdname api-perform bq_perform_copy <- function(src, dest, diff --git a/R/dbi-connection.R b/R/dbi-connection.R index a790ba24..18cb4b35 100644 --- a/R/dbi-connection.R +++ b/R/dbi-connection.R @@ -318,7 +318,7 @@ setMethod("dbCreateTable", "BigQueryConnection", dbCreateTable_bq) dbReadTable_bq <- function(conn, name, ...) { tb <- as_bq_table(conn, name) - bq_table_download(tb, ...) + bq_table_download(tb, ..., api = "json") } #' @rdname DBI diff --git a/R/dbi-result.R b/R/dbi-result.R index e5248933..f4543717 100644 --- a/R/dbi-result.R +++ b/R/dbi-result.R @@ -100,18 +100,31 @@ setMethod( "dbFetch", "BigQueryResult", function(res, n = -1, ...) { check_number_whole(n, min = -1, allow_infinite = TRUE) + if (n == -1) n <- Inf - if (n == -1 || n == Inf) { + if (has_bigrquerystorage() && n == Inf && res@cursor$cur() == 0) { + # https://github.com/meztez/bigrquerystorage/issues/48 n <- res@cursor$left() + + # If possible, download complete dataset using arrow + data <- bq_table_download(res@bq_table, + n_max = n, + bigint = res@bigint, + quiet = res@quiet, + api = "arrow" + ) + } else { + # Otherwise, fall back to slower JSON API + data <- bq_table_download(res@bq_table, + n_max = n, + start_index = res@cursor$cur(), + page_size = res@page_size, + bigint = res@bigint, + quiet = res@quiet, + api = "json" + ) } - - data <- bq_table_download(res@bq_table, - n_max = n, - start_index = res@cursor$cur(), - page_size = res@page_size, - bigint = res@bigint, - quiet = res@quiet - ) + res@cursor$adv(nrow(data)) data diff --git a/R/dplyr.R b/R/dplyr.R index 9e8c2ff0..de9cbcaa 100644 --- a/R/dplyr.R +++ b/R/dplyr.R @@ -27,7 +27,10 @@ #' summarise(n = sum(word_count, na.rm = TRUE)) %>% #' arrange(desc(n)) #' } -src_bigquery <- function(project, dataset, billing = project, max_pages = 10) { +src_bigquery <- function(project, + dataset, + billing = project, + max_pages = 10) { check_installed("dbplyr") con <- DBI::dbConnect( @@ -45,10 +48,18 @@ src_bigquery <- function(project, dataset, billing = project, max_pages = 10) { tbl.BigQueryConnection <- function(src, from, ...) { src <- dbplyr::src_dbi(src, auto_disconnect = FALSE) + sql <- dbplyr::sql_query_fields(src$con, from) + dataset <- if (!is.null(src$con@dataset)) as_bq_dataset(src$con) + schema <- bq_perform_query_schema(sql, + billing = src$con@billing, + default_dataset = dataset + ) + vars <- map_chr(schema, "[[", "name") + if (utils::packageVersion("dbplyr") >= "2.4.0.9000") { - tbl <- dplyr::tbl(src, from = from) + tbl <- dplyr::tbl(src, from = from, vars = vars) } else { - tbl <- dplyr::tbl(src, from = from, check_from = FALSE) + tbl <- dplyr::tbl(src, from = from, vars = vars, check_from = FALSE) } # This is ugly, but I don't see a better way of doing this @@ -116,17 +127,35 @@ db_copy_to.BigQueryConnection <- function(con, # Efficient downloads ----------------------------------------------- # registered onLoad + +#' Collect a BigQuery table +#' +#' This collect method is specialised for BigQuery tables, generating the +#' SQL from your dplyr commands, then calling [bq_project_query()] +#' or [bq_dataset_query()] to run the query, then [bq_table_download()] +#' to download the results. Thus the arguments are a combination of the +#' arguments to [dplyr::collect()], `bq_project_query()`/`bq_dataset_query()`, +#' and `bq_table_download()`. +#' +#' @inheritParams dplyr::collect +#' @inheritParams bq_table_download +#' @param n Maximum number of results to retrieve. +#' The default, `Inf`, will retrieve all rows. +#' @param ... Other arguments passed on to +#' `bq_project_query()`/`bq_project_query()` collect.tbl_BigQueryConnection <- function(x, ..., - page_size = NULL, - max_connections = 6L, n = Inf, - warn_incomplete = TRUE) { + api = c("json", "arrow"), + page_size = NULL, + max_connections = 6L + ) { + api <- check_api(api) check_number_whole(n, min = 0, allow_infinite = TRUE) check_number_whole(max_connections, min = 1) - check_bool(warn_incomplete) con <- dbplyr::remote_con(x) + billing <- con@billing if (op_can_download(x)) { lq <- x$lazy_query @@ -136,7 +165,6 @@ collect.tbl_BigQueryConnection <- function(x, ..., } else { sql <- dbplyr::db_sql_render(con, x) - billing <- con@billing if (is.null(con@dataset)) { tb <- bq_project_query(billing, sql, quiet = con@quiet, ...) } else { @@ -147,13 +175,26 @@ collect.tbl_BigQueryConnection <- function(x, ..., quiet <- if (n < 100) TRUE else con@quiet bigint <- con@bigint %||% "integer" - out <- bq_table_download(tb, - n_max = n, - page_size = page_size, - quiet = quiet, - max_connections = max_connections, - bigint = bigint - ) + + if (api == "arrow") { + out <- bq_table_download(tb, + n_max = n, + quiet = quiet, + bigint = bigint, + billing = billing, + api = "arrow" + ) + } else { + out <- bq_table_download(tb, + n_max = n, + page_size = page_size, + quiet = quiet, + max_connections = max_connections, + bigint = bigint, + api = "json" + ) + } + dplyr::grouped_df(out, intersect(dbplyr::op_grps(x), names(out))) } diff --git a/R/utils.R b/R/utils.R index 98e872fd..1e84f9ff 100644 --- a/R/utils.R +++ b/R/utils.R @@ -71,3 +71,7 @@ as_query <- function(x, error_arg = caller_arg(x), error_call = caller_env()) { check_string(x, arg = error_arg, call = error_call) x } + +has_bigrquerystorage <- function() { + is_installed("bigrquerystorage") +} diff --git a/_pkgdown.yml b/_pkgdown.yml index beadd04a..d4bda334 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -16,6 +16,7 @@ reference: contents: - src_bigquery - bigquery + - collect.tbl_BigQueryConnection - title: Low-level API contents: diff --git a/man/api-perform.Rd b/man/api-perform.Rd index 081e2686..736883a1 100644 --- a/man/api-perform.Rd +++ b/man/api-perform.Rd @@ -7,6 +7,7 @@ \alias{bq_perform_load} \alias{bq_perform_query} \alias{bq_perform_query_dry_run} +\alias{bq_perform_query_schema} \alias{bq_perform_copy} \title{BigQuery jobs: perform a job} \usage{ @@ -65,6 +66,14 @@ bq_perform_query_dry_run( use_legacy_sql = FALSE ) +bq_perform_query_schema( + query, + billing, + ..., + default_dataset = NULL, + parameters = NULL +) + bq_perform_copy( src, dest, @@ -148,7 +157,7 @@ to the table. Google Cloud. For Google Cloud Storage URIs: Each URI can contain one -`'*'`` wildcard character and it must come after the 'bucket' name. +\code{'*'} wildcard character and it must come after the 'bucket' name. Size limits related to load jobs apply to external data sources. For Google Cloud Bigtable URIs: Exactly one URI can be specified and diff --git a/man/api-table.Rd b/man/api-table.Rd index 6509d18d..70c8031a 100644 --- a/man/api-table.Rd +++ b/man/api-table.Rd @@ -66,7 +66,7 @@ number of files.} Google Cloud. For Google Cloud Storage URIs: Each URI can contain one -`'*'`` wildcard character and it must come after the 'bucket' name. +\code{'*'} wildcard character and it must come after the 'bucket' name. Size limits related to load jobs apply to external data sources. For Google Cloud Bigtable URIs: Exactly one URI can be specified and diff --git a/man/bq_table_download.Rd b/man/bq_table_download.Rd index 90970863..939f8780 100644 --- a/man/bq_table_download.Rd +++ b/man/bq_table_download.Rd @@ -12,6 +12,8 @@ bq_table_download( max_connections = 6L, quiet = NA, bigint = c("integer", "integer64", "numeric", "character"), + api = c("json", "arrow"), + billing = x$project, max_results = deprecated() ) } @@ -21,19 +23,19 @@ bq_table_download( \item{n_max}{Maximum number of results to retrieve. Use \code{Inf} to retrieve all rows.} -\item{page_size}{The number of rows requested per chunk. It is recommended to -leave this unspecified until you have evidence that the \code{page_size} -selected automatically by \code{bq_table_download()} is problematic. +\item{page_size}{(JSON only) The number of rows requested per chunk. It is +recommended to leave this unspecified until you have evidence that the +\code{page_size} selected automatically by \code{bq_table_download()} is problematic. When \code{page_size = NULL} bigrquery determines a conservative, natural chunk size empirically. If you specify the \code{page_size}, it is important that each chunk fits on one page, i.e. that the requested row limit is low enough to prevent the API from paginating based on response size.} -\item{start_index}{Starting row index (zero-based).} +\item{start_index}{(JSON only) Starting row index (zero-based).} -\item{max_connections}{Number of maximum simultaneous connections to -BigQuery servers.} +\item{max_connections}{(JSON only) Number of maximum simultaneous +connections to BigQuery servers.} \item{quiet}{If \code{FALSE}, displays progress bar; if \code{TRUE} is silent; if \code{NA} picks based on whether or not you're in an interactive context.} @@ -44,6 +46,18 @@ but results in \code{NA} for values above/below +/- 2147483647. \code{"integer64 returns a \link[bit64:bit64-package]{bit64::integer64}, which allows the full range of 64 bit integers.} +\item{api}{Which API to use? The \code{"json"} API works where ever bigrquery +does, but is slow and can require fiddling with the \code{page_size} parameter. +The \code{"arrow"} API is faster and more reliable, but only works if you +have also installed the bigrquerystorage package. + +Because the \code{"arrow"} API is so much faster, it will be used automatically +if the bigrquerystorage package is installed.} + +\item{billing}{(Arrow only) Project to bill; defaults to the project of \code{x}, +and typically only needs to be specified if you're working with public +datasets.} + \item{max_results}{\ifelse{html}{\href{https://lifecycle.r-lib.org/articles/stages.html#deprecated}{\figure{lifecycle-deprecated.svg}{options: alt='[Deprecated]'}}}{\strong{[Deprecated]}} Deprecated. Please use \code{n_max} instead.} } @@ -54,14 +68,30 @@ a tibble. If you need a \code{data.frame}, coerce the results with \code{\link[=as.data.frame]{as.data.frame()}}. } \description{ -This retrieves rows in chunks of \code{page_size}. It is most suitable for results -of smaller queries (<100 MB, say). For larger queries, it is better to -export the results to a CSV file stored on google cloud and use the -bq command line tool to download locally. +This function provides two ways to download data from BigQuery, transfering +data using either JSON or arrow, depending on the \code{api} argument. If +bigrquerystorage is installed, \code{api = "arrow"} will be used (because it's +so much faster, but see the limitions below), otherwise you can select +deliberately by using \code{api = "json"} or \code{api = "arrow"}. +\subsection{Arrow API}{ + +The arrow API is much faster, but has heavier dependencies: bigrquerystorage +requires the arrow package, which can be tricky to compile on Linux (but you +usually should be able to get a binary from +\href{https://posit.co/products/cloud/public-package-manager/}{Posit Public Package Manager}. + +There's one known limitation of \code{api = "arrow"}: when querying public data, +you'll now need to provide a \code{billing} project. } -\section{Complex data}{ -bigrquery will retrieve nested and repeated columns in to list-columns +\subsection{JSON API}{ + +The JSON API retrieves rows in chunks of \code{page_size}. It is most suitable +for results of smaller queries (<100 MB, say). Unfortunately due to +limitations in the BigQuery API, you may need to vary this parameter +depending on the complexity of the underlying data. + +The JSON API will convert nested and repeated columns in to list-columns as follows: \itemize{ \item Repeated values (arrays) will become a list-column of vectors. @@ -69,22 +99,7 @@ as follows: \item Repeated records will become list-columns of data frames. } } - -\section{Larger datasets}{ - -In my timings, this code takes around 1 minute per 100 MB of data. -If you need to download considerably more than this, I recommend: -\itemize{ -\item Export a \code{.csv} file to Cloud Storage using \code{\link[=bq_table_save]{bq_table_save()}}. -\item Use the \code{gsutil} command line utility to download it. -\item Read the csv file into R with \code{readr::read_csv()} or \code{data.table::fread()}. } - -Unfortunately you can not export nested or repeated formats into CSV, and -the formats that BigQuery supports (arvn and ndjson) that allow for -nested/repeated values, are not well supported in R. -} - \section{Google BigQuery API documentation}{ \itemize{ @@ -94,6 +109,6 @@ nested/repeated values, are not well supported in R. \examples{ \dontshow{if (bq_testable()) (if (getRversion() >= "3.4") withAutoprint else force)(\{ # examplesIf} -df <- bq_table_download("publicdata.samples.natality", n_max = 35000) +df <- bq_table_download("publicdata.samples.natality", n_max = 35000, billing = bq_test_project()) \dontshow{\}) # examplesIf} } diff --git a/man/collect.tbl_BigQueryConnection.Rd b/man/collect.tbl_BigQueryConnection.Rd new file mode 100644 index 00000000..ac401ae2 --- /dev/null +++ b/man/collect.tbl_BigQueryConnection.Rd @@ -0,0 +1,54 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/dplyr.R +\name{collect.tbl_BigQueryConnection} +\alias{collect.tbl_BigQueryConnection} +\title{Collect a BigQuery table} +\usage{ +collect.tbl_BigQueryConnection( + x, + ..., + n = Inf, + api = c("json", "arrow"), + page_size = NULL, + max_connections = 6L +) +} +\arguments{ +\item{x}{A data frame, data frame extension (e.g. a tibble), or a lazy +data frame (e.g. from dbplyr or dtplyr). See \emph{Methods}, below, for more +details.} + +\item{...}{Other arguments passed on to +\code{bq_project_query()}/\code{bq_project_query()}} + +\item{n}{Maximum number of results to retrieve. +The default, \code{Inf}, will retrieve all rows.} + +\item{api}{Which API to use? The \code{"json"} API works where ever bigrquery +does, but is slow and can require fiddling with the \code{page_size} parameter. +The \code{"arrow"} API is faster and more reliable, but only works if you +have also installed the bigrquerystorage package. + +Because the \code{"arrow"} API is so much faster, it will be used automatically +if the bigrquerystorage package is installed.} + +\item{page_size}{(JSON only) The number of rows requested per chunk. It is +recommended to leave this unspecified until you have evidence that the +\code{page_size} selected automatically by \code{bq_table_download()} is problematic. + +When \code{page_size = NULL} bigrquery determines a conservative, natural chunk +size empirically. If you specify the \code{page_size}, it is important that each +chunk fits on one page, i.e. that the requested row limit is low enough to +prevent the API from paginating based on response size.} + +\item{max_connections}{(JSON only) Number of maximum simultaneous +connections to BigQuery servers.} +} +\description{ +This collect method is specialised for BigQuery tables, generating the +SQL from your dplyr commands, then calling \code{\link[=bq_project_query]{bq_project_query()}} +or \code{\link[=bq_dataset_query]{bq_dataset_query()}} to run the query, then \code{\link[=bq_table_download]{bq_table_download()}} +to download the results. Thus the arguments are a combination of the +arguments to \code{\link[dplyr:compute]{dplyr::collect()}}, \code{bq_project_query()}/\code{bq_dataset_query()}, +and \code{bq_table_download()}. +} diff --git a/tests/testthat/_snaps/bq-download.md b/tests/testthat/_snaps/bq-download.md index bc9eed7a..5d5d49a0 100644 --- a/tests/testthat/_snaps/bq-download.md +++ b/tests/testthat/_snaps/bq-download.md @@ -1,7 +1,8 @@ # errors when table is known to be incomplete Code - bq_table_download(tb, n_max = 35000, page_size = 35000, bigint = "integer64") + bq_table_download(tb, n_max = 35000, page_size = 35000, bigint = "integer64", + api = "json") Message Downloading first chunk of data. Condition @@ -10,3 +11,16 @@ x 35,000 rows were requested, but only {n} rows were received. i Leave `page_size` unspecified or use an even smaller value. +# warns if supplying unnused arguments + + Code + . <- bq_table_download(tb, api = "arrow", page_size = 1, start_index = 1, + max_connections = 1) + Condition + Warning in `bq_table_download()`: + `page_size` is ignored when `api == "arrow"` + Warning in `bq_table_download()`: + `start_index` is ignored when `api == "arrow"` + Warning in `bq_table_download()`: + `max_connections` is ignored when `api == "arrow"` + diff --git a/tests/testthat/test-bq-download.R b/tests/testthat/test-bq-download.R index e8f64576..b7ee579a 100644 --- a/tests/testthat/test-bq-download.R +++ b/tests/testthat/test-bq-download.R @@ -3,8 +3,8 @@ test_that("same results regardless of page size", { tb <- as_bq_table("bigquery-public-data.moon_phases.moon_phases") - df3 <- bq_table_download(tb, n_max = 30, page_size = 10) - df1 <- bq_table_download(tb, n_max = 30, page_size = 30) + df3 <- bq_table_download(tb, n_max = 30, page_size = 10, api = "json") + df1 <- bq_table_download(tb, n_max = 30, page_size = 30, api = "json") expect_equal(nrow(df1), 30) expect_equal(df1, df3) }) @@ -13,7 +13,7 @@ test_that("can retrieve fraction of page size", { skip_if_no_auth() tb <- as_bq_table("bigquery-public-data.moon_phases.moon_phases") - df <- bq_table_download(tb, n_max = 15, page_size = 10) + df <- bq_table_download(tb, n_max = 15, page_size = 10, api = "json") expect_equal(nrow(df), 15) }) @@ -21,7 +21,7 @@ test_that("can retrieve zero rows", { skip_if_no_auth() tb <- as_bq_table("bigquery-public-data.moon_phases.moon_phases") - df <- bq_table_download(tb, n_max = 0) + df <- bq_table_download(tb, n_max = 0, api = "json") expect_equal(nrow(df), 0) expect_named(df, c("phase", "phase_emoji", "peak_datetime")) }) @@ -34,7 +34,7 @@ test_that("can specify large integers in page params", { withr::local_options(list(scipen = -4)) tb <- as_bq_table("bigquery-public-data.moon_phases.moon_phases") - df <- bq_table_download(tb, n_max = 100, page_size = 20) + df <- bq_table_download(tb, n_max = 100, page_size = 20, api = "json") expect_equal(nrow(df), 100) }) @@ -49,7 +49,8 @@ test_that("errors when table is known to be incomplete", { tb, n_max = 35000, page_size = 35000, - bigint = "integer64" + bigint = "integer64", + api = "json" ), transform = function(x) { gsub("[0-9,]+ rows were received", "{n} rows were received", x, perl = TRUE) @@ -58,6 +59,98 @@ test_that("errors when table is known to be incomplete", { ) }) +# api = "arrow" ---------------------------------------------------------------- + +test_that("check_api respects inputs", { + expect_equal(check_api("arrow"), "arrow") + expect_equal(check_api("json"), "json") +}) + +test_that("uses arrow api if bigrquerystorage installed", { + expect_equal(check_api(), "arrow") + + local_mocked_bindings(is_installed = function(...) FALSE) + expect_equal(check_api(), "json") +}) + +test_that("warns if supplying unnused arguments", { + tb <- bq_project_query(bq_test_project(), "SELECT 1.0", quiet = TRUE) + expect_snapshot( + . <- bq_table_download(tb, + api = "arrow", + page_size = 1, + start_index = 1, + max_connections = 1 + ) + ) +}) + +test_that("arrow api can convert non-nested types", { + sql <- "SELECT + '\U0001f603' as unicode, + datetime, + TRUE as logicaltrue, + FALSE as logicalfalse, + CAST ('Hi' as BYTES) as bytes, + CAST (datetime as DATE) as date, + CAST (datetime as TIME) as time, + CAST (datetime as TIMESTAMP) as timestamp, + ST_GEOGFROMTEXT('POINT (30 10)') as geography + FROM (SELECT DATETIME '2000-01-02 03:04:05.67' as datetime) + " + + tb <- bq_project_query(bq_test_project(), sql, quiet = TRUE) + df <- bq_table_download(tb, api = "arrow", quiet = TRUE) + + base <- ISOdatetime(2000, 1, 2, 3, 4, 5.67, tz = "UTC") + expect_identical(df$unicode, "\U0001f603", ignore_encoding = FALSE) + + expect_equal(df$logicaltrue, TRUE) + expect_equal(df$logicalfalse, FALSE) + + expect_equal(df$bytes, blob::as_blob(as.raw(c(0x48, 0x69)))) + + expect_equal(df$date, as.Date(base)) + expect_equal(df$timestamp, base) + expect_equal(df$datetime, base) + expect_equal(df$time, hms::hms(hours = 3, minutes = 4, seconds = 5.67)) + + expect_identical(df$geography, wk::wkt("POINT(30 10)")) +}) + +test_that("arrow api can convert nested types", { + skip("https://github.com/meztez/bigrquerystorage/issues/54") + sql <- "SELECT + STRUCT(1.0 AS a, 'abc' AS b) as s, + [1.0, 2.0, 3.0] as a, + [STRUCT(1.0 as a, 'a' as b), STRUCT(2.0, 'b'), STRUCT(3, 'c')] as aos, + STRUCT([1.0, 2.0, 3.0] as a, ['a', 'b'] as b) as soa + " + + tb <- bq_project_query(bq_test_project(), sql, quiet = TRUE) + df <- bq_table_download(tb, api = "arrow", quiet = TRUE) + + expect_equal(df$s, list(list(a = 1, b = "abc"))) + expect_equal(df$a, list(c(1, 2, 3))) + expect_equal(df$aos, list(tibble(a = c(1, 2, 3), b = c("a", "b", "c")))) + expect_equal(df$soa, list(list(a = c(1, 2, 3), b = c("a", "b")))) +}) + +test_that("arrow api respects bigint", { + x <- c("-2147483648", "-2147483647", "-1", "0", "1", "2147483647", "2147483648") + sql <- paste0("SELECT * FROM UNNEST ([", paste0(x, collapse = ","), "]) AS x"); + qry <- bq_project_query(bq_test_project(), sql) + + out_int64 <- bq_table_download(qry, bigint = "integer64", api = "arrow", quiet = TRUE)$x + expect_identical(out_int64, bit64::as.integer64(x)) + + out_dbl <- bq_table_download(qry, bigint = "numeric", api = "arrow", quiet = TRUE)$x + expect_identical(out_dbl, as.double(x)) + + out_chr <- bq_table_download(qry, bigint = "character", api = "arrow", quiet = TRUE)$x + expect_identical(out_chr, x) +}) + # helpers around row and chunk params ------------------------------------------ test_that("set_row_params() works ", { @@ -173,7 +266,7 @@ test_that("can convert date time types", { " tb <- bq_project_query(bq_test_project(), sql, quiet = TRUE) - df <- bq_table_download(tb) + df <- bq_table_download(tb, api = "json") base <- ISOdatetime(2000, 1, 2, 3, 4, 5.67, tz = "UTC") @@ -197,7 +290,7 @@ test_that("can parse fractional seconds", { test_that("correctly parse logical values" ,{ query <- "SELECT TRUE as x" tb <- bq_project_query(bq_test_project(), query) - df <- bq_table_download(tb) + df <- bq_table_download(tb, api = "json") expect_true(df$x) }) @@ -208,18 +301,18 @@ test_that("the return type of integer columns is set by the bigint argument", { qry <- bq_project_query(bq_test_project(), sql) expect_warning( - out_int <- bq_table_download(qry, bigint = "integer")$x, + out_int <- bq_table_download(qry, bigint = "integer", api = "json")$x, "integer overflow" ) expect_identical(out_int, suppressWarnings(as.integer(x))) - out_int64 <- bq_table_download(qry, bigint = "integer64")$x + out_int64 <- bq_table_download(qry, bigint = "integer64", api = "json")$x expect_identical(out_int64, bit64::as.integer64(x)) - out_dbl <- bq_table_download(qry, bigint = "numeric")$x + out_dbl <- bq_table_download(qry, bigint = "numeric", api = "json")$x expect_identical(out_dbl, as.double(x)) - out_chr <- bq_table_download(qry, bigint = "character")$x + out_chr <- bq_table_download(qry, bigint = "character", api = "json")$x expect_identical(out_chr, x) }) @@ -227,7 +320,7 @@ test_that("can convert geography type", { skip_if_not_installed("wk") sql <- "SELECT ST_GEOGFROMTEXT('POINT (30 10)') as geography" tb <- bq_project_query(bq_test_project(), sql, quiet = TRUE) - df <- bq_table_download(tb) + df <- bq_table_download(tb, api = "json") expect_identical(df$geography, wk::wkt("POINT(30 10)")) }) @@ -235,7 +328,7 @@ test_that("can convert geography type", { test_that("can convert bytes type", { sql <- "SELECT ST_ASBINARY(ST_GEOGFROMTEXT('POINT (30 10)')) as bytes" tb <- bq_project_query(bq_test_project(), sql, quiet = TRUE) - df <- bq_table_download(tb) + df <- bq_table_download(tb, api = "json") expect_identical( df$bytes, diff --git a/tests/testthat/test-bq-parse.R b/tests/testthat/test-bq-parse.R index 93328768..70ac23b4 100644 --- a/tests/testthat/test-bq-parse.R +++ b/tests/testthat/test-bq-parse.R @@ -128,11 +128,11 @@ test_that("can parse nested structures", { test_that("can parse empty arrays", { tb <- bq_project_query(bq_test_project(), "SELECT ARRAY[] as x") - df <- bq_table_download(tb) + df <- bq_table_download(tb, api = "json") expect_equal(df$x, list(integer(length = 0))) tb <- bq_project_query(bq_test_project(), "SELECT ARRAY>[] as x") - df <- bq_table_download(tb) + df <- bq_table_download(tb, api = "json") expect_equal(df$x, list(tibble::tibble(a = integer(length = 0), b = character()))) }) diff --git a/tests/testthat/test-bq-perform.R b/tests/testthat/test-bq-perform.R index 3b804321..9ad0a06d 100644 --- a/tests/testthat/test-bq-perform.R +++ b/tests/testthat/test-bq-perform.R @@ -95,10 +95,20 @@ test_that("can supply array parameters", { expect_setequal(df$values, c("a", "b")) }) -test_that("can estimate cost", { +test_that("can estimate cost and get schema", { cost <- bq_perform_query_dry_run( "SELECT count(*) FROM bigquery-public-data.moon_phases.moon_phases", billing = bq_test_project() ) expect_equal(cost, structure(0, class = "bq_bytes")) + + schema <- bq_perform_query_schema( + "SELECT * FROM bigquery-public-data.moon_phases.moon_phases", + billing = bq_test_project() + ) + names <- vapply(schema, function(x) x$name, character(1)) + expect_equal(names, c("phase", "phase_emoji", "peak_datetime")) + + types <- vapply(schema, function(x) x$type, character(1)) + expect_equal(types, c("STRING", "STRING", "DATETIME")) }) diff --git a/tests/testthat/test-bq-table.R b/tests/testthat/test-bq-table.R index 2400ee6e..76b30396 100644 --- a/tests/testthat/test-bq-table.R +++ b/tests/testthat/test-bq-table.R @@ -38,7 +38,7 @@ test_that("can round trip to non-default location", { bq_df <- bq_table(dallas, "df") bq_table_upload(bq_df, df1) - df2 <- bq_table_download(bq_df) + df2 <- bq_table_download(bq_df, api = "json") df2 <- df2[order(df2$x), names(df1)] # BQ doesn't guarantee order rownames(df2) <- NULL @@ -54,7 +54,7 @@ test_that("can roundtrip via save + load", { defer(gs_object_delete(gs)) bq_table_load(tb2, gs) - df <- bq_table_download(tb2) + df <- bq_table_download(tb2, api = "json") expect_equal(dim(df), c(32, 11)) }) @@ -79,7 +79,7 @@ test_that("can round trip atomic vectors", { bq_df <- bq_test_table() bq_table_upload(bq_df, df1) - df2 <- bq_table_download(bq_df, bigint = "integer") + df2 <- bq_table_download(bq_df, bigint = "integer", api = "json") df2 <- df2[order(df2[[1]]), names(df1)] # BQ doesn't gaurantee order rownames(df2) <- NULL @@ -94,7 +94,7 @@ test_that("can round-trip POSIXt to either TIMESTAMP or DATETIME", { bq_fields(list(bq_field("datetime", "TIMESTAMP"))) ) bq_table_upload(tb1, df) - df1 <- bq_table_download(tb1) + df1 <- bq_table_download(tb1, api = "json") expect_equal(df1, df) tb2 <- bq_table_create( @@ -102,7 +102,7 @@ test_that("can round-trip POSIXt to either TIMESTAMP or DATETIME", { bq_fields(list(bq_field("datetime", "DATETIME"))) ) bq_table_upload(tb2, df) - df2 <- bq_table_download(tb2) + df2 <- bq_table_download(tb2, api = "json") expect_equal(df2, df) }) @@ -117,7 +117,7 @@ test_that("can round trip data frame with list-cols", { ) bq_table_upload(tb, df1) - df2 <- bq_table_download(tb, bigint = "integer") + df2 <- bq_table_download(tb, bigint = "integer", api = "json") # restore column order df2 <- df2[names(df1)] df2$struct[[1]] <- df2$struct[[1]][c("x", "y", "z")] @@ -164,7 +164,7 @@ test_that("can round-trip GEOGRAPHY", { tb1 <- bq_table_create(bq_test_table(), as_bq_fields(df)) bq_table_upload(tb1, df) - df1 <- bq_table_download(tb1) + df1 <- bq_table_download(tb1, api = "json") expect_equal(df1, df) }) @@ -173,6 +173,6 @@ test_that("can round-trip BYTES", { tb1 <- bq_table_create(bq_test_table(), as_bq_fields(df)) bq_table_upload(tb1, df) - df1 <- bq_table_download(tb1) + df1 <- bq_table_download(tb1, api = "json") expect_equal(df1, df) }) diff --git a/tests/testthat/test-dplyr.R b/tests/testthat/test-dplyr.R index c78bcbaf..01bfeeed 100644 --- a/tests/testthat/test-dplyr.R +++ b/tests/testthat/test-dplyr.R @@ -21,14 +21,25 @@ test_that("can work with literal SQL", { }) test_that("can work with nested table identifier", { - con_us <- DBI::dbConnect( + con1 <- DBI::dbConnect( bigquery(), project = "bigquery-public-data", billing = bq_test_project() ) + # As far as I can tell from the BigQuery API there's no way to provide + # a default project; you can either provide a default dataset + project or + # nothing + table_name <- I("bigquery-public-data.utility_us.country_code_iso") + expect_no_error(dplyr::collect(head(dplyr::tbl(con1, table_name)))) - expect_s3_class(dplyr::collect(head(dplyr::tbl(con_us, I("utility_us.country_code_iso")))), "tbl_df") - expect_error(dplyr::collect(head(dplyr::tbl(con_us, "utility_us.country_code_iso"))), "tbl_df") + + con2 <- DBI::dbConnect( + bigquery(), + project = "bigquery-public-data", + dataset = "utility_us", + billing = bq_test_project(), + ) + expect_no_error(dplyr::collect(head(dplyr::tbl(con2, "country_code_iso")))) }) test_that("can copy_to", { @@ -167,8 +178,8 @@ test_that("all BigQuery tbls share the same src", { billing = bq_test_project() ) - tbl1 <- dplyr::tbl(con1, "basedata.mtcars", vars = "x") - tbl2 <- dplyr::tbl(con2, "publicdata.samples.natality", vars = "x") + tbl1 <- dplyr::tbl(con1, I("basedata.mtcars"), vars = "x") + tbl2 <- dplyr::tbl(con2, I("publicdata.samples.natality"), vars = "x") expect_true(dplyr::same_src(tbl1, tbl2)) expect_false(dplyr::same_src(tbl1, mtcars)) })