Skip to content

Commit 9af456f

Browse files
author
xy
committed
Merge remote-tracking branch 'origin/devel'
2 parents 2db990b + 545762a commit 9af456f

File tree

3 files changed

+177
-239
lines changed

3 files changed

+177
-239
lines changed

src/core.py

+40-103
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@
7171

7272
SCAN_DAT_FILE = 'scaninfo'
7373
SEARCH_DAT_FILE = 'searchinfo'
74-
SIGINT_FILE = 'signal'
74+
SIGNAL_FILE = 'signal'
7575

7676
def get_dev_labes_dict():
7777
lsblk = subprocess_run(['lsblk','-fJ'],capture_output = True,text = True)
@@ -211,8 +211,7 @@ def popen_lin(command,shell,stdin=DEVNULL):
211211

212212
def send_signal(subproc,temp_dir,kind=0):
213213
try:
214-
signal_file = sep.join([temp_dir,SIGINT_FILE])
215-
#print(f'sending signal in file {signal_file}')
214+
signal_file = sep.join([temp_dir,SIGNAL_FILE])
216215

217216
temp_signal_file = signal_file+ '_temp'
218217
with open(temp_signal_file,'w') as tsf:
@@ -304,19 +303,15 @@ def __init__(self,label='',scan_path=''):
304303

305304
#######################################################################
306305
class LibrerRecord:
307-
def __init__(self,log,label=None,scan_path=None,file_path=None):
306+
def __init__(self,label=None,scan_path=None,file_path=None):
308307
self.header = Header(label,scan_path)
309308

310309
self.filestructure = ()
311310
self.customdata = []
312311
self.filenames = []
313312

314-
self.log = log
315313
self.find_results = []
316314

317-
self.info_line = ''
318-
#self.info_line_current = ''
319-
320315
self.abort_action = False
321316

322317
self.file_name = ''
@@ -335,9 +330,6 @@ def load(self,file_path):
335330
self.file_path = file_path
336331
self.file_name = basename(normpath(file_path))
337332

338-
#self.log.info('loading %s' % file_name)
339-
#TODO - problem w podprocesie
340-
341333
try:
342334
with ZipFile(file_path, "r") as zip_file:
343335
header_ser_compr = zip_file.read('header')
@@ -346,16 +338,12 @@ def load(self,file_path):
346338
self.header.zipinfo["header"]=(asizeof(header_ser_compr),asizeof(header_ser),asizeof(self.header))
347339

348340
if self.header.data_format_version != DATA_FORMAT_VERSION:
349-
message = f'loading "{file_path}" error: incompatible data format version: {self.header.data_format_version} vs {DATA_FORMAT_VERSION}'
350-
self.log.error(message)
351-
return message
341+
return f'loading "{file_path}" error: incompatible data format version: {self.header.data_format_version} vs {DATA_FORMAT_VERSION}'
352342

353343
self.prepare_info()
354344

355345
except Exception as e:
356-
message = f'loading "{file_path}" error: "{e}"'
357-
#self.log.error(message)
358-
return message
346+
return f'loading "{file_path}" error: "{e}"'
359347

360348
return False
361349

@@ -368,18 +356,12 @@ def save(self,print_func,file_path=None,compression_level=9):
368356

369357
self.file_path = file_path
370358

371-
#self.info_line = f'saving {filename}'
372-
373-
#self.log.info('saving %s' % file_path)
374-
#print_func(['save',f'saving {file_path}'])
375-
376359
self_header = self.header
377360

378361
self.header.compression_level = compression_level
379362

380363
with ZipFile(file_path, "w") as zip_file:
381364
def compress_with_header_update_wrapp(data,datalabel):
382-
#self.info_line = f'compressing {datalabel}'
383365
print_func(['save',f'compressing {datalabel}'],True)
384366
compress_with_header_update(self.header,data,compression_level,datalabel,zip_file)
385367

@@ -434,16 +416,11 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
434416
if is_file:
435417
self_header_ext_stats[ext]+=1
436418

437-
#self.info_line_current = entry_name
438-
439-
#print_func(('scan-line',entry_name))
440-
441419
try:
442420
stat_res = stat(entry)
443421
mtime = int(stat_res.st_mtime)
444422
dev=stat_res.st_dev
445423
except Exception as e:
446-
#self.log.error('stat error:%s', e )
447424
print_func( ('error',f'stat {entry_name} error:{e}') )
448425
#size -1 <=> error, dev,in ==0
449426
is_bind = False
@@ -501,16 +478,10 @@ def scan_rec(self,print_func,abort_list,path, scan_like_data,filenames_set,check
501478
self_header.quant_folders += local_folder_folders_count
502479

503480
print_func( ('scan',self_header.sum_size,self_header.quant_files,self_header.quant_folders,path) )
504-
#t_now = perf_counter()
505-
#if t_now>self.progress_update_time+1.0:
506-
#self.progress_update_time = t_now
507481

508482
except Exception as e:
509-
#self.log.error('scandir error:%s',e )
510483
print_func( ('error', f'scandir {path} error:{e}') )
511484

512-
#self.info_line_current = ''
513-
514485
return (local_folder_size_with_subtree+local_folder_size,subitems)
515486

516487
def scan(self,print_func,abort_list,cde_list,check_dev=True):
@@ -608,18 +579,19 @@ def extract_customdata(self,print_func,abort_list):
608579
self_header = self.header
609580
scan_path = self_header.scan_path
610581

611-
#self.info_line = 'custom data extraction ...'
612582
print_func( ('info','custom data extraction ...'),True)
613583

614584
self_header.files_cde_quant = 0
615585
self_header.files_cde_size = 0
616586
self_header.files_cde_size_extracted = 0
617587
self_header.files_cde_errors_quant = defaultdict(int)
618588
self_header.files_cde_errors_quant_all = 0
619-
self_header.files_cde_quant_sum = len(self.customdata_pool)
620-
589+
files_cde_quant_sum = self_header.files_cde_quant_sum = len(self.customdata_pool)
590+
files_cde_size_sum = self_header.files_cde_size_sum
621591
cde_list = self.header.cde_list
622592

593+
print_func( ('cdeinit',files_cde_quant_sum,files_cde_size_sum),True)
594+
623595
customdata_helper={}
624596

625597
customdata_stats_size=defaultdict(int)
@@ -643,10 +615,10 @@ def threaded_cde(timeout_semi_list):
643615
files_cde_size = 0
644616
files_cde_size_extracted = 0
645617

618+
files_cde_errors_quant_all = 0
646619
for (scan_like_list,subpath,rule_nr,size) in self.customdata_pool.values():
647620

648621
self.killed=False
649-
#self.abort_action_single=False
650622

651623
time_start = perf_counter()
652624
if abort_list[0] : #wszystko
@@ -661,8 +633,8 @@ def threaded_cde(timeout_semi_list):
661633
full_file_path = normpath(abspath(sep.join([scan_path,subpath]))).replace('/',sep)
662634
command,command_info = get_command(executable,parameters,full_file_path,shell)
663635

664-
info_line = f'{full_file_path} ({bytes_to_str(size)})'
665-
print_func( ('cde',info_line,size, files_cde_size_extracted,self_header.files_cde_errors_quant_all,files_cde_quant,self_header.files_cde_quant_sum,files_cde_size,self_header.files_cde_size_sum) )
636+
#print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_quant_sum,files_cde_size,files_cde_size_sum) )
637+
print_func( ('cde',f'{full_file_path} ({bytes_to_str(size)})',size,files_cde_size_extracted,files_cde_errors_quant_all,files_cde_quant,files_cde_size) )
666638

667639
timeout_val=time()+timeout if timeout else None
668640
#####################################
@@ -706,6 +678,7 @@ def threaded_cde(timeout_semi_list):
706678

707679
if returncode or self.killed or aborted:
708680
files_cde_errors_quant[rule_nr]+=1
681+
files_cde_errors_quant_all+=1
709682

710683
if not aborted:
711684
files_cde_quant += 1
@@ -737,7 +710,7 @@ def threaded_cde(timeout_semi_list):
737710
time_end_all = perf_counter()
738711

739712
self_header.files_cde_errors_quant=files_cde_errors_quant
740-
self_header.files_cde_errors_quant_all = sum(files_cde_errors_quant.values())
713+
self_header.files_cde_errors_quant_all = files_cde_errors_quant_all
741714

742715
self_header.files_cde_quant = files_cde_quant
743716
self_header.files_cde_size = files_cde_size
@@ -910,8 +883,6 @@ def find_items(self,
910883
filestructure = self.filestructure
911884

912885
search_progress = 0
913-
#search_progress_update_quant = 0
914-
#progress_update_time = perf_counter()
915886

916887
if cd_search_kind!='dont':
917888
self.decompress_customdata()
@@ -937,8 +908,6 @@ def find_items(self,
937908

938909
self_customdata = self.customdata
939910

940-
#results_queue_put = results_queue.append
941-
942911
while search_list:
943912
filestructure,parent_path_components = search_list_pop()
944913

@@ -979,8 +948,6 @@ def find_items(self,
979948
if name_func_to_call:
980949
if name_func_to_call(name):
981950
print_func( (search_progress,size,mtime,*next_level) )
982-
#search_progress_update_quant=0
983-
#progress_update_time = perf_counter()
984951

985952
if sub_data:
986953
search_list_append( (sub_data,next_level) )
@@ -1047,22 +1014,8 @@ def find_items(self,
10471014
continue
10481015

10491016
print_func( (search_progress,size,mtime,*next_level) )
1050-
#search_progress_update_quant=0
1051-
#progress_update_time = perf_counter()
1052-
1053-
#print_func((search_progress))
1054-
1055-
#t_now = perf_counter()
1056-
#if t_now>progress_update_time+1.0:
1057-
# progress_update_time = t_now
1058-
1059-
#if search_progress_update_quant>1024:
1060-
#search_progress_update_quant=0
1061-
#else:
1062-
# search_progress_update_quant+=1
10631017

10641018
print_func( [search_progress] )
1065-
#print_func(True)
10661019

10671020
def find_items_sort(self,what,reverse):
10681021
if what=='data':
@@ -1288,7 +1241,6 @@ def __init__(self,db_dir,log):
12881241
self.db_dir = db_dir
12891242
self.log=log
12901243
self.info_line = 'init'
1291-
#self.info_line_current = ''
12921244

12931245
self.records_to_show=[]
12941246
self.abort_action=False
@@ -1304,7 +1256,7 @@ def update_sorted(self):
13041256
self.records_sorted = sorted(self.records,key = lambda x : x.header.creation_time)
13051257

13061258
def create(self,label='',scan_path=''):
1307-
new_record = LibrerRecord(self.log,label=label,scan_path=scan_path)
1259+
new_record = LibrerRecord(label=label,scan_path=scan_path)
13081260
new_record.db_dir = self.db_dir
13091261

13101262
self.records.add(new_record)
@@ -1573,14 +1525,14 @@ def find_results_clean(self):
15731525
########################################################################################################################
15741526
def create_new_record(self,temp_dir,update_callback):
15751527
self.log.info(f'create_new_record')
1528+
self_log_info = self.log.info
1529+
15761530

15771531
new_file_path = sep.join([self.db_dir,f'rep.{int(time())}.dat'])
15781532

1579-
#new_record_filename = str(int(time()) + .dat
15801533
command = self.record_exe()
15811534
command.append('create')
15821535
command.append(new_file_path)
1583-
#command.append(settings_file)
15841536
command.append(temp_dir)
15851537

15861538
self.abort_action=False
@@ -1602,8 +1554,6 @@ def create_new_record(self,temp_dir,update_callback):
16021554
self.stdout_files_cde_size_sum=0
16031555

16041556
def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
1605-
#results_list_append = results_semi_list[0].find_results.append
1606-
16071557
try:
16081558
subprocess = uni_popen(command,stdin=PIPE)
16091559
except Exception as re:
@@ -1617,12 +1567,12 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
16171567

16181568
while True:
16191569
if line := subprocess_stdout_readline():
1570+
line_strip = line.strip()
1571+
self_log_info(f'rec:{line_strip}')
16201572
try:
1621-
#print(line)
16221573
if line[0]!='#':
1623-
val = json_loads(line.strip())
1574+
val = json_loads(line_strip)
16241575

1625-
self.info_line = val
16261576
kind = val[0]
16271577
if kind == 'stage':
16281578
self.stage = val[1]
@@ -1637,25 +1587,20 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
16371587
self.stdout_sum_size,self.stdout_quant_files,self.stdout_quant_folders,self.stdout_info_line_current = val[1:5]
16381588

16391589
elif self.stage==1: #cde
1640-
self.stdout_info_line_current = val[1]
1641-
self.stdout_cde_size = val[2]
1642-
1643-
self.stdout_files_cde_size_extracted=val[3]
1644-
self.stdout_files_cde_errors_quant_all=val[4]
1645-
self.stdout_files_cde_quant=val[5]
1646-
self.stdout_files_cde_quant_sum=val[6]
1647-
self.stdout_files_cde_size=val[7]
1648-
self.stdout_files_cde_size_sum=val[8]
1649-
1650-
self.stdout_info_line_current
1651-
self.stdout_cde_size
1652-
1653-
#print(type(self.stdout_files_cde_size_extracted))
1654-
#print(type(self.stdout_files_cde_errors_quant_all))
1655-
#print(type(self.stdout_files_cde_quant))
1656-
#print(type(self.stdout_files_cde_quant_sum))
1657-
#print(type(self.stdout_files_cde_size))
1658-
#print(type(self.stdout_files_cde_size_sum))
1590+
if val[0]=='cdeinit':
1591+
#files_cde_quant_sum,files_cde_size_sum
1592+
self.stdout_files_cde_quant_sum = val[1]
1593+
self.stdout_files_cde_size_sum = val[2]
1594+
elif val[0]=='cde':
1595+
self.stdout_info_line_current = val[1]
1596+
self.stdout_cde_size = val[2]
1597+
1598+
self.stdout_files_cde_size_extracted=val[3]
1599+
self.stdout_files_cde_errors_quant_all=val[4]
1600+
self.stdout_files_cde_quant=val[5]
1601+
self.stdout_files_cde_size=val[6]
1602+
else:
1603+
self_log_info('ERROR UNRECOGNIZED LINE')
16591604

16601605
elif self.stage==2: #pack
16611606
self.stdout_info_line_current = val[1]
@@ -1666,10 +1611,11 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
16661611
elif self.stage==4: #end
16671612
pass
16681613
else:
1669-
info_semi_list[0]=line.strip()
1614+
info_semi_list[0]=line_strip
16701615
except Exception as e:
16711616
print(f'threaded_run work error:{e} line:{line}')
16721617
info_semi_list[0]=f'threaded_run work error:{e} line:{line}'
1618+
self_log_info(f'threaded_run work error:{e} line:{line}')
16731619
else:
16741620
if subprocess_poll() is not None:
16751621
break
@@ -1683,32 +1629,24 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
16831629
job.start()
16841630
job_is_alive = job.is_alive
16851631

1632+
aborted=False
16861633
###########################################
16871634
while job_is_alive():
16881635
subprocess=processes_semi_list[0]
16891636
if subprocess:
16901637
if self.abort_action:
1691-
self.info_line = 'Aborting ...'
16921638
send_signal(subprocess,temp_dir,0)
16931639
self.abort_action=False
1640+
aborted=True
16941641
if self.abort_action_single:
1695-
self.info_line = 'Aborting single ...'
16961642
send_signal(subprocess,temp_dir,1)
16971643
self.abort_action_single=False
1644+
sleep(0.1)
16981645

1699-
#try:
1700-
# subprocess.kill()
1701-
#except Exception as ke:
1702-
# print('killing error:',ke)
1703-
1704-
#break
1705-
sleep(0.01)
1706-
1707-
self.info_line = f'scanning czy costam'
17081646
job.join()
17091647
###########################################
17101648

1711-
if not self.abort_action:
1649+
if not aborted:
17121650
new_record = self.create()
17131651

17141652
if res:=new_record.load(new_file_path) :
@@ -1717,7 +1655,6 @@ def threaded_run(command,results_semi_list,info_semi_list,processes_semi_list):
17171655
print(res)
17181656
else:
17191657
update_callback(new_record)
1720-
#self.records_to_show.append( (new_record,info_curr_quant,info_curr_size) )
17211658

17221659
return True
17231660

0 commit comments

Comments
 (0)