@@ -137,6 +137,14 @@ struct io_defer_entry {
 #define IO_DISARM_MASK (REQ_F_ARM_LTIMEOUT | REQ_F_LINK_TIMEOUT | REQ_F_FAIL)
 #define IO_REQ_LINK_FLAGS (REQ_F_LINK | REQ_F_HARDLINK)
 
+/*
+ * No waiters. It's larger than any valid value of the tw counter
+ * so that tests against ->cq_wait_nr would fail and skip wake_up().
+ */
+#define IO_CQ_WAKE_INIT		(-1U)
+/* Forced wake up if there is a waiter regardless of ->cq_wait_nr */
+#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)
+
 static bool io_uring_try_cancel_requests(struct io_ring_ctx *ctx,
					  struct task_struct *task,
					  bool cancel_all);
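The two new sentinels only work because of how they order against real task_work counts. Below is a standalone userspace sketch (an illustration under stated assumptions, not kernel code) of the invariants the BUILD_BUG_ON added later in this commit relies on; IORING_MAX_CQ_ENTRIES is assumed to be 2 * 32768 as defined in io_uring.c.

```c
/* Sketch only: verify the ordering IO_CQ_WAKE_INIT / IO_CQ_WAKE_FORCE rely on. */
#include <assert.h>

#define IORING_MAX_CQ_ENTRIES	(2U * 32768U)	/* assumed kernel limit */
#define IO_CQ_WAKE_INIT		(-1U)
#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)

int main(void)
{
	/* a forced wake must beat any legitimate waiter threshold */
	assert(IO_CQ_WAKE_FORCE > IORING_MAX_CQ_ENTRIES);
	/* "no waiters" must beat even a forced wake, so an unarmed
	 * ->cq_wait_nr never passes the nr_tw >= nr_wait test */
	assert(IO_CQ_WAKE_INIT > IO_CQ_WAKE_FORCE);
	return 0;
}
```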
@@ -303,6 +311,7 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
 		goto err;
 
 	ctx->flags = p->flags;
+	atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
 	init_waitqueue_head(&ctx->sqo_sq_wait);
 	INIT_LIST_HEAD(&ctx->sqd_list);
 	INIT_LIST_HEAD(&ctx->cq_overflow_list);
@@ -1304,16 +1313,23 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 {
 	struct io_ring_ctx *ctx = req->ctx;
 	unsigned nr_wait, nr_tw, nr_tw_prev;
-	struct llist_node *first;
+	struct llist_node *head;
+
+	/* See comment above IO_CQ_WAKE_INIT */
+	BUILD_BUG_ON(IO_CQ_WAKE_FORCE <= IORING_MAX_CQ_ENTRIES);
 
+	/*
+	 * We don't know how many requests there are in the link and whether
+	 * they can even be queued lazily, fall back to non-lazy.
+	 */
 	if (req->flags & (REQ_F_LINK | REQ_F_HARDLINK))
 		flags &= ~IOU_F_TWQ_LAZY_WAKE;
 
-	first = READ_ONCE(ctx->work_llist.first);
+	head = READ_ONCE(ctx->work_llist.first);
 	do {
 		nr_tw_prev = 0;
-		if (first) {
-			struct io_kiocb *first_req = container_of(first,
+		if (head) {
+			struct io_kiocb *first_req = container_of(head,
							struct io_kiocb,
							io_task_work.node);
			/*
@@ -1322,32 +1338,42 @@ static inline void io_req_local_work_add(struct io_kiocb *req, unsigned flags)
 			 */
 			nr_tw_prev = READ_ONCE(first_req->nr_tw);
 		}
+
+		/*
+		 * Theoretically, it can overflow, but that's fine as one of
+		 * the previous adds should've tried to wake the task.
+		 */
 		nr_tw = nr_tw_prev + 1;
-		/* Large enough to fail the nr_wait comparison below */
 		if (!(flags & IOU_F_TWQ_LAZY_WAKE))
-			nr_tw = -1U;
+			nr_tw = IO_CQ_WAKE_FORCE;
 
 		req->nr_tw = nr_tw;
-		req->io_task_work.node.next = first;
-	} while (!try_cmpxchg(&ctx->work_llist.first, &first,
+		req->io_task_work.node.next = head;
+	} while (!try_cmpxchg(&ctx->work_llist.first, &head,
			      &req->io_task_work.node));
 
-	if (!first) {
+	/*
+	 * cmpxchg implies a full barrier, which pairs with the barrier
+	 * in set_current_state() on the io_cqring_wait() side. It's used
+	 * to ensure that either we see updated ->cq_wait_nr, or waiters
+	 * going to sleep will observe the work added to the list, which
+	 * is similar to the wait/wake task state sync.
+	 */
+
+	if (!head) {
 		if (ctx->flags & IORING_SETUP_TASKRUN_FLAG)
			atomic_or(IORING_SQ_TASKRUN, &ctx->rings->sq_flags);
 		if (ctx->has_evfd)
			io_eventfd_signal(ctx);
 	}
 
 	nr_wait = atomic_read(&ctx->cq_wait_nr);
-	/* no one is waiting */
-	if (!nr_wait)
+	/* not enough or no one is waiting */
+	if (nr_tw < nr_wait)
 		return;
-	/* either not enough or the previous add has already woken it up */
-	if (nr_wait > nr_tw || nr_tw_prev >= nr_wait)
+	/* the previous add has already woken it up */
+	if (nr_tw_prev >= nr_wait)
 		return;
-	/* pairs with set_current_state() in io_cqring_wait() */
-	smp_mb__after_atomic();
 	wake_up_state(ctx->submitter_task, TASK_INTERRUPTIBLE);
 }
 
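Taken together, the rewritten checks at the end of io_req_local_work_add() reduce to two unsigned comparisons against ->cq_wait_nr. The following userspace sketch models that decision under stated assumptions; should_wake() and its call sites are illustrative only and not part of the kernel API.

```c
/* Userspace model of the wake decision: ->cq_wait_nr holds either the
 * waiter's target or IO_CQ_WAKE_INIT when nobody is waiting. */
#include <stdio.h>

#define IO_CQ_WAKE_INIT		(-1U)
#define IO_CQ_WAKE_FORCE	(IO_CQ_WAKE_INIT >> 1)

/* returns non-zero when wake_up_state() would be called */
static int should_wake(unsigned nr_tw, unsigned nr_tw_prev, unsigned nr_wait)
{
	if (nr_tw < nr_wait)		/* not enough, or nr_wait == IO_CQ_WAKE_INIT */
		return 0;
	if (nr_tw_prev >= nr_wait)	/* a previous add already woke the waiter */
		return 0;
	return 1;
}

int main(void)
{
	printf("%d\n", should_wake(1, 0, IO_CQ_WAKE_INIT));	/* 0: no waiter */
	printf("%d\n", should_wake(4, 3, 4));			/* 1: threshold reached by this add */
	printf("%d\n", should_wake(5, 4, 4));			/* 0: an earlier add already woke it */
	printf("%d\n", should_wake(IO_CQ_WAKE_FORCE, 0, 8));	/* 1: non-lazy add forces a wake */
	return 0;
}
```

With these sentinel values, the single nr_tw < nr_wait test covers both the "no one is waiting" case and the "not enough completions yet" case that previously needed separate checks.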
@@ -2000,9 +2026,10 @@ inline struct file *io_file_get_fixed(struct io_kiocb *req, int fd,
 		goto out;
 	fd = array_index_nospec(fd, ctx->nr_user_files);
 	slot = io_fixed_file_slot(&ctx->file_table, fd);
-	file = io_slot_file(slot);
+	if (!req->rsrc_node)
+		__io_req_set_rsrc_node(req, ctx);
 	req->flags |= io_slot_flags(slot);
-	io_req_set_rsrc_node(req, ctx, 0);
+	file = io_slot_file(slot);
 out:
 	io_ring_submit_unlock(ctx, issue_flags);
 	return file;
@@ -2613,7 +2640,7 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
 
 		ret = io_cqring_wait_schedule(ctx, &iowq);
 		__set_current_state(TASK_RUNNING);
-		atomic_set(&ctx->cq_wait_nr, 0);
+		atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
 
 		/*
 		 * Run task_work after scheduling and before io_should_wake().
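On the waiter side, the reset value changes from 0 to IO_CQ_WAKE_INIT so that ->cq_wait_nr always holds either a real wake threshold or the "no waiters" sentinel that the producer-side comparisons above expect. A minimal sketch of that lifecycle follows; wait_iteration() is a hypothetical stand-in for one pass of the io_cqring_wait() loop, not a kernel helper.

```c
/* Sketch: one pass of a waiter publishing and retiring its threshold.
 * The kernel uses set_current_state() and schedule() where indicated. */
#include <stdatomic.h>
#include <stdio.h>

#define IO_CQ_WAKE_INIT	(-1U)

static _Atomic unsigned cq_wait_nr = IO_CQ_WAKE_INIT;

static void wait_iteration(unsigned min_events)
{
	atomic_store(&cq_wait_nr, min_events);		/* publish the wake threshold */
	/* set_current_state(TASK_INTERRUPTIBLE) + schedule() happen here */
	atomic_store(&cq_wait_nr, IO_CQ_WAKE_INIT);	/* back to "no waiters" */
}

int main(void)
{
	wait_iteration(4);
	printf("cq_wait_nr after waking: %u\n", atomic_load(&cq_wait_nr));
	return 0;
}
```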