Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minor improvements to flags module, fix packet handling and luxos-run #41

Open
wants to merge 16 commits into
base: main
Choose a base branch
from
Open
14 changes: 14 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,25 @@ Please, use the format:

-->

## [0.2.5]

- luxor-run support for progress bar
- luxor-run support setup/teardown functions
- cli/shared improve ArgumentTypeBase for flags supporting constraints
- ips.splitip better support for address
- syncops support packets with 0x00 termination


## [0.2.4]

- adds --version flag to luxos.cli scripts
- luxos is a namespaced project
- version information is now from luxos.version module
- classes derived from ArgumentTypeBase accpet constructor arguments
- luxor-run takes a setup/teardown pair
- syncops handles packet with 0x00 endings
- improve cli tests


## [0.2.2]

Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@ full documentation [here](https://luxorlabs.github.io/luxos-tooling).

To install the latest version:
```bash
# latest stable (base or with all extensions)
$> pip install -U luxos

# to install the extra features
$> pip install -U luxos[extra]

# beta from git hub
$> pip install git+https://github.com/LuxorLabs/luxos-tooling
```
Or pulling the latest published from [pypi](https://pypi.org/project/luxos/#history).

You can check the version:
```bash
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "luxos"
version = "0.2.4"
version = "0.2.5"
description = "The all encompassing LuxOS python library."
readme = "README.md"
license = { text = "MIT" } # TODO I don't think this is a MIT??
Expand All @@ -29,14 +29,14 @@ classifiers = [

dependencies = [
"pyyaml",
"tqdm",
]

[project.optional-dependencies]
extra = [
"asyncpg",
"httpx",
"pandas",
"tqdm",
]

[project.urls]
Expand Down
26 changes: 18 additions & 8 deletions src/luxos/cli/flags.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,26 +45,36 @@ class type_ipaddress(ArgumentTypeBase):
options = parser.parse_args()
...

assert options.x == ("host", 9999)
assert options.x == ("1.2.3.4", 9999)


shell::

file.py -x host:9999
file.py -x 1.2.3.4:9999
"""

def __init__(self, port=None, strict=True):
super().__init__()
self.strict = strict
self.port = port

def validate(self, txt) -> None | tuple[str, None | int]:
from luxos import ips

if txt is None:
return None
try:
result = ips.parse_expr(txt) or ("", "", None)
if result[1]:
raise argparse.ArgumentTypeError("cannot use a range as expression")
return (result[0], result[2])
except ips.AddressParsingError as exc:
raise argparse.ArgumentTypeError(f"failed to parse {txt=}: {exc.args[0]}")
if txt.count(":") not in {0, 1}:
raise ValueError("too many ':' (none or one)")
if self.strict:
ip, port = ips.splitip(txt) or ("", self.port)
else:
ip, _, port = txt.partition(":")
return ip, int(port) if port else self.port
except (RuntimeError, ValueError) as exc:
raise argparse.ArgumentTypeError(
f"failed to convert to a strict ip address: {exc.args[0]}"
)


def type_range(txt: str) -> Sequence[tuple[str, int | None]]:
Expand Down
67 changes: 57 additions & 10 deletions src/luxos/cli/shared.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import argparse
import inspect
import sys
import types
from typing import Callable
Expand All @@ -13,31 +14,77 @@
ArgsCallback = Callable


def check_default_constructor(klass: type):
signature = inspect.signature(klass.__init__) # type: ignore[misc]
for name, value in signature.parameters.items():
if name in {"self", "args", "kwargs"}:
continue
if value.default is inspect.Signature.empty:
raise RuntimeError(f"the {klass}() cannot be called without arguments")


# The luxor base parser
class LuxosParserBase(argparse.ArgumentParser):
def __init__(self, modules: list[types.ModuleType], *args, **kwargs):
super().__init__(*args, **kwargs)
self.modules = modules
self.callbacks: list[ArgsCallback | None] = []

def parse_args(self, args=None, namespace=None):
options = super().parse_args(args, namespace)
for name in dir(options):
if isinstance(getattr(options, name), ArgumentTypeBase):
fallback = getattr(options, name).value
setattr(
options,
name,
None if fallback is ArgumentTypeBase._NA else fallback,
)
return options

def add_argument(self, *args, **kwargs):
try:
if issubclass(kwargs.get("type"), ArgumentTypeBase):
kwargs["default"] = kwargs.get("type")(kwargs.get("default"))
kwargs["type"] = kwargs["default"]
except TypeError:
pass
typ = kwargs.get("type")
obj = None
if isinstance(typ, type) and issubclass(typ, ArgumentTypeBase):
check_default_constructor(typ)
obj = typ()
if isinstance(typ, ArgumentTypeBase):
obj = typ
if obj is not None:
obj.default = kwargs.get("default", ArgumentTypeBase._NA)
kwargs["default"] = obj
kwargs["type"] = obj
super().add_argument(*args, **kwargs)

def error(self, message):
try:
super().error(message)
except SystemExit:
# gh-121018
raise argparse.ArgumentError(None, message)


class ArgumentTypeBase:
class _NA:
pass

def __init__(self, default=_NA):
self.default = default
if default is not ArgumentTypeBase._NA:
self.default = self._validate(default)
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
self._default = self._NA
self.__name__ = self.__class__.__name__

@property
def default(self):
return self._default

@default.setter
def default(self, value):
if value is ArgumentTypeBase._NA:
self._default = ArgumentTypeBase._NA
else:
self._default = self._validate(value)
return self._default

def __call__(self, txt):
self._value = None
Expand Down
10 changes: 3 additions & 7 deletions src/luxos/cli/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ def main(parser: argparse.ArgumentParser):
from typing import Any, Callable

from . import flags
from .shared import ArgumentTypeBase, LuxosParserBase
from .shared import LuxosParserBase


class MyHandler(logging.StreamHandler):
Expand Down Expand Up @@ -139,7 +139,7 @@ class AbortWrongArgumentError(CliBaseError):
def log_sys_info(modules=None):
from luxos.version import get_version

log.info(get_version())
log.debug(get_version())
log.debug("interpreter: %s", sys.executable)


Expand Down Expand Up @@ -175,10 +175,6 @@ def parse_args(self, args=None, namespace=None):
options.error = self.error
options.modules = self.modules

for name in dir(options):
if isinstance(getattr(options, name), ArgumentTypeBase):
setattr(options, name, getattr(options, name).value)

for callback in self.callbacks:
if not callback:
continue
Expand Down Expand Up @@ -273,7 +269,7 @@ def setup(
finally:
if show_timing:
delta = round(time.monotonic() - t0, 2)
log.info("task %s in %.2fs", success, delta)
log.debug("task %s in %.2fs", success, delta)
if errormsg:
parser.error(errormsg)

Expand Down
26 changes: 21 additions & 5 deletions src/luxos/ips.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,27 @@ class DataParsingError(LuxosBaseException):
pass


def splitip(txt: str) -> tuple[str, int | None]:
expr = re.compile(r"(?P<ip>\d{1,3}([.]\d{1,3}){3})(:(?P<port>\d+))?")
if not (match := expr.search(txt)):
raise RuntimeError(f"invalid ip:port address {txt}")
return match["ip"], int(match["port"]) if match["port"] is not None else None
def splitip(txt: str, strict=True) -> tuple[str, int | None]:
if txt.count(":") not in {0, 1}:
raise ValueError("too many ':' in value")

host, _, port_txt = txt.partition(":")
host = host.strip()
try:
port = int(port_txt) if port_txt else None
except ValueError:
raise ValueError(f"cannot convert '{port_txt}' to an integer")
if not host.strip():
raise ValueError(f"cannot find host part in '{txt}'")

if not strict:
return host, port

if re.search(r"(?P<ip>\d{1,3}([.]\d{1,3}){3})", host) and all(
int(c) < 256 for c in host.split(".")
):
return host, port
raise ValueError(f"cannot convert '{host}' into an ipv4 address N.N.N.N")


def parse_expr(txt: str) -> None | tuple[str, str | None, int | None]:
Expand Down
9 changes: 9 additions & 0 deletions src/luxos/misc.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ def iter_ip_ranges(

def loadmod(path: Path) -> types.ModuleType:
from importlib import util
from types import ModuleType
from urllib.parse import urlparse
from urllib.request import urlopen

if urlparse(str(path)).scheme in {"http", "https"}:
urltxt = str(urlopen(str(path)).read(), encoding="utf-8")
mod = ModuleType(str(path).rpartition("/")[2])
exec(urltxt, mod.__dict__)
return mod

spec = util.spec_from_file_location(Path(path).name, Path(path))
module = util.module_from_spec(spec) # type: ignore
Expand Down
Loading