
Transcript

Daniel Voros edited this page Jun 28, 2017 · 1 revision

Creating your own database

Let's see what we've got here!

show databases;

Create a database where you'll store your tables. We'll refer to this as myname.

create database myname;

Be careful not to disturb others. It's quite simple to drop a database:

drop database yourname cascade;

From now on, we'll be using our own myname database.

use myname;

It should be empty (unless your name is default).

show tables;

Some basic SQL

Let's create two tables.

create table books(id int, title string, author_id int);
create table authors(id int, name string);

Now make up some authors and books.

insert into authors values
  (1, 'J. K. Rouling'),
  (2, 'J. R. R. Jr. R. C. S. Tolkien'),
  (3, 'Steven Hawkings'),
  (4, 'Virginia Woolf');

insert into books values
  (10, 'The Hobbit', 2),
  (20, 'The Ring', 2),
  (30, 'The return of the Ring', 2),
  (40, 'The return of the Hobbit', 2),
  (50, 'Fantastic Quiddtich Players and Where toFind Them', 1),
  (60, 'Adventures of the young Dumbledore', 1),
  (70, 'A Brief History of Time', 3),
  (80, 'Extremly Brief History of Time', 3),
  (90, 'Even Shorter History of Time', 3),
  (100, 'Star Wars MMXLIV', 5);

Some simple selects:

select * from authors;
select count(*) from authors;
select * from books;
select count(*) from books;

select * from books where id = 10;
select * from books where author_id = 2;
select count(*), author_id from books group by author_id;

You can use subqueries,

select count(*) from
  (select count(*) c, author_id from books group by author_id) t;

and joins:

select a.name, b.title from books b left outer join authors a on b.author_id = a.id;
select a.name, b.title from books b right outer join authors a on b.author_id = a.id;
select a.name, b.title from books b full outer join authors a on b.author_id = a.id;
select a.name, b.title from books b inner join authors a on b.author_id = a.id;
select a.name, b.title from books b join authors a on b.author_id = a.id;

select a.name, b.title from books b, authors a where b.author_id = a.id;

Do some grouping:

select a.name, count(*) c
  from books b, authors a
  where b.author_id = a.id
  group by b.author_id, a.name;

select a.name, count(*) c
  from books b, authors a
  where b.author_id = a.id
  group by b.author_id, a.name order by c;

select a.name, b.title
  from books b, authors a
  where b.author_id = a.id and a.name='J. K. Rouling';

select a.name, count(*) c
  from books b, authors a
  where b.author_id = a.id
  group by b.author_id, a.name having c >=3 ;

Storage on HDFS

Let's return to creating databases and tables. How are tables stored?

create table t1(id int);

insert into t1 values (11), (22), (33);
insert into t1 values (44), (55);
insert into t1 values (66), (77);

Let's take a look at HDFS:

hdfs dfs -ls /hive/warehouse/...
hdfs dfs -cat ...

With more data types and custom delimiters:

create table t2 (id int, name string, salary double);
insert into t2 values (1, 'aaa', 2.3), (2, 'bbb', 1000000000);

create table t3 (id int, name string, salary double)
row format delimited fields terminated by '|';
insert into t3 values (1, 'aaa', 2.3), (2, 'bbb', 1000000000);

Be careful though!

insert into t3 values (1, 'vvv|bbb', 2.3), (2, 'bbb', 1000000000);
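
When you read the table back, the new row likely comes out mangled:

select * from t3;

The text file now contains the line 1|vvv|bbb|2.3, which parses as four fields instead of three: name becomes 'vvv', and 'bbb' lands in the salary column, where it fails to parse as a double and turns into NULL.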

Metadata

Let's take a look at what we know about a table:

show databases;

show tables;

describe customer;

describe extended tpcds.customer;

describe formatted tpcds.customer;

show create table b1;

Partitioning

With partitions you can split your tables into parts based on some columns. To improve performance and make tables easier to manage, Hive tables are almost always partitioned.

Creating a partitioned table:

select count(*) from tpcds.customer where c_birth_month=2;

create table c2(c_customer_sk bigint) partitioned by (c_birth_month int);

select * from c2;

describe c2;

Inserting into a partitioned table:

insert into table c2 partition (c_birth_month=1) values (111), (222), (333);
insert into table c2 partition (c_birth_month=1) values (1), (2);
insert into table c2 partition (c_birth_month=2) values (444), (555);

select * from c2;
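
Each partition is stored as its own subdirectory under the table's warehouse directory. You can list the partitions Hive tracks:

show partitions c2;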

Now load data to this table from the TPCDS table:

insert overwrite table c2 partition (c_birth_month=1)
  select c_customer_sk from tpcds.customer where c_birth_month=1;

insert into table c2 partition (c_birth_month=2)
  select c_customer_sk from c1 where c_birth_month=2;

insert into table c2 partition (c_birth_month=3)
  select c_customer_sk from c1 where c_birth_month=3;

insert overwrite table c2 partition (c_birth_month)
  select c_customer_sk,c_birth_month from tpcds.customer;

select count(*) from c2 where c_birth_month=8;

Bucketing

Divide data into a fixed number of buckets.

create table tb10(id int) clustered by (id) into 10 buckets;  

insert into tb10 values (1), (5);
insert into tb10 values (15), (6);
insert into tb10 values (15);
insert into tb10 values (10);

select * from tb10;
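
Which bucket a row lands in is determined by hashing the clustering column modulo the number of buckets; for integers the hash is typically the value itself. A sketch of the mapping, using the built-in hash and pmod functions:

select id, pmod(hash(id), 10) as bucket from tb10;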

This can be combined with partitioning as well.

create table parbuck (b int, c string)
partitioned by (a int)
clustered by (b) into 5 buckets;

insert into parbuck partition (a = 1) values (11, "eleven"), (12, 'twelve');
insert into parbuck partition (a = 2) values (23, "twenty-three"), (24, 'twenty-four');

select * from parbuck;

ORC

Up until now, we've only seen delimited file formats, but Hive supports other formats as well. Hive probably works best with ORC (Optimized Row Columnar) files.

create table t1 (a int, b string) stored as orc;
insert into t1 values (1, "a");

select * from t1;

insert into t1 values (2, "b");

Now let's try something new, updating a row.

update t1 set b = "B" where a = 2;

ACID

To be able to update/delete, you'll have to enable ACID and use ACID tables.

CREATE TABLE t1_acid (a int, b string)
CLUSTERED BY (a) INTO 2 BUCKETS STORED AS ORC
TBLPROPERTIES ("transactional"="true");

insert into t1_acid values (1, "a"), (2, "b");
select * from t1_acid;

update t1_acid set b = "B" where a = 2;
select * from t1_acid;

delete from t1_acid where a = 2;
select * from t1_acid;

Let's take a look at HDFS again!

Police

Let's try to work with something closer to reality. police.orc_police stores data from the SF Police Department, obtained from https://data.sfgov.org/Public-Safety/Police-Department-Incidents/tmnf-yvry

describe formatted police.orc_police;
select count(*) from police.orc_police;
select * from police.orc_police limit 10;

select * from police.orc_police where incidntnum = 991008997;

create table police_ordered stored as orc as select * from police.orc_police order by incidntnum;
select * from police_ordered where incidntnum = 991008997;

create table police_sorted stored as orc as select * from police.orc_police sort by incidntnum;
select * from police_sorted where incidntnum = 991008997;

Try to find out:

  • What was the most common resolution?
  • Where were the most robberies (category = ROBBERY)?
  • Do criminals take a break on the weekends?

Explain, optimization

Getting started with explains

use play;

explain select * from authors;

Let's add a filter... and view a few more:

explain select * from books where id = 10;
explain select title from books where id = 10;
explain select author_id from books where id = 10;
explain select id from books where id = 10;

As you can see, the last two selects produced the same plan, which is odd. There are basically two types of explain output available; we usually see the newer, shorter one. Let's switch to the older one:

set hive.explain.user=false;

explain select author_id from books where id = 10;
explain select id from books where id = 10;

Explaining a group by

Hive uses "shuffles" to implement group by and count(); a shuffle should be considered a heavy operation.

explain select count(*), author_id from books group by author_id;

The new explain hides how this is being done, but the old one gives more detail about it.
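
Switching to the old-style explain shows the map-side partial aggregation and the reduce-side merge:

set hive.explain.user=false;
explain select count(*), author_id from books group by author_id;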

Execution information

Explain is fast and cheap because it doesn't do any computation; actually executing the query can give better insights.

Explain analyze

It decorates the plan with the actual row counts.

explain
 select a.name, count(*) c from books b, authors a
 where b.author_id = a.id group by b.author_id, a.name having c >=3 ;

explain analyze
 select a.name, count(*) c from books b, authors a
 where b.author_id = a.id group by b.author_id, a.name having c >=3 ;

Tez UI

All kinds of counters are being collected; take a look at the background of the earlier ORC sort by runs.

Visual Explain

Ambari has a feature for this.

Cost based optimizer

The CBO is still in development, so there might be corner cases where it bails out:

explain
 select a.name, count(*) c from books b, authors a
 where b.author_id = a.id group by b.author_id, a.name having c >=3 ;

For this case there's a very simple workaround:

explain
 select a.name, count(*) c from books b, authors a
 where b.author_id = a.id group by b.author_id, a.name having count(*) >=3 ;

A few words about statistics

How can statistics be checked and updated?

select count(*) from customer;

set hive.stats.autogather=false;

drop table ccc;

create table ccc as select * from tpcds.customer; 

select count(*) from ccc;

analyze table ccc compute statistics;

select count(*) from ccc;

set hive.stats.autogather=true;

analyze table ccc compute statistics for columns;

The statistics hold information about:

  • number of rows
  • data size
  • min/max
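
These show up among the table parameters (e.g. numRows, rawDataSize, totalSize) in the output of:

describe formatted ccc;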

Logical optimization

The CBO can logically propagate conditions across a join. With it disabled, compare the plans of the following equivalent queries:

set hive.cbo.enable=false;

explain analyze
select c_customer_sk,ca_address_sk,c_current_addr_sk from customer c
join customer_address ca on c.c_current_addr_sk = ca.ca_address_sk
where (ca.ca_country='United States' and c_current_addr_sk = 26487)
 or (ca.ca_country='United States' and c_current_addr_sk = 2001);

explain analyze
select c_customer_sk,ca_address_sk,c_current_addr_sk from customer c
join customer_address ca on c.c_current_addr_sk = ca.ca_address_sk
where (ca.ca_country='United States' and ca_address_sk = 26487)
 or (ca.ca_country='United States' and ca_address_sk = 2001);

explain analyze
select c_customer_sk,ca_address_sk,c_current_addr_sk from customer c
join customer_address ca on c.c_current_addr_sk = ca.ca_address_sk
where (ca.ca_country='United States' and ca_address_sk = 26487 and c_current_addr_sk = 26487)
 or (ca.ca_country='United States' and ca_address_sk = 2001 and c_current_addr_sk = 2001);

set hive.cbo.enable=true;

Sort by

There are some useful custom language extensions, like SORT BY.

drop table if exists tmp3;
drop table if exists tmp3x;
explain create table tmp3 as select c_first_name, c_last_name from tpcds_text_2.customer order by c_first_name;
create table tmp3 as select c_first_name, c_last_name from tpcds_text_2.customer order by c_first_name;

explain create table tmp3x as select c_first_name, c_last_name from tpcds_text_2.customer sort by c_first_name;
create table tmp3x as select c_first_name, c_last_name from tpcds_text_2.customer distribute by c_last_name sort by c_first_name;
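
The difference shows in the results: tmp3 (order by) is totally ordered, because everything went through a single reducer, while tmp3x (sort by) is only sorted within each reducer's output, so its first rows need not be the globally smallest names.

select * from tmp3 limit 10;
select * from tmp3x limit 10;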

Take a look at the Tez UI.

Partition pruning

When a query filters on the partition columns, Hive can skip (prune) the partitions that don't match.
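
For example, with the partitioned table c2 from earlier, a filter on c_birth_month should mean only one partition directory is read; compare the inputs in the two plans:

explain select count(*) from c2 where c_birth_month=2;
explain select count(*) from c2;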

count(distinct)

set hive.cbo.enable=false;

explain
select 'cbo-off',count(distinct c_birth_month) from customer;

select 'cbo-off',count(distinct c_birth_month) from customer;

explain
select 'advised',count(*) from (select distinct c_birth_month from customer) t;

select 'advised',count(*) from (select distinct c_birth_month from customer) t;


set hive.cbo.enable=true;

explain
select 'cbo-on',count(distinct c_birth_month) from customer;

select 'cbo-on',count(distinct c_birth_month) from customer;

Join

Merge join - full shuffle

set hive.explain.user=false;
set hive.auto.convert.join=false;
set hive.tez.dynamic.semijoin.reduction=false;


explain
select c_customer_sk,ca_address_sk,c_current_addr_sk from tpcds.customer c
join tpcds.customer_address ca on c.c_current_addr_sk = ca.ca_address_sk
where (ca.ca_country='United States' and c_current_addr_sk = 26487)
 or (ca.ca_country='United States' and c_current_addr_sk = 2001);

Semijoin reduction

set hive.explain.user=false;
set hive.auto.convert.join=false;
set hive.tez.dynamic.semijoin.reduction=true;


explain
select c_customer_sk,ca_address_sk,c_current_addr_sk from tpcds.customer c
join tpcds.customer_address ca on c.c_current_addr_sk = ca.ca_address_sk
where (ca.ca_country='United States' and c_current_addr_sk = 26487)
 or (ca.ca_country='United States' and c_current_addr_sk = 2001);

Mapjoin

set hive.explain.user=false;
set hive.auto.convert.join=true;
set hive.tez.dynamic.semijoin.reduction=true;


explain
select c_customer_sk,ca_address_sk,c_current_addr_sk from tpcds.customer c
join tpcds.customer_address ca on c.c_current_addr_sk = ca.ca_address_sk
where (ca.ca_country='United States' and c_current_addr_sk = 26487)
 or (ca.ca_country='United States' and c_current_addr_sk = 2001);

Tune a select to be a mapjoin

set hive.auto.convert.join.noconditionaltask.size=336273448;

explain
select c_customer_sk,ca_address_sk,c_current_addr_sk from tpcds.customer c
join tpcds.customer_address ca on c.c_current_addr_sk = ca.ca_address_sk
where (ca.ca_country='United States' and c_current_addr_sk = 26487)
 or (ca.ca_country='United States' and c_current_addr_sk = 2001);

Misc

Hive configuration

There are many options that can be used to tune the plans and Hive itself: https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties

Skews

select count(*) c, c_first_name
  from customer
  group by c_first_name
  order by c desc
  limit 20;

create table cskew(c_customer_sk bigint, c_first_name string)
  skewed by (c_first_name)
  on ('', 'James', 'John', 'Robert', 'Michael', 'William',
      'David', 'Richard', 'Charles', 'Joseph')
  stored as orc;

insert into table cskew select c_customer_sk, c_first_name from customer;

select count(*) from cskew where c_first_name='';
select count(*) from cskew where c_first_name='Robert';
select count(*) from cskew where c_first_name='Donald';

JSON, XML

select get_json_object('{"a":123, "b": [1,"x", null]}', '$.b[1]');

select xpath ('<a><b id="1"><c/></b><b id="2"><c/></b></a>',
  '/descendant::c/ancestor::b/@id');
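
The JSON path '$.b[1]' above picks element 1 of array b (the string 'x'). When you need several top-level keys at once, json_tuple extracts them in a single call instead of repeated get_json_object invocations:

select json_tuple('{"a":123, "b":"x"}', 'a', 'b');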

Lateral view

create table tt1 (id int, arr array<int>);
insert into table tt1 select 20, array(5,6,7,8,9);
insert into table tt1 select 10, array(1,2,3);
select * from tt1;
select id, arr[0], arr[2] from tt1;

select id, a from tt1 lateral view explode (arr) aaa as a;

create table tt2 (id int, s string);
insert into tt2 values (1, "a,b,c"), (2, "x,y,z,w");
select id, split(s,",") from tt2;
select id, a from tt2 lateral view explode (split(s,",")) bbb as a;
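
If you also need the position of each element, posexplode returns an index alongside the value:

select id, pos, a from tt2 lateral view posexplode (split(s,",")) bbb as pos, a;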

Tez

set hive.execution.engine=mr;
set hive.execution.engine=tez;

select count(*), c_birth_month from customer group by c_birth_month;
explain select count(*),c_birth_month from customer group by c_birth_month;
select count(*) from customer where c_birth_month = 6;
explain select count(*) from customer where c_birth_month = 6;

Vectorization

set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;

set hive.vectorized.execution.enabled = false;
set hive.vectorized.execution.reduce.enabled = false;

drop table tx;
create table tx as
  select (x+100)/(x-100)*x*(x+23) / ((x -1) /100) / (x-1234)
  from (select c_customer_sk x from customer) t;

Analytical functions

select t.sk, c.c_first_name, t.c_birth_month
  from (
    select c_birth_month, max(c_customer_sk) sk from customer group by c_birth_month
  ) t
  inner join customer c
  on t.sk = c.c_customer_sk;

-- rewritten query, using analytic functions

select * , rank()
  over (partition by c_birth_month order by c_customer_sk desc) as rank
  from customer limit 10;

select * from (
    select c_first_name, c_birth_month, c_customer_sk , rank()
    over (partition by c_birth_month order by c_customer_sk desc) as rank from customer
  ) ranked
  where ranked.rank = 1;
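
Note that rank() assigns the same rank to ties; since c_customer_sk is unique here, row_number() would work just as well:

select * from (
    select c_first_name, c_birth_month, c_customer_sk, row_number()
    over (partition by c_birth_month order by c_customer_sk desc) as rn from customer
  ) numbered
  where numbered.rn = 1;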

A TPC-DS query

 explain
select
    dt.d_year,
    item.i_brand_id brand_id,
    item.i_brand brand,
    sum(ss_ext_sales_price) sum_agg
from
    date_dim dt,
    store_sales,
    item
where
    dt.d_date_sk = store_sales.ss_sold_date_sk
        and store_sales.ss_item_sk = item.i_item_sk
        and item.i_manufact_id = 436
        and dt.d_moy = 12
group by dt.d_year , item.i_brand , item.i_brand_id
order by dt.d_year , sum_agg desc , brand_id
limit 10;

Exercise

Database discogs stores data about music records obtained from https://www.discogs.com/. You'll find three tables there:

  • discogs_release_orc
  • discogs_artist_orc
  • discogs_label_orc

Schema:

DROP TABLE IF EXISTS discogs_release_orc;
CREATE TABLE discogs_release_orc(artist_names ARRAY<STRING>, label_names ARRAY<STRING>,
  title STRING, genres ARRAY<STRING>, styles ARRAY<STRING>, country STRING, released STRING)
STORED AS ORC;

DROP TABLE IF EXISTS discogs_artist_orc;
CREATE TABLE discogs_artist_orc(name STRING, realname STRING, namevariations ARRAY<STRING>,
  aliases ARRAY<STRING>, profile STRING, members ARRAY<STRING>, groups ARRAY<STRING>,
  images ARRAY<STRING>, urls ARRAY<STRING>)
STORED AS ORC;

DROP TABLE IF EXISTS discogs_label_orc;
CREATE TABLE discogs_label_orc(name STRING, parentlabel STRING,
  sublabels ARRAY<STRING>, contactinfo STRING,
  profile STRING, images ARRAY<STRING>, urls ARRAY<STRING>)
STORED AS ORC;

Try to get familiar with these, check if you find your favorite bands in there. Try to come up with potential use-cases and interesting questions to be answered with this data set. Extra credit if you can combine this with the SF police data!

A few simple examples:

select * from discogs_release_orc limit 10;

select count(*) from discogs_release_orc where artist_names[0] = "Willie Rosario";

select title, count(distinct artist_names) cnt from discogs_release_orc
group by title order by cnt desc limit 10;

select wordintitle, count(distinct artist_names) cnt from discogs.discogs_release_orc
lateral view explode(split(title, ' ')) tmp as wordintitle  group by wordintitle
order by cnt desc limit 10;

select artist_names, count(*) cnt from discogs_release_orc 
group by artist_names order by cnt desc limit 25;

select artist, count(*) cnt from discogs_release_orc lateral view explode(artist_names) tmptable as artist
group by artist order by cnt desc limit 25;

select country, count(*) cnt from discogs_release_orc group by country order by cnt desc limit 25;