return (bp->b_page_count * PAGE_SIZE) - bp->b_offset;
}
+/*
+ * Bump the I/O in flight count on the buftarg if we haven't yet done so for
+ * this buffer. The count is incremented once per buffer (per hold cycle)
+ * because the corresponding decrement is deferred to buffer release. Buffers
+ * can undergo I/O multiple times in a hold-release cycle and per buffer I/O
+ * tracking adds unnecessary overhead. This is used for sychronization purposes
+ * with unmount (see xfs_wait_buftarg()), so all we really need is a count of
+ * in-flight buffers.
+ *
+ * Buffers that are never released (e.g., superblock, iclog buffers) must set
+ * the XBF_NO_IOACCT flag before I/O submission. Otherwise, the buftarg count
+ * never reaches zero and unmount hangs indefinitely.
+ */
+static inline void
+xfs_buf_ioacct_inc(
+ struct xfs_buf *bp)
+{
+ if (bp->b_flags & (XBF_NO_IOACCT|_XBF_IN_FLIGHT))
+ return;
+
+ ASSERT(bp->b_flags & XBF_ASYNC);
+ bp->b_flags |= _XBF_IN_FLIGHT;
+ percpu_counter_inc(&bp->b_target->bt_io_count);
+}
+
+/*
+ * Clear the in-flight state on a buffer about to be released to the LRU or
+ * freed and unaccount from the buftarg.
+ */
+static inline void
+xfs_buf_ioacct_dec(
+ struct xfs_buf *bp)
+{
+ if (!(bp->b_flags & _XBF_IN_FLIGHT))
+ return;
+
+ ASSERT(bp->b_flags & XBF_ASYNC);
+ bp->b_flags &= ~_XBF_IN_FLIGHT;
+ percpu_counter_dec(&bp->b_target->bt_io_count);
+}
+
/*
* When we mark a buffer stale, we remove the buffer from the LRU and clear the
* b_lru_ref count so that the buffer is freed immediately when the buffer
*/
bp->b_flags &= ~_XBF_DELWRI_Q;
+ /*
+ * Once the buffer is marked stale and unlocked, a subsequent lookup
+ * could reset b_flags. There is no guarantee that the buffer is
+ * unaccounted (released to LRU) before that occurs. Drop in-flight
+ * status now to preserve accounting consistency.
+ */
+ xfs_buf_ioacct_dec(bp);
+
spin_lock(&bp->b_lock);
atomic_set(&bp->b_lru_ref, 0);
if (!(bp->b_state & XFS_BSTATE_DISPOSE) &&
}
/*
- * Releases a hold on the specified buffer. If the
- * the hold count is 1, calls xfs_buf_free.
+ * Release a hold on the specified buffer. If the hold count is 1, the buffer is
+ * placed on LRU or freed (depending on b_lru_ref).
*/
void
xfs_buf_rele(
xfs_buf_t *bp)
{
struct xfs_perag *pag = bp->b_pag;
+ bool release;
+ bool freebuf = false;
trace_xfs_buf_rele(bp, _RET_IP_);
if (!pag) {
ASSERT(list_empty(&bp->b_lru));
ASSERT(RB_EMPTY_NODE(&bp->b_rbnode));
- if (atomic_dec_and_test(&bp->b_hold))
+ if (atomic_dec_and_test(&bp->b_hold)) {
+ xfs_buf_ioacct_dec(bp);
xfs_buf_free(bp);
+ }
return;
}
ASSERT(!RB_EMPTY_NODE(&bp->b_rbnode));
ASSERT(atomic_read(&bp->b_hold) > 0);
- if (atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock)) {
- spin_lock(&bp->b_lock);
- if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
- /*
- * If the buffer is added to the LRU take a new
- * reference to the buffer for the LRU and clear the
- * (now stale) dispose list state flag
- */
- if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
- bp->b_state &= ~XFS_BSTATE_DISPOSE;
- atomic_inc(&bp->b_hold);
- }
- spin_unlock(&bp->b_lock);
- spin_unlock(&pag->pag_buf_lock);
- } else {
- /*
- * most of the time buffers will already be removed from
- * the LRU, so optimise that case by checking for the
- * XFS_BSTATE_DISPOSE flag indicating the last list the
- * buffer was on was the disposal list
- */
- if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
- list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
- } else {
- ASSERT(list_empty(&bp->b_lru));
- }
- spin_unlock(&bp->b_lock);
- ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
- rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
- spin_unlock(&pag->pag_buf_lock);
- xfs_perag_put(pag);
- xfs_buf_free(bp);
+ release = atomic_dec_and_lock(&bp->b_hold, &pag->pag_buf_lock);
+ spin_lock(&bp->b_lock);
+ if (!release) {
+ /*
+ * Drop the in-flight state if the buffer is already on the LRU
+ * and it holds the only reference. This is racy because we
+ * haven't acquired the pag lock, but the use of _XBF_IN_FLIGHT
+ * ensures the decrement occurs only once per-buf.
+ */
+ if ((atomic_read(&bp->b_hold) == 1) && !list_empty(&bp->b_lru))
+ xfs_buf_ioacct_dec(bp);
+ goto out_unlock;
+ }
+
+ /* the last reference has been dropped ... */
+ xfs_buf_ioacct_dec(bp);
+ if (!(bp->b_flags & XBF_STALE) && atomic_read(&bp->b_lru_ref)) {
+ /*
+ * If the buffer is added to the LRU take a new reference to the
+ * buffer for the LRU and clear the (now stale) dispose list
+ * state flag
+ */
+ if (list_lru_add(&bp->b_target->bt_lru, &bp->b_lru)) {
+ bp->b_state &= ~XFS_BSTATE_DISPOSE;
+ atomic_inc(&bp->b_hold);
}
+ spin_unlock(&pag->pag_buf_lock);
+ } else {
+ /*
+ * most of the time buffers will already be removed from the
+ * LRU, so optimise that case by checking for the
+ * XFS_BSTATE_DISPOSE flag indicating the last list the buffer
+ * was on was the disposal list
+ */
+ if (!(bp->b_state & XFS_BSTATE_DISPOSE)) {
+ list_lru_del(&bp->b_target->bt_lru, &bp->b_lru);
+ } else {
+ ASSERT(list_empty(&bp->b_lru));
+ }
+
+ ASSERT(!(bp->b_flags & _XBF_DELWRI_Q));
+ rb_erase(&bp->b_rbnode, &pag->pag_buf_tree);
+ spin_unlock(&pag->pag_buf_lock);
+ xfs_perag_put(pag);
+ freebuf = true;
}
+
+out_unlock:
+ spin_unlock(&bp->b_lock);
+
+ if (freebuf)
+ xfs_buf_free(bp);
}
* xfs_buf_ioend too early.
*/
atomic_set(&bp->b_io_remaining, 1);
+ xfs_buf_ioacct_inc(bp);
_xfs_buf_ioapply(bp);
/*
int loop = 0;
/*
- * We need to flush the buffer workqueue to ensure that all IO
- * completion processing is 100% done. Just waiting on buffer locks is
- * not sufficient for async IO as the reference count held over IO is
- * not released until after the buffer lock is dropped. Hence we need to
- * ensure here that all reference counts have been dropped before we
- * start walking the LRU list.
+ * First wait on the buftarg I/O count for all in-flight buffers to be
+ * released. This is critical as new buffers do not make the LRU until
+ * they are released.
+ *
+ * Next, flush the buffer workqueue to ensure all completion processing
+ * has finished. Just waiting on buffer locks is not sufficient for
+ * async IO as the reference count held over IO is not released until
+ * after the buffer lock is dropped. Hence we need to ensure here that
+ * all reference counts have been dropped before we start walking the
+ * LRU list.
*/
+ while (percpu_counter_sum(&btp->bt_io_count))
+ delay(100);
drain_workqueue(btp->bt_mount->m_buf_workqueue);
/* loop until there is nothing left on the lru list. */
struct xfs_buftarg *btp)
{
unregister_shrinker(&btp->bt_shrinker);
+ ASSERT(percpu_counter_sum(&btp->bt_io_count) == 0);
+ percpu_counter_destroy(&btp->bt_io_count);
list_lru_destroy(&btp->bt_lru);
if (mp->m_flags & XFS_MOUNT_BARRIER)
if (list_lru_init(&btp->bt_lru))
goto error;
+ if (percpu_counter_init(&btp->bt_io_count, 0, GFP_KERNEL))
+ goto error;
+
btp->bt_shrinker.count_objects = xfs_buftarg_shrink_count;
btp->bt_shrinker.scan_objects = xfs_buftarg_shrink_scan;
btp->bt_shrinker.seeks = DEFAULT_SEEKS;