diff --git a/.gitignore b/.gitignore index 658c217..4b97295 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ avatars __pycache__/ .DS_Store reports-* +.idea \ No newline at end of file diff --git a/GUI_template.py b/GUI_template.py index ce2fe35..1c17bb3 100644 --- a/GUI_template.py +++ b/GUI_template.py @@ -2,7 +2,6 @@ from sys import argv, exit from webbrowser import open -from backend.general_settings import latitude, longitude, meters from folium import Map, Marker from folium.plugins import MousePosition from PyQt5.QtCore import Qt @@ -23,6 +22,8 @@ QWidget, ) +from backend.general_settings import latitude, longitude, meters + class MainWindow(QMainWindow): def __init__(self): @@ -61,7 +62,7 @@ def __init__(self): upper_splitter.setStretchFactor(0, 1) # Allow adjusting height of the upper parts # Create right widget - #upper_right_widget = QWidget() + # upper_right_widget = QWidget() upper_right_widget = SettingsWidget() # Add right widget to vertical splitter @@ -76,20 +77,10 @@ def __init__(self): # Load map self.load_map() - - - - - - - - - - - def load_map(self): from backend.general_settings import latitude as default_latitude from backend.general_settings import longitude as default_longitude + m = Map(location=[default_latitude, default_longitude], zoom_start=12) MousePosition().add_to(m) # Add mouse position display @@ -102,23 +93,6 @@ def load_map(self): m_html = m.get_root().render() self.browser.setHtml(m_html) # Load the modified Folium map HTML directly into the QWebEngineView - - - - - - - - - - - - - - - - - def new_request(self): print("test") @@ -137,11 +111,11 @@ def show_about_dialog(self): def open_global_map(self): # Path to your HTML file current_directory = os.getcwd() - html_file_path = os.path.join(current_directory, 'reports-html', '_combined_data.html') + html_file_path = os.path.join(current_directory, "reports-html", "_combined_data.html") # Check if the HTML file exists if os.path.exists(html_file_path): - open('file://' + os.path.realpath(html_file_path)) + open("file://" + os.path.realpath(html_file_path)) else: print("HTML file not found!") @@ -189,24 +163,24 @@ def create_menu_bar(self): """) # Create File menu - file_menu = menubar.addMenu('&File') + file_menu = menubar.addMenu("&File") # Create actions for File menu - new_request_action = QAction('&New request', self) - new_request_action.setShortcut('Ctrl+n') - new_request_action.setStatusTip('New request') + new_request_action = QAction("&New request", self) + new_request_action.setShortcut("Ctrl+n") + new_request_action.setStatusTip("New request") new_request_action.triggered.connect(self.new_request) file_menu.addAction(new_request_action) - open_request_action = QAction('&Open request', self) - open_request_action.setShortcut('Ctrl+o') - open_request_action.setStatusTip('Open request') + open_request_action = QAction("&Open request", self) + open_request_action.setShortcut("Ctrl+o") + open_request_action.setStatusTip("Open request") open_request_action.triggered.connect(self.new_request) file_menu.addAction(open_request_action) - open_global_action = QAction('&Open global map', self) - open_global_action.setShortcut('Ctrl+Shift+o') - open_global_action.setStatusTip('Open global map') + open_global_action = QAction("&Open global map", self) + open_global_action.setShortcut("Ctrl+Shift+o") + open_global_action.setStatusTip("Open global map") open_global_action.triggered.connect(self.open_global_map) file_menu.addAction(open_global_action) @@ -216,36 +190,37 @@ def create_menu_bar(self): file_menu.addAction(separator_action) file_menu.setStyleSheet("QMenu::separator { background-color: #ccc; height: 1px; margin: 5px}") - exit_action = QAction('&Exit', self) - exit_action.setShortcut('Ctrl+Q') - exit_action.setStatusTip('Exit application') + exit_action = QAction("&Exit", self) + exit_action.setShortcut("Ctrl+Q") + exit_action.setStatusTip("Exit application") exit_action.triggered.connect(self.close) file_menu.addAction(exit_action) # Create Settings menu - settings_menu = menubar.addMenu('&Settings') + settings_menu = menubar.addMenu("&Settings") - telegram_settings_action = QAction('&Telegram settings', self) - telegram_settings_action.setStatusTip('Telegram setting') + telegram_settings_action = QAction("&Telegram settings", self) + telegram_settings_action.setStatusTip("Telegram setting") telegram_settings_action.triggered.connect(self.open_telegram_settings_window) settings_menu.addAction(telegram_settings_action) - general_settings_action = QAction('&General settings', self) - general_settings_action.setStatusTip('General setting') + general_settings_action = QAction("&General settings", self) + general_settings_action.setStatusTip("General setting") general_settings_action.triggered.connect(self.open_general_settings_window) settings_menu.addAction(general_settings_action) # Create About menu - about_menu = menubar.addMenu('&About') + about_menu = menubar.addMenu("&About") # Create actions for About menu - about_action = QAction('&About', self) - about_action.setStatusTip('&About') + about_action = QAction("&About", self) + about_action.setStatusTip("&About") about_action.triggered.connect(self.show_about_dialog) about_menu.addAction(about_action) return menubar + class SettingsWidget(QWidget): def __init__(self): super().__init__() @@ -278,7 +253,7 @@ def __init__(self): layout.addWidget(self.slider_position_label) # Create layout for latitude and longitude - lat_long_layout = QHBoxLayout() + # lat_long_layout = QHBoxLayout() # Create layout for latitude lat_layout = QHBoxLayout() @@ -324,20 +299,21 @@ def slider_changed(self, value): def revert_settings(self): try: - file_descriptor = os.open('./backend/general_settings.py', os.O_RDONLY) - file_contents = os.read(file_descriptor, os.path.getsize('./backend/general_settings.py')).decode() + file_descriptor = os.open("./backend/general_settings.py", os.O_RDONLY) + file_contents = os.read(file_descriptor, os.path.getsize("./backend/general_settings.py")).decode() os.close(file_descriptor) - for line in file_contents.split('\n'): - if line.startswith('meters'): - self.slider.setValue(int(line.split('=')[1].strip())) - elif line.startswith('latitude'): - self.lat_line_edit.setText(line.split('=')[1].strip()) - elif line.startswith('longitude'): - self.long_line_edit.setText(line.split('=')[1].strip()) + for line in file_contents.split("\n"): + if line.startswith("meters"): + self.slider.setValue(int(line.split("=")[1].strip())) + elif line.startswith("latitude"): + self.lat_line_edit.setText(line.split("=")[1].strip()) + elif line.startswith("longitude"): + self.long_line_edit.setText(line.split("=")[1].strip()) except FileNotFoundError: QMessageBox.warning(self, "Warning", "No settings file found.") + class TelegramSettingsDialog(QDialog): def __init__(self): super().__init__() @@ -363,35 +339,35 @@ def __init__(self): layout.addWidget(label) self.setLayout(layout) - self.phone_number_label = QLabel('Phone Number:') + self.phone_number_label = QLabel("Phone Number:") layout.addWidget(self.phone_number_label) self.phone_number_text = QLineEdit() layout.addWidget(self.phone_number_text) - self.telegram_name_label = QLabel('Telegram Name:') + self.telegram_name_label = QLabel("Telegram Name:") layout.addWidget(self.telegram_name_label) self.telegram_name_text = QLineEdit() layout.addWidget(self.telegram_name_text) - self.telegram_api_id_label = QLabel('Telegram API ID:') + self.telegram_api_id_label = QLabel("Telegram API ID:") layout.addWidget(self.telegram_api_id_label) self.telegram_api_id_text = QLineEdit() layout.addWidget(self.telegram_api_id_text) - self.telegram_api_hash_label = QLabel('Telegram API Hash:') + self.telegram_api_hash_label = QLabel("Telegram API Hash:") layout.addWidget(self.telegram_api_hash_label) self.telegram_api_hash_text = QLineEdit() layout.addWidget(self.telegram_api_hash_text) - self.submit_button = QPushButton('Submit') + self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.save_settings) layout.addWidget(self.submit_button) - self.revert_button = QPushButton('Revert') + self.revert_button = QPushButton("Revert") self.revert_button.clicked.connect(self.revert_settings) layout.addWidget(self.revert_button) - self.cancel_button = QPushButton('Cancel') + self.cancel_button = QPushButton("Cancel") self.cancel_button.clicked.connect(self.close) layout.addWidget(self.cancel_button) @@ -402,19 +378,19 @@ def __init__(self): def load_settings(self): try: - file_descriptor = os.open('./backend/telegram_creds.py', os.O_RDONLY) - file_contents = os.read(file_descriptor, os.path.getsize('./backend/telegram_creds.py')).decode() + file_descriptor = os.open("./backend/telegram_creds.py", os.O_RDONLY) + file_contents = os.read(file_descriptor, os.path.getsize("./backend/telegram_creds.py")).decode() os.close(file_descriptor) - for line in file_contents.split('\n'): - if line.startswith('phone_number'): - self.phone_number = line.split('=')[1].strip().strip("'") - elif line.startswith('telegram_name'): - self.telegram_name = line.split('=')[1].strip().strip("'") - elif line.startswith('telegram_api_id'): - self.telegram_api_id = line.split('=')[1].strip().strip("'") - elif line.startswith('telegram_api_hash'): - self.telegram_api_hash = line.split('=')[1].strip().strip("'") + for line in file_contents.split("\n"): + if line.startswith("phone_number"): + self.phone_number = line.split("=")[1].strip().strip("'") + elif line.startswith("telegram_name"): + self.telegram_name = line.split("=")[1].strip().strip("'") + elif line.startswith("telegram_api_id"): + self.telegram_api_id = line.split("=")[1].strip().strip("'") + elif line.startswith("telegram_api_hash"): + self.telegram_api_hash = line.split("=")[1].strip().strip("'") self.phone_number_text.setText(self.phone_number) self.telegram_name_text.setText(self.telegram_name) @@ -429,7 +405,7 @@ def save_settings(self): self.telegram_api_id = self.telegram_api_id_text.text() self.telegram_api_hash = self.telegram_api_hash_text.text() - file_descriptor = os.open('./backend/telegram_creds.py', os.O_WRONLY | os.O_CREAT) + file_descriptor = os.open("./backend/telegram_creds.py", os.O_WRONLY | os.O_CREAT) file_contents = ( f"phone_number = '{self.phone_number}'\n" f"telegram_name = '{self.telegram_name}'\n" @@ -443,22 +419,23 @@ def save_settings(self): def revert_settings(self): try: - file_descriptor = os.open('./backend/telegram_creds.py', os.O_RDONLY) - file_contents = os.read(file_descriptor, os.path.getsize('./backend/telegram_creds.py')).decode() + file_descriptor = os.open("./backend/telegram_creds.py", os.O_RDONLY) + file_contents = os.read(file_descriptor, os.path.getsize("./backend/telegram_creds.py")).decode() os.close(file_descriptor) - for line in file_contents.split('\n'): - if line.startswith('phone_number'): - self.phone_number_text.setText(line.split('=')[1].strip().strip("'")) - elif line.startswith('telegram_name'): - self.telegram_name_text.setText(line.split('=')[1].strip().strip("'")) - elif line.startswith('telegram_api_id'): - self.telegram_api_id_text.setText(line.split('=')[1].strip().strip("'")) - elif line.startswith('telegram_api_hash'): - self.telegram_api_hash_text.setText(line.split('=')[1].strip().strip("'")) + for line in file_contents.split("\n"): + if line.startswith("phone_number"): + self.phone_number_text.setText(line.split("=")[1].strip().strip("'")) + elif line.startswith("telegram_name"): + self.telegram_name_text.setText(line.split("=")[1].strip().strip("'")) + elif line.startswith("telegram_api_id"): + self.telegram_api_id_text.setText(line.split("=")[1].strip().strip("'")) + elif line.startswith("telegram_api_hash"): + self.telegram_api_hash_text.setText(line.split("=")[1].strip().strip("'")) except FileNotFoundError: QMessageBox.warning(self, "Warning", "No settings file found.") + class GeneralSettingsDialog(QDialog): def __init__(self): super().__init__() @@ -473,35 +450,35 @@ def __init__(self): layout = QVBoxLayout() - self.meters_label = QLabel('Search radius (meters):') + self.meters_label = QLabel("Search radius (meters):") layout.addWidget(self.meters_label) self.meters_text = QLineEdit(str(self.meters)) layout.addWidget(self.meters_text) - self.latitude_label = QLabel('Starting Latitude:') + self.latitude_label = QLabel("Starting Latitude:") layout.addWidget(self.latitude_label) self.latitude_text = QLineEdit(str(self.latitude)) layout.addWidget(self.latitude_text) - self.longitude_label = QLabel('Starting Longitude:') + self.longitude_label = QLabel("Starting Longitude:") layout.addWidget(self.longitude_label) self.longitude_text = QLineEdit(str(self.longitude)) layout.addWidget(self.longitude_text) - self.timesleep_label = QLabel('Delay between search steps (sec):') + self.timesleep_label = QLabel("Delay between search steps (sec):") layout.addWidget(self.timesleep_label) self.timesleep_text = QLineEdit(str(self.timesleep)) layout.addWidget(self.timesleep_text) - self.submit_button = QPushButton('Submit') + self.submit_button = QPushButton("Submit") self.submit_button.clicked.connect(self.save_settings) layout.addWidget(self.submit_button) - self.revert_button = QPushButton('Revert') + self.revert_button = QPushButton("Revert") self.revert_button.clicked.connect(self.revert_settings) layout.addWidget(self.revert_button) - self.cancel_button = QPushButton('Cancel') + self.cancel_button = QPushButton("Cancel") self.cancel_button.clicked.connect(self.close) layout.addWidget(self.cancel_button) @@ -512,19 +489,19 @@ def __init__(self): def load_settings(self): try: - file_descriptor = os.open('./backend/general_settings.py', os.O_RDONLY) - file_contents = os.read(file_descriptor, os.path.getsize('./backend/general_settings.py')).decode() + file_descriptor = os.open("./backend/general_settings.py", os.O_RDONLY) + file_contents = os.read(file_descriptor, os.path.getsize("./backend/general_settings.py")).decode() os.close(file_descriptor) - for line in file_contents.split('\n'): - if line.startswith('meters'): - self.meters_text.setText(line.split('=')[1].strip()) - elif line.startswith('latitude'): - self.latitude_text.setText(line.split('=')[1].strip()) - elif line.startswith('longitude'): - self.longitude_text.setText(line.split('=')[1].strip()) - elif line.startswith('timesleep'): - self.timesleep_text.setText(line.split('=')[1].strip()) + for line in file_contents.split("\n"): + if line.startswith("meters"): + self.meters_text.setText(line.split("=")[1].strip()) + elif line.startswith("latitude"): + self.latitude_text.setText(line.split("=")[1].strip()) + elif line.startswith("longitude"): + self.longitude_text.setText(line.split("=")[1].strip()) + elif line.startswith("timesleep"): + self.timesleep_text.setText(line.split("=")[1].strip()) except FileNotFoundError: QMessageBox.warning(self, "Warning", "No settings file found.") @@ -535,7 +512,7 @@ def save_settings(self): self.longitude = float(self.longitude_text.text()) self.timesleep = int(self.timesleep_text.text()) - file_descriptor = os.open('./backend/general_settings.py', os.O_WRONLY | os.O_CREAT) + file_descriptor = os.open("./backend/general_settings.py", os.O_WRONLY | os.O_CREAT) file_contents = ( f"meters = {self.meters}\n" f"latitude = {self.latitude}\n" @@ -551,22 +528,23 @@ def save_settings(self): def revert_settings(self): try: - file_descriptor = os.open('./backend/general_settings.py', os.O_RDONLY) - file_contents = os.read(file_descriptor, os.path.getsize('./backend/general_settings.py')).decode() + file_descriptor = os.open("./backend/general_settings.py", os.O_RDONLY) + file_contents = os.read(file_descriptor, os.path.getsize("./backend/general_settings.py")).decode() os.close(file_descriptor) - for line in file_contents.split('\n'): - if line.startswith('meters'): - self.meters_text.setText(line.split('=')[1].strip()) - elif line.startswith('latitude'): - self.latitude_text.setText(line.split('=')[1].strip()) - elif line.startswith('longitude'): - self.longitude_text.setText(line.split('=')[1].strip()) - elif line.startswith('timesleep'): - self.timesleep_text.setText(line.split('=')[1].strip()) + for line in file_contents.split("\n"): + if line.startswith("meters"): + self.meters_text.setText(line.split("=")[1].strip()) + elif line.startswith("latitude"): + self.latitude_text.setText(line.split("=")[1].strip()) + elif line.startswith("longitude"): + self.longitude_text.setText(line.split("=")[1].strip()) + elif line.startswith("timesleep"): + self.timesleep_text.setText(line.split("=")[1].strip()) except FileNotFoundError: QMessageBox.warning(self, "Warning", "No settings file found.") + class AboutDialog(QDialog): def __init__(self): super().__init__() @@ -589,7 +567,8 @@ def __init__(self): layout.addWidget(label) self.setLayout(layout) -if __name__ == '__main__': + +if __name__ == "__main__": app = QApplication(argv) app.setAttribute(Qt.AA_DontUseNativeMenuBar) # Disable native menu bar for macOS window = MainWindow() diff --git a/backend/banners.py b/backend/banners.py index 73d18f6..42ddf0e 100644 --- a/backend/banners.py +++ b/backend/banners.py @@ -1,6 +1,6 @@ from .functions import get_location_details -### Banner +# Banner banner = """ ██████╗██╗ ██████╗ ███████╗███████╗ ██████╗██╗██████╗ ██████╗██╗ ██╗██╗████████╗ ██╔════╝██║ ██╔═══██╗██╔════╝██╔════╝ ██╔════╝██║██╔══██╗██╔════╝██║ ██║██║╚══██╔══╝ @@ -22,45 +22,57 @@ [ ! ] https://t.me/EASM_HydrAttack """ -def print_geo_coordinater(lat,lon): + +def print_geo_coordinater(lat, lon): print("[ * ] Harvesting information based on the next coordinates:") print("\t[ * * ] Latitude: ", lat) print("\t[ * * ] Longitude:", lon) -def pring_city_by_geo(lat,lon): + +def print_city_by_geo(lat, lon): town, city, country = get_location_details(lat, lon) print(f"\t[ * * ] Country: {country}\n\t[ * * ] City:\t {city}\n\t[ * * ] Town:\t {town}\n") + def print_len_steps(steps, radius): print("[ * ] Overall steps to be performed:", steps, ", with overall diameter", radius * 2, "meters") + def print_telegram_initialization(): print("\n[ * ] Telegram client initialization...", end="") + def print_successfully(): print("successfully") + def print_start_harvesting(): print("\n[ * ] Start harvesting data:") + def print_current_step(step, lat, lon): print(f"\t[ {step} ] Latitude {round(lat, 4)}, Longitude {round(lon, 4)}") + def print_update_local_json(): print("\t\t[ > ] Harvesting data finished") print("\t\t[ > ] Updating JSON file...", end="") + def print_update_html(): print("\t\t[ > ] Generating HTML file...", end="") + def print_files_stored(report_json_directory, report_html_directory, filename): print("\n[ * ] Harvesting is finished and final files are generated:") - print("\t[ * * ] JSON:" , report_json_directory + filename + ".json") - print("\t[ * * ] HTML:" , report_html_directory + filename + ".html") + print("\t[ * * ] JSON:", report_json_directory + filename + ".json") + print("\t[ * * ] HTML:", report_html_directory + filename + ".html") + def print_combined_data(): print("\n[ * ] Combining all JSON files together and generating the global HTML map file") + def finishing_application(): print("\n[ * ] Everything has been executed successfully!") print("\t[ * * ] Enjoy your CCTV data!") diff --git a/backend/combine_data.py b/backend/combine_data.py index a5ca9dc..1fe762b 100644 --- a/backend/combine_data.py +++ b/backend/combine_data.py @@ -1,13 +1,16 @@ from .functions import combine_json_files from .json_into_html import generate_html_from_json -def combine_data(report_json_directory="./reports-json/", report_html_directory="./reports-html/"): +def combine_data(report_json_directory="./reports-json/", report_html_directory="./reports-html/"): combine_json_files(report_json_directory, f"{report_json_directory}_combined_data.json") # Generate the HTML file from JSON - generate_html_from_json(f"{report_json_directory}_combined_data.json", f"{report_html_directory}_combined_data.html") + generate_html_from_json( + f"{report_json_directory}_combined_data.json", f"{report_html_directory}_combined_data.html" + ) + # Call the function to execute the functionality if __name__ == "__main__": - combine_data() \ No newline at end of file + combine_data() diff --git a/backend/functions.py b/backend/functions.py index 0a7f585..ed00ac1 100644 --- a/backend/functions.py +++ b/backend/functions.py @@ -18,68 +18,73 @@ def generate_pattern(length): pattern.append((length - 3) // 2 - 1) return pattern + def calculate_length(metres): - remainder = ((metres + 400) % 800) + remainder = (metres + 400) % 800 if remainder == 0: return metres else: - return (metres - remainder + 800) + return metres - remainder + 800 + # Function to calculate coordinates based on steps and direction def calculate_coordinates(lat, lon, direction, distance): # Radius of the Earth in kilometers - R = 6378.1 # Earth radius in km + r = 6378.1 # Earth radius in km # Convert latitude and longitude from degrees to radians lat_rad = radians(lat) - lon_rad = radians(lon) + # Calculate new latitude and longitude based on direction and distance - if direction == 'starting': + if direction == "starting": new_lon = lon new_lat = lat - elif direction == 'west': - new_lon = lon - (distance / (R * cos(lat_rad))) * (180 / pi) + elif direction == "west": + new_lon = lon - (distance / (r * cos(lat_rad))) * (180 / pi) new_lat = lat - elif direction == 'south': - new_lat = lat - (distance / R) * (180 / pi) + elif direction == "south": + new_lat = lat - (distance / r) * (180 / pi) new_lon = lon - elif direction == 'east': - new_lon = lon + (distance / (R * cos(lat_rad))) * (180 / pi) + elif direction == "east": + new_lon = lon + (distance / (r * cos(lat_rad))) * (180 / pi) new_lat = lat - elif direction == 'north': - new_lat = lat + (distance / R) * (180 / pi) + elif direction == "north": + new_lat = lat + (distance / r) * (180 / pi) new_lon = lon else: raise ValueError("Invalid direction") return new_lat, new_lon + # Function to combine all json files into one def combine_json_files(folder_path, output_file): combined_data = {} for filename in listdir(folder_path): file_path = path.join(folder_path, filename) - if filename.endswith('.json') and filename != output_file: - with open(file_path, 'r') as file: + if filename.endswith(".json") and filename != output_file: + with open(file_path) as file: data = load(file) combined_data.update(data) # Merge dictionaries - with open(output_file, 'w') as output: + with open(output_file, "w") as output: dump(combined_data, output, indent=4) + # Get the city based on coordinates def get_location_details(latitude, longitude): - url = f"https://nominatim.openstreetmap.org/reverse?lat={latitude}&lon={longitude}&format=json" - headers = { - 'User-Agent': 'CCTV Bot' - } - response = get(url, headers=headers) - data = response.json() - if 'address' in data: - town = data['address'].get('town', '') - city = data['address'].get('city', '') - country = data['address'].get('country', '') - return town, city, country - else: - return None, None, None + url = "https://nominatim.openstreetmap.org/reverse" + headers = {"User-Agent": "CCTV Bot"} + response = get(url, headers=headers, params={"lat": latitude, "lon": longitude, "format": "json"}) + + if response.status_code == 200 and "application/json" in response.headers.get("Content-Type"): + data: dict = response.json() + if "address" in data: + town = data["address"].get("town", "") + city = data["address"].get("city", "") + country = data["address"].get("country", "") + return town, city, country + + return "", "", "" + def countdown_timer(duration): for remaining in range(duration, 0, -1): @@ -87,7 +92,8 @@ def countdown_timer(duration): sleep(1) print(" " * 100, end="\r") # Clear the line after countdown -#Download avatars + +# Download avatars def download_avatar(user_id, username, user_url, output_folder): avatar_filename = path.join(output_folder, f"{user_id}-{username}.jpg") if username is None: @@ -100,17 +106,19 @@ def download_avatar(user_id, username, user_url, output_folder): try: response = get(user_url) if response.status_code == 200: - soup = BeautifulSoup(response.text, 'html.parser') + soup = BeautifulSoup(response.text, "html.parser") meta_tag = soup.find("meta", property="og:image") if meta_tag and "cdn-telegram.org" in meta_tag["content"]: image_url = meta_tag["content"] response = get(image_url) if response.status_code == 200: - with open(avatar_filename, 'wb') as image_file: + with open(avatar_filename, "wb") as image_file: image_file.write(response.content) print(f"Downloaded avatar for user {user_id}-{username} successfully") else: - print(f"Failed to download avatar for user {user_id}-{username}. Status code: {response.status_code}") + print( + f"Failed to download avatar for user {user_id}-{username}. Status code: {response.status_code}" + ) else: print(f"No profile photo found for user {user_id}-{username}") else: @@ -118,38 +126,78 @@ def download_avatar(user_id, username, user_url, output_folder): except Exception as e: print(f"Error downloading avatar for user {user_id}-{username}: {e}") + def download_avatars(json_file, output_folder): print(f"Starting avatars download based on {json_file}...") - with open(json_file, 'r') as f: + with open(json_file) as f: data = load(f) with ThreadPoolExecutor(max_workers=5) as executor: for user_id, user_data in data.items(): # Skip users without a photo_id - if user_data.get('photo_id') is None: + if user_data.get("photo_id") is None: continue - username = user_data.get('username', '') + username = user_data.get("username", "") user_url = f"https://t.me/{username}" executor.submit(download_avatar, user_id, username, user_url, output_folder) + def create_config(file_path): """Create a custom YAML configuration file by asking the user for input.""" settings = { - 'api_config': { - 'phone': {'prompt': 'TG phone number: ', 'default': None, 'mandatory': True}, - 'api_id': {'prompt': 'TG api_id: ', 'default': None, 'mandatory': True, 'type': int}, - 'api_hash': {'prompt': 'TG api_hash:', 'default': None, 'mandatory': True}, + "api_config": { + "phone": { + "prompt": "TG phone number: ", + "default": None, + "mandatory": True + }, + "api_id": { + "prompt": "TG api_id: ", + "default": None, + "mandatory": True, + "type": int + }, + "api_hash": { + "prompt": "TG api_hash:", + "default": None, + "mandatory": True + }, + }, + "location": { + "lat": { + "prompt": "Starting latitude [51.51404]: ", + "default": 51.51404, + "mandatory": False, + "type": float + }, + "lon": { + "prompt": "Starting longitude [-0.15063]: ", + "default": -0.15063, + "mandatory": False, + "type": float, + }, + "meters": { + "prompt": "Search radius(meters) [1200]: ", + "default": 1200, + "mandatory": False, + "type": int + }, }, - 'location': { - 'lat': {'prompt': 'Starting latitude [51.51404]: ', 'default': 51.51404, 'mandatory': False, 'type': float}, - 'lon': {'prompt': 'Starting longitude [-0.15063]: ', 'default': -0.15063, 'mandatory': False, 'type': float}, - 'meters': {'prompt': 'Search radius(meters) [1200]: ', 'default': 1200, 'mandatory': False, 'type': int}, + "misc": { + "timesleep": { + "prompt": "Waiting time between locations(sec) [30]: ", + "default": 30, + "mandatory": False, + "type": int, + }, + "speed_kmh": { + "prompt": "Moving speed between locations(km/h) [50]: ", + "default": 50, + "mandatory": False, + "type": int, + }, }, - 'misc': { - 'timesleep': {'prompt': 'Waiting time between locations(sec) [30]: ', 'default': 30, 'mandatory': False, 'type': int}, - 'speed_kmh': {'prompt': 'Moving speed between locations(km/h) [50]: ', 'default': 50, 'mandatory': False, 'type': int}, - } } # Create a dictionary to store the configurations @@ -158,33 +206,34 @@ def create_config(file_path): print("\nCheck README.md and prepare required values at https://my.telegram.org/auth") # Iterate over the settings dictionary to prompt user for each value for group, group_settings in settings.items(): - for setting, details in group_settings.items(): + for setting, details in group_settings.items(): while True: # Loop until valid input is given or the program exits - user_input = input(details['prompt']) - if details['mandatory'] and not user_input.strip(): # Check if the input is mandatory and empty + user_input = input(details["prompt"]) + if details["mandatory"] and not user_input.strip(): # Check if the input is mandatory and empty print(f"Value {setting} is mandatory. Exiting program.") sys.exit() # Exit the program if input is mandatory and not provided elif user_input.strip(): - value = details['type'](user_input) if 'type' in details else user_input + value = details["type"](user_input) if "type" in details else user_input config_data[group][setting] = value break - elif details['default'] is not None: # Use default if available and input is not mandatory - config_data[group][setting] = details['default'] + elif details["default"] is not None: # Use default if available and input is not mandatory + config_data[group][setting] = details["default"] break else: print(f"No input provided for {setting}, and no default available. Exiting program.") sys.exit() # Exit if no default is available and input is not provided # Write the collected data to a YAML file - with open(file_path, 'w') as file: + with open(file_path, "w") as file: yaml.safe_dump(config_data, file) print(f"Config file created at {file_path}") + def load_config(file_path): """Load the YAML configuration file, creating it if it does not exist.""" if not path.exists(file_path): print(f"No config file found at {file_path}. Creating initial configuration...") create_config(file_path) - with open(file_path, 'r') as file: + with open(file_path) as file: return yaml.safe_load(file) diff --git a/backend/json_into_html.py b/backend/json_into_html.py index 8779ae4..744675c 100644 --- a/backend/json_into_html.py +++ b/backend/json_into_html.py @@ -3,7 +3,7 @@ def generate_html_from_json(json_file, output_file): - with open(json_file, 'r') as f: + with open(json_file) as f: data = load(f) # Calculate average coordinates @@ -12,8 +12,8 @@ def generate_html_from_json(json_file, output_file): num_items = len(data) if num_items > 0: for user_id, user_data in data.items(): - total_latitude += user_data['coordinates_average']['latitude'] - total_longitude += user_data['coordinates_average']['longitude'] + total_latitude += user_data["coordinates_average"]["latitude"] + total_longitude += user_data["coordinates_average"]["longitude"] average_latitude = total_latitude / num_items average_longitude = total_longitude / num_items @@ -28,13 +28,17 @@ def generate_html_from_json(json_file, output_file): # Generate JavaScript code for points points_js = "var points = [\n" for user_id, user_data in data.items(): - username = user_data['username'].replace("'", "'").replace("\\", "\") if user_data['username'] else "" - first_name = user_data['first_name'].replace("'", "'").replace("\\", "\") if user_data['first_name'] else "" - last_name = user_data['last_name'].replace("'", "'").replace("\\", "\") if user_data['last_name'] else "" - phone = user_data['phone'] or "" - coordinates = user_data['coordinates_average'] - latitude = coordinates['latitude'] - longitude = coordinates['longitude'] + username = user_data["username"].replace("'", "'").replace("\\", "\") if user_data["username"] else "" + first_name = ( + user_data["first_name"].replace("'", "'").replace("\\", "\") if user_data["first_name"] else "" + ) + last_name = ( + user_data["last_name"].replace("'", "'").replace("\\", "\") if user_data["last_name"] else "" + ) + phone = user_data["phone"] or "" + coordinates = user_data["coordinates_average"] + latitude = coordinates["latitude"] + longitude = coordinates["longitude"] image_url = f"avatars/{user_id}-{user_data['username'] or '.no_avatar'}.jpg" if path.exists("./" + image_url): image_url = "../" + image_url @@ -42,7 +46,7 @@ def generate_html_from_json(json_file, output_file): image_url = "../avatars/.no_avatar.jpg" name = f"{first_name} {last_name}" coordinates_text = f"
• latitude: {latitude}
• longitude: {longitude}" - datetime = user_data['coordinates'][0][2] if user_data['coordinates'] else "" + datetime = user_data["coordinates"][0][2] if user_data["coordinates"] else "" points_js += f"\t{{ coordinates: [{latitude}, {longitude}], imageUrl: '{image_url}', name: '{name}', username: '{username}', phone: '{phone}', coordinates_text: '{coordinates_text}', datetime: '{datetime}' }},\n" points_js += "];\n" @@ -170,8 +174,9 @@ def generate_html_from_json(json_file, output_file): """ # Write HTML content to output file - with open(output_file, 'w', encoding='utf-8') as f: + with open(output_file, "w", encoding="utf-8") as f: f.write(html_content) + # Usage example: # generate_html_from_json("./reports-json/_combined_data.json", "./reports-html/_combined_data.html") diff --git a/start.py b/start.py index 8b1e959..d753346 100644 --- a/start.py +++ b/start.py @@ -5,10 +5,14 @@ from json import dump from os import getcwd, makedirs, path +from telethon import functions, types +from telethon.errors import FloodWaitError +from telethon.sync import TelegramClient + from backend.banners import ( banner, finishing_application, - pring_city_by_geo, + print_city_by_geo, print_combined_data, print_current_step, print_files_stored, @@ -30,24 +34,21 @@ load_config, ) from backend.json_into_html import generate_html_from_json -from telethon import functions, types -from telethon.sync import TelegramClient -from telethon.errors import FloodWaitError # Create an ArgumentParser object -parser = argparse.ArgumentParser(description='Custom settings for script launch') +parser = argparse.ArgumentParser(description="Custom settings for script launch") # Add arguments for latitude, longitude, meters, and timesleep -parser.add_argument('-lat', '--latitude', type=float, help='Latitude setting') -parser.add_argument('-long', '--longitude', type=float, help='Longitude setting') -parser.add_argument('-m', '--meters', type=int, help='Meters setting') -parser.add_argument('-t', '--timesleep', type=int, help='Timesleep setting') -parser.add_argument('-s', '--speed_kmh', type=int, help='Speed setting') +parser.add_argument("-lat", "--latitude", type=float, help="Latitude setting") +parser.add_argument("-long", "--longitude", type=float, help="Longitude setting") +parser.add_argument("-m", "--meters", type=int, help="Meters setting") +parser.add_argument("-t", "--timesleep", type=int, help="Timesleep setting") +parser.add_argument("-s", "--speed_kmh", type=int, help="Speed setting") # Add arguments for Telegram credentials -parser.add_argument('-tn', '--telegram_name', type=str, help='Telegram session name') -parser.add_argument('-ti', '--telegram_api_id', type=int, help='Telegram API ID') -parser.add_argument('-th', '--telegram_api_hash', type=str, help='Telegram API hash') +parser.add_argument("-tn", "--telegram_name", type=str, help="Telegram session name") +parser.add_argument("-ti", "--telegram_api_id", type=int, help="Telegram API ID") +parser.add_argument("-th", "--telegram_api_hash", type=str, help="Telegram API hash") # Add arguments for data processing parser.add_argument('--skip-avatars', action='store_true', help='Skip avatars downloading') @@ -55,15 +56,16 @@ # Parse the command-line arguments args = parser.parse_args() -#Load or create config file -config_file="config.yaml" +# Load or create config file +config_file = "config.yaml" config = load_config(config_file) + # Update settings if provided in command-line arguments -latitude = args.latitude or config['location']['lat'] -longitude = args.longitude or config['location']['lon'] -meters = args.meters or config['location']['meters'] -timesleep = args.timesleep or config['misc']['timesleep'] -speed_kmh = args.speed_kmh or config['misc']['speed_kmh'] +latitude = args.latitude or config["location"]["lat"] +longitude = args.longitude or config["location"]["lon"] +meters = args.meters or config["location"]["meters"] +timesleep = args.timesleep or config["misc"]["timesleep"] +speed_kmh = args.speed_kmh or config["misc"]["speed_kmh"] telegram_name = args.telegram_name or "cctv" telegram_api_id = args.telegram_api_id or config['api_config']['api_id'] telegram_api_hash = args.telegram_api_hash or config['api_config']['api_hash'] @@ -71,73 +73,82 @@ phone_number = config['api_config']['phone'] - # General variables pattern = generate_pattern((calculate_length(meters + 400) + 800) // 200) # Adjust the length as needed (x / 2 - 2) -current_datetime = datetime.now().strftime('%Y-%m-%d_%H-%M-%S') +current_datetime = datetime.now().strftime("%Y-%m-%d_%H-%M-%S") + # Store the initial coordinates and initialize lists to store the coordinates for each step initial_latitude = latitude initial_longitude = longitude step_coordinates = [] + # Initialize variables to store coordinates and counts coordinates = [] coordinates_count = 0 coordinates_sum = [0, 0] + # Extract users with distance 500 filtered_users = [] + # Initialize an empty dictionary to store user data users_data = {} step = 0 -filename = f'{latitude}-{longitude}-{current_datetime}' +filename = f"{latitude}-{longitude}-{current_datetime}" + # Directories avatar_directory = "./avatars/" report_json_directory = "./reports-json/" report_html_directory = "./reports-html/" # Check if the needed directories exist, create it if not -for dir in avatar_directory, report_json_directory, report_html_directory: - if not path.exists(dir): - makedirs(dir) +for directory in avatar_directory, report_json_directory, report_html_directory: + if not path.exists(directory): + makedirs(directory) -### Banner logo +# Banner logo print(banner) -### Printing geo cordinates -print_geo_coordinater(latitude,longitude) -### Printing city and country by coordinates -pring_city_by_geo(latitude,longitude) + +# Printing geo cordinates +print_geo_coordinater(latitude, longitude) + +# Printing city and country by coordinates +print_city_by_geo(latitude, longitude) # Perform steps according to the pattern for i, steps in enumerate(pattern): if i == 0: - direction = 'starting' + direction = "starting" elif i % 4 == 1: - direction = 'west' + direction = "west" elif i % 4 == 2: - direction = 'south' + direction = "south" elif i % 4 == 3: - direction = 'east' + direction = "east" else: - direction = 'north' + direction = "north" for _ in range(steps): latitude, longitude = calculate_coordinates(latitude, longitude, direction, 0.6) # 600 meters in kilometers step_coordinates.append((latitude, longitude)) -### Print number of steps +# Print number of steps print_len_steps(len(step_coordinates), meters) # Initialize the Telegram client print_telegram_initialization() - with TelegramClient(telegram_name, telegram_api_id, telegram_api_hash, system_version="CCTV") as client: # Authenticate the client client.connect() print_successfully() - time_adjusted = round(0.6*3600/speed_kmh) # seconds to cover distance 600 meters + time_adjusted = round(0.6 * 3600 / speed_kmh) # seconds to cover distance 600 meters if timesleep < time_adjusted: - print(f"[ ! ] Configured timesleep {timesleep}s is too low to cover all points with configured speed {speed_kmh} km/h") - print(f"[ ! ] Adjusting sleep time to {time_adjusted}s according to calculated distances") + print( + f""" +[ ! ] Configured timesleep {timesleep}s is too low to cover all points with configured speed {speed_kmh} km/h +[ ! ] Adjusting sleep time to {time_adjusted}s according to calculated distances + """ + ) timesleep = time_adjusted # Initialize the dictionary to store user data @@ -147,22 +158,20 @@ print_start_harvesting() for latitude, longitude in step_coordinates: try: - result = client(functions.contacts.GetLocatedRequest( - geo_point=types.InputGeoPoint( - lat=latitude, - long=longitude, - accuracy_radius=500 - ) - )) + result = client( + functions.contacts.GetLocatedRequest( + geo_point=types.InputGeoPoint(lat=latitude, long=longitude, accuracy_radius=500) + ) + ) except FloodWaitError as e: - print(f"[ ! ] FloodWaitError: {e}") + print(f"[ ! ] FloodWaitError: {e}") - # Check if the waiting time exceeds the threshold - if e.seconds > 300: - print(f"[ ! ] Waiting time is too long, try again in {round(e.seconds/3600)} hours. Exiting program.") - sys.exit() - countdown_timer(e.seconds) - continue + # Check if the waiting time exceeds the threshold + if e.seconds > 300: + print(f"[ ! ] Waiting time is too long, try again in {round(e.seconds / 3600)} hours. Exiting program.") + sys.exit() + countdown_timer(e.seconds) + continue # Print the step and its coordinates step += 1 @@ -173,45 +182,58 @@ for update in result.updates: if isinstance(update, types.UpdatePeerLocated): for peer_located in update.peers: - if isinstance(peer_located, types.PeerLocated): # Check if the peer_located is of type PeerLocated - if peer_located.distance == 500: - if isinstance(peer_located.peer, types.PeerUser): # Check if the peer is a PeerUser - user_id = peer_located.peer.user_id - user_info = next((user for user in result.users if user.id == user_id), None) - if user_info: - # Get current timestamp - timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - - if user_id not in users_data: - # If the user is not in the dictionary, add them with initial data - username = user_info.username - - users_data[user_id] = { - "first_name": user_info.first_name, - "last_name": user_info.last_name, - "username": user_info.username, - "phone": user_info.phone, - "photo_id": user_info.photo.photo_id if user_info.photo else None, - "coordinates": [], - "coordinates_average": {"latitude": 0, "longitude": 0, "timestamp": 0} - } - # Append new coordinates - users_data[user_id]["coordinates"].append((latitude, longitude, timestamp)) - - # Calculate average coordinates - avg_latitude = sum(coord[0] for coord in users_data[user_id]["coordinates"]) / len(users_data[user_id]["coordinates"]) - avg_longitude = sum(coord[1] for coord in users_data[user_id]["coordinates"]) / len(users_data[user_id]["coordinates"]) - - # Update the average coordinates - users_data[user_id]["coordinates_average"] = {"latitude": avg_latitude, "longitude": avg_longitude} + if all( + [ # Check if the peer_located is of type PeerLocated + isinstance(peer_located, types.PeerLocated), + peer_located.distance == 500, + # Check if the peer is a PeerUser + isinstance(peer_located.peer, types.PeerUser), + ] + ): + user_id = peer_located.peer.user_id + user_info = next((user for user in result.users if user.id == user_id), None) + if user_info: + # Get current timestamp + timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") + + if user_id not in users_data: + # If the user is not in the dictionary, add them with initial data + username = user_info.username + + users_data[user_id] = { + "first_name": user_info.first_name, + "last_name": user_info.last_name, + "username": user_info.username, + "phone": user_info.phone, + "photo_id": user_info.photo.photo_id if user_info.photo else None, + "coordinates": [], + "coordinates_average": {"latitude": 0.0, "longitude": 0.0}, + } + + # Append new coordinates + users_data[user_id]["coordinates"].append((latitude, longitude, timestamp)) + + # Calculate average coordinates + avg_latitude = sum(coord[0] for coord in users_data[user_id]["coordinates"]) / len( + users_data[user_id]["coordinates"] + ) + avg_longitude = sum(coord[1] for coord in users_data[user_id]["coordinates"]) / len( + users_data[user_id]["coordinates"] + ) + + # Update the average coordinates + users_data[user_id]["coordinates_average"] = { + "latitude": avg_latitude, + "longitude": avg_longitude, + } # Write the updated data to the file print_update_local_json() - with open(f"{report_json_directory}{filename}.json", 'w', encoding='utf-8') as file: + with open(f"{report_json_directory}{filename}.json", "w", encoding="utf-8") as file: dump(users_data, file, indent=4) print_successfully() - if not step == len(step_coordinates): + if step != len(step_coordinates): countdown_timer(timesleep) # Download avatars @@ -232,13 +254,13 @@ current_directory = getcwd() html_file_current = path.join(current_directory, report_html_directory + filename + ".html") -html_file_combined = path.join(current_directory, 'reports-html', '_combined_data.html') +html_file_combined = path.join(current_directory, "reports-html", "_combined_data.html") -try: - webbrowser.open('file://' + path.realpath(html_file_current)) - webbrowser.open('file://' + path.realpath(html_file_combined)) -except: - print("HTML file not found!") +for html_file in [path.realpath(html_file_current), path.realpath(html_file_combined)]: + try: + webbrowser.open("file://" + html_file) + except (ValueError, FileNotFoundError): + print(f"File {html_file} not found!") # Finishing the application finishing_application()