Skip to content

Commit

Permalink
✨ Enhance JS service deploy/undeploy mechanism: Trigger on any file m…
Browse files Browse the repository at this point in the history
…odification for more efficient updates

#529
  • Loading branch information
ujibang committed Oct 18, 2024
1 parent f74cc49 commit 0727fa8
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 64 deletions.
147 changes: 147 additions & 0 deletions commons/src/main/java/org/restheart/utils/DirectoryWatcher.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*-
* ========================LICENSE_START=================================
* restheart-polyglot
* %%
* Copyright (C) 2020 - 2024 SoftInstigate
* %%
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
* =========================LICENSE_END==================================
*/
package org.restheart.utils;

import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
import java.nio.file.WatchEvent;
import java.nio.file.WatchEvent.Kind;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DirectoryWatcher implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(DirectoryWatcher.class);

private final WatchService watchService;
private final Map<WatchKey, Path> keys;
private final BiConsumer<Path, Kind<Path>> onEvent;

public DirectoryWatcher(Path rootDir, BiConsumer<Path, Kind<Path>> onEvent) throws IOException {
this.watchService = FileSystems.getDefault().newWatchService();
this.keys = new HashMap<>(); // Mapping WatchKeys to the corresponding directory
this.onEvent = onEvent;
registerDirectoryAndSubdirectories(rootDir);
}

private void registerDirectoryAndSubdirectories(Path dir) throws IOException {
// Skip directories named "node_modules"
if (dir.getFileName().toString().equals("node_modules")) {
LOGGER.debug("Skipping directory: {}", dir);
return;
}

// Register the directory itself
WatchKey key = dir.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
keys.put(key, dir);

// Walk through and register all subdirectories except "node_modules"
Files.walk(dir).filter(Files::isDirectory).forEach(d -> {
try {
if (!d.getFileName().toString().equals("node_modules")) {
WatchKey subKey = d.register(watchService, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY);
keys.put(subKey, d);
}
} catch (IOException e) {
LambdaUtils.throwsSneakyException(e);
}
});
}

@Override
public void run() {
while (true) {
WatchKey key;

try {
key = watchService.take(); // This will block until an event occurs
} catch (InterruptedException e) {
return;
}

var dir = keys.get(key);

if (dir == null) {
LOGGER.debug("WatchKey not recognized!");
continue;
}

for (WatchEvent<?> event : key.pollEvents()) {
var kind = event.kind();

// Context for the event is the file or directory that was affected
var ev = (WatchEvent<Path>) event;
var name = ev.context();
var child = dir.resolve(name);

if (kind.equals(ENTRY_CREATE) && Files.isDirectory(child)) {
LOGGER.debug("Directory created: {}", child);
try {
if (!child.getFileName().toString().equals("node_modules")) {
registerDirectoryAndSubdirectories(child);
}
} catch (IOException e) {
LambdaUtils.throwsSneakyException(e);
}
this.onEvent.accept(child, ENTRY_CREATE);
} else if (kind.equals(ENTRY_MODIFY) && Files.isRegularFile(child)) {
LOGGER.debug("File modified: {}", child);
this.onEvent.accept(child, ENTRY_MODIFY);
} else if (kind.equals(ENTRY_DELETE)) {
LOGGER.debug("File or directory deleted: {}", child);
this.onEvent.accept(child, ENTRY_DELETE);

// If a directory is deleted, we should stop watching it
if (Files.isDirectory(child)) {
var childKey = keys.entrySet().stream()
.filter(entry -> entry.getValue().equals(child))
.map(Map.Entry::getKey)
.findFirst().orElse(null);

if (childKey != null) {
childKey.cancel();
keys.remove(childKey);
}
}
}
}

// Reset the key -- this step is critical to receive further watch events
boolean valid = key.reset();
if (!valid) {
keys.remove(key); // Remove the key if it is no longer valid
if (keys.isEmpty()) {
break; // Exit if there are no more directories to watch
}
}
}
}
}
13 changes: 10 additions & 3 deletions examples/credit-card-hider/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions examples/js-plugin/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 3 additions & 8 deletions examples/js-plugin/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,19 @@
"name": "restheart-example-js-plugins",
"version": "1.0.0",
"description": "example JavaScript Plugins for RESTHeart",
"main": "index.mjs",
"dependencies": {
"moment": "^2.29.4",
"one-liner-joke": "^1.2.0"
},
"devDependencies": {},
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "",
"author": "Andrea Di Cesare [email protected]",
"license": "Apache-2.0",
"rh:services": [
"hello.mjs",
"sub/hello.mjs",
"require-module-service.mjs",
"mclient-service.mjs",
"http-client.mjs",
"test.mjs"
"test.mjs",
"require-module-service.mjs"
],
"rh:interceptors": [
"hello-interceptor.mjs",
Expand Down
14 changes: 14 additions & 0 deletions examples/node-plugin/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public void release(Context ctx) {

public static Context newContext(Engine engine, String name, Configuration conf, Logger LOGGER, Optional<MongoClient> mclient, String modulesReplacements, Map<String, String> OPTS) {
if (modulesReplacements!= null) {
LOGGER.debug("modules-replacements: {} ", modulesReplacements);
LOGGER.trace("modules-replacements: {} ", modulesReplacements);
OPTS.put("js.commonjs-core-modules-replacements", modulesReplacements);
} else {
OPTS.remove("js.commonjs-core-modules-replacements");
Expand Down
82 changes: 35 additions & 47 deletions polyglot/src/main/java/org/restheart/polyglot/PolyglotDeployer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -68,6 +67,7 @@
import org.restheart.polyglot.services.JSService;
import org.restheart.polyglot.services.JSStringService;
import org.restheart.polyglot.services.NodeService;
import org.restheart.utils.DirectoryWatcher;
import org.restheart.utils.ThreadsUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -150,47 +150,34 @@ private void deployAll(Path pluginsDirectory) {
}
}

private void watch(Path pluginsDirectory) {
try {
var watchService = FileSystems.getDefault().newWatchService();
private Path pluginPathFromEvent(Path pluginsDirectory, Path path) {
// Get the relative path between pluginsDirectory and subdirectory
var relativePath = pluginsDirectory.relativize(path);

pluginsDirectory.register(watchService, StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY, StandardWatchEventKinds.ENTRY_DELETE);
// Return the first element from the relative path (i.e., the pluginDirectory)
return pluginsDirectory.resolve(relativePath.getName(0));
}

ThreadsUtils.virtualThreadsExecutor().execute(() -> { while(true) {
private void watch(Path pluginsDirectory) {
try {
final var watcher = new DirectoryWatcher(pluginsDirectory, (path, kind) -> {
try {
var key = watchService.take();
while (key != null) {
for (var event : key.pollEvents()) {
var eventContext = event.context();
var pluginPath = pluginsDirectory.resolve(eventContext.toString());

LOGGER.trace("fs event {} {}", event.kind(), eventContext.toString());

if (event.kind() == StandardWatchEventKinds.ENTRY_CREATE) {
try {
deploy(findServices(pluginPath), findNodeServices(pluginPath), findInterceptors(pluginPath));
} catch (Throwable t) {
LOGGER.error("Error deploying {}", pluginPath.toAbsolutePath(), t);
}
} else if (event.kind() == StandardWatchEventKinds.ENTRY_MODIFY) {
try {
undeploy(pluginPath);
deploy(findServices(pluginPath), findNodeServices(pluginPath), findInterceptors(pluginPath));
} catch (Throwable t) {
LOGGER.warn("Error updating {}", pluginPath.toAbsolutePath(), t);
}
} else if (event.kind() == StandardWatchEventKinds.ENTRY_DELETE) {
undeploy(pluginPath);
}
var pluginPath = pluginPathFromEvent(pluginsDirectory, path);
switch (kind.name()) {
case "ENTRY_CREATE" -> deploy(findServices(pluginPath), findNodeServices(pluginPath), findInterceptors(pluginPath));
case "ENTRY_DELETE" -> undeploy(pluginPath);
case "ENTRY_MODIFY" -> {
undeploy(pluginPath);
deploy(findServices(pluginPath), findNodeServices(pluginPath), findInterceptors(pluginPath));
}

key.reset();
default -> {}
}
} catch (InterruptedException ex) {
LOGGER.error("Error watching {}", pluginsDirectory.toAbsolutePath(), ex);
Thread.currentThread().interrupt();
} catch (IOException | InterruptedException ex) {
LOGGER.error("Error handling fs event {} for file {}", kind, path, ex);
}
}});
});

ThreadsUtils.virtualThreadsExecutor().execute(watcher);
} catch (IOException ex) {
LOGGER.error("Error watching: {}", pluginsDirectory.toAbsolutePath(), ex);
}
Expand Down Expand Up @@ -383,10 +370,13 @@ var record = new PluginRecord<Service<? extends ServiceRequest<?>, ? extends Ser

DEPLOYEES.put(pluginPath.toAbsolutePath(), srv);

LOGGER.info(ansi().fg(GREEN).a("URI {} bound to service {}, description: {}, secured: {}, uri match {}").reset().toString(),
srv.uri(), srv.name(), srv.getDescription(), srv.secured(), srv.matchPolicy());
LOGGER.info(ansi().fg(GREEN).a("Service '{}' deployed at URI '{}' with description: '{}'. Secured: {}. Uri match policy: {}").reset().toString(), srv.name(), srv.uri(), srv.getDescription(), srv.secured(), srv.matchPolicy());
} catch(IllegalArgumentException | IllegalStateException e) {
LOGGER.error("Error deploying plugin {}", pluginPath, e);
if (e.getMessage().contains("require is not defined")) {
LOGGER.error("Error deploying plugin {}. Resolution: Try running 'npm install' for required dependencies.", pluginPath);
} else {
LOGGER.error("Error deploying plugin {}", pluginPath, e);
}
}
}

Expand All @@ -407,15 +397,13 @@ private void deployNodeService(Path pluginPath) throws IOException {

var srv = srvf.get();

var record = new PluginRecord<Service<? extends ServiceRequest<?>, ? extends ServiceResponse<?>>>(srv.name(), "description", srv.secured(), true,
srv.getClass().getName(), srv, new HashMap<>());
var record = new PluginRecord<Service<? extends ServiceRequest<?>, ? extends ServiceResponse<?>>>(srv.name(), "description", srv.secured(), true, srv.getClass().getName(), srv, new HashMap<>());

registry.plugService(record, srv.uri(), srv.matchPolicy(), srv.secured());

DEPLOYEES.put(pluginPath.toAbsolutePath(), srv);

LOGGER.info(ansi().fg(GREEN).a("URI {} bound to service {}, description: {}, secured: {}, uri match {}").reset().toString(),
srv.uri(), srv.name(), srv.getDescription(), srv.secured(), srv.matchPolicy());
LOGGER.info(ansi().fg(GREEN).a("Service '{}' deployed at URI '{}' with description: '{}'. Secured: {}. Uri match policy: {}").reset().toString(), srv.name(), srv.uri(), srv.getDescription(), srv.secured(), srv.matchPolicy());
} catch (IOException | InterruptedException | ExecutionException ex) {
LOGGER.error("Error deploying node service {}", pluginPath, ex);
Thread.currentThread().interrupt();
Expand All @@ -437,7 +425,7 @@ private void deployInterceptor(Path pluginPath) throws IOException, InterruptedE

DEPLOYEES.put(pluginPath.toAbsolutePath(), (JSPlugin) interceptorRecord.getInstance());

LOGGER.info(ansi().fg(GREEN).a("Added interceptor {}, description: {}").reset().toString(),
LOGGER.info(ansi().fg(GREEN).a("Interceptor '{}' deployed with description: '{}'").reset().toString(),
interceptorRecord.getName(),
interceptorRecord.getDescription());
}
Expand All @@ -459,7 +447,7 @@ private void undeployServices(Path pluginPath) {
if (_toUndeploy != null && _toUndeploy instanceof JSService toUndeploy) {
registry.unplug(toUndeploy.uri(), toUndeploy.matchPolicy());

LOGGER.info(ansi().fg(GREEN).a("removed service {} bound to URI {}").reset().toString(), toUndeploy.name(), toUndeploy.uri());
LOGGER.info(ansi().fg(GREEN).a("Service '{}' bound to '{}' undeployed").reset().toString(), toUndeploy.name(), toUndeploy.uri());
}
}
}
Expand All @@ -475,9 +463,9 @@ private void undeployInterceptors(Path pluginPath) {
var removed = registry.removeInterceptorIf(interceptor -> Objects.equal(interceptor.getName(), toUndeploy.name()));

if (removed) {
LOGGER.info(ansi().fg(GREEN).a("removed interceptor {}").reset().toString(), toUndeploy.name());
LOGGER.info(ansi().fg(GREEN).a("Interceptor '{}' undeployed").reset().toString(), toUndeploy.name());
} else {
LOGGER.warn("interceptor {} was not removed", toUndeploy.name());
LOGGER.warn("Interceptor {} was not undeployed", toUndeploy.name());
}
}
}
Expand Down
Loading

0 comments on commit 0727fa8

Please sign in to comment.