-
Notifications
You must be signed in to change notification settings - Fork 339
SPL practice:high concurrency account queries
Data structure of account details table ‘T’:
Field name | Field type | Field meaning | Sample data |
---|---|---|---|
Id | String | Account number | 28-digit number |
Tdate | Date | Date | 2023-10-22 |
Tamt | Number | Amount | 23345.70 |
Ttype | Number | Type | 1, 0 |
Tcorp | String | Branch | A210172 |
… |
Data structure of branch table ‘C’:
Field name | Field type | Field meaning | Sample data |
---|---|---|---|
cid | String | Branch number | A210172 |
cname | String | Branch name | X Bank New York Branch |
caddress | String | Branch address | 226 East 56th Street, New York, NY 10022, U.S.A. |
… |
Table T contains 300 million records. The user account id consists of 28-digit number, and the total number is 10 million after deduplication.
Table C contains 3000 records. Table C and table T are associated on table C’s cid field and table T’s tcorp field.
Requirement: search for the data details by specified id and date range, and the result should include id, tdate, tamt, cname, caddress.
It is desired that the response time for 60 concurrent queries does not exceed 1 second on a cluster consisting of six 40C256G servers.
Unfortunately, this expectation is not achieved using a certain renowned distributed data warehouse.
Under the same hardware environment, using ES can meet this expectation. However, since ES does not support multi-table association, we have to redundantly store the data of table C in table T to form a wide table, but this will require regenerating a wide table containing 300 million data once the data of a branch changes, which is too time-consuming.
When using a relational database to implement this search task, the SQL statement is very simple:
select id,tamt,tdate,cname,caddress
from T left join C on C.cid=T.tcorp
where id='1110101014992000000000000219'
and tdate>= to_date('2023-01-10', 'yyyy-MM-dd')
and tdate<to_date('2023-10-25', 'yyyy-MM-dd')
and …
In order to improve the response speed, an index is generally created on the id field of table T:
create index index_T_1 on T(id)
The database first finds the corresponding original-table position of account id from the index and then retrieve the data from the original table. The first step is usually very fast, whereas the latter is often unsatisfactory. This is because the database cannot ensure the corresponding data of the same account id are stored continuously in physics, instead, they are very likely to be separated. However, the hard disk has the minimum read unit which is generally far bigger than the space a record occupies. Therefore, the database reads more than it requires when reading the discontinuous data, resulting in slow queries. Although there are only a few to thousands of data per account id, the overall performance will be poor when each query is a little slower during highly concurrent access.
Therefore, for scenarios like highly concurrent account queries that seek extreme performance, a solution with higher speed is required.
The most important means to improve high-concurrency account queries is to store data in a physically ordered manner. To achieve this, we need to sort the account details by account id and store them in hard disk using the row-based storage so that the data of the same account id will be stored continuously and almost all the data blocks read from the hard disk will be the desired target data, which will obtain a significant performance improvement.
The reason for not adopting columnar storage is that columnar storage is to store each column of data together, yet the fields of one account id are scattered in different columns, which will still cause the hard disk to read discontinuous data.
After storing table T by id in a physically ordered manner, if we directly use the binary search, it needs to read the data file multiple times to locate the target value, yet many reads are unnecessary in the process.
In order to achieve extreme performance, we also need to create an index on the id field so that we can quickly find target value’s physical position in original table from the index based on the id of target value, and then read target value.
The index only contains ids and corresponding physical positions, which take up much less storage space than original table. However, when the data volume is large, the index will also be relatively large. For this reason, SPL will automatically create a multi-level index, that is, create sub-index for the index to speed up the search process on index.
When the number of concurrent queries is large, if the index is loaded from hard disk every time, this process will be repeated many times. To solve this, SPL provides the index preloading mechanism. This mechanism will actively pre-load the two-level or three-level index into memory during the system initialization, which can avoid wasting time due to repeated loading.
Creating an index for row-stored data has more advantages over that for columnar-stored data. When creating an index for row-stored data, the position of whole record can be simply represented as one number in the index. In contrast, for a record of columnar storage, each column has its own position and it is impossible to record all of them, so we can only record the sequence number, which will add a step during the search, resulting in decreased performance.
High-concurrency account queries belong to the search calculations. If the account details table is also used for traversal calculation, we can use the index-with-values mechanism to copy the fields required for the search result to the index, which can eliminate reading the original table during the searching. In this way, the original table can be stored in columnar storage to meet traversal calculation requirement.
However, the index-with-values will take up more storage space than ordinary index, which makes it less suitable for the search scenarios where many fields are involved. Moreover, creating sub-index is very troublesome. Therefore, we should weigh our options based on the actual situation.
This task also involves another association operation, which is a typical foreign key join of a small dimension table. For such operation, SQL database generally adopts the hash join method.
In contrast, SPL can store the entire dimension table in memory, and adopts the foreign key sequence-numberization algorithm to implement this association operation. To be specific, convert the tcorp field of table T to the position sequence number of records in table C in advance, which allow us to directly retrieve record by the corresponding position of in-memory table C by the sequence number in table T during associating, hereby avoiding the calculation and comparison of hash values.
Foreign key sequence-numberization is to convert the tcorp field of table T from string to integer for storage. Since both the storage efficiency and calculation speed of integer are better than those of string and date type data, it is very beneficial for improving performance.
In addition to the tcorp field, we can also use the days@o function to convert the tdate field to integer to implement integerization.
The id field is 28-digit pure number, exceeding the value range of long (19-digit number). Therefore, this field cannot be converted to integer and is still stored as string.
A | B | |
---|---|---|
1 | =to(3000).new("A"/(210000+~):cid,"X bank Branch"/~:cname,~/"East 56th Street, New York, NY 10022, U.S.A.":caddress) | |
2 | =file("C.txt").export@t(A1) | |
3 | 2023-01-01 | 2023-10-27 |
4 | =periods(A3,B3) | |
5 | =file("T.txt") | =movefile(A5) |
6 | for A4 | =to(1000000) |
7 | =B6.new("1110101014992000000"/pad(string(rand(10000000)+1),"0",9):id,A6:tdate,rand(2):ttype,"A"/(210000+rand(3000)+1):tcorp,rand()*10000:tamt) | |
8 | =A5.export@at(B7) |
This code generates two text files, one is T.txt containing 300 million account details and the other is C.txt containing 3,000 branch records. The two files serve as the raw data exported from database or other data sources.
In practice, the account details are generally stored in the order in which the data is generated, and hence T.txt is in order by date.
1. Data preprocessing
A | B | |
---|---|---|
1 | =T("C.txt").sort(cid) | =file("C.btx").export@b(A1) |
2 | =A1.derive@o().keys@i(cid) | |
3 | =file("T_r.ctx").create@pyr(#id,tdate,ttype,tcorp,tamt) | |
4 | =file("T.txt").cursor@t(id:string,tdate,ttype,tcorp,tamt) | |
5 | =A4.sortx(id) | |
6 | =A5.run(tcorp=A2.pfind(tcorp),tdate=days@o(tdate)) | |
7 | =A3.append(A6) | |
8 | =A3.index(file("T_r.idx");id) |
A1, B1: sort the dimension table C.txt by cid and store it in the bin file C.btx for subsequent computation;
A2: create primary key ‘cid’ with index for the in-memory dimension table;
A3: the create function is appended with @r option, indicating the table being established is a row-based composite table;
A4: create a cursor based on T.txt and specify id as string;
A5: since the data in T.txt are in order by date, it needs to sort by id on external storage;
A6: convert tcorp to the sequence numbers of dimension table record to implement sequence-numberization of dimension table. Use the days@o function to convert tdate to integer to implement integerization;
A7: since the account details stored in the composite table are ordered by id, ordered storage in physics is implemented;
A8: create an index for the row-based composite table.
2. Preload the index and dimension table
A | B | |
---|---|---|
1 | if !ifv(T_r) | =file("T_r.ctx").open().index@3(file("T_r.idx")) |
2 | =env(T_r,B1) | |
3 | if !ifv(corp) | =T("C.btx") |
4 | =env(corp,B3) |
Index preloading is accomplished at the system initialization phase, during which the branch table (dimension table) is also preloaded.
Since the tcorp of the fact table is already converted to sequence number in advance, we can directly retrieve record from the dimension table by the position in fact table when associating the two tables without requiring the primary key and index of dimension table, and hence there is no need to create primary key and index for the dimension table.
3. Search for id
A | |
---|---|
1 | =T_r.icursor(id,tdate,tcorp,tamt;id=="1110101014992000000000000219" && tdate>=days@o(date("2023-01-10")) && tdate<days@o(date("2023-10-25"));file("T_r.idx")).fetch() |
2 | =A1.new(id,date@o(tdate):tdate,tamt,corp(tcorp).cname,corp(tcorp).caddress) |
A1: use the global variable T_r to search the index (the index file needs to be specified);
The id and date in the filter condition here are constant. In practice, they are generally passed in as cellset parameter.
For example, if there are three given parameters: arg_id, arg_startdate, arg_enddate, then A1 needs to be changed to:
=T_r.icursor(id,tdate,tamt;id==arg_id && tdate>=days@o(arg_startdate) && tdate<days@o(arg_enddate);file("T_r.idx")).fetch()
A2: use the date@o function to convert the search result from integer to date, and then retrieve the branch name and address through the sequence number from dimension table, hereby implementing join calculation.
If using the scheme of columnar composite table and index-with-values, then the code for data preprocessing needs to be modified as:
A | B | |
---|---|---|
1 | =T("C.txt").sort(cid) | =file("C.btx").export@b(A1) |
2 | =A1.derive@o().keys@i(cid) | |
3 | =file("T.ctx").create@py(#id,tdate,ttype,tcorp,tamt) | |
4 | =file("T.txt").cursor@t(id:string,tdate,ttype,tcorp,tamt) | |
5 | =A4.sortx(id) | |
6 | =A5.run(tcorp=A2.pfind(tcorp),tdate=days@o(tdate)) | |
7 | =A3.append(A6) | |
8 | =A3.index(file("T.idx");id;tdate,tamt,tcorp) |
A3: the create function is not appended with the @r option, indicating the table being generated is a columnar composite table;
A8: the last set of parameters (tdate, tamt, tcorp) of the index function are the fields which would be stored in the index.
The code for searching for id also needs to be modified:
A | |
---|---|
1 | =T.icursor(id,tdate,tcorp,tamt;id=="1110101014992000000000002427";file("T.idx")).select(tdate>=days@o(date("2023-01-10")) && tdate<days@o(date("2023-10-31"))).fetch() |
2 | =A1.new(id,date@o(tdate):tdate,tamt,corp(tcorp).cname,corp(tcorp).caddress) |
A1: the search condition of index-with-values can only be the index field, so a separate select function needs to be added for the date condition. Since the data volume of each id is not large, adding a select function has little impact on performance.
In practice, new data details will be generated every day. Here we assume the new data generated each day is saved in the file ‘newdata.txt’.
The composite table T or T_r is stored in order by id, yet the date of new data is later than that of the existing data in composite table. However, the id contains the same batch of values, it would make the order of id values disruptive if we directly appended new data to the end of composite table, so it is not feasible.
If we sorted the new data together with historical data by id, and then generated a new composite table, it would be very time consuming.
We can divide the composite table into two parts: historical data composite table ‘hisdata.ctx’ and incremental data composite table ‘newdata.ctx’, and re-sort only the latter each time new data is appended, and merge the new data with historical data after a period of time (like one month).
A | B | C | |
---|---|---|---|
1 | =file("T_new.ctx") | =file("T_new.idx") | =file("newdata.csv") |
2 | =C1.cursor@ct(id:string,tdate,ttype,tcorp,tamt).sortx(id) | ||
3 | if day(now())==1 | =file("T.ctx") | =file("T.idx") |
4 | =A1.open() | =B4.cursor() | |
5 | =B3.reset(;C4) | >B4.close() | |
6 | >movefile(A1),movefile(B1),movefile(C3) | ||
7 | =B3.open() | =B7.index(C3;id;tdate,tamt,tcorp) | |
8 | =B7.index@3(C3) | =env(T,B8) | |
9 | >B7.close() | ||
10 | if !A1.exists() | =A1.create@py(#id,tdate,ttype,tcorp,tamt) | |
11 | =B10.append@i(A2) | =B10.close() | |
12 | else | =A1.reset(;A2) | |
13 | =A1.open() | =movefile(B1) | |
14 | =A13.index(B1;id;tdate,tamt,tcorp) | ||
15 | =A13.index@3(B1) | =env(T_new,A15) | >A13.close() |
If the date is not the first day of the current month, execute A10-B12 to merge newdata.txt into T_new.ctx, and then execute A13-C15 to recreate an index of T_new.ctx and re-preload.
If the date is the first day of the current month, it needs to execute B5 to merge the data of T_new.ctx into T.ctx. B6 is to delete the T_new.ctx, index, and the index of T.ctx. B7-B9 is to recreate the index of T.ctx and re-preload.
Now, the search operation should be based on two composite tables:
A | |
---|---|
1 | =T.icursor(id,tdate,tamt; id,tdate,tcorp,tamt;id=="1110101014992000000000000219";file("T.idx")).select(tdate>=days@o(date("2023-01-10")) && tdate<days@o(date("2023-10-25"))).fetch() |
2 | =T_new.icursor(id,tdate,tamt; id,tdate,tcorp,tamt;id=="1110101014992000000000000219";file("T_new.idx")).select(tdate>=days@o(date("2023-01-10")) && tdate<days@o(date("2023-10-25"))).fetch() |
3 | =[A1,A2].merge(id) |
When using SPL to perform the computing task involving 60 concurrent queries on 300 million data details on a single 40C256G server, the response time is 0.5 seconds on average.
The common characteristics of scenario involving high-concurrency account queries include:
1. The historical data of plenty of accounts are involved, and the total data amount is huge (tens of or even hundreds of millions) which needs to be stored outside memory.
2. The data amount per account is not large (from a few to thousands) which only needs simple queries and involves almost no computation.
3. The huge number of accounts and the high frequency make the queries highly concurrent; they all require extreme performance with response in seconds or even faster;
4. There is a possibility that multiple dimension tables need to be associated.
For such scenarios, the main means of SPL to achieve extreme performance is to store data in order by account number and use index.
If the application only involves search, then the scheme of row-based storage and ordinary index is sufficient. If the application involves not only search but also traversal, we can consider using the scheme of columnar storage and index-with-values.
Generally, the dimension table to be associated is small, we can load it entirely into memory and, convert the association field of fact table to sequence number in advance so that we can directly retrieve the record of dimension table through sequence number during association. In this way, a much better performance can be achieved compared to the hash join of database.
SPL Resource: SPL Official Website | SPL Blog | Download esProc SPL | SPL Source Code