From d6fdf66355d511f0c4a4002b9aabf230373b6bdb Mon Sep 17 00:00:00 2001 From: teapot2 Date: Mon, 6 Nov 2023 23:48:08 -0600 Subject: [PATCH 1/2] Implemented video storage by intervals --- config.py | 2 + create_threads.py | 101 +++++++++++++++++++++++++++++++-------------- storage_service.py | 46 +++++++++++++++++++++ 3 files changed, 118 insertions(+), 31 deletions(-) create mode 100644 storage_service.py diff --git a/config.py b/config.py index 38d18c6..14b82ac 100644 --- a/config.py +++ b/config.py @@ -1,3 +1,5 @@ FRAME_WIDTH = 720 FRAME_HEIGHT = 480 FRAME_SIZE_BYTES = FRAME_HEIGHT * FRAME_WIDTH * 3 + +VIDEO_SEGMENTATION_INTERVAL = 20 diff --git a/create_threads.py b/create_threads.py index be3e0d1..ca2a9b6 100644 --- a/create_threads.py +++ b/create_threads.py @@ -1,14 +1,16 @@ from multiprocessing import Process, shared_memory, Manager from api import get_cameras, ping_cameras -import config -import cv2 +from storage_service import store_video_data import numpy as np +import argparse import random -import time -import os +import config import signal -import argparse +import time +import cv2 import sys +import os + # Lambda function for generating shared memory stream names based on index generate_shm_stream_name = lambda index: f"camera_{index}_stream" @@ -60,7 +62,7 @@ def signal_handler(sig, frame): # Function for processing camera streams -def process_camera(index, url, name, shared_dict): +def process_camera(index, url, name, camera_id, shared_dict): """ Process the camera streams. 
@@ -76,6 +78,7 @@ def process_camera(index, url, name, shared_dict): """ cap = cv2.VideoCapture(url) + frames = [] try: # Create a shared memory segment for the frame @@ -100,6 +103,18 @@ def process_camera(index, url, name, shared_dict): # Normalize frame to specified dimensions frame = cv2.resize(frame, (config.FRAME_WIDTH, config.FRAME_HEIGHT)) + frames.append(frame) + + # Store frames as video locally + if ( + len(frames) + % (cap.get(cv2.CAP_PROP_FPS) * config.VIDEO_SEGMENTATION_INTERVAL) + == 0 + ): + print("Storing video data...") + store_video_data(frames, camera_id, cap.get(cv2.CAP_PROP_FPS)) + frames.clear() + # Vision processing logic goes here send_frame_to_shared_memory(frame, shm) @@ -130,7 +145,7 @@ def terminate_shm(shm): # Function for monitoring the status of the camera processes -def monitor_process_status(shared_dict): +def monitor_process_status(shared_dict, log): """ Monitor the status of the camera processes. @@ -144,32 +159,33 @@ def monitor_process_status(shared_dict): """ while True: - os.system("cls" if os.name == "nt" else "clear") - - print( - "\033[1m{:<11} {:<21} {:<21} {:<15} {:<9}\033[0m".format( - "process_id", - "camera_name", - "stream_name", - "execution_time", - "faces_detected", - ) - ) - print("\033[1;37m{}\033[0m".format("=" * 90)) + if log: + os.system("cls" if os.name == "nt" else "clear") - # Print the data with appropriate formatting and colors - for key, value in shared_dict.items(): print( - "\033[92m{:<11} \033[0m {:<21} {:<21} {:<15} \033[0m {:<9}".format( - key, - value["camera_name"], - value["stream_name"], - value["execution_time"], - value["faces_detected"], + "\033[1m{:<11} {:<21} {:<21} {:<15} {:<9}\033[0m".format( + "process_id", + "camera_name", + "stream_name", + "execution_time", + "faces_detected", ) ) + print("\033[1;37m{}\033[0m".format("=" * 90)) + + # Print the data with appropriate formatting and colors + for key, value in shared_dict.items(): + print( + "\033[92m{:<11} \033[0m {:<21} {:<21} {:<15} 
\033[0m {:<9}".format( + key, + value["camera_name"], + value["stream_name"], + value["execution_time"], + value["faces_detected"], + ) + ) - time.sleep(0.1) # Delay for clarity + time.sleep(0.1) # Delay for clarity # Function to close threads and shared memory segments @@ -201,6 +217,7 @@ def close_threads(urls): signal.signal(signal.SIGINT, signal_handler) parser = argparse.ArgumentParser() + parser.add_argument( "-U", "--update", @@ -209,6 +226,14 @@ def close_threads(urls): help="Specify this flag to skip camera updates", ) + parser.add_argument( + "-L", + "--log", + action="store_true", + default=False, + help="Specify this flag to log on the console during the monitoring process", + ) + args = parser.parse_args() # Use a manager for shared dictionary @@ -229,15 +254,29 @@ def close_threads(urls): if camera["camera_status"] != "offline" ] + ids = [ + camera["camera_id"] + for camera in cameras + if camera["camera_status"] != "offline" + ] + # Close any existing threads close_threads(urls) - for i, (url, name) in enumerate(zip(urls, names)): - p = Process(target=process_camera, args=(i, url, name, shared_dict)) + for i, (url, name, camera_id) in enumerate(zip(urls, names, ids)): + p = Process( + target=process_camera, args=(i, url, name, camera_id, shared_dict) + ) p.start() processes.append(p) - monitor_p = Process(target=monitor_process_status, args=(shared_dict,)) + monitor_p = Process( + target=monitor_process_status, + args=( + shared_dict, + args.log, + ), + ) monitor_p.start() processes.append(monitor_p) diff --git a/storage_service.py b/storage_service.py new file mode 100644 index 0000000..ecd5d23 --- /dev/null +++ b/storage_service.py @@ -0,0 +1,46 @@ +import config +import time +import cv2 +import os + +fourcc = cv2.VideoWriter_fourcc("m", "p", "4", "v") + + +# Function to store video data +def store_video_data(frames, camera_id, fps): + """ + Store segmented and compressed video data to a designated storage location. 
+ + Args: + frames: The list of frames to be stored as one video segment. + + Returns: + None + + Stores the segmented and compressed video data to the specified storage location or file system. + """ + try: + storage_path = f"C:/Users/Sebastián/Documents/Tec/SF/multiprocessing_test/storage/{camera_id}/" + current_date = time.strftime("%Y-%m-%d") + current_time = time.strftime("%H-%M-%S") + filename = f"video_{current_date}_{current_time}.mp4" + + os.makedirs(storage_path, exist_ok=True) + + out = cv2.VideoWriter( + storage_path + filename, + fourcc, + fps, + (config.FRAME_WIDTH, config.FRAME_HEIGHT), + ) + + for frame in frames: + out.write(frame) + + print(f"Video for stream {camera_id} segment stored successfully: {storage_path + filename}") + + except Exception as e: + print(f"An error occurred while storing the video segment: {e}") + + finally: + out.release() From 159952dd21ffd6ee9fc5c5d934c068388f99ec49 Mon Sep 17 00:00:00 2001 From: teapot2 Date: Tue, 7 Nov 2023 10:24:16 -0600 Subject: [PATCH 2/2] Fixed segmentation period --- config.py | 2 +- create_threads.py | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/config.py b/config.py index 14b82ac..b76ae59 100644 --- a/config.py +++ b/config.py @@ -2,4 +2,4 @@ FRAME_HEIGHT = 480 FRAME_SIZE_BYTES = FRAME_HEIGHT * FRAME_WIDTH * 3 -VIDEO_SEGMENTATION_INTERVAL = 20 +VIDEO_SEGMENTATION_INTERVAL = 60 diff --git a/create_threads.py b/create_threads.py index ca2a9b6..b2f0dfd 100644 --- a/create_threads.py +++ b/create_threads.py @@ -94,11 +94,13 @@ def process_camera(index, url, name, camera_id, shared_dict): shm = shared_memory.SharedMemory(name=generate_shm_stream_name(index)) try: + storage_start_time = time.time() + while True: ret, frame = cap.read() if ret: - start_time = time.time() + function_start_time = time.time() # Normalize frame to specified dimensions frame = cv2.resize(frame, (config.FRAME_WIDTH, config.FRAME_HEIGHT)) @@ -106,14 +108,14 @@ def process_camera(index, url, name, camera_id, 
shared_dict): frames.append(frame) # Store frames as video locally - if ( - len(frames) - % (cap.get(cv2.CAP_PROP_FPS) * config.VIDEO_SEGMENTATION_INTERVAL) - == 0 - ): + fps = cap.get(cv2.CAP_PROP_FPS) + segmentation_interval = config.VIDEO_SEGMENTATION_INTERVAL + + if (function_start_time - storage_start_time) >= config.VIDEO_SEGMENTATION_INTERVAL: print("Storing video data...") store_video_data(frames, camera_id, cap.get(cv2.CAP_PROP_FPS)) frames.clear() + storage_start_time = function_start_time # Vision processing logic goes here @@ -121,7 +123,7 @@ def process_camera(index, url, name, camera_id, shared_dict): # Update the shared dictionary with relevant information shared_dict[index] = { - "execution_time": f"{time.time() - start_time:.5f} s", + "execution_time": f"{time.time() - function_start_time:.5f} s", "camera_name": name, "stream_name": generate_shm_stream_name(index), "faces_detected": 0,