diff --git a/examples/official/9.x/webcam/README.md b/examples/official/9.x/webcam/README.md new file mode 100644 index 0000000..044ebad --- /dev/null +++ b/examples/official/9.x/webcam/README.md @@ -0,0 +1,46 @@ +# Reading Barcodes and QR Codes Using Webcam, Python, and OpenCV +This repository provides samples demonstrating how to create a simple barcode and QR code reader using a webcam in Python. The OpenCV Stitcher API is utilized to stitch multiple barcode and QR code results together. + +## License Activation +To activate the [Dynamsoft Barcode Reader SDK](https://www.dynamsoft.com/barcode-reader/sdk-desktop-server/), obtain a desktop license key from [here](https://www.dynamsoft.com/customer/license/trialLicense?product=dbr): + +```python +BarcodeReader.init_license("LICENSE-KEY") +``` + +## Installation +Install the required dependencies using pip: + +``` +pip install opencv-python dbr +``` + +## Examples + +- [scanner.py](./scanner.py) + + Use your webcam to scan barcodes and QR codes in real-time. + + ![Python barcode and QR code reader](https://www.dynamsoft.com/codepool/img/2022/04/multiple-barcode-qrcode-scan.png) + +- [stitcher.py](./stitcher.py) + + Move the camera closer to scan barcodes and QR codes with higher precision, and stitch them into a panorama image. + + ![Python barcode and QR code reader with panorama stitching](https://www.dynamsoft.com/codepool/img/2022/04/panorama-barcode-qr-code.png) + +- [barcode_based_panorama.py](./barcode_based_panorama.py) + Concatenate images based on barcode and QR code detection results, without using any advanced image processing algorithms. + + ![concatenate barcode and QR code images](./output.png) + +- [barcode_reader.py](./barcode_reader.py) + Read barcodes and QR codes from image files: + + ```bash + python barcode_reader.py + ``` + +## Blog +[Scanning Barcode and QR Code Using Webcam, OpenCV and Python](https://www.dynamsoft.com/codepool/opencv-python-webcam-barcode-reader.html) + diff --git a/examples/official/9.x/webcam/barcode_based_panorama.png b/examples/official/9.x/webcam/barcode_based_panorama.png new file mode 100644 index 0000000..773855b Binary files /dev/null and b/examples/official/9.x/webcam/barcode_based_panorama.png differ diff --git a/examples/official/9.x/webcam/barcode_based_panorama.py b/examples/official/9.x/webcam/barcode_based_panorama.py new file mode 100644 index 0000000..1626241 --- /dev/null +++ b/examples/official/9.x/webcam/barcode_based_panorama.py @@ -0,0 +1,269 @@ +from __future__ import print_function +import re + +import numpy as np +import cv2 as cv + +from multiprocessing.pool import ThreadPool +from collections import deque + +import dbr +from dbr import * + +import time +from util import * + +BarcodeReader.init_license("DLS2eyJoYW5kc2hha2VDb2RlIjoiMjAwMDAxLTE2NDk4Mjk3OTI2MzUiLCJvcmdhbml6YXRpb25JRCI6IjIwMDAwMSIsInNlc3Npb25QYXNzd29yZCI6IndTcGR6Vm05WDJrcEQ5YUoifQ==") + +class ScanManager: + MODE_AUTO_STITCH = 0 + MODE_MANUAL_STITCH = 1 + MODE_CAMERA_ONLY = 2 + + def __init__(self): + modes = (cv.Stitcher_PANORAMA, cv.Stitcher_SCANS) + self.stitcher = cv.Stitcher.create(modes[1]) + self.stitcher.setPanoConfidenceThresh(0.1) + self.panorama = [] + self.isPanoramaDone = False + self.reader = BarcodeReader() + + def count_barcodes(self, frame): + try: + results = self.reader.decode_buffer(frame) + return len(results) + except BarcodeReaderError as e: + print(e) + + return 0 + + def save_frame(self, frame): + # frame = self.frame_overlay(frame) + filename = str(time.time()) + "_panorama.jpg" + cv.imwrite(filename, frame) + print("Saved to " + filename) + + def frame_overlay(self, frame): + frame_cp = frame.copy() + try: + results = self.reader.decode_buffer(frame_cp) + if results != None: + for result in results: + points = result.localization_result.localization_points + cv.line(frame_cp, points[0], points[1], (0,255,0), 2) + cv.line(frame_cp, points[1], points[2], (0,255,0), 2) + cv.line(frame_cp, points[2], points[3], (0,255,0), 2) + cv.line(frame_cp, points[3], points[0], (0,255,0), 2) + cv.putText(frame_cp, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + return frame_cp + except BarcodeReaderError as e: + print(e) + return None + + def stitch_frame(self, frame): + try: + results = self.reader.decode_buffer(frame) + if results != None: + # Draw results on the copy of the frame. Keep original frame clean. + frame_cp = frame.copy() + for result in results: + points = result.localization_result.localization_points + cv.line(frame_cp, points[0], points[1], (0,255,0), 2) + cv.line(frame_cp, points[1], points[2], (0,255,0), 2) + cv.line(frame_cp, points[2], points[3], (0,255,0), 2) + cv.line(frame_cp, points[3], points[0], (0,255,0), 2) + cv.putText(frame_cp, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + # Save frame and barcode info if panorama is empty + if len(self.panorama) == 0: + self.panorama.append((frame, results, frame_cp)) + else: + # Compare results. If there is an intersection, transform and stitch. Otherwise, discard. + preFrame = self.panorama[0][0] + preResults = self.panorama[0][1] + preFrameCp = self.panorama[0][2] + + while len(results) > 0: + result = results.pop() + for preResult in preResults: + if preResult.barcode_text == result.barcode_text and preResult.barcode_format == result.barcode_format: + prePoints = preResult.localization_result.localization_points + # preContour = np.array([prePoints[0], prePoints[1], prePoints[2], prePoints[3]]) + # preArea = cv.minAreaRect(preContour) + # preAreaSize = preArea[1][0] * preArea[1][1] + # preBounding = cv.boxPoints(preArea) + + points = result.localization_result.localization_points + + # # Crop image based on min area rect + preFrame = preFrame[0: preFrame.shape[0], 0: max(prePoints[0][0], prePoints[1][0], prePoints[2][0], prePoints[3][0]) + 10] + frame = frame[0: frame.shape[0], max(points[0][0], points[1][0], points[2][0], points[3][0]): frame.shape[1] + 10] + + preFrameCp = preFrameCp[0: preFrameCp.shape[0], 0: max(prePoints[0][0], prePoints[1][0], prePoints[2][0], prePoints[3][0]) + 10] + frame_cp = frame_cp[0: frame_cp.shape[0], max(points[0][0], points[1][0], points[2][0], points[3][0]): frame_cp.shape[1] + 10] + + # # Stitch images + frame = concat_images([preFrame, frame]) + frame_cp = concat_images([preFrameCp, frame_cp]) + + # Re-detect barcodes from the new image + results = self.reader.decode_buffer(frame) + + # Save results + self.panorama = [(frame, results, frame_cp)] + return frame, frame_cp + + return self.panorama[0][0], self.panorama[0][2] + + except BarcodeReaderError as e: + print(e) + return None, None + + return None, None + + + def process_frame(self, frame): + results = None + try: + results = self.reader.decode_buffer(frame) + except BarcodeReaderError as bre: + print(bre) + + return results + + def clean_deque(self, tasks): + while len(tasks) > 0: + tasks.popleft() + + def close_window(self, window_name): + try: + cv.destroyWindow(window_name) + except: + pass + + def run(self): + import sys + try: + fn = sys.argv[1] + except: + fn = 0 + cap = cv.VideoCapture(fn) + + threadn = 1 # cv.getNumberOfCPUs() + barcodePool = ThreadPool(processes = threadn) + panoramaPool = ThreadPool(processes = threadn) + cameraTasks = deque() + panoramaTask = deque() + mode = self.MODE_CAMERA_ONLY + image = None + imageCp = None + panoramaImage = None + panoramaImageCp = None + + while True: + ret, frame = cap.read() + frame_cp = frame.copy() + cv.putText(frame, 'A: auto pano, M: manual pano, C: capture, O: camera, S: stop', (10, 20), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + cv.putText(frame, 'Barcode & QR Code Scanning ...', (10, 50), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0)) + + # Scan and show barcode & QR code results + while len(cameraTasks) > 0 and cameraTasks[0].ready(): + results = cameraTasks.popleft().get() + if results != None: + for result in results: + points = result.localization_result.localization_points + cv.line(frame, points[0], points[1], (0,255,0), 2) + cv.line(frame, points[1], points[2], (0,255,0), 2) + cv.line(frame, points[2], points[3], (0,255,0), 2) + cv.line(frame, points[3], points[0], (0,255,0), 2) + cv.putText(frame, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + if len(cameraTasks) < threadn: + task = barcodePool.apply_async(self.process_frame, (frame_cp, )) + cameraTasks.append(task) + + # Stitch images for panorama + if mode == self.MODE_MANUAL_STITCH: + cv.putText(frame, 'Manual Panorama ...', (10, 70), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0)) + elif mode == self.MODE_AUTO_STITCH: + cv.putText(frame, 'Auto Panorama ...', (10, 70), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0)) + if not self.isPanoramaDone and len(panoramaTask) < threadn: + task = panoramaPool.apply_async(self.stitch_frame, (frame_cp, )) + panoramaTask.append(task) + + if mode == self.MODE_MANUAL_STITCH or mode == self.MODE_AUTO_STITCH: + while len(panoramaTask) > 0 and panoramaTask[0].ready(): + image, imageCp = panoramaTask.popleft().get() + if image is not None: + panoramaImage = image.copy() + panoramaImageCp = imageCp.copy() + cv.imshow('panorama', panoramaImageCp) + + # Key events + ch = cv.waitKey(1) + if ch == 27: + break + if ord('o') == ch: + self.close_window('panorama') + self.isPanoramaDone = True + mode = self.MODE_CAMERA_ONLY + self.clean_deque(panoramaTask) + elif ord('a') == ch: + self.close_window('panorama') + self.isPanoramaDone = False + mode = self.MODE_AUTO_STITCH + self.clean_deque(panoramaTask) + self.panorama = [] + elif ord('m') == ch: + self.close_window('panorama') + self.isPanoramaDone = False + mode = self.MODE_MANUAL_STITCH + self.clean_deque(panoramaTask) + self.panorama = [] + elif ord('c') == ch and mode == self.MODE_MANUAL_STITCH and not self.isPanoramaDone: + if len(panoramaTask) < threadn: + task = panoramaPool.apply_async(self.stitch_frame, (frame_cp, )) + panoramaTask.append(task) + ################################################### Test image operations + elif ord('x') == ch: + if panoramaImageCp is not None: + panoramaImageCp = shiftX(panoramaImageCp, 5) + cv.imshow('panorama', panoramaImageCp) + elif ord('t') == ch: + if panoramaImageCp is not None: + panoramaImageCp = concat_images([panoramaImageCp, frame]) + cv.imshow('panorama', panoramaImageCp) + elif ord('y') == ch: + if panoramaImageCp is not None: + panoramaImageCp = shiftY(panoramaImageCp, 5) + cv.imshow('panorama', panoramaImageCp) + elif ord('z') == ch: + if panoramaImageCp is not None: + panoramaImageCp = zoom_image(panoramaImageCp, 2) + cv.imshow('panorama', panoramaImageCp) + elif ord('r') == ch: + if panoramaImageCp is not None: + panoramaImageCp = rotate_image(panoramaImageCp, 1) + cv.imshow('panorama', panoramaImageCp) + ################################################### + + # Quit panorama mode + if self.isPanoramaDone: + self.close_window('panorama') + mode = self.MODE_CAMERA_ONLY + self.clean_deque(panoramaTask) + self.isPanoramaDone = False + if panoramaImage is not None: + self.save_frame(panoramaImage) + panoramaImage = None + + cv.imshow('Barcode & QR Code Scanner', frame) + + cv.destroyAllWindows() + print('Done') + + +if __name__ == '__main__': + ScanManager().run() + diff --git a/examples/official/9.x/webcam/barcode_reader.py b/examples/official/9.x/webcam/barcode_reader.py new file mode 100644 index 0000000..b732348 --- /dev/null +++ b/examples/official/9.x/webcam/barcode_reader.py @@ -0,0 +1,46 @@ +import numpy as np +import cv2 as cv + +from multiprocessing.pool import ThreadPool +from collections import deque + +import dbr +from dbr import * + +import time + +BarcodeReader.init_license("DLS2eyJoYW5kc2hha2VDb2RlIjoiMjAwMDAxLTE2NDk4Mjk3OTI2MzUiLCJvcmdhbml6YXRpb25JRCI6IjIwMDAwMSIsInNlc3Npb25QYXNzd29yZCI6IndTcGR6Vm05WDJrcEQ5YUoifQ==") +reader = BarcodeReader() +reader.init_runtime_settings_with_string("{\"ImageParameter\":{\"Name\":\"BestCoverage\",\"DeblurLevel\":9,\"ExpectedBarcodesCount\":512,\"ScaleDownThreshold\":100000,\"LocalizationModes\":[{\"Mode\":\"LM_CONNECTED_BLOCKS\"},{\"Mode\":\"LM_SCAN_DIRECTLY\"},{\"Mode\":\"LM_STATISTICS\"},{\"Mode\":\"LM_LINES\"},{\"Mode\":\"LM_STATISTICS_MARKS\"}],\"GrayscaleTransformationModes\":[{\"Mode\":\"GTM_ORIGINAL\"},{\"Mode\":\"GTM_INVERTED\"}]}}") + +def main(): + import sys + try: + filename = sys.argv[1] + except: + filename = '' + + if filename == '': + print('Usage: python3 barcode-reader.py ') + exit(1) + + frame = cv.imread(filename) + results = reader.decode_buffer(frame) + for result in results: + points = result.localization_result.localization_points + cv.line(frame, points[0], points[1], (0,255,0), 2) + cv.line(frame, points[1], points[2], (0,255,0), 2) + cv.line(frame, points[2], points[3], (0,255,0), 2) + cv.line(frame, points[3], points[0], (0,255,0), 2) + cv.putText(frame, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + cv.imshow('Barcode & QR Code Reader', frame) + cv.waitKey(0) + + cv.imwrite('output.png', frame) + print('Saved to output.png') + + +if __name__ == '__main__': + main() + cv.destroyAllWindows() \ No newline at end of file diff --git a/examples/official/9.x/webcam/multi_code_stitch.py b/examples/official/9.x/webcam/multi_code_stitch.py new file mode 100644 index 0000000..289ea7a --- /dev/null +++ b/examples/official/9.x/webcam/multi_code_stitch.py @@ -0,0 +1,230 @@ +import numpy as np +import cv2 as cv + +from multiprocessing.pool import ThreadPool +from collections import deque + +import dbr +from dbr import * + +import time +# from util import * + +BarcodeReader.init_license( + "DLS2eyJoYW5kc2hha2VDb2RlIjoiMjAwMDAxLTE2NDk4Mjk3OTI2MzUiLCJvcmdhbml6YXRpb25JRCI6IjIwMDAwMSIsInNlc3Npb25QYXNzd29yZCI6IndTcGR6Vm05WDJrcEQ5YUoifQ==") + +def transform_point(points, matrix): + """Transforms the point using the given matrix.""" + all = [] + for point in points: + x, y = point + transformed = np.dot(matrix, [x, y, 1]) + + all.append((int(transformed[0] / transformed[2]), int(transformed[1] / transformed[2]))) + + return all + +def perspective_correction(img, pts): + # Define the 4 points where the image will be warped to. + # For a typical use-case, this would be a rectangle. + # We'll use the dimensions of the input image, but you can adjust this + # as needed to set the dimensions of the output image. + rect = np.array([ + [0, 0], + [img.shape[1] - 1, 0], + [img.shape[1] - 1, img.shape[0] - 1], + [0, img.shape[0] - 1] + ], dtype="float32") + + # Compute the perspective transform matrix + matrix = cv.getPerspectiveTransform(pts, rect) + + # Perform the perspective warp + warped = cv.warpPerspective(img, matrix, (img.shape[1], img.shape[0])) + + return warped + +class ScanManager: + def __init__(self): + self.multicode = [] + self.ismulticodeDone = False + self.reader = BarcodeReader() + + def count_barcodes(self, frame): + try: + results = self.reader.decode_buffer(frame) + return len(results) + except BarcodeReaderError as e: + print(e) + + return 0 + + def save_frame(self, frame): + filename = str(time.time()) + "_multicode.jpg" + cv.imwrite(filename, frame) + print("Saved to " + filename) + + def stitch_frame(self, frame): + try: + results = self.reader.decode_buffer(frame) + if results != None: + if len(self.multicode) == 0: + self.multicode = [frame, results, frame.copy()] + else: + preResults = self.multicode[1] + matrix = None + newResults = [] + isFirstTime = True + while len(results) > 0: + result = results.pop() + isExisted = False + for preResult in preResults: + if preResult.barcode_text == result.barcode_text and preResult.barcode_format == result.barcode_format: + isExisted = True + prePoints = preResult.localization_result.localization_points + points = result.localization_result.localization_points + if isFirstTime: + isFirstTime = False + matrix = cv.getPerspectiveTransform(np.array([ + points[0], + points[1], + points[2], + points[3] + ], dtype="float32"), np.array([ + prePoints[0], + prePoints[1], + prePoints[2], + prePoints[3] + ], dtype="float32")) + break + + if not isExisted: + newResults.append(result) + + if len(newResults) > 0: + try: + for newResult in newResults: + points = newResult.localization_result.localization_points + points = transform_point(points, matrix) + newResult.localization_result.localization_points = points + preResults.extend([newResult]) + + self.multicode = [self.multicode[0], + preResults, self.multicode[2]] + except Exception as e: + return None + + for result in self.multicode[1]: + points = result.localization_result.localization_points + cv.line(self.multicode[2], points[0], + points[1], (0, 255, 0), 2) + cv.line(self.multicode[2], points[1], + points[2], (0, 255, 0), 2) + cv.line(self.multicode[2], points[2], + points[3], (0, 255, 0), 2) + cv.line(self.multicode[2], points[3], + points[0], (0, 255, 0), 2) + cv.putText(self.multicode[2], result.barcode_text, + points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255)) + + return self.multicode[2] + + except BarcodeReaderError as e: + print(e) + + return None + + def process_frame(self, frame): + results = None + try: + results = self.reader.decode_buffer(frame) + except BarcodeReaderError as bre: + print(bre) + + return results + + def clean_deque(self, tasks): + while len(tasks) > 0: + tasks.popleft() + + def close_window(self, window_name): + try: + cv.destroyWindow(window_name) + except: + pass + + def run(self): + import sys + try: + fn = sys.argv[1] + except: + fn = 0 + cap = cv.VideoCapture(0) + + threadn = 1 # cv.getNumberOfCPUs() + barcodePool = ThreadPool(processes=threadn) + stitchingPool = ThreadPool(processes=threadn) + cameraTasks = deque() + codeStitchingTask = deque() + stitching = False + save = False + + while True: + ret, frame = cap.read() + frame_cp = frame.copy() + cv.putText(frame, 's: save / r: reset / c: capture / ESC: stop', (10, 20), + cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255)) + cv.putText(frame, 'Multi Code Scanning ...', + (10, 50), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 255, 0)) + + # Scan and show Multi Code results + while len(cameraTasks) > 0 and cameraTasks[0].ready(): + results = cameraTasks.popleft().get() + if results != None: + for result in results: + points = result.localization_result.localization_points + cv.line(frame, points[0], points[1], (0, 255, 0), 2) + cv.line(frame, points[1], points[2], (0, 255, 0), 2) + cv.line(frame, points[2], points[3], (0, 255, 0), 2) + cv.line(frame, points[3], points[0], (0, 255, 0), 2) + cv.putText( + frame, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255)) + + while len(codeStitchingTask) > 0 and codeStitchingTask[0].ready(): + stitchedImage = codeStitchingTask.popleft().get() + if stitchedImage is not None: + cv.imshow('Multi Code', stitchedImage) + if save: + save = False + self.save_frame(stitchedImage) + + if len(cameraTasks) < threadn: + task = barcodePool.apply_async( + self.process_frame, (frame_cp, )) + cameraTasks.append(task) + + # Key events + ch = cv.waitKey(10) + if ch == 27: + break + elif ord('s') == ch: + save = True + elif ord('r') == ch: + stitching = False + cv.destroyWindow('Multi Code') + self.multicode = [] + elif ord('c') == ch or stitching: + stitching = True + if len(codeStitchingTask) < threadn: + task = stitchingPool.apply_async( + self.stitch_frame, (frame_cp, )) + codeStitchingTask.append(task) + + cv.imshow('Multi Code Scanner', frame) + + cv.destroyAllWindows() + print('Done') + + +if __name__ == '__main__': + ScanManager().run() diff --git a/examples/official/9.x/webcam/output.png b/examples/official/9.x/webcam/output.png new file mode 100644 index 0000000..16e549e Binary files /dev/null and b/examples/official/9.x/webcam/output.png differ diff --git a/examples/official/9.x/webcam/scanner.py b/examples/official/9.x/webcam/scanner.py new file mode 100644 index 0000000..eb5e6b4 --- /dev/null +++ b/examples/official/9.x/webcam/scanner.py @@ -0,0 +1,63 @@ +import numpy as np +import cv2 as cv + +from multiprocessing.pool import ThreadPool +from collections import deque + +import dbr +from dbr import * + +import time + +BarcodeReader.init_license("DLS2eyJoYW5kc2hha2VDb2RlIjoiMjAwMDAxLTE2NDk4Mjk3OTI2MzUiLCJvcmdhbml6YXRpb25JRCI6IjIwMDAwMSIsInNlc3Npb25QYXNzd29yZCI6IndTcGR6Vm05WDJrcEQ5YUoifQ==") +reader = BarcodeReader() + +def process_frame(frame): + results = None + try: + results = reader.decode_buffer(frame) + except BarcodeReaderError as bre: + print(bre) + + return results + +def main(): + import sys + try: + fn = sys.argv[1] + except: + fn = 0 + cap = cv.VideoCapture(fn) + + threadn = 1 # cv.getNumberOfCPUs() + pool = ThreadPool(processes = threadn) + barcodeTasks = deque() + + while True: + ret, frame = cap.read() + while len(barcodeTasks) > 0 and barcodeTasks[0].ready(): + results = barcodeTasks.popleft().get() + if results != None: + for result in results: + points = result.localization_result.localization_points + cv.line(frame, points[0], points[1], (0,255,0), 2) + cv.line(frame, points[1], points[2], (0,255,0), 2) + cv.line(frame, points[2], points[3], (0,255,0), 2) + cv.line(frame, points[3], points[0], (0,255,0), 2) + cv.putText(frame, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + if len(barcodeTasks) < threadn: + task = pool.apply_async(process_frame, (frame.copy(), )) + barcodeTasks.append(task) + + cv.imshow('Barcode & QR Code Scanner', frame) + ch = cv.waitKey(1) + if ch == 27: + break + + print('Done') + + +if __name__ == '__main__': + main() + cv.destroyAllWindows() diff --git a/examples/official/9.x/webcam/stitcher.py b/examples/official/9.x/webcam/stitcher.py new file mode 100644 index 0000000..92785ba --- /dev/null +++ b/examples/official/9.x/webcam/stitcher.py @@ -0,0 +1,233 @@ +from __future__ import print_function + +import numpy as np +import cv2 as cv + +from multiprocessing.pool import ThreadPool +from collections import deque + +import dbr +from dbr import * + +import time + +BarcodeReader.init_license("DLS2eyJoYW5kc2hha2VDb2RlIjoiMjAwMDAxLTE2NDk4Mjk3OTI2MzUiLCJvcmdhbml6YXRpb25JRCI6IjIwMDAwMSIsInNlc3Npb25QYXNzd29yZCI6IndTcGR6Vm05WDJrcEQ5YUoifQ==") + +class ScanManager: + MODE_AUTO_STITCH = 0 + MODE_MANUAL_STITCH = 1 + MODE_CAMERA_ONLY = 2 + + def __init__(self): + modes = (cv.Stitcher_PANORAMA, cv.Stitcher_SCANS) + self.stitcher = cv.Stitcher.create(modes[1]) + self.stitcher.setPanoConfidenceThresh(0.1) + self.panorama = [] + self.isPanoramaDone = False + self.reader = BarcodeReader() + + def count_barcodes(self, frame): + try: + results = self.reader.decode_buffer(frame) + return len(results) + except BarcodeReaderError as e: + print(e) + + return 0 + + def save_frame(self, frame): + # frame = self.frame_overlay(frame) + filename = str(time.time()) + "_panorama.jpg" + cv.imwrite(filename, frame) + print("Saved to " + filename) + + def frame_overlay(self, frame): + frame_cp = frame.copy() + try: + results = self.reader.decode_buffer(frame_cp) + if results != None: + for result in results: + points = result.localization_result.localization_points + cv.line(frame_cp, points[0], points[1], (0,255,0), 2) + cv.line(frame_cp, points[1], points[2], (0,255,0), 2) + cv.line(frame_cp, points[2], points[3], (0,255,0), 2) + cv.line(frame_cp, points[3], points[0], (0,255,0), 2) + cv.putText(frame_cp, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + return frame_cp + except BarcodeReaderError as e: + print(e) + return None + + def stitch_frame(self, frame): + try: + results = self.reader.decode_buffer(frame) + if results != None: + for result in results: + points = result.localization_result.localization_points + cv.line(frame, points[0], points[1], (0,255,0), 2) + cv.line(frame, points[1], points[2], (0,255,0), 2) + cv.line(frame, points[2], points[3], (0,255,0), 2) + cv.line(frame, points[3], points[0], (0,255,0), 2) + cv.putText(frame, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + self.panorama.append((frame, len(results))) + print('Stitching .............') + try: + all_images = [frame for frame, count in self.panorama] + status, image = self.stitcher.stitch(all_images) + + if status != cv.Stitcher_OK: + print("Can't stitch images, error code = %d" % status) + return self.panorama[0][0] + else: + # Stop stitching if the output image is out of control + if image.shape[0] >= frame.shape[0] * 1.5: + self.isPanoramaDone = True + self.save_frame(all_images[0]) + print('Stitching is done.............') + return None + + # Drop the stitched image if its quality is not good enough + total = 0 + for frame, count in self.panorama: + total += count + + count_stitch = self.count_barcodes(image) + if count_stitch > total or count_stitch < self.panorama[0][1]: + return self.panorama[0][0] + + # Wait for the next stitching and return the current stitched image + self.panorama = [(image, count_stitch)] + return image + except Exception as e: + print(e) + return None + + except BarcodeReaderError as e: + print(e) + return None + + return None + + + def process_frame(self, frame): + results = None + try: + results = self.reader.decode_buffer(frame) + except BarcodeReaderError as bre: + print(bre) + + return results + + def clean_deque(self, tasks): + while len(tasks) > 0: + tasks.popleft() + + def close_window(self, window_name): + try: + cv.destroyWindow(window_name) + except: + pass + + def run(self): + import sys + try: + fn = sys.argv[1] + except: + fn = 0 + cap = cv.VideoCapture(fn) + + threadn = 1 # cv.getNumberOfCPUs() + barcodePool = ThreadPool(processes = threadn) + panoramaPool = ThreadPool(processes = threadn) + cameraTasks = deque() + panoramaTask = deque() + mode = self.MODE_CAMERA_ONLY + image = None + + while True: + ret, frame = cap.read() + frame_cp = frame.copy() + cv.putText(frame, 'A: auto pano, M: manual pano, C: capture, O: camera, S: stop', (10, 20), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + cv.putText(frame, 'Barcode & QR Code Scanning ...', (10, 50), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0)) + + # Scan and show barcode & QR code results + while len(cameraTasks) > 0 and cameraTasks[0].ready(): + results = cameraTasks.popleft().get() + if results != None: + for result in results: + points = result.localization_result.localization_points + cv.line(frame, points[0], points[1], (0,255,0), 2) + cv.line(frame, points[1], points[2], (0,255,0), 2) + cv.line(frame, points[2], points[3], (0,255,0), 2) + cv.line(frame, points[3], points[0], (0,255,0), 2) + cv.putText(frame, result.barcode_text, points[0], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,0,255)) + + if len(cameraTasks) < threadn: + task = barcodePool.apply_async(self.process_frame, (frame_cp, )) + cameraTasks.append(task) + + # Stitch images for panorama + if mode == self.MODE_MANUAL_STITCH: + cv.putText(frame, 'Manual Panorama ...', (10, 70), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0)) + elif mode == self.MODE_AUTO_STITCH: + cv.putText(frame, 'Auto Panorama ...', (10, 70), cv.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0)) + if not self.isPanoramaDone and len(panoramaTask) < threadn: + task = panoramaPool.apply_async(self.stitch_frame, (frame_cp, )) + panoramaTask.append(task) + + if mode == self.MODE_MANUAL_STITCH or mode == self.MODE_AUTO_STITCH: + while len(panoramaTask) > 0 and panoramaTask[0].ready(): + image = panoramaTask.popleft().get() + if image is not None: + cv.imshow('panorama', image) + + # Key events + ch = cv.waitKey(1) + if ch == 27: + break + if ord('o') == ch: + self.close_window('panorama') + self.isPanoramaDone = True + mode = self.MODE_CAMERA_ONLY + self.clean_deque(panoramaTask) + elif ord('a') == ch: + self.close_window('panorama') + self.isPanoramaDone = False + mode = self.MODE_AUTO_STITCH + self.clean_deque(panoramaTask) + self.panorama = [] + elif ord('m') == ch: + self.close_window('panorama') + self.isPanoramaDone = False + mode = self.MODE_MANUAL_STITCH + self.clean_deque(panoramaTask) + self.panorama = [] + elif ord('c') == ch and mode == self.MODE_MANUAL_STITCH and not self.isPanoramaDone: + if len(panoramaTask) < threadn: + task = panoramaPool.apply_async(self.stitch_frame, (frame_cp, )) + panoramaTask.append(task) + elif ord('s') == ch: + print('Stitching is done.............') + self.isPanoramaDone = True + + # Quit panorama mode + if self.isPanoramaDone: + self.close_window('panorama') + mode = self.MODE_CAMERA_ONLY + self.clean_deque(panoramaTask) + self.isPanoramaDone = False + if image is not None: + self.save_frame(image) + image = None + + cv.imshow('Barcode & QR Code Scanner', frame) + + cv.destroyAllWindows() + print('Done') + + +if __name__ == '__main__': + ScanManager().run() + diff --git a/examples/official/9.x/webcam/util.py b/examples/official/9.x/webcam/util.py new file mode 100644 index 0000000..3fe83e4 --- /dev/null +++ b/examples/official/9.x/webcam/util.py @@ -0,0 +1,99 @@ +import numpy as np +import cv2 as cv +from scipy import ndimage +import math + +# https://stackoverflow.com/questions/19068085/shift-image-content-with-opencv +def shiftX(image, shift): + for i in range(image.shape[1] -1, image.shape[1] - shift, -1): + image = np.roll(image, -1, axis=1) + image[:, -1] = 0 + + return image + +def shiftY(image, shift): + for i in range(image.shape[0] -1, image.shape[0] - shift, -1): + image = np.roll(image, -1, axis=0) + image[-1, :] = 0 + + return image + +def is_moving(current_frame, previous_frame): + frame_diff = cv.absdiff(current_frame, previous_frame) + + diff_gray = cv.cvtColor(frame_diff, cv.COLOR_BGR2GRAY) + + diff_blur = cv.GaussianBlur(diff_gray, (5,5), 0) + + _, thresh = cv.threshold(diff_blur, 20, 255, cv.THRESH_BINARY) + + thresh = cv.dilate(thresh, None, iterations=2) + + contours, _ = cv.findContours(thresh, cv.RETR_TREE, cv.CHAIN_APPROX_SIMPLE) + + total = 0 + for contour in contours: + x, y, w, h = cv.boundingRect(contour) + total += cv.contourArea(contour) + + if total > (current_frame.shape[0] * current_frame.shape[1] * 0.5): + return True + + return False + +def rotate_image(image, angle, center = None, scale = 1.0): + rotated = ndimage.rotate(image, angle) + rotated = rotated[0: image.shape[0], 0: image.shape[1]] + return rotated + +def concat_images(images, axis=1): + if axis == 0: + return np.concatenate(images, axis=0) + elif axis == 1: + return np.concatenate(images, axis=1) + else: + raise ValueError('axis must be 0 or 1') + +def rotate_point(point, rotationDiff): + x = (point[0] * math.cos(rotationDiff)) - (point[1] * math.sin(rotationDiff)) + y = (point[0] * math.sin(rotationDiff)) + (point[1] * math.cos(rotationDiff)) + return (x, y) + +def zoom_image(image, zoom_factor): + height, width = image.shape[:2] + new_height = int(height * zoom_factor) + new_width = int(width * zoom_factor) + zoomed = cv.resize(image, (new_width, new_height)) + zoomed = zoomed[0: image.shape[0], 0: image.shape[1]] + return zoomed + +def transform_point(points, matrix): + """Transforms the point using the given matrix.""" + all = [] + for point in points: + x, y = point + transformed = np.dot(matrix, [x, y, 1]) + + all.append((int(transformed[0] / transformed[2]), int(transformed[1] / transformed[2]))) + + return all + +def perspective_correction(img, pts): + # Define the 4 points where the image will be warped to. + # For a typical use-case, this would be a rectangle. + # We'll use the dimensions of the input image, but you can adjust this + # as needed to set the dimensions of the output image. + rect = np.array([ + [0, 0], + [img.shape[1] - 1, 0], + [img.shape[1] - 1, img.shape[0] - 1], + [0, img.shape[0] - 1] + ], dtype="float32") + + # Compute the perspective transform matrix + matrix = cv.getPerspectiveTransform(pts, rect) + + # Perform the perspective warp + warped = cv.warpPerspective(img, matrix, (img.shape[1], img.shape[0])) + + return warped \ No newline at end of file