Skip to content

Commit

Permalink
NIFI-7132: fix handling of UUIDs in PutCassandraQL
Browse files Browse the repository at this point in the history
Signed-off-by: Pierre Villard <[email protected]>

This closes apache#4048.
  • Loading branch information
woutifier-t authored and pvillard31 committed Feb 11, 2020
1 parent 65b2a9b commit a80b247
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -327,8 +328,6 @@ protected void setStatementObject(final BoundStatement statement, final int para
if (mainType.equals(DataType.ascii())
|| mainType.equals(DataType.text())
|| mainType.equals(DataType.varchar())
|| mainType.equals(DataType.timeuuid())
|| mainType.equals(DataType.uuid())
|| mainType.equals(DataType.inet())
|| mainType.equals(DataType.varint())) {
// These are strings, so just use the paramValue
Expand All @@ -355,6 +354,9 @@ protected void setStatementObject(final BoundStatement statement, final int para

} else if (mainType.equals(DataType.timestamp())) {
statement.setTimestamp(paramIndex, (Date) typeCodec.parse(paramValue));
} else if (mainType.equals(DataType.timeuuid())
|| mainType.equals(DataType.uuid())) {
statement.setUUID(paramIndex, (UUID) typeCodec.parse(paramValue));
}
return;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,78 @@ public void testProcessorBadTimestamp() {
testRunner.clearTransferState();
}

@Test
public void testProcessorUuid() {
setUpStandardTestConfig();

testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
new HashMap<String, String>() {
{
put("cql.args.1.type", "int");
put("cql.args.1.value", "1");
put("cql.args.2.type", "text");
put("cql.args.2.value", "Joe");
put("cql.args.3.type", "text");
// No value for arg 3 to test setNull
put("cql.args.4.type", "map<text,text>");
put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
put("cql.args.5.type", "list<boolean>");
put("cql.args.5.value", "[true,false,true]");
put("cql.args.6.type", "set<double>");
put("cql.args.6.value", "{1.0, 2.0}");
put("cql.args.7.type", "bigint");
put("cql.args.7.value", "20000000");
put("cql.args.8.type", "float");
put("cql.args.8.value", "1.0");
put("cql.args.9.type", "blob");
put("cql.args.9.value", "0xDEADBEEF");
put("cql.args.10.type", "uuid");
put("cql.args.10.value", "5442b1f6-4c16-11ea-87f5-45a32dbc5199");

}
});

testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_SUCCESS, 1);
testRunner.clearTransferState();
}

@Test
public void testProcessorBadUuid() {
setUpStandardTestConfig();

testRunner.enqueue("INSERT INTO users (user_id, first_name, last_name, properties, bits, scaleset, largenum, scale, byteobject, ts) VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?",
new HashMap<String, String>() {
{
put("cql.args.1.type", "int");
put("cql.args.1.value", "1");
put("cql.args.2.type", "text");
put("cql.args.2.value", "Joe");
put("cql.args.3.type", "text");
// No value for arg 3 to test setNull
put("cql.args.4.type", "map<text,text>");
put("cql.args.4.value", "{'a':'Hello', 'b':'World'}");
put("cql.args.5.type", "list<boolean>");
put("cql.args.5.value", "[true,false,true]");
put("cql.args.6.type", "set<double>");
put("cql.args.6.value", "{1.0, 2.0}");
put("cql.args.7.type", "bigint");
put("cql.args.7.value", "20000000");
put("cql.args.8.type", "float");
put("cql.args.8.value", "1.0");
put("cql.args.9.type", "blob");
put("cql.args.9.value", "0xDEADBEEF");
put("cql.args.10.type", "uuid");
put("cql.args.10.value", "bad-uuid");

}
});

testRunner.run(1, true, true);
testRunner.assertAllFlowFilesTransferred(PutCassandraQL.REL_FAILURE, 1);
testRunner.clearTransferState();
}

@Test
public void testProcessorInvalidQueryException() {
setUpStandardTestConfig();
Expand Down

0 comments on commit a80b247

Please sign in to comment.