Skip to content

Commit

Permalink
Addressed review comments: updated ci failure
Browse files Browse the repository at this point in the history
  • Loading branch information
vitthalmagadum committed Nov 28, 2024
1 parent 4fa7ce0 commit b39c655
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 72 deletions.
35 changes: 18 additions & 17 deletions anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,12 +408,14 @@ def test(self) -> None:
received_routes_cmd = self.instance_commands[peer_idx + num_peers]

# Get the BGP route entries of each command
advertised_routes_entries = get_value(advertised_routes_cmd.json_output, f"vrfs.{peer.vrf}.bgpRouteEntries", default={})
received_routes_entries = get_value(received_routes_cmd.json_output, f"vrfs.{peer.vrf}.bgpRouteEntries", default={})
command_output = {
"Advertised": get_value(advertised_routes_cmd.json_output, f"vrfs.{peer.vrf}.bgpRouteEntries", default={}),
"Received": get_value(received_routes_cmd.json_output, f"vrfs.{peer.vrf}.bgpRouteEntries", default={}),
}

# Validate both advertised and received routes
for route_type, routes in zip(["Advertised", "Received"], [peer.advertised_routes, peer.received_routes]):
entries = advertised_routes_entries if route_type == "Advertised" else received_routes_entries
entries = command_output[route_type]
for route in routes:
# Check if the route is found
if str(route) not in entries:
Expand Down Expand Up @@ -516,7 +518,7 @@ def test(self) -> None:

# If strict is True, check if only the specified capabilities are configured
if peer.strict and sorted(peer.capabilities) != sorted(act_mp_caps):
self.result.is_failure(f"{peer} - Mismatch; Expected: {', '.join(peer.capabilities)} Actual: {', '.join(act_mp_caps)}")
self.result.is_failure(f"{peer} - Mismatch - Expected: {', '.join(peer.capabilities)} Actual: {', '.join(act_mp_caps)}")
continue

# Check each capability
Expand All @@ -527,7 +529,7 @@ def test(self) -> None:

# Check if the capability is advertised, received, and enabled
elif not _check_bgp_neighbor_capability(capability_status):
self.result.is_failure(f"{peer} - {capability} not negotiated; {format_data(capability_status)}")
self.result.is_failure(f"{peer} - {capability} not negotiated - {format_data(capability_status)}")


class VerifyBGPPeerASNCap(AntaTest):
Expand Down Expand Up @@ -596,7 +598,7 @@ def test(self) -> None:

# Check if the 4-octet ASN capability is advertised, received, and enabled
if not _check_bgp_neighbor_capability(capablity_status):
self.result.is_failure(f"{peer} - 4-octet ASN capability not negotiated; {format_data(capablity_status)}")
self.result.is_failure(f"{peer} - 4-octet ASN capability not negotiated - {format_data(capablity_status)}")


class VerifyBGPPeerRouteRefreshCap(AntaTest):
Expand Down Expand Up @@ -665,7 +667,7 @@ def test(self) -> None:

# Check if the route refresh capability is advertised, received, and enabled
if not _check_bgp_neighbor_capability(capablity_status):
self.result.is_failure(f"{peer} - Route refresh capability not negotiated; {format_data(capablity_status)}")
self.result.is_failure(f"{peer} - Route refresh capability not negotiated - {format_data(capablity_status)}")


class VerifyBGPPeerMD5Auth(AntaTest):
Expand Down Expand Up @@ -733,7 +735,7 @@ def test(self) -> None:
state = peer_data.get("state")
md5_auth_enabled = peer_data.get("md5AuthEnabled")
if state != "Established":
self.result.is_failure(f"{peer} - Session state is not established; State: {state}")
self.result.is_failure(f"{peer} - Session state is not established - State: {state}")
if not md5_auth_enabled:
self.result.is_failure(f"{peer} - Session does not have MD5 authentication enabled")

Expand Down Expand Up @@ -845,7 +847,6 @@ class VerifyBGPAdvCommunities(AntaTest):
```
"""

description = "Verifies the advertised communities of a BGP peer."
categories: ClassVar[list[str]] = ["bgp"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show bgp neighbors vrf all", revision=3)]

Expand Down Expand Up @@ -951,9 +952,9 @@ def test(self) -> None:

# Check BGP peer timers
if peer_data["holdTime"] != peer.hold_time:
self.result.is_failure(f"{peer} - Hold time mismatch; Expected: {peer.hold_time} Actual: {peer_data['holdTime']}")
self.result.is_failure(f"{peer} - Hold time mismatch - Expected: {peer.hold_time}, Actual: {peer_data['holdTime']}")
if peer_data["keepaliveTime"] != peer.keep_alive_time:
self.result.is_failure(f"{peer} - Keepalive time mismatch; Expected: {peer.keep_alive_time} Actual: {peer_data['keepaliveTime']}")
self.result.is_failure(f"{peer} - Keepalive time mismatch - Expected: {peer.keep_alive_time}, Actual: {peer_data['keepaliveTime']}")


class VerifyBGPPeerDropStats(AntaTest):
Expand Down Expand Up @@ -1030,7 +1031,7 @@ def test(self) -> None:
# Verify BGP peer's drop stats
for drop_stat in drop_stats_input:
if (stat_value := drop_stats_output.get(drop_stat, 0)) != 0:
self.result.is_failure(f"{peer} - Non-zero NLRI drop statistics counter; {drop_stat}: {stat_value}")
self.result.is_failure(f"{peer} - Non-zero NLRI drop statistics counter - {drop_stat}: {stat_value}")


class VerifyBGPPeerUpdateErrors(AntaTest):
Expand Down Expand Up @@ -1108,7 +1109,7 @@ def test(self) -> None:
# Verify BGP peer's update error counters
for error_counter in update_errors_input:
if (stat_value := error_counters_output.get(error_counter, "Not Found")) != 0 and stat_value != "None":
self.result.is_failure(f"{peer} - Non-zero update error counter; {error_counter}: {stat_value}")
self.result.is_failure(f"{peer} - Non-zero update error counter - {error_counter}: {stat_value}")


class VerifyBgpRouteMaps(AntaTest):
Expand Down Expand Up @@ -1186,11 +1187,11 @@ def test(self) -> None:

# Verify Inbound route-map
if inbound_route_map and (inbound_map := peer_data.get("routeMapInbound", "Not Configured")) != inbound_route_map:
self.result.is_failure(f"{peer} - Inbound route-map mismatch; Expected: {inbound_route_map} Actual: {inbound_map}")
self.result.is_failure(f"{peer} - Inbound route-map mismatch - Expected: {inbound_route_map}, Actual: {inbound_map}")

# Verify Outbound route-map
if outbound_route_map and (outbound_map := peer_data.get("routeMapOutbound", "Not Configured")) != outbound_route_map:
self.result.is_failure(f"{peer} - Outbound route-map mismatch; Expected: {outbound_route_map} Actual: {outbound_map}")
self.result.is_failure(f"{peer} - Outbound route-map mismatch - Expected: {outbound_route_map}, Actual: {outbound_map}")


class VerifyBGPPeerRouteLimit(AntaTest):
Expand Down Expand Up @@ -1265,8 +1266,8 @@ def test(self) -> None:

# Verify maximum routes configured.
if (actual_routes := peer_data.get("maxTotalRoutes", "Not Found")) != maximum_routes:
self.result.is_failure(f"{peer} - Maximum routes mismatch; Expected: {maximum_routes} Actual: {actual_routes}")
self.result.is_failure(f"{peer} - Maximum routes mismatch - Expected: {maximum_routes}, Actual: {actual_routes}")

# Verify warning limit if given.
if warning_limit and (actual_warning_limit := peer_data.get("totalRoutesWarnLimit", "Not Found")) != warning_limit:
self.result.is_failure(f"{peer} - Maximum route warning limit mismatch; Expected: {warning_limit} Actual: {actual_warning_limit}")
self.result.is_failure(f"{peer} - Maximum route warning limit mismatch - Expected: {warning_limit}, Actual: {actual_warning_limit}")
Loading

0 comments on commit b39c655

Please sign in to comment.