Commit 2c35370: update

theosanderson committed Oct 1, 2024
1 parent d0df1c7 commit 2c35370
Showing 3 changed files with 29 additions and 25 deletions.
taxonium_backend/server.js (5 changes: 3 additions & 2 deletions)
@@ -11,7 +11,7 @@ var xml2js = require("xml2js");
 var axios = require("axios");
 var pako = require("pako");
 const URL = require("url").URL;
-var jsonlParser = require("stream-json/jsonl/Parser").parser;
+var streamJson = require("stream-json");
 const ReadableWebToNodeStream = require("readable-web-to-node-stream");
 const { execSync } = require("child_process");
 var importing;
@@ -507,7 +507,8 @@ const loadData = async () => {
     supplied_object,
     logStatusMessage,
     ReadableWebToNodeStream.ReadableWebToNodeStream,
-    jsonlParser
+    streamJson.parser,
+    streamJson.streamValues
   );

   logStatusMessage({
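The change above swaps stream-json's JSONL-specific parser for the general parser run in JSON-streaming mode, so both the parser and the streamValues assembler now have to be passed down to processJsonl. A minimal standalone sketch of the new pattern (the file name here is illustrative, not part of the commit):

const { parser } = require("stream-json");
const { streamValues } = require("stream-json/streamers/StreamValues");
const fs = require("fs");

// With jsonStreaming enabled the parser accepts a stream of concatenated
// top-level JSON values, which is exactly what a .jsonl file contains;
// streamValues() then assembles each value into a JavaScript object.
fs.createReadStream("example.jsonl")
  .pipe(parser({ jsonStreaming: true }))
  .pipe(streamValues())
  .on("data", ({ key, value }) => {
    console.log(key, value); // key is the zero-based index of the value
  });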
taxonium_component/src/webworkers/localBackendWorker.js (7 changes: 4 additions & 3 deletions)
@@ -7,8 +7,8 @@ import {
 import { processNewickAndMetadata } from "../utils/processNewick.js";
 import { processNextstrain } from "../utils/processNextstrain.js";
 import { ReadableWebToNodeStream } from "readable-web-to-node-stream";
-
-import { parser as jsonlParser } from "stream-json/jsonl/Parser";
+import { parser } from "stream-json";
+import { streamValues } from "stream-json/streamers/StreamValues";

 console.log("worker starting");
 postMessage({ data: "Worker starting" });
@@ -214,7 +214,8 @@ onmessage = async (event) => {
       data.data,
       sendStatusMessage,
       ReadableWebToNodeStream,
-      jsonlParser
+      parser,
+      streamValues
     );
     console.log("processedUploadedData created");
   } else if (
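Inside the web worker, progress and errors reach the UI thread via postMessage. A sketch of the bridge a worker-side sendStatusMessage callback typically uses (illustrative; the actual wiring in this file is not shown in the diff):

const sendStatusMessage = (status) => {
  // status is { message: ... } for progress or { error: ... } for failures
  postMessage({ type: "status", data: status });
};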
taxonium_data_handling/importing.js (42 changes: 22 additions & 20 deletions)
@@ -1,6 +1,8 @@
 import zlib from "zlib";
 import stream from "stream";
 import buffer from "buffer";
+import { parser } from "stream-json";
+import { streamValues } from "stream-json/streamers/StreamValues";

 const roundToDp = (number, dp) => {
   return Math.round(number * Math.pow(10, dp)) / Math.pow(10, dp);
@@ -28,13 +30,10 @@ function reduceMaxOrMin(array, accessFunction, maxOrMin) {
   }
 }

-export const setUpStream = (
-  the_stream,
-  data,
-  sendStatusMessage,
-  jsonlParser
-) => {
-  const pipeline = the_stream.pipe(jsonlParser());
+export const setUpStream = (the_stream, data, sendStatusMessage, parser, streamValues) => {
+  const pipeline = the_stream
+    .pipe(parser({ jsonStreaming: true }))
+    .pipe(streamValues());

   let line_number = 0;

@@ -69,18 +68,21 @@

   pipeline.on("error", function (err) {
     console.log(err);
+    sendStatusMessage({ error: `Stream error: ${err.message}` });
   });

   pipeline.on("end", function () {
-    console.log("end");
+    console.log("Stream processing completed.");
+    sendStatusMessage({ message: "Stream processing completed." });
   });
 };

 export const processJsonl = async (
   jsonl,
   sendStatusMessage,
   ReadableWebToNodeStream,
-  jsonlParser
+  parser,
+  streamValues
 ) => {
   console.log("Worker processJsonl");
   const data = jsonl.data;
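With streamValues() at the end of the pipeline, each "data" event now carries one fully parsed JSONL line. A sketch of the consumer shape this implies (the handler body is illustrative; the real handler sits in the unexpanded lines of this file):

pipeline.on("data", ({ key, value }) => {
  // streamValues() emits { key, value }: key is the zero-based index of
  // the top-level value, value is the parsed object for that line.
  if (line_number === 0) {
    data.header = value; // assumption: line 0 of a Taxonium .jsonl is the header
  }
  line_number += 1;
});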
@@ -92,11 +94,11 @@
     the_stream = new stream.PassThrough();
   }
   let new_data = {};
-  setUpStream(the_stream, new_data, sendStatusMessage, jsonlParser);
+  setUpStream(the_stream, new_data, sendStatusMessage, parser, streamValues);

   if (status === "loaded") {
     const dataAsArrayBuffer = data;
-    let chunkSize = 5 * 1024 * 1024;
+    let chunkSize = 5 * 1024 * 1024; // 5 MB chunks
     for (let i = 0; i < dataAsArrayBuffer.byteLength; i += chunkSize) {
       const chunk = dataAsArrayBuffer.slice(i, i + chunkSize);
       const chunkAsBuffer = buffer.Buffer.from(chunk);
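For context on the loop above: slicing the in-memory ArrayBuffer into fixed-size pieces keeps each write to the PassThrough bounded instead of handing the parser one giant buffer. The same pattern as a self-contained sketch (the function name is illustrative):

import stream from "stream";
import buffer from "buffer";

function pumpArrayBuffer(arrayBuffer, target, chunkSize = 5 * 1024 * 1024) {
  for (let i = 0; i < arrayBuffer.byteLength; i += chunkSize) {
    // slice() copies the byte range; Buffer.from wraps it for Node streams
    target.write(buffer.Buffer.from(arrayBuffer.slice(i, i + chunkSize)));
  }
  target.end(); // signal end-of-input so the pipeline's "end" event fires
}

// Usage shape (empty buffer just to illustrate):
pumpArrayBuffer(new ArrayBuffer(0), new stream.PassThrough());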
@@ -107,15 +109,15 @@
   } else if (status === "url_supplied") {
     const url = jsonl.filename;
     let response;
-    console.log("STARTING FETCH");
+    console.log("Starting fetch from URL:", url);
     try {
       response = await fetch(url);
     } catch (error) {
       console.log("Fetch error", error);
       sendStatusMessage({ error: `Fetch error: ${error}` });
       return;
     }
-    console.log("ALL FINE", response);
+    console.log("Fetch successful", response);
     sendStatusMessage({ message: "Loading root genome" });

     const readableWebStream = response.body;
@@ -133,21 +135,21 @@
     the_stream.on("end", resolve);
     the_stream.on("error", reject);
   });
-  console.log("done with stream");
+  console.log("Done with stream");

   const scale_y =
-    24e2 /
-    (new_data.nodes.length > 10e3
+    2400 /
+    (new_data.nodes.length > 10000
       ? new_data.nodes.length
       : new_data.nodes.length * 0.6666);
-  console.log("Scaling");
+  console.log("Scaling y positions");
   for (const node of new_data.nodes) {
     node.y = roundToDp(node.y * scale_y, 6);
   }
   console.log("Calculating y positions");
   const y_positions = new_data.nodes.map((node) => node.y);

-  console.log("Calculating coord extremes");
+  console.log("Calculating coordinate extremes");

   const overallMaxY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "max");
   const overallMinY = reduceMaxOrMin(new_data.nodes, (node) => node.y, "min");
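Replacing 24e2 and 10e3 with 2400 and 10000 leaves the behaviour unchanged and makes the scaling constants readable at a glance. Worked examples of the resulting scale factor:

// 50,000 nodes: scale_y = 2400 / 50000           = 0.048
//  3,000 nodes: scale_y = 2400 / (3000 * 0.6666) ≈ 1.20
// Trees at or below 10,000 nodes are divided by a shrunken count, so small
// trees are stretched to occupy a usable share of the vertical canvas.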
@@ -166,7 +168,7 @@
   const rootMutations = root.mutations;
   root.mutations = [];

-  console.log("Creating output obj");
+  console.log("Creating output object");

   const overwrite_config = new_data.header.config ? new_data.header.config : {};
   overwrite_config.num_tips = root.num_tips;
@@ -231,7 +233,7 @@ export const generateConfig = (config, processedUploadedData) => {
? ["x_dist"]
: ["x_time"];

config.keys_to_display = Object.keys(processedUploadedData.nodes[0]).filter(
config.keys_to_display = Object.keys(firstNode).filter(
(x) => !to_remove.includes(x)
);

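The new firstNode identifier must be bound earlier in generateConfig, outside the lines this hunk shows; presumably something along these lines (hypothetical, not visible in the diff):

const firstNode = processedUploadedData.nodes[0];

Naming the lookup once avoids repeating processedUploadedData.nodes[0] and keeps the filter call on one line.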
