forked from confluentinc/demo-scene
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsmoke-test.ksql
25 lines (20 loc) · 3.32 KB
/
smoke-test.ksql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
-- Register the existing 'ratings' topic as a ksqlDB stream;
-- the value schema is fetched from the Schema Registry (Avro).
CREATE STREAM RATINGS
    WITH (KAFKA_TOPIC  = 'ratings',
          VALUE_FORMAT = 'AVRO');
-- Low ratings (fewer than 3 stars) submitted through the iOS channel.
CREATE STREAM POOR_RATINGS AS
    SELECT STARS,
           CHANNEL,
           MESSAGE
    FROM RATINGS
    WHERE STARS < 3
      AND CHANNEL = 'iOS';
-- Smoke check: tail five rows from the derived stream.
SELECT *
    FROM POOR_RATINGS
    EMIT CHANGES
    LIMIT 5;
-- Check: did this output rows?
-- Debezium CDC source: snapshot and stream demo.customers from MySQL into
-- Kafka, unwrapping the Debezium change envelope and keying each message by
-- the row's 'id' column (as a plain string; values stay Avro).
CREATE SOURCE CONNECTOR SOURCE_MYSQL_01 WITH (
    'connector.class'                          = 'io.debezium.connector.mysql.MySqlConnector',
    'database.hostname'                        = 'mysql',
    'database.port'                            = '3306',
    'database.user'                            = 'debezium',
    'database.password'                        = 'dbz',
    'database.server.id'                       = '42',
    'database.server.name'                     = 'asgard',
    'table.whitelist'                          = 'demo.customers',
    'database.history.kafka.bootstrap.servers' = 'kafka:29092',
    'database.history.kafka.topic'             = 'dbhistory.demo',
    'include.schema.changes'                   = 'false',
    'transforms'                               = 'unwrap,extractkey',
    'transforms.unwrap.type'                   = 'io.debezium.transforms.ExtractNewRecordState',
    'transforms.extractkey.type'               = 'org.apache.kafka.connect.transforms.ExtractField$Key',
    'transforms.extractkey.field'              = 'id',
    'key.converter'                            = 'org.apache.kafka.connect.storage.StringConverter',
    'value.converter'                          = 'io.confluent.connect.avro.AvroConverter',
    'value.converter.schema.registry.url'      = 'http://schema-registry:8081'
);
SHOW CONNECTORS;
-- Check/wait: connector should be running successfully before continuing.
-- Two views over the same CDC topic: a TABLE (latest state per key, used for
-- joins below) and a STREAM (the full change history).
CREATE TABLE CUSTOMERS
    WITH (KAFKA_TOPIC  = 'asgard.demo.CUSTOMERS',
          VALUE_FORMAT = 'AVRO');
CREATE STREAM CUSTOMERS_STREAM
    WITH (KAFKA_TOPIC  = 'asgard.demo.CUSTOMERS',
          VALUE_FORMAT = 'AVRO');
-- Check: both objects should be created successfully.
-- Read topics from the beginning so the join below sees the existing customer
-- records; this setting applies to every subsequent query in this session.
SET 'auto.offset.reset' = 'earliest';
-- Enrich each rating with the customer's current details (stream-table join).
-- The IS NOT NULL filter deliberately drops ratings with no matching customer,
-- so this behaves like an inner join despite the LEFT JOIN.
CREATE STREAM RATINGS_WITH_CUSTOMER_DATA
    WITH (KAFKA_TOPIC = 'ratings-enriched') AS
    SELECT R.RATING_ID,
           R.MESSAGE,
           R.STARS,
           R.CHANNEL,
           C.ID,
           C.FIRST_NAME + ' ' + C.LAST_NAME AS FULL_NAME,
           C.CLUB_STATUS,
           C.EMAIL
    FROM RATINGS R
         LEFT JOIN CUSTOMERS C
            ON CAST(R.USER_ID AS STRING) = C.ROWKEY
    WHERE C.FIRST_NAME IS NOT NULL
    EMIT CHANGES;
-- Smoke check: tail five enriched rows.
SELECT *
    FROM RATINGS_WITH_CUSTOMER_DATA
    EMIT CHANGES
    LIMIT 5;
-- Check: did this output rows with customer names?
-- Low ratings (fewer than 3 stars) left by platinum-tier customers.
CREATE STREAM UNHAPPY_PLATINUM_CUSTOMERS AS
    SELECT FULL_NAME,
           CLUB_STATUS,
           EMAIL,
           STARS,
           MESSAGE
    FROM RATINGS_WITH_CUSTOMER_DATA
    WHERE STARS < 3
      AND CLUB_STATUS = 'platinum';
-- Elasticsearch sink: stream both enriched topics into ES for the Kibana
-- dashboard. Malformed documents are logged and skipped rather than failing
-- the connector; an EXTRACT_TS field is injected from the record timestamp.
CREATE SINK CONNECTOR SINK_ELASTIC_01 WITH (
    'connector.class'                         = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
    'connection.url'                          = 'http://elasticsearch:9200',
    'type.name'                               = '',
    'behavior.on.malformed.documents'         = 'warn',
    'errors.tolerance'                        = 'all',
    'errors.log.enable'                       = 'true',
    'errors.log.include.messages'             = 'true',
    'topics'                                  = 'ratings-enriched,UNHAPPY_PLATINUM_CUSTOMERS',
    'key.ignore'                              = 'true',
    'schema.ignore'                           = 'true',
    'key.converter'                           = 'org.apache.kafka.connect.storage.StringConverter',
    'transforms'                              = 'ExtractTimestamp',
    'transforms.ExtractTimestamp.type'        = 'org.apache.kafka.connect.transforms.InsertField$Value',
    'transforms.ExtractTimestamp.timestamp.field' = 'EXTRACT_TS'
);
-- Check: Connect to Kibana (http://ip:5601/app/kibana#/dashboard/mysql-ksql-kafka-es) - should see data in dashboard
-- Per-minute tumbling-window count of enriched ratings, grouped by club status.
CREATE TABLE RATINGS_BY_CLUB_STATUS AS
    SELECT CLUB_STATUS,
           COUNT(*) AS RATING_COUNT
    FROM RATINGS_WITH_CUSTOMER_DATA
    WINDOW TUMBLING (SIZE 1 MINUTES)
    GROUP BY CLUB_STATUS
    EMIT CHANGES;
-- Pull query against the windowed table: counts for the 'bronze' key in
-- windows starting after the given instant.
SELECT TIMESTAMPTOSTRING(WINDOWSTART, 'yyyy-MM-dd HH:mm:ss') AS WINDOW_START_TS,
       CLUB_STATUS,
       RATING_COUNT
    FROM RATINGS_BY_CLUB_STATUS
    WHERE ROWKEY = 'bronze'
      AND WINDOWSTART > '2020-02-12T10:46:00.000';
-- Check: did this output rows?