Skip to content

Commit

Permalink
🐛 Ensure predefined aggregation variables (e.g., @user, @mongoPermiss…
Browse files Browse the repository at this point in the history
…ion) are available in change stream aggregations
  • Loading branch information
ujibang committed Nov 2, 2024
1 parent 64dca1c commit c8c7a9b
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,17 @@

import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonNull;
import org.bson.BsonValue;
import org.restheart.exchange.InvalidMetadataException;
import org.restheart.exchange.MongoRequest;
import org.restheart.exchange.QueryVariableNotBoundException;
import static org.restheart.mongodb.utils.VarsInterpolator.VAR_OPERATOR;
import org.restheart.security.AclVarsInterpolator;
import org.restheart.security.MongoPermissions;
import org.restheart.security.MongoRealmAccount;
import org.restheart.security.WithProperties;
import org.restheart.utils.BsonUtils;
import org.restheart.utils.LambdaUtils;

Expand Down Expand Up @@ -170,4 +177,68 @@ private static BsonDocument _stage(STAGE_OPERATOR stageOperator, BsonDocument st
}
}

/**
* adds the default variables to the avars document
*
* Supports accounts handled by MongoRealAuthenticator,
* FileRealmAuthenticator and JwtAuthenticationMechanism
*
* @param request
* @param avars
*/
public static void injectAvars(MongoRequest request, BsonDocument avars) {
// add @page, @pagesize, @limit and @skip to avars to allow handling
// paging in the aggragation via default page and pagesize qparams
avars.put("@page", new BsonInt32(request.getPage()));
avars.put("@pagesize", new BsonInt32(request.getPagesize()));
avars.put("@limit", new BsonInt32(request.getPagesize()));
avars.put("@skip", new BsonInt32(request.getPagesize() * (request.getPage() - 1)));

// add @mongoPermissions to avars
var mongoPermissions = MongoPermissions.of(request);
if (mongoPermissions != null) {
avars.put("@mongoPermissions" ,mongoPermissions.asBson());

avars.put("@mongoPermissions.projectResponse", mongoPermissions.getProjectResponse() == null
? BsonNull.VALUE
: mongoPermissions.getProjectResponse());

avars.put("@mongoPermissions.mergeRequest", mongoPermissions.getMergeRequest() == null
? BsonNull.VALUE
: AclVarsInterpolator.interpolateBson(request, mongoPermissions.getMergeRequest()));

avars.put("@mongoPermissions.readFilter", mongoPermissions.getReadFilter() == null
? BsonNull.VALUE
: AclVarsInterpolator.interpolateBson(request, mongoPermissions.getReadFilter()));

avars.put("@mongoPermissions.writeFilter", mongoPermissions.getWriteFilter() == null
? BsonNull.VALUE
: AclVarsInterpolator.interpolateBson(request, mongoPermissions.getWriteFilter()));
} else {
avars.put("@mongoPermissions", new MongoPermissions().asBson());
avars.put("@mongoPermissions.projectResponse", BsonNull.VALUE);
avars.put("@mongoPermissions.mergeRequest", BsonNull.VALUE);
avars.put("@mongoPermissions.readFilter", BsonNull.VALUE);
avars.put("@mongoPermissions.writeFilter", BsonNull.VALUE);
}

// add @user to avars
var account = request.getAuthenticatedAccount();

if (account == null) {
avars.put("@user", BsonNull.VALUE);
} else if (account instanceof MongoRealmAccount maccount) {
var ba = maccount.properties();
avars.put("@user", ba);
ba.keySet().forEach(k -> avars.put("@user.".concat(k), ba.get(k)));
} else if (account instanceof WithProperties<?> accountWithProperties) {
var ba = BsonUtils.toBsonDocument(accountWithProperties.propertiesAsMap());
avars.put("@user", ba);
ba.keySet().forEach(k -> avars.put("@user.".concat(k), ba.get(k)));
} else {
avars.put("@user", BsonNull.VALUE);
}
}


}
3 changes: 1 addition & 2 deletions core/src/main/java/org/restheart/handlers/RequestLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ protected void dumpExchange(HttpServerExchange exchange, Integer logLevel) {
addExchangeCompleteListener(exchange, logLevel, sb, start);
}

private void addExchangeCompleteListener(HttpServerExchange exchange, Integer logLevel, final StringBuilder sb,
final long start) {
private void addExchangeCompleteListener(HttpServerExchange exchange, Integer logLevel, final StringBuilder sb, final long start) {
exchange.addExchangeCompleteListener(
(final HttpServerExchange exchange1, final ExchangeCompletionListener.NextListener nextListener) -> {
if (logLevel < 1) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,12 @@
*/
package org.restheart.mongodb.handlers.aggregation;

import com.mongodb.MongoCommandException;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MapReduceIterable;
import io.undertow.server.HttpServerExchange;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;

import org.bson.BsonArray;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonNull;
import org.bson.BsonString;
import org.restheart.exchange.IllegalQueryParamenterException;
import org.restheart.exchange.InvalidMetadataException;
Expand All @@ -39,18 +35,19 @@
import org.restheart.handlers.PipelinedHandler;
import org.restheart.mongodb.MongoServiceConfiguration;
import org.restheart.mongodb.db.Databases;
import org.restheart.security.AclVarsInterpolator;
import org.restheart.security.MongoPermissions;
import org.restheart.security.MongoRealmAccount;
import org.restheart.security.WithProperties;
import org.restheart.utils.BsonUtils;
import org.restheart.utils.HttpStatus;
import org.restheart.mongodb.utils.StagesInterpolator;
import static org.restheart.mongodb.utils.StagesInterpolator.STAGE_OPERATOR;
import static org.restheart.mongodb.utils.VarsInterpolator.VAR_OPERATOR;
import org.restheart.mongodb.utils.StagesInterpolator.STAGE_OPERATOR;
import org.restheart.mongodb.utils.VarsInterpolator.VAR_OPERATOR;
import org.restheart.utils.HttpStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.MongoCommandException;
import com.mongodb.client.AggregateIterable;
import com.mongodb.client.MapReduceIterable;

import io.undertow.server.HttpServerExchange;

/**
*
* @author Andrea Di Cesare {@literal <[email protected]>}
Expand Down Expand Up @@ -117,7 +114,7 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
: request.getAggregationVars();

// add the default variables to the avars document
injectAvars(request, avars);
StagesInterpolator.injectAvars(request, avars);

switch (query.getType()) {
case MAP_REDUCE -> {
Expand Down Expand Up @@ -237,69 +234,6 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
}
}

/**
* adds the default variables to the avars document
*
* Supports accounts handled by MongoRealAuthenticator,
* FileRealmAuthenticator and JwtAuthenticationMechanism
*
* @param request
* @param avars
*/
private void injectAvars(MongoRequest request, BsonDocument avars) {
// add @page, @pagesize, @limit and @skip to avars to allow handling
// paging in the aggragation via default page and pagesize qparams
avars.put("@page", new BsonInt32(request.getPage()));
avars.put("@pagesize", new BsonInt32(request.getPagesize()));
avars.put("@limit", new BsonInt32(request.getPagesize()));
avars.put("@skip", new BsonInt32(request.getPagesize() * (request.getPage() - 1)));

// add @mongoPermissions to avars
var mongoPermissions = MongoPermissions.of(request);
if (mongoPermissions != null) {
avars.put("@mongoPermissions" ,mongoPermissions.asBson());

avars.put("@mongoPermissions.projectResponse", mongoPermissions.getProjectResponse() == null
? BsonNull.VALUE
: mongoPermissions.getProjectResponse());

avars.put("@mongoPermissions.mergeRequest", mongoPermissions.getMergeRequest() == null
? BsonNull.VALUE
: AclVarsInterpolator.interpolateBson(request, mongoPermissions.getMergeRequest()));

avars.put("@mongoPermissions.readFilter", mongoPermissions.getReadFilter() == null
? BsonNull.VALUE
: AclVarsInterpolator.interpolateBson(request, mongoPermissions.getReadFilter()));

avars.put("@mongoPermissions.writeFilter", mongoPermissions.getWriteFilter() == null
? BsonNull.VALUE
: AclVarsInterpolator.interpolateBson(request, mongoPermissions.getWriteFilter()));
} else {
avars.put("@mongoPermissions", new MongoPermissions().asBson());
avars.put("@mongoPermissions.projectResponse", BsonNull.VALUE);
avars.put("@mongoPermissions.mergeRequest", BsonNull.VALUE);
avars.put("@mongoPermissions.readFilter", BsonNull.VALUE);
avars.put("@mongoPermissions.writeFilter", BsonNull.VALUE);
}

// add @user to avars
var account = request.getAuthenticatedAccount();

if (account == null) {
avars.put("@user", BsonNull.VALUE);
} else if (account instanceof MongoRealmAccount maccount) {
var ba = maccount.properties();
avars.put("@user", ba);
ba.keySet().forEach(k -> avars.put("@user.".concat(k), ba.get(k)));
} else if (account instanceof WithProperties<?> accountWithProperties) {
var ba = BsonUtils.toBsonDocument(accountWithProperties.propertiesAsMap());
avars.put("@user", ba);
ba.keySet().forEach(k -> avars.put("@user.".concat(k), ba.get(k)));
} else {
avars.put("@user", BsonNull.VALUE);
}
}

public static String mongoCommandExceptionError(MongoCommandException mce) {
var mongoErrorResponse = mce.getResponse();
var errorCode = mongoErrorResponse.getNumber("code", new BsonInt32(-1)).intValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,17 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
try {
if (isWebSocketHandshakeRequest(exchange)) {
exchange.putAttachment(JSON_MODE_ATTACHMENT_KEY, request.getJsonMode());
exchange.putAttachment(AVARS_ATTACHMENT_KEY, request.getAggregationVars());

var _avars = request.getAggregationVars();

if (_avars == null) {
_avars = new BsonDocument();
}

// add the default variables to the avars document
StagesInterpolator.injectAvars(request, _avars);

exchange.putAttachment(AVARS_ATTACHMENT_KEY, _avars);

initChangeStreamWorker(exchange);

Expand Down Expand Up @@ -156,7 +166,9 @@ private List<BsonDocument> getResolvedStagesAsList(MongoRequest request) throws

var pipeline = _query.get();

var resolvedStages = StagesInterpolator.interpolate(VAR_OPERATOR.$var, STAGE_OPERATOR.$ifvar, pipeline.getStages(), request.getAggregationVars());
var avars = request.getExchange().getAttachment(GetChangeStreamHandler.AVARS_ATTACHMENT_KEY);

var resolvedStages = StagesInterpolator.interpolate(VAR_OPERATOR.$var, STAGE_OPERATOR.$ifvar, pipeline.getStages(), avars);
return resolvedStages;
}

Expand Down

0 comments on commit c8c7a9b

Please sign in to comment.