The aggregator processor enables an application to aggregate incoming messages into groups and release them to an output destination.
java -jar aggregator-processor-kafka-<version>.jar --aggregator.message-store-type=jdbc
Change kafka to rabbit if you want to run it against RabbitMQ.
If the input payload is a byte[] and the content-type header is JSON, the JsonBytesToMap function tries to deserialize the payload to a Map for better data representation on the output of the aggregator function.
Such a Map representation also makes it easy to access the payload content from the SpEL expressions mentioned below.
Otherwise (including on a deserialization error), the input payload is left as is, and it is up to the target application configuration to convert it into the desired form.
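The fallback behavior described above can be sketched in a few lines. This is a language-neutral illustration in Python, not the processor's actual JsonBytesToMap code; the function name `json_bytes_to_map` and the substring check on the content type are assumptions made for the example.

```python
import json


def json_bytes_to_map(payload, content_type):
    """Return a dict for JSON byte payloads; otherwise return the payload unchanged."""
    # Assumption: a content type counts as JSON if it contains "json".
    is_json = content_type is not None and "json" in content_type.lower()
    if isinstance(payload, (bytes, bytearray)) and is_json:
        try:
            decoded = json.loads(payload)
        except (UnicodeDecodeError, json.JSONDecodeError):
            # Deserialization error: leave the payload as is.
            return payload
        if isinstance(decoded, dict):
            return decoded
    # Not bytes, not JSON, or not a JSON object: leave the payload as is.
    return payload
```

With a map as the payload, an expression can address individual fields instead of treating the payload as an opaque byte array.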
Properties grouped by prefix:

aggregator

- aggregation: SpEL expression for the aggregation strategy. Default is a collection of payloads. (Expression, default: <none>)
- correlation: SpEL expression for the correlation key. Defaults to the correlationId header. (Expression, default: <none>)
- group-timeout: SpEL expression for the timeout after which uncompleted groups expire. (Expression, default: <none>)
- message-store-entity: Persistence message store entity: table prefix in RDBMS, collection name in MongoDB, etc. (String, default: <none>)
- message-store-type: Message store type. (String, default: <none>)
- release: SpEL expression for the release strategy. Default is based on the sequenceSize header. (Expression, default: <none>)
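
How the correlation, release, and aggregation settings interact can be illustrated with a toy in-memory model. This is only a sketch in Python, not the processor's implementation: the `Aggregator` class, its `handle` method, and the dictionary message shape are hypothetical, plain callables stand in for SpEL expressions, and neither group timeouts nor a persistent message store are modeled.

```python
from collections import defaultdict


class Aggregator:
    """Toy in-memory aggregator: correlate, store, release."""

    def __init__(self, correlation=None, release=None, aggregation=None):
        # Defaults mirror the documented ones: the correlationId header,
        # the sequenceSize header, and a collection of payloads.
        self.correlation = correlation or (
            lambda m: m["headers"]["correlationId"])
        self.release = release or (
            lambda group: len(group) >= group[0]["headers"]["sequenceSize"])
        self.aggregation = aggregation or (
            lambda group: [m["payload"] for m in group])
        self.groups = defaultdict(list)

    def handle(self, message):
        """Store the message; return the aggregate if its group is complete."""
        key = self.correlation(message)
        group = self.groups[key]
        group.append(message)
        if self.release(group):
            del self.groups[key]  # a released group is removed from the store
            return self.aggregation(group)
        return None  # group not complete yet, nothing is emitted
```

Passing custom callables for `correlation`, `release`, or `aggregation` plays the role that the corresponding SpEL expression properties play in the real processor.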

spring.data.mongodb

- additional-hosts: Additional server hosts. Cannot be set with URI or if 'host' is not specified. Additional hosts will use the default mongo port of 27017. If you want to use a different port you can use the "host:port" syntax. (List<String>, default: <none>)
- authentication-database: Authentication database name. (String, default: <none>)
- auto-index-creation: Whether to enable auto-index creation. (Boolean, default: <none>)
- database: Database name. Overrides database in URI. (String, default: <none>)
- field-naming-strategy: Fully qualified name of the FieldNamingStrategy to use. (Class<?>, default: <none>)
- host: Mongo server host. Cannot be set with URI. (String, default: <none>)
- password: Login password of the mongo server. Cannot be set with URI. (Character[], default: <none>)
- port: Mongo server port. Cannot be set with URI. (Integer, default: <none>)
- replica-set-name: Required replica set name for the cluster. Cannot be set with URI. (String, default: <none>)
- uri: Mongo database URI. Overrides host, port, username, and password. (String, default: mongodb://localhost/test)
- username: Login user of the mongo server. Cannot be set with URI. (String, default: <none>)
- uuid-representation: Representation to use when converting a UUID to a BSON binary value. (UuidRepresentation, default: java-legacy, possible values: UNSPECIFIED, STANDARD, C_SHARP_LEGACY, JAVA_LEGACY, PYTHON_LEGACY)

spring.data.mongodb.gridfs

- bucket: GridFS bucket name. (String, default: <none>)
- database: GridFS database name. (String, default: <none>)

spring.data.mongodb.ssl

- bundle: SSL bundle name. (String, default: <none>)
- enabled: Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default: <none>)

spring.data.redis

- client-name: Client name to be set on connections with CLIENT SETNAME. (String, default: <none>)
- client-type: Type of client to use. By default, auto-detected according to the classpath. (ClientType, default: <none>, possible values: LETTUCE, JEDIS)
- connect-timeout: Connection timeout. (Duration, default: <none>)
- database: Database index used by the connection factory. (Integer, default: 0)
- host: Redis server host. (String, default: localhost)
- password: Login password of the redis server. (String, default: <none>)
- port: Redis server port. (Integer, default: 6379)
- timeout: Read timeout. (Duration, default: <none>)
- url: Connection URL. Overrides host, port, username, and password. Example: redis://user:password@example.com:6379 (String, default: <none>)
- username: Login username of the redis server. (String, default: <none>)
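
To see which settings a connection URL carries (and therefore overrides), the URL can be split into its components. The snippet below is only an illustration using Python's standard library, not how Spring Boot itself parses the property; `redis_url_parts` is a hypothetical helper name and the URL is a placeholder.

```python
from urllib.parse import urlsplit


def redis_url_parts(url):
    """Split a redis:// URL into the settings it would override."""
    parts = urlsplit(url)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "username": parts.username,
        "password": parts.password,
    }


# Placeholder credentials and host, for illustration only.
print(redis_url_parts("redis://user:password@example.com:6379"))
```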

spring.data.redis.cluster

- max-redirects: Maximum number of redirects to follow when executing commands across the cluster. (Integer, default: <none>)
- nodes: Comma-separated list of "host:port" pairs to bootstrap from. This represents an "initial" list of cluster nodes and is required to have at least one entry. (List<String>, default: <none>)

spring.data.redis.jedis.pool

- enabled: Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default: <none>)
- max-active: Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
- max-idle: Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
- max-wait: Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
- min-idle: Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)
- time-between-eviction-runs: Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.data.redis.lettuce.pool

- enabled: Whether to enable the pool. Enabled automatically if "commons-pool2" is available. With Jedis, pooling is implicitly enabled in sentinel mode and this setting only applies to single node setup. (Boolean, default: <none>)
- max-active: Maximum number of connections that can be allocated by the pool at a given time. Use a negative value for no limit. (Integer, default: 8)
- max-idle: Maximum number of "idle" connections in the pool. Use a negative value to indicate an unlimited number of idle connections. (Integer, default: 8)
- max-wait: Maximum amount of time a connection allocation should block before throwing an exception when the pool is exhausted. Use a negative value to block indefinitely. (Duration, default: -1ms)
- min-idle: Target for the minimum number of idle connections to maintain in the pool. This setting only has an effect if both it and time between eviction runs are positive. (Integer, default: 0)
- time-between-eviction-runs: Time between runs of the idle object evictor thread. When positive, the idle object evictor thread starts, otherwise no idle object eviction is performed. (Duration, default: <none>)

spring.data.redis.sentinel

- master: Name of the Redis server. (String, default: <none>)
- nodes: Comma-separated list of "host:port" pairs. (List<String>, default: <none>)
- password: Password for authenticating with sentinel(s). (String, default: <none>)
- username: Login username for authenticating with sentinel(s). (String, default: <none>)

spring.data.redis.ssl

- bundle: SSL bundle name. (String, default: <none>)
- enabled: Whether to enable SSL support. Enabled automatically if "bundle" is provided unless specified otherwise. (Boolean, default: <none>)

spring.datasource

- driver-class-name: Fully qualified name of the JDBC driver. Auto-detected based on the URL by default. (String, default: <none>)
- password: Login password of the database. (String, default: <none>)
- url: JDBC URL of the database. (String, default: <none>)
- username: Login username of the database. (String, default: <none>)