Implemented Parallel Download of Forms using Multi-Threading and Locks #60

Open
wants to merge 7 commits into
base: develop
@@ -3,20 +3,19 @@ package org.odk.collect.android.formmanagement
import org.odk.collect.android.utilities.FileUtils.copyFile
import org.odk.collect.android.utilities.FileUtils.interuptablyWriteFile
import org.odk.collect.async.OngoingWorkListener
import org.odk.collect.forms.Form
import org.odk.collect.forms.FormSource
import org.odk.collect.forms.FormSourceException
import org.odk.collect.forms.FormsRepository
import org.odk.collect.forms.MediaFile
import org.odk.collect.forms.*
Review comment (Member): Please replace the wildcard (*) import with specific imports.

import org.odk.collect.shared.strings.Md5.getMd5Hash
import java.io.File
import java.io.IOException
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

class FormMediaDownloader(
private val formsRepository: FormsRepository,
private val formSource: FormSource
) {

private val lock: Lock = ReentrantLock()
Review comment (Member): nit: add a newline.

@Throws(IOException::class, FormSourceException::class, InterruptedException::class)
fun download(
formToDownload: ServerFormDetails,
@@ -39,7 +38,9 @@ class FormMediaDownloader(
} else {
atLeastOneNewMediaFileDetected = true
val file = formSource.fetchMediaFile(mediaFile.downloadUrl)
lock.lock()
interuptablyWriteFile(file, tempMediaFile, tempDir, stateListener)
lock.unlock()
}
}
}
@@ -22,6 +22,8 @@
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import javax.annotation.Nullable;
@@ -46,6 +48,8 @@ public ServerFormDownloader(FormSource formSource, FormsRepository formsReposito
this.clock = clock;
}

private static Lock lock = new ReentrantLock();

@Override
public void downloadForm(ServerFormDetails form, @Nullable ProgressReporter progressReporter, @Nullable Supplier<Boolean> isCancelled) throws FormDownloadException {
Form formOnDevice;
@@ -94,17 +98,19 @@ private void processOneForm(ServerFormDetails fd, OngoingWorkListener stateListe
// get the xml file
// if we've downloaded a duplicate, this gives us the file
fileResult = downloadXform(fd.getFormName(), fd.getDownloadUrl(), stateListener, tempDir, formsDirPath);
Review comment (Member): Are you sure this method is thread safe? If so, how?
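One way to make the answer to this question easier to verify is to acquire and release the lock in the same method, with the release in a `finally` block. In the hunks shown here the lock is taken inside `downloadXform` and released by the caller on several separate paths, which is hard to audit: `ReentrantLock.unlock()` throws `IllegalMonitorStateException` if the calling thread does not hold the lock. The following is a minimal sketch of the pattern only. `GuardedWriter` and its `write` method are hypothetical stand-ins, not code from this PR.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a component that writes to shared storage.
class GuardedWriter {
    private final ReentrantLock lock = new ReentrantLock();
    private int writes = 0; // shared state protected by the lock

    // Simulates a write that may fail; returns the number of successful writes so far.
    int write(boolean fail) {
        lock.lock();
        try {
            if (fail) {
                throw new IllegalStateException("simulated write failure");
            }
            return ++writes;
        } finally {
            // Runs on both the normal and the exceptional path,
            // so the lock is released exactly once per acquisition.
            lock.unlock();
        }
    }

    boolean isLocked() {
        return lock.isLocked();
    }

    public static void main(String[] args) {
        GuardedWriter writer = new GuardedWriter();
        writer.write(false);
        try {
            writer.write(true);
        } catch (IllegalStateException e) {
            System.out.println("write failed, lock still held: " + writer.isLocked());
        }
    }
}
```

With this shape, no caller needs to know whether the lock is currently held, and a new exception type cannot leave it locked.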


// download media files if there are any
lock.unlock();
if (fd.getManifest() != null && !fd.getManifest().getMediaFiles().isEmpty()) {
FormMediaDownloader mediaDownloader = new FormMediaDownloader(formsRepository, formSource);
newAttachmentsDetected = mediaDownloader.download(fd, fd.getManifest().getMediaFiles(), tempMediaPath, tempDir, stateListener);
}
} catch (FormDownloadException.DownloadingInterrupted | InterruptedException e) {
Timber.i(e);
cleanUp(fileResult, tempMediaPath);
lock.unlock();
throw new FormDownloadException.DownloadingInterrupted();
} catch (IOException e) {
lock.unlock();
throw new FormDownloadException.DiskError();
}

@@ -137,9 +143,12 @@ private void processOneForm(ServerFormDetails fd, OngoingWorkListener stateListe
}

try {
lock.lock();
installEverything(tempMediaPath, fileResult, parsedFields, formsDirPath, newAttachmentsDetected);
lock.unlock();
} catch (FormDownloadException.DiskError e) {
cleanUp(fileResult, tempMediaPath);
lock.unlock();
throw e;
}
}
@@ -244,7 +253,7 @@ private Form saveNewForm(Map<String, String> formInfo, File formFile, String med
*/
private FileResult downloadXform(String formName, String url, OngoingWorkListener stateListener, File tempDir, String formsDirPath) throws FormSourceException, IOException, FormDownloadException.DownloadingInterrupted, InterruptedException {
InputStream xform = formSource.fetchForm(url);

lock.lock();
String fileName = getFormFileName(formName, formsDirPath);
File tempFormFile = new File(tempDir + File.separator + fileName);
interuptablyWriteFile(xform, tempFormFile, tempDir, stateListener);
@@ -18,6 +18,8 @@
import static java.util.Collections.emptyMap;

import android.os.AsyncTask;
import android.os.Handler;
import android.os.Looper;

import org.odk.collect.android.R;
import org.odk.collect.android.application.Collect;
@@ -30,6 +32,9 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Background task for downloading a given list of forms. We assume right now that the forms are
@@ -53,34 +58,54 @@ public DownloadFormsTask(FormDownloader formDownloader) {
protected Map<ServerFormDetails, FormDownloadException> doInBackground(ArrayList<ServerFormDetails>... values) {
HashMap<ServerFormDetails, FormDownloadException> results = new HashMap<>();

ExecutorService executorService = Executors.newCachedThreadPool();
int index = 1;
for (ServerFormDetails serverFormDetails : values[0]) {
try {
String currentFormNumber = String.valueOf(index);
String totalForms = String.valueOf(values[0].size());
publishProgress(serverFormDetails.getFormName(), currentFormNumber, totalForms);

formDownloader.downloadForm(serverFormDetails, count -> {
String message = getLocalizedString(Collect.getInstance(), R.string.form_download_progress,
serverFormDetails.getFormName(),
String.valueOf(count),
String.valueOf(serverFormDetails.getManifest().getMediaFiles().size())
);

publishProgress(message, currentFormNumber, totalForms);
}, this::isCancelled);

results.put(serverFormDetails, null);
FormEventBus.INSTANCE.formDownloaded(serverFormDetails.getFormId());
} catch (FormDownloadException.DownloadingInterrupted e) {
return emptyMap();
} catch (FormDownloadException e) {
results.put(serverFormDetails, e);
FormEventBus.INSTANCE.formDownloadFailed(serverFormDetails.getFormId(), e.getMessage());
}
String currentFormNumber = String.valueOf(index);
String totalForms = String.valueOf(values[0].size());
publishProgress(serverFormDetails.getFormName(), currentFormNumber, totalForms);
Review comment (Member): Since this process is no longer synchronous, currentFormNumber will report incorrect progress once the downloads run on concurrent threads.
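A possible way to address this, sketched under assumptions: rather than reporting the loop index captured when a task was submitted, count completions with an `AtomicInteger` so that the number shown reflects how many forms have actually finished. `CompletionProgress` and its methods are hypothetical names used for illustration and are not part of this PR.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: tracks how many downloads have finished, in completion order.
class CompletionProgress {
    private final AtomicInteger completed = new AtomicInteger();
    private final int total;

    CompletionProgress(int total) {
        this.total = total;
    }

    // Atomically records one finished download and returns a "done/total" label.
    String markCompleted() {
        return completed.incrementAndGet() + "/" + total;
    }

    int completedSoFar() {
        return completed.get();
    }

    // Runs one thread per form, each marking itself completed once.
    static int simulate(int forms) {
        CompletionProgress progress = new CompletionProgress(forms);
        Thread[] workers = new Thread[forms];
        for (int i = 0; i < forms; i++) {
            workers[i] = new Thread(progress::markCompleted);
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return progress.completedSoFar();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + simulate(16));
    }
}
```

Because `incrementAndGet` is atomic, no two threads can report the same completion number, regardless of the order in which downloads finish.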


Thread thread = new Thread() {
Review comment (Member): How are we limiting resource consumption here? With a large number of forms, say 1000, this would open 1000 threads, which is not resource friendly. We should provide a configurable way to limit the number of threads.
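A sketch of the kind of limit asked for here, assuming a fixed-size pool is acceptable: `Executors.newFixedThreadPool(n)` runs at most `n` tasks at once and queues the rest, whereas `Executors.newCachedThreadPool()` creates threads on demand. The class name, the `maxThreads` parameter, and the simulated work are illustrative assumptions, not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: measures how many tasks ever run at the same time.
class BoundedDownloads {
    static int maxConcurrency(int tasks, int maxThreads) {
        ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        List<Callable<Void>> work = new ArrayList<>();
        for (int i = 0; i < tasks; i++) {
            work.add(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max); // remember the highest value seen
                Thread.sleep(5);                        // stands in for a download
                running.decrementAndGet();
                return null;
            });
        }

        try {
            pool.invokeAll(work); // blocks until every task has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak concurrency: " + maxConcurrency(20, 3));
    }
}
```

The pool size could come from a setting or a constant; which of those fits this project is a design decision the sketch does not try to answer.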

public void run() {

try {
formDownloader.downloadForm(serverFormDetails, count -> {
String message = getLocalizedString(Collect.getInstance(), R.string.form_download_progress,
serverFormDetails.getFormName(),
String.valueOf(count),
String.valueOf(serverFormDetails.getManifest().getMediaFiles().size())
);

publishProgress(message, currentFormNumber, totalForms);

}, DownloadFormsTask.this::isCancelled);
} catch (FormDownloadException e) {
results.put(serverFormDetails, e);
FormEventBus.INSTANCE.formDownloadFailed(serverFormDetails.getFormId(), e.getMessage());
throw new RuntimeException(e);
}

new Handler(Looper.getMainLooper()).post(new Runnable() {
@Override
public void run() {
results.put(serverFormDetails, null);
Review comment (Member): HashMaps are not thread safe.
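One detail to keep in mind when switching map types: `ConcurrentHashMap` rejects null keys and values with a `NullPointerException`, while the code under review stores `null` to mark a successful download. A minimal sketch of one workaround, wrapping the value in `Optional`, follows. The class name and the string form IDs are hypothetical. Another option would be `Collections.synchronizedMap(new HashMap<>())`, which still accepts nulls.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: collects per-form results from several threads.
class ConcurrentResults {
    // An empty Optional marks success; a present value would carry the failure.
    static Map<String, Optional<Exception>> collect(int forms) {
        Map<String, Optional<Exception>> results = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < forms; i++) {
            final String formId = "form-" + i;
            pool.execute(() -> results.put(formId, Optional.empty()));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return results;
    }

    // Shows that storing null directly is not an option with ConcurrentHashMap.
    static boolean rejectsNullValues() {
        try {
            new ConcurrentHashMap<String, Object>().put("form-0", null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("entries: " + collect(100).size());
        System.out.println("rejects null: " + rejectsNullValues());
    }
}
```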

FormEventBus.INSTANCE.formDownloaded(serverFormDetails.getFormId());
}
});
}
};

executorService.submit(thread);

index++;
}
executorService.shutdown();
Review comment (Member): executorService.shutdown() is not a blocking call, so execution may continue before all the threads have completed their work. In that case, results might not contain data for all the forms.
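To illustrate the concern: `shutdown()` only stops the executor from accepting new tasks and returns immediately, so the caller needs a separate step that blocks until the work is done. A sketch of one option is to keep the `Future` returned by `submit` and call `get()` on each, which also surfaces per-task failures. `WaitForAll` and the simulated download are assumptions for illustration, not code from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo: waits for every submitted task before returning results.
class WaitForAll {
    static List<String> downloadAll(List<String> formIds) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        List<Future<String>> futures = new ArrayList<>();
        for (String id : formIds) {
            futures.add(pool.submit(() -> {
                Thread.sleep(10); // stands in for a download
                return id + ":done";
            }));
        }
        pool.shutdown(); // returns immediately; already submitted tasks keep running

        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            try {
                results.add(future.get()); // blocks until this task has finished
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            } catch (ExecutionException e) {
                results.add("failed: " + e.getCause());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(downloadAll(java.util.Arrays.asList("a", "b", "c")));
    }
}
```

Results are gathered on the calling thread only, so the list itself needs no synchronization.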

// Wait for all tasks to complete
try {
executorService.awaitTermination(1, TimeUnit.MINUTES);
Review comment (Member, @chinmoy12c, Jun 13, 2023): What was the basis for choosing 1 minute for the timeout? If there are 500 forms, will they all complete within 1 minute?
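One alternative to a single fixed timeout, sketched under assumptions: poll `awaitTermination` in short intervals until the pool has drained, and check a cancellation flag between polls so that a cancelled task can stop early. `AwaitUntilDone`, the `isCancelled` supplier, and the 50 ms interval are illustrative choices, not values taken from this PR.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical helper: waits for a pool to finish without guessing a total duration.
class AwaitUntilDone {
    // Returns true if all tasks finished, false if cancelled or interrupted.
    static boolean awaitAll(ExecutorService pool, BooleanSupplier isCancelled) {
        pool.shutdown();
        try {
            while (!pool.awaitTermination(50, TimeUnit.MILLISECONDS)) {
                if (isCancelled.getAsBoolean()) {
                    pool.shutdownNow(); // interrupts running tasks, drops queued ones
                    return false;
                }
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Runs the given number of short tasks and reports how many completed.
    static int demo(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20); // stands in for a download
                    done.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        return awaitAll(pool, () -> false) ? done.get() : -1;
    }

    public static void main(String[] args) {
        System.out.println("completed: " + demo(10));
    }
}
```

The wait then scales with the amount of work instead of a constant, while still giving the user a way out through cancellation.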

} catch (InterruptedException e) {
e.printStackTrace();
}

return results;
}