Introduce batching into worker discovery during scaling #773

Open
wants to merge 4 commits into
base: master
168 changes: 168 additions & 0 deletions context/context-unit-test-learnings.md
@@ -0,0 +1,168 @@
# Unit Test Refactoring and Enhancement Learnings

This document captures key principles and thought processes learned while refactoring and enhancing unit tests for complex distributed systems, drawn specifically from the JobActor smart refresh logic and worker state management tests.

## Key Principles Learned

### 1. **Understand the Domain Context First**
- Before refactoring tests, deeply understand the system being tested (JobActor smart refresh logic, worker state transitions, batching behavior)
- Read through the actual implementation code to understand the expected behavior patterns
- Identify the core business logic being validated (smart refresh delays, flag management, worker state filtering)

### 2. **Meaningful Test Validation Over Superficial Checks**
- **Remove tests with incomplete validation**: Tests with TODO comments or placeholder assertions provide no real value and should be eliminated
- **Validate actual behavior, not just state**: Instead of just checking if something exists, verify it behaves correctly under realistic conditions
- **Test the "why" not just the "what"**: Validate the reasoning behind the logic (e.g., why smart refresh delays updates, why flags remain set)

### 3. **DRY Principle in Test Code**
- Extract common setup patterns into helper methods
- Consolidate repetitive configuration and initialization code
- Create reusable test utilities that make test intent clearer
- Example: `createJobMetadata()`, `createAndInitializeJobActor()`, `getJobSchedulingInfoSubject()`

### 4. **Test Real-World Scenarios**
- **Time-based behavior**: Test timing-sensitive logic like batching delays and timeouts
- **State transitions**: Validate complex state machines and edge cases
- **Mixed conditions**: Test scenarios where some workers are in different states simultaneously
- **Realistic data flows**: Use actual observable patterns and event streams rather than mocked behavior

### 5. **Comprehensive Assertion Strategies**
- **Stream-based validation**: Use Java streams to validate collections and aggregations meaningfully
- **State counting and grouping**: Validate not just presence but actual distribution of states
- **Timeline validation**: For time-sensitive features, validate that events happen in the correct sequence with appropriate delays
- **End-to-end behavior**: Test the complete flow from trigger to final state, not just individual steps
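
The timeline-validation idea above can be sketched as a small helper that checks recorded event timestamps are in order and separated by at least a minimum delay. This is a minimal sketch: `TimelineAssert` and `spacedAtLeast` are hypothetical names introduced for illustration, not part of the Mantis test utilities.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;

// Hypothetical helper for illustration; not part of the Mantis code base.
final class TimelineAssert {
    private TimelineAssert() {}

    /**
     * Returns true when every timestamp is at least minGap after the previous one.
     * Out-of-order timestamps produce a negative gap and therefore fail the check.
     */
    static boolean spacedAtLeast(List<Instant> timestamps, Duration minGap) {
        for (int i = 1; i < timestamps.size(); i++) {
            Duration gap = Duration.between(timestamps.get(i - 1), timestamps.get(i));
            if (gap.compareTo(minGap) < 0) {
                return false;
            }
        }
        return true;
    }
}
```

A test could record `Instant.now()` each time a refresh event is observed and then assert `spacedAtLeast(recorded, expectedBatchDelay)` with a message naming the expected delay.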

### 6. **Iterative Refinement Based on Feedback**
- **Listen to specific requirements**: When users provide detailed scenarios, implement exactly what they describe
- **Validate timing behavior**: For smart refresh logic, test both the batching delays and timeout behavior
- **Handle edge cases**: Test scenarios like mixed worker states, partial transitions, and timeout conditions

## Applied Thought Process for Future Tasks

### Step 1: Analysis Phase
1. **Read existing tests** to understand current validation approach
2. **Identify problematic patterns**: TODOs, weak assertions, duplicate code
3. **Study the implementation** being tested to understand expected behavior
4. **Map out the domain concepts** and their relationships

### Step 2: Refactoring Strategy
1. **Remove meaningless tests** that don't validate actual behavior
2. **Enhance weak tests** with proper assertions and realistic scenarios
3. **Extract common patterns** into helper methods
4. **Consolidate setup code** to reduce duplication

### Step 3: Test Design Principles
1. **Focus on behavior validation** over state checking
2. **Test realistic scenarios** that mirror production conditions
3. **Use appropriate data structures** for validation (streams, collectors, etc.)
4. **Validate timing and sequences** for time-sensitive logic
5. **Test edge cases** and mixed conditions

### Step 4: Implementation Guidelines
1. **Make tests self-documenting** through clear naming and logging
2. **Use proper synchronization** for concurrent/timing-based tests
3. **Validate both positive and negative conditions**
4. **Include meaningful error messages** in assertions
5. **Test the complete flow** from input to expected output
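
For guideline 2, one common way to synchronize with asynchronous work is a `CountDownLatch` with a bounded wait instead of a fixed sleep. The sketch below is illustrative only: `LatchWaitExample` and `eventArrivesWithin` are hypothetical names, and the background thread stands in for whatever component emits the event under test.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical example; the background task simulates an asynchronous event source.
final class LatchWaitExample {
    private LatchWaitExample() {}

    /** Returns true if the simulated event fires before the timeout expires. */
    static boolean eventArrivesWithin(long eventDelayMillis, long timeoutMillis) {
        CountDownLatch eventSeen = new CountDownLatch(1);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            executor.execute(() -> {
                try {
                    Thread.sleep(eventDelayMillis); // simulated processing delay
                    eventSeen.countDown();          // signal the waiting test thread
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Bounded wait: a missing event fails at the timeout instead of hanging
            return eventSeen.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            executor.shutdownNow();
        }
    }
}
```

In a JUnit test the result would typically be wrapped in an assertion with a message, for example `assertTrue("event should arrive within 2s", eventArrivesWithin(50, 2000))`, which also covers guideline 4.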

### Step 5: Quality Validation
1. **Ensure all tests pass** consistently
2. **Verify test execution times** are reasonable for the behavior being tested
3. **Check that tests actually validate** the intended functionality
4. **Confirm tests would catch regressions** if the implementation changed

## Concrete Examples from JobActor Smart Refresh Tests

### Before: Weak Test Validation
```java
// Problematic test with TODO and incomplete validation
@Test
public void testRefreshSkippingMetricIncremented() {
    // TODO: Implement proper validation
    // Just checking if metric exists, not if it behaves correctly
    assertTrue(someMetric > 0);
}
```

### After: Comprehensive Behavior Validation
```java
@Test
public void testStageAssignmentFlagNotResetWithPendingWorkersDuringScaling() throws Exception {
    // Test the specific scenario: mixed worker states with timeout behavior

    // 1. Set up realistic conditions
    WorkerId workerId1 = new WorkerId(jobId, 0, 1);
    WorkerId workerId2 = new WorkerId(jobId, 1, 2);

    // 2. Create mixed states (one Started, one Launched)
    JobTestHelper.sendLaunchedInitiatedStartedEventsToWorker(probe, jobActor, jobId, 1, workerId1);
    JobTestHelper.sendWorkerLaunchedEvent(probe, jobActor, workerId2, 1);

    // 3. Validate timing behavior (await can throw InterruptedException, hence `throws Exception`)
    assertTrue("Should wait for max timeout", maxWaitTimeoutLatch.await(5, TimeUnit.SECONDS));

    // 4. Validate end-to-end behavior
    assertEquals("Should have exactly 1 Started worker", 1, startedWorkerCount);

    // 5. Validate flag management
    // Flag should remain true due to pending Launched worker
}
```

### Helper Method Pattern
```java
// Extract common setup into reusable helpers
private MantisJobMetadataImpl createJobMetadata(String clusterName, JobDefinition jobDefn) {
    return new MantisJobMetadataImpl.Builder()
        .withJobId(new JobId(clusterName, 1))
        .withSubmittedAt(Instant.now())
        .withJobState(JobState.Accepted)
        .withNextWorkerNumToUse(1)
        .withJobDefinition(jobDefn)
        .build();
}

private BehaviorSubject<JobSchedulingInfo> getJobSchedulingInfoSubject(TestKit probe, ActorRef jobActor, String clusterName) {
    jobActor.tell(new GetJobSchedInfoRequest(new JobId(clusterName, 1)), probe.getRef());
    GetJobSchedInfoResponse resp = probe.expectMsgClass(GetJobSchedInfoResponse.class);
    assertEquals(SUCCESS, resp.responseCode);
    assertTrue(resp.getJobSchedInfoSubject().isPresent());
    return resp.getJobSchedInfoSubject().get();
}
```

### Stream-Based Validation Pattern
```java
// Use streams for meaningful collection validation
Map<WorkerState, Integer> workerStateCounts = workerList.stream()
    .collect(Collectors.groupingBy(
        IMantisWorkerMetadata::getState,
        Collectors.collectingAndThen(Collectors.counting(), Math::toIntExact)
    ));

assertTrue("WorkerListChangedEvent should include Accepted workers",
    workerStateCounts.getOrDefault(WorkerState.Accepted, 0) >= 1);
assertTrue("WorkerListChangedEvent should include Started workers",
    workerStateCounts.getOrDefault(WorkerState.Started, 0) >= 1);
```

## Key Takeaway

The most important insight is to **test the underlying business logic and behavior patterns**, not just surface-level state changes. Smart refresh logic, worker state management, and flag handling depend on timing and on the combined state of many workers, so their tests need to assert on sequences, delays, and state distributions rather than on a single value. The goal is to create tests that catch real bugs and regressions while documenting the expected system behavior as executable specifications.

This approach transforms tests from simple "does it work?" checks into comprehensive behavior specifications that serve as both validation and documentation for complex system interactions.

## Practical Application Checklist

When refactoring or writing new tests for complex systems:

- [ ] Remove tests with TODOs or incomplete validation
- [ ] Extract common setup into helper methods
- [ ] Use stream operations for collection validation
- [ ] Test timing-sensitive behavior with appropriate waits/timeouts
- [ ] Validate mixed conditions and edge cases
- [ ] Include meaningful logging for debugging
- [ ] Test complete workflows, not just individual steps
- [ ] Ensure tests would catch actual regressions
- [ ] Make test intent clear through naming and structure
- [ ] Validate both positive and negative conditions
134 changes: 134 additions & 0 deletions context/inter-job-stages-connection-mechanisms.md
@@ -0,0 +1,134 @@
# Mantis Connection Mechanisms

This document captures the connection mechanisms used in Mantis for both stage-to-stage connections within jobs and job-to-job connections across different jobs.

## Stage-to-Stage Connections (Intra-Job)

### Overview
Stage-to-stage connections handle data flow between processing stages within the same Mantis job (e.g., Stage 1 → Stage 2).

### Key Components

#### 1. Worker Discovery API
- **Endpoint**: `GET /assignmentresults/{jobId}?sendHB=true`
- **Protocol**: Server-Sent Events (SSE)
- **Response**: Stream of `JobSchedulingInfo` objects
- **File**: `mantis-control-plane-client/MantisMasterClientApi.java:697-742`
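
Because discovery is delivered over SSE, a consumer has to split the byte stream into events before it can decode each `JobSchedulingInfo`. The sketch below shows only the generic SSE framing rules (`data:` fields, blank line as event terminator, `:` comment lines ignored). `SseDataParser` is a hypothetical name, and the payload and heartbeat format of the Mantis stream are not shown in this document, so they are left as opaque strings.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper illustrating generic SSE framing; not the Mantis client.
final class SseDataParser {
    private SseDataParser() {}

    /** Returns the data payload of each complete event in the given SSE text. */
    static List<String> parseDataEvents(String stream) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        for (String rawLine : stream.split("\n", -1)) {
            // Tolerate CRLF line endings
            String line = rawLine.endsWith("\r")
                    ? rawLine.substring(0, rawLine.length() - 1)
                    : rawLine;
            if (line.isEmpty()) {
                // Blank line terminates the current event
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // a single leading space is not part of the value
                }
                if (hasData) {
                    data.append('\n'); // multiple data lines are joined with newlines
                }
                data.append(value);
                hasData = true;
            }
            // Comment lines (starting with ':') and other fields are ignored here
        }
        return events; // an unterminated trailing event is dropped
    }
}
```

Each returned string would then be handed to whatever JSON decoder the client uses.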

#### 2. Connection Management
- **DynamicConnectionSet** (`mantis-remote-observable/DynamicConnectionSet.java:180-266`)
  - Manages adding/removing worker connections dynamically
  - Handles worker scaling events in real-time
  - Provides load balancing with multiple connections per endpoint

- **Reconciliator** (`mantis-remote-observable/reconciliator/Reconciliator.java:154-183`)
  - Ensures expected connections match actual connections
  - Handles connection drift and failures
  - Automatically reconnects with backoff logic
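
The document does not state which backoff policy the Reconciliator uses. As an illustration of what "reconnect with backoff" usually means, the following sketch computes a capped exponential delay; the class name `Backoff`, the doubling policy, and the parameters are all assumptions.

```java
// Hypothetical capped exponential backoff; the actual Mantis policy may differ.
final class Backoff {
    private Backoff() {}

    /**
     * Delay before retry number `attempt` (0-based): baseMillis doubled once per
     * prior attempt, never exceeding maxMillis.
     */
    static long delayMillis(int attempt, long baseMillis, long maxMillis) {
        if (attempt < 0 || baseMillis <= 0 || maxMillis < baseMillis) {
            throw new IllegalArgumentException(
                "attempt >= 0 and 0 < baseMillis <= maxMillis required");
        }
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            // Clamp before doubling so the multiplication cannot overflow
            delay = (delay > maxMillis / 2) ? maxMillis : delay * 2;
        }
        return delay;
    }
}
```

With a 100 ms base and a 5 s cap, successive attempts wait 100, 200, 400 ms and so on until the cap is reached.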

#### 3. Worker Execution
- **WorkerConsumerRemoteObservable** (`mantis-runtime/WorkerConsumerRemoteObservable.java:58-96`)
  - Creates connections based on stage type (Keyed vs Scalar)
  - Handles different connection patterns for different stage configurations

### Connection Flow
1. **Discovery**: Stage 2 workers subscribe to SSE stream for `JobSchedulingInfo`
2. **Endpoint Resolution**: Extract Stage 1 worker endpoints (host, port, state)
3. **Connection Creation**: Create multiple connections per endpoint for load balancing
4. **Dynamic Updates**: Add/remove connections as Stage 1 workers scale
5. **Reconciliation**: Continuously verify expected vs actual connections
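
Steps 4 and 5 amount to diffing the expected endpoint set against the currently connected set. The sketch below shows that comparison under simplifying assumptions: endpoints are represented as plain `host:port` strings, and `ConnectionDiff` is a hypothetical class, not the Reconciliator's actual data model.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of expected-vs-actual reconciliation.
final class ConnectionDiff {
    final Set<String> toAdd;     // expected but not yet connected
    final Set<String> toRemove;  // connected but no longer expected

    private ConnectionDiff(Set<String> toAdd, Set<String> toRemove) {
        this.toAdd = toAdd;
        this.toRemove = toRemove;
    }

    static ConnectionDiff between(Set<String> expected, Set<String> actual) {
        Set<String> toAdd = new HashSet<>(expected);
        toAdd.removeAll(actual);
        Set<String> toRemove = new HashSet<>(actual);
        toRemove.removeAll(expected);
        return new ConnectionDiff(toAdd, toRemove);
    }
}
```

Running this diff each time a new `JobSchedulingInfo` arrives, and again periodically, would cover both scaling events and connection drift.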

### Data Structures
- **JobSchedulingInfo**: Contains worker assignment mapping per stage
- **WorkerAssignments**: Maps stage numbers to worker host information
- **WorkerHost**: Individual worker endpoint details (host, port, state)
- **Endpoint**: Connection endpoint with unique identifier
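
To make the relationship between these structures concrete, here is a simplified model of the stage-to-workers mapping and of the endpoint-resolution step. The types below are hypothetical stand-ins, not the real Mantis classes: the field names, the `State` values, and the choice to keep only `Started` workers are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-ins for the Mantis types named above.
final class SchedulingModel {
    private SchedulingModel() {}

    enum State { Accepted, Launched, Started }

    static final class WorkerHost {
        final String host;
        final int port;
        final State state;

        WorkerHost(String host, int port, State state) {
            this.host = host;
            this.port = port;
            this.state = state;
        }
    }

    /** Returns "host:port" for each worker of the given stage that is in the Started state. */
    static List<String> startedEndpoints(Map<Integer, List<WorkerHost>> assignmentsByStage,
                                         int stageNum) {
        List<String> endpoints = new ArrayList<>();
        List<WorkerHost> hosts =
            assignmentsByStage.getOrDefault(stageNum, Collections.<WorkerHost>emptyList());
        for (WorkerHost worker : hosts) {
            if (worker.state == State.Started) {
                endpoints.add(worker.host + ":" + worker.port);
            }
        }
        return endpoints;
    }
}
```

The outer map plays the role of `JobSchedulingInfo`, keyed by stage number, with each value corresponding to one stage's `WorkerAssignments`.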

## Job-to-Job Connections (Inter-Job)

### Overview
Job-to-job connections enable downstream jobs to consume data from upstream jobs' output streams, creating multi-job processing pipelines.

### Key Components

#### 1. Job Discovery APIs
- **Job Cluster Resolution**: `GET /namedjobs/{jobCluster}` → current job ID
- **Job Scheduling**: `GET /assignmentresults/{jobId}?sendHB=true` → job worker info
- **Sink Discovery**: `getSinkStageNum(jobId)` → sink stage number
- **Worker Locations**: `getSinkLocations(jobId, sinkStage)` → worker endpoints

#### 2. Connection Client
- **JobSource** (`mantis-connector-job-source/JobSource.java`)
  - Primary connector for job-to-job connections
  - Uses `TargetInfo` configuration for connection parameters

- **MantisSSEJob**
  - SSE client for consuming upstream job data
  - Handles connection lifecycle and reconnection

#### 3. Configuration
```java
JobSource.TargetInfo targetInfo = new TargetInfoBuilder()
    .withSourceJobName("UpstreamJobName")   // Target job name
    .withQuery("select * from stream")      // MQL query filter
    .withClientId("unique-client-id")       // Connection identifier
    .withSamplePerSec(100)                  // Sampling rate
    .withBroadcastMode(false)               // Connection mode
    .build();
```

### Connection Flow
1. **Job Resolution**: Job name → `/namedjobs/{jobCluster}` → current job ID
2. **Sink Discovery**: Job ID → `getSinkStageNum(jobId)` → sink stage number
3. **Worker Discovery**: Job ID + stage → `getSinkLocations()` → worker endpoints
4. **SSE Connection**: Direct HTTP connection to upstream job's sink workers
5. **Data Consumption**: Stream data via SSE with optional MQL filtering
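
Steps 1 to 3 form a simple resolution chain from a job cluster name to sink worker endpoints. The sketch below expresses that chain against a hypothetical, synchronous `JobDiscovery` interface. Only the names `getSinkStageNum` and `getSinkLocations` come from this document; their signatures and return types here, and the `currentJobId` method standing in for `GET /namedjobs/{jobCluster}`, are assumptions.

```java
import java.util.List;

// Hypothetical, synchronous simplification of the discovery calls listed above.
interface JobDiscovery {
    String currentJobId(String jobCluster);                      // stands in for GET /namedjobs/{jobCluster}
    int getSinkStageNum(String jobId);                           // sink stage number of the job
    List<String> getSinkLocations(String jobId, int sinkStage);  // sink worker endpoints
}

final class SinkResolver {
    private SinkResolver() {}

    /** Resolves a job cluster name to the endpoints of its current job's sink workers. */
    static List<String> resolveSinkEndpoints(JobDiscovery discovery, String jobCluster) {
        String jobId = discovery.currentJobId(jobCluster);       // 1. job resolution
        int sinkStage = discovery.getSinkStageNum(jobId);        // 2. sink discovery
        return discovery.getSinkLocations(jobId, sinkStage);     // 3. worker discovery
    }
}
```

Keeping the chain behind an interface also makes it easy to substitute an in-memory fake in unit tests.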

## Key Differences

| Aspect | Stage-to-Stage | Job-to-Job |
|--------|----------------|------------|
| **Scope** | Within same job | Across different jobs |
| **Discovery API** | JobSchedulingInfo stream | Job Discovery APIs |
| **Target** | Next stage workers | Sink stage of upstream job |
| **Protocol** | TCP/RxNetty | SSE over HTTP |
| **Addressing** | Stage number | Job name/cluster |
| **Configuration** | Stage config in job definition | TargetInfo with MQL queries |
| **Connection Management** | DynamicConnectionSet + Reconciliator | MantisSSEJob client |
| **Load Balancing** | Multiple connections per endpoint | SSE client-side balancing |

## Architecture Benefits

### Stage-to-Stage
- **Low latency**: Direct TCP connections between co-located workers
- **High throughput**: Multiple connections with custom networking
- **Dynamic scaling**: Real-time connection updates via SSE scheduling stream
- **Fault tolerance**: Automatic reconnection with backoff

### Job-to-Job
- **Isolation**: Jobs remain independent with loose coupling
- **Flexibility**: MQL queries enable data filtering and transformation
- **Standardization**: HTTP/SSE provides standard protocol
- **Discoverability**: Job names provide logical addressing

## Implementation Files

### Stage-to-Stage Core Files
- `mantis-runtime-executor/WorkerExecutionOperationsNetworkStage.java:656-697`
- `mantis-remote-observable/DynamicConnectionSet.java:180-266`
- `mantis-remote-observable/reconciliator/Reconciliator.java:154-183`
- `mantis-control-plane-client/MantisMasterClientApi.java:697-742`

### Job-to-Job Core Files
- `mantis-connectors/mantis-connector-job-source/src/main/java/io/mantisrx/connector/job/source/JobSource.java`
- `mantis-client/src/main/java/io/mantisrx/client/MantisSSEJob.java`
- `mantis-client/src/main/java/io/mantisrx/client/MantisClient.java`

## Worker Readiness and Scaling Gap Mitigation

### Problem Statement
During job scaling operations, there is a timing gap between when new workers appear in the JobSchedulingInfo stream and when they are actually ready to accept connections. This causes downstream consumers to attempt connections to unavailable workers, leading to:
- Connection failures and retries
- Increased latency during scaling events
- Resource waste from failed connection attempts
- Potential cascading failures in large deployments
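
One way to narrow this gap, in the spirit of the batching named in the PR title, is to hold back a scheduling update while workers are still pending and release it once they are ready or a maximum wait has elapsed. The sketch below captures only that decision rule. `RefreshBatcher`, its parameters, and the exact policy are assumptions for illustration and are not taken from the PR's implementation.

```java
import java.time.Duration;

// Hypothetical sketch; names and policy are assumptions, not the PR's implementation.
final class RefreshBatcher {
    private final Duration maxWait;

    RefreshBatcher(Duration maxWait) {
        this.maxWait = maxWait;
    }

    /**
     * Publish when no workers are still pending, or when the update has already
     * been held back for at least maxWait.
     */
    boolean shouldPublish(int pendingWorkers, Duration waitedSoFar) {
        return pendingWorkers == 0 || waitedSoFar.compareTo(maxWait) >= 0;
    }
}
```

The timeout branch bounds how long consumers can be left with a stale view if a worker never becomes ready.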