Samples of Processing Big Text File
This article explains various operations over a big file, illustrates how to make the computation faster using parallel processing, and gives concise sample programs in esProc SPL. See Samples of Processing Big Text File for details.
Processing structured text files, such as TXT and CSV files, is common in data analysis. Sometimes a file is too big to be loaded into memory at once. You then need to load and process it in batches and aggregate the temporary intermediate result sets according to the specific requirements. The whole process is quite different from processing a small file that fits into memory entirely.
The cursor is an important concept for processing big text files. You may be more familiar with the database cursor. A database cursor returns part of the data each time rather than loading all data into memory at once. It works like a pointer: each time, a certain number of records are retrieved from the result set and the pointer moves forward. A file cursor works similarly when reading data from a big text file. It has two features:
- It only retrieves data and cannot modify the result set;
- It traverses the data only once, from start to end.
With a file cursor, you can attach each operation on a big file to the cursor. The operations are then performed in order as records are retrieved.
In the following part, I’ll explain how to implement different operations over a big structured text file, including filtering, sorting, aggregation, getting computed columns, grouping & aggregation, getting top N and parallel computation, and offer sample esProc SPL programs. esProc is a professional data computing engine based on SPL and equipped with all-round cursor objects and operations, which makes these operations convenient to handle.
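To make the cursor idea concrete outside SPL, here is a minimal Python sketch. It is not how esProc implements its cursor; the names `file_cursor` and `fetch` are hypothetical and only illustrate forward-only, read-only retrieval in batches.

```python
import csv
from itertools import islice
from typing import Dict, Iterator, List


def file_cursor(path: str, sep: str = "\t") -> Iterator[Dict[str, str]]:
    """Yield one record at a time; the whole file is never held in memory."""
    with open(path, newline="", encoding="utf-8") as f:
        # The first row is used as column headers (similar in spirit to @t).
        yield from csv.DictReader(f, delimiter=sep)


def fetch(cursor: Iterator[Dict[str, str]], n: int) -> List[Dict[str, str]]:
    """Retrieve the next n records; an empty list means the cursor is exhausted."""
    return list(islice(cursor, n))
```

Calling `fetch(cursor, n)` repeatedly moves forward through the file. There is no way to go back, which mirrors the two features listed above.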
To perform filtering, we specify a conditional expression and calculate the expression over each record. If the result is true, the current record is eligible and will be added to the result set; otherwise the record is ineligible and won’t be retrieved. Filtering over a big file is a type of delayed calculation. The filtering expression will be attached to the cursor object and won’t be calculated until a record is retrieved. We decide whether to add the record to the result set according to the result of calculating the expression.
Example: students_scores.txt is a big file that records student scores. Find scores of students of class 10 from it. Values of different columns are separated by tabs. Below is part of the file:
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.txt").cursor@t() | Use @t option to read the first row as column headers |
2 | =A1.select(CLASS==10) | Select scores of students of class 10, which is a delayed calculation |
3 | =file("E:/txt/students_scores_10.txt").export@t(A2) | Export the eligible records to a new file |
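The delayed filtering described above can be sketched in Python with a generator. This is an illustration of the idea only, not esProc's implementation; the `select` helper and the sample rows are hypothetical.

```python
from typing import Callable, Dict, Iterable, Iterator

Record = Dict[str, str]


def select(records: Iterable[Record], cond: Callable[[Record], bool]) -> Iterator[Record]:
    """Attach a filter; cond is evaluated only when a record is pulled through."""
    for rec in records:
        if cond(rec):
            yield rec


# Hypothetical in-memory rows standing in for a file cursor.
rows = iter([
    {"CLASS": "10", "NAME": "Ann"},
    {"CLASS": "9", "NAME": "Bob"},
    {"CLASS": "10", "NAME": "Cid"},
])
class_10 = select(rows, lambda r: int(r["CLASS"]) == 10)  # nothing is evaluated yet
result = [r["NAME"] for r in class_10]                    # evaluation happens here
```

Creating `class_10` costs nothing. The condition runs record by record only while the result is being consumed, for example when it is written to an output file.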
An aggregate operation summarizes the values of a specified column over all records in a big text file. Common aggregates include sum, average, max, min and count. To decrease memory use, we traverse all records in the cursor, update the current aggregate value with each record, and keep only the aggregate results in memory instead of the records themselves. The final result is available as soon as the traversal is over.
Example: students_scores.csv is a big file that records student scores. Values of different columns are separated by commas. Below is part of the file:
To calculate the total of the Chinese scores, use the following esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.csv").cursor@tc() | @c means using comma as the separator |
2 | =A1.total(sum(Chinese)) | Sum the Chinese scores |
To calculate the total of the Chinese scores in class 10, use the following esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.csv").cursor@tc() | @c means using comma as the separator |
2 | =A1.select(CLASS==10).total(sum(Chinese)) | Get records of class 10 and sum its Chinese scores |
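As a rough illustration of aggregating in one pass, the Python sketch below keeps only a few accumulators in memory while it walks through the records. The function name `total` is borrowed from the SPL script for readability, but the signature here is an assumption made for this example.

```python
from typing import Dict, Iterable, Optional


def total(records: Iterable[Dict[str, str]], column: str) -> Dict[str, Optional[float]]:
    """One pass over the records; only the accumulators stay in memory."""
    acc: Dict[str, Optional[float]] = {"sum": 0.0, "count": 0, "max": None, "min": None}
    for rec in records:
        v = float(rec[column])
        acc["sum"] += v
        acc["count"] += 1
        acc["max"] = v if acc["max"] is None else max(acc["max"], v)
        acc["min"] = v if acc["min"] is None else min(acc["min"], v)
    # The average is derived from sum and count once the traversal is over.
    acc["avg"] = acc["sum"] / acc["count"] if acc["count"] else None
    return acc
```

Memory use is constant no matter how many records the file holds, because each record is discarded right after it has updated the accumulators.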
We generate a computed column by performing a specified operation over one or more columns and assigning the results to a new column. It is a delayed operation because the expression is calculated, and its result assigned to the new column, only when a record is retrieved.
Example: The big file students_scores_.txt stores student scores. Values of different columns are separated by |. Below is part of the file:
To calculate the total score of each student, use the following esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores_.txt").cursor@t(;,"|") | Specify “|” as the separator |
2 | =A1.derive(English+Chinese+Math:total_score) | Attach a derive operation to calculate the total score and add a new column named total_score |
3 | =file("E:/txt/students_scores_total.txt").export@t(A2;"|") | Export data with total scores to a new file |
You can also use the new() function to create a new data structure that contains the computed column. For example:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores_.txt").cursor@t(;,"|") | Specify “|” as the separator |
2 | =A1.new(CLASS,NAME,English+Chinese+Math:total_score) | Select desired column(s) from A1, and get a new column using an expression of adding scores of three subjects |
3 | =file("E:/txt/students_scores_total.txt").export@t(A2;"|") | Export data of the new structure to another file |
Another way is to filter the records first and then add a new column, or generate a new structure, over the desired records only. For example, to get the scores of students in class 10 and add a total score column, use the following esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores_.txt").cursor@t(;,"|") | Specify “|” as the separator |
2 | =A1.select(CLASS==10) | Get scores of students in class 10; it is a delayed calculation |
3 | =A2.derive(English+Chinese+Math:total_score) | Calculate the total score using derive() function and add a new column named total_score |
4 | =file("E:/txt/students_scores_total.txt").export@t(A3;"|") | Export data with total scores to a new file |
Before getting the final result, you can attach any basic operations, such as filtering, getting a computed column, generating a new structure, value modification and sorting, to the cursor as needed and in your desired order.
Since a big file cannot be loaded into memory at once, we sort it in a different way. We retrieve a batch of records, whose size is decided by the size of the available memory, sort the records and store them in a temporary file, and then retrieve another batch of records. These actions are repeated until all records are retrieved and processed. Finally we merge all temporary files in order: get the first row of each file, compare their sorting field values to find the one that precedes all the others, and write it to the result file; then get the next row from the temporary file that the exported row came from, and do the same comparison to find the next row to write to the result file. Repeat the actions until all rows are exported to the result file.
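The following Python sketch shows how a computed column, a new structure and a filter can be chained lazily. It is an illustration under assumptions: the helpers `derive` and `new` borrow their names from the SPL functions but have made-up signatures, and the sample rows are hypothetical.

```python
from typing import Callable, Dict, Iterable, Iterator

Record = Dict[str, object]


def derive(records: Iterable[Record], name: str, expr: Callable[[Record], object]) -> Iterator[Record]:
    """Yield a copy of each record with one extra computed column."""
    for rec in records:
        yield {**rec, name: expr(rec)}


def new(records: Iterable[Record], columns: Dict[str, Callable[[Record], object]]) -> Iterator[Record]:
    """Yield records with a new structure; columns maps output name to expression."""
    for rec in records:
        yield {name: expr(rec) for name, expr in columns.items()}


def total_score(rec: Record) -> int:
    return int(rec["English"]) + int(rec["Chinese"]) + int(rec["Math"])


# Hypothetical rows standing in for a file cursor.
rows = [
    {"CLASS": "10", "NAME": "Ann", "English": "80", "Chinese": "90", "Math": "85"},
    {"CLASS": "9", "NAME": "Bob", "English": "70", "Chinese": "75", "Math": "60"},
]
class_10 = (r for r in rows if r["CLASS"] == "10")          # filter first
with_total = derive(class_10, "total_score", total_score)   # then add the column
compact = new(rows, {"NAME": lambda r: r["NAME"], "total_score": total_score})
```

Neither `with_total` nor `compact` computes anything until it is iterated, so the operations can be stacked in any order before the data is actually read.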
Example: Sort the text file students_scores.txt that records student scores by Chinese score in ascending order:
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.txt").cursor@t() | Create a cursor |
2 | =A1.sortx(Chinese) | Sort records by Chinese in ascending order and return a cursor |
3 | =file("E:/txt/students_scores_sort.txt").export@t(A2) | Export the sorted records to a new file |
You can sort records by multiple fields or by an expression. For example, change A2’s expression to one of the following:
=A1.sortx(Chinese,Math) // Sort by Chinese and then Math
=A1.sortx(Math+English+Chinese) // Sort by total score
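The batch-sort-then-merge procedure can be sketched in Python as follows. This is a generic external merge sort written for illustration, not the code behind sortx(); `external_sort` and `batch_size` are hypothetical names, and lines are assumed to contain no embedded newlines.

```python
import heapq
import os
import tempfile
from itertools import islice
from typing import Callable, Iterable, Iterator, List


def _read_run(path: str) -> Iterator[str]:
    """Stream one sorted temporary file line by line."""
    with open(path, encoding="utf-8") as f:
        for line in f:
            yield line.rstrip("\n")


def external_sort(lines: Iterable[str], key: Callable[[str], object],
                  batch_size: int = 100_000) -> Iterator[str]:
    """Sort more lines than fit in memory: sorted runs on disk, then a k-way merge."""
    lines = iter(lines)
    run_paths: List[str] = []
    try:
        while True:
            batch = list(islice(lines, batch_size))   # one memory-sized batch
            if not batch:
                break
            batch.sort(key=key)
            fd, path = tempfile.mkstemp(suffix=".run")
            with os.fdopen(fd, "w", encoding="utf-8") as f:
                f.writelines(line + "\n" for line in batch)
            run_paths.append(path)
        # heapq.merge repeatedly picks the smallest head row among all runs.
        yield from heapq.merge(*(_read_run(p) for p in run_paths), key=key)
    finally:
        for p in run_paths:
            os.remove(p)
```

Only one batch plus one row per temporary file is in memory at any time, which is what makes the approach workable for files larger than memory.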
A grouping & aggregation operation groups records, performs a certain aggregate over each group, and then returns all aggregate results. There are two types of grouping & aggregation over a big file. One has a small result set, which is made up of results from a relatively small number of groups and can fit into memory. The other has a large result set, which consists of results from a relatively large number of groups and cannot fit into memory.
To implement a grouping & aggregation with a small result set, we store the grouping key value and the aggregate result of each group in memory. As each record is retrieved, we calculate its grouping key value according to the grouping expression and search for that value among the groups currently stored. If the value is found, we update the existing aggregate result with the current record; if it isn’t found, we create a new group for the record and its aggregate. Repeat the actions until all records are processed.
Example: The big file user_info_reg.csv records user login information. We want to calculate the total number and duration of user logins in each province. Values of different columns are separated by commas. Below is part of the file:
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/user_info_reg.csv").cursor@tc() | Create a cursor; @c option enables using commas as the separator |
2 | =A1.groups(id_province;count(~):cnt,sum(reg_time):total_reg) | Group records and calculate the number and duration of user logins in each province |
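A small-result-set grouping can be sketched in Python with a dictionary keyed by the grouping value. The sketch only illustrates the lookup-and-update loop described above; the function signature is an assumption for this example and does not mirror the SPL groups() function.

```python
from typing import Dict, Iterable, Tuple


def groups(records: Iterable[Dict[str, str]], key: str, value: str) -> Dict[str, Tuple[int, float]]:
    """Return {key: (count, sum)}; only one small entry per group stays in memory."""
    result: Dict[str, Tuple[int, float]] = {}
    for rec in records:
        k = rec[key]
        # Found: update the stored aggregate. Not found: start from (0, 0.0).
        cnt, total = result.get(k, (0, 0.0))
        result[k] = (cnt + 1, total + float(rec[value]))
    return result
```

The dictionary grows with the number of distinct keys, not with the number of records, so this works as long as the set of groups fits into memory.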
Since a large result set cannot fit into memory, we need to handle the grouping & aggregation of a big file in batches: the result of processing each batch of records is stored in a temporary file, and then all temporary results are combined. To implement this, we retrieve data row by row and perform grouping & aggregation in the same way as for a small result set. When the result set in memory grows large enough (relative to the memory size), we sort it by grouping key values, store it in a temporary file and remove it from memory. Then we go on to retrieve rows and perform the same operations. When all rows are retrieved and processed, we have multiple temporary files, each sorted by the grouping key. Now we perform an order-based merge over these temporary files (the process is the same as the merge for big file sorting) to get a big file that is sorted by the grouping key and may contain duplicate key values, combine records with the same key value into one group to get a big file containing all groups, and then return a file cursor from which the grouping result can be extracted.
Example: The big file user_info_reg.csv records user login information. We want to calculate the total number and duration of logins for each user.
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/user_info_reg.csv").cursor@tc() | Create a cursor; @c option enables using commas as the separator |
2 | =A1.groupx(user_id; count(~):cnt,sum(reg_time):total_reg) | Group records and calculate the number and duration of logins for each user |
3 | =file("E:/txt/user_info_tj.csv").export@tc(A2) | Export the results to a new file |
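The spill-and-merge procedure for a large result set can be sketched in Python as follows. It illustrates the general technique, not the internals of groupx(); the names `groupx`, `_spill`, `_read` and the `max_groups` threshold are assumptions for this example, and grouping keys are assumed to be mutually comparable (for instance, all strings).

```python
import heapq
import os
import pickle
import tempfile
from itertools import groupby
from operator import itemgetter


def _spill(partial):
    """Write partial groups to a temporary file, sorted by key; return its path."""
    fd, path = tempfile.mkstemp(suffix=".grp")
    with os.fdopen(fd, "wb") as f:
        for key in sorted(partial):
            pickle.dump((key, *partial[key]), f)
    return path


def _read(path):
    """Stream (key, count, sum) tuples back from one temporary file."""
    with open(path, "rb") as f:
        while True:
            try:
                yield pickle.load(f)
            except EOFError:
                return


def groupx(records, key, value, max_groups=100_000):
    """Yield (key, count, sum) in key order, holding at most max_groups in memory."""
    partial, paths = {}, []
    try:
        for rec in records:
            k = rec[key]
            cnt, total = partial.get(k, (0, 0.0))
            partial[k] = (cnt + 1, total + float(rec[value]))
            if len(partial) >= max_groups:      # memory budget reached: spill
                paths.append(_spill(partial))
                partial = {}
        if partial:
            paths.append(_spill(partial))
        # Ordered merge of all temporary files; equal keys become adjacent.
        merged = heapq.merge(*(_read(p) for p in paths), key=itemgetter(0))
        for k, parts in groupby(merged, key=itemgetter(0)):
            parts = list(parts)
            yield k, sum(p[1] for p in parts), sum(p[2] for p in parts)
    finally:
        for p in paths:
            os.remove(p)
```

Because the same key can appear in several temporary files, the final step re-aggregates adjacent rows with equal keys. That only works for aggregates that can be combined from partial results, such as count and sum.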
Getting the top N records looks like a sorting task. Sometimes we want to find the top N among all records; other times we need to first group records and then get the top N from each group.
Actually there’s no need to sort all records to get the top N, because a full sort takes a long time, particularly when the file is big. We implement getting top N in this way. We retrieve the first N records to form a small data set and sort them. Next we retrieve a new record and compare it with the last record in the small data set. If the newly retrieved record ranks behind it, we give it up; otherwise we insert the new record into the appropriate place in the small data set and remove the last record from the data set. When all records are retrieved and processed in this way, we get a sequence of the N desired records. The process is similar to that of performing aggregation.
Example: The big file students_scores.txt records student scores. Now we want to find the records of students whose math scores rank in top 10.
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.txt").cursor@t() | Create a cursor |
2 | =A1.groups(;top(10;-Math)) | Get the 10 records with the highest Math scores; the expression -Math sorts records in reversed order |
The top() function can return the top N values, too. To get the math scores that rank in the top 10, use the following esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.txt").cursor@t() | Create a cursor |
2 | =A1.groups(;top(-10,Math)) | When using comma to separate parameters, top() function returns the top 10 Math scores |
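The keep-a-small-sorted-set procedure for top N can be sketched in Python like this. It is an illustration of the steps described above rather than the implementation of top(); `top_n` is a hypothetical helper, and the key is assumed to be numeric so that it can be negated.

```python
import bisect
from typing import Callable, Dict, Iterable, List


def top_n(records: Iterable[Dict[str, str]], n: int,
          key: Callable[[Dict[str, str]], float]) -> List[Dict[str, str]]:
    """Return the n records with the largest key, largest first, in one pass."""
    if n <= 0:
        return []
    keys: List[float] = []             # negated keys in ascending order
    best: List[Dict[str, str]] = []    # records, kept parallel to keys
    for rec in records:
        k = -key(rec)
        if len(best) == n and k >= keys[-1]:
            continue                   # ranks behind the current last record: give it up
        pos = bisect.bisect_right(keys, k)
        keys.insert(pos, k)            # insert at the appropriate place
        best.insert(pos, rec)
        if len(best) > n:              # drop the record pushed out of the top n
            keys.pop()
            best.pop()
    return best
```

In everyday Python code, `heapq.nlargest(n, records, key=...)` from the standard library does the same job; the explicit version above is shown only because it follows the described steps one by one.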
You can get top N records or values from each group. The implementation process is similar. You just need to prepare a data set of N records for each group.
Example: The big file students_scores.txt records student scores. Now we want to find the records of students whose math scores rank in top 10 in each class.
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.txt").cursor@t() | Create a cursor |
2 | =A1.groups(CLASS;top(10;-Math)) | Group records by CLASS and get the 10 records with the highest Math scores in each class; the expression -Math sorts records in reversed order |
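Keeping one small data set of N records per group can be sketched in Python with a bounded min-heap per grouping key. This is a generic illustration, not esProc's implementation, and `group_top_n` is a hypothetical name.

```python
import heapq
from collections import defaultdict
from typing import Callable, Dict, Iterable, List


def group_top_n(records: Iterable[dict], group_key: Callable[[dict], str], n: int,
                score: Callable[[dict], float]) -> Dict[str, List[dict]]:
    """For each group, keep only the n highest-scoring records seen so far."""
    heaps = defaultdict(list)
    for seq, rec in enumerate(records):
        # -seq breaks ties in favor of earlier records, so dicts are never compared.
        item = (score(rec), -seq, rec)
        heap = heaps[group_key(rec)]
        if len(heap) < n:
            heapq.heappush(heap, item)
        else:
            heapq.heappushpop(heap, item)   # push, then drop the current smallest
    return {g: [rec for _, _, rec in sorted(h, reverse=True)]
            for g, h in heaps.items()}
```

Memory use is bounded by the number of groups times N, so the approach fits the small-result-set case of grouping.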
Parallel processing handles one computing task using multiple threads. It makes the most of a multicore CPU to increase computing performance, which is particularly effective in processing a big file. Batch processing handles a big file batch by batch and then combines the results. Parallel processing works in a similar way. It divides a big file into multiple segments and assigns each segment to a thread, which handles its part in the same way a big file is processed, and finally combines the results returned from all threads.
Example: The big file user_info_reg.csv records user login information. We need to count the number of user logins in each province. Here we handle the task through 4 parallel threads.
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/user_info_reg.csv").cursor@tcm(;4) | Create a cursor; @m option enables parallel processing; parameter 4 means there are 4 parallel threads |
2 | =A1.groups(id_province;count(~):cnt) | Group records by province and count the user logins in each province |
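The segment-and-combine idea can be sketched in Python as follows. It is only an illustration of splitting a file by byte ranges and merging partial counts, not of how the @m option works internally; all function names are hypothetical. Note also that threads in CPython mainly show the structure here, since CPU-bound work would need a process pool to use several cores.

```python
import os
from collections import Counter
from concurrent.futures import ThreadPoolExecutor


def segment_bounds(path, parts):
    """Split the file into `parts` byte ranges [start, end)."""
    size = os.path.getsize(path)
    return [(size * i // parts, size * (i + 1) // parts) for i in range(parts)]


def count_segment(path, start, end, key_index, sep=b","):
    """Count rows per key for the lines whose first byte lies in [start, end)."""
    counts = Counter()
    with open(path, "rb") as f:
        if start == 0:
            f.readline()                # skip the header row
        else:
            f.seek(start - 1)
            f.readline()                # finish the line owned by the previous segment
        while f.tell() < end:
            line = f.readline()
            if not line:
                break
            if not line.strip():
                continue                # ignore blank lines
            counts[line.rstrip(b"\r\n").split(sep)[key_index].decode()] += 1
    return counts


def parallel_group_count(path, key_index, workers=4):
    """Count rows per key using one worker thread per file segment."""
    bounds = segment_bounds(path, workers)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        partials = pool.map(lambda b: count_segment(path, b[0], b[1], key_index), bounds)
        return dict(sum(partials, Counter()))   # combine the per-thread results
```

Each line is assigned to the segment that contains its first byte, so no row is counted twice or skipped even when a segment boundary falls in the middle of a line.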
Example: The big file students_scores.csv stores student scores. The task is to get students whose Chinese scores are above 90 and whose total scores rank in top 5 using 8 parallel threads.
esProc SPL script:
| | A | Comment |
---|---|---|
1 | =file("E:/txt/students_scores.csv").cursor@tcm(;8) | Create a cursor; @m option enables parallel processing; parameter 8 means there are 8 parallel threads |
2 | =A1.select(Chinese>90) | Get students whose Chinese scores are above 90 |
3 | =A2.derive(English+Chinese+Math:total_score) | Attach derive operation to calculate total score and add a new column named total_score |
4 | =A3.groups(CLASS;top(-5;total_score)) | Group records by CLASS and get students whose total scores rank in top 5 in each class |
5 | =file("E:/txt/students_scores_total.txt").export@tc(A4) | Export result to a new file |
It’s easy to implement parallel processing in SPL. You just need to add an option to the cursor that retrieves the data and use a parameter to define the number of threads.
Find more examples in SPL CookBook.