Stream-data-app-simulation.py
import boto3
import csv
import json
import dateutil.parser as parser
from time import sleep
from datetime import datetime

# AWS settings:
s3 = boto3.client('s3', region_name='us-east-1')
s3_resource = boto3.resource('s3', region_name='us-east-1')
kinesis_client = boto3.client('kinesis', region_name='us-east-1')

# Configuration; these could be sourced from OS environment variables in Lambda:
kinesis_stream_name = 'us-accidents-data-stream'
streaming_partition_key = 'Severity'
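
# A minimal sketch of sourcing the same settings from environment variables,
# as the comment above suggests for Lambda. The variable names
# KINESIS_STREAM_NAME and PARTITION_KEY are assumptions for illustration;
# the defaults fall back to the hardcoded values above, so behavior is
# unchanged when the variables are unset.
import os
kinesis_stream_name = os.environ.get('KINESIS_STREAM_NAME', kinesis_stream_name)
streaming_partition_key = os.environ.get('PARTITION_KEY', streaming_partition_key)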

# This function can be converted to a Lambda handler, e.g. by iterating
# the S3-put event records (record['s3']['bucket']['name']); see the
# hypothetical handler sketch below the function.
def stream_data_simulator(input_s3_bucket, input_s3_key):
    s3_bucket = input_s3_bucket
    s3_key = input_s3_key

    # Read the CSV object from S3 and split it into lines:
    csv_file = s3_resource.Object(s3_bucket, s3_key)
    s3_response = csv_file.get()
    lines = s3_response['Body'].read().decode('utf-8').split('\n')

    for row in csv.DictReader(lines):
        try:
            # Convert the row to JSON, to make it easier to work with in Kinesis Analytics:
            line_json = json.dumps(row)
            json_load = json.loads(line_json)

            # Simple date casting to ISO-8601 strings:
            start_time_raw = parser.parse(json_load['Start_Time'])
            start_time_iso = start_time_raw.isoformat()
            json_load.update({'Start_Time': start_time_iso})

            end_time_raw = parser.parse(json_load['End_Time'])
            end_time_iso = end_time_raw.isoformat()
            json_load.update({'End_Time': end_time_iso})

            weather_time_raw = parser.parse(json_load['Weather_Timestamp'])
            weather_time_iso = weather_time_raw.isoformat()
            json_load.update({'Weather_Timestamp': weather_time_iso})

            # Add a fake transaction timestamp:
            json_load['Txn_Timestamp'] = datetime.now().isoformat()

            # Write the record to the Kinesis data stream:
            response = kinesis_client.put_record(
                StreamName=kinesis_stream_name,
                Data=json.dumps(json_load, indent=4),
                PartitionKey=str(json_load[streaming_partition_key]))
            print(response)

            # Temporary pause, for demo purposes:
            sleep(0.250)
        except Exception as e:
            print('Error: {}'.format(e))
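
# A minimal sketch of a hypothetical Lambda handler wrapping the function
# above, assuming an S3 put-event trigger. The handler name and event shape
# follow the standard S3 event-notification format; nothing in this sketch
# is part of the original script.
def lambda_handler(event, context):
    for record in event['Records']:
        bucket = record['s3']['bucket']['name']
        key = record['s3']['object']['key']
        stream_data_simulator(input_s3_bucket=bucket, input_s3_key=key)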

# Run stream:
for i in range(0, 3):
    stream_data_simulator(input_s3_bucket="streaming-kinesis-flink",
                          input_s3_key="raw-data/US_Accidents_Dec21_updated_sample.csv")