Skip to content

Commit

Permalink
Oas update (erev0s#55)
Browse files Browse the repository at this point in the history
* Updated OAS to include auth and added swagger ui

* custom 401 handler for connexion to match rest

* update readme
  • Loading branch information
erev0s authored May 30, 2024
1 parent 1449f9b commit beca461
Show file tree
Hide file tree
Showing 8 changed files with 84 additions and 62 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
FROM python:3.7-alpine as builder
FROM python:3.11-alpine as builder
RUN apk --update add bash nano g++
COPY . /vampi
WORKDIR /vampi
RUN pip install -r requirements.txt

# Build a fresh container, copying across files & compiled parts
FROM python:3.7-alpine
FROM python:3.11-alpine
COPY . /vampi
WORKDIR /vampi
COPY --from=builder /usr/local/lib /usr/local/lib
Expand Down
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ VAmPI is a vulnerable API made with Flask and it includes vulnerabilities from t
- OpenAPI3 specs and Postman Collection included.
- Global switch on/off to have a vulnerable environment or not.
- Token-Based Authentication (Adjust lifetime from within app.py)
- Available Swagger UI to directly interact with the API

VAmPI's flow of actions is going like this: an unregistered user can see minimal information about the dummy users included in the API. A user can register and then login to be allowed using the token received during login to post a book. For a book posted the data accepted are the title and a secret about that book. Each book is unique for every user and only the owner of the book should be allowed to view the secret.

Expand All @@ -34,7 +35,7 @@ A quick rundown of the actions included can be seen in the following table:
| POST | /books/v1 | Add new book |
| GET | /books/v1/{book} | Retrieves book by title along with secret |

For more details you can use a service like the [swagger editor](https://editor.swagger.io) supplying it the OpenAPI specification which can be found in the directory `openapi_specs`.
For more details you can either run VAmPI and visit `http://127.0.0.1:5000/ui/` or use a service like the [swagger editor](https://editor.swagger.io) supplying the OpenAPI specification which can be found in the directory `openapi_specs`.


#### List of Vulnerabilities
Expand Down Expand Up @@ -70,6 +71,12 @@ docker run -p 5000:5000 erev0s/vampi:latest
docker-compose up -d
~~~~

## Available Swagger UI :rocket:
Visit the path `/ui` where you are running the API and a Swagger UI will be available to help you get started!
~~~~
http://127.0.0.1:5000/ui/
~~~~

## Customizing token timeout and vulnerable environment or not
If you would like to alter the timeout of the token created after login or if you want to change the environment **not** to be vulnerable then you can use a few ways depending how you run the application.

Expand Down
18 changes: 5 additions & 13 deletions api_views/books.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import jsonschema

from api_views.users import token_validator
from api_views.users import token_validator, error_message_helper
from config import db
from api_views.json_schemas import *
from flask import jsonify, Response, request, json
Expand All @@ -9,10 +9,6 @@
from app import vuln


def error_message_helper(msg):
return '{ "status": "fail", "message": "' + msg + '"}'


def get_all_books():
return_value = jsonify({'Books': Book.get_all_books()})
return return_value
Expand All @@ -25,12 +21,10 @@ def add_new_book():
except:
return Response(error_message_helper("Please provide a proper JSON body."), 400, mimetype="application/json")
resp = token_validator(request.headers.get('Authorization'))
if "expired" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
elif "Invalid token" in resp:
if "error" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
else:
user = User.query.filter_by(username=resp).first()
user = User.query.filter_by(username=resp['sub']).first()

# check if user already has this book title
book = Book.query.filter_by(user=user, book_title=request_data.get('book_title')).first()
Expand All @@ -50,9 +44,7 @@ def add_new_book():

def get_by_title(book_title):
resp = token_validator(request.headers.get('Authorization'))
if "expired" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
elif "Invalid token" in resp:
if "error" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
else:
if vuln: # Broken Object Level Authorization
Expand All @@ -67,7 +59,7 @@ def get_by_title(book_title):
else:
return Response(error_message_helper("Book not found!"), 404, mimetype="application/json")
else:
user = User.query.filter_by(username=resp).first()
user = User.query.filter_by(username=resp['sub']).first()
book = Book.query.filter_by(user=user, book_title=str(book_title)).first()
if book:
responseObject = {
Expand Down
40 changes: 19 additions & 21 deletions api_views/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@


def error_message_helper(msg):
return '{ "status": "fail", "message": "' + msg + '"}'
if isinstance(msg, dict):
return '{ "status": "fail", "message": "' + msg['error'] + '"}'
else:
return '{ "status": "fail", "message": "' + msg + '"}'


def get_all_users():
Expand Down Expand Up @@ -81,12 +84,14 @@ def login_user():
return Response(json.dumps(responseObject), 200, mimetype="application/json")
if vuln: # Password Enumeration
if user and request_data.get('password') != user.password:
return Response(error_message_helper("Password is not correct for the given username."), 200, mimetype="application/json")
return Response(error_message_helper("Password is not correct for the given username."), 200,
mimetype="application/json")
elif not user: # User enumeration
return Response(error_message_helper("Username does not exist"), 200, mimetype="application/json")
else:
if (user and request_data.get('password') != user.password) or (not user):
return Response(error_message_helper("Username or Password Incorrect!"), 200, mimetype="application/json")
return Response(error_message_helper("Username or Password Incorrect!"), 200,
mimetype="application/json")
except jsonschema.exceptions.ValidationError as exc:
return Response(error_message_helper(exc.message), 400, mimetype="application/json")
except:
Expand All @@ -105,7 +110,7 @@ def token_validator(auth_header):
# if auth_token is valid we get back the username of the user
return User.decode_auth_token(auth_token)
else:
return "Invalid token"
return {'error': 'Invalid token. Please log in again.'}


def update_email(username):
Expand All @@ -115,12 +120,10 @@ def update_email(username):
except:
return Response(error_message_helper("Please provide a proper JSON body."), 400, mimetype="application/json")
resp = token_validator(request.headers.get('Authorization'))
if "expired" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
elif "Invalid token" in resp:
if "error" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
else:
user = User.query.filter_by(username=resp).first()
user = User.query.filter_by(username=resp['sub']).first()
if vuln: # Regex DoS
match = re.search(
r"^([0-9a-zA-Z]([-.\w]*[0-9a-zA-Z])*@{1}([0-9a-zA-Z][-\w]*[0-9a-zA-Z]\.)+[a-zA-Z]{2,9})$",
Expand All @@ -137,7 +140,8 @@ def update_email(username):
}
return Response(json.dumps(responseObject), 204, mimetype="application/json")
else:
return Response(error_message_helper("Please Provide a valid email address."), 400, mimetype="application/json")
return Response(error_message_helper("Please Provide a valid email address."), 400,
mimetype="application/json")
else:
regex = '^[a-z0-9]+[\._]?[a-z0-9]+[@]\w+[.]\w{2,3}$'
if (re.search(regex, request_data.get('email'))):
Expand All @@ -152,16 +156,14 @@ def update_email(username):
}
return Response(json.dumps(responseObject), 204, mimetype="application/json")
else:
return Response(error_message_helper("Please Provide a valid email address."), 400, mimetype="application/json")

return Response(error_message_helper("Please Provide a valid email address."), 400,
mimetype="application/json")


def update_password(username):
request_data = request.get_json()
resp = token_validator(request.headers.get('Authorization'))
if "expired" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
elif "Invalid token" in resp:
if "error" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
else:
if request_data.get('password'):
Expand All @@ -173,7 +175,7 @@ def update_password(username):
else:
return Response(error_message_helper("User Not Found"), 400, mimetype="application/json")
else:
user = User.query.filter_by(username=resp).first()
user = User.query.filter_by(username=resp['sub']).first()
user.password = request_data.get('password')
db.session.commit()
responseObject = {
Expand All @@ -185,16 +187,12 @@ def update_password(username):
return Response(error_message_helper("Malformed Data"), 400, mimetype="application/json")




def delete_user(username):
resp = token_validator(request.headers.get('Authorization'))
if "expired" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
elif "Invalid token" in resp:
if "error" in resp:
return Response(error_message_helper(resp), 401, mimetype="application/json")
else:
user = User.query.filter_by(username=resp).first()
user = User.query.filter_by(username=resp['sub']).first()
if user.admin:
if bool(User.delete_user(username)):
responseObject = {
Expand Down
12 changes: 10 additions & 2 deletions config.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,25 @@
import os
import connexion
from flask import jsonify
from flask_sqlalchemy import SQLAlchemy

vuln_app = connexion.App(__name__, specification_dir='./openapi_specs')

SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(vuln_app.root_path, 'database/database.db')
SQLALCHEMY_DATABASE_URI = 'sqlite:///' + os.path.join(vuln_app.app.root_path, 'database/database.db')
vuln_app.app.config['SQLALCHEMY_DATABASE_URI'] = SQLALCHEMY_DATABASE_URI
vuln_app.app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

vuln_app.app.config['SECRET_KEY'] = 'random'
# start the db
db = SQLAlchemy(vuln_app.app)

vuln_app.add_api('openapi3.yml')

@vuln_app.app.errorhandler(401)
def custom_401(error):
# Custom 401 to match the original response sent by Vampi
response = jsonify({"status": "fail", "message": "Invalid token. Please log in again."})
response.status_code = 401
return response


vuln_app.add_api('openapi3.yml')
11 changes: 6 additions & 5 deletions models/user_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from random import randrange
from sqlalchemy.sql import text


class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True, unique=True, autoincrement=True)
Expand Down Expand Up @@ -45,17 +46,17 @@ def encode_auth_token(self, user_id):
def decode_auth_token(auth_token):
try:
payload = jwt.decode(auth_token, vuln_app.app.config.get('SECRET_KEY'), algorithms=["HS256"])
return payload['sub']
return payload
except jwt.ExpiredSignatureError:
return 'Signature expired. Please log in again.'
return {'error': 'Signature expired. Please log in again.'}
except jwt.InvalidTokenError:
return 'Invalid token. Please log in again.'
return {'error': 'Invalid token. Please log in again.'}

def json(self):
return{'username': self.username, 'email': self.email}
return {'username': self.username, 'email': self.email}

def json_debug(self):
return{'username': self.username, 'password': self.password, 'email': self.email, 'admin': self.admin}
return {'username': self.username, 'password': self.password, 'email': self.email, 'admin': self.admin}

@staticmethod
def get_all_users():
Expand Down
Loading

0 comments on commit beca461

Please sign in to comment.