Skip to content

Commit

Permalink
Change dequeue logic with channels (#80)
Browse files Browse the repository at this point in the history
* Update package.json

* Improve logging

* Improve logging

* Update payment.ts

* fix consumer flow with asyncWith wrapper, fix logs

* Fix double spending information display

* Update index.ts

* Update index.ts

* Update index.ts

* update subnet to `community.3` in the examples

* move logsummary to utils (#55)

* move logsummary to utils

* update js example

* Update package.json

* Clean dependencies with npm script

* Move cancellationToken to Engine (#60)

* More sgx runtime names + simplified code (#64)

* update clean script with rimraf for cross-platform

* version bump

* version bump (#66)

* More like yapapi

* Fix

* Fix

* Fix

* Fix

* Fix

* More fixes

* Fix

* Add timeout

* Fix agreement.confirm

* Fix

* Port invoice.accept-related code from yapapi

* Fix

* Fix

* Fix

* Remove waiting for done workers

* Fix timeout

* Cancel worker, subscription when computation ends (#68)

* remove unused variable

* Fix tasks invoices handling (#70)

* Add checking payments logging

* Improve logging

* Cancel worker_starter_task

* Add PaymentsFinished event; print a nice table with computation results

* Print table when checking payments

* Show table only in debug mode

* Address code review

* Total cost in non-debug mode

* Fix

* Fix

* Fix

* version bump

* Final fix for invoice event (#73)

* Final fix for invoice event

* Fix total cost printing (#74)

* Fix total cost printing

* Fix

Co-authored-by: filipgolem <[email protected]>

* version bump

* version bump

* Change dequeue logic with channels

* Reject get method in case of closed channel

* Remove local path

Co-authored-by: filipgolem <[email protected]>
Co-authored-by: Filip <[email protected]>
Co-authored-by: shadeofblue <[email protected]>
Co-authored-by: Marek Franciszkiewicz <[email protected]>
  • Loading branch information
5 people authored Feb 4, 2021
1 parent 01165ef commit 55c7319
Show file tree
Hide file tree
Showing 6 changed files with 42 additions and 27 deletions.
8 changes: 6 additions & 2 deletions yajsapi/executor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -179,11 +179,15 @@ export class Executor {
let cancellationToken = this._cancellation_token;

this._worker_cancellation_token = new CancellationToken();
let workerCancellationToken = this._worker_cancellation_token;

function cancel(e) {
function cancel(event) {
if (cancellationToken && !cancellationToken.cancelled) {
cancellationToken.cancel();
}
if (workerCancellationToken && !workerCancellationToken.cancelled) {
workerCancellationToken.cancel();
}
SIGNALS.forEach((event) => {
process.off(event, cancel);
});
Expand Down Expand Up @@ -243,7 +247,7 @@ export class Executor {
let activity_api = this._activity_api;
let strategy = this._strategy;
let cancellationToken = this._cancellation_token;
let done_queue: Queue<Task<D, R>> = new Queue([], cancellationToken);
let done_queue: Queue<Task<D, R>> = new Queue([]);
let stream_output = this._stream_output;

function on_task_done(task: Task<D, R>, status: TaskStatus): void {
Expand Down
13 changes: 1 addition & 12 deletions yajsapi/executor/smartq.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import * as csp from "js-csp";
import { eventLoop } from "../utils";
import { eventLoop, promisify } from "../utils";

type Item = "Item";

Expand Down Expand Up @@ -176,14 +176,3 @@ export class Consumer<Item> {
yield* val;
}
}

function promisify(fn) {
return (...args) =>
new Promise((resolve, reject) => {
try {
fn(...args, resolve);
} catch (error) {
reject(error);
}
});
}
5 changes: 3 additions & 2 deletions yajsapi/utils/asyncWrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export default class AsyncWrapper {
cancellationToken
) {
this._wrapped = wrapped;
this._args_buffer = new Queue([], cancellationToken);
this._args_buffer = new Queue([]);
this._task = null;
this._loop = event_loop || eventLoop();
this._cancellationToken = cancellationToken;
Expand All @@ -23,7 +23,7 @@ export default class AsyncWrapper {
while (true) {
if(this._cancellationToken.cancelled) break;
const args = await this._args_buffer.get()
this._wrapped(...args)
if(args) this._wrapped(...args)
// this._args_buffer.task_done()
}
}
Expand All @@ -36,6 +36,7 @@ export default class AsyncWrapper {

async done(): Promise<void> {
// await this._args_buffer.join()
this._args_buffer.close();
if (this._task)
this._task.cancel()
this._task = null
Expand Down
2 changes: 2 additions & 0 deletions yajsapi/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import eventLoop from "./eventLoop";
import getAllProperties from "./getAllProperties";
import logger, * as logUtils from "./log";
import { changeLogLevel } from "./log";
import promisify from './promisify';
import Queue from "./queue";
import range from "./range";
import sleep from "./sleep";
Expand All @@ -24,6 +25,7 @@ export {
getAllProperties,
logger,
logUtils,
promisify,
Queue,
range,
sleep,
Expand Down
10 changes: 10 additions & 0 deletions yajsapi/utils/promisify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
export default function promisify(fn: Function): Function {
return (...args) =>
new Promise((resolve, reject) => {
try {
fn(...args, resolve);
} catch (error) {
reject(error);
}
});
}
31 changes: 20 additions & 11 deletions yajsapi/utils/queue.ts
Original file line number Diff line number Diff line change
@@ -1,36 +1,45 @@
import sleep from "./sleep";
import * as csp from "js-csp";
import promisify from "./promisify";

export default interface Queue<T> {}
export default class Queue<T> {
private _tasks;
private _cancellationToken;
private __new_items;

constructor(list = [], cancellationToken) {
constructor(list = []) {
this._tasks = list;
this._cancellationToken = cancellationToken;
this.__new_items = csp.chan();

if (list.length > 0) {
let first = this._tasks.shift();
first();
}
}

put(item: T) {
if(item === undefined || item === null || this.__new_items.closed) return;
this._tasks.push(item);
csp.putAsync(this.__new_items, true);
}

async get(): Promise<T> {
return new Promise(async (resolve, reject) => {
let item;
while (!item) {
if (this._cancellationToken.cancelled) break;
item = this._tasks.shift();
if (!item) await sleep(2);
if(this.__new_items.closed) reject("new_items channel interrupted");
try {
await promisify(csp.takeAsync)(this.__new_items);
let item = this._tasks.shift();
resolve(item);
} catch (error) {
reject(error);
}
if (this._cancellationToken.cancelled) reject();
resolve(item);
});
}

empty() {
return this._tasks.length === 0;
}

close() {
this.__new_items.close();
}
}

0 comments on commit 55c7319

Please sign in to comment.