Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add: New CLI for downloading CPE match strings #47

Merged
merged 7 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion greenbone/scap/cpe/cli/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -424,10 +424,15 @@ async def download(console: Console, error_console: Console) -> None:
)

if run_time_file:
if until:
run_time = until
else:
run_time = datetime.now()
# ensure directories exist
run_time_file.parent.mkdir(parents=True, exist_ok=True)
run_time_file.write_text(
f"{until.isoformat()}\n", encoding="utf8" # type: ignore
f"{run_time.isoformat()}\n",
encoding="utf8", # type: ignore
)
console.log(f"Wrote run time to {run_time_file.absolute()}.")

Expand Down
3 changes: 3 additions & 0 deletions greenbone/scap/cpe_match/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
3 changes: 3 additions & 0 deletions greenbone/scap/cpe_match/cli/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later
68 changes: 68 additions & 0 deletions greenbone/scap/cpe_match/cli/db_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from argparse import ArgumentParser, Namespace
from typing import Sequence

import shtab
from rich.progress import Progress

from ...cli import CLIRunner
from ...cpe_match.cli.processor import CpeMatchProcessor
from ..producer.nvd_api import CpeMatchNvdApiProducer
from ..worker.db import CpeMatchDatabaseWriteWorker


def parse_args(args: Sequence[str] | None = None) -> Namespace:
parser = ArgumentParser(
description="Update a CPE match strings database from an API. "
"Downloads CPE match string information from the NIST NVD REST API "
"and stores it in a database."
)
shtab.add_argument_to(parser)

CpeMatchNvdApiProducer.add_args_to_parser(parser)

CpeMatchDatabaseWriteWorker.add_args_to_parser(parser)

CpeMatchProcessor.add_args_to_parser(parser)

return parser.parse_args(args)


async def download(console, error_console) -> None:
args = parse_args()

with Progress(console=console) as progress:
producer = CpeMatchNvdApiProducer.from_args(
args,
console,
error_console,
progress,
)

worker = CpeMatchDatabaseWriteWorker.from_args(
args,
console,
error_console,
progress,
)

processor = CpeMatchProcessor.from_args(
args,
console,
error_console,
producer,
worker,
)

await processor.run()


def main() -> None:
CLIRunner.run(download)


if __name__ == "__main__":
main()
69 changes: 69 additions & 0 deletions greenbone/scap/cpe_match/cli/json_download.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from argparse import ArgumentParser, Namespace
from typing import Sequence

import shtab
from rich.progress import Progress

from greenbone.scap.cli import CLIRunner
from greenbone.scap.cpe_match.cli.processor import CpeMatchProcessor

from ..producer.nvd_api import CpeMatchNvdApiProducer
from ..worker.json import CpeMatchJsonWriteWorker


def parse_args(args: Sequence[str] | None = None) -> Namespace:
parser = ArgumentParser(
description="Download and consolidate CPE match strings. "
"Downloads CPE match string information from the NIST NVD REST API "
"and consolidates it into a single JSON file."
)
shtab.add_argument_to(parser)

CpeMatchNvdApiProducer.add_args_to_parser(parser)

CpeMatchJsonWriteWorker.add_args_to_parser(parser)

CpeMatchProcessor.add_args_to_parser(parser)

return parser.parse_args(args)


async def download(console, error_console) -> None:
args = parse_args()

with Progress(console=console) as progress:
producer = CpeMatchNvdApiProducer.from_args(
args,
console,
error_console,
progress,
)

worker = CpeMatchJsonWriteWorker.from_args(
args,
console,
error_console,
progress,
)

processor = CpeMatchProcessor.from_args(
args,
console,
error_console,
producer,
worker,
)

await processor.run()


def main() -> None:
CLIRunner.run(download)


if __name__ == "__main__":
main()
95 changes: 95 additions & 0 deletions greenbone/scap/cpe_match/cli/processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
# SPDX-FileCopyrightText: 2024 Greenbone AG
#
# SPDX-License-Identifier: GPL-3.0-or-later

from argparse import Namespace

from pontos.nvd.models.cpe_match_string import CPEMatchString
from rich.console import Console

from ...cli import DEFAULT_VERBOSITY
from ...generic_cli.processor import ScapProcessor
from ...generic_cli.producer.base import BaseScapProducer
from ...generic_cli.queue import DEFAULT_QUEUE_SIZE
from ...generic_cli.worker.base import BaseScapWorker

CPE_MATCH_TYPE_PLURAL = "CPE match strings"
CPE_MATCH_DEFAULT_CHUNK_SIZE = 500


class CpeMatchProcessor(ScapProcessor[CPEMatchString]):
"""
Class that handles a producer object generating CPE match strings
to be processed by a worker object.
"""

_item_type_plural = CPE_MATCH_TYPE_PLURAL
_arg_defaults = {
"chunk_size": CPE_MATCH_DEFAULT_CHUNK_SIZE,
"queue_size": DEFAULT_QUEUE_SIZE,
"verbose": DEFAULT_VERBOSITY,
}

@staticmethod
def from_args(
args: Namespace,
console: Console,
error_console: Console,
producer: BaseScapProducer,
worker: BaseScapWorker,
) -> "CpeMatchProcessor":
"""
Create a new `CPEMatchNvdApiProducer` with parameters from
the given command line args gathered by an `ArgumentParser`.

Args:
args: Command line arguments to use
console: Console for standard output.
error_console: Console for error output.
producer: The producer generating the CPE match strings.
worker: The worker processing the CPE match strings.
Returns:
The new `CpeMatchProcessor`.
"""
return CpeMatchProcessor(
console,
error_console,
producer,
worker,
queue_size=args.queue_size,
chunk_size=args.chunk_size,
verbose=args.verbose,
)

def __init__(
self,
console: Console,
error_console: Console,
producer: BaseScapProducer,
worker: BaseScapWorker,
*,
queue_size: int | None = None,
chunk_size: int | None = None,
verbose: int | None = None,
):
"""
Constructor for a new CPE match string processor.

Args:
console: Console for standard output.
error_console: Console for error output.
producer: The producer generating the CPE match strings.
worker: The worker processing the CPE match strings.
queue_size: The number of chunks allowed in the queue.
chunk_size: The expected maximum number of CPE match strings per chunk.
verbose: Verbosity level of log messages.
"""
super().__init__(
console,
error_console,
producer,
worker,
queue_size=queue_size,
chunk_size=chunk_size,
verbose=verbose,
)
Empty file.
Loading
Loading