Skip to content

Commit

Permalink
Change: Changed the way the existence of a package is checked.
Browse files Browse the repository at this point in the history
Added: Checked the existence of some of the metadata to be backed up.
Added more logging to the API Handler
Fix: Fixed a bug in the creation of the package that caused the processor to fail even when everything was created correctly.

Signed-off-by: Ricardo Ruiz Saiz <[email protected]>
  • Loading branch information
dormstd committed Oct 5, 2018
1 parent 2cb7cc7 commit e4ed4ea
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.http.entity.mime.content.StringBody;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.log4j.Logger;

import java.io.BufferedReader;
Expand Down Expand Up @@ -70,41 +69,37 @@ public CKAN_API_Handler(String HOST, String api_key)
this.httpclient = HttpClientBuilder.create().build();
}

// ToDo: Check if the package exists marked as delete, then reactivate it?
public boolean packageExists() throws IOException{
public boolean packageExists(String package_id) throws IOException{

String line;
StringBuilder sb = new StringBuilder();
HttpPost postRequest;
Gson gson = new Gson();

HttpEntity reqEntity = MultipartEntityBuilder.create()
.addPart("id",new StringBody(package_id,ContentType.TEXT_PLAIN))
.build();

postRequest = new HttpPost(HOST+"/api/action/package_show");
postRequest.setEntity(reqEntity);
postRequest = new HttpPost(HOST+"/api/3/action/package_search?q=name:"+package_id);
postRequest.setHeader("X-CKAN-API-Key", api_key);

HttpResponse response = httpclient.execute(postRequest);
int statusCode = response.getStatusLine().getStatusCode();

BufferedReader br = new BufferedReader(
new InputStreamReader((response.getEntity().getContent())));
while ((line = br.readLine()) != null) {
sb.append(line);
sb.append("\n");
}

if(statusCode==200)
{
log.info("Package with id "+package_id+" exists");
//Check if that package is deleted

log.info(sb);
return true;
// Parse the response into a POJO to be able to get results from it.
// ToDo: If no result is returned, raise an error (when converting to POJO fails or return code !=200?)
if(statusCode==200) {
CkanFullList CkanFullList = gson.fromJson(sb.toString(), CkanFullList.class);
//by default we get the first package_ of the list of packages
if (CkanFullList.getPackage().getPackages().size() == 1) {
log.info("Package: "+package_id+" was found in CKAN.");
return true;
} else {
log.warn("Package: "+package_id+" not found");
//ToDo: Null, really?
return false;
}
}else{
log.warn("Package with id "+package_id+" not found");
log.warn(sb);
return false;
}
}
Expand Down Expand Up @@ -183,7 +178,7 @@ public Package_ getPackageByName(String name) throws IOException {

}

public void createPackage() throws IOException{
public void createPackage(String package_id) throws IOException{

HttpPost postRequest;
StringBuilder sb = new StringBuilder();
Expand All @@ -203,9 +198,6 @@ public void createPackage() throws IOException{
HttpResponse response = httpclient.execute(postRequest);
int statusCode = response.getStatusLine().getStatusCode();

String message = EntityUtils.toString(response.getEntity());
log.warn("***** Response: "+message);

BufferedReader br = new BufferedReader(
new InputStreamReader((response.getEntity().getContent())));
sb.append(statusCode);
Expand Down Expand Up @@ -233,27 +225,33 @@ public void createPackagePojo(Package_ dataset, String name) throws IOException{
MultipartEntityBuilder multipart = MultipartEntityBuilder.create()
.addPart("name",new StringBody(name,ContentType.TEXT_PLAIN));
// ToDo: Improve this way of handling null values in the returned dataset
if(dataset.getAuthor()!=null) {
multipart.addPart("author", new StringBody(dataset.getAuthor(), ContentType.TEXT_PLAIN));
}
if(dataset.getAuthorEmail()!=null) {
multipart.addPart("author_email", new StringBody(dataset.getAuthorEmail(), ContentType.TEXT_PLAIN));
}
if(dataset.getOwnerOrg()!=null) {
multipart.addPart("owner_org", new StringBody(dataset.getOwnerOrg(), ContentType.TEXT_PLAIN));
}
if(dataset.getNotes()!=null) {
multipart.addPart("notes", new StringBody(dataset.getNotes(), ContentType.TEXT_PLAIN));
}
if(dataset.getPrivate()!=null) {
multipart.addPart("private", new StringBody(dataset.getPrivate().toString(), ContentType.TEXT_PLAIN));
}
if(dataset.getLicenseTitle()!=null) {
multipart.addPart("license_title", new StringBody(dataset.getLicenseTitle(), ContentType.TEXT_PLAIN));
}
if(dataset.getLicenseId()!=null) {
multipart.addPart("license_id", new StringBody(dataset.getLicenseId(), ContentType.TEXT_PLAIN));
}
if(dataset.getAuthor()!=null) {
multipart.addPart("author", new StringBody(dataset.getAuthor(), ContentType.TEXT_PLAIN));
}
if(dataset.getAuthorEmail()!=null) {
multipart.addPart("author_email", new StringBody(dataset.getAuthorEmail(), ContentType.TEXT_PLAIN));
}
if(dataset.getOwnerOrg()!=null) {
multipart.addPart("owner_org", new StringBody(dataset.getOwnerOrg(), ContentType.TEXT_PLAIN));
}
if(dataset.getNotes()!=null) {
multipart.addPart("notes", new StringBody(dataset.getNotes(), ContentType.TEXT_PLAIN));
}
if(dataset.getPrivate()!=null) {
multipart.addPart("private", new StringBody(dataset.getPrivate().toString(), ContentType.TEXT_PLAIN));
}
if(dataset.getLicenseTitle()!=null) {
multipart.addPart("license_title", new StringBody(dataset.getLicenseTitle(), ContentType.TEXT_PLAIN));
}
if(dataset.getLicenseId()!=null) {
multipart.addPart("license_id", new StringBody(dataset.getLicenseId(), ContentType.TEXT_PLAIN));
}
if(dataset.getLicenseTitle() != null){
multipart.addPart("license_title", new StringBody(dataset.getLicenseTitle(), ContentType.TEXT_PLAIN));
}
if(dataset.getMaintainerEmail() != null){
multipart.addPart("maintainer_email", new StringBody(dataset.getMaintainerEmail(), ContentType.TEXT_PLAIN));
}

HttpEntity reqEntity = multipart.build();

Expand Down Expand Up @@ -365,18 +363,29 @@ public void uploadFilePojo(Resource resource, String dataset_name, String resour

HttpPost postRequest;
ContentBody cbFile = new FileBody(file, ContentType.TEXT_HTML);
//ToDo: Handle error when any of the attributes of the resource is null as in the package
HttpEntity reqEntity = MultipartEntityBuilder.create()

MultipartEntityBuilder multipart = MultipartEntityBuilder.create()
.addPart("file", cbFile)
//Cannot use getKey() because sometimes it is empty and causes error (resource with no filename in it)
.addPart("key", new StringBody(resourceFileName.split("\\.")[0],ContentType.TEXT_PLAIN))
.addPart("name", new StringBody(resourceFileName,ContentType.TEXT_PLAIN))
.addPart("url",new StringBody(resource.getUrl(),ContentType.TEXT_PLAIN))
.addPart("package_id",new StringBody(dataset_name,ContentType.TEXT_PLAIN))
.addPart("format",new StringBody(resource.getFormat(),ContentType.TEXT_PLAIN))
.addPart("upload",cbFile)
.addPart("description",new StringBody(resource.getDescription(),ContentType.TEXT_PLAIN))
.build();
.addPart("upload",cbFile);
//ToDo: Add the rest of the fields¿?
if(resource.getUrl() != null){
multipart.addPart("url",new StringBody(resource.getUrl(),ContentType.TEXT_PLAIN));
}
if(resource.getFormat() != null)
{
multipart.addPart("format",new StringBody(resource.getFormat(),ContentType.TEXT_PLAIN));
}
if(resource.getDescription() != null){
multipart.addPart("description",new StringBody(resource.getDescription(),ContentType.TEXT_PLAIN));
}
if(resource.getMimetype() != null)
{
multipart.addPart("mimetype",new StringBody(resource.getMimetype().toString(),ContentType.TEXT_PLAIN));
}
HttpEntity reqEntity = multipart.build();

postRequest = new HttpPost(HOST+"/api/action/resource_create");
postRequest.setEntity(reqEntity);
Expand Down Expand Up @@ -414,9 +423,8 @@ public Boolean createOrUpdateResource(String path) throws IOException {
ResourceResponse resResponse = gson.fromJson(sb.toString(),ResourceResponse.class);
System.out.println(resResponse);

String resource_packageId = resResponse.getResult().getResults().get(0).getPackageId();
String id = resResponse.getResult().getResults().get(0).getId();

String resource_packageId;
String id;
//This is needed to check that the resource belongs to the current package
Package_ foundPackage = getPackageByName(package_id);
String foundPackageId = "Not_found";
Expand All @@ -435,6 +443,8 @@ public Boolean createOrUpdateResource(String path) throws IOException {
//if the count is 1, get all the needed data to update the resource
}else if(resResponse.getResult().getCount()==1)
{
resource_packageId = resResponse.getResult().getResults().get(0).getPackageId();
id = resResponse.getResult().getResults().get(0).getId();
//If the resource's package_id is the same as the current package id (search for package by name and get the id)
if( foundPackage != null && resource_packageId.equals(foundPackageId)) {
log.info("Resource found in the current package, updating it");
Expand Down Expand Up @@ -520,6 +530,8 @@ private void uploadFile(String path) throws IOException {
File file = new File(path);
SimpleDateFormat dateFormatGmt = new SimpleDateFormat("yyyyMMdd_HHmmss");
String date=dateFormatGmt.format(new Date());
StringBuilder sb = new StringBuilder();
String line;

HttpPost postRequest;
ContentBody cbFile = new FileBody(file, ContentType.TEXT_HTML);
Expand All @@ -540,8 +552,19 @@ private void uploadFile(String path) throws IOException {
HttpResponse response = httpclient.execute(postRequest);
int statusCode = response.getStatusLine().getStatusCode();

BufferedReader br = new BufferedReader(
new InputStreamReader((response.getEntity().getContent())));
sb.append(statusCode);
sb.append("\n");
while ((line = br.readLine()) != null) {
sb.append(line);
sb.append("\n");
}

if(statusCode!=200){
log.error("Error creating a resource: "+ file.getName().split("\\.")[0] +"in package:"+package_id);
log.error("statusCode =!=" +statusCode);
log.error(sb);
}
else log.info("Request returns statusCode 200: OK");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,10 +202,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
if (!ckan_api_handler.organizationExists()) {
ckan_api_handler.createOrganization();
}
if (!ckan_api_handler.packageExists()) {
ckan_api_handler.createPackage();
if (!ckan_api_handler.packageExists(filenameNoExtension)) {
ckan_api_handler.createPackage(filenameNoExtension);
}
if(ckan_api_handler.createOrUpdateResource(file.toAbsolutePath().toString())) {
if(ckan_api_handler.createOrUpdateResource(file.toFile().toString())) {
getLogger().info("File tried to be uploaded to CKAN: {}", new Object[]{file.toFile().toString()});
session.transfer(flowFile, REL_SUCCESS);
ckan_api_handler.close();
}else
Expand All @@ -216,11 +217,11 @@ public void onTrigger(final ProcessContext context, final ProcessSession session
{
getLogger().log(LogLevel.ERROR, "Error while uploading file {} to CKAN {}: Organization {}.",
new Object[]{file, url, organizationId });
getLogger().error(ioe.toString());
session.transfer(session.penalize(flowFile), REL_FAILURE);
}



// It is critical that we commit the session before we perform the Delete. Otherwise, we could have a case where we
// ingest the file, delete it, and then NiFi is restarted before the session is committed. That would result in data loss.
// As long as we commit the session right here, we are safe.
Expand Down

0 comments on commit e4ed4ea

Please sign in to comment.