{
struct osc_page *opg = oap2osc_page(oap);
struct cl_page *page = oap2cl_page(oap);
- struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
enum cl_req_type crt;
int srvlock;
/* Clear opg->ops_transfer_pinned before VM lock is released. */
opg->ops_transfer_pinned = 0;
- spin_lock(&obj->oo_seatbelt);
- LASSERT(opg->ops_submitter);
- LASSERT(!list_empty(&opg->ops_inflight));
- list_del_init(&opg->ops_inflight);
- opg->ops_submitter = NULL;
- spin_unlock(&obj->oo_seatbelt);
-
opg->ops_submit_time = 0;
srvlock = oap->oap_brw_flags & OBD_BRW_SRVLOCK;
#define OSC_DUMP_GRANT(lvl, cli, fmt, args...) do { \
struct client_obd *__tmp = (cli); \
- CDEBUG(lvl, "%s: grant { dirty: %ld/%ld dirty_pages: %ld/%lu " \
+ CDEBUG(lvl, "%s: grant { dirty: %lu/%lu dirty_pages: %ld/%lu " \
"dropped: %ld avail: %ld, reserved: %ld, flight: %d }" \
"lru {in list: %ld, left: %ld, waiters: %d }" fmt "\n", \
cli_name(__tmp), \
return 0;
if (!async) {
- /* disable osc_lru_shrink() temporarily to avoid
- * potential stack overrun problem. LU-2859
- */
- atomic_inc(&cli->cl_lru_shrinkers);
spin_lock(&cli->cl_loi_list_lock);
osc_check_rpcs(env, cli);
spin_unlock(&cli->cl_loi_list_lock);
- atomic_dec(&cli->cl_lru_shrinkers);
} else {
CDEBUG(D_CACHE, "Queue writeback work for client %p.\n", cli);
LASSERT(cli->cl_writeback_work);
struct osc_object *obj, struct osc_page *ops)
{
struct osc_async_page *oap = &ops->ops_oap;
- struct osc_extent *ext = NULL;
int rc = 0;
LASSERT(oap->oap_magic == OAP_MAGIC);
CDEBUG(D_INFO, "teardown oap %p page %p at index %lu.\n",
oap, ops, osc_index(oap2osc(oap)));
- osc_object_lock(obj);
if (!list_empty(&oap->oap_rpc_item)) {
CDEBUG(D_CACHE, "oap %p is not in cache.\n", oap);
rc = -EBUSY;
} else if (!list_empty(&oap->oap_pending_item)) {
+ struct osc_extent *ext = NULL;
+
+ osc_object_lock(obj);
ext = osc_extent_lookup(obj, osc_index(oap2osc(oap)));
+ osc_object_unlock(obj);
/* only truncated pages are allowed to be taken out.
* See osc_extent_truncate() and osc_cache_truncate_start()
* for details.
osc_index(oap2osc(oap)));
rc = -EBUSY;
}
+ if (ext)
+ osc_extent_put(env, ext);
}
- osc_object_unlock(obj);
- if (ext)
- osc_extent_put(env, ext);
return rc;
}
#define DEBUG_SUBSYSTEM S_OSC
+#include <linux/math64.h>
#include "osc_cl_internal.h"
static void osc_lru_del(struct client_obd *cli, struct osc_page *opg);
struct osc_object *obj = cl2osc(opg->ops_cl.cpl_obj);
osc_lru_use(osc_cli(obj), opg);
-
- spin_lock(&obj->oo_seatbelt);
- list_add(&opg->ops_inflight, &obj->oo_inflight[crt]);
- opg->ops_submitter = current;
- spin_unlock(&obj->oo_seatbelt);
}
int osc_page_cache_add(const struct lu_env *env,
struct osc_object *obj = cl2osc(slice->cpl_obj);
struct client_obd *cli = &osc_export(obj)->exp_obd->u.cli;
- return (*printer)(env, cookie, LUSTRE_OSC_NAME "-page@%p %lu: 1< %#x %d %u %s %s > 2< %llu %u %u %#x %#x | %p %p %p > 3< %s %p %d %lu %d > 4< %d %d %d %lu %s | %s %s %s %s > 5< %s %s %s %s | %d %s | %d %s %s>\n",
+ return (*printer)(env, cookie, LUSTRE_OSC_NAME "-page@%p %lu: 1< %#x %d %u %s %s > 2< %llu %u %u %#x %#x | %p %p %p > 3< %d %lu %d > 4< %d %d %d %lu %s | %s %s %s %s > 5< %s %s %s %s | %d %s | %d %s %s>\n",
opg, osc_index(opg),
/* 1 */
oap->oap_magic, oap->oap_cmd,
oap->oap_async_flags, oap->oap_brw_flags,
oap->oap_request, oap->oap_cli, obj,
/* 3 */
- osc_list(&opg->ops_inflight),
- opg->ops_submitter, opg->ops_transfer_pinned,
+ opg->ops_transfer_pinned,
osc_submit_duration(opg), opg->ops_srvlock,
/* 4 */
cli->cl_r_in_flight, cli->cl_w_in_flight,
LASSERT(0);
}
- spin_lock(&obj->oo_seatbelt);
- if (opg->ops_submitter) {
- LASSERT(!list_empty(&opg->ops_inflight));
- list_del_init(&opg->ops_inflight);
- opg->ops_submitter = NULL;
- }
- spin_unlock(&obj->oo_seatbelt);
-
osc_lru_del(osc_cli(obj), opg);
if (slice->cpl_page->cp_type == CPT_CACHEABLE) {
cl_page_slice_add(page, &opg->ops_cl, obj, index,
&osc_page_ops);
}
- /* ops_inflight and ops_lru are the same field, but it doesn't
- * hurt to initialize it twice :-)
- */
- INIT_LIST_HEAD(&opg->ops_inflight);
INIT_LIST_HEAD(&opg->ops_lru);
/* reserve an LRU space for this page */
* LRU pages are freed in batch mode. OSC should at least free this
* number of pages to avoid running out of LRU slots.
*/
-static const int lru_shrink_min = 2 << (20 - PAGE_SHIFT); /* 2M */
-static const int lru_shrink_max = 8 << (20 - PAGE_SHIFT); /* 8M */
+static inline int lru_shrink_min(struct client_obd *cli)
+{
+ return cli->cl_max_pages_per_rpc * 2;
+}
+
+/**
+ * free this number at most otherwise it will take too long time to finish.
+ */
+static inline int lru_shrink_max(struct client_obd *cli)
+{
+ return cli->cl_max_pages_per_rpc * cli->cl_max_rpcs_in_flight;
+}
/**
* Check if we can free LRU slots from this OSC. If there exists LRU waiters,
/* if it's going to run out LRU slots, we should free some, but not
* too much to maintain fairness among OSCs.
*/
- if (atomic_long_read(cli->cl_lru_left) < cache->ccc_lru_max >> 4) {
+ if (atomic_long_read(cli->cl_lru_left) < cache->ccc_lru_max >> 2) {
if (pages >= budget)
- return lru_shrink_max;
+ return lru_shrink_max(cli);
else if (pages >= budget / 2)
- return lru_shrink_min;
- } else if (pages >= budget * 2) {
- return lru_shrink_min;
+ return lru_shrink_min(cli);
+ } else {
+ time64_t duration = ktime_get_real_seconds();
+
+ /* knock out pages by duration of no IO activity */
+ duration -= cli->cl_lru_last_used;
+ duration >>= 6; /* approximately 1 minute */
+ if (duration > 0 &&
+ pages >= div64_s64((s64)budget, duration))
+ return lru_shrink_min(cli);
}
return 0;
}
int lru_queue_work(const struct lu_env *env, void *data)
{
struct client_obd *cli = data;
+ int count;
CDEBUG(D_CACHE, "%s: run LRU work for client obd\n", cli_name(cli));
- if (osc_cache_too_much(cli))
- osc_lru_shrink(env, cli, lru_shrink_max, true);
+ count = osc_cache_too_much(cli);
+ if (count > 0) {
+ int rc = osc_lru_shrink(env, cli, count, false);
+
+ CDEBUG(D_CACHE, "%s: shrank %d/%d pages from client obd\n",
+ cli_name(cli), rc, count);
+ if (rc >= count) {
+ CDEBUG(D_CACHE, "%s: queue again\n", cli_name(cli));
+ ptlrpcd_queue_work(cli->cl_lru_work);
+ }
+ }
return 0;
}
list_splice_tail(&lru, &cli->cl_lru_list);
atomic_long_sub(npages, &cli->cl_lru_busy);
atomic_long_add(npages, &cli->cl_lru_in_list);
+ cli->cl_lru_last_used = ktime_get_real_seconds();
spin_unlock(&cli->cl_lru_list_lock);
- /* XXX: May set force to be true for better performance */
- if (osc_cache_too_much(cli))
+ if (waitqueue_active(&osc_lru_waitq))
(void)ptlrpcd_queue_work(cli->cl_lru_work);
}
}
* this osc occupies too many LRU pages and kernel is
* stealing one of them.
*/
- if (!memory_pressure_get())
+ if (osc_cache_too_much(cli)) {
+ CDEBUG(D_CACHE, "%s: queue LRU workn", cli_name(cli));
(void)ptlrpcd_queue_work(cli->cl_lru_work);
+ }
wake_up(&osc_lru_waitq);
} else {
LASSERT(list_empty(&opg->ops_lru));
struct cl_page *page = pvec[i];
LASSERT(cl_page_is_owned(page, io));
+ cl_page_delete(env, page);
cl_page_discard(env, io, page);
cl_page_disown(env, io, page);
cl_page_put(env, page);
if (atomic_long_read(&cli->cl_lru_in_list) == 0 || target <= 0)
return 0;
+ CDEBUG(D_CACHE, "%s: shrinkers: %d, force: %d\n",
+ cli_name(cli), atomic_read(&cli->cl_lru_shrinkers), force);
if (!force) {
if (atomic_read(&cli->cl_lru_shrinkers) > 0)
return -EBUSY;
io = &osc_env_info(env)->oti_io;
spin_lock(&cli->cl_lru_list_lock);
+ if (force)
+ cli->cl_lru_reclaim++;
maxscan = min(target << 1, atomic_long_read(&cli->cl_lru_in_list));
list_for_each_entry_safe(opg, temp, &cli->cl_lru_list, ops_lru) {
struct cl_page *page;
bool will_free = false;
+ if (!force && atomic_read(&cli->cl_lru_shrinkers) > 1)
+ break;
+
if (--maxscan < 0)
break;
* LRU pages in batch. Therefore, the actual number is adjusted at least
* max_pages_per_rpc.
*/
-long osc_lru_reclaim(struct client_obd *cli)
+long osc_lru_reclaim(struct client_obd *cli, unsigned long npages)
{
struct lu_env *env;
struct cl_client_cache *cache = cli->cl_cache;
if (IS_ERR(env))
return 0;
- rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli), false);
- if (rc != 0) {
- if (rc == -EBUSY)
- rc = 0;
-
- CDEBUG(D_CACHE, "%s: Free %ld pages from own LRU: %p.\n",
- cli->cl_import->imp_obd->obd_name, rc, cli);
+ npages = max_t(int, npages, cli->cl_max_pages_per_rpc);
+ CDEBUG(D_CACHE, "%s: start to reclaim %ld pages from LRU\n",
+ cli_name(cli), npages);
+ rc = osc_lru_shrink(env, cli, npages, true);
+ if (rc >= npages) {
+ CDEBUG(D_CACHE, "%s: reclaimed %ld/%ld pages from LRU\n",
+ cli_name(cli), rc, npages);
+ if (osc_cache_too_much(cli) > 0)
+ ptlrpcd_queue_work(cli->cl_lru_work);
goto out;
+ } else if (rc > 0) {
+ npages -= rc;
}
- CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %ld, busy: %ld.\n",
+ CDEBUG(D_CACHE, "%s: cli %p no free slots, pages: %ld/%ld, want: %ld\n",
cli_name(cli), cli, atomic_long_read(&cli->cl_lru_in_list),
- atomic_long_read(&cli->cl_lru_busy));
+ atomic_long_read(&cli->cl_lru_busy), npages);
/* Reclaim LRU slots from other client_obd as it can't free enough
* from its own. This should rarely happen.
if (osc_cache_too_much(cli) > 0) {
spin_unlock(&cache->ccc_lru_lock);
- rc = osc_lru_shrink(env, cli, osc_cache_too_much(cli),
- true);
+ rc = osc_lru_shrink(env, cli, npages, true);
spin_lock(&cache->ccc_lru_lock);
- if (rc != 0)
+ if (rc >= npages)
break;
+ if (rc > 0)
+ npages -= rc;
}
}
spin_unlock(&cache->ccc_lru_lock);
LASSERT(atomic_long_read(cli->cl_lru_left) >= 0);
while (!atomic_long_add_unless(cli->cl_lru_left, -1, 0)) {
/* run out of LRU spaces, try to drop some by itself */
- rc = osc_lru_reclaim(cli);
+ rc = osc_lru_reclaim(cli, 1);
if (rc < 0)
break;
if (rc > 0)
if (!unstable_count)
wake_up_all(&cli->cl_cache->ccc_unstable_waitq);
- if (osc_cache_too_much(cli))
+ if (waitqueue_active(&osc_lru_waitq))
(void)ptlrpcd_queue_work(cli->cl_lru_work);
}