Skip to content

Commit

Permalink
Working submission of Krypto hackathon, - Listening to Websockets for…
Browse files Browse the repository at this point in the history
… supported crypto channels - Adding to queue for every trigger price reached in rabbitMQ - Setting up consumer end in the rabbit MQ queue - Connected to postgresql db with 2 schemas (users, alerts) - Support of API endpoints for authentication to Creating alerts, to deleting alerts to fetching alerts with filters. - Uses Dependency Injection for checking auth in Restricted API endpoints

;
.
  • Loading branch information
surya-x committed Jul 30, 2022
1 parent ec52239 commit a507885
Show file tree
Hide file tree
Showing 8 changed files with 243 additions and 223 deletions.
23 changes: 23 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1 +1,24 @@
# krypto-backend-alerttool

Creating a price alert application that triggers an email when the user’s target price is
achieved.
By :- V Surya Kumar (19BCE10286)

## INSTALLATION
Pull the repository and Check out all the files in your local directory.

Follow the below steps to make it run:

### 1. Setting up database

1.


### 2. Setting up RabbitMQ (using docker)


### 3. Getting API key for Binance Websockets


### 4.

78 changes: 76 additions & 2 deletions SQL_CRUD.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ def __init__(self, user, password, host, port, dbname, table, primarykey=None):
self.table = table
# self.primarykey = primarykey


def connect(self):
try:
connection = psycopg2.connect(
Expand Down Expand Up @@ -75,7 +74,7 @@ def close(self, commit=False):
)

def insert(self, **column_value):
insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({})").format(
insert_query = sql.SQL("INSERT INTO {} ({}) VALUES ({}) ").format(
sql.Identifier(self.table),
sql.SQL(', ').join(map(sql.Identifier, column_value.keys())),
sql.SQL(', ').join(sql.Placeholder() * len(column_value.values()))
Expand All @@ -84,6 +83,21 @@ def insert(self, **column_value):
self._execute(insert_query, record_to_insert)
self._counter += 1

def get_last_id(self):
select_query = sql.SQL("SELECT {} from {} order by {} desc limit 1").format(
sql.Identifier('alert_id'),
sql.Identifier(self.table),
sql.Identifier('alert_id')
)
self._execute(select_query, )
try:
selected = self._cursor.fetchall()
except psycopg2.ProgrammingError as error:
selected = '# ERROR: ' + str(error)
else:
print('-# ' + str(selected) + '\n')
return selected

def select_all(self, primaryKey_value=None):
if primaryKey_value == None:
select_query = sql.SQL("SELECT * FROM {}").format(sql.Identifier(self.table))
Expand Down Expand Up @@ -119,6 +133,66 @@ def get_with_email(self, email):
print('-# ' + str(selected) + '\n')
return selected

def get_all_alerts_with_user(self, user_id):
if user_id:
select_query = sql.SQL("SELECT alert_id, crypto_code, trigger_value, status FROM {} WHERE {} = {}").format(
sql.Identifier(self.table),
sql.Identifier('user_id'),
sql.Placeholder()
)
self._execute(select_query, (user_id,))
try:
selected = self._cursor.fetchall()
except psycopg2.ProgrammingError as error:
selected = '# ERROR: ' + str(error)
else:
print('-# ' + str(selected) + '\n')
return selected

def get_all_alerts(self):
select_query = sql.SQL(
"SELECT alert_id, crypto_code, trigger_value, status FROM {}").format(
sql.Identifier(self.table),
)
self._execute(select_query, ())
try:
selected = self._cursor.fetchall()
except psycopg2.ProgrammingError as error:
selected = '# ERROR: ' + str(error)
else:
print('-# ' + str(selected) + '\n')
return selected

def get_all_alerts_with_status_and_user(self, status, user_id):
if user_id:
select_query = sql.SQL(
"SELECT alert_id, crypto_code, trigger_value, status FROM {} WHERE {} = {} AND {} = {}").format(
sql.Identifier(self.table),
sql.Identifier('user_id'),
sql.Placeholder(),
sql.Identifier('status'),
sql.Placeholder()
)
self._execute(select_query, (user_id, status,))
try:
selected = self._cursor.fetchall()
except psycopg2.ProgrammingError as error:
selected = '# ERROR: ' + str(error)
else:
print('-# ' + str(selected) + '\n')
return selected

def update_status_of_alert(self, status, alert_id):
if alert_id:
update_query = sql.SQL(
"UPDATE {} SET {} = {} WHERE {} = {}").format(
sql.Identifier(self.table),
sql.Identifier('status'),
sql.Placeholder(),
sql.Identifier('alert_id'),
sql.Placeholder()
)
self._execute(update_query, (status, alert_id, ))

# def connect():
# """ Connect to the PostgreSQL database server """
Expand Down
82 changes: 77 additions & 5 deletions app/app.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
from fastapi.responses import RedirectResponse
from fastapi import FastAPI, status, HTTPException, Depends
from fastapi.security import OAuth2PasswordRequestForm
from app.schemas import UserOut, UserAuth, TokenSchema, SystemUser
from app.schemas import UserOut, UserAuth, TokenSchema, SystemUser, Alert
from uuid import uuid4
import test_websocket
from config import *
from app.utils import (
get_hashed_password,
Expand Down Expand Up @@ -103,7 +103,79 @@ async def get_me(my_user: SystemUser = Depends(get_current_user)):
# # print(my_user)
# return {"Ping": "Surya"}

@app.get("/alert/create", tags=['ROOT'], response_model=dict)
async def create_alert(my_user=Depends(get_current_user)) -> dict:
@app.post('/alert/create', tags=['ROOT'], summary="Create new alert", response_model=dict)
async def create_alert(alert_info: Alert, my_user=Depends(get_current_user)) -> dict:
# print(my_user)
return {"Ping": "Surya"}
# print(alert_info)
table = SQL_CRUD.SQL_CRUD(user=user,
password=password,
host=host,
port=port,
dbname=database,
table=alerts_table)
table.connect()
print(str(my_user.id))

table.insert(
user_id=str(my_user.id),
crypto_code=alert_info.crypto_code,
trigger_value=alert_info.trigger_price,
status='CREATED'
)

last_alert_id = table.get_last_id()[0][0]

table.commit()
table.close()

test_websocket.push_to_lc(sym=alert_info.crypto_code, trigger=float(alert_info.trigger_price), alert_id=last_alert_id)

return {"status": "created", "crypto_code": str(alert_info.crypto_code),
"trigger_price": str(alert_info.trigger_price)}


@app.get('/alert', tags=['ROOT'], summary='Get details of all alerts with filters', response_model=dict)
async def fetch_all_alerts(status: str = "", my_user: SystemUser = Depends(get_current_user)) -> dict:
table = SQL_CRUD.SQL_CRUD(user=user,
password=password,
host=host,
port=port,
dbname=database,
table=alerts_table)
table.connect()

if status == 'CREATED' or status == 'DELETED' or status == 'TRIGGERED':
selected = table.get_all_alerts_with_status_and_user(user_id=str(my_user.id), status=status)
else:
selected = table.get_all_alerts_with_user(str(my_user.id))

table.commit()

table.close()

my_list = []
for each in selected:
my_dict = {"alert_id": each[0], 'crypto_code': each[1], 'trigger_price': each[2], 'status': each[3]}
my_list.append(my_dict)

return {'data': my_list}


@app.put('/alert/delete', tags=['ROOT'], summary='Update status of alert', response_model=dict)
async def update_alerts(alert_id: int, my_user: SystemUser = Depends(get_current_user)) -> dict:
table = SQL_CRUD.SQL_CRUD(user=user,
password=password,
host=host,
port=port,
dbname=database,
table=alerts_table)
table.connect()

new_status = "DELETED"
table.update_status_of_alert(status=new_status, alert_id=alert_id)

table.commit()

table.close()

return {"status": 200}
1 change: 0 additions & 1 deletion app/deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ async def get_current_user(token: str = Depends(reuseable_oauth)) -> SystemUser:
headers={"WWW-Authenticate": "Bearer"},
)

# user: Union[dict[str, Any], None] = db.get(token_data.sub, None)
# querying database to check if user already exist
table = SQL_CRUD.SQL_CRUD(user=user,
password=password,
Expand Down
14 changes: 13 additions & 1 deletion app/schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,16 @@ class UserOut(BaseModel):


class SystemUser(UserOut):
password: str
password: str


class Alert(BaseModel):
crypto_code: str
trigger_price: float


class AlertAllOut(BaseModel):
alert_id: int
crypto_code: str
trigger_price: float
status: str
139 changes: 0 additions & 139 deletions connectDB.py

This file was deleted.

Loading

0 comments on commit a507885

Please sign in to comment.