Routine for real-time data appending: avoiding small tables by means of memory
Refer to the following article:
Routine for real-time data appending
The above article stores real-time data in multiple layers of zone tables (hereinafter referred to as ‘tables’ unless otherwise specified) and uses small tables with short time intervals to handle frequent appends, so that new data can be written out quickly. However, this method produces many files, which makes them more difficult to manage.
To solve this problem, this routine optimizes the above method. Specifically, it stores the small amount of recent data (i.e., new data) in an in-memory table sequence, periodically writes the new data to a table with a longer time interval, and, during query, uses the append@y() function of the composite table to make the in-memory table sequence act as one zone table of the multi-zone composite table. The advantages of this method are: fewer small tables and therefore fewer files to manage; no need for a buffer; queries return all data, including the newest; and queries whose results contain recent data run faster than with the above-mentioned method.
In practice, we can choose to store the data from a certain recent period of time into memory based on machine performance, data volume, and append frequency.
Most of the concepts used in this article are the same as in the above-mentioned article, so we only explain the differences.
Total data volume: 22.64 million rows, 4 fields.
Among these data, the recent data volume is 4 million rows, and the time interval is one hour.
Table storage method: store all 4 million rows of data from the recent hour in layer 0 and do not merge.
In-memory table sequence method: store all 4 million rows of data from the recent hour in an in-memory table sequence and do not write them to the table.
There are two tests: read only the data from the recent hour, and read all data. Each test is run three times, and the individual and average read times of the two storage methods are compared.
Test environment:
Processor: 11th Gen Intel(R) Core(TM) i7-11800H @ 2.30GHz
Machine memory: 16.0 GB
Memory allocated for Java: 9672m
Unit: milliseconds

| Test | Read | Store data in tables | Store data in in-memory table sequence |
|---|---|---|---|
| Read all data | First read | 6802 | 5098 |
| | Second read | 6586 | 5040 |
| | Third read | 6530 | 4983 |
| | Average | 6639 | 5040 |
| Read only the data from the recent hour | First read | 1106 | 119 |
| | Second read | 1122 | 135 |
| | Third read | 1217 | 156 |
| | Average | 1148 | 137 |
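The averages in the table can be rechecked, and the ratio between the two storage methods derived, with a few lines of Python. This is only a sketch for checking the arithmetic; the dictionary layout and the `summarize` helper are illustrative names and not part of the routine. From the measured values, the in-memory method is roughly 1.3 times faster when reading all data and roughly 8.4 times faster when reading only the recent hour.

```python
# Read times in milliseconds, copied from the test results above.
timings = {
    "Read all data": {
        "tables": [6802, 6586, 6530],
        "memory": [5098, 5040, 4983],
    },
    "Read only the data from the recent hour": {
        "tables": [1106, 1122, 1217],
        "memory": [119, 135, 156],
    },
}


def summarize(timings):
    """Return {test: (avg_tables_ms, avg_memory_ms, ratio)} for each test."""
    out = {}
    for name, t in timings.items():
        avg_tables = sum(t["tables"]) / len(t["tables"])
        avg_memory = sum(t["memory"]) / len(t["memory"])
        # Averages rounded to whole milliseconds, ratio to two decimals.
        out[name] = (round(avg_tables), round(avg_memory),
                     round(avg_tables / avg_memory, 2))
    return out


summary = summarize(timings)
for name, (avg_tables, avg_memory, ratio) in summary.items():
    print(f"{name}: tables {avg_tables} ms, in-memory {avg_memory} ms, "
          f"ratio {ratio}x")
```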
Configuration file: ubc.json
By default, this file is located in the main path of esProc (if you want to save it to another directory, modify the code to use an absolute path). The configuration file includes the following contents:
[{ "sortKey":"account,tdate",
"timeField":"tdate",
"otherFields":"ColName1,ColName2",
"dataDir":"data/",
"dataFilename":"data.ctx",
"queryPeriod":2,
"level":[3,4],
"blockSize":[524288,1048576],
"interval":[1,1],
"lastZone":[1167851520,null],
"monthCumulate":true,
"discardZone":[],
"discardTime":[]
}]
“level” refers to the layer level, and its values include 1, 2, 3, and 4, representing the second, minute, hour and day respectively. This file only sets 3 and 4, indicating that the data at the second and minute levels are stored in the in-memory table sequence, and the data starting from hour level are stored in tables.
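As a reading aid, the following Python sketch decodes the layer settings of a configuration in the way described above: levels below the smallest configured level are held in the in-memory table sequence, and each configured level is paired with its interval and block size. The function name `describe_layers` and the returned structure are assumptions made for illustration; the routine itself reads ubc.json in SPL.

```python
import json

# Level codes as described in the text: 1 = second, 2 = minute, 3 = hour, 4 = day.
LEVEL_NAMES = {1: "second", 2: "minute", 3: "hour", 4: "day"}


def describe_layers(config):
    """Split the levels into those kept in memory and those stored in tables."""
    levels = config["level"]
    # Levels below the first configured level are not written to tables.
    in_memory = [LEVEL_NAMES[lv] for lv in range(1, min(levels))]
    # Each configured level has a matching interval and block size.
    in_tables = [
        (LEVEL_NAMES[lv], interval, block_size)
        for lv, interval, block_size in zip(
            levels, config["interval"], config["blockSize"])
    ]
    return in_memory, in_tables


# Only the layer-related keys of the sample ubc.json are reproduced here.
config = json.loads(
    '[{"level":[3,4],"blockSize":[524288,1048576],"interval":[1,1]}]')[0]
in_memory, in_tables = describe_layers(config)
print("in memory:", in_memory)
print("in tables:", in_tables)
```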
A backup.btx file is added under the dataDir directory. It backs up the data of the in-memory table sequence and is always kept consistent with it.
1. E-commerce system
Number of users: 1-10 million; number of records per day: 24 million rows; number of records per hour: 1 million rows. In this example, the data from the last hour is stored in the in-memory table sequence.
The configuration file is as follows:
[{ "sortKey":"uid,etime",
"timeField":"etime",
"otherFields":"eventID,eventType",
"dataDir":"data/",
"dataFilename":"data.ctx",
"queryPeriod":600,
"level":[3,4],
"blockSize":[524288,1048576],
"interval":[1,1],
"lastZone":[null,null],
"monthCumulate":true,
"discardZone":[],
"discardTime":[]
}]
2. Signal system
The characteristics of this system include: i) high data generation frequency (each monitoring point generates multiple pieces of data within one second); ii) heavy reliance on the collection time (each piece of data must correspond to a unique time); iii) many monitoring points and a large data amount (a conventional real-time monitoring system usually has thousands upon thousands of monitoring points; each point generates data every second, and dozens of GBs or more of data are generated every day). The number of device IDs or monitoring point names is 1-100,000, and records are appended once per second. The data amount per day is approximately 1.7 billion rows.
In this example, since the data amount is very large (over 1 million rows of data are generated per minute), we set the time interval of layer 1 to 1 minute (the data from the last minute is stored in the in-memory table sequence), that of layer 2 to 2 hours, and that of layer 3 to 1 day. Because the data amount is so large and the query period usually does not exceed 24 hours, we set month accumulation to false.
The configuration file is as follows:
[{ "sortKey":"TagName",
"timeField":"time",
"otherFields":"Type,Quality,Value",
"dataDir":"data/",
"dataFilename":"data.ctx",
"queryPeriod":10,
"level":[2,3,4],
"blockSize":[262144,524288,1048576],
"interval":[1, 2,1],
"lastZone":[null,null,null],
"monthCumulate":false,
"discardZone":[],
"discardTime":[]
}]
Add a global variable ‘cache’: store the in-memory table sequence.
Add the ‘cache’ lock: lock when modifying cache.
This script is executed only once, when the server starts, and is not executed again during computation. If the server is started for the first time, it needs to initialize parameters; if the server is started for the nth time, it needs to read the configuration information written out at the end of the (n-1)th execution.
A | B | |
1 | =json(file("ubc.json").read()) | |
2 | =file("zone.btx") | |
3 | if(A2.exists()) | =A2.import@bi() |
4 | else | >B3=A1.level.len().([]) |
5 | =env(zones,B3) | |
6 | =file(A1.dataDir/"backup.btx") | |
7 | if(A6.exists()) | =A6.import@b().sort(${A1.sortKey}) |
8 | else | >B7=[] |
9 | =file(A1.dataDir/A1.dataFilename:0) | |
10 | if(!A9.exists()) | =A9.create(${A1.sortKey.split@c().("#"+trim(~)).concat@c()},${A1.otherFields};;A1.blockSize(1)) |
11 | =B10.close() | |
12 | =env(cache,B7) | |
13 | =register("zone","zone.splx") | |
14 | =register("zone0","zone0.splx") | |
15 | =call@n("merge.splx") |
A1: read the configuration file; A2: read the table number storage file; A3-B4: if the table number file exists, read this file. Otherwise, create a sequence consisting of empty sequences and having a length equal to the number of layers; A5: set the table number sequence as global variable ‘zones’; A6: the backup file of in-memory table sequence; A7-B8: if the backup file of in-memory table sequence exists, read this file and sort data by the sort field; if it does not exist, create an empty in-memory sequence; A9-B11: create an empty table with table number 0. When querying, if there is only the in-memory table sequence, merge the table sequence with the empty table, and return a composite table object to ensure a unified return interface; A12: store the in-memory table sequence as the global variable cache; A13: register the zone.splx script as a function with the same name; A14: register the zone0.splx script as a function with the same name (although there is no layer 0, the calculation of all table numbers is still based on the table number of layer 0, so this function is still retained); A15: start the merge thread.
This script is used to append the new data to the in-memory table sequence every time new data is received. The input parameter ‘data’ represents the table sequence containing new data. The new data is also appended to the backup file so that the backup stays consistent with the in-memory table sequence.
A | |
1 | =lock("config") |
2 | =json(file("ubc.json").read()) |
3 | =lock@u("config") |
4 | >data=data.sort(${A2.sortKey}) |
5 | =lock("cache") |
6 | =file(A2.dataDir/"backup.btx").export@ab(data) |
7 | >cache=[cache,data].merge(${A2.sortKey}) |
8 | =lock@u("cache") |
A2: read the configuration file; A4: sort the new data by the sort field; A6: append the new data to the backup file of the in-memory table sequence; A7: merge new data into the in-memory table sequence by the sort field.
Note: the fields of the incoming data must be ordered according to the following rules:
When the sort field contains the time field: sort field, other fields;
When the sort field doesn’t contain the time field: sort field, time field, other fields;
The order of other fields is the same as that in the configuration file.
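The core of this script, sorting the incoming batch and merging it into an already sorted in-memory sequence under a lock, can be sketched in Python as follows. This is not the SPL implementation: rows are modeled as dictionaries, `update` and `cache_lock` are illustrative names, and the append to backup.btx is left out.

```python
import heapq
import threading
from operator import itemgetter

cache = []                     # in-memory rows, kept sorted by the sort key
cache_lock = threading.Lock()  # stands in for the 'cache' lock


def update(data, sort_key):
    """Sort the new batch, then merge it into the sorted in-memory cache."""
    global cache
    key = itemgetter(*sort_key)
    batch = sorted(data, key=key)  # corresponds to sorting by sortKey
    with cache_lock:
        # Both inputs are sorted, so a linear merge keeps the result sorted.
        cache = list(heapq.merge(cache, batch, key=key))
    return cache


# Two batches arriving one after the other, using the field names of the
# sample configuration ("sortKey":"account,tdate").
update([{"account": 2, "tdate": 5}, {"account": 1, "tdate": 9}],
       ("account", "tdate"))
update([{"account": 1, "tdate": 3}, {"account": 2, "tdate": 1}],
       ("account", "tdate"))
print([(row["account"], row["tdate"]) for row in cache])
```

Merging two sorted inputs avoids re-sorting the whole cache on every append, which matters when appends are frequent.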
This script is used to calculate the layer-0 table number based on time, for internal use.
Input parameter: tm: time
A | B | |
1 | 2023 | |
2 | [27,23,18,13,7,1] | |
3 | return [year(tm)-A1+8, month(tm),day(tm),hour(tm)+1,minute(tm)+1,second(tm)+1].sum(shift(~,-A2(#)))+1 |
A1: start year of data; A2: the number of binary bits on the right of each layer level, in the order of year, month, day, hour, minute and second.
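The bit layout used by this script can be illustrated with a Python port. It assumes, as the truncation steps in zone.splx imply, that SPL's shift() with a negative count shifts to the left; the constant names are illustrative. Each time component is placed in its own bit field, and the last bit is set to 1.

```python
from datetime import datetime

START_YEAR = 2023                 # A1: start year of data
SHIFTS = (27, 23, 18, 13, 7, 1)   # A2: bits to the right of year..second


def zone0(tm):
    """Pack a datetime into a layer-0 table number."""
    parts = (
        tm.year - START_YEAR + 8,  # year offset
        tm.month,
        tm.day,
        tm.hour + 1,               # hour, minute and second are stored 1-based
        tm.minute + 1,
        tm.second + 1,
    )
    # Shift every component into its field, then set the last bit to 1.
    return sum(part << shift for part, shift in zip(parts, SHIFTS)) + 1


print(zone0(datetime(2023, 1, 1, 0, 0, 0)))  # 1082400899
```

Because the fields are ordered from year down to second, later times always map to larger table numbers, which is what lets the other scripts compare table numbers directly.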
The script is used to convert the low-layer table number to high-layer table number, for internal use.
Input parameter: z: low-layer table number; n: layer sequence number of high layer (refers to the sequence number in config.level, not the layer level); config: configuration file content; monthCumulate: whether or not to accumulate as monthly table.
A | B | |
1 | [27,23,18,13,7,1] | |
2 | 23 | |
3 | =config.interval(n-1) | |
4 | >p = 7 - config.level(n) | |
5 | >p=if(monthCumulate && p==3,2,p) | |
6 | >b = A1(p) | |
7 | >z1 = shift(z,b) | |
8 | >b1 = A1(p-1)-b | |
9 | >s = (and( z1, shift(1,-b1)-1 )-1) \A3*A3 + 1 | |
10 | =shift(shift( shift(z1,b1),-b1)+s, -b) | |
11 | if(p>3 || n<config.level.len()) | return A10 |
12 | =and(shift(A10,A1(3)),shift(1,A1(3)-A1(2))-1) | |
13 | =and(shift(A10,A1(2)),shift(1,A1(2)-A1(1))-1) | |
14 | =shift(A10,A1(1))-8+A2 | |
15 | return A14*100000+A13*1000+A12*10 |
A1: the number of binary bits on the right of each layer level, in the order of year, month, day, hour, minute and second; A2: start year of data, only the last two digits are taken; A3: layer interval of high layer; A4: based on the layer level of high layer, calculate its bit number in A1; A5: if there is a need to accumulate as monthly table and it is the day layer, set p to 2, which means it will be truncated to month afterwards; A6: the number of bits that the high layer needs to be truncated; A7: the value of table number after truncation; A8: the number of bits used in the last level after truncation; A9: divide the value of the last level by the layer interval and add 1; A10: put s into z and add 0 at the end; A11: if the layer level is hour or below, or it is not the highest layer, return A10 directly; if the layer level is day or above and is the highest layer, continue to calculate the short plain text table number; A12-A14: calculate the year, month and day components respectively, and restore the year to actual value; A15: put together the year, month and day components, and add 0 to the last bit.
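The conversion can be followed step by step in the Python port below. It is a sketch under stated assumptions: the layer level and interval are passed in directly instead of being looked up in config by n, the flag `is_top` replaces the "highest layer" check, and SPL's shift() is taken to shift right for positive counts and left for negative ones.

```python
SHIFTS = (27, 23, 18, 13, 7, 1)  # bits to the right of year..second
START_YY = 23                    # last two digits of the start year


def zone(z, level, interval, month_cumulate=False, is_top=False):
    """Convert a low-layer table number z into the number of a higher layer."""
    p = 7 - level                      # 1-based position of the level in SHIFTS
    if month_cumulate and p == 3:
        p = 2                          # day layer accumulated by month
    b = SHIFTS[p - 1]                  # bits to truncate
    z1 = z >> b                        # table number after truncation
    b1 = SHIFTS[p - 2] - b             # bits used by the last remaining level
    # Align the last level to the layer interval (values are 1-based).
    s = ((z1 & ((1 << b1) - 1)) - 1) // interval * interval + 1
    result = (((z1 >> b1) << b1) + s) << b
    if p > 3 or not is_top:
        return result                  # hour level or below, or not the top layer
    # Highest layer at day or month level: short plain-text number yymmdd0.
    day = (result >> SHIFTS[2]) & ((1 << (SHIFTS[1] - SHIFTS[2])) - 1)
    month = (result >> SHIFTS[1]) & ((1 << (SHIFTS[0] - SHIFTS[1])) - 1)
    yy = (result >> SHIFTS[0]) - 8 + START_YY
    return yy * 100000 + month * 1000 + day * 10


# Layer-0 number of 2023-11-07 15:30:45, written out field by field.
z = (8 << 27) + (11 << 23) + (7 << 18) + (16 << 13) + (31 << 7) + (46 << 1) + 1
print(zone(z, level=3, interval=1))                             # hour layer
print(zone(z, level=4, interval=1, is_top=True))                # 2311070
print(zone(z, level=4, interval=1, month_cumulate=True, is_top=True))  # 2311000
```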
This script is called and executed by merge. Before each execution, merge first calls write.splx to write the data of in-memory table sequence to layer 1, and then merges data into higher layers.
A | B | C | |
1 | =lock("config") | ||
2 | >config=json(file("ubc.json").read()) | ||
3 | =long(now())-config.queryPeriod*1000 | ||
4 | =config.discardTime.pselect@a(~<A3) | ||
5 | =config.discardZone(A4).(movefile(config.dataDir/~/"."/config.dataFilename)) | ||
6 | >config.discardZone.delete(A4),config.discardTime.delete(A4) | ||
7 | =file("ubc.json").write(json(config)) | ||
8 | =lock@u("config") | ||
9 | =func(A58,now()) | ||
10 | =cache.sort(${config.timeField}) | ||
11 | =A10.select@b(${config.timeField}<=A9) | ||
12 | =[] | =[] | =[] |
13 | >zus = zones(1) | ||
14 | for A11.len()>0 | ||
15 | =A11.${config.timeField} | ||
16 | =func(A58,B15,"end") | ||
17 | =zone0(B15,config) | ||
18 | =zone(B17, 1,config,false) | ||
19 | =A11.pselect(${config.timeField}>=B16) | ||
20 | if(B19==null) | ||
21 | >data=A11,A11=[] | ||
22 | else | ||
23 | >data=A11.to(B19-1) | ||
24 | >A11=A11.to(B19,A11.len()) | ||
25 | =zus.select@1(xor(~,B18)<=1) | ||
26 | =if(B25,xor(B25,1),B18) | ||
27 | =lock("config") | ||
28 | >config=json(file("ubc.json").read()) | ||
29 | if((B25!=null && config.level(1)==1)|| config.discardZone.pos(B26)) | ||
30 | goto A14 | ||
31 | else | >C12=C12|data | |
32 | =lock@u("config") | ||
33 | =file(config.dataDir/config.dataFilename:B26) | ||
34 | >data=data.sort(${config.sortKey}) | ||
35 | if(B25!=null) | =file(config.dataDir/config.dataFilename:B25) | |
36 | =C35.reset(B33:config.blockSize(1);data) | ||
37 | >A12.insert(0,B25) | ||
38 | else | =B33.create(${config.sortKey.split@c().("#"+trim(~)).concat@c()},${config.otherFields};;config.blockSize(1)) | |
39 | =C38.append@i(data.cursor()) | ||
40 | =C38.close() | ||
41 | >B12.insert(0,B26) | ||
42 | if(A12.len()>0) | ||
43 | =lock("config") | ||
44 | >config=json(file("ubc.json").read()) | ||
45 | >config.discardZone.insert(0,A12) | ||
46 | >config.discardTime.insert(0,[long(now())]*A12.len()) | ||
47 | =lock@u("config") | ||
48 | if(A12.len()>0 || B12.len()>0) | ||
49 | =lock("zones") | ||
50 | =((zones(1)\A12)|B12).sort() | ||
51 | >zones=[B50]|zones.to(2,) | ||
52 | =file("zone.btx").export@b(zones) | ||
53 | =lock@u("zones") | ||
54 | if(C12.len()>0) | =lock("cache") | |
55 | >cache=cache\C12 | ||
56 | =file(config.dataDir/"backup.btx").export@b(cache) | ||
57 | =lock@u("cache") | ||
58 | func | ||
59 | =[year(A58),month(A58)-1,day(A58)-1,hour(A58),minute(A58),second(A58)] | ||
60 | =7-config.level(1) | ||
61 | =config.interval(1) | ||
62 | >B59(B60)=B59(B60)\B61*B61 | ||
63 | if(B58=="end") | >B59(B60)=B59(B60)+B61 | |
64 | >B59.run(if(#>B60,~=0) ) | ||
65 | =datetime(B59(1),B59(2)+1,B59(3)+1,B59(4),B59(5),B59(6)) | ||
66 | return B65 |
A2: read the configuration file; A3-A6: delete the tables that have been discarded for a duration exceeding the max query period; A9: calculate the start time of the layer 1 interval to which the current time belongs; only the data before this time is written out; A10: sort the data of in-memory table sequence by time; A11: select the data before A9; A12: table number to be deleted; B12: table number to be added; C12: in-memory data that has been written out; A13: existing table number at layer 1; A14: if there is in-memory data that satisfies the condition and needs to be written out, then: B15: fetch the time of the first record; B16: calculate the end time of the layer 1 interval to which the data belongs based on B15; B17: calculate the table number at layer 0 based on B15; B18: calculate the table number at layer 1 based on B17; B19: find the position in A11 of the first record whose time is greater than or equal to B16; B20-C24: select the records earlier than B16 and store them in the ‘data’ variable; only the data greater than or equal to B16 is retained in A11; B25: whether the target table number exists in layer 1; B26: calculate the new target table number; B27-C32: if layer 1 is at the second level (level 1) and already has the target table number, or if the new target table number is in the discard list, skip the current group. 
Otherwise, store the data of the current group into C12 and proceed to the subsequent write operation; B33-C40: sort the data by sortKey, merge them into the original target table, and write to the table with B26 as table number; B41: temporarily store B26 to the sequence in B12; A42-B47: write the to-be-deleted discard table number list A12 into the discard list of configuration file; A48-B53: delete the discard table numbers from the global variable ‘zones’, and write the new table number to the global variable; A54-B57: delete the written-out data from the in-memory table sequence and do the same thing on the backup file; A58: calculate the start/end time of layer 1 interval to which the current time belongs. If the second parameter is ‘end’, calculate the end time. Otherwise, calculate the start time.
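The subroutine at A58, which aligns a time to the layer 1 interval, can be sketched in Python as follows. This is an illustrative port: `interval_bound` is a made-up name, the level and interval are passed in directly instead of being read from config, and a timedelta stands in for SPL's datetime() rolling over out-of-range components when the end time is calculated.

```python
from datetime import datetime, timedelta

# One unit of each layer level: 1 = second, 2 = minute, 3 = hour, 4 = day.
UNIT = {
    1: timedelta(seconds=1),
    2: timedelta(minutes=1),
    3: timedelta(hours=1),
    4: timedelta(days=1),
}


def interval_bound(tm, level, interval, end=False):
    """Start (or end) of the layer interval that contains tm."""
    # Month and day are made 0-based so every component can be floored alike.
    parts = [tm.year, tm.month - 1, tm.day - 1, tm.hour, tm.minute, tm.second]
    idx = 6 - level                          # 0-based position of the level
    parts[idx] = parts[idx] // interval * interval
    for i in range(idx + 1, 6):
        parts[i] = 0                         # clear all finer components
    start = datetime(parts[0], parts[1] + 1, parts[2] + 1,
                     parts[3], parts[4], parts[5])
    return start + interval * UNIT[level] if end else start


tm = datetime(2023, 11, 7, 15, 30, 45)
print(interval_bound(tm, level=3, interval=2))            # 2023-11-07 14:00:00
print(interval_bound(tm, level=3, interval=2, end=True))  # 2023-11-07 16:00:00
```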
This thread periodically merges the data of layer n-1 into layer n. After each merge, it returns to layer 1 and re-executes write. The loop moves to the upper layer only when there is no merge operation at the lower layer. After all layers have been merged, the thread sleeps for the length of the layer 1 interval and is then executed again.
A | B | C | D | E | |
1 | =call("write.splx") | ||||
2 | =lock("config") | ||||
3 | >config=json(file("ubc.json").read()) | ||||
4 | =lock@u("config") | ||||
5 | = zone0(now(),config ) | ||||
6 | =config.level.len()-1 | ||||
7 | for A6 | ||||
8 | >zz = zones(A7) | ||||
9 | =zone(A5,A7+1,config,false) | ||||
10 | =config.lastZone(A7) | ||||
11 | =lock("config") | ||||
12 | >config=json(file("ubc.json").read()) | ||||
13 | >config.lastZone(A7)=B9 | ||||
14 | =file("ubc.json").write(json(config)) | ||||
15 | =lock@u("config") | ||||
16 | if zz.len()>0 && B9>B10 | ||||
17 | =zz.group(zone( ~, A7+1,config,false):zu;~:zd) | ||||
18 | =C17.select(zu<B9) | ||||
19 | if(config.monthCumulate && A7==A6) | ||||
20 | >C18=C18.conj(zd).group(zone( ~, A7+1,config,true):zu;~:zd) | ||||
21 | >zus = zones(A7+1) | ||||
22 | =[] | =[] | =[] | ||
23 | for C18 | ||||
24 | =zus.select@1(xor(~,C23.zu)<=1) | ||||
25 | =if(D24,xor(D24,1),C23.zu) | ||||
26 | =lock("config") | ||||
27 | >config=json(file("ubc.json").read()) | ||||
28 | if(config.discardZone.pos(D25)) | ||||
29 | next C23 | ||||
30 | else | >E22.insert(0,C23.zd) | |||
31 | =lock@u("config") | ||||
32 | =file(config.dataDir/config.dataFilename:D25) | ||||
33 | =file(config.dataDir/config.dataFilename:(D24|C23.zd)) | ||||
34 | =D33.reset(D32:config.blockSize(#A7+1)) | ||||
35 | =lock("config") | ||||
36 | >config=json(file("ubc.json").read()) | ||||
37 | >config.discardZone.insert(0,D24|C23.zd) | ||||
38 | >config.discardTime.insert(0,[long(now())]*(D24|C23.zd).len()) | ||||
39 | =file("ubc.json").write(json(config)) | ||||
40 | =lock@u("config") | ||||
41 | >C22=C22|D24,D22=D22|D25 | ||||
42 | =lock("zones") | ||||
43 | =zones(A7)\E22 | ||||
44 | =((zones(A7+1)\C22)|D22).sort() | ||||
45 | >zones=zones.(~).modify(A7,[C43,C44]) | ||||
46 | =file("zone.btx").export@b(zones) | ||||
47 | =lock@u("zones") | ||||
48 | if E22.len()>0 | goto A1 | |||
49 | =config.interval(1) | ||||
50 | =sleep(case(config.level(1),2:A49*60,3:A49*3600,4:A49*3600*24;A49)*1000) | ||||
51 | goto A1 |
A1: execute the write-out thread to write the data of in-memory table sequence to layer 1; A2-A4: read the configuration file; A5: table number of layer 0 at current time; A7: loop from layer 1; B8: table number list of layer n-1; B9: calculate the table number of layer n based on A5; B10: the last table number of layer n; B11-B15: update the last table number of layer n; B16: if there is table at layer n-1, and the table number at layer n calculated based on the current time is greater than the last table number of layer n, then: C17: calculate and group the table numbers at layer n based on the table number at layer n-1; C18: filter out the groups whose table number at layer n is smaller than B9 (indicating that all tables of these groups are ready); C19: if month accumulation is required and it is currently merged toward the highest layer, then: D20: regroup C18 by the month table number; C21: read the table number that already exists in layer n from zones and denote it as zus; C22: table number to be discarded; D22: new table number; E22: merged table number; C23: loop by group; D24: read the table number to which the current group belongs from zus (including two kinds of table numbers with the last bit being 0 or 1); D25: calculate out the new table number. If the table number to which the current group belongs exists, alternate the new table number (if the last bit of the original table number is 1, then the last bit of new table number is 0, and vice versa). 
Otherwise, calculate the table number based on the current group; D26-D31: if the discarded table numbers in config include the new table number, skip the current group, or temporarily store the table numbers of the current group to E22; D32-D34: merge the current group’s table data with the original table data, and then write to a new table; D35-D40: write the written-out table number to the discard table number list of config, and record the discard time; D41: temporarily store the discard table number and new table number to C22 and D22 respectively; C42-C47: update the table number list and back it up to a file.
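The alternating last bit described for D24 and D25 (and used in the same way at B25 and B26 of write.splx) can be shown with a short Python sketch. The function name `next_table_number` is illustrative. An existing table whose number differs from the target only in the last bit is the current version of that table; the merged result is written under the number with the last bit flipped, so the old file can remain readable until it is discarded.

```python
def next_table_number(existing, target):
    """Return (current, new) table numbers for a merge into `target`.

    `current` is the existing number that matches `target` up to the last
    bit, or None when the layer has no such table yet.
    """
    current = next((z for z in existing if (z ^ target) <= 1), None)
    # Flip the last bit of an existing number; otherwise use the target as is.
    new = current ^ 1 if current is not None else target
    return current, new


existing = [2311070, 2311081]
print(next_table_number(existing, 2311080))  # (2311081, 2311080)
print(next_table_number(existing, 2311090))  # (None, 2311090)
```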
This script is used when querying data. It will combine the selected tables with the in-memory table sequence to form a multi-zone composite table to return.
Input parameters: start: start time; end: end time
A | B | |
1 | >z=zones.m(:-2).rvs().conj() | |
2 | >sz=zone0(start) | |
3 | >ez=zone0(end) | |
4 | >sp = z.pselect@z(~<=sz) | |
5 | >ep = ifn(z.pselect( ~>ez), z.len()+1) | |
6 | =if (sp >= ep,null,z.to( ifn(sp,1), ep-1 )) | |
7 | =if(!sp && cache.len()>0,true,false) | |
8 | =lock("config") | |
9 | >config=json(file("ubc.json").read()) | |
10 | =lock@u("config") | |
11 | >z=zones.m(-1) | |
12 | >sz=(year(start)%100)*100000+month(start)*1000+if(!config.monthCumulate,day(start)*10,0) | |
13 | >ez=(year(end)%100)*100000+month(end)*1000+if(!config.monthCumulate,day(end)*10,0)+1 | |
14 | >sp = ifn(z.pselect@z( ~<sz), 0 ) | |
15 | >ep = ifn(z.pselect( ~>ez), z.len()+1) | |
16 | =if (sp >= z.len(),null,z.to(sp+1, ep-1)) | |
17 | =if(A16.len()>0 || A6.len()>0,A16|A6,0) | |
18 | =file(config.dataDir/config.dataFilename:A17).open() | |
19 | if(A7) | return A18.append@y(cache) |
20 | else | return A18 |
A1: sort all table numbers except those at the highest layer in chronological order; A2-A3: calculate the layer-0 table number corresponding to the input parameter start\end (since the last bit itself of the table number at layer 0 is 1, there is no need to add 1 here); A4: find out the position of the first table number that is earlier than or equal to the start time from back to front; A5: find out the position of the first table number that is later than the end time from front to back; A6: select the table number between the start time and end time (including the start time, excluding the end time); return null if no table number is found; A7: if the start time of the query is earlier than the first table and the in-memory table sequence is not empty, it means that the in-memory table sequence needs to be output; A8-A10: read the configuration file; A11: table numbers at the highest layer; A12-A13: calculate the high layer table number to which the input parameter start\end corresponds. Considering the alternate bit, setting the last bit of sz and ez as 0 and 1 respectively can make it easy to write query code; A14: find out the position of the first table number that is earlier than the start time from back to front; A15: find out the position of the first table number that is later than the end time from front to back; A16: select the table numbers between the start time and end time; return null if no table number is found; A17: merge the results of querying high layer and low layer together; A18-B20: if the query result contains the in-memory table sequence, merge it with the composite table object and return; otherwise, return the composite table object directly.
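The selection of highest-layer tables in A12-A16 can be sketched in Python with a binary search. The function names are illustrative and the sketch assumes that the table number list is sorted in ascending order. It shows why the last bit of sz is 0 and that of ez is 1: both alternate versions of a table fall inside the range.

```python
from bisect import bisect_left, bisect_right
from datetime import datetime


def top_layer_bounds(start, end, month_cumulate):
    """Plain-text table number bounds (yymmdd0 or yymm000) for a time range."""
    def number(tm):
        day_part = 0 if month_cumulate else tm.day * 10
        return (tm.year % 100) * 100000 + tm.month * 1000 + day_part
    # The upper bound gets last bit 1 so that both alternates are covered.
    return number(start), number(end) + 1


def select_top_tables(z, start, end, month_cumulate):
    """Table numbers in the sorted list z that lie between the two bounds."""
    sz, ez = top_layer_bounds(start, end, month_cumulate)
    return z[bisect_left(z, sz):bisect_right(z, ez)]


daily = [2311050, 2311061, 2311070, 2311081]
print(select_top_tables(daily, datetime(2023, 11, 6), datetime(2023, 11, 7),
                        month_cumulate=False))  # [2311061, 2311070]
```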