Merge pull request #1 from teapot2/storage
Storage
teapot2 authored Nov 7, 2023
2 parents 65a2127 + 159952d commit 2eaf24e
Showing 3 changed files with 122 additions and 33 deletions.
2 changes: 2 additions & 0 deletions config.py
@@ -1,3 +1,5 @@
FRAME_WIDTH = 720
FRAME_HEIGHT = 480
FRAME_SIZE_BYTES = FRAME_HEIGHT * FRAME_WIDTH * 3

VIDEO_SEGMENTATION_INTERVAL = 60
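
The new FRAME_SIZE_BYTES constant is the byte count of one BGR frame (480 * 720 * 3 = 1,036,800 bytes), and VIDEO_SEGMENTATION_INTERVAL is the segment length in seconds. Below is a minimal sketch of how such a constant can size a per-camera shared-memory segment; the allocation code itself is collapsed out of this diff, so the exact call is an assumption:

# Sketch only: assumes one shared-memory segment per camera, sized to hold a
# single resized BGR frame as implied by FRAME_SIZE_BYTES.
from multiprocessing import shared_memory

import config

shm = shared_memory.SharedMemory(
    name="camera_0_stream",        # matches generate_shm_stream_name(0)
    create=True,
    size=config.FRAME_SIZE_BYTES,  # 480 * 720 * 3 bytes
)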
107 changes: 74 additions & 33 deletions create_threads.py
@@ -1,14 +1,16 @@
from multiprocessing import Process, shared_memory, Manager
from api import get_cameras, ping_cameras
import config
import cv2
from storage_service import store_video_data
import numpy as np
import argparse
import random
import time
import os
import config
import signal
import argparse
import time
import cv2
import sys
import os


# Lambda function for generating shared memory stream names based on index
generate_shm_stream_name = lambda index: f"camera_{index}_stream"
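For example, generate_shm_stream_name(2) evaluates to "camera_2_stream"; each camera process uses the same derived name to create its segment or, if it already exists, to attach to it.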
@@ -60,7 +62,7 @@ def signal_handler(sig, frame):


# Function for processing camera streams
def process_camera(index, url, name, shared_dict):
def process_camera(index, url, name, camera_id, shared_dict):
"""
Process the camera streams.
@@ -76,6 +78,7 @@ def process_camera(index, url, name, shared_dict):
"""

cap = cv2.VideoCapture(url)
frames = []

try:
# Create a shared memory segment for the frame
@@ -91,22 +94,36 @@ def process_camera(index, url, name, shared_dict):
shm = shared_memory.SharedMemory(name=generate_shm_stream_name(index))

try:
storage_start_time = time.time()

while True:
ret, frame = cap.read()

if ret:
start_time = time.time()
function_start_time = time.time()

# Normalize frame to specified dimensions
frame = cv2.resize(frame, (config.FRAME_WIDTH, config.FRAME_HEIGHT))

frames.append(frame)

# Store frames as video locally
fps = cap.get(cv2.CAP_PROP_FPS)
segmentation_interval = config.VIDEO_SEGMENTATION_INTERVAL

                if (function_start_time - storage_start_time) >= segmentation_interval:
                    print("Storing video data...")
                    store_video_data(frames, camera_id, fps)
frames.clear()
storage_start_time = function_start_time

# Vision processing logic goes here

send_frame_to_shared_memory(frame, shm)

# Update the shared dictionary with relevant information
shared_dict[index] = {
"execution_time": f"{time.time() - start_time:.5f} s",
"execution_time": f"{time.time() - function_start_time:.5f} s",
"camera_name": name,
"stream_name": generate_shm_stream_name(index),
"faces_detected": 0,
@@ -130,7 +147,7 @@ def terminate_shm(shm):


# Function for monitoring the status of the camera processes
def monitor_process_status(shared_dict):
def monitor_process_status(shared_dict, log):
"""
Monitor the status of the camera processes.
@@ -144,32 +161,33 @@
"""

while True:
os.system("cls" if os.name == "nt" else "clear")

print(
"\033[1m{:<11} {:<21} {:<21} {:<15} {:<9}\033[0m".format(
"process_id",
"camera_name",
"stream_name",
"execution_time",
"faces_detected",
)
)
print("\033[1;37m{}\033[0m".format("=" * 90))
if log:
os.system("cls" if os.name == "nt" else "clear")

# Print the data with appropriate formatting and colors
for key, value in shared_dict.items():
print(
"\033[92m{:<11} \033[0m {:<21} {:<21} {:<15} \033[0m {:<9}".format(
key,
value["camera_name"],
value["stream_name"],
value["execution_time"],
value["faces_detected"],
"\033[1m{:<11} {:<21} {:<21} {:<15} {:<9}\033[0m".format(
"process_id",
"camera_name",
"stream_name",
"execution_time",
"faces_detected",
)
)
print("\033[1;37m{}\033[0m".format("=" * 90))

# Print the data with appropriate formatting and colors
for key, value in shared_dict.items():
print(
"\033[92m{:<11} \033[0m {:<21} {:<21} {:<15} \033[0m {:<9}".format(
key,
value["camera_name"],
value["stream_name"],
value["execution_time"],
value["faces_detected"],
)
)

time.sleep(0.1) # Delay for clarity
time.sleep(0.1) # Delay for clarity


# Function to close threads and shared memory segments
@@ -201,6 +219,7 @@ def close_threads(urls):
signal.signal(signal.SIGINT, signal_handler)

parser = argparse.ArgumentParser()

parser.add_argument(
"-U",
"--update",
@@ -209,6 +228,14 @@ def close_threads(urls):
help="Specify this flag to skip camera updates",
)

parser.add_argument(
"-L",
"--log",
action="store_true",
default=False,
help="Specify this flag to log on the console during the monitoring process",
)

args = parser.parse_args()

# Use a manager for shared dictionary
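
The manager-backed dictionary referenced by the comment above is created in code collapsed out of this diff; presumably something along the lines of:

manager = Manager()
shared_dict = manager.dict()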
@@ -229,15 +256,29 @@ def close_threads(urls):
if camera["camera_status"] != "offline"
]

ids = [
camera["camera_id"]
for camera in cameras
if camera["camera_status"] != "offline"
]

# Close any existing threads
close_threads(urls)

for i, (url, name) in enumerate(zip(urls, names)):
p = Process(target=process_camera, args=(i, url, name, shared_dict))
for i, (url, name, camera_id) in enumerate(zip(urls, names, ids)):
p = Process(
target=process_camera, args=(i, url, name, camera_id, shared_dict)
)
p.start()
processes.append(p)

monitor_p = Process(target=monitor_process_status, args=(shared_dict,))
monitor_p = Process(
target=monitor_process_status,
args=(
shared_dict,
args.log,
),
)
monitor_p.start()
processes.append(monitor_p)

46 changes: 46 additions & 0 deletions storage_service.py
@@ -0,0 +1,46 @@
import config
import time
import cv2
import os

fourcc = cv2.VideoWriter_fourcc("m", "p", "4", "v")


# Function to store video data
def store_video_data(frames, camera_id, fps):
"""
Store segmented and compressed video data to a designated storage location.
    Args:
        frames: List of frames to be written to the video segment.
        camera_id: Identifier of the camera the frames belong to.
        fps: Frame rate to encode the segment with.
    Returns:
        None
Stores the segmented and compressed video data to the specified storage location or file system.
"""
    out = None
    try:
storage_path = f"C:/Users/Sebastián/Documents/Tec/SF/multiprocessing_test/storage/{camera_id}/"
current_date = time.strftime("%Y-%m-%d")
current_time = time.strftime("%H-%M-%S")
filename = f"video_{current_date}_{current_time}.mp4"

os.makedirs(storage_path, exist_ok=True)

out = cv2.VideoWriter(
storage_path + filename,
fourcc,
fps,
(config.FRAME_WIDTH, config.FRAME_HEIGHT),
)

for frame in frames:
out.write(frame)

print(f"Video for stream {camera_id} segment stored successfully: {storage_path + filename}")

except Exception as e:
print(f"An error occurred while storing the video segment: {e}")

    finally:
        # Guard against the writer never being created (e.g. if VideoWriter or
        # makedirs raised before assignment).
        if out is not None:
            out.release()
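
A minimal usage sketch of store_video_data follows; the synthetic frames, camera id, and 30 fps rate are made-up values for illustration, whereas in the pipeline above process_camera passes its buffered frames, the camera_id fetched from the API, and the capture's reported FPS:

import numpy as np

import config
from storage_service import store_video_data

# One second of black frames at an assumed 30 fps, matching the frame
# dimensions the VideoWriter above is configured with.
frames = [
    np.zeros((config.FRAME_HEIGHT, config.FRAME_WIDTH, 3), dtype=np.uint8)
    for _ in range(30)
]
store_video_data(frames, "demo_camera", 30)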
