Skip to content

Commit

Permalink
Merge branch 'main' into refactor--dataframe-join-params
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco authored Nov 1, 2024
2 parents bb83a74 + aedffe0 commit 2f8e7af
Show file tree
Hide file tree
Showing 38 changed files with 588 additions and 166 deletions.
24 changes: 12 additions & 12 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ datafusion = { version = "42.0.0", features = ["pyarrow", "avro", "unicode_expre
datafusion-substrait = { version = "42.0.0", optional = true }
datafusion-proto = { version = "42.0.0" }
prost = "0.13" # keep in line with `datafusion-substrait`
uuid = { version = "1.9", features = ["v4"] }
uuid = { version = "1.11", features = ["v4"] }
mimalloc = { version = "0.1", optional = true, default-features = false, features = ["local_dynamic_tls"] }
async-trait = "0.1"
futures = "0.3"
Expand Down
29 changes: 28 additions & 1 deletion docs/source/user-guide/common-operations/expressions.rst
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,39 @@ approaches.
df = ctx.from_pydict({"a": [[1, 2, 3], [4, 5, 6]]})
df.select(col("a")[0].alias("a0"))
.. warning::

Indexing an element of an array via ``[]`` starts at index 0 whereas
:py:func:`~datafusion.functions.array_element` starts at index 1.

To check if an array is empty, you can use the function :py:func:`datafusion.functions.array_empty` or `datafusion.functions.empty`.
This function returns a boolean indicating whether the array is empty.

.. ipython:: python
from datafusion import SessionContext, col
from datafusion.functions import array_empty
ctx = SessionContext()
df = ctx.from_pydict({"a": [[], [1, 2, 3]]})
df.select(array_empty(col("a")).alias("is_empty"))
In this example, the `is_empty` column will contain `True` for the first row and `False` for the second row.

To get the total number of elements in an array, you can use the function :py:func:`datafusion.functions.cardinality`.
This function returns an integer indicating the total number of elements in the array.

.. ipython:: python
from datafusion import SessionContext, col
from datafusion.functions import cardinality
ctx = SessionContext()
df = ctx.from_pydict({"a": [[1, 2, 3], [4, 5, 6]]})
df.select(cardinality(col("a")).alias("num_elements"))
In this example, the `num_elements` column will contain `3` for both rows.

Structs
-------

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ DataFusion can work with several file types, to start simple we can use a subset
ctx = SessionContext()
df = ctx.read_parquet("yellow_trip_data.parquet")
df.select_columns("trip_distance", "passenger_count")
df.select("trip_distance", "passenger_count")
For mathematical or logical operations use :py:func:`~datafusion.col` to select columns, and give meaningful names to the resulting
operations using :py:func:`~datafusion.expr.Expr.alias`
Expand All @@ -48,7 +48,7 @@ operations using :py:func:`~datafusion.expr.Expr.alias`

Please be aware that all identifiers are effectively made lower-case in SQL, so if your file has capital letters
(ex: Name) you must put your column name in double quotes or the selection won’t work. As an alternative for simple
column selection use :py:func:`~datafusion.dataframe.DataFrame.select_columns` without double quotes
column selection use :py:func:`~datafusion.dataframe.DataFrame.select` without double quotes

For selecting columns with capital letters use ``'"VendorID"'``

Expand Down
10 changes: 5 additions & 5 deletions examples/import.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
# The dictionary keys represent column names and the dictionary values
# represent column values
df = ctx.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
assert type(df) == datafusion.DataFrame
assert type(df) is datafusion.DataFrame
# Dataframe:
# +---+---+
# | a | b |
Expand All @@ -40,19 +40,19 @@

# Create a datafusion DataFrame from a Python list of rows
df = ctx.from_pylist([{"a": 1, "b": 4}, {"a": 2, "b": 5}, {"a": 3, "b": 6}])
assert type(df) == datafusion.DataFrame
assert type(df) is datafusion.DataFrame

# Convert pandas DataFrame to datafusion DataFrame
pandas_df = pd.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
df = ctx.from_pandas(pandas_df)
assert type(df) == datafusion.DataFrame
assert type(df) is datafusion.DataFrame

# Convert polars DataFrame to datafusion DataFrame
polars_df = pl.DataFrame({"a": [1, 2, 3], "b": [4, 5, 6]})
df = ctx.from_polars(polars_df)
assert type(df) == datafusion.DataFrame
assert type(df) is datafusion.DataFrame

# Convert Arrow Table to datafusion DataFrame
arrow_table = pa.Table.from_pydict({"a": [1, 2, 3], "b": [4, 5, 6]})
df = ctx.from_arrow(arrow_table)
assert type(df) == datafusion.DataFrame
assert type(df) is datafusion.DataFrame
2 changes: 1 addition & 1 deletion examples/tpch/convert_data_to_parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,6 @@

df = ctx.read_csv(source_file, schema=schema, has_header=False, delimiter="|")

df = df.select_columns(*output_cols)
df = df.select(*output_cols)

df.write_parquet(dest_file, compression="snappy")
12 changes: 6 additions & 6 deletions examples/tpch/q02_minimum_cost_supplier.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,10 @@

ctx = SessionContext()

df_part = ctx.read_parquet(get_data_path("part.parquet")).select_columns(
df_part = ctx.read_parquet(get_data_path("part.parquet")).select(
"p_partkey", "p_mfgr", "p_type", "p_size"
)
df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select_columns(
df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select(
"s_acctbal",
"s_name",
"s_address",
Expand All @@ -55,13 +55,13 @@
"s_nationkey",
"s_suppkey",
)
df_partsupp = ctx.read_parquet(get_data_path("partsupp.parquet")).select_columns(
df_partsupp = ctx.read_parquet(get_data_path("partsupp.parquet")).select(
"ps_partkey", "ps_suppkey", "ps_supplycost"
)
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select_columns(
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select(
"n_nationkey", "n_regionkey", "n_name"
)
df_region = ctx.read_parquet(get_data_path("region.parquet")).select_columns(
df_region = ctx.read_parquet(get_data_path("region.parquet")).select(
"r_regionkey", "r_name"
)

Expand Down Expand Up @@ -115,7 +115,7 @@

# From the problem statement, these are the values we wish to output

df = df.select_columns(
df = df.select(
"s_acctbal",
"s_name",
"n_name",
Expand Down
8 changes: 4 additions & 4 deletions examples/tpch/q03_shipping_priority.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,13 @@

ctx = SessionContext()

df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select_columns(
df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select(
"c_mktsegment", "c_custkey"
)
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select_columns(
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select(
"o_orderdate", "o_shippriority", "o_custkey", "o_orderkey"
)
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select_columns(
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select(
"l_orderkey", "l_extendedprice", "l_discount", "l_shipdate"
)

Expand Down Expand Up @@ -80,7 +80,7 @@

# Change the order that the columns are reported in just to match the spec

df = df.select_columns("l_orderkey", "revenue", "o_orderdate", "o_shippriority")
df = df.select("l_orderkey", "revenue", "o_orderdate", "o_shippriority")

# Show result

Expand Down
6 changes: 3 additions & 3 deletions examples/tpch/q04_order_priority_checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@

ctx = SessionContext()

df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select_columns(
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select(
"o_orderdate", "o_orderpriority", "o_orderkey"
)
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select_columns(
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select(
"l_orderkey", "l_commitdate", "l_receiptdate"
)

Expand All @@ -54,7 +54,7 @@
# Limit results to cases where commitment date before receipt date
# Aggregate the results so we only get one row to join with the order table.
# Alternately, and likely more idiomatic is instead of `.aggregate` you could
# do `.select_columns("l_orderkey").distinct()`. The goal here is to show
# do `.select("l_orderkey").distinct()`. The goal here is to show
# multiple examples of how to use Data Fusion.
df_lineitem = df_lineitem.filter(col("l_commitdate") < col("l_receiptdate")).aggregate(
[col("l_orderkey")], []
Expand Down
12 changes: 6 additions & 6 deletions examples/tpch/q05_local_supplier_volume.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,22 @@

ctx = SessionContext()

df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select_columns(
df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select(
"c_custkey", "c_nationkey"
)
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select_columns(
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select(
"o_custkey", "o_orderkey", "o_orderdate"
)
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select_columns(
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select(
"l_orderkey", "l_suppkey", "l_extendedprice", "l_discount"
)
df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select_columns(
df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select(
"s_suppkey", "s_nationkey"
)
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select_columns(
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select(
"n_nationkey", "n_regionkey", "n_name"
)
df_region = ctx.read_parquet(get_data_path("region.parquet")).select_columns(
df_region = ctx.read_parquet(get_data_path("region.parquet")).select(
"r_regionkey", "r_name"
)

Expand Down
2 changes: 1 addition & 1 deletion examples/tpch/q06_forecasting_revenue_change.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@

ctx = SessionContext()

df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select_columns(
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select(
"l_shipdate", "l_quantity", "l_extendedprice", "l_discount"
)

Expand Down
10 changes: 5 additions & 5 deletions examples/tpch/q07_volume_shipping.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@

ctx = SessionContext()

df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select_columns(
df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select(
"s_suppkey", "s_nationkey"
)
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select_columns(
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select(
"l_shipdate", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey"
)
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select_columns(
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select(
"o_orderkey", "o_custkey"
)
df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select_columns(
df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select(
"c_custkey", "c_nationkey"
)
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select_columns(
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select(
"n_nationkey", "n_name"
)

Expand Down
18 changes: 8 additions & 10 deletions examples/tpch/q08_market_share.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,25 +47,23 @@

ctx = SessionContext()

df_part = ctx.read_parquet(get_data_path("part.parquet")).select_columns(
"p_partkey", "p_type"
)
df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select_columns(
df_part = ctx.read_parquet(get_data_path("part.parquet")).select("p_partkey", "p_type")
df_supplier = ctx.read_parquet(get_data_path("supplier.parquet")).select(
"s_suppkey", "s_nationkey"
)
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select_columns(
df_lineitem = ctx.read_parquet(get_data_path("lineitem.parquet")).select(
"l_partkey", "l_extendedprice", "l_discount", "l_suppkey", "l_orderkey"
)
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select_columns(
df_orders = ctx.read_parquet(get_data_path("orders.parquet")).select(
"o_orderkey", "o_custkey", "o_orderdate"
)
df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select_columns(
df_customer = ctx.read_parquet(get_data_path("customer.parquet")).select(
"c_custkey", "c_nationkey"
)
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select_columns(
df_nation = ctx.read_parquet(get_data_path("nation.parquet")).select(
"n_nationkey", "n_name", "n_regionkey"
)
df_region = ctx.read_parquet(get_data_path("region.parquet")).select_columns(
df_region = ctx.read_parquet(get_data_path("region.parquet")).select(
"r_regionkey", "r_name"
)

Expand Down Expand Up @@ -133,7 +131,7 @@

# When we join to the customer dataframe, we don't want to confuse other columns, so only
# select the supplier key that we need
df_national_suppliers = df_national_suppliers.select_columns("s_suppkey")
df_national_suppliers = df_national_suppliers.select("s_suppkey")


# Part 3: Combine suppliers and customers and compute the market share
Expand Down
Loading

0 comments on commit 2f8e7af

Please sign in to comment.