-----BEGIN PGP SIGNATURE-----

iQEcBAABAgAGBQJYbPilAAoJEJykq7OBq3PIYf0H/jedP2AA090Uh+ECWTJkg9n3
 8S7hpxMOhQGoNpYn1lgdaWM/B1nYo6qpFC/kAus4gytv9+MkxOHrcYQsNNfxZzZF
 dQWXrrMK0y7f7tSRfjUy4bMMnc4cHTpqwhVWvtygxG67llr+vk7wppzCOYzUCz6e
 YZJj5111a3gYUzTVSkc0jIf5gdfpjIbo4NcdPpSgLrf1CL7+nM2k1cHMxj1bBBxJ
 M9y7Dek+txHQ0zf5rQm4duvFmrrCp8/pfb5zNUk89Za3NJ41SKK3XY+aPYVpi8kl
 j8Uvv368llMzo9fYJFs9ykb2siZx1vbSS6EIcxqL0toZ+ZlCBpWVS6zRAlmys2o=
 =6YKx
 -----END PGP SIGNATURE-----

Merge remote-tracking branch 'remotes/stefanha/tags/block-pull-request' into staging

# gpg: Signature made Wed 04 Jan 2017 13:29:09 GMT
# gpg:                using RSA key 0x9CA4ABB381AB73C8
# gpg: Good signature from "Stefan Hajnoczi <stefanha@redhat.com>"
# gpg:                 aka "Stefan Hajnoczi <stefanha@gmail.com>"
# Primary key fingerprint: 8695 A8BF D3F9 7CDA AC35  775A 9CA4 ABB3 81AB 73C8

* remotes/stefanha/tags/block-pull-request:
  iothread: add poll-grow and poll-shrink parameters
  aio: self-tune polling time
  virtio: disable virtqueue notifications during polling
  aio: add .io_poll_begin/end() callbacks
  virtio: turn vq->notification into a nested counter
  virtio-scsi: suppress virtqueue kick during processing
  virtio-blk: suppress virtqueue kick during processing
  iothread: add polling parameters
  linux-aio: poll ring for completions
  virtio: poll virtqueues for new buffers
  aio: add polling mode to AioContext
  aio: add AioPollFn and io_poll() interface
  aio: add flag to skip fds to aio_dispatch()
  HACKING: document #include order

Signed-off-by: Peter Maydell <peter.maydell@linaro.org>
This commit is contained in:
Peter Maydell 2017-01-05 12:44:22 +00:00
commit e92fbc753d
24 changed files with 624 additions and 112 deletions

18
HACKING
View File

@ -1,10 +1,28 @@
1. Preprocessor 1. Preprocessor
1.1. Variadic macros
For variadic macros, stick with this C99-like syntax: For variadic macros, stick with this C99-like syntax:
#define DPRINTF(fmt, ...) \ #define DPRINTF(fmt, ...) \
do { printf("IRQ: " fmt, ## __VA_ARGS__); } while (0) do { printf("IRQ: " fmt, ## __VA_ARGS__); } while (0)
1.2. Include directives
Order include directives as follows:
#include "qemu/osdep.h" /* Always first... */
#include <...> /* then system headers... */
#include "..." /* and finally QEMU headers. */
The "qemu/osdep.h" header contains preprocessor macros that affect the behavior
of core system headers like <stdint.h>. It must be the first include so that
core system headers included by external libraries get the preprocessor macros
that QEMU depends on.
Do not include "qemu/osdep.h" from header files since the .c file will have
already included it.
2. C types 2. C types
It should be common sense to use the right type, but we have collected It should be common sense to use the right type, but we have collected

View File

@ -18,6 +18,8 @@
#include "block/block.h" #include "block/block.h"
#include "qemu/queue.h" #include "qemu/queue.h"
#include "qemu/sockets.h" #include "qemu/sockets.h"
#include "qemu/cutils.h"
#include "trace.h"
#ifdef CONFIG_EPOLL_CREATE1 #ifdef CONFIG_EPOLL_CREATE1
#include <sys/epoll.h> #include <sys/epoll.h>
#endif #endif
@ -27,6 +29,9 @@ struct AioHandler
GPollFD pfd; GPollFD pfd;
IOHandler *io_read; IOHandler *io_read;
IOHandler *io_write; IOHandler *io_write;
AioPollFn *io_poll;
IOHandler *io_poll_begin;
IOHandler *io_poll_end;
int deleted; int deleted;
void *opaque; void *opaque;
bool is_external; bool is_external;
@ -200,6 +205,7 @@ void aio_set_fd_handler(AioContext *ctx,
bool is_external, bool is_external,
IOHandler *io_read, IOHandler *io_read,
IOHandler *io_write, IOHandler *io_write,
AioPollFn *io_poll,
void *opaque) void *opaque)
{ {
AioHandler *node; AioHandler *node;
@ -209,7 +215,7 @@ void aio_set_fd_handler(AioContext *ctx,
node = find_aio_handler(ctx, fd); node = find_aio_handler(ctx, fd);
/* Are we deleting the fd handler? */ /* Are we deleting the fd handler? */
if (!io_read && !io_write) { if (!io_read && !io_write && !io_poll) {
if (node == NULL) { if (node == NULL) {
return; return;
} }
@ -228,6 +234,10 @@ void aio_set_fd_handler(AioContext *ctx,
QLIST_REMOVE(node, node); QLIST_REMOVE(node, node);
deleted = true; deleted = true;
} }
if (!node->io_poll) {
ctx->poll_disable_cnt--;
}
} else { } else {
if (node == NULL) { if (node == NULL) {
/* Alloc and insert if it's not already there */ /* Alloc and insert if it's not already there */
@ -237,10 +247,16 @@ void aio_set_fd_handler(AioContext *ctx,
g_source_add_poll(&ctx->source, &node->pfd); g_source_add_poll(&ctx->source, &node->pfd);
is_new = true; is_new = true;
ctx->poll_disable_cnt += !io_poll;
} else {
ctx->poll_disable_cnt += !io_poll - !node->io_poll;
} }
/* Update handler with latest information */ /* Update handler with latest information */
node->io_read = io_read; node->io_read = io_read;
node->io_write = io_write; node->io_write = io_write;
node->io_poll = io_poll;
node->opaque = opaque; node->opaque = opaque;
node->is_external = is_external; node->is_external = is_external;
@ -250,22 +266,83 @@ void aio_set_fd_handler(AioContext *ctx,
aio_epoll_update(ctx, node, is_new); aio_epoll_update(ctx, node, is_new);
aio_notify(ctx); aio_notify(ctx);
if (deleted) { if (deleted) {
g_free(node); g_free(node);
} }
} }
void aio_set_fd_poll(AioContext *ctx, int fd,
IOHandler *io_poll_begin,
IOHandler *io_poll_end)
{
AioHandler *node = find_aio_handler(ctx, fd);
if (!node) {
return;
}
node->io_poll_begin = io_poll_begin;
node->io_poll_end = io_poll_end;
}
void aio_set_event_notifier(AioContext *ctx, void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier, EventNotifier *notifier,
bool is_external, bool is_external,
EventNotifierHandler *io_read) EventNotifierHandler *io_read,
AioPollFn *io_poll)
{ {
aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), aio_set_fd_handler(ctx, event_notifier_get_fd(notifier), is_external,
is_external, (IOHandler *)io_read, NULL, notifier); (IOHandler *)io_read, NULL, io_poll, notifier);
} }
void aio_set_event_notifier_poll(AioContext *ctx,
EventNotifier *notifier,
EventNotifierHandler *io_poll_begin,
EventNotifierHandler *io_poll_end)
{
aio_set_fd_poll(ctx, event_notifier_get_fd(notifier),
(IOHandler *)io_poll_begin,
(IOHandler *)io_poll_end);
}
static void poll_set_started(AioContext *ctx, bool started)
{
AioHandler *node;
if (started == ctx->poll_started) {
return;
}
ctx->poll_started = started;
ctx->walking_handlers++;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
IOHandler *fn;
if (node->deleted) {
continue;
}
if (started) {
fn = node->io_poll_begin;
} else {
fn = node->io_poll_end;
}
if (fn) {
fn(node->opaque);
}
}
ctx->walking_handlers--;
}
bool aio_prepare(AioContext *ctx) bool aio_prepare(AioContext *ctx)
{ {
/* Poll mode cannot be used with glib's event loop, disable it. */
poll_set_started(ctx, false);
return false; return false;
} }
@ -290,9 +367,13 @@ bool aio_pending(AioContext *ctx)
return false; return false;
} }
bool aio_dispatch(AioContext *ctx) /*
* Note that dispatch_fds == false has the side-effect of post-poning the
* freeing of deleted handlers.
*/
bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
{ {
AioHandler *node; AioHandler *node = NULL;
bool progress = false; bool progress = false;
/* /*
@ -308,7 +389,9 @@ bool aio_dispatch(AioContext *ctx)
* We have to walk very carefully in case aio_set_fd_handler is * We have to walk very carefully in case aio_set_fd_handler is
* called while we're walking. * called while we're walking.
*/ */
node = QLIST_FIRST(&ctx->aio_handlers); if (dispatch_fds) {
node = QLIST_FIRST(&ctx->aio_handlers);
}
while (node) { while (node) {
AioHandler *tmp; AioHandler *tmp;
int revents; int revents;
@ -400,12 +483,100 @@ static void add_pollfd(AioHandler *node)
npfd++; npfd++;
} }
static bool run_poll_handlers_once(AioContext *ctx)
{
bool progress = false;
AioHandler *node;
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->io_poll &&
node->io_poll(node->opaque)) {
progress = true;
}
/* Caller handles freeing deleted nodes. Don't do it here. */
}
return progress;
}
/* run_poll_handlers:
* @ctx: the AioContext
* @max_ns: maximum time to poll for, in nanoseconds
*
* Polls for a given time.
*
* Note that ctx->notify_me must be non-zero so this function can detect
* aio_notify().
*
* Note that the caller must have incremented ctx->walking_handlers.
*
* Returns: true if progress was made, false otherwise
*/
static bool run_poll_handlers(AioContext *ctx, int64_t max_ns)
{
bool progress;
int64_t end_time;
assert(ctx->notify_me);
assert(ctx->walking_handlers > 0);
assert(ctx->poll_disable_cnt == 0);
trace_run_poll_handlers_begin(ctx, max_ns);
end_time = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) + max_ns;
do {
progress = run_poll_handlers_once(ctx);
} while (!progress && qemu_clock_get_ns(QEMU_CLOCK_REALTIME) < end_time);
trace_run_poll_handlers_end(ctx, progress);
return progress;
}
/* try_poll_mode:
* @ctx: the AioContext
* @blocking: busy polling is only attempted when blocking is true
*
* ctx->notify_me must be non-zero so this function can detect aio_notify().
*
* Note that the caller must have incremented ctx->walking_handlers.
*
* Returns: true if progress was made, false otherwise
*/
static bool try_poll_mode(AioContext *ctx, bool blocking)
{
if (blocking && ctx->poll_max_ns && ctx->poll_disable_cnt == 0) {
/* See qemu_soonest_timeout() uint64_t hack */
int64_t max_ns = MIN((uint64_t)aio_compute_timeout(ctx),
(uint64_t)ctx->poll_ns);
if (max_ns) {
poll_set_started(ctx, true);
if (run_poll_handlers(ctx, max_ns)) {
return true;
}
}
}
poll_set_started(ctx, false);
/* Even if we don't run busy polling, try polling once in case it can make
* progress and the caller will be able to avoid ppoll(2)/epoll_wait(2).
*/
return run_poll_handlers_once(ctx);
}
bool aio_poll(AioContext *ctx, bool blocking) bool aio_poll(AioContext *ctx, bool blocking)
{ {
AioHandler *node; AioHandler *node;
int i, ret; int i;
int ret = 0;
bool progress; bool progress;
int64_t timeout; int64_t timeout;
int64_t start = 0;
aio_context_acquire(ctx); aio_context_acquire(ctx);
progress = false; progress = false;
@ -423,41 +594,91 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers++; ctx->walking_handlers++;
assert(npfd == 0); if (ctx->poll_max_ns) {
start = qemu_clock_get_ns(QEMU_CLOCK_REALTIME);
}
/* fill pollfds */ if (try_poll_mode(ctx, blocking)) {
progress = true;
} else {
assert(npfd == 0);
if (!aio_epoll_enabled(ctx)) { /* fill pollfds */
QLIST_FOREACH(node, &ctx->aio_handlers, node) {
if (!node->deleted && node->pfd.events if (!aio_epoll_enabled(ctx)) {
&& aio_node_check(ctx, node->is_external)) { QLIST_FOREACH(node, &ctx->aio_handlers, node) {
add_pollfd(node); if (!node->deleted && node->pfd.events
&& aio_node_check(ctx, node->is_external)) {
add_pollfd(node);
}
} }
} }
timeout = blocking ? aio_compute_timeout(ctx) : 0;
/* wait until next event */
if (timeout) {
aio_context_release(ctx);
}
if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
AioHandler epoll_handler;
epoll_handler.pfd.fd = ctx->epollfd;
epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
npfd = 0;
add_pollfd(&epoll_handler);
ret = aio_epoll(ctx, pollfds, npfd, timeout);
} else {
ret = qemu_poll_ns(pollfds, npfd, timeout);
}
if (timeout) {
aio_context_acquire(ctx);
}
} }
timeout = blocking ? aio_compute_timeout(ctx) : 0;
/* wait until next event */
if (timeout) {
aio_context_release(ctx);
}
if (aio_epoll_check_poll(ctx, pollfds, npfd, timeout)) {
AioHandler epoll_handler;
epoll_handler.pfd.fd = ctx->epollfd;
epoll_handler.pfd.events = G_IO_IN | G_IO_OUT | G_IO_HUP | G_IO_ERR;
npfd = 0;
add_pollfd(&epoll_handler);
ret = aio_epoll(ctx, pollfds, npfd, timeout);
} else {
ret = qemu_poll_ns(pollfds, npfd, timeout);
}
if (blocking) { if (blocking) {
atomic_sub(&ctx->notify_me, 2); atomic_sub(&ctx->notify_me, 2);
} }
if (timeout) {
aio_context_acquire(ctx); /* Adjust polling time */
if (ctx->poll_max_ns) {
int64_t block_ns = qemu_clock_get_ns(QEMU_CLOCK_REALTIME) - start;
if (block_ns <= ctx->poll_ns) {
/* This is the sweet spot, no adjustment needed */
} else if (block_ns > ctx->poll_max_ns) {
/* We'd have to poll for too long, poll less */
int64_t old = ctx->poll_ns;
if (ctx->poll_shrink) {
ctx->poll_ns /= ctx->poll_shrink;
} else {
ctx->poll_ns = 0;
}
trace_poll_shrink(ctx, old, ctx->poll_ns);
} else if (ctx->poll_ns < ctx->poll_max_ns &&
block_ns < ctx->poll_max_ns) {
/* There is room to grow, poll longer */
int64_t old = ctx->poll_ns;
int64_t grow = ctx->poll_grow;
if (grow == 0) {
grow = 2;
}
if (ctx->poll_ns) {
ctx->poll_ns *= grow;
} else {
ctx->poll_ns = 4000; /* start polling at 4 microseconds */
}
if (ctx->poll_ns > ctx->poll_max_ns) {
ctx->poll_ns = ctx->poll_max_ns;
}
trace_poll_grow(ctx, old, ctx->poll_ns);
}
} }
aio_notify_accept(ctx); aio_notify_accept(ctx);
@ -473,7 +694,7 @@ bool aio_poll(AioContext *ctx, bool blocking)
ctx->walking_handlers--; ctx->walking_handlers--;
/* Run dispatch even if there were no readable fds to run timers */ /* Run dispatch even if there were no readable fds to run timers */
if (aio_dispatch(ctx)) { if (aio_dispatch(ctx, ret > 0)) {
progress = true; progress = true;
} }
@ -484,6 +705,13 @@ bool aio_poll(AioContext *ctx, bool blocking)
void aio_context_setup(AioContext *ctx) void aio_context_setup(AioContext *ctx)
{ {
/* TODO remove this in final patch submission */
if (getenv("QEMU_AIO_POLL_MAX_NS")) {
fprintf(stderr, "The QEMU_AIO_POLL_MAX_NS environment variable has "
"been replaced with -object iothread,poll-max-ns=NUM\n");
exit(1);
}
#ifdef CONFIG_EPOLL_CREATE1 #ifdef CONFIG_EPOLL_CREATE1
assert(!ctx->epollfd); assert(!ctx->epollfd);
ctx->epollfd = epoll_create1(EPOLL_CLOEXEC); ctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
@ -495,3 +723,17 @@ void aio_context_setup(AioContext *ctx)
} }
#endif #endif
} }
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
int64_t grow, int64_t shrink, Error **errp)
{
/* No thread synchronization here, it doesn't matter if an incorrect value
* is used once.
*/
ctx->poll_max_ns = max_ns;
ctx->poll_ns = 0;
ctx->poll_grow = grow;
ctx->poll_shrink = shrink;
aio_notify(ctx);
}

View File

@ -20,6 +20,7 @@
#include "block/block.h" #include "block/block.h"
#include "qemu/queue.h" #include "qemu/queue.h"
#include "qemu/sockets.h" #include "qemu/sockets.h"
#include "qapi/error.h"
struct AioHandler { struct AioHandler {
EventNotifier *e; EventNotifier *e;
@ -38,6 +39,7 @@ void aio_set_fd_handler(AioContext *ctx,
bool is_external, bool is_external,
IOHandler *io_read, IOHandler *io_read,
IOHandler *io_write, IOHandler *io_write,
AioPollFn *io_poll,
void *opaque) void *opaque)
{ {
/* fd is a SOCKET in our case */ /* fd is a SOCKET in our case */
@ -100,10 +102,18 @@ void aio_set_fd_handler(AioContext *ctx,
aio_notify(ctx); aio_notify(ctx);
} }
void aio_set_fd_poll(AioContext *ctx, int fd,
IOHandler *io_poll_begin,
IOHandler *io_poll_end)
{
/* Not implemented */
}
void aio_set_event_notifier(AioContext *ctx, void aio_set_event_notifier(AioContext *ctx,
EventNotifier *e, EventNotifier *e,
bool is_external, bool is_external,
EventNotifierHandler *io_notify) EventNotifierHandler *io_notify,
AioPollFn *io_poll)
{ {
AioHandler *node; AioHandler *node;
@ -150,6 +160,14 @@ void aio_set_event_notifier(AioContext *ctx,
aio_notify(ctx); aio_notify(ctx);
} }
void aio_set_event_notifier_poll(AioContext *ctx,
EventNotifier *notifier,
EventNotifierHandler *io_poll_begin,
EventNotifierHandler *io_poll_end)
{
/* Not implemented */
}
bool aio_prepare(AioContext *ctx) bool aio_prepare(AioContext *ctx)
{ {
static struct timeval tv0; static struct timeval tv0;
@ -271,12 +289,14 @@ static bool aio_dispatch_handlers(AioContext *ctx, HANDLE event)
return progress; return progress;
} }
bool aio_dispatch(AioContext *ctx) bool aio_dispatch(AioContext *ctx, bool dispatch_fds)
{ {
bool progress; bool progress;
progress = aio_bh_poll(ctx); progress = aio_bh_poll(ctx);
progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE); if (dispatch_fds) {
progress |= aio_dispatch_handlers(ctx, INVALID_HANDLE_VALUE);
}
progress |= timerlistgroup_run_timers(&ctx->tlg); progress |= timerlistgroup_run_timers(&ctx->tlg);
return progress; return progress;
} }
@ -374,3 +394,9 @@ bool aio_poll(AioContext *ctx, bool blocking)
void aio_context_setup(AioContext *ctx) void aio_context_setup(AioContext *ctx)
{ {
} }
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
int64_t grow, int64_t shrink, Error **errp)
{
error_setg(errp, "AioContext polling is not implemented on Windows");
}

21
async.c
View File

@ -251,7 +251,7 @@ aio_ctx_dispatch(GSource *source,
AioContext *ctx = (AioContext *) source; AioContext *ctx = (AioContext *) source;
assert(callback == NULL); assert(callback == NULL);
aio_dispatch(ctx); aio_dispatch(ctx, true);
return true; return true;
} }
@ -282,7 +282,7 @@ aio_ctx_finalize(GSource *source)
} }
qemu_mutex_unlock(&ctx->bh_lock); qemu_mutex_unlock(&ctx->bh_lock);
aio_set_event_notifier(ctx, &ctx->notifier, false, NULL); aio_set_event_notifier(ctx, &ctx->notifier, false, NULL, NULL);
event_notifier_cleanup(&ctx->notifier); event_notifier_cleanup(&ctx->notifier);
qemu_rec_mutex_destroy(&ctx->lock); qemu_rec_mutex_destroy(&ctx->lock);
qemu_mutex_destroy(&ctx->bh_lock); qemu_mutex_destroy(&ctx->bh_lock);
@ -349,6 +349,15 @@ static void event_notifier_dummy_cb(EventNotifier *e)
{ {
} }
/* Returns true if aio_notify() was called (e.g. a BH was scheduled) */
static bool event_notifier_poll(void *opaque)
{
EventNotifier *e = opaque;
AioContext *ctx = container_of(e, AioContext, notifier);
return atomic_read(&ctx->notified);
}
AioContext *aio_context_new(Error **errp) AioContext *aio_context_new(Error **errp)
{ {
int ret; int ret;
@ -366,7 +375,8 @@ AioContext *aio_context_new(Error **errp)
aio_set_event_notifier(ctx, &ctx->notifier, aio_set_event_notifier(ctx, &ctx->notifier,
false, false,
(EventNotifierHandler *) (EventNotifierHandler *)
event_notifier_dummy_cb); event_notifier_dummy_cb,
event_notifier_poll);
#ifdef CONFIG_LINUX_AIO #ifdef CONFIG_LINUX_AIO
ctx->linux_aio = NULL; ctx->linux_aio = NULL;
#endif #endif
@ -375,6 +385,11 @@ AioContext *aio_context_new(Error **errp)
qemu_rec_mutex_init(&ctx->lock); qemu_rec_mutex_init(&ctx->lock);
timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx); timerlistgroup_init(&ctx->tlg, aio_timerlist_notify, ctx);
ctx->poll_ns = 0;
ctx->poll_max_ns = 0;
ctx->poll_grow = 0;
ctx->poll_shrink = 0;
return ctx; return ctx;
fail: fail:
g_source_destroy(&ctx->source); g_source_destroy(&ctx->source);

View File

@ -192,19 +192,19 @@ static int curl_sock_cb(CURL *curl, curl_socket_t fd, int action,
switch (action) { switch (action) {
case CURL_POLL_IN: case CURL_POLL_IN:
aio_set_fd_handler(s->aio_context, fd, false, aio_set_fd_handler(s->aio_context, fd, false,
curl_multi_read, NULL, state); curl_multi_read, NULL, NULL, state);
break; break;
case CURL_POLL_OUT: case CURL_POLL_OUT:
aio_set_fd_handler(s->aio_context, fd, false, aio_set_fd_handler(s->aio_context, fd, false,
NULL, curl_multi_do, state); NULL, curl_multi_do, NULL, state);
break; break;
case CURL_POLL_INOUT: case CURL_POLL_INOUT:
aio_set_fd_handler(s->aio_context, fd, false, aio_set_fd_handler(s->aio_context, fd, false,
curl_multi_read, curl_multi_do, state); curl_multi_read, curl_multi_do, NULL, state);
break; break;
case CURL_POLL_REMOVE: case CURL_POLL_REMOVE:
aio_set_fd_handler(s->aio_context, fd, false, aio_set_fd_handler(s->aio_context, fd, false,
NULL, NULL, NULL); NULL, NULL, NULL, NULL);
break; break;
} }

View File

@ -362,6 +362,7 @@ iscsi_set_events(IscsiLun *iscsilun)
false, false,
(ev & POLLIN) ? iscsi_process_read : NULL, (ev & POLLIN) ? iscsi_process_read : NULL,
(ev & POLLOUT) ? iscsi_process_write : NULL, (ev & POLLOUT) ? iscsi_process_write : NULL,
NULL,
iscsilun); iscsilun);
iscsilun->events = ev; iscsilun->events = ev;
} }
@ -1526,7 +1527,7 @@ static void iscsi_detach_aio_context(BlockDriverState *bs)
IscsiLun *iscsilun = bs->opaque; IscsiLun *iscsilun = bs->opaque;
aio_set_fd_handler(iscsilun->aio_context, iscsi_get_fd(iscsilun->iscsi), aio_set_fd_handler(iscsilun->aio_context, iscsi_get_fd(iscsilun->iscsi),
false, NULL, NULL, NULL); false, NULL, NULL, NULL, NULL);
iscsilun->events = 0; iscsilun->events = 0;
if (iscsilun->nop_timer) { if (iscsilun->nop_timer) {

View File

@ -255,6 +255,20 @@ static void qemu_laio_completion_cb(EventNotifier *e)
} }
} }
static bool qemu_laio_poll_cb(void *opaque)
{
EventNotifier *e = opaque;
LinuxAioState *s = container_of(e, LinuxAioState, e);
struct io_event *events;
if (!io_getevents_peek(s->ctx, &events)) {
return false;
}
qemu_laio_process_completions_and_submit(s);
return true;
}
static void laio_cancel(BlockAIOCB *blockacb) static void laio_cancel(BlockAIOCB *blockacb)
{ {
struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb; struct qemu_laiocb *laiocb = (struct qemu_laiocb *)blockacb;
@ -439,7 +453,7 @@ BlockAIOCB *laio_submit(BlockDriverState *bs, LinuxAioState *s, int fd,
void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context) void laio_detach_aio_context(LinuxAioState *s, AioContext *old_context)
{ {
aio_set_event_notifier(old_context, &s->e, false, NULL); aio_set_event_notifier(old_context, &s->e, false, NULL, NULL);
qemu_bh_delete(s->completion_bh); qemu_bh_delete(s->completion_bh);
} }
@ -448,7 +462,8 @@ void laio_attach_aio_context(LinuxAioState *s, AioContext *new_context)
s->aio_context = new_context; s->aio_context = new_context;
s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s); s->completion_bh = aio_bh_new(new_context, qemu_laio_completion_bh, s);
aio_set_event_notifier(new_context, &s->e, false, aio_set_event_notifier(new_context, &s->e, false,
qemu_laio_completion_cb); qemu_laio_completion_cb,
qemu_laio_poll_cb);
} }
LinuxAioState *laio_init(void) LinuxAioState *laio_init(void)

View File

@ -145,7 +145,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
aio_context = bdrv_get_aio_context(bs); aio_context = bdrv_get_aio_context(bs);
aio_set_fd_handler(aio_context, s->sioc->fd, false, aio_set_fd_handler(aio_context, s->sioc->fd, false,
nbd_reply_ready, nbd_restart_write, bs); nbd_reply_ready, nbd_restart_write, NULL, bs);
if (qiov) { if (qiov) {
qio_channel_set_cork(s->ioc, true); qio_channel_set_cork(s->ioc, true);
rc = nbd_send_request(s->ioc, request); rc = nbd_send_request(s->ioc, request);
@ -161,7 +161,7 @@ static int nbd_co_send_request(BlockDriverState *bs,
rc = nbd_send_request(s->ioc, request); rc = nbd_send_request(s->ioc, request);
} }
aio_set_fd_handler(aio_context, s->sioc->fd, false, aio_set_fd_handler(aio_context, s->sioc->fd, false,
nbd_reply_ready, NULL, bs); nbd_reply_ready, NULL, NULL, bs);
s->send_coroutine = NULL; s->send_coroutine = NULL;
qemu_co_mutex_unlock(&s->send_mutex); qemu_co_mutex_unlock(&s->send_mutex);
return rc; return rc;
@ -366,14 +366,14 @@ void nbd_client_detach_aio_context(BlockDriverState *bs)
{ {
aio_set_fd_handler(bdrv_get_aio_context(bs), aio_set_fd_handler(bdrv_get_aio_context(bs),
nbd_get_client_session(bs)->sioc->fd, nbd_get_client_session(bs)->sioc->fd,
false, NULL, NULL, NULL); false, NULL, NULL, NULL, NULL);
} }
void nbd_client_attach_aio_context(BlockDriverState *bs, void nbd_client_attach_aio_context(BlockDriverState *bs,
AioContext *new_context) AioContext *new_context)
{ {
aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd, aio_set_fd_handler(new_context, nbd_get_client_session(bs)->sioc->fd,
false, nbd_reply_ready, NULL, bs); false, nbd_reply_ready, NULL, NULL, bs);
} }
void nbd_client_close(BlockDriverState *bs) void nbd_client_close(BlockDriverState *bs)

View File

@ -197,7 +197,8 @@ static void nfs_set_events(NFSClient *client)
aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context), aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context),
false, false,
(ev & POLLIN) ? nfs_process_read : NULL, (ev & POLLIN) ? nfs_process_read : NULL,
(ev & POLLOUT) ? nfs_process_write : NULL, client); (ev & POLLOUT) ? nfs_process_write : NULL,
NULL, client);
} }
client->events = ev; client->events = ev;
@ -395,7 +396,7 @@ static void nfs_detach_aio_context(BlockDriverState *bs)
NFSClient *client = bs->opaque; NFSClient *client = bs->opaque;
aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context), aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context),
false, NULL, NULL, NULL); false, NULL, NULL, NULL, NULL);
client->events = 0; client->events = 0;
} }
@ -415,7 +416,7 @@ static void nfs_client_close(NFSClient *client)
nfs_close(client->context, client->fh); nfs_close(client->context, client->fh);
} }
aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context), aio_set_fd_handler(client->aio_context, nfs_get_fd(client->context),
false, NULL, NULL, NULL); false, NULL, NULL, NULL, NULL);
nfs_destroy_context(client->context); nfs_destroy_context(client->context);
} }
memset(client, 0, sizeof(NFSClient)); memset(client, 0, sizeof(NFSClient));

View File

@ -664,7 +664,7 @@ static coroutine_fn void do_co_req(void *opaque)
co = qemu_coroutine_self(); co = qemu_coroutine_self();
aio_set_fd_handler(srco->aio_context, sockfd, false, aio_set_fd_handler(srco->aio_context, sockfd, false,
NULL, restart_co_req, co); NULL, restart_co_req, NULL, co);
ret = send_co_req(sockfd, hdr, data, wlen); ret = send_co_req(sockfd, hdr, data, wlen);
if (ret < 0) { if (ret < 0) {
@ -672,7 +672,7 @@ static coroutine_fn void do_co_req(void *opaque)
} }
aio_set_fd_handler(srco->aio_context, sockfd, false, aio_set_fd_handler(srco->aio_context, sockfd, false,
restart_co_req, NULL, co); restart_co_req, NULL, NULL, co);
ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr)); ret = qemu_co_recv(sockfd, hdr, sizeof(*hdr));
if (ret != sizeof(*hdr)) { if (ret != sizeof(*hdr)) {
@ -698,7 +698,7 @@ out:
/* there is at most one request for this sockfd, so it is safe to /* there is at most one request for this sockfd, so it is safe to
* set each handler to NULL. */ * set each handler to NULL. */
aio_set_fd_handler(srco->aio_context, sockfd, false, aio_set_fd_handler(srco->aio_context, sockfd, false,
NULL, NULL, NULL); NULL, NULL, NULL, NULL);
srco->ret = ret; srco->ret = ret;
srco->finished = true; srco->finished = true;
@ -760,7 +760,7 @@ static coroutine_fn void reconnect_to_sdog(void *opaque)
AIOReq *aio_req, *next; AIOReq *aio_req, *next;
aio_set_fd_handler(s->aio_context, s->fd, false, NULL, aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
NULL, NULL); NULL, NULL, NULL);
close(s->fd); close(s->fd);
s->fd = -1; s->fd = -1;
@ -964,7 +964,7 @@ static int get_sheep_fd(BDRVSheepdogState *s, Error **errp)
} }
aio_set_fd_handler(s->aio_context, fd, false, aio_set_fd_handler(s->aio_context, fd, false,
co_read_response, NULL, s); co_read_response, NULL, NULL, s);
return fd; return fd;
} }
@ -1226,7 +1226,7 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
qemu_co_mutex_lock(&s->lock); qemu_co_mutex_lock(&s->lock);
s->co_send = qemu_coroutine_self(); s->co_send = qemu_coroutine_self();
aio_set_fd_handler(s->aio_context, s->fd, false, aio_set_fd_handler(s->aio_context, s->fd, false,
co_read_response, co_write_request, s); co_read_response, co_write_request, NULL, s);
socket_set_cork(s->fd, 1); socket_set_cork(s->fd, 1);
/* send a header */ /* send a header */
@ -1245,7 +1245,7 @@ static void coroutine_fn add_aio_request(BDRVSheepdogState *s, AIOReq *aio_req,
out: out:
socket_set_cork(s->fd, 0); socket_set_cork(s->fd, 0);
aio_set_fd_handler(s->aio_context, s->fd, false, aio_set_fd_handler(s->aio_context, s->fd, false,
co_read_response, NULL, s); co_read_response, NULL, NULL, s);
s->co_send = NULL; s->co_send = NULL;
qemu_co_mutex_unlock(&s->lock); qemu_co_mutex_unlock(&s->lock);
} }
@ -1396,7 +1396,7 @@ static void sd_detach_aio_context(BlockDriverState *bs)
BDRVSheepdogState *s = bs->opaque; BDRVSheepdogState *s = bs->opaque;
aio_set_fd_handler(s->aio_context, s->fd, false, NULL, aio_set_fd_handler(s->aio_context, s->fd, false, NULL,
NULL, NULL); NULL, NULL, NULL);
} }
static void sd_attach_aio_context(BlockDriverState *bs, static void sd_attach_aio_context(BlockDriverState *bs,
@ -1406,7 +1406,7 @@ static void sd_attach_aio_context(BlockDriverState *bs,
s->aio_context = new_context; s->aio_context = new_context;
aio_set_fd_handler(new_context, s->fd, false, aio_set_fd_handler(new_context, s->fd, false,
co_read_response, NULL, s); co_read_response, NULL, NULL, s);
} }
/* TODO Convert to fine grained options */ /* TODO Convert to fine grained options */
@ -1520,7 +1520,7 @@ static int sd_open(BlockDriverState *bs, QDict *options, int flags,
return 0; return 0;
out: out:
aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
false, NULL, NULL, NULL); false, NULL, NULL, NULL, NULL);
if (s->fd >= 0) { if (s->fd >= 0) {
closesocket(s->fd); closesocket(s->fd);
} }
@ -1559,7 +1559,7 @@ static void sd_reopen_commit(BDRVReopenState *state)
if (s->fd) { if (s->fd) {
aio_set_fd_handler(s->aio_context, s->fd, false, aio_set_fd_handler(s->aio_context, s->fd, false,
NULL, NULL, NULL); NULL, NULL, NULL, NULL);
closesocket(s->fd); closesocket(s->fd);
} }
@ -1583,7 +1583,7 @@ static void sd_reopen_abort(BDRVReopenState *state)
if (re_s->fd) { if (re_s->fd) {
aio_set_fd_handler(s->aio_context, re_s->fd, false, aio_set_fd_handler(s->aio_context, re_s->fd, false,
NULL, NULL, NULL); NULL, NULL, NULL, NULL);
closesocket(re_s->fd); closesocket(re_s->fd);
} }
@ -1972,7 +1972,7 @@ static void sd_close(BlockDriverState *bs)
} }
aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd, aio_set_fd_handler(bdrv_get_aio_context(bs), s->fd,
false, NULL, NULL, NULL); false, NULL, NULL, NULL, NULL);
closesocket(s->fd); closesocket(s->fd);
g_free(s->host_spec); g_free(s->host_spec);
} }

View File

@ -911,7 +911,7 @@ static coroutine_fn void set_fd_handler(BDRVSSHState *s, BlockDriverState *bs)
rd_handler, wr_handler); rd_handler, wr_handler);
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
false, rd_handler, wr_handler, co); false, rd_handler, wr_handler, NULL, co);
} }
static coroutine_fn void clear_fd_handler(BDRVSSHState *s, static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
@ -919,7 +919,7 @@ static coroutine_fn void clear_fd_handler(BDRVSSHState *s,
{ {
DPRINTF("s->sock=%d", s->sock); DPRINTF("s->sock=%d", s->sock);
aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock, aio_set_fd_handler(bdrv_get_aio_context(bs), s->sock,
false, NULL, NULL, NULL); false, NULL, NULL, NULL, NULL);
} }
/* A non-blocking call returned EAGAIN, so yield, ensuring the /* A non-blocking call returned EAGAIN, so yield, ensuring the

View File

@ -175,7 +175,7 @@ int win32_aio_attach(QEMUWin32AIOState *aio, HANDLE hfile)
void win32_aio_detach_aio_context(QEMUWin32AIOState *aio, void win32_aio_detach_aio_context(QEMUWin32AIOState *aio,
AioContext *old_context) AioContext *old_context)
{ {
aio_set_event_notifier(old_context, &aio->e, false, NULL); aio_set_event_notifier(old_context, &aio->e, false, NULL, NULL);
aio->is_aio_context_attached = false; aio->is_aio_context_attached = false;
} }
@ -184,7 +184,7 @@ void win32_aio_attach_aio_context(QEMUWin32AIOState *aio,
{ {
aio->is_aio_context_attached = true; aio->is_aio_context_attached = true;
aio_set_event_notifier(new_context, &aio->e, false, aio_set_event_notifier(new_context, &aio->e, false,
win32_aio_completion_cb); win32_aio_completion_cb, NULL);
} }
QEMUWin32AIOState *win32_aio_init(void) QEMUWin32AIOState *win32_aio_init(void)

View File

@ -588,13 +588,19 @@ void virtio_blk_handle_vq(VirtIOBlock *s, VirtQueue *vq)
blk_io_plug(s->blk); blk_io_plug(s->blk);
while ((req = virtio_blk_get_request(s, vq))) { do {
if (virtio_blk_handle_request(req, &mrb)) { virtio_queue_set_notification(vq, 0);
virtqueue_detach_element(req->vq, &req->elem, 0);
virtio_blk_free_request(req); while ((req = virtio_blk_get_request(s, vq))) {
break; if (virtio_blk_handle_request(req, &mrb)) {
virtqueue_detach_element(req->vq, &req->elem, 0);
virtio_blk_free_request(req);
break;
}
} }
}
virtio_queue_set_notification(vq, 1);
} while (!virtio_queue_empty(vq));
if (mrb.num_reqs) { if (mrb.num_reqs) {
virtio_blk_submit_multireq(s->blk, &mrb); virtio_blk_submit_multireq(s->blk, &mrb);

View File

@ -592,26 +592,32 @@ static void virtio_scsi_handle_cmd_req_submit(VirtIOSCSI *s, VirtIOSCSIReq *req)
void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq) void virtio_scsi_handle_cmd_vq(VirtIOSCSI *s, VirtQueue *vq)
{ {
VirtIOSCSIReq *req, *next; VirtIOSCSIReq *req, *next;
int ret; int ret = 0;
QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs); QTAILQ_HEAD(, VirtIOSCSIReq) reqs = QTAILQ_HEAD_INITIALIZER(reqs);
while ((req = virtio_scsi_pop_req(s, vq))) { do {
ret = virtio_scsi_handle_cmd_req_prepare(s, req); virtio_queue_set_notification(vq, 0);
if (!ret) {
QTAILQ_INSERT_TAIL(&reqs, req, next); while ((req = virtio_scsi_pop_req(s, vq))) {
} else if (ret == -EINVAL) { ret = virtio_scsi_handle_cmd_req_prepare(s, req);
/* The device is broken and shouldn't process any request */ if (!ret) {
while (!QTAILQ_EMPTY(&reqs)) { QTAILQ_INSERT_TAIL(&reqs, req, next);
req = QTAILQ_FIRST(&reqs); } else if (ret == -EINVAL) {
QTAILQ_REMOVE(&reqs, req, next); /* The device is broken and shouldn't process any request */
blk_io_unplug(req->sreq->dev->conf.blk); while (!QTAILQ_EMPTY(&reqs)) {
scsi_req_unref(req->sreq); req = QTAILQ_FIRST(&reqs);
virtqueue_detach_element(req->vq, &req->elem, 0); QTAILQ_REMOVE(&reqs, req, next);
virtio_scsi_free_req(req); blk_io_unplug(req->sreq->dev->conf.blk);
scsi_req_unref(req->sreq);
virtqueue_detach_element(req->vq, &req->elem, 0);
virtio_scsi_free_req(req);
}
} }
} }
}
virtio_queue_set_notification(vq, 1);
} while (ret != -EINVAL && !virtio_queue_empty(vq));
QTAILQ_FOREACH_SAFE(req, &reqs, next, next) { QTAILQ_FOREACH_SAFE(req, &reqs, next, next) {
virtio_scsi_handle_cmd_req_submit(s, req); virtio_scsi_handle_cmd_req_submit(s, req);

View File

@ -87,8 +87,8 @@ struct VirtQueue
/* Last used index value we have signalled on */ /* Last used index value we have signalled on */
bool signalled_used_valid; bool signalled_used_valid;
/* Notification enabled? */ /* Nested host->guest notification disabled counter */
bool notification; unsigned int notification_disabled;
uint16_t queue_index; uint16_t queue_index;
@ -201,7 +201,7 @@ static inline void vring_used_flags_unset_bit(VirtQueue *vq, int mask)
static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val) static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val)
{ {
hwaddr pa; hwaddr pa;
if (!vq->notification) { if (vq->notification_disabled) {
return; return;
} }
pa = vq->vring.used + offsetof(VRingUsed, ring[vq->vring.num]); pa = vq->vring.used + offsetof(VRingUsed, ring[vq->vring.num]);
@ -210,7 +210,13 @@ static inline void vring_set_avail_event(VirtQueue *vq, uint16_t val)
void virtio_queue_set_notification(VirtQueue *vq, int enable) void virtio_queue_set_notification(VirtQueue *vq, int enable)
{ {
vq->notification = enable; if (enable) {
assert(vq->notification_disabled > 0);
vq->notification_disabled--;
} else {
vq->notification_disabled++;
}
if (virtio_vdev_has_feature(vq->vdev, VIRTIO_RING_F_EVENT_IDX)) { if (virtio_vdev_has_feature(vq->vdev, VIRTIO_RING_F_EVENT_IDX)) {
vring_set_avail_event(vq, vring_avail_idx(vq)); vring_set_avail_event(vq, vring_avail_idx(vq));
} else if (enable) { } else if (enable) {
@ -959,7 +965,7 @@ void virtio_reset(void *opaque)
virtio_queue_set_vector(vdev, i, VIRTIO_NO_VECTOR); virtio_queue_set_vector(vdev, i, VIRTIO_NO_VECTOR);
vdev->vq[i].signalled_used = 0; vdev->vq[i].signalled_used = 0;
vdev->vq[i].signalled_used_valid = false; vdev->vq[i].signalled_used_valid = false;
vdev->vq[i].notification = true; vdev->vq[i].notification_disabled = 0;
vdev->vq[i].vring.num = vdev->vq[i].vring.num_default; vdev->vq[i].vring.num = vdev->vq[i].vring.num_default;
vdev->vq[i].inuse = 0; vdev->vq[i].inuse = 0;
} }
@ -1770,7 +1776,7 @@ int virtio_load(VirtIODevice *vdev, QEMUFile *f, int version_id)
vdev->vq[i].vring.desc = qemu_get_be64(f); vdev->vq[i].vring.desc = qemu_get_be64(f);
qemu_get_be16s(f, &vdev->vq[i].last_avail_idx); qemu_get_be16s(f, &vdev->vq[i].last_avail_idx);
vdev->vq[i].signalled_used_valid = false; vdev->vq[i].signalled_used_valid = false;
vdev->vq[i].notification = true; vdev->vq[i].notification_disabled = 0;
if (vdev->vq[i].vring.desc) { if (vdev->vq[i].vring.desc) {
/* XXX virtio-1 devices */ /* XXX virtio-1 devices */
@ -2047,15 +2053,47 @@ static void virtio_queue_host_notifier_aio_read(EventNotifier *n)
} }
} }
static void virtio_queue_host_notifier_aio_poll_begin(EventNotifier *n)
{
VirtQueue *vq = container_of(n, VirtQueue, host_notifier);
virtio_queue_set_notification(vq, 0);
}
static bool virtio_queue_host_notifier_aio_poll(void *opaque)
{
EventNotifier *n = opaque;
VirtQueue *vq = container_of(n, VirtQueue, host_notifier);
if (virtio_queue_empty(vq)) {
return false;
}
virtio_queue_notify_aio_vq(vq);
return true;
}
static void virtio_queue_host_notifier_aio_poll_end(EventNotifier *n)
{
VirtQueue *vq = container_of(n, VirtQueue, host_notifier);
/* Caller polls once more after this to catch requests that race with us */
virtio_queue_set_notification(vq, 1);
}
void virtio_queue_aio_set_host_notifier_handler(VirtQueue *vq, AioContext *ctx, void virtio_queue_aio_set_host_notifier_handler(VirtQueue *vq, AioContext *ctx,
VirtIOHandleOutput handle_output) VirtIOHandleOutput handle_output)
{ {
if (handle_output) { if (handle_output) {
vq->handle_aio_output = handle_output; vq->handle_aio_output = handle_output;
aio_set_event_notifier(ctx, &vq->host_notifier, true, aio_set_event_notifier(ctx, &vq->host_notifier, true,
virtio_queue_host_notifier_aio_read); virtio_queue_host_notifier_aio_read,
virtio_queue_host_notifier_aio_poll);
aio_set_event_notifier_poll(ctx, &vq->host_notifier,
virtio_queue_host_notifier_aio_poll_begin,
virtio_queue_host_notifier_aio_poll_end);
} else { } else {
aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL); aio_set_event_notifier(ctx, &vq->host_notifier, true, NULL, NULL);
/* Test and clear notifier before after disabling event, /* Test and clear notifier before after disabling event,
* in case poll callback didn't have time to run. */ * in case poll callback didn't have time to run. */
virtio_queue_host_notifier_aio_read(&vq->host_notifier); virtio_queue_host_notifier_aio_read(&vq->host_notifier);

View File

@ -44,6 +44,7 @@ void qemu_aio_ref(void *p);
typedef struct AioHandler AioHandler; typedef struct AioHandler AioHandler;
typedef void QEMUBHFunc(void *opaque); typedef void QEMUBHFunc(void *opaque);
typedef bool AioPollFn(void *opaque);
typedef void IOHandler(void *opaque); typedef void IOHandler(void *opaque);
struct ThreadPool; struct ThreadPool;
@ -130,6 +131,18 @@ struct AioContext {
int external_disable_cnt; int external_disable_cnt;
/* Number of AioHandlers without .io_poll() */
int poll_disable_cnt;
/* Polling mode parameters */
int64_t poll_ns; /* current polling time in nanoseconds */
int64_t poll_max_ns; /* maximum polling time in nanoseconds */
int64_t poll_grow; /* polling time growth factor */
int64_t poll_shrink; /* polling time shrink factor */
/* Are we in polling mode or monitoring file descriptors? */
bool poll_started;
/* epoll(7) state used when built with CONFIG_EPOLL */ /* epoll(7) state used when built with CONFIG_EPOLL */
int epollfd; int epollfd;
bool epoll_enabled; bool epoll_enabled;
@ -295,8 +308,12 @@ bool aio_pending(AioContext *ctx);
/* Dispatch any pending callbacks from the GSource attached to the AioContext. /* Dispatch any pending callbacks from the GSource attached to the AioContext.
* *
* This is used internally in the implementation of the GSource. * This is used internally in the implementation of the GSource.
*
* @dispatch_fds: true to process fds, false to skip them
* (can be used as an optimization by callers that know there
* are no fds ready)
*/ */
bool aio_dispatch(AioContext *ctx); bool aio_dispatch(AioContext *ctx, bool dispatch_fds);
/* Progress in completing AIO work to occur. This can issue new pending /* Progress in completing AIO work to occur. This can issue new pending
* aio as a result of executing I/O completion or bh callbacks. * aio as a result of executing I/O completion or bh callbacks.
@ -325,8 +342,17 @@ void aio_set_fd_handler(AioContext *ctx,
bool is_external, bool is_external,
IOHandler *io_read, IOHandler *io_read,
IOHandler *io_write, IOHandler *io_write,
AioPollFn *io_poll,
void *opaque); void *opaque);
/* Set polling begin/end callbacks for a file descriptor that has already been
* registered with aio_set_fd_handler. Do nothing if the file descriptor is
* not registered.
*/
void aio_set_fd_poll(AioContext *ctx, int fd,
IOHandler *io_poll_begin,
IOHandler *io_poll_end);
/* Register an event notifier and associated callbacks. Behaves very similarly /* Register an event notifier and associated callbacks. Behaves very similarly
* to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks * to event_notifier_set_handler. Unlike event_notifier_set_handler, these callbacks
* will be invoked when using aio_poll(). * will be invoked when using aio_poll().
@ -337,7 +363,17 @@ void aio_set_fd_handler(AioContext *ctx,
void aio_set_event_notifier(AioContext *ctx, void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier, EventNotifier *notifier,
bool is_external, bool is_external,
EventNotifierHandler *io_read); EventNotifierHandler *io_read,
AioPollFn *io_poll);
/* Set polling begin/end callbacks for an event notifier that has already been
* registered with aio_set_event_notifier. Do nothing if the event notifier is
* not registered.
*/
void aio_set_event_notifier_poll(AioContext *ctx,
EventNotifier *notifier,
EventNotifierHandler *io_poll_begin,
EventNotifierHandler *io_poll_end);
/* Return a GSource that lets the main loop poll the file descriptors attached /* Return a GSource that lets the main loop poll the file descriptors attached
* to this AioContext. * to this AioContext.
@ -474,4 +510,17 @@ static inline bool aio_context_in_iothread(AioContext *ctx)
*/ */
void aio_context_setup(AioContext *ctx); void aio_context_setup(AioContext *ctx);
/**
* aio_context_set_poll_params:
* @ctx: the aio context
* @max_ns: how long to busy poll for, in nanoseconds
* @grow: polling time growth factor
* @shrink: polling time shrink factor
*
* Poll mode can be disabled by setting poll_max_ns to 0.
*/
void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
int64_t grow, int64_t shrink,
Error **errp);
#endif #endif

View File

@ -28,6 +28,11 @@ typedef struct {
QemuCond init_done_cond; /* is thread initialization done? */ QemuCond init_done_cond; /* is thread initialization done? */
bool stopping; bool stopping;
int thread_id; int thread_id;
/* AioContext poll parameters */
int64_t poll_max_ns;
int64_t poll_grow;
int64_t poll_shrink;
} IOThread; } IOThread;
#define IOTHREAD(obj) \ #define IOTHREAD(obj) \

View File

@ -63,7 +63,7 @@ void qemu_set_fd_handler(int fd,
{ {
iohandler_init(); iohandler_init();
aio_set_fd_handler(iohandler_ctx, fd, false, aio_set_fd_handler(iohandler_ctx, fd, false,
fd_read, fd_write, opaque); fd_read, fd_write, NULL, opaque);
} }
/* reaping of zombies. right now we're not passing the status to /* reaping of zombies. right now we're not passing the status to

View File

@ -98,6 +98,18 @@ static void iothread_complete(UserCreatable *obj, Error **errp)
return; return;
} }
aio_context_set_poll_params(iothread->ctx,
iothread->poll_max_ns,
iothread->poll_grow,
iothread->poll_shrink,
&local_error);
if (local_error) {
error_propagate(errp, local_error);
aio_context_unref(iothread->ctx);
iothread->ctx = NULL;
return;
}
qemu_mutex_init(&iothread->init_done_lock); qemu_mutex_init(&iothread->init_done_lock);
qemu_cond_init(&iothread->init_done_cond); qemu_cond_init(&iothread->init_done_cond);
@ -120,10 +132,82 @@ static void iothread_complete(UserCreatable *obj, Error **errp)
qemu_mutex_unlock(&iothread->init_done_lock); qemu_mutex_unlock(&iothread->init_done_lock);
} }
typedef struct {
const char *name;
ptrdiff_t offset; /* field's byte offset in IOThread struct */
} PollParamInfo;
static PollParamInfo poll_max_ns_info = {
"poll-max-ns", offsetof(IOThread, poll_max_ns),
};
static PollParamInfo poll_grow_info = {
"poll-grow", offsetof(IOThread, poll_grow),
};
static PollParamInfo poll_shrink_info = {
"poll-shrink", offsetof(IOThread, poll_shrink),
};
static void iothread_get_poll_param(Object *obj, Visitor *v,
const char *name, void *opaque, Error **errp)
{
IOThread *iothread = IOTHREAD(obj);
PollParamInfo *info = opaque;
int64_t *field = (void *)iothread + info->offset;
visit_type_int64(v, name, field, errp);
}
static void iothread_set_poll_param(Object *obj, Visitor *v,
const char *name, void *opaque, Error **errp)
{
IOThread *iothread = IOTHREAD(obj);
PollParamInfo *info = opaque;
int64_t *field = (void *)iothread + info->offset;
Error *local_err = NULL;
int64_t value;
visit_type_int64(v, name, &value, &local_err);
if (local_err) {
goto out;
}
if (value < 0) {
error_setg(&local_err, "%s value must be in range [0, %"PRId64"]",
info->name, INT64_MAX);
goto out;
}
*field = value;
if (iothread->ctx) {
aio_context_set_poll_params(iothread->ctx,
iothread->poll_max_ns,
iothread->poll_grow,
iothread->poll_shrink,
&local_err);
}
out:
error_propagate(errp, local_err);
}
static void iothread_class_init(ObjectClass *klass, void *class_data) static void iothread_class_init(ObjectClass *klass, void *class_data)
{ {
UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass); UserCreatableClass *ucc = USER_CREATABLE_CLASS(klass);
ucc->complete = iothread_complete; ucc->complete = iothread_complete;
object_class_property_add(klass, "poll-max-ns", "int",
iothread_get_poll_param,
iothread_set_poll_param,
NULL, &poll_max_ns_info, &error_abort);
object_class_property_add(klass, "poll-grow", "int",
iothread_get_poll_param,
iothread_set_poll_param,
NULL, &poll_grow_info, &error_abort);
object_class_property_add(klass, "poll-shrink", "int",
iothread_get_poll_param,
iothread_set_poll_param,
NULL, &poll_shrink_info, &error_abort);
} }
static const TypeInfo iothread_info = { static const TypeInfo iothread_info = {

View File

@ -1366,19 +1366,18 @@ static void nbd_restart_write(void *opaque)
static void nbd_set_handlers(NBDClient *client) static void nbd_set_handlers(NBDClient *client)
{ {
if (client->exp && client->exp->ctx) { if (client->exp && client->exp->ctx) {
aio_set_fd_handler(client->exp->ctx, client->sioc->fd, aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true,
true,
client->can_read ? nbd_read : NULL, client->can_read ? nbd_read : NULL,
client->send_coroutine ? nbd_restart_write : NULL, client->send_coroutine ? nbd_restart_write : NULL,
client); NULL, client);
} }
} }
static void nbd_unset_handlers(NBDClient *client) static void nbd_unset_handlers(NBDClient *client)
{ {
if (client->exp && client->exp->ctx) { if (client->exp && client->exp->ctx) {
aio_set_fd_handler(client->exp->ctx, client->sioc->fd, aio_set_fd_handler(client->exp->ctx, client->sioc->fd, true, NULL,
true, NULL, NULL, NULL); NULL, NULL, NULL);
} }
} }

View File

@ -15,6 +15,7 @@ void aio_set_fd_handler(AioContext *ctx,
bool is_external, bool is_external,
IOHandler *io_read, IOHandler *io_read,
IOHandler *io_write, IOHandler *io_write,
AioPollFn *io_poll,
void *opaque) void *opaque)
{ {
abort(); abort();

View File

@ -128,7 +128,7 @@ static void *test_acquire_thread(void *opaque)
static void set_event_notifier(AioContext *ctx, EventNotifier *notifier, static void set_event_notifier(AioContext *ctx, EventNotifier *notifier,
EventNotifierHandler *handler) EventNotifierHandler *handler)
{ {
aio_set_event_notifier(ctx, notifier, false, handler); aio_set_event_notifier(ctx, notifier, false, handler, NULL);
} }
static void dummy_notifier_read(EventNotifier *n) static void dummy_notifier_read(EventNotifier *n)
@ -388,7 +388,7 @@ static void test_aio_external_client(void)
for (i = 1; i < 3; i++) { for (i = 1; i < 3; i++) {
EventNotifierTestData data = { .n = 0, .active = 10, .auto_set = true }; EventNotifierTestData data = { .n = 0, .active = 10, .auto_set = true };
event_notifier_init(&data.e, false); event_notifier_init(&data.e, false);
aio_set_event_notifier(ctx, &data.e, true, event_ready_cb); aio_set_event_notifier(ctx, &data.e, true, event_ready_cb, NULL);
event_notifier_set(&data.e); event_notifier_set(&data.e);
for (j = 0; j < i; j++) { for (j = 0; j < i; j++) {
aio_disable_external(ctx); aio_disable_external(ctx);

View File

@ -25,6 +25,12 @@
# #
# The <format-string> should be a sprintf()-compatible format string. # The <format-string> should be a sprintf()-compatible format string.
# aio-posix.c
run_poll_handlers_begin(void *ctx, int64_t max_ns) "ctx %p max_ns %"PRId64
run_poll_handlers_end(void *ctx, bool progress) "ctx %p progress %d"
poll_shrink(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
poll_grow(void *ctx, int64_t old, int64_t new) "ctx %p old %"PRId64" new %"PRId64
# thread-pool.c # thread-pool.c
thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p" thread_pool_submit(void *pool, void *req, void *opaque) "pool %p req %p opaque %p"
thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d" thread_pool_complete(void *pool, void *req, void *opaque, int ret) "pool %p req %p opaque %p ret %d"

View File

@ -95,7 +95,7 @@ int event_notifier_set_handler(EventNotifier *e,
EventNotifierHandler *handler) EventNotifierHandler *handler)
{ {
aio_set_fd_handler(iohandler_get_aio_context(), e->rfd, is_external, aio_set_fd_handler(iohandler_get_aio_context(), e->rfd, is_external,
(IOHandler *)handler, NULL, e); (IOHandler *)handler, NULL, NULL, e);
return 0; return 0;
} }