Edit

IABSD.fr/src/sys/kern/kern_task.c

Branch :

  • Show log

    Commit

  • Author : mvs
    Date : 2025-01-13 03:21:10
    Hash : 2ee1bbb5
    Message : taskq_del_barrier(9) and timeout_del_barrier(9) should always call taskq_barrier(9) and timeout_barrier(9) respectively. In the case that task or timeout were scheduled to run both *_del_barrier(9) only remove it from pending queue and doesn't wait its finish. However task could be simultaneously running and scheduled to the next run, so they need to always wait until running handler has completed. ok dlg

  • sys/kern/kern_task.c
  • /*	$OpenBSD: kern_task.c,v 1.36 2025/01/13 03:21:10 mvs Exp $ */
    
    /*
     * Copyright (c) 2013 David Gwynne <dlg@openbsd.org>
     *
     * Permission to use, copy, modify, and distribute this software for any
     * purpose with or without fee is hereby granted, provided that the above
     * copyright notice and this permission notice appear in all copies.
     *
     * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
     * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     */
    
    #include <sys/param.h>
    #include <sys/systm.h>
    #include <sys/malloc.h>
    #include <sys/mutex.h>
    #include <sys/kthread.h>
    #include <sys/task.h>
    #include <sys/proc.h>
    #include <sys/witness.h>
    
    #include "kcov.h"
    #if NKCOV > 0
    #include <sys/kcov.h>
    #endif
    
    #ifdef WITNESS
    
    static struct lock_type taskq_lock_type = {
    	.lt_name = "taskq"
    };
    
    #define TASKQ_LOCK_FLAGS LO_WITNESS | LO_INITIALIZED | LO_SLEEPABLE | \
        (LO_CLASS_RWLOCK << LO_CLASSSHIFT)
    
    #endif /* WITNESS */
    
    struct taskq_thread {
    	SLIST_ENTRY(taskq_thread)
    				 tt_entry;
    	struct proc		*tt_thread;
    };
    SLIST_HEAD(taskq_threads, taskq_thread);
    
    struct taskq {
    	enum {
    		TQ_S_CREATED,
    		TQ_S_RUNNING,
    		TQ_S_DESTROYED
    	}			 tq_state;
    	unsigned int		 tq_running;
    	unsigned int		 tq_nthreads;
    	unsigned int		 tq_flags;
    	const char		*tq_name;
    
    	struct mutex		 tq_mtx;
    	struct task_list	 tq_worklist;
    
    	struct taskq_threads	 tq_threads;
    	unsigned int		 tq_barriers;
    	unsigned int		 tq_bgen;
    	unsigned int		 tq_bthreads;
    
    #ifdef WITNESS
    	struct lock_object	 tq_lock_object;
    #endif
    };
    
    static const char taskq_sys_name[] = "systq";
    
    struct taskq taskq_sys = {
    	.tq_state	= TQ_S_CREATED,
    	.tq_running	= 0,
    	.tq_nthreads	= 1,
    	.tq_flags	= 0,
    	.tq_name	= taskq_sys_name,
    	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
    			      taskq_sys_name, 0),
    	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys.tq_worklist),
    
    	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys.tq_threads),
    	.tq_barriers	= 0,
    	.tq_bgen	= 0,
    	.tq_bthreads	= 0,
    
    #ifdef WITNESS
    	.tq_lock_object	= {
    		.lo_name	= taskq_sys_name,
    		.lo_flags	= TASKQ_LOCK_FLAGS,
    	},
    #endif
    };
    
    static const char taskq_sys_mp_name[] = "systqmp";
    
    struct taskq taskq_sys_mp = {
    	.tq_state	= TQ_S_CREATED,
    	.tq_running	= 0,
    	.tq_nthreads	= 1,
    	.tq_flags	= TASKQ_MPSAFE,
    	.tq_name	= taskq_sys_mp_name,
    	.tq_mtx		= MUTEX_INITIALIZER_FLAGS(IPL_HIGH,
    			      taskq_sys_mp_name, 0),
    	.tq_worklist	= TAILQ_HEAD_INITIALIZER(taskq_sys_mp.tq_worklist),
    
    	.tq_threads	= SLIST_HEAD_INITIALIZER(taskq_sys_mp.tq_threads),
    	.tq_barriers	= 0,
    	.tq_bgen	= 0,
    	.tq_bthreads	= 0,
    
    #ifdef WITNESS
    	.tq_lock_object = {
    		.lo_name	= taskq_sys_mp_name,
    		.lo_flags	= TASKQ_LOCK_FLAGS,
    	},
    #endif
    };
    
    struct taskq *const systq = &taskq_sys;
    struct taskq *const systqmp = &taskq_sys_mp;
    
    void	taskq_init(void); /* called in init_main.c */
    void	taskq_create_thread(void *);
    void	taskq_barrier_task(void *);
    int	taskq_next_work(struct taskq *, struct task *);
    void	taskq_thread(void *);
    
    void
    taskq_init(void)
    {
    	WITNESS_INIT(&systq->tq_lock_object, &taskq_lock_type);
    	kthread_create_deferred(taskq_create_thread, systq);
    	WITNESS_INIT(&systqmp->tq_lock_object, &taskq_lock_type);
    	kthread_create_deferred(taskq_create_thread, systqmp);
    }
    
    struct taskq *
    taskq_create(const char *name, unsigned int nthreads, int ipl,
        unsigned int flags)
    {
    	struct taskq *tq;
    
    	tq = malloc(sizeof(*tq), M_DEVBUF, M_WAITOK);
    	if (tq == NULL)
    		return (NULL);
    
    	tq->tq_state = TQ_S_CREATED;
    	tq->tq_running = 0;
    	tq->tq_nthreads = nthreads;
    	tq->tq_name = name;
    	tq->tq_flags = flags;
    
    	mtx_init_flags(&tq->tq_mtx, ipl, name, 0);
    	TAILQ_INIT(&tq->tq_worklist);
    
    	SLIST_INIT(&tq->tq_threads);
    	tq->tq_barriers = 0;
    	tq->tq_bgen = 0;
    	tq->tq_bthreads = 0;
    
    #ifdef WITNESS
    	memset(&tq->tq_lock_object, 0, sizeof(tq->tq_lock_object));
    	tq->tq_lock_object.lo_name = name;
    	tq->tq_lock_object.lo_flags = TASKQ_LOCK_FLAGS;
    	witness_init(&tq->tq_lock_object, &taskq_lock_type);
    #endif
    
    	/* try to create a thread to guarantee that tasks will be serviced */
    	kthread_create_deferred(taskq_create_thread, tq);
    
    	return (tq);
    }
    
    void
    taskq_destroy(struct taskq *tq)
    {
    	mtx_enter(&tq->tq_mtx);
    	switch (tq->tq_state) {
    	case TQ_S_CREATED:
    		/* tq is still referenced by taskq_create_thread */
    		tq->tq_state = TQ_S_DESTROYED;
    		mtx_leave(&tq->tq_mtx);
    		return;
    
    	case TQ_S_RUNNING:
    		tq->tq_state = TQ_S_DESTROYED;
    		break;
    
    	default:
    		panic("unexpected %s tq state %u", tq->tq_name, tq->tq_state);
    	}
    
    	while (tq->tq_running > 0) {
    		wakeup(tq);
    		msleep_nsec(&tq->tq_running, &tq->tq_mtx, PWAIT, "tqdestroy",
    		    INFSLP);
    	}
    	mtx_leave(&tq->tq_mtx);
    
    	free(tq, M_DEVBUF, sizeof(*tq));
    }
    
    void
    taskq_create_thread(void *arg)
    {
    	struct taskq *tq = arg;
    	int rv;
    
    	mtx_enter(&tq->tq_mtx);
    
    	switch (tq->tq_state) {
    	case TQ_S_DESTROYED:
    		mtx_leave(&tq->tq_mtx);
    		free(tq, M_DEVBUF, sizeof(*tq));
    		return;
    
    	case TQ_S_CREATED:
    		tq->tq_state = TQ_S_RUNNING;
    		break;
    
    	default:
    		panic("unexpected %s tq state %d", tq->tq_name, tq->tq_state);
    	}
    
    	do {
    		tq->tq_running++;
    		mtx_leave(&tq->tq_mtx);
    
    		rv = kthread_create(taskq_thread, tq, NULL, tq->tq_name);
    
    		mtx_enter(&tq->tq_mtx);
    		if (rv != 0) {
    			printf("unable to create thread for \"%s\" taskq\n",
    			    tq->tq_name);
    
    			tq->tq_running--;
    			/* could have been destroyed during kthread_create */
    			if (tq->tq_state == TQ_S_DESTROYED &&
    			    tq->tq_running == 0)
    				wakeup_one(&tq->tq_running);
    			break;
    		}
    	} while (tq->tq_running < tq->tq_nthreads);
    
    	mtx_leave(&tq->tq_mtx);
    }
    
    void
    taskq_barrier_task(void *p)
    {
    	struct taskq *tq = p;
    	unsigned int gen;
    
    	mtx_enter(&tq->tq_mtx);
    	tq->tq_bthreads++;
    	wakeup(&tq->tq_bthreads);
    
    	gen = tq->tq_bgen;
    	do {
    		msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
    		    PWAIT, "tqbarend", INFSLP);
    	} while (gen == tq->tq_bgen);
    	mtx_leave(&tq->tq_mtx);
    }
    
    static void
    taskq_do_barrier(struct taskq *tq)
    {
    	struct task t = TASK_INITIALIZER(taskq_barrier_task, tq);
    	struct proc *thread = curproc;
    	struct taskq_thread *tt;
    
    	mtx_enter(&tq->tq_mtx);
    	tq->tq_barriers++;
    
    	/* is the barrier being run from a task inside the taskq? */
    	SLIST_FOREACH(tt, &tq->tq_threads, tt_entry) {
    		if (tt->tt_thread == thread) {
    			tq->tq_bthreads++;
    			wakeup(&tq->tq_bthreads);
    			break;
    		}
    	}
    
    	while (tq->tq_bthreads < tq->tq_nthreads) {
    		/* shove the task into the queue for a worker to pick up */
    		SET(t.t_flags, TASK_ONQUEUE);
    		TAILQ_INSERT_TAIL(&tq->tq_worklist, &t, t_entry);
    		wakeup_one(tq);
    
    		msleep_nsec(&tq->tq_bthreads, &tq->tq_mtx,
    		    PWAIT, "tqbar", INFSLP);
    
    		/*
    		 * another thread running a barrier might have
    		 * done this work for us.
    		 */
    		if (ISSET(t.t_flags, TASK_ONQUEUE))
    			TAILQ_REMOVE(&tq->tq_worklist, &t, t_entry);
    	}
    
    	if (--tq->tq_barriers == 0) {
    		/* we're the last one out */
    		tq->tq_bgen++;
    		wakeup(&tq->tq_bgen);
    		tq->tq_bthreads = 0;
    	} else {
    		unsigned int gen = tq->tq_bgen;
    		do {
    			msleep_nsec(&tq->tq_bgen, &tq->tq_mtx,
    			    PWAIT, "tqbarwait", INFSLP);
    		} while (gen == tq->tq_bgen);
    	}
    	mtx_leave(&tq->tq_mtx);
    }
    
    void
    taskq_barrier(struct taskq *tq)
    {
    	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
    
    	taskq_do_barrier(tq);
    }
    
    void
    taskq_del_barrier(struct taskq *tq, struct task *t)
    {
    	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
    
    	task_del(tq, t);
    	taskq_do_barrier(tq);
    }
    
    void
    task_set(struct task *t, void (*fn)(void *), void *arg)
    {
    	t->t_func = fn;
    	t->t_arg = arg;
    	t->t_flags = 0;
    }
    
    int
    task_add(struct taskq *tq, struct task *w)
    {
    	int rv = 0;
    
    	if (ISSET(w->t_flags, TASK_ONQUEUE))
    		return (0);
    
    	mtx_enter(&tq->tq_mtx);
    	if (!ISSET(w->t_flags, TASK_ONQUEUE)) {
    		rv = 1;
    		SET(w->t_flags, TASK_ONQUEUE);
    		TAILQ_INSERT_TAIL(&tq->tq_worklist, w, t_entry);
    #if NKCOV > 0
    		if (!kcov_cold)
    			w->t_process = curproc->p_p;
    #endif
    	}
    	mtx_leave(&tq->tq_mtx);
    
    	if (rv)
    		wakeup_one(tq);
    
    	return (rv);
    }
    
    int
    task_del(struct taskq *tq, struct task *w)
    {
    	int rv = 0;
    
    	if (!ISSET(w->t_flags, TASK_ONQUEUE))
    		return (0);
    
    	mtx_enter(&tq->tq_mtx);
    	if (ISSET(w->t_flags, TASK_ONQUEUE)) {
    		rv = 1;
    		CLR(w->t_flags, TASK_ONQUEUE);
    		TAILQ_REMOVE(&tq->tq_worklist, w, t_entry);
    	}
    	mtx_leave(&tq->tq_mtx);
    
    	return (rv);
    }
    
    int
    taskq_next_work(struct taskq *tq, struct task *work)
    {
    	struct task *next;
    
    	mtx_enter(&tq->tq_mtx);
    	while ((next = TAILQ_FIRST(&tq->tq_worklist)) == NULL) {
    		if (tq->tq_state != TQ_S_RUNNING) {
    			mtx_leave(&tq->tq_mtx);
    			return (0);
    		}
    
    		msleep_nsec(tq, &tq->tq_mtx, PWAIT, "bored", INFSLP);
    	}
    
    	TAILQ_REMOVE(&tq->tq_worklist, next, t_entry);
    	CLR(next->t_flags, TASK_ONQUEUE);
    
    	*work = *next; /* copy to caller to avoid races */
    
    	next = TAILQ_FIRST(&tq->tq_worklist);
    	mtx_leave(&tq->tq_mtx);
    
    	if (next != NULL && tq->tq_nthreads > 1)
    		wakeup_one(tq);
    
    	return (1);
    }
    
    void
    taskq_thread(void *xtq)
    {
    	struct taskq_thread self = { .tt_thread = curproc };
    	struct taskq *tq = xtq;
    	struct task work;
    	int last;
    
    	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
    		KERNEL_UNLOCK();
    
    	mtx_enter(&tq->tq_mtx);
    	SLIST_INSERT_HEAD(&tq->tq_threads, &self, tt_entry);
    	mtx_leave(&tq->tq_mtx);
    
    	WITNESS_CHECKORDER(&tq->tq_lock_object, LOP_NEWORDER, NULL);
    
    	while (taskq_next_work(tq, &work)) {
    		WITNESS_LOCK(&tq->tq_lock_object, 0);
    #if NKCOV > 0
    		kcov_remote_enter(KCOV_REMOTE_COMMON, work.t_process);
    #endif
    		(*work.t_func)(work.t_arg);
    #if NKCOV > 0
    		kcov_remote_leave(KCOV_REMOTE_COMMON, work.t_process);
    #endif
    		WITNESS_UNLOCK(&tq->tq_lock_object, 0);
    		sched_pause(yield);
    	}
    
    	mtx_enter(&tq->tq_mtx);
    	SLIST_REMOVE(&tq->tq_threads, &self, taskq_thread, tt_entry);
    	last = (--tq->tq_running == 0);
    	mtx_leave(&tq->tq_mtx);
    
    	if (ISSET(tq->tq_flags, TASKQ_MPSAFE))
    		KERNEL_LOCK();
    
    	if (last)
    		wakeup_one(&tq->tq_running);
    
    	kthread_exit(0);
    }