Skip to content

Commit b223ddb

Browse files
committed
File Reader | Add line_bytes_offset
Signed-off-by: Romy <[email protected]>
1 parent d3ce5e9 commit b223ddb

File tree

2 files changed, with 92 additions and 6 deletions.

src/test/unit_tests/jest_tests/test_newline_reader.test.js

+82
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,26 @@ describe('newline_reader', () => {
5151
expect(result).toStrictEqual(UTF8DATA_ARR);
5252
});
5353

54+
it('next_line_file_offset - can process utf8 characters when termination with newline character', async () => {
55+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n') + '\n', 'utf8');
56+
57+
const reader = new NewlineReader({}, '', { skip_leftover_line: true, read_file_offset: 0 });
58+
// @ts-ignore
59+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
60+
61+
const result = [];
62+
let expected_cur_next_line_file_offset = 0;
63+
const [processed] = await reader.forEach(async entry => {
64+
result.push(entry);
65+
expected_cur_next_line_file_offset += Buffer.byteLength(entry, 'utf8') + 1;
66+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
67+
return true;
68+
});
69+
70+
expect(processed).toBe(UTF8DATA_ARR.length);
71+
expect(result).toStrictEqual(UTF8DATA_ARR);
72+
});
73+
5474
it('can process utf8 characters when termination not with new line character', async () => {
5575
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n'), 'utf8');
5676

@@ -68,6 +88,47 @@ describe('newline_reader', () => {
6888
expect(result).toStrictEqual(UTF8DATA_ARR);
6989
});
7090

91+
it('next_line_file_offset - can process utf8 characters when termination not with new line character', async () => {
92+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n'), 'utf8');
93+
94+
const reader = new NewlineReader({}, '', { read_file_offset: 0 });
95+
// @ts-ignore
96+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
97+
98+
const result = [];
99+
let expected_cur_next_line_file_offset = 0;
100+
const [processed] = await reader.forEach(async entry => {
101+
result.push(entry);
102+
expected_cur_next_line_file_offset += Buffer.byteLength(entry, 'utf8') + (reader.eof ? 0 : 1);
103+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
104+
return true;
105+
});
106+
107+
expect(processed).toBe(UTF8DATA_ARR.length);
108+
expect(result).toStrictEqual(UTF8DATA_ARR);
109+
});
110+
111+
it('next_line_file_offset starts from the second line - can process utf8 characters when termination not with new line character', async () => {
112+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR.join('\n'), 'utf8');
113+
const expected_to_be_processed_data_array = UTF8DATA_ARR.slice(1);
114+
const initial_next_line_file_offset = Buffer.byteLength(UTF8DATA_ARR[0], 'utf8') + 1;
115+
const reader = new NewlineReader({}, '', { read_file_offset: initial_next_line_file_offset});
116+
// @ts-ignore
117+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
118+
119+
const result = [];
120+
let expected_cur_next_line_file_offset = initial_next_line_file_offset;
121+
const [processed] = await reader.forEach(async entry => {
122+
result.push(entry);
123+
expected_cur_next_line_file_offset += Buffer.byteLength(entry, 'utf8') + (reader.eof ? 0 : 1);
124+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
125+
return true;
126+
});
127+
128+
expect(processed).toBe(expected_to_be_processed_data_array.length);
129+
expect(result).toStrictEqual(expected_to_be_processed_data_array);
130+
});
131+
71132
it('can process utf8 characters when termination not with new line character [bufsize = 4]', async () => {
72133
const expected = "abc";
73134
const UTF8DATA_ARR_TEMP = [ ...UTF8DATA_ARR, expected ];
@@ -86,5 +147,26 @@ describe('newline_reader', () => {
86147
expect(processed).toBe(1);
87148
expect(result).toStrictEqual([expected]);
88149
});
150+
151+
it('next_line_file_offset - can process utf8 characters when termination not with new line character [bufsize = 4]', async () => {
152+
const expected = "abc";
153+
const UTF8DATA_ARR_TEMP = [ ...UTF8DATA_ARR, expected ];
154+
const UTF8DATA_BUF = Buffer.from(UTF8DATA_ARR_TEMP.join('\n'), 'utf8');
155+
156+
const reader = new NewlineReader({}, '', { bufsize: 256, skip_overflow_lines: true, read_file_offset: 0 });
157+
// @ts-ignore
158+
reader.fh = mocked_file_handler(UTF8DATA_BUF);
159+
160+
const result = [];
161+
const [processed] = await reader.forEach(async entry => {
162+
result.push(entry);
163+
return true;
164+
});
165+
166+
expect(processed).toBe(1);
167+
expect(result).toStrictEqual([expected]);
168+
const expected_cur_next_line_file_offset = UTF8DATA_BUF.length;
169+
expect(reader.next_line_file_offset).toBe(expected_cur_next_line_file_offset);
170+
});
89171
});
90172
});

src/util/file_reader.js

+10 -6
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class NewlineReader {
3030
* bufsize?: number;
3131
* skip_leftover_line?: boolean;
3232
* skip_overflow_lines?: boolean;
33+
* read_file_offset?: number;
3334
* }} [cfg]
3435
**/
3536
constructor(fs_context, filepath, cfg) {
@@ -41,18 +42,19 @@ class NewlineReader {
4142
this.fs_context = fs_context;
4243
this.fh = null;
4344
this.eof = false;
44-
this.readoffset = 0;
45+
this.read_file_offset = cfg?.read_file_offset || 0;
4546

4647
this.buf = Buffer.alloc(cfg?.bufsize || 64 * 1024);
4748
this.start = 0;
4849
this.end = 0;
4950
this.overflow_state = false;
51+
this.next_line_file_offset = cfg?.read_file_offset || 0;
5052
}
5153

5254
info() {
5355
return {
5456
path: this.path,
55-
read_offset: this.readoffset,
57+
read_offset: this.read_file_offset,
5658
overflow_state: this.overflow_state,
5759
start: this.start,
5860
end: this.end,
@@ -78,9 +80,9 @@ class NewlineReader {
7880
this.start += term_idx + 1;
7981
continue;
8082
}
81-
8283
const line = this.buf.toString('utf8', this.start, this.start + term_idx);
8384
this.start += term_idx + 1;
85+
this.next_line_file_offset = this.read_file_offset - (this.end - this.start);
8486
return line;
8587
}
8688
}
@@ -106,7 +108,7 @@ class NewlineReader {
106108

107109
// read from file
108110
const avail = this.buf.length - this.end;
109-
const read = await this.fh.read(this.fs_context, this.buf, this.end, avail, this.readoffset);
111+
const read = await this.fh.read(this.fs_context, this.buf, this.end, avail, this.read_file_offset);
110112
if (!read) {
111113
this.eof = true;
112114

@@ -118,13 +120,15 @@ class NewlineReader {
118120
console.warn('line too long finally terminated at eof:', this.info());
119121
} else {
120122
const line = this.buf.toString('utf8', this.start, this.end);
123+
this.start = this.end;
124+
this.next_line_file_offset = this.read_file_offset;
121125
return line;
122126
}
123127
}
124128

125129
return null;
126130
}
127-
this.readoffset += read;
131+
this.read_file_offset += read;
128132
this.end += read;
129133
}
130134

@@ -169,7 +173,7 @@ class NewlineReader {
169173
// was moved, this will still keep on reading from the previous FD.
170174
reset() {
171175
this.eof = false;
172-
this.readoffset = 0;
176+
this.read_file_offset = 0;
173177
this.start = 0;
174178
this.end = 0;
175179
this.overflow_state = false;

0 commit comments

Comments
 (0)