Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(anta): Added the test case to verify BGP NLRI prefixes #792

Draft
wants to merge 12 commits into
base: main
Choose a base branch
from
181 changes: 181 additions & 0 deletions anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
else:
from typing_extensions import Self

# pylint: disable=C0302
# TODO: Refactor to reduce the number of lines in this module later


def _add_bgp_failures(failures: dict[tuple[str, str | None], dict[str, Any]], afi: Afi, safi: Safi | None, vrf: str, issue: str | dict[str, Any]) -> None:
"""Add a BGP failure entry to the given `failures` dictionary.
Expand Down Expand Up @@ -171,6 +174,43 @@ def _add_bgp_routes_failure(
return failure_routes


def _get_inconsistent_peers(peer_details: dict[Any, Any], bgp_peers: list[IPv4Address] | None) -> dict[Any, Any]:
"""Identify BGP peers with inconsistency of prefix(s) received and accepted in a BGP session.

Parameters
----------
peer_details: dict[Any, Any]
The BGP peer data dictionary.
bgp_peers: list[IPv4Address] | None
List of IPv4 address of a BGP peer to be verified. If not provided, test will verifies all the BGP peers.

Returns
-------
dict[Any, Any]: A dictionary containing the BGP peer(s) with inconsistent prefix(s).

"""
failure: dict[Any, Any] = {}

# Update the peer details as per input peer addresses for verification.
if bgp_peers:
input_peers_detail: dict[Any, Any] = {}
for peer in bgp_peers:
if not (peer_detail := peer_details.get(str(peer))):
failure[str(peer)] = "Not Configured"
else:
input_peers_detail[str(peer)] = peer_detail
peer_details = input_peers_detail

# Iterate over each peer and verify the prefix(s) consistency.
for peer, peer_detail in peer_details.items():
prefix_rcv = peer_detail.get("prefixReceived", "Not Found")
prefix_acc = peer_detail.get("prefixAccepted", "Not Found")
if (prefix_acc and prefix_rcv) == "Not Found" or prefix_rcv != prefix_acc:
failure[peer] = {"prefix received": prefix_rcv, "prefix accepted": prefix_acc}

return failure


class VerifyBGPPeerCount(AntaTest):
"""Verifies the count of BGP peers for a given address family.

Expand Down Expand Up @@ -1628,3 +1668,144 @@ def test(self) -> None:
self.result.is_success()
else:
self.result.is_failure(f"The following BGP peer(s) are not configured or maximum routes and maximum routes warning limit is not correct:\n{failures}")


class VerifyBGPPeerPrefixes(AntaTest):
"""Verifies BGP IPv4 peer(s) consistency of prefix(s) received and accepted in a BGP session.

Expected Results
----------------
* Success: The test will pass if the `PfxRcd` equals `PfxAcc`, indicating that all received prefix(s) were accepted.
* Failure: The test will fail if the `PfxRcd` is not equal to `PfxAcc`, indicating that some prefix(s) were rejected/filtered out or
peer(s) are not configured.

Examples
--------
```yaml
anta.tests.routing:
bgp:
- VerifyBGPPeerPrefixes:
address_families:
- afi: ipv4
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe we have to check for specific peer for specific AFI/SAFI. Please recheck.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated. Thanks!

safi: unicast
peers:
- 10.100.0.8
- 10.100.0.10
- afi: ipv4
safi: multicast
- afi: evpn
- afi: vpn-ipv4
- afi: ipv4
safi: labeled-unicast
```
"""

name = "VerifyBGPPeerPrefixes"
description = "Verifies the prefix(s) received and accepted of a BGP IPv4 peer."
categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [
AntaTemplate(template="show bgp {afi} {safi} summary vrf {vrf}", revision=3),
AntaTemplate(template="show bgp {afi} summary", revision=3),
]

class Input(AntaTest.Input):
"""Input model for the VerifyBGPPeerPrefixes test."""

address_families: list[BgpAfi]
"""List of BGP address families (BgpAfi)."""

class BgpAfi(BaseModel):
"""Model for a BGP address family (AFI) and subsequent address family (SAFI)."""

afi: Afi
"""BGP address family (AFI)."""
safi: Safi | None = None
"""Optional BGP subsequent service family (SAFI).

If the input `afi` is `ipv4` or `ipv6`, a valid `safi` must be provided.
"""
vrf: str = "default"
"""
Optional VRF for IPv4 and IPv6. If not provided, it defaults to `default`.

If the input `afi` is not `ipv4` or `ipv6`, e.g. `evpn`, `vrf` must be `default`.
"""
peers: list[IPv4Address] | None = None
"""Optional list of IPv4 address of a BGP peer to be verified. If not provided, test will verifies all the BGP peers."""

@model_validator(mode="after")
def validate_inputs(self: BaseModel) -> BaseModel:
"""Validate the inputs provided to the BgpAfi class.

If afi is either ipv4 or ipv6, safi must be provided.

If afi is not ipv4 or ipv6, safi must not be provided and vrf must be default.
"""
if self.afi in ["ipv4", "ipv6"]:
if self.safi is None:
msg = "'safi' must be provided when afi is ipv4 or ipv6"
raise ValueError(msg)
elif self.safi is not None:
msg = "'safi' must not be provided when afi is not ipv4 or ipv6"
raise ValueError(msg)
elif self.vrf != "default":
msg = "'vrf' must be default when afi is not ipv4 or ipv6"
raise ValueError(msg)
return self

def render(self, template: AntaTemplate) -> list[AntaCommand]:
"""Render the template for each BGP address family in the input list."""
commands = []
for afi in self.inputs.address_families:
if template == VerifyBGPPeerPrefixes.commands[0] and afi.afi in ["ipv4", "ipv6"] and afi.safi != "sr-te":
commands.append(template.render(afi=afi.afi, safi=afi.safi, vrf=afi.vrf))

# For SR-TE SAFI, the EOS command supports sr-te first then ipv4/ipv6
elif template == VerifyBGPPeerPrefixes.commands[0] and afi.afi in ["ipv4", "ipv6"] and afi.safi == "sr-te":
commands.append(template.render(afi=afi.safi, safi=afi.afi, vrf=afi.vrf))
elif template == VerifyBGPPeerPrefixes.commands[1] and afi.afi not in ["ipv4", "ipv6"]:
commands.append(template.render(afi=afi.afi))
return commands

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyBGPPeerPrefixes."""
self.result.is_success()
failures: dict[Any, Any] = {}

for command, input_entry in zip(self.instance_commands, self.inputs.address_families):
command_output = command.json_output
afi = input_entry.afi
safi = input_entry.safi
afi_vrf = input_entry.vrf
input_peers = input_entry.peers

# Update failures dictionary for `afi`, later on removing the same if no failure found.
if not failures.get(afi):
failures[afi] = {}
Comment on lines +1784 to +1785
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if not failures.get(afi):
failures[afi] = {}
failures.setdefault(afi, {})


# Verify peer details.
if not (peer_details := get_value(command_output, f"vrfs..{afi_vrf}..peers", separator="..")):
failures[afi].update({safi: "Peers not configured"})
if not safi:
failures[afi] = "Peers not configured"
continue

# Verify the received and accepted prefix(s).
failure_logs = _get_inconsistent_peers(peer_details, input_peers)

# Update failures if any.
if failure_logs and safi:
failures[afi].update({safi: failure_logs})
elif failure_logs:
failures[afi].update(failure_logs)

# Remove AFI from failures if empty.
if not failures.get(afi):
failures.pop(afi, None)

# Check if any failures
if failures:
self.result.is_failure(
f"The following BGP address family(s), peers are not configured or prefix(s) received and accepted are not consistent:\n{failures}"
)
13 changes: 13 additions & 0 deletions examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,19 @@ anta.tests.routing:
vrf: default
maximum_routes: 12000
warning_limit: 10000
- VerifyBGPPeerPrefixes:
address_families:
- afi: ipv4
safi: unicast
peers:
- 10.100.0.8
- 10.100.0.10
- afi: ipv4
safi: multicast
- afi: evpn
- afi: vpn-ipv4
- afi: ipv4
safi: labeled-unicast
ospf:
- VerifyOSPFNeighborState:
- VerifyOSPFNeighborCount:
Expand Down
Loading
Loading