diff --git a/asset-model-management/pom.xml b/asset-model-management/pom.xml
new file mode 100644
index 0000000000..44a9f431c7
--- /dev/null
+++ b/asset-model-management/pom.xml
@@ -0,0 +1,87 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.streampipes
+ streampipes-parent
+ 0.97.0-SNAPSHOT
+
+
+ asset-model-management
+
+
+ 17
+ 17
+ UTF-8
+
+
+
+
+
+
+
+ org.apache.streampipes
+ streampipes-storage-api
+ 0.97.0-SNAPSHOT
+
+
+ org.apache.streampipes
+ streampipes-storage-management
+ 0.97.0-SNAPSHOT
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-databind
+
+
+
+
+ org.junit.jupiter
+ junit-jupiter-api
+ test
+
+
+ org.mockito
+ mockito-core
+ test
+
+
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-checkstyle-plugin
+
+
+ checkstyle.config.base.path=${project.parent.basedir}/tools/maven
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java b/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
new file mode 100644
index 0000000000..5a0b0e72df
--- /dev/null
+++ b/asset-model-management/src/main/java/org/apache/streampipes/assetmodel/management/AssetModelManagement.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.assetmodel.management;
+
+import org.apache.streampipes.model.assets.SpAssetModel;
+import org.apache.streampipes.storage.api.IGenericStorage;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * This class provides convinience methods to work with asset models
+ */
+public class AssetModelManagement {
+
+ private final IGenericStorage genericStorage;
+ private final ObjectMapper objectMapper;
+
+ public AssetModelManagement(IGenericStorage genericStorage) {
+ this.genericStorage = genericStorage;
+ this.objectMapper = new ObjectMapper();
+ }
+
+ /**
+ * Retrieves all asset models from generic storage and converts them to a list of asset models.
+ *
+ * @return a list of asset models
+ */
+ public List findAll() throws IOException {
+
+ try {
+ return genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)
+ .stream()
+ .map(this::convertMapToAssetModel)
+ .toList();
+ } catch (IOException e) {
+ throw new IOException("Error while fetching all asset models from generic storage.", e);
+ }
+ }
+
+
+ /**
+ * Retrieves a single asset model by its ID.
+ *
+ * @param assetId the ID of the asset model to retrieve
+ * @return the asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel findOne(String assetId) throws NoSuchElementException, IOException {
+ var assetModelData = genericStorage.findOne(assetId);
+
+ if (assetModelData == null) {
+ throw new NoSuchElementException("Asset model with ID " + assetId + " not found.");
+ }
+
+ return this.convertMapToAssetModel(assetModelData);
+ }
+
+ /**
+ * Creates a new asset model.
+ *
+ * @param asset the asset model to create
+ * @return the created asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel create(String asset) throws IOException {
+ var assetModelInstanceInDatabase = genericStorage.create(asset);
+
+ return this.convertMapToAssetModel(assetModelInstanceInDatabase);
+ }
+
+ /**
+ *
+ * @param assetId the ID of the asset model to update
+ * @param assetModel the updated asset model
+ * @return the updated asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel update(String assetId, SpAssetModel assetModel) throws IOException {
+ var assetModelAsJson = this.convertAssetModelToJson(assetModel);
+ return update(assetId, assetModelAsJson);
+ }
+
+ /**
+ * Updates an existing asset model.
+ *
+ * @param assetId the ID of the asset model to update
+ * @param assetModelJson the updated asset model as a JSON string
+ * @return the updated asset model
+ * @throws IOException if an I/O error occurs
+ */
+ public SpAssetModel update(String assetId, String assetModelJson) throws IOException {
+ var updatedAssetModelAsMap = genericStorage.update(assetId, assetModelJson);
+ return this.convertMapToAssetModel(updatedAssetModelAsMap);
+ }
+
+ /**
+ * Deletes an asset model by its ID and revision.
+ *
+ * @param assetId the ID of the asset model to delete
+ * @param rev the revision of the asset model to delete
+ * @throws IOException if an I/O error occurs
+ */
+ public void delete(String assetId, String rev) throws IOException {
+ genericStorage.delete(assetId, rev);
+ }
+
+ private SpAssetModel convertMapToAssetModel(Map assetModelMap) {
+ return objectMapper.convertValue(assetModelMap, SpAssetModel.class);
+ }
+
+ private String convertAssetModelToJson(SpAssetModel assetModel) throws JsonProcessingException {
+ return objectMapper.writeValueAsString(assetModel);
+ }
+
+}
diff --git a/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java b/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
new file mode 100644
index 0000000000..de52be285b
--- /dev/null
+++ b/asset-model-management/src/test/java/org/apache/streampipes/assetmodel/management/AssetModelManagementTest.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.streampipes.assetmodel.management;
+
+import org.apache.streampipes.model.assets.SpAssetModel;
+import org.apache.streampipes.storage.api.IGenericStorage;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+class AssetModelManagementTest {
+
+ private static final String SAMPLE_ASSET_MODEL_ID = "1";
+ private static final String SAMPLE_ASSET_MODEL_NAME = "Asset1";
+ private static final Map SAMPLE_ASSET_MODEL_AS_MAP = Map.of(
+ "_id",
+ SAMPLE_ASSET_MODEL_ID,
+ "assetName",
+ SAMPLE_ASSET_MODEL_NAME
+ );
+ private static final String SAMPLE_ASSET_MODEL_AS_JSON = """
+ {
+ "_id": "SAMPLE_ASSET_MODEL_ID",
+ "assetName": "SAMPLE_ASSET_MODEL_NAME"
+ }
+ """;
+
+ private static final String REV = "1";
+
+ private IGenericStorage genericStorage;
+ private AssetModelManagement assetModelManagement;
+
+ @BeforeEach
+ void setUp() {
+ genericStorage = Mockito.mock(IGenericStorage.class);
+ assetModelManagement = new AssetModelManagement(genericStorage);
+ }
+
+ @Test
+ void findAll_ReturnsListOfAssetModels() throws IOException {
+ when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenReturn(List.of(SAMPLE_ASSET_MODEL_AS_MAP));
+
+ var result = assetModelManagement.findAll();
+
+ assertEquals(1, result.size());
+ assertEquals(
+ SAMPLE_ASSET_MODEL_ID,
+ result.get(0)
+ .getId()
+ );
+ assertEquals(
+ SAMPLE_ASSET_MODEL_NAME,
+ result.get(0)
+ .getAssetName()
+ );
+ }
+
+ @Test
+ void findAll_ReturnsEmptyListWhenNoData() throws IOException {
+ when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenReturn(List.of());
+
+ var result = assetModelManagement.findAll();
+
+ assertTrue(result.isEmpty());
+ }
+
+ @Test
+ void findAll_ThrowsIOException() throws IOException {
+ when(genericStorage.findAll(SpAssetModel.APP_DOC_TYPE)).thenThrow(new IOException());
+
+ assertThrows(IOException.class, () -> assetModelManagement.findAll());
+ }
+
+
+ @Test
+ void findOne_ReturnsAssetModel() throws IOException {
+ when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void findOne_ThrowsIOException() throws IOException {
+ when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenThrow(new IOException());
+
+ assertThrows(IOException.class, () -> assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID));
+ }
+
+ @Test
+ void findOne_ReturnsNoSuchElementExceptionWhenNotFound() throws IOException {
+ when(genericStorage.findOne(SAMPLE_ASSET_MODEL_ID)).thenReturn(null);
+
+ assertThrows(NoSuchElementException.class, () -> assetModelManagement.findOne(SAMPLE_ASSET_MODEL_ID));
+ }
+
+ @Test
+ void create_ReturnsCreatedAssetModel() throws IOException {
+ when(genericStorage.create(SAMPLE_ASSET_MODEL_NAME)).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.create(SAMPLE_ASSET_MODEL_NAME);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void create_ThrowsIOException() throws IOException {
+ when(genericStorage.create(SAMPLE_ASSET_MODEL_AS_JSON)).thenThrow(new IOException());
+
+ assertThrows(IOException.class, () -> assetModelManagement.create(SAMPLE_ASSET_MODEL_AS_JSON));
+ }
+
+
+ @Test
+ void update_ReturnsUpdatedAssetModel() throws IOException {
+ var assetModelToUpdate = new SpAssetModel();
+ assetModelToUpdate.setId(SAMPLE_ASSET_MODEL_ID);
+ assetModelToUpdate.setAssetName(SAMPLE_ASSET_MODEL_NAME);
+
+ when(genericStorage.update(any(), any())).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, assetModelToUpdate);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void update_GenericStorageThrowsIOException() throws IOException {
+ when(genericStorage.update(any(), any())).thenThrow(new IOException());
+
+ assertThrows(IOException.class, () -> assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, new SpAssetModel()));
+ }
+
+ @Test
+ void update_ReturnsUpdatedAssetModelFromJson() throws IOException {
+ when(genericStorage.update(any(), any())).thenReturn(SAMPLE_ASSET_MODEL_AS_MAP);
+
+ var result = assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, SAMPLE_ASSET_MODEL_AS_JSON);
+
+ assertEquals(SAMPLE_ASSET_MODEL_ID, result.getId());
+ assertEquals(SAMPLE_ASSET_MODEL_NAME, result.getAssetName());
+ }
+
+ @Test
+ void update_ThrowsIOExceptionWhenUpdatingFromJson() throws IOException {
+ when(genericStorage.update(any(), any())).thenThrow(new IOException());
+
+ assertThrows(
+ IOException.class,
+ () -> assetModelManagement.update(SAMPLE_ASSET_MODEL_ID, SAMPLE_ASSET_MODEL_AS_JSON)
+ );
+ }
+
+ @Test
+ void delete_RemovesAssetModel() throws IOException {
+ doNothing().when(genericStorage)
+ .delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+ assetModelManagement.delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+ verify(genericStorage, times(1)).delete(SAMPLE_ASSET_MODEL_ID, REV);
+ }
+
+ @Test
+ void delete_ThrowsIOException() throws IOException {
+ doThrow(new IOException()).when(genericStorage)
+ .delete(SAMPLE_ASSET_MODEL_ID, REV);
+
+ assertThrows(IOException.class, () -> assetModelManagement.delete(SAMPLE_ASSET_MODEL_ID, REV));
+ }
+
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 455675231a..b32678f846 100644
--- a/pom.xml
+++ b/pom.xml
@@ -872,6 +872,7 @@
streampipes-wrapper-kafka-streams
streampipes-wrapper-siddhi
streampipes-wrapper-standalone
+ asset-model-management
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
index 79310b96b1..6b84e60847 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterMasterManagement.java
@@ -116,72 +116,86 @@ public AdapterDescription getAdapter(String elementId) throws AdapterException {
}
/**
- * First the adapter is stopped removed, then the corresponding data source is deleted
+ * This method deletes the adapter and the related resources inlcuding the data stream, and the asset links in the
+ * asset model
*
* @param elementId The elementId of the adapter instance
- * @throws AdapterException when adapter can not be stopped
*/
- public void deleteAdapter(String elementId) throws AdapterException {
+ public void deleteAdapter(String elementId) {
- // Stop stream adapter
+ var adapterDescription = getAdapterDescription(elementId);
+
+ stopAdapterWithLogging(elementId);
+
+ deleteAdaterFromCouchDbAndFromLoggingService(elementId);
+
+ deleteCorrespondingDataStream(adapterDescription);
+ }
+
+ private void stopAdapterWithLogging(String elementId) {
+ LOG.info("Attempting to stop adapter: {}", elementId);
try {
- stopStreamAdapter(elementId);
+ stopAdapter(elementId);
+ LOG.info("Successfully stopped adapter with id: {}", elementId);
} catch (AdapterException e) {
- LOG.info("Could not stop adapter: " + elementId, e);
+ LOG.error("Failed to stop adapter with id: {}", elementId, e);
}
+ }
- AdapterDescription adapter = adapterInstanceStorage.getElementById(elementId);
- // Delete adapter
+ private void deleteAdaterFromCouchDbAndFromLoggingService(String elementId) {
adapterResourceManager.delete(elementId);
ExtensionsLogProvider.INSTANCE.remove(elementId);
- LOG.info("Successfully deleted adapter: " + elementId);
+ LOG.info("Successfully deleted adapter in couchdb: {}", elementId);
+ }
- // Delete data stream
- this.dataStreamResourceManager.delete(adapter.getCorrespondingDataStreamElementId());
- LOG.info("Successfully deleted data stream: " + adapter.getCorrespondingDataStreamElementId());
+ private void deleteCorrespondingDataStream(AdapterDescription adapterDescription) {
+ var correspondingDataStreamElementId = adapterDescription.getCorrespondingDataStreamElementId();
+ dataStreamResourceManager.delete(correspondingDataStreamElementId);
+ LOG.info("Successfully deleted data stream in couchdb: {}", correspondingDataStreamElementId);
}
public List getAllAdapterInstances() {
return adapterInstanceStorage.findAll();
}
- public void stopStreamAdapter(String elementId) throws AdapterException {
- AdapterDescription ad = adapterInstanceStorage.getElementById(elementId);
- WorkerRestClient.stopStreamAdapter(ad.getSelectedEndpointUrl(), ad);
+ public void stopAdapter(String elementId) throws AdapterException {
+ var adapterDescription = getAdapterDescription(elementId);
+
+ WorkerRestClient.stopStreamAdapter(adapterDescription.getSelectedEndpointUrl(), adapterDescription);
ExtensionsLogProvider.INSTANCE.reset(elementId);
// remove the adapter from the metrics manager so that
// no metrics for this adapter are exposed anymore
try {
- adapterMetrics.remove(ad.getElementId(), ad.getName());
+ adapterMetrics.remove(adapterDescription.getElementId(), adapterDescription.getName());
} catch (NoSuchElementException e) {
- LOG.error("Could not remove adapter metrics for adapter {}", ad.getName());
+ LOG.error("Could not remove adapter metrics for adapter {}", adapterDescription.getName());
}
}
public void startStreamAdapter(String elementId) throws AdapterException {
- var ad = adapterInstanceStorage.getElementById(elementId);
+ var adapterDescription = getAdapterDescription(elementId);
try {
// Find endpoint to start adapter on
var baseUrl = new ExtensionsServiceEndpointGenerator().getEndpointBaseUrl(
- ad.getAppId(),
+ adapterDescription.getAppId(),
SpServiceUrlProvider.ADAPTER,
- ad.getDeploymentConfiguration()
- .getDesiredServiceTags()
+ adapterDescription.getDeploymentConfiguration()
+ .getDesiredServiceTags()
);
// Update selected endpoint URL of adapter
- ad.setSelectedEndpointUrl(baseUrl);
- adapterInstanceStorage.updateElement(ad);
+ adapterDescription.setSelectedEndpointUrl(baseUrl);
+ adapterInstanceStorage.updateElement(adapterDescription);
// Invoke adapter instance
WorkerRestClient.invokeStreamAdapter(baseUrl, elementId);
// register the adapter at the metrics manager so that the AdapterHealthCheck can send metrics
- adapterMetrics.register(ad.getElementId(), ad.getName());
+ adapterMetrics.register(adapterDescription.getElementId(), adapterDescription.getName());
LOG.info("Started adapter " + elementId + " on: " + baseUrl);
} catch (NoServiceEndpointsAvailableException e) {
@@ -200,4 +214,8 @@ private void installDataSource(
throw new AdapterException();
}
}
+
+ private AdapterDescription getAdapterDescription(String elementId) {
+ return adapterInstanceStorage.getElementById(elementId);
+ }
}
diff --git a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
index b64b661495..6d97b2dcc1 100644
--- a/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
+++ b/streampipes-connect-management/src/main/java/org/apache/streampipes/connect/management/management/AdapterUpdateManagement.java
@@ -65,7 +65,7 @@ public void updateAdapter(AdapterDescription ad)
boolean shouldRestart = ad.isRunning();
if (ad.isRunning()) {
- this.adapterMasterManagement.stopStreamAdapter(ad.getElementId());
+ this.adapterMasterManagement.stopAdapter(ad.getElementId());
}
// update data source in database
diff --git a/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java b/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
index 0140eafdf5..8f493efff8 100644
--- a/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
+++ b/streampipes-model/src/main/java/org/apache/streampipes/model/assets/SpAssetModel.java
@@ -20,9 +20,11 @@
import org.apache.streampipes.commons.constants.GenericDocTypes;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.gson.annotations.SerializedName;
+@JsonIgnoreProperties(ignoreUnknown = true)
public class SpAssetModel extends SpAsset {
public static final String APP_DOC_TYPE = GenericDocTypes.DOC_ASSET_MANGEMENT;
@@ -30,6 +32,9 @@ public class SpAssetModel extends SpAsset {
@JsonProperty("_id")
private @SerializedName("_id") String id;
+ @JsonProperty("_rev")
+ private @SerializedName("_rev") String rev;
+
private boolean removable;
public SpAssetModel() {
@@ -48,6 +53,14 @@ public boolean isRemovable() {
return removable;
}
+ public String getRev() {
+ return rev;
+ }
+
+ public void setRev(String rev) {
+ this.rev = rev;
+ }
+
public void setRemovable(boolean removable) {
this.removable = removable;
}
diff --git a/streampipes-rest/pom.xml b/streampipes-rest/pom.xml
index 43f2d74594..2cacb72d0b 100644
--- a/streampipes-rest/pom.xml
+++ b/streampipes-rest/pom.xml
@@ -29,6 +29,11 @@
+
+ org.apache.streampipes
+ asset-model-management
+ 0.97.0-SNAPSHOT
+
org.apache.streampipes
streampipes-commons
@@ -117,6 +122,7 @@
mockito-core
test
+
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
index 0c88e461b6..44fc94d6ed 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/ResetManagement.java
@@ -19,7 +19,6 @@
package org.apache.streampipes.rest;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
-import org.apache.streampipes.commons.exceptions.connect.AdapterException;
import org.apache.streampipes.commons.prometheus.adapter.AdapterMetricsManager;
import org.apache.streampipes.connect.management.management.AdapterMasterManagement;
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
@@ -116,11 +115,7 @@ private static void stopAndDeleteAllAdapters() {
List allAdapters = adapterMasterManagement.getAllAdapterInstances();
allAdapters.forEach(adapterDescription -> {
- try {
- adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
- } catch (AdapterException e) {
- logger.error("Failed to delete adapter with id: " + adapterDescription.getElementId(), e);
- }
+ adapterMasterManagement.deleteAdapter(adapterDescription.getElementId());
});
}
diff --git a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
index f15531ec27..de1247e2fd 100644
--- a/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
+++ b/streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/AssetManagementResource.java
@@ -18,10 +18,11 @@
package org.apache.streampipes.rest.impl;
+import org.apache.streampipes.assetmodel.management.AssetModelManagement;
+import org.apache.streampipes.model.assets.SpAssetModel;
import org.apache.streampipes.rest.core.base.impl.AbstractAuthGuardedRestResource;
import org.apache.streampipes.rest.security.AuthConstants;
import org.apache.streampipes.rest.shared.exception.SpMessageException;
-import org.apache.streampipes.storage.api.IGenericStorage;
import org.apache.streampipes.storage.management.StorageDispatcher;
import org.slf4j.Logger;
@@ -41,7 +42,7 @@
import java.io.IOException;
import java.util.List;
-import java.util.Map;
+import java.util.NoSuchElementException;
@RestController
@RequestMapping("/api/v2/assets")
@@ -49,12 +50,18 @@ public class AssetManagementResource extends AbstractAuthGuardedRestResource {
private static final Logger LOG = LoggerFactory.getLogger(AssetManagementResource.class);
- private static final String APP_DOC_TYPE = "asset-management";
+ private final AssetModelManagement assetModelManagement;
+
+ public AssetManagementResource() {
+ var genericStorage = StorageDispatcher.INSTANCE.getNoSqlStore()
+ .getGenericStorage();
+ assetModelManagement = new AssetModelManagement(genericStorage);
+ }
@GetMapping(produces = MediaType.APPLICATION_JSON_VALUE)
@PreAuthorize(AuthConstants.HAS_READ_ASSETS_PRIVILEGE)
- public List