RiverSource, RiverMouth, and RiverFlow
The JDBC river consists of three conceptual interfaces that can be implemented separately.
When you use the strategy parameter, the JDBC river tries to load additional classes before falling back to the simple strategy.
You can implement your own strategy by adding your implementation jars to the plugin folder and exporting the implementing classes in the META-INF/services directory. The RiverService looks up implementations for the strategy you selected before the JDBC river initializes. This makes it easy to reuse or replace existing code, or to adapt your own JDBC retrieval strategy to the core JDBC river.
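The lookup described above can be sketched with the standard java.util.ServiceLoader, which reads provider names from files in META-INF/services. This is a minimal sketch that assumes the river's lookup behaves like ServiceLoader; the names StrategyAware, SimpleStrategy, and StrategyLookup are hypothetical and are not part of the JDBC river.

```java
import java.util.ServiceLoader;

// Hypothetical, cut-down stand-in for a river interface; the real
// interfaces declare many more methods.
interface StrategyAware {
    String strategy();
}

// Built-in fallback that answers to the "simple" strategy.
class SimpleStrategy implements StrategyAware {
    @Override
    public String strategy() {
        return "simple";
    }
}

class StrategyLookup {
    // Scans every provider registered in a META-INF/services file named
    // after the interface and returns the first one whose strategy() matches.
    static StrategyAware find(String name) {
        for (StrategyAware candidate : ServiceLoader.load(StrategyAware.class)) {
            if (candidate.strategy().equals(name)) {
                return candidate;
            }
        }
        // No registered provider matched: fall back to the simple strategy.
        return new SimpleStrategy();
    }

    public static void main(String[] args) {
        // Without a provider file on the classpath this resolves to "simple".
        System.out.println(find("my-strategy").strategy());
    }
}
```

To register a custom implementation in this sketch, a jar would ship a text file under META-INF/services whose name is the fully qualified interface name and whose content is the fully qualified name of the implementing class, one per line.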
The river source models the data-producing side. Besides defining the JDBC connection parameters, it manages a dual-channel connection to the data producer for reading and for writing. The reading channel is used for fetching data, while the writing channel can update the source in order to prepare for the next cycle.
An alternative handshake mechanism is provided by the bulk indexer of Elasticsearch. A river source can receive a BulkResponse through the acknowledge() method in order to update the source.
Available RiverSource methods are:
String strategy();
RiverSource riverContext(RiverContext context);
String fetch() throws SQLException, IOException;
RiverSource driver(String driver);
RiverSource url(String url);
RiverSource user(String user);
RiverSource password(String password);
RiverSource rounding(String rounding);
RiverSource precision(int scale);
Connection connectionForReading() throws SQLException;
Connection connectionForWriting() throws SQLException;
PreparedStatement prepareQuery(String sql) throws SQLException;
PreparedStatement prepareUpdate(String sql) throws SQLException;
RiverSource bind(PreparedStatement pstmt, List<? extends Object> values) throws SQLException;
ResultSet executeQuery(PreparedStatement statement) throws SQLException;
RiverSource executeUpdate(PreparedStatement statement) throws SQLException;
void beforeFirstRow(ResultSet result, ValueListener listener) throws SQLException, IOException, ParseException;
boolean nextRow(ResultSet result, ValueListener listener) throws SQLException, IOException, ParseException;
Object parseType(ResultSet result, Integer num, int type, Locale locale) throws SQLException, IOException, ParseException;
RiverSource acknowledge(BulkResponse response) throws IOException;
RiverSource close(ResultSet result) throws SQLException;
RiverSource close(PreparedStatement statement) throws SQLException;
RiverSource closeReading();
RiverSource closeWriting();
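Most of the setters listed above return RiverSource, which suggests that calls can be chained. The following is a minimal sketch of that fluent style, not the river's own code: SketchSource and its describe() method are hypothetical, and the driver class and URL are placeholders.

```java
// Hypothetical, cut-down stand-in for a river source; only the four
// connection setters mirror names from the method list.
class SketchSource {
    private String driver;
    private String url;
    private String user;
    private String password;

    // Each setter stores its value and returns this, so calls can be chained.
    SketchSource driver(String driver) { this.driver = driver; return this; }
    SketchSource url(String url) { this.url = url; return this; }
    SketchSource user(String user) { this.user = user; return this; }
    SketchSource password(String password) { this.password = password; return this; }

    // Illustrative helper (not in the real interface); omits the password.
    String describe() {
        return user + "@" + url + " via " + driver;
    }

    public static void main(String[] args) {
        SketchSource source = new SketchSource()
                .driver("org.example.Driver")          // placeholder driver class
                .url("jdbc:example://localhost/test")  // placeholder JDBC URL
                .user("river")
                .password("secret");
        System.out.println(source.describe());
    }
}
```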
The RiverMouth is the abstraction of the destination to which all data from the river source flows. The main methods are create(), index(), and delete(), each of which accepts a StructuredObject. It can also control the resource usage of the bulk indexing method of Elasticsearch. Throttling is possible by limiting the number of bulk actions per request or the maximum number of concurrent requests.
Available methods of a RiverMouth are:
String strategy();
RiverMouth riverContext(RiverContext context);
RiverMouth client(Client client);
Client client();
RiverMouth index(String index);
String index();
RiverMouth type(String type);
String type();
RiverMouth id(String id);
RiverMouth maxBulkActions(int actions);
int maxBulkActions();
RiverMouth maxConcurrentBulkRequests(int max);
int maxConcurrentBulkRequests();
RiverMouth versioning(boolean enable);
boolean versioning();
RiverMouth acknowledge(boolean enable);
boolean acknowledge();
void create(StructuredObject object) throws IOException;
void index(StructuredObject object) throws IOException;
void delete(StructuredObject object) throws IOException;
void flush() throws IOException;
void close();
void createIndexIfNotExists(String settings, String mapping);
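The two throttling limits of the river mouth, maxBulkActions and maxConcurrentBulkRequests, can be illustrated with a small batching helper. This is only a sketch of the general technique (a buffer plus a counting semaphore), not the river's implementation: ThrottledBulk and its sender callback are hypothetical, and actions are represented as plain strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper: batches actions and caps the number of batches
// that may be in flight at the same time.
class ThrottledBulk {
    private final int maxBulkActions;
    private final Semaphore inFlight;
    private final ExecutorService executor = Executors.newCachedThreadPool();
    private final Consumer<List<String>> sender;
    private List<String> buffer = new ArrayList<>();

    ThrottledBulk(int maxBulkActions, int maxConcurrentBulkRequests,
                  Consumer<List<String>> sender) {
        this.maxBulkActions = maxBulkActions;
        this.inFlight = new Semaphore(maxConcurrentBulkRequests);
        this.sender = sender;
    }

    // Buffers one action and sends a request once the batch is full.
    synchronized void add(String action) {
        buffer.add(action);
        if (buffer.size() >= maxBulkActions) {
            flush();
        }
    }

    // Sends the buffered actions as one request; blocks while the maximum
    // number of concurrent requests is already in flight.
    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        List<String> batch = buffer;
        buffer = new ArrayList<>();
        inFlight.acquireUninterruptibly();
        executor.execute(() -> {
            try {
                sender.accept(batch);
            } finally {
                inFlight.release();
            }
        });
    }

    // Flushes the remainder and waits for outstanding requests to finish.
    void close() {
        flush();
        executor.shutdown();
        try {
            executor.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AtomicInteger requests = new AtomicInteger();
        ThrottledBulk bulk = new ThrottledBulk(100, 2, batch -> requests.incrementAndGet());
        for (int i = 0; i < 250; i++) {
            bulk.add("doc-" + i);
        }
        bulk.close();
        System.out.println(requests.get()); // 3 requests: 100 + 100 + 50 actions
    }
}
```

A smaller maxBulkActions yields more, smaller requests, while a smaller maxConcurrentBulkRequests makes the producer wait sooner when the receiving side is slow.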
The RiverFlow is the abstraction of the thread that fetches data from the river source and transports it to the river mouth. A 'move' is a single step in the river life cycle, followed by an optional delay, before it all starts over again. A river flow can be aborted.
Available methods of a RiverFlow are:
String strategy();
RiverFlow riverContext(RiverContext context);
RiverContext riverContext();
RiverFlow startDate(Date creationDate);
Date startDate();
void move();
RiverFlow delay(String reason);
void abort();
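The move, delay, and abort cycle can be sketched as a simple loop. This is an illustration under stated assumptions rather than the river's actual thread code: SketchFlow is hypothetical, the work of a move is passed in as a Runnable, and the delay is a fixed sleep.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a river flow; only move(), delay() and abort()
// mirror names from the method list, with simplified signatures.
class SketchFlow implements Runnable {
    private final AtomicBoolean aborted = new AtomicBoolean(false);
    private final Runnable step;
    private final long delayMillis;

    SketchFlow(Runnable step, long delayMillis) {
        this.step = step;
        this.delayMillis = delayMillis;
    }

    // One cycle: fetch from the source and hand the data to the mouth.
    void move() {
        step.run();
    }

    // Optional pause between two moves; an interrupt aborts the flow.
    SketchFlow delay(String reason) {
        if (delayMillis > 0) {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                abort();
            }
        }
        return this;
    }

    // Requests that the loop stop before the next move.
    void abort() {
        aborted.set(true);
    }

    @Override
    public void run() {
        while (!aborted.get()) {
            move();
            if (!aborted.get()) {
                delay("next run");
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger moves = new AtomicInteger();
        SketchFlow[] holder = new SketchFlow[1];
        // Abort from inside the third move so the loop ends on its own.
        holder[0] = new SketchFlow(() -> {
            if (moves.incrementAndGet() == 3) {
                holder[0].abort();
            }
        }, 10);
        holder[0].run();
        System.out.println(moves.get()); // prints 3
    }
}
```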
For more information, consult the source code or the Javadoc.