Skip to content

[Junie]: fix: modify session to resend only reject messages #999

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion quickfixj-core/src/main/java/quickfix/Session.java
Original file line number Diff line number Diff line change
Expand Up @@ -2390,7 +2390,8 @@ private void resendMessages(Message receivedMessage, int beginSeqNo, int endSeqN

final String msgType = msg.getHeader().getString(MsgType.FIELD);

if (MessageUtils.isAdminMessage(msgType) && !forceResendWhenCorruptedStore) {
if (MessageUtils.isAdminMessage(msgType) && (!forceResendWhenCorruptedStore || !MsgType.REJECT.equals(msgType))) {
// Skip admin messages (except Reject when forceResendWhenCorruptedStore is true)
if (begin == 0) {
begin = msgSeqNum;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
package quickfix;

import org.junit.Test;
import quickfix.fix44.Heartbeat;
import quickfix.fix44.Logon;
import quickfix.fix44.Logout;
import quickfix.fix44.Reject;
import quickfix.fix44.ResendRequest;
import quickfix.fix44.SequenceReset;
import quickfix.fix44.TestRequest;

import java.util.ArrayList;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

/**
* Test to verify that when ForceResendWhenCorruptedStore is enabled, session-level messages
* (except Reject) are not resent but are replaced with SequenceReset messages.
*/
public class ForceResendSessionLevelMessagesTest {

private class UnitTestResponder implements Responder {
public List<String> sentMessages = new ArrayList<>();

@Override
public boolean send(String data) {
sentMessages.add(data);
return true;
}

@Override
public String getRemoteAddress() {
return null;
}

@Override
public void disconnect() {
}
}

private class UnitTestApplication extends ApplicationAdapter {
}

@Test
public void testSessionLevelMessagesNotResentWhenForceResendWhenCorruptedStoreEnabled() throws Exception {
// Create a session with ForceResendWhenCorruptedStore enabled
UnitTestApplication application = new UnitTestApplication();
SessionID sessionID = new SessionID("FIX.4.4", "SENDER", "TARGET");

// Create a session with ForceResendWhenCorruptedStore enabled
Session session = SessionFactoryTestSupport.createSession(sessionID, application, true);
session.setForceResendWhenCorruptedStore(true);

UnitTestResponder responder = new UnitTestResponder();
session.setResponder(responder);

// Store some session-level messages in the message store
MessageStore store = session.getStore();

// Create and store session-level messages
Logon logon = new Logon();
setUpHeader(sessionID, logon, false, 1);
store.set(1, logon.toString());

Heartbeat heartbeat = new Heartbeat();
setUpHeader(sessionID, heartbeat, false, 2);
store.set(2, heartbeat.toString());

TestRequest testReq = new TestRequest();
setUpHeader(sessionID, testReq, false, 3);
store.set(3, testReq.toString());

Logout logout = new Logout();
setUpHeader(sessionID, logout, false, 4);
store.set(4, logout.toString());

// Create and store a Reject message (which should be resent according to FIX spec)
Reject reject = new Reject();
setUpHeader(sessionID, reject, false, 5);
store.set(5, reject.toString());

// Set next sender sequence number
session.setNextSenderMsgSeqNum(6);

// Create a ResendRequest for all stored messages
ResendRequest resendRequest = new ResendRequest();
setUpHeader(sessionID, resendRequest, true, 1);
resendRequest.set(new quickfix.field.BeginSeqNo(1));
resendRequest.set(new quickfix.field.EndSeqNo(5));

// Process the ResendRequest
session.next(resendRequest);

// Verify that only the Reject message was resent and other session-level messages
// were replaced with SequenceReset-GapFill messages

// We expect:
// 1. A SequenceReset-GapFill message covering sequences 1-4 (Logon, Heartbeat, TestRequest, Logout)
// 2. The Reject message (sequence 5)

// Check that we got exactly 2 messages
assertEquals("Should have sent exactly 2 messages", 2, responder.sentMessages.size());

// First message should be a SequenceReset-GapFill
String firstMessage = responder.sentMessages.get(0);
assertTrue("First message should be a SequenceReset",
firstMessage.contains("35=4")); // MsgType=4 (SequenceReset)
assertTrue("First message should have GapFill flag",
firstMessage.contains("123=Y")); // GapFillFlag=Y
assertTrue("First message should have NewSeqNo=5",
firstMessage.contains("36=5")); // NewSeqNo=5

// Second message should be the Reject
String secondMessage = responder.sentMessages.get(1);
assertTrue("Second message should be a Reject",
secondMessage.contains("35=3")); // MsgType=3 (Reject)
}

private void setUpHeader(SessionID sessionID, Message message, boolean reversed, int sequence) {
message.getHeader().setString(quickfix.field.BeginString.FIELD, sessionID.getBeginString());

if (!reversed) {
message.getHeader().setString(quickfix.field.SenderCompID.FIELD, sessionID.getSenderCompID());
message.getHeader().setString(quickfix.field.TargetCompID.FIELD, sessionID.getTargetCompID());
} else {
message.getHeader().setString(quickfix.field.SenderCompID.FIELD, sessionID.getTargetCompID());
message.getHeader().setString(quickfix.field.TargetCompID.FIELD, sessionID.getSenderCompID());
}

message.getHeader().setInt(quickfix.field.MsgSeqNum.FIELD, sequence);
message.getHeader().setUtcTimeStamp(quickfix.field.SendingTime.FIELD, new java.util.Date());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
package quickfix;

import org.junit.Test;
import quickfix.field.BeginSeqNo;
import quickfix.field.BeginString;
import quickfix.field.EndSeqNo;
import quickfix.field.EncryptMethod;
import quickfix.field.GapFillFlag;
import quickfix.field.HeartBtInt;
import quickfix.field.MsgSeqNum;
import quickfix.field.MsgType;
import quickfix.field.NewSeqNo;
import quickfix.field.RefSeqNum;
import quickfix.field.SenderCompID;
import quickfix.field.SendingTime;
import quickfix.field.TargetCompID;
import quickfix.field.TestReqID;
import quickfix.fix44.Heartbeat;
import quickfix.fix44.Logon;
import quickfix.fix44.Logout;
import quickfix.fix44.Reject;
import quickfix.fix44.ResendRequest;
import quickfix.fix44.SequenceReset;
import quickfix.fix44.TestRequest;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

/**
* Test to verify that when ForceResendWhenCorruptedStore is enabled, session-level messages
* (except Reject) are not resent but are replaced with SequenceReset messages.
*/
public class SessionResendSessionLevelMessagesTest {

@Test
public void testSessionLevelMessagesNotResentWhenForceResendWhenCorruptedStoreEnabled() throws Exception {
// Create a session with ForceResendWhenCorruptedStore enabled
UnitTestApplication application = new UnitTestApplication();
Session session = setUpSession(application, true);
session.setForceResendWhenCorruptedStore(true);

SessionState state = getSessionState(session);

// Store some session-level messages in the message store
int logonSeqNum = 1;
int heartbeatSeqNum = 2;
int testReqSeqNum = 3;
int logoutSeqNum = 4;
int rejectSeqNum = 5;

// Create and store session-level messages
Logon logon = createLogon(logonSeqNum);
storeMessage(session, logon, logonSeqNum);

Heartbeat heartbeat = createHeartbeat(heartbeatSeqNum);
storeMessage(session, heartbeat, heartbeatSeqNum);

TestRequest testReq = createTestRequest(testReqSeqNum);
storeMessage(session, testReq, testReqSeqNum);

Logout logout = createLogout(logoutSeqNum);
storeMessage(session, logout, logoutSeqNum);

// Create and store a Reject message (which should be resent according to FIX spec)
Reject reject = createReject(rejectSeqNum);
storeMessage(session, reject, rejectSeqNum);

// Set next sender sequence number
state.setNextSenderMsgSeqNum(6);

// Create a ResendRequest for all stored messages
ResendRequest resendRequest = new ResendRequest();
resendRequest.set(new BeginSeqNo(1));
resendRequest.set(new EndSeqNo(5));

Message.Header header = resendRequest.getHeader();
header.setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44);
header.setString(SenderCompID.FIELD, "TARGET");
header.setString(TargetCompID.FIELD, "SENDER");
header.setInt(MsgSeqNum.FIELD, 1);
header.setUtcTimeStamp(SendingTime.FIELD, new Date());

UnitTestResponder responder = (UnitTestResponder) session.getResponder();

// Process the ResendRequest
session.next(resendRequest);

// Verify that only the Reject message was resent and other session-level messages
// were replaced with SequenceReset-GapFill messages

// We expect:
// 1. A SequenceReset-GapFill message covering sequences 1-4 (Logon, Heartbeat, TestRequest, Logout)
// 2. The Reject message (sequence 5)

// Check that we got exactly 2 messages
assertEquals(2, responder.sentMessages.size());

// First message should be a SequenceReset-GapFill
Message firstMessage = MessageUtils.parse(session, responder.sentMessages.get(0));
assertEquals(MsgType.SEQUENCE_RESET, firstMessage.getHeader().getString(MsgType.FIELD));
assertTrue(firstMessage.getBoolean(GapFillFlag.FIELD));
assertEquals(1, firstMessage.getHeader().getInt(MsgSeqNum.FIELD));
assertEquals(5, firstMessage.getInt(NewSeqNo.FIELD)); // Should skip to sequence 5

// Second message should be the Reject
Message secondMessage = MessageUtils.parse(session, responder.sentMessages.get(1));
assertEquals(MsgType.REJECT, secondMessage.getHeader().getString(MsgType.FIELD));
assertEquals(5, secondMessage.getHeader().getInt(MsgSeqNum.FIELD));
}

private Session setUpSession(Application application, boolean isInitiator) throws ConfigError {
SessionID sessionID = new SessionID(FixVersions.BEGINSTRING_FIX44, "SENDER", "TARGET");
SessionSettings settings = new SessionSettings();
settings.setString(sessionID, "StartTime", "00:00:00");
settings.setString(sessionID, "EndTime", "00:00:00");
settings.setString(sessionID, "ConnectionType", isInitiator ? "initiator" : "acceptor");

UnitTestResponder responder = new UnitTestResponder();
return new Session(application, new MemoryStoreFactory(), sessionID, null, null,
null, settings, new DefaultMessageFactory(), 30, false, 30, true, true, false,
false, false, false, false, true, false, false, false, false,
false, false, true, false, null, true, 0, false, false, false, false, null);
}

private void storeMessage(Session session, Message message, int msgSeqNum) throws IOException {
session.getStore().set(msgSeqNum, message.toString());
}

private Logon createLogon(int sequence) {
Logon logon = new Logon();
logon.set(new HeartBtInt(30));
setUpHeader(logon.getHeader(), sequence);
return logon;
}

private Heartbeat createHeartbeat(int sequence) {
Heartbeat heartbeat = new Heartbeat();
setUpHeader(heartbeat.getHeader(), sequence);
return heartbeat;
}

private TestRequest createTestRequest(int sequence) {
TestRequest testRequest = new TestRequest();
testRequest.set(new TestReqID("TEST"));
setUpHeader(testRequest.getHeader(), sequence);
return testRequest;
}

private Logout createLogout(int sequence) {
Logout logout = new Logout();
setUpHeader(logout.getHeader(), sequence);
return logout;
}

private Reject createReject(int sequence) {
Reject reject = new Reject();
reject.set(new quickfix.field.RefSeqNum(100)); // Some reference sequence number
setUpHeader(reject.getHeader(), sequence);
return reject;
}

private void setUpHeader(Message.Header header, int sequence) {
header.setString(BeginString.FIELD, FixVersions.BEGINSTRING_FIX44);
header.setString(SenderCompID.FIELD, "SENDER");
header.setString(TargetCompID.FIELD, "TARGET");
header.setInt(MsgSeqNum.FIELD, sequence);
header.setUtcTimeStamp(SendingTime.FIELD, new Date());
}

private SessionState getSessionState(Session session) {
try {
return (SessionState) session.getClass().getDeclaredField("state").get(session);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

private class UnitTestApplication extends ApplicationAdapter {
public void fromAdmin(Message message, SessionID sessionId) throws FieldNotFound, IncorrectDataFormat,
IncorrectTagValue, RejectLogon {
}
}

private class UnitTestResponder implements Responder {
public java.util.ArrayList<String> sentMessages = new java.util.ArrayList<>();

public boolean send(String data) {
sentMessages.add(data);
return true;
}

public String getRemoteAddress() {
return null;
}

public void disconnect() {
}
}
}
Loading