stream-handler application #6

Open
wants to merge 1 commit into
base: master
3 changes: 3 additions & 0 deletions .gitignore
@@ -87,3 +87,6 @@ ENV/

# Rope project settings
.ropeproject

# Jetbrains settings
.idea/
51 changes: 47 additions & 4 deletions README.md
@@ -11,16 +11,59 @@ Uses https://python-twitter.readthedocs.io/en/latest/
## presterity/twitter-integration development

1. go to preferred workspace directory
2. virtualenv venvs/presterity
3. source venvs/presterity/bin/activate
4. git clone git@github.com:/presterity/twitter-integration
5. pip install -r twitter-integration/requirements.txt
2. `virtualenv -p python3 venvs/presterity`
3. `source venvs/presterity/bin/activate`
4. `git clone git@github.com:/presterity/twitter-integration`
5. `pip install -r twitter-integration/requirements.txt`


## Using the Twitter APIs

1. Follow the instructions at https://python-twitter.readthedocs.io/en/latest/getting_started.html to register a Twitter app.
2. Under "Keys and Access Tokens," generate Consumer Key and Secret
3. Copy your consumer key/secret and access key/secret into your own version of the config.py file
4. Execute the following command from the root of the repo (to avoid accidentaly checking in your changes to the file) `git update-index --assume-unchanged ./config.py`
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool. I didn't know about that.
typo: accidentaly -> accidentally



## Running stream-listener

1. `cd` to the root of the repo
2. `python stream-listener/twitter-stream-listen.py --handle @presterity --verbose`

Notes about the above command:

1. Run `python stream-listener/twitter-stream-listen.py -h` for a help page
2. `python` - If you didn't pass the `-p python3` option when creating the virtualenv workspace, you need to run `python3` instead of `python`. Otherwise, you will get errors immediately after running.
3. If you don't start the command from the root of the repo, you'll need to specify `--config {path/to/config}`. If your error message is related to accessing `config.py`, this is likely your problem.
4. Pick a different handle than @presterity to listen to something more active. Specify multiple handles like this: `--handle @presterity --handle @SenWarren --handle @PramiliaJayapal --handle @BarackObama` (The '@' is optional; leave it off if you want.)
5. I recommend `--verbose` the first time you run it because otherwise you might think the application is hanging (when it's just waiting for someone being watched to tweet). For your own sanity, remove this flag once you know it works.
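
The repeated `--handle` flags in note 4 can be pictured with a small `argparse` sketch. It mirrors the `action='append'` declaration that appears in `stream-handler/main.py` in this PR. The `normalize_handle` helper is hypothetical and only illustrates one way the leading '@' could be made optional; the listener script's actual handling is not shown on this page.

```python
import argparse


def normalize_handle(handle: str) -> str:
    """Hypothetical helper: drop one leading '@' so both spellings match."""
    return handle[1:] if handle.startswith('@') else handle


def parse_handles(argv: list) -> list:
    parser = argparse.ArgumentParser()
    # action='append' collects every occurrence of --handle into one list
    parser.add_argument('--handle', action='append', default=[])
    args = parser.parse_args(argv)
    return [normalize_handle(h) for h in args.handle]


if __name__ == '__main__':
    print(parse_handles(['--handle', '@presterity', '--handle', 'SenWarren']))
```

Each `--handle` occurrence adds one entry, so the order on the command line is the order in the resulting list.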

## Using the AWS APIs

1. Log in to the AWS Console. (You can create an account with your Amazon information if you haven't already. Many services are free for the first year. Setting up the DynamoDB tables as described below costs approximately $5 per month once you are off the free tier.)
2. In the IAM section, create a user with programatic access and AmazonDynamoDBFullAccess permissions
Review comment (Member):
programatic -> programmatic

3. Create the file `~/.aws/config` and paste in your access key ID and secret access key in the following format:

```
[default]
aws_access_key_id = xxxxxxxxxxxxxxxxxxxxxx
aws_secret_access_key = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```

4. You're all set up. The AWS SDK (boto3) finds this file automatically and signs API calls with these credentials, so the calls run with the permissions of the IAM user you created.
5. If you have any difficulty, reach out on Slack to kyle.parker.robinson
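
If the credentials do not seem to be picked up, a quick local check that the file parses and contains both keys can help. The following is a minimal sketch using only the standard library; the `missing_keys` function and `REQUIRED_KEYS` constant are hypothetical names, and the check only inspects the file format shown above, not whether AWS accepts the keys.

```python
import configparser
from pathlib import Path

REQUIRED_KEYS = ('aws_access_key_id', 'aws_secret_access_key')


def missing_keys(config_text: str, profile: str = 'default') -> list:
    """Return the required keys that are absent from the given profile."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    if not parser.has_section(profile):
        return list(REQUIRED_KEYS)
    return [key for key in REQUIRED_KEYS if not parser.has_option(profile, key)]


if __name__ == '__main__':
    path = Path.home() / '.aws' / 'config'
    text = path.read_text() if path.exists() else ''
    print('missing keys:', missing_keys(text))
```

An empty list means both keys are present under `[default]`.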

## Creating DynamoDB Tables

The application in stream-handler requires access to two DynamoDB tables. Run the scripts in ./dynamo-tables/ to create them:

`python dynamo-tables/create_tweet_table.py`
`python dynamo-tables/create_users_table.py`

If your AWS account is not on the free tier, these tables will accrue charges of around $5 per month for both as long as they exist. You can delete them and recreate them when needed to save costs.
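
Deleting the tables to save costs could be scripted along these lines. This is a sketch, not part of the PR: `delete_tables` is a hypothetical helper, and it assumes the table names `Tweets` and `Users` and the `us-west-2` region used by the creation scripts. It relies on the boto3 table resource's `delete()` and `wait_until_not_exists()` methods.

```python
def delete_tables(dynamodb, table_names=('Tweets', 'Users')) -> list:
    """Delete each named table and wait until it is gone.

    `dynamodb` is expected to be a boto3 DynamoDB service resource, e.g.
    boto3.resource('dynamodb', region_name='us-west-2').
    """
    deleted = []
    for name in table_names:
        table = dynamodb.Table(name)
        table.delete()                 # request deletion
        table.wait_until_not_exists()  # block until DynamoDB reports it gone
        deleted.append(name)
    return deleted


# Usage (requires boto3 and configured credentials):
#   import boto3
#   delete_tables(boto3.resource('dynamodb', region_name='us-west-2'))
```

Deleting a table also deletes its data, so this only suits test data that can be regenerated.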

## Running stream-handler

`python3 stream-handler/main.py --handle @BarackObama`

Using BarackObama for testing has been useful because there is a steady stream of people tweeting to him.
34 changes: 34 additions & 0 deletions dynamo-tables/create_tweet_table.py
@@ -0,0 +1,34 @@
import boto3


dynamodb = boto3.resource('dynamodb', region_name='us-west-2')

table = dynamodb.create_table(
    TableName='Tweets',
    KeySchema=[
        {
            'AttributeName': 'user',
            'KeyType': 'HASH'  # Partition key
        },
        {
            'AttributeName': 'id',
            'KeyType': 'RANGE'  # Sort key
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'user',
            'AttributeType': 'S'
        },
        {
            'AttributeName': 'id',
            'AttributeType': 'S'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)

print("Table status:", table.table_status)
Review comment (Member):
This schema isn't correct yet, I assume.

25 changes: 25 additions & 0 deletions dynamo-tables/create_users_table.py
@@ -0,0 +1,25 @@
import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-west-2')

table = dynamodb.create_table(
    TableName='Users',
    KeySchema=[
        {
            'AttributeName': 'id',
            'KeyType': 'HASH'  # Partition key
        }
    ],
    AttributeDefinitions=[
        {
            'AttributeName': 'id',
            'AttributeType': 'S'
        }
    ],
    ProvisionedThroughput={
        'ReadCapacityUnits': 5,
        'WriteCapacityUnits': 5
    }
)

print("Table status:", table.table_status)
3 changes: 3 additions & 0 deletions requirements.txt
@@ -1,2 +1,5 @@
python-twitter
typing

# Amazon AWS SDK
boto3
23 changes: 23 additions & 0 deletions stream-handler/handlers/apply_user_filter_handler.py
@@ -0,0 +1,23 @@
import logging
Review comment (Member):
Could you add a comment briefly explaining what the ApplyUserFilterHandler does and that it is intended to be called after RecordUserHandler?



class ApplyUserFilterHandler:

    def __init__(self):
        self.log = logging.getLogger()
Review comment (Member):
Let's do getLogger(__name__) in our modules so that we can set different levels for our code and third-party libs. I often find third-party libs way too verbose at DEBUG level.


    def handle(self, tweet: dict):
        if tweet.get('presterity_action') is None:
            tweet['presterity_action'] = {}

        user_flags = tweet.get('presterity_user_flags')
        if user_flags is None:
            return

        if 'user_blocked' in user_flags:
            self.log.info('Blocked user detected: %s', tweet['user'].get('screen_name'))
            tweet['presterity_action']['hidden'] = 'blocked_user'
        if 'user_promoted' in user_flags:
            self.log.info('Promoted user detected: %s', tweet['user'].get('screen_name'))
            tweet['presterity_action']['promoted'] = 'promoted_user'
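
The review suggestion in this file to use `getLogger(__name__)` can be illustrated with a short sketch. The two logger names are hypothetical stand-ins for what `__name__` would evaluate to inside one of our modules and inside a third-party library. The point is that named loggers let the level be raised for our package alone.

```python
import logging

# Keep the root logger at WARNING so third-party libraries stay quiet.
logging.basicConfig()
logging.getLogger().setLevel(logging.WARNING)

# Hypothetical names: what __name__ might be in our module and in a library.
our_log = logging.getLogger('handlers.apply_user_filter_handler')
lib_log = logging.getLogger('some_third_party_lib.api')

# Raise verbosity for our package only; child loggers inherit from 'handlers'.
logging.getLogger('handlers').setLevel(logging.DEBUG)

print(our_log.isEnabledFor(logging.DEBUG))
print(lib_log.isEnabledFor(logging.DEBUG))
```

With `logging.getLogger()` and no name, every module shares the root logger, so this kind of per-package level is not possible.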

41 changes: 41 additions & 0 deletions stream-handler/handlers/record_user_handler.py
@@ -0,0 +1,41 @@
import boto3
Review comment (Member):
Could you add a comment briefly explaining what the RecordUserHandler does?

from tweet_type import TweetType
import logging


class RecordUserHandler:
    def __init__(self):
        _dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
        self.table = _dynamodb.Table('Users')
        self.log = logging.getLogger()
Review comment (Member):
logging.getLogger(__name__)


    def handle(self, tweet: dict):
        user = tweet.get('user')

        response = self.table.update_item(
            Key={
                'id': user.get('id_str')
            },
            UpdateExpression=
            'set '
            'screen_name = :s, '
            'profile_image_url = :p, '
            'created_at = :c',
            ExpressionAttributeValues={
                ':s': user.get('screen_name'),
                ':p': user.get('profile_image_url'),
                ':c': self.__parse_datetime(user.get('created_at'))
            },
            ReturnValues='ALL_NEW'
        )

Review comment (Member):
Is the approach to insert/update the user every time, no matter if they already exist or not? Just making sure I understand.

        # Take advantage of the fact that dynamo returns the record's attributes
        # Removes need to read record from database in ApplyUserFilterHandler
        record = response['Attributes']
        # A newly created user has no 'user_flags' attribute yet, so use .get()
        user_flags = record.get('user_flags') or {}
        tweet['presterity_user_flags'] = user_flags

    @staticmethod
    def __parse_datetime(datetime):
        datetime = TweetType.translate_datetime(datetime)
        return datetime.strftime('%Y-%m-%d %H:%M:%S %Z')
Review comment (Member):
I'd vote for using datetime.isoformat() instead of a custom formatting string, unless there's a reason not to.
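
The difference between the two formats can be seen in a small sketch. It reuses the parse format string from `TweetType.translate_datetime`; the `to_custom` and `to_iso` function names and the sample timestamp are illustrative assumptions.

```python
from datetime import datetime

# Same parse format as TweetType.translate_datetime in this PR
TWITTER_FORMAT = '%a %b %d %H:%M:%S %z %Y'


def to_custom(created_at: str) -> str:
    """Format as the PR does, with a custom strftime pattern."""
    parsed = datetime.strptime(created_at, TWITTER_FORMAT)
    return parsed.strftime('%Y-%m-%d %H:%M:%S %Z')


def to_iso(created_at: str) -> str:
    """Format with isoformat(), as the reviewer suggests."""
    return datetime.strptime(created_at, TWITTER_FORMAT).isoformat()


sample = 'Wed Aug 27 13:08:45 +0000 2008'  # illustrative timestamp
print(to_custom(sample))
print(to_iso(sample))
```

The ISO 8601 string keeps the numeric UTC offset and can be read back with `datetime.fromisoformat()`, whereas the custom string needs a matching `strptime` pattern.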

17 changes: 17 additions & 0 deletions stream-handler/handlers/write_to_console_handler.py
@@ -0,0 +1,17 @@
from tweet_type import TweetType


class WriteToconsoleHandler:
Review comment (Member):
I think this class name should be WriteToConsoleHandler


    @classmethod
    def handle(cls, tweet: dict):
        if not tweet:
            print('-- None -- ')
        elif TweetType.is_timeout(tweet):
            print('-- Timeout --')
        elif TweetType.is_heartbeat_timeout(tweet):
            print('-- Heartbeat Timeout --')
        elif TweetType.is_hangup(tweet):
            print('-- Hangup --')
        else:
            print(TweetType.get_text(tweet))
32 changes: 32 additions & 0 deletions stream-handler/handlers/write_to_dynamo_handler.py
@@ -0,0 +1,32 @@
import boto3
from tweet_type import TweetType


class WriteToDynamoHandler:
    def __init__(self):
        _dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
        self.table = _dynamodb.Table('Tweets')

    def handle(self, tweet: dict):
        user = tweet['user']
        self.table.put_item(
            Item={
                'user': user['id_str'],
                'id': tweet['id_str'],
                'text': tweet['text'],
                'created_at': self.__parse_datetime(tweet['created_at']),
                'in_reply_to_screen_name': tweet.get('in_reply_to_screen_name'),
                'in_reply_to_status_id': tweet.get('in_reply_to_status_id_str'),
                'user_info': {
                    'screen_name': user.get('screen_name'),
                    'profile_image_url': user.get('profile_image_url'),
                    'presterity_user_flags': tweet.get('presterity_user_flags')
                },
                'presterity_actions': tweet.get('presterity_actions')
            }
        )

    @staticmethod
    def __parse_datetime(datetime):
        _datetime = TweetType.translate_datetime(datetime)
        return _datetime.strftime('%Y-%m-%d %H:%M:%S %Z')
Review comment (Member):
Same comment re: datetime.isoformat()

49 changes: 49 additions & 0 deletions stream-handler/main.py
@@ -0,0 +1,49 @@
import logging
import argparse
from twitter_client import TwitterClient
from twitter_stream_listener import TwitterStreamListener

from handlers.write_to_dynamo_handler import WriteToDynamoHandler
from handlers.record_user_handler import RecordUserHandler
from handlers.apply_user_filter_handler import ApplyUserFilterHandler


def build_parser() -> argparse.ArgumentParser:
    """Construct argument parser for script.

    :return: ArgumentParser
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument('--handle', action='append',
                        help='Twitter handle to listen for; may appear multiple times')
    parser.add_argument('--id', action='append',
                        help='Twitter user id to listen for; may appear multiple times')
    parser.add_argument('--verbose', '-v', action='store_true', help='log level DEBUG')
    return parser

if __name__ != '__main__':
    raise Exception('main.py can only be run as main')

parser = build_parser()
args = parser.parse_args()

# Log at WARN overall; DEBUG is very verbose for python-twitter code
logging.basicConfig(level=logging.WARN)
log_level = logging.INFO
if args.verbose:
    log_level = logging.DEBUG
log = logging.getLogger()
log.setLevel(log_level)

client = TwitterClient()
listener = TwitterStreamListener(client.get_stream(args.handle))

listener.register_handler(RecordUserHandler())
listener.register_handler(ApplyUserFilterHandler())
listener.register_handler(WriteToDynamoHandler())

listener.start()

input('Press Enter to quit')

listener.stop()
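
The registration order in this script matters because `ApplyUserFilterHandler` reads the `presterity_user_flags` key that `RecordUserHandler` writes. The sketch below shows that dependency with hypothetical stand-ins: `MiniListener`, `FakeRecordUser`, and `FakeApplyFilter` are not part of the PR. It assumes the real listener calls handlers in registration order, which the code on this page does not show.

```python
class MiniListener:
    """Hypothetical stand-in for TwitterStreamListener."""

    def __init__(self):
        self._handlers = []

    def register_handler(self, handler):
        self._handlers.append(handler)

    def dispatch(self, tweet: dict):
        # Assumption: handlers run in the order they were registered
        for handler in self._handlers:
            handler.handle(tweet)


class FakeRecordUser:
    """Stands in for RecordUserHandler: attaches flags without DynamoDB."""

    def handle(self, tweet: dict):
        tweet['presterity_user_flags'] = {'user_blocked': True}


class FakeApplyFilter:
    """Mirrors ApplyUserFilterHandler's check for the blocked flag."""

    def handle(self, tweet: dict):
        actions = tweet.setdefault('presterity_action', {})
        if 'user_blocked' in (tweet.get('presterity_user_flags') or {}):
            actions['hidden'] = 'blocked_user'


def run(handlers) -> dict:
    listener = MiniListener()
    for handler in handlers:
        listener.register_handler(handler)
    tweet = {'user': {'screen_name': 'example'}}
    listener.dispatch(tweet)
    return tweet


if __name__ == '__main__':
    print(run([FakeRecordUser(), FakeApplyFilter()])['presterity_action'])
    print(run([FakeApplyFilter(), FakeRecordUser()])['presterity_action'])
```

With the filter registered first, the flags are not yet on the tweet when it runs, so no action is recorded.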
50 changes: 50 additions & 0 deletions stream-handler/tweet_type.py
@@ -0,0 +1,50 @@
"""
Utility class to help understand data returned from Twitter API.
"""

from typing import Optional
from datetime import datetime


class TweetType(object):
    Timeout = {'timeout': True}
    Hangup = {'hangup': True}
    HeartbeatTimeout = {'hangup': True, 'heartbeat_timeout': True}

    @classmethod
    def is_hangup(cls, tweet_dict: dict) -> bool:
        """Return True if provided dict looks like any type of hangup.

        Note that a heartbeat timeout is a kind of hangup.
        """
        return tweet_dict and tweet_dict is cls.Hangup

    @classmethod
    def is_heartbeat_timeout(cls, tweet_dict: dict) -> bool:
        """Return True if provided dict looks like heartbeat timeout."""
        return tweet_dict and tweet_dict is cls.HeartbeatTimeout

    @classmethod
    def is_timeout(cls, tweet_dict: dict) -> bool:
        """Return True if provided dict looks like timeout."""
        return tweet_dict and tweet_dict is cls.Timeout

    @classmethod
    def is_hangup_or_timeout(cls, tweet_dict: dict) -> bool:
        return cls.is_hangup(tweet_dict) \
            or cls.is_heartbeat_timeout(tweet_dict) \
            or cls.is_timeout(tweet_dict)

    @classmethod
    def get_text(cls, tweet_dict: dict) -> Optional[str]:
        """Return text or None."""
        text = None
        if tweet_dict:
            text = tweet_dict.get('text')
        # Convert empty string to None for consistency
        return text or None

    @staticmethod
    def translate_datetime(tweet_datetime: str):
        _format = '%a %b %d %H:%M:%S %z %Y'
        return datetime.strptime(tweet_datetime, _format)
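
The `is_*` checks above compare with `is`, which tests object identity rather than contents. Whether that matters depends on how the stream listener produces these sentinel dicts, which is not shown on this page. The sketch below only contrasts the two comparisons on a dict with the same shape as `TweetType.Timeout`; the function names are hypothetical.

```python
import json

TIMEOUT = {'timeout': True}  # same shape as TweetType.Timeout


def is_timeout_identity(tweet_dict) -> bool:
    # Identity check, as in the diff: true only for the very same object
    return bool(tweet_dict) and tweet_dict is TIMEOUT


def is_timeout_equality(tweet_dict) -> bool:
    # Equality check: true for any dict with the same contents
    return bool(tweet_dict) and tweet_dict == TIMEOUT


parsed = json.loads('{"timeout": true}')  # a fresh dict with equal contents
print(is_timeout_identity(TIMEOUT), is_timeout_identity(parsed))
print(is_timeout_equality(TIMEOUT), is_timeout_equality(parsed))
```

Wrapping the first operand in `bool()` also makes the return value a real `bool`, whereas `tweet_dict and ...` can return `None` or an empty dict.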