5
5
6
6
rrdeng_stats_t global_io_errors = 0 ;
7
7
rrdeng_stats_t global_fs_errors = 0 ;
8
- rrdeng_stats_t global_pg_cache_warnings = 0 ;
9
- rrdeng_stats_t global_pg_cache_errors = 0 ;
10
8
rrdeng_stats_t rrdeng_reserved_file_descriptors = 0 ;
9
+ rrdeng_stats_t global_flushing_errors = 0 ;
11
10
12
11
void sanity_check (void )
13
12
{
@@ -253,6 +252,7 @@ void flush_pages_cb(uv_fs_t* req)
253
252
{
254
253
struct rrdengine_worker_config * wc = req -> loop -> data ;
255
254
struct rrdengine_instance * ctx = wc -> ctx ;
255
+ struct page_cache * pg_cache = & ctx -> pg_cache ;
256
256
struct extent_io_descriptor * xt_io_descr ;
257
257
struct rrdeng_page_descr * descr ;
258
258
struct page_cache_descr * pg_cache_descr ;
@@ -290,6 +290,10 @@ void flush_pages_cb(uv_fs_t* req)
290
290
uv_fs_req_cleanup (req );
291
291
free (xt_io_descr -> buf );
292
292
freez (xt_io_descr );
293
+
294
+ uv_rwlock_wrlock (& pg_cache -> committed_page_index .lock );
295
+ pg_cache -> committed_page_index .nr_committed_pages -= count ;
296
+ uv_rwlock_wrunlock (& pg_cache -> committed_page_index .lock );
293
297
}
294
298
295
299
/*
@@ -323,14 +327,14 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
323
327
if (force ) {
324
328
debug (D_RRDENGINE , "Asynchronous flushing of extent has been forced by page pressure." );
325
329
}
326
- uv_rwlock_wrlock (& pg_cache -> commited_page_index .lock );
330
+ uv_rwlock_wrlock (& pg_cache -> committed_page_index .lock );
327
331
for (Index = 0 , count = 0 , uncompressed_payload_length = 0 ,
328
- PValue = JudyLFirst (pg_cache -> commited_page_index .JudyL_array , & Index , PJE0 ),
332
+ PValue = JudyLFirst (pg_cache -> committed_page_index .JudyL_array , & Index , PJE0 ),
329
333
descr = unlikely (NULL == PValue ) ? NULL : * PValue ;
330
334
331
335
descr != NULL && count != MAX_PAGES_PER_EXTENT ;
332
336
333
- PValue = JudyLNext (pg_cache -> commited_page_index .JudyL_array , & Index , PJE0 ),
337
+ PValue = JudyLNext (pg_cache -> committed_page_index .JudyL_array , & Index , PJE0 ),
334
338
descr = unlikely (NULL == PValue ) ? NULL : * PValue ) {
335
339
uint8_t page_write_pending ;
336
340
@@ -350,12 +354,11 @@ static int do_flush_pages(struct rrdengine_worker_config* wc, int force, struct
350
354
rrdeng_page_descr_mutex_unlock (ctx , descr );
351
355
352
356
if (page_write_pending ) {
353
- ret = JudyLDel (& pg_cache -> commited_page_index .JudyL_array , Index , PJE0 );
357
+ ret = JudyLDel (& pg_cache -> committed_page_index .JudyL_array , Index , PJE0 );
354
358
assert (1 == ret );
355
- -- pg_cache -> commited_page_index .nr_commited_pages ;
356
359
}
357
360
}
358
- uv_rwlock_wrunlock (& pg_cache -> commited_page_index .lock );
361
+ uv_rwlock_wrunlock (& pg_cache -> committed_page_index .lock );
359
362
360
363
if (!count ) {
361
364
debug (D_RRDENGINE , "%s: no pages eligible for flushing." , __func__ );
@@ -648,6 +651,9 @@ void async_cb(uv_async_t *handle)
648
651
debug (D_RRDENGINE , "%s called, active=%d." , __func__ , uv_is_active ((uv_handle_t * )handle ));
649
652
}
650
653
654
+ /* Flushes dirty pages when timer expires */
655
+ #define TIMER_PERIOD_MS (1000)
656
+
651
657
void timer_cb (uv_timer_t * handle )
652
658
{
653
659
struct rrdengine_worker_config * wc = handle -> data ;
@@ -657,12 +663,31 @@ void timer_cb(uv_timer_t* handle)
657
663
rrdeng_test_quota (wc );
658
664
debug (D_RRDENGINE , "%s: timeout reached." , __func__ );
659
665
if (likely (!wc -> now_deleting .data )) {
660
- unsigned total_bytes , bytes_written ;
661
-
662
666
/* There is free space so we can write to disk */
667
+ struct rrdengine_instance * ctx = wc -> ctx ;
668
+ struct page_cache * pg_cache = & ctx -> pg_cache ;
669
+ unsigned long total_bytes , bytes_written , nr_committed_pages , bytes_to_write = 0 , producers , low_watermark ,
670
+ high_watermark ;
671
+
672
+ uv_rwlock_wrlock (& pg_cache -> committed_page_index .lock );
673
+ nr_committed_pages = pg_cache -> committed_page_index .nr_committed_pages ;
674
+ uv_rwlock_wrunlock (& pg_cache -> committed_page_index .lock );
675
+ producers = ctx -> stats .metric_API_producers ;
676
+ /* are flushable pages more than 25% of the maximum page cache size */
677
+ high_watermark = (ctx -> max_cache_pages * 25LLU ) / 100 ;
678
+ low_watermark = (ctx -> max_cache_pages * 5LLU ) / 100 ; /* 5%, must be smaller than high_watermark */
679
+
680
+ if (nr_committed_pages > producers &&
681
+ /* committed to be written pages are more than the produced number */
682
+ nr_committed_pages - producers > high_watermark ) {
683
+ /* Flushing speed must increase to stop page cache from filling with dirty pages */
684
+ bytes_to_write = (nr_committed_pages - producers - low_watermark ) * RRDENG_BLOCK_SIZE ;
685
+ }
686
+ bytes_to_write = MAX (DATAFILE_IDEAL_IO_SIZE , bytes_to_write );
687
+
663
688
debug (D_RRDENGINE , "Flushing pages to disk." );
664
689
for (total_bytes = bytes_written = do_flush_pages (wc , 0 , NULL ) ;
665
- bytes_written && (total_bytes < DATAFILE_IDEAL_IO_SIZE ) ;
690
+ bytes_written && (total_bytes < bytes_to_write ) ;
666
691
total_bytes += bytes_written ) {
667
692
bytes_written = do_flush_pages (wc , 0 , NULL );
668
693
}
@@ -675,9 +700,6 @@ void timer_cb(uv_timer_t* handle)
675
700
#endif
676
701
}
677
702
678
- /* Flushes dirty pages when timer expires */
679
- #define TIMER_PERIOD_MS (1000)
680
-
681
703
#define MAX_CMD_BATCH_SIZE (256)
682
704
683
705
void rrdeng_worker (void * arg )
@@ -771,8 +793,8 @@ void rrdeng_worker(void* arg)
771
793
/* First I/O should be enough to call completion */
772
794
bytes_written = do_flush_pages (wc , 1 , cmd .completion );
773
795
if (bytes_written ) {
774
- while (do_flush_pages (wc , 1 , NULL )) {
775
- ; /* Force flushing of all commited pages. */
796
+ while (do_flush_pages (wc , 1 , NULL ) && likely (! wc -> now_deleting . data ) ) {
797
+ ; /* Force flushing of all committed pages if there is free space . */
776
798
}
777
799
}
778
800
break ;
@@ -789,7 +811,7 @@ void rrdeng_worker(void* arg)
789
811
}
790
812
info ("Shutting down RRD engine event loop." );
791
813
while (do_flush_pages (wc , 1 , NULL )) {
792
- ; /* Force flushing of all commited pages. */
814
+ ; /* Force flushing of all committed pages. */
793
815
}
794
816
wal_flush_transaction_buffer (wc );
795
817
uv_run (loop , UV_RUN_DEFAULT );
0 commit comments