FFmpeg  4.3.9
slicethread.c
Go to the documentation of this file.
1 /*
2  * This file is part of FFmpeg.
3  *
4  * FFmpeg is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * FFmpeg is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with FFmpeg; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <stdatomic.h>
20 #include "slicethread.h"
21 #include "mem.h"
22 #include "thread.h"
23 #include "avassert.h"
24 
25 #if HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS
26 
/*
 * State owned by one worker thread.
 *
 * mutex/cond form the private wake-up channel between the controlling thread
 * and this worker; done is the predicate protected by mutex.
 */
typedef struct WorkerContext {
    struct AVSliceThread *ctx;  /* pool this worker belongs to */
    pthread_mutex_t mutex;      /* guards done; held by the worker except while it waits */
    pthread_cond_t  cond;       /* signalled at worker start-up and to wake an idle worker */
    pthread_t       thread;     /* handle used for pthread_create()/pthread_join() */
    int             done;       /* set to 1 by the worker when idle, cleared to 0 to wake it */
} WorkerContext;
34 
35 struct AVSliceThread {
36  WorkerContext *workers;
37  int nb_threads;
38  int nb_active_threads;
39  int nb_jobs;
40 
41  atomic_uint first_job;
42  atomic_uint current_job;
43  pthread_mutex_t done_mutex;
44  pthread_cond_t done_cond;
45  int done;
46  int finished;
47 
48  void *priv;
49  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads);
50  void (*main_func)(void *priv);
51 };
52 
53 static int run_jobs(AVSliceThread *ctx)
54 {
55  unsigned nb_jobs = ctx->nb_jobs;
56  unsigned nb_active_threads = ctx->nb_active_threads;
57  unsigned first_job = atomic_fetch_add_explicit(&ctx->first_job, 1, memory_order_acq_rel);
58  unsigned current_job = first_job;
59 
60  do {
61  ctx->worker_func(ctx->priv, current_job, first_job, nb_jobs, nb_active_threads);
62  } while ((current_job = atomic_fetch_add_explicit(&ctx->current_job, 1, memory_order_acq_rel)) < nb_jobs);
63 
64  return current_job == nb_jobs + nb_active_threads - 1;
65 }
66 
67 static void *attribute_align_arg thread_worker(void *v)
68 {
69  WorkerContext *w = v;
70  AVSliceThread *ctx = w->ctx;
71 
72  pthread_mutex_lock(&w->mutex);
73  pthread_cond_signal(&w->cond);
74 
75  while (1) {
76  w->done = 1;
77  while (w->done)
78  pthread_cond_wait(&w->cond, &w->mutex);
79 
80  if (ctx->finished) {
81  pthread_mutex_unlock(&w->mutex);
82  return NULL;
83  }
84 
85  if (run_jobs(ctx)) {
86  pthread_mutex_lock(&ctx->done_mutex);
87  ctx->done = 1;
88  pthread_cond_signal(&ctx->done_cond);
89  pthread_mutex_unlock(&ctx->done_mutex);
90  }
91  }
92 }
93 
94 int avpriv_slicethread_create(AVSliceThread **pctx, void *priv,
95  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
96  void (*main_func)(void *priv),
97  int nb_threads)
98 {
100  int nb_workers, i;
101  int ret;
102 
103  av_assert0(nb_threads >= 0);
104  if (!nb_threads) {
105  int nb_cpus = av_cpu_count();
106  if (nb_cpus > 1)
107  nb_threads = nb_cpus + 1;
108  else
109  nb_threads = 1;
110  }
111 
112  nb_workers = nb_threads;
113  if (!main_func)
114  nb_workers--;
115 
116  *pctx = ctx = av_mallocz(sizeof(*ctx));
117  if (!ctx)
118  return AVERROR(ENOMEM);
119 
120  if (nb_workers && !(ctx->workers = av_calloc(nb_workers, sizeof(*ctx->workers)))) {
121  av_freep(pctx);
122  return AVERROR(ENOMEM);
123  }
124 
125  ctx->priv = priv;
126  ctx->worker_func = worker_func;
127  ctx->main_func = main_func;
128  ctx->nb_threads = nb_threads;
129  ctx->nb_active_threads = 0;
130  ctx->nb_jobs = 0;
131  ctx->finished = 0;
132 
133  atomic_init(&ctx->first_job, 0);
134  atomic_init(&ctx->current_job, 0);
135  ret = pthread_mutex_init(&ctx->done_mutex, NULL);
136  if (ret) {
137  av_freep(&ctx->workers);
138  av_freep(pctx);
139  return AVERROR(ret);
140  }
141  ret = pthread_cond_init(&ctx->done_cond, NULL);
142  if (ret) {
143  ctx->nb_threads = main_func ? 0 : 1;
145  return AVERROR(ret);
146  }
147  ctx->done = 0;
148 
149  for (i = 0; i < nb_workers; i++) {
150  WorkerContext *w = &ctx->workers[i];
151  int ret;
152  w->ctx = ctx;
153  ret = pthread_mutex_init(&w->mutex, NULL);
154  if (ret) {
155  ctx->nb_threads = main_func ? i : i + 1;
157  return AVERROR(ret);
158  }
159  ret = pthread_cond_init(&w->cond, NULL);
160  if (ret) {
161  pthread_mutex_destroy(&w->mutex);
162  ctx->nb_threads = main_func ? i : i + 1;
164  return AVERROR(ret);
165  }
166  pthread_mutex_lock(&w->mutex);
167  w->done = 0;
168 
169  if (ret = pthread_create(&w->thread, NULL, thread_worker, w)) {
170  ctx->nb_threads = main_func ? i : i + 1;
171  pthread_mutex_unlock(&w->mutex);
172  pthread_cond_destroy(&w->cond);
173  pthread_mutex_destroy(&w->mutex);
175  return AVERROR(ret);
176  }
177 
178  while (!w->done)
179  pthread_cond_wait(&w->cond, &w->mutex);
180  pthread_mutex_unlock(&w->mutex);
181  }
182 
183  return nb_threads;
184 }
185 
186 void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
187 {
188  int nb_workers, i, is_last = 0;
189 
190  av_assert0(nb_jobs > 0);
191  ctx->nb_jobs = nb_jobs;
192  ctx->nb_active_threads = FFMIN(nb_jobs, ctx->nb_threads);
193  atomic_store_explicit(&ctx->first_job, 0, memory_order_relaxed);
194  atomic_store_explicit(&ctx->current_job, ctx->nb_active_threads, memory_order_relaxed);
195  nb_workers = ctx->nb_active_threads;
196  if (!ctx->main_func || !execute_main)
197  nb_workers--;
198 
199  for (i = 0; i < nb_workers; i++) {
200  WorkerContext *w = &ctx->workers[i];
201  pthread_mutex_lock(&w->mutex);
202  w->done = 0;
203  pthread_cond_signal(&w->cond);
204  pthread_mutex_unlock(&w->mutex);
205  }
206 
207  if (ctx->main_func && execute_main)
208  ctx->main_func(ctx->priv);
209  else
210  is_last = run_jobs(ctx);
211 
212  if (!is_last) {
213  pthread_mutex_lock(&ctx->done_mutex);
214  while (!ctx->done)
215  pthread_cond_wait(&ctx->done_cond, &ctx->done_mutex);
216  ctx->done = 0;
217  pthread_mutex_unlock(&ctx->done_mutex);
218  }
219 }
220 
222 {
224  int nb_workers, i;
225 
226  if (!pctx || !*pctx)
227  return;
228 
229  ctx = *pctx;
230  nb_workers = ctx->nb_threads;
231  if (!ctx->main_func)
232  nb_workers--;
233 
234  ctx->finished = 1;
235  for (i = 0; i < nb_workers; i++) {
236  WorkerContext *w = &ctx->workers[i];
237  pthread_mutex_lock(&w->mutex);
238  w->done = 0;
239  pthread_cond_signal(&w->cond);
240  pthread_mutex_unlock(&w->mutex);
241  }
242 
243  for (i = 0; i < nb_workers; i++) {
244  WorkerContext *w = &ctx->workers[i];
245  pthread_join(w->thread, NULL);
246  pthread_cond_destroy(&w->cond);
247  pthread_mutex_destroy(&w->mutex);
248  }
249 
250  pthread_cond_destroy(&ctx->done_cond);
251  pthread_mutex_destroy(&ctx->done_mutex);
252  av_freep(&ctx->workers);
253  av_freep(pctx);
254 }
255 
#else /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
257 
259  void (*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads),
260  void (*main_func)(void *priv),
261  int nb_threads)
262 {
263  *pctx = NULL;
264  return AVERROR(EINVAL);
265 }
266 
/*
 * Stub used when no thread implementation is available. The stub create
 * function in this branch never hands out a context, so reaching this
 * function is a caller error: it unconditionally fails an assertion.
 */
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
{
    av_assert0(0);
}
271 
273 {
274  av_assert0(!pctx || !*pctx);
275 }
276 
#endif /* HAVE_PTHREADS || HAVE_W32THREADS || HAVE_OS2THREADS */
#define NULL
Definition: coverity.c:32
static av_always_inline int pthread_mutex_destroy(pthread_mutex_t *mutex)
Definition: os2threads.h:112
static AVMutex mutex
Definition: log.c:44
#define pthread_mutex_lock(a)
Definition: ffprobe.c:62
static av_always_inline int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
Definition: os2threads.h:192
int av_cpu_count(void)
Definition: cpu.c:267
Memory handling functions.
static void worker_func(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads)
Definition: pthread_slice.c:65
static av_always_inline int pthread_cond_destroy(pthread_cond_t *cond)
Definition: os2threads.h:144
void * av_calloc(size_t nmemb, size_t size)
Non-inlined equivalent of av_mallocz_array().
Definition: mem.c:245
#define av_assert0(cond)
assert() equivalent, that is always enabled.
Definition: avassert.h:37
struct AVSliceThread AVSliceThread
Definition: slicethread.h:22
static av_always_inline int pthread_cond_signal(pthread_cond_t *cond)
Definition: os2threads.h:152
#define i(width, name, range_min, range_max)
Definition: cbs_h2645.c:269
#define AVERROR(e)
Definition: error.h:43
simple assert() macros that are a bit more flexible than ISO C assert().
void * av_mallocz(size_t size)
Allocate a memory block with alignment suitable for all memory accesses (including vectors if availab...
Definition: mem.c:237
#define FFMIN(a, b)
Definition: common.h:96
void avpriv_slicethread_free(AVSliceThread **pctx)
Destroy slice threading context.
Definition: slicethread.c:272
uint8_t w
Definition: llviddspenc.c:38
AVFormatContext * ctx
Definition: movenc.c:48
static av_always_inline int pthread_join(pthread_t thread, void **value_ptr)
Definition: os2threads.h:94
static av_always_inline int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr)
Definition: os2threads.h:104
#define pthread_mutex_unlock(a)
Definition: ffprobe.c:66
#define atomic_fetch_add_explicit(object, operand, order)
Definition: stdatomic.h:149
#define attribute_align_arg
Definition: internal.h:62
static av_always_inline int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine)(void *), void *arg)
Definition: os2threads.h:80
typedef void(RENAME(mix_any_func_type))
int avpriv_slicethread_create(AVSliceThread **pctx, void *priv, void(*worker_func)(void *priv, int jobnr, int threadnr, int nb_jobs, int nb_threads), void(*main_func)(void *priv), int nb_threads)
Create slice threading context.
Definition: slicethread.c:258
void avpriv_slicethread_execute(AVSliceThread *ctx, int nb_jobs, int execute_main)
Execute slice threading.
Definition: slicethread.c:267
intptr_t atomic_uint
Definition: stdatomic.h:56
#define atomic_store_explicit(object, desired, order)
Definition: stdatomic.h:90
int(* cond)(enum AVPixelFormat pix_fmt)
Definition: pixdesc_query.c:28
int() main_func(AVCodecContext *c)
Definition: pthread_slice.c:41
_fmutex pthread_mutex_t
Definition: os2threads.h:53
static av_always_inline int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr)
Definition: os2threads.h:133
#define atomic_init(obj, value)
Definition: stdatomic.h:33
#define av_freep(p)