diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java index b9238e5675..d37f117b2d 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexService.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.index; +import org.apache.usergrid.utils.StringUtils; + /** * An interface for re-indexing all entities in an application */ @@ -47,6 +49,13 @@ public interface ReIndexService { */ ReIndexStatus getStatus( final String jobId ); + /** + * Get the status of a collection job + * @param collectionName The collectionName for the rebuild index + * @return + */ + ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ); + /** * The response when requesting a re-index operation @@ -56,14 +65,27 @@ public class ReIndexStatus { final Status status; final long numberProcessed; final long lastUpdated; + final String collectionName; public ReIndexStatus( final String jobId, final Status status, final long numberProcessed, - final long lastUpdated ) { - this.jobId = jobId; + final long lastUpdated, final String collectionName ) { + + if(StringUtils.isNotEmpty(jobId)){ + this.jobId = jobId; + }else { + this.jobId = ""; + } + this.status = status; this.numberProcessed = numberProcessed; this.lastUpdated = lastUpdated; + + if(StringUtils.isNotEmpty(collectionName)){ + this.collectionName = collectionName; + }else { + this.collectionName = ""; + } } @@ -74,6 +96,13 @@ public String getJobId() { return jobId; } + /** + * Get the jobId used to resume this operation + */ + public String getCollectionName() { + return collectionName; + } + /** * Get the last updated time, as a long diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java index 05602fc5c8..d4fb249379 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/index/ReIndexServiceImpl.java @@ -75,6 +75,7 @@ public class ReIndexServiceImpl implements ReIndexService { private static final String MAP_COUNT_KEY = "count"; private static final String MAP_STATUS_KEY = "status"; private static final String MAP_UPDATED_KEY = "lastUpdated"; + private static final String MAP_SEPARATOR = "|||"; private final AllApplicationsObservable allApplicationsObservable; @@ -140,7 +141,9 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui // create an observable that loads a batch to be indexed - if(reIndexRequestBuilder.getCollectionName().isPresent()) { + final boolean isForCollection = reIndexRequestBuilder.getCollectionName().isPresent(); + + if(isForCollection) { String collectionName = InflectionUtils.pluralize( CpNamingUtils.getNameFromEdgeType(reIndexRequestBuilder.getCollectionName().get() )); @@ -175,12 +178,36 @@ public ReIndexStatus rebuildIndex( final ReIndexRequestBuilder reIndexRequestBui if( edgeScopes.size() > 0 ) { writeCursorState(jobId, edgeScopes.get(edgeScopes.size() - 1)); } - writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); }) - .doOnCompleted(() -> writeStateMeta( jobId, Status.COMPLETE, count.get(), System.currentTimeMillis() )) + if( isForCollection ){ + writeStateMetaForCollection( + appId.get().getApplication().getUuid().toString(), + reIndexRequestBuilder.getCollectionName().get(), + Status.INPROGRESS, count.get(), + System.currentTimeMillis() ); + }else{ + writeStateMeta( jobId, Status.INPROGRESS, count.get(), System.currentTimeMillis() ); + } + }) + .doOnCompleted(() ->{ + if( isForCollection ){ + writeStateMetaForCollection( + appId.get().getApplication().getUuid().toString(), + reIndexRequestBuilder.getCollectionName().get(), + Status.COMPLETE, count.get(), + System.currentTimeMillis() ); + }else { + writeStateMeta(jobId, Status.COMPLETE, count.get(), System.currentTimeMillis()); + } + }) .subscribeOn( Schedulers.io() ).subscribe(); + if(isForCollection){ + return new ReIndexStatus( "", Status.STARTED, 0, 0, reIndexRequestBuilder.getCollectionName().get() ); + + } + - return new ReIndexStatus( jobId, Status.STARTED, 0, 0 ); + return new ReIndexStatus( jobId, Status.STARTED, 0, 0, "" ); } @@ -196,38 +223,15 @@ public ReIndexStatus getStatus( final String jobId ) { return getIndexResponse( jobId ); } - - /** - * Simple collector that counts state, then flushed every time a buffer is provided. Writes final state when complete - */ - private class FlushingCollector { - - private final String jobId; - private long count; - - - private FlushingCollector( final String jobId ) { - this.jobId = jobId; - } - - - public void flushBuffer( final List buffer ) { - count += buffer.size(); - - //write our cursor state - if ( buffer.size() > 0 ) { - writeCursorState( jobId, buffer.get( buffer.size() - 1 ) ); - } - - writeStateMeta( jobId, Status.INPROGRESS, count, System.currentTimeMillis() ); - } - - public void complete(){ - writeStateMeta( jobId, Status.COMPLETE, count, System.currentTimeMillis() ); - } + @Override + public ReIndexStatus getStatusForCollection( final String appIdString, final String collectionName ) { + Preconditions.checkNotNull( collectionName, "appIdString must not be null" ); + Preconditions.checkNotNull( collectionName, "collectionName must not be null" ); + return getIndexResponseForCollection( appIdString, collectionName ); } + /** * Get the resume edge scope * @@ -346,7 +350,7 @@ private ReIndexStatus getIndexResponse( final String jobId ) { final String stringStatus = mapManager.getString( jobId+MAP_STATUS_KEY ); if(stringStatus == null){ - return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0 ); + return new ReIndexStatus( jobId, Status.UNKNOWN, 0, 0, "" ); } final Status status = Status.valueOf( stringStatus ); @@ -354,7 +358,39 @@ private ReIndexStatus getIndexResponse( final String jobId ) { final long processedCount = mapManager.getLong( jobId + MAP_COUNT_KEY ); final long lastUpdated = mapManager.getLong( jobId + MAP_UPDATED_KEY ); - return new ReIndexStatus( jobId, status, processedCount, lastUpdated ); + return new ReIndexStatus( jobId, status, processedCount, lastUpdated, "" ); + } + + + private void writeStateMetaForCollection(final String appIdString, final String collectionName, + final Status status, final long processedCount, final long lastUpdated ) { + + if(logger.isDebugEnabled()) { + logger.debug( "Flushing state for collection {}, status {}, processedCount {}, lastUpdated {}", + collectionName, status, processedCount, lastUpdated); + } + + mapManager.putString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY, status.name() ); + mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY, processedCount ); + mapManager.putLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY, lastUpdated ); + } + + + private ReIndexStatus getIndexResponseForCollection( final String appIdString, final String collectionName ) { + + final String stringStatus = + mapManager.getString( appIdString + MAP_SEPARATOR + collectionName + MAP_STATUS_KEY ); + + if(stringStatus == null){ + return new ReIndexStatus( "", Status.UNKNOWN, 0, 0, collectionName ); + } + + final Status status = Status.valueOf( stringStatus ); + + final long processedCount = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_COUNT_KEY ); + final long lastUpdated = mapManager.getLong( appIdString + MAP_SEPARATOR + collectionName + MAP_UPDATED_KEY ); + + return new ReIndexStatus( "", status, processedCount, lastUpdated, collectionName ); } } diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java index 9c0980673a..9da0e3d1ab 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/applications/CollectionResource.java @@ -251,26 +251,41 @@ public ApiResponse clearCollectionJobGet( } - // TODO: this can't be controlled and until it can be controlled we shouldn' allow muggles to do this. - // So system access only. - // TODO: use scheduler here to get around people sending a reindex call 30 times. + @POST @Path("{itemName}/_reindex") @Produces({ MediaType.APPLICATION_JSON,"application/javascript"}) - @RequireSystemAccess + @RequireApplicationAccess @JSONP public ApiResponse executePostForReindexing( - @Context UriInfo ui, String body, + @Context UriInfo ui, final Map payload, @PathParam("itemName") PathSegment itemName, @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception { addItemToServiceContext( ui, itemName ); IndexResource indexResource = new IndexResource(injector); - return indexResource.rebuildIndexesPost( + return indexResource.rebuildIndexCollectionPost(payload, services.getApplicationId().toString(),itemName.getPath(),false,callback ); } + @GET + @Path("{itemName}/_reindex") + @Produces({ MediaType.APPLICATION_JSON,"application/javascript"}) + @RequireApplicationAccess + @JSONP + public ApiResponse executeGetForReindexStatus( + @Context UriInfo ui, final Map payload, + @PathParam("itemName") PathSegment itemName, + @QueryParam("callback") @DefaultValue("callback") String callback ) throws Exception { + + addItemToServiceContext( ui, itemName ); + + IndexResource indexResource = new IndexResource(injector); + return indexResource.rebuildIndexCollectionGet(services.getApplicationId().toString(), itemName.getPath(), + callback ); + } + private CollectionDeleteService getCollectionDeleteService() { return injector.getInstance( CollectionDeleteService.class ); @@ -310,18 +325,17 @@ private ApiResponse executeResumeAndCreateResponse( final Map pa private ApiResponse executeAndCreateResponse(final CollectionDeleteRequestBuilder request, final String callback ) { - final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection( request ); + final CollectionDeleteService.CollectionDeleteStatus status = getCollectionDeleteService().deleteCollection(request); final ApiResponse response = createApiResponse(); - response.setAction( "clear collection" ); - response.setProperty( "jobId", status.getJobId() ); - response.setProperty( "status", status.getStatus() ); - response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() ); - response.setProperty( "numberQueued", status.getNumberProcessed() ); + response.setAction("clear collection"); + response.setProperty("jobId", status.getJobId()); + response.setProperty("status", status.getStatus()); + response.setProperty("lastUpdatedEpoch", status.getLastUpdated()); + response.setProperty("numberQueued", status.getNumberProcessed()); response.setSuccess(); return response; } - } diff --git a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java index be60177fde..ec867501ac 100644 --- a/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java +++ b/stack/rest/src/main/java/org/apache/usergrid/rest/system/IndexResource.java @@ -28,13 +28,16 @@ import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilder; import org.apache.usergrid.corepersistence.index.ReIndexRequestBuilderImpl; import org.apache.usergrid.corepersistence.index.ReIndexService; +import org.apache.usergrid.exception.ConflictException; import org.apache.usergrid.persistence.EntityManager; import org.apache.usergrid.persistence.index.utils.ConversionUtils; import org.apache.usergrid.persistence.index.utils.UUIDUtils; import org.apache.usergrid.rest.AbstractContextResource; import org.apache.usergrid.rest.ApiResponse; import org.apache.usergrid.rest.RootResource; +import org.apache.usergrid.rest.security.annotations.RequireOrganizationAccess; import org.apache.usergrid.rest.security.annotations.RequireSystemAccess; +import org.apache.usergrid.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.context.annotation.Scope; @@ -182,26 +185,78 @@ public ApiResponse rebuildIndexesPut( final Map payload, return executeResumeAndCreateResponse( payload, request, callback ); } + @RequireOrganizationAccess + @GET + @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" ) + @JSONP + @Produces({ MediaType.APPLICATION_JSON, "application/javascript" }) + public ApiResponse rebuildIndexCollectionGet( @PathParam( "applicationId" ) final String applicationIdStr, + @PathParam( "collectionName" ) final String collectionName, + @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) - @RequireSystemAccess + + throws Exception { + if (logger.isTraceEnabled()) { + logger.trace("Getting re-index status for app: {}, collection: {}", applicationIdStr, collectionName); + } + + + ReIndexService.ReIndexStatus status = getReIndexService().getStatusForCollection(applicationIdStr, collectionName); + + final ApiResponse response = createApiResponse(); + + response.setAction( "get rebuild index status" ); + response.setProperty( "collection", status.getCollectionName() ); + response.setProperty( "status", status.getStatus() ); + response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() ); + response.setProperty( "numberQueued", status.getNumberProcessed() ); + response.setSuccess(); + + return response; + } + + @RequireOrganizationAccess @POST @Path( "rebuild/" + RootResource.APPLICATION_ID_PATH + "/{collectionName}" ) @JSONP @Produces({MediaType.APPLICATION_JSON, "application/javascript"}) - public ApiResponse rebuildIndexesPost( @PathParam( "applicationId" ) final String applicationIdStr, - @PathParam( "collectionName" ) final String collectionName, - @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse, - @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) + public ApiResponse rebuildIndexCollectionPost(final Map payload, + @PathParam( "applicationId" ) final String applicationIdStr, + @PathParam( "collectionName" ) final String collectionName, + @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse, + @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) throws Exception { + ReIndexService.ReIndexStatus existingStatus = + getReIndexService().getStatusForCollection(applicationIdStr, collectionName); - logger.info( "Rebuilding collection {} in application {}", collectionName, applicationIdStr ); + if(existingStatus.getStatus().equals(ReIndexService.Status.INPROGRESS)){ + throw new ConflictException("Re-index for collection currently in progress"); + } + + logger.info( "Re-indexing collection {} in application {}", collectionName, applicationIdStr ); final UUID appId = UUIDUtils.tryExtractUUID( applicationIdStr ); + final ReIndexRequestBuilder request = createRequest().withApplicationId( appId ).withCollection( collectionName ); + Map newPayload = payload; + if(newPayload == null || !payload.containsKey( UPDATED_FIELD )){ + newPayload = new HashMap<>(1); + newPayload.put(UPDATED_FIELD,0); + } + + Preconditions.checkArgument(newPayload.get(UPDATED_FIELD) instanceof Number, + "Property \"updated\" in the payload must be a number in unix timestamp millis format" ); + + //add our updated timestamp to the request + if ( newPayload.containsKey( UPDATED_FIELD ) ) { + final long timestamp = ConversionUtils.getLong(newPayload.get(UPDATED_FIELD)); + request.withStartTimestamp( timestamp ); + } + return executeAndCreateResponse( request, callback ); } @@ -214,7 +269,6 @@ public ApiResponse rebuildIndexesPost( @PathParam( "applicationId" ) final Strin public ApiResponse rebuildIndexesPut( final Map payload, @PathParam( "applicationId" ) final String applicationIdStr, @PathParam( "collectionName" ) final String collectionName, - @QueryParam( "reverse" ) @DefaultValue( "false" ) final Boolean reverse, @QueryParam( "callback" ) @DefaultValue( "callback" ) String callback ) throws Exception { @@ -350,7 +404,15 @@ private ApiResponse executeAndCreateResponse( final ReIndexRequestBuilder reques final ApiResponse response = createApiResponse(); response.setAction( "rebuild indexes" ); - response.setProperty( "jobId", status.getJobId() ); + + if(StringUtils.isNotEmpty(status.getJobId())){ + response.setProperty( "jobId", status.getJobId() ); + } + + if(StringUtils.isNotEmpty(status.getCollectionName())){ + response.setProperty( "collection", status.getCollectionName() ); + } + response.setProperty( "status", status.getStatus() ); response.setProperty( "lastUpdatedEpoch", status.getLastUpdated() ); response.setProperty( "numberQueued", status.getNumberProcessed() );