*
* 1) epmutex (mutex)
* 2) ep->mtx (mutex)
- * 3) ep->wq.lock (spinlock)
+ * 3) ep->lock (rwlock)
*
* The acquire order is the one listed above, from 1 to 3.
- * We need a spinlock (ep->wq.lock) because we manipulate objects
+ * We need a rwlock (ep->lock) because we manipulate objects
* from inside the poll callback, that might be triggered from
* a wake_up() that in turn might be called from IRQ context.
* So we can't sleep inside the poll callback and hence we need
* of epoll file descriptors, we use the current recursion depth as
* the lockdep subkey.
* It is possible to drop the "ep->mtx" and to use the global
- * mutex "epmutex" (together with "ep->wq.lock") to have it working,
+ * mutex "epmutex" (together with "ep->lock") to have it working,
* but having "ep->mtx" will make the interface more scalable.
* Events that require holding "epmutex" are very rare, while for
* normal operations the epoll private "ep->mtx" will guarantee
* This structure is stored inside the "private_data" member of the file
* structure and represents the main data structure for the eventpoll
* interface.
- *
- * Access to it is protected by the lock inside wq.
*/
struct eventpoll {
/*
/* List of ready file descriptors */
struct list_head rdllist;
+ /* Lock which protects rdllist and ovflist */
+ rwlock_t lock;
+
/* RB tree root used to store monitored fd structs */
struct rb_root_cached rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
- * holding ->wq.lock.
+ * holding ->lock.
*/
struct epitem *ovflist;
* because we want the "sproc" callback to be able to do it
* in a lockless way.
*/
- spin_lock_irq(&ep->wq.lock);
+ write_lock_irq(&ep->lock);
list_splice_init(&ep->rdllist, &txlist);
WRITE_ONCE(ep->ovflist, NULL);
- spin_unlock_irq(&ep->wq.lock);
+ write_unlock_irq(&ep->lock);
/*
* Now call the callback function.
*/
res = (*sproc)(ep, &txlist, priv);
- spin_lock_irq(&ep->wq.lock);
+ write_lock_irq(&ep->lock);
/*
* During the time we spent inside the "sproc" callback, some
* other events might have been queued by the poll callback.
* the ->poll() wait list (delayed after we release the lock).
*/
if (waitqueue_active(&ep->wq))
- wake_up_locked(&ep->wq);
+ wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
- spin_unlock_irq(&ep->wq.lock);
+ write_unlock_irq(&ep->lock);
if (!ep_locked)
mutex_unlock(&ep->mtx);
rb_erase_cached(&epi->rbn, &ep->rbr);
- spin_lock_irq(&ep->wq.lock);
+ write_lock_irq(&ep->lock);
if (ep_is_linked(epi))
list_del_init(&epi->rdllink);
- spin_unlock_irq(&ep->wq.lock);
+ write_unlock_irq(&ep->lock);
wakeup_source_unregister(ep_wakeup_source(epi));
/*
* Walks through the whole tree by freeing each "struct epitem". At this
* point we are sure no poll callbacks will be lingering around, and also by
* holding "epmutex" we can be sure that no file cleanup code will hit
- * us during this operation. So we can avoid the lock on "ep->wq.lock".
+ * us during this operation. So we can avoid the lock on "ep->lock".
* We do not need to lock ep->mtx, either, we only do it to prevent
* a lockdep warning.
*/
goto free_uid;
mutex_init(&ep->mtx);
+ rwlock_init(&ep->lock);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
}
#endif /* CONFIG_CHECKPOINT_RESTORE */
+/**
+ * Adds a new entry to the tail of the list in a lockless way, i.e.
+ * multiple CPUs are allowed to call this function concurrently.
+ *
+ * Beware: it is necessary to prevent any other modifications of the
+ * existing list until all changes are completed, in other words
+ * concurrent list_add_tail_lockless() calls should be protected
+ * with a read lock, where write lock acts as a barrier which
+ * makes sure all list_add_tail_lockless() calls are fully
+ * completed.
+ *
+ * Also an element can be locklessly added to the list only in one
+ * direction i.e. either to the tail either to the head, otherwise
+ * concurrent access will corrupt the list.
+ *
+ * Returns %false if element has been already added to the list, %true
+ * otherwise.
+ */
+static inline bool list_add_tail_lockless(struct list_head *new,
+ struct list_head *head)
+{
+ struct list_head *prev;
+
+ /*
+ * This is simple 'new->next = head' operation, but cmpxchg()
+ * is used in order to detect that same element has been just
+ * added to the list from another CPU: the winner observes
+ * new->next == new.
+ */
+ if (cmpxchg(&new->next, new, head) != new)
+ return false;
+
+ /*
+ * Initially ->next of a new element must be updated with the head
+ * (we are inserting to the tail) and only then pointers are atomically
+ * exchanged. XCHG guarantees memory ordering, thus ->next should be
+ * updated before pointers are actually swapped and pointers are
+ * swapped before prev->next is updated.
+ */
+
+ prev = xchg(&head->prev, new);
+
+ /*
+ * It is safe to modify prev->next and new->prev, because a new element
+ * is added only to the tail and new->next is updated before XCHG.
+ */
+
+ prev->next = new;
+ new->prev = prev;
+
+ return true;
+}
+
+/**
+ * Chains a new epi entry to the tail of the ep->ovflist in a lockless way,
+ * i.e. multiple CPUs are allowed to call this function concurrently.
+ *
+ * Returns %false if epi element has been already chained, %true otherwise.
+ */
+static inline bool chain_epi_lockless(struct epitem *epi)
+{
+ struct eventpoll *ep = epi->ep;
+
+ /* Check that the same epi has not been just chained from another CPU */
+ if (cmpxchg(&epi->next, EP_UNACTIVE_PTR, NULL) != EP_UNACTIVE_PTR)
+ return false;
+
+ /* Atomically exchange tail */
+ epi->next = xchg(&ep->ovflist, epi);
+
+ return true;
+}
+
/*
* This is the callback that is passed to the wait queue wakeup
* mechanism. It is called by the stored file descriptors when they
* have events to report.
+ *
+ * This callback takes a read lock in order not to content with concurrent
+ * events from another file descriptors, thus all modifications to ->rdllist
+ * or ->ovflist are lockless. Read lock is paired with the write lock from
+ * ep_scan_ready_list(), which stops all list modifications and guarantees
+ * that lists state is seen correctly.
+ *
+ * Another thing worth to mention is that ep_poll_callback() can be called
+ * concurrently for the same @epi from different CPUs if poll table was inited
+ * with several wait queues entries. Plural wakeup from different CPUs of a
+ * single wait queue is serialized by wq.lock, but the case when multiple wait
+ * queues are used should be detected accordingly. This is detected using
+ * cmpxchg() operation.
*/
static int ep_poll_callback(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
{
int pwake = 0;
- unsigned long flags;
struct epitem *epi = ep_item_from_wait(wait);
struct eventpoll *ep = epi->ep;
__poll_t pollflags = key_to_poll(key);
+ unsigned long flags;
int ewake = 0;
- spin_lock_irqsave(&ep->wq.lock, flags);
+ read_lock_irqsave(&ep->lock, flags);
ep_set_busy_poll_napi_id(epi);
* chained in ep->ovflist and requeued later on.
*/
if (READ_ONCE(ep->ovflist) != EP_UNACTIVE_PTR) {
- if (epi->next == EP_UNACTIVE_PTR) {
- epi->next = READ_ONCE(ep->ovflist);
- WRITE_ONCE(ep->ovflist, epi);
+ if (epi->next == EP_UNACTIVE_PTR &&
+ chain_epi_lockless(epi))
ep_pm_stay_awake_rcu(epi);
- }
goto out_unlock;
}
/* If this file is already in the ready list we exit soon */
- if (!ep_is_linked(epi)) {
- list_add_tail(&epi->rdllink, &ep->rdllist);
+ if (!ep_is_linked(epi) &&
+ list_add_tail_lockless(&epi->rdllink, &ep->rdllist)) {
ep_pm_stay_awake_rcu(epi);
}
break;
}
}
- wake_up_locked(&ep->wq);
+ wake_up(&ep->wq);
}
if (waitqueue_active(&ep->poll_wait))
pwake++;
out_unlock:
- spin_unlock_irqrestore(&ep->wq.lock, flags);
+ read_unlock_irqrestore(&ep->lock, flags);
/* We have to call this outside the lock */
if (pwake)
goto error_remove_epi;
/* We have to drop the new item inside our item list to keep track of it */
- spin_lock_irq(&ep->wq.lock);
+ write_lock_irq(&ep->lock);
/* record NAPI ID of new item if present */
ep_set_busy_poll_napi_id(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
- wake_up_locked(&ep->wq);
+ wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
- spin_unlock_irq(&ep->wq.lock);
+ write_unlock_irq(&ep->lock);
atomic_long_inc(&ep->user->epoll_watches);
* list, since that is used/cleaned only inside a section bound by "mtx".
* And ep_insert() is called with "mtx" held.
*/
- spin_lock_irq(&ep->wq.lock);
+ write_lock_irq(&ep->lock);
if (ep_is_linked(epi))
list_del_init(&epi->rdllink);
- spin_unlock_irq(&ep->wq.lock);
+ write_unlock_irq(&ep->lock);
wakeup_source_unregister(ep_wakeup_source(epi));
* 1) Flush epi changes above to other CPUs. This ensures
* we do not miss events from ep_poll_callback if an
* event occurs immediately after we call f_op->poll().
- * We need this because we did not take ep->wq.lock while
+ * We need this because we did not take ep->lock while
* changing epi above (but ep_poll_callback does take
- * ep->wq.lock).
+ * ep->lock).
*
* 2) We also need to ensure we do not miss _past_ events
* when calling f_op->poll(). This barrier also
* list, push it inside.
*/
if (ep_item_poll(epi, &pt, 1)) {
- spin_lock_irq(&ep->wq.lock);
+ write_lock_irq(&ep->lock);
if (!ep_is_linked(epi)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
ep_pm_stay_awake(epi);
/* Notify waiting tasks that events are available */
if (waitqueue_active(&ep->wq))
- wake_up_locked(&ep->wq);
+ wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
- spin_unlock_irq(&ep->wq.lock);
+ write_unlock_irq(&ep->lock);
}
/* We have to call this outside the lock */
*/
timed_out = 1;
- spin_lock_irq(&ep->wq.lock);
+ write_lock_irq(&ep->lock);
eavail = ep_events_available(ep);
- spin_unlock_irq(&ep->wq.lock);
+ write_unlock_irq(&ep->lock);
goto send_events;
}