diff --git a/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.c b/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.c index 01192fc82f..7a98294533 100644 --- a/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.c +++ b/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.c @@ -1,6 +1,7 @@ /* GStreamer * Copyright (C) 1999,2000 Erik Walthinsen - * 2000,2005 Wim Taymans + * 2000 Wim Taymans + * 2005 Wim Taymans * 2023 Havard Graff * 2023 Camilo Celis Guzman * @@ -25,17 +26,155 @@ /** * SECTION:gstbaseidlesrc * @title: GstBaseIdleSrc - * @short_description: Base class for getrange based source elements - * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink + * @short_description: Base class for push-mode source elements that operate + * in idle-driven or controlled pacing + * @see_also: #GstBaseSrc, #GstPushSrc, #GstBaseSink + * + * #GstBaseIdleSrc is a base class for source elements that operate purely in + * push mode. Unlike #GstBaseSrc, it does not support pull-based scheduling or + * random access through #GstBaseSrcClass::create, and instead expects subclasses + * to push data downstream at their own pace or from an external producer. + * + * ## Purpose + * + * This base class is intended for elements that generate or forward data + * asynchronously, such as live capture devices, idle-loop sources, or + * externally-driven producers. It provides the fundamental mechanics of + * caps negotiation, allocation setup, buffer timestamping, and live-source + * handling, while delegating actual data generation to subclasses. + * + * The element manages a single source pad named `"src"` and operates only + * in push mode. + * + * ## Features + * + * - Push-only operation (no getrange/pull support) + * - Optional live-source mode using gst_base_idle_src_set_live() + * - Buffer allocation via the #GstBaseIdleSrcClass.alloc vfunc, which by + * default uses the negotiated #GstBufferPool / #GstAllocator + * - Negotiation helpers for caps and allocation queries + * - Thread-pool support for external or shared producers + * - Optional automatic timestamping + * + * ## Subclass Responsibilities + * + * A typical subclass of #GstBaseIdleSrc needs to: + * + * 1. Install a static pad template for the `"src"` pad in `class_init()`. + * 2. Implement @start and @stop to open and close resources. + * 3. Produce and submit buffers using gst_base_idle_src_submit_buffer() or + * gst_base_idle_src_submit_buffer_list(). + * + * Optionally, the subclass can override virtual methods such as: + * + * - @get_caps, @fixate, @set_caps, @negotiate — for caps negotiation. + * - @decide_allocation — to customize memory allocation. + * - @query — to handle custom queries (e.g., latency, position). + * - @event — to handle upstream events such as flushes or EOS. + * - @alloc — to allocate output buffers. + * + * ## Live Sources + * + * Live sources are sources that produce data in real time and may discard + * data when paused. Typical examples are capture devices or sensors. Live + * sources should call gst_base_idle_src_set_live() during setup. + * + * A live source does not produce data while in the PAUSED state. The state + * change from READY to PAUSED will therefore return + * %GST_STATE_CHANGE_NO_PREROLL to indicate that no preroll data is available. + * + * When PLAYING, live sources may timestamp buffers using the current running + * time if gst_base_idle_src_set_do_timestamp() was enabled. This makes it + * easier to synchronize generated data to the pipeline clock. + * + * ## Thread Pool Management + * + * #GstBaseIdleSrc uses an internal #GstTaskPool to schedule buffer submission + * and auxiliary tasks. This mechanism helps to avoid the overhead of creating + * and destroying threads too frequently when data is submitted in rapid + * succession. + * + * By default, the internal pool is configured with a maximum of one thread, + * which provides a simple but efficient way to handle continuous or bursty + * submission patterns without incurring repeated thread creation and teardown + * costs. + * + * Applications or higher-level elements can override the default pool using + * gst_base_idle_src_set_thread_pool(). This allows sharing a thread pool + * among multiple similar slow-paced sources, reducing context-switch overhead + * and improving resource utilization when several sources push data at a + * modest rate. + * + * The currently configured thread pool can be retrieved with + * gst_base_idle_src_get_thread_pool(). + * + * ## Example Subclass + * + * |[ + * static void + * my_idle_src_class_init (GstMyIdleSrcClass *klass) + * { + * GstElementClass *element_class = GST_ELEMENT_CLASS (klass); + * gst_element_class_add_static_pad_template (element_class, &srctemplate); + * + * gst_element_class_set_static_metadata (element_class, + * "My Idle Source", + * "Source/Push", + * "Example push-mode idle source", + * "Author "); + * + * klass->start = GST_DEBUG_FUNCPTR (my_idle_src_start); + * klass->stop = GST_DEBUG_FUNCPTR (my_idle_src_stop); + * } + * + * static GstFlowReturn + * my_idle_src_loop (GstMyIdleSrc *self) + * { + * GstBaseIdleSrc *src = GST_BASE_IDLE_SRC (self); + * GstBaseIdleSrcClass *klass = GST_BASE_IDLE_SRC_GET_CLASS (src); + * GstBuffer *buffer = NULL; + * GstFlowReturn ret; + * + * ret = klass->alloc (src, FRAME_SIZE, &buffer); + * if (ret != GST_FLOW_OK) + * return ret; + * + * // ... fill buffer ... + * + * gst_base_idle_src_submit_buffer (src, buffer); + * return GST_FLOW_OK; + * } + * ]| + * + * ## Threading and Data Submission + * + * Data can be produced in any thread, including one managed by a custom + * #GstTaskPool. Use gst_base_idle_src_set_thread_pool() to configure a + * task pool and gst_base_idle_src_submit_buffer() or + * gst_base_idle_src_submit_buffer_list() to submit data. + * + * ## Notes + * + * - Only one source pad named "src" is supported. + * - Only push-mode operation is possible. + * - Pull-mode functions (e.g., create/getrange) are not implemented. + * - gst_base_idle_src_set_format() defines the active #GstFormat used for + * segments and events. + * - Subclasses should avoid blocking indefinitely in @start or @stop, as these + * are called during state changes. + * + * ## Typical Use Cases + * + * - Synthetic or idle data generators + * - Real-time data capture devices + * - Streaming relays or adapters that push data from another thread + * - Sources that wrap asynchronous external APIs or IO loops * - */ - #ifdef HAVE_CONFIG_H # include "config.h" #endif -#include #include #include @@ -70,10 +209,10 @@ struct _GstBaseIdleSrcPrivate { /* if a stream-start event should be sent */ gboolean stream_start_pending; /* STREAM_LOCK */ - /* if segment should be sent and a * seqnum if it was originated by a seek */ gboolean segment_pending; /* OBJECT_LOCK */ + gboolean do_timestamp; /* OBJECT_LOCK */ guint32 segment_seqnum; /* OBJECT_LOCK */ /* startup latency is the time it takes between going to PLAYING and producing @@ -83,25 +222,23 @@ struct _GstBaseIdleSrcPrivate /* timestamp offset, this is the offset add to the values of gst_times for * pseudo live sources */ GstClockTimeDiff ts_offset; /* OBJECT_LOCK */ - - gboolean do_timestamp; /* OBJECT_LOCK */ - - /* QoS *//* with LOCK */ + /* QoS */ gdouble proportion; /* OBJECT_LOCK */ GstClockTime earliest_time; /* OBJECT_LOCK */ - GstBufferPool *pool; /* OBJECT_LOCK */ + GstBufferPool *buf_pool; /* OBJECT_LOCK */ GstAllocator *allocator; /* OBJECT_LOCK */ - GstAllocationParams params; /* OBJECT_LOCK */ - GQueue *obj_queue; - GThread *thread; + gpointer thread_handle; + GstTaskPool *thread_pool; /* OBJECT_LOCK */ + gboolean owns_thread_pool; /* OBJECT_LOCK - TRUE iff we created it at init() */ + + GstAllocationParams params; /* OBJECT_LOCK */ }; static GstElementClass *parent_class = NULL; static gint private_offset = 0; - static void gst_base_idle_src_start_task (GstBaseIdleSrc * src, gboolean wait); static void gst_base_idle_src_process_object_queue (GstBaseIdleSrc * src); @@ -111,17 +248,13 @@ gst_base_idle_src_get_instance_private (GstBaseIdleSrc * self) return (G_STRUCT_MEMBER_P (self, private_offset)); } -/* TODO: do we support anything other than _BUFFER/_BYTES ? */ /** * gst_base_idle_src_set_format: * @src: base source instance * @format: the format to use * * Sets the default format of the source. This will be the format used - * for sending SEGMENT events and for performing seeks. - * - * If a format of GST_FORMAT_BYTES is set, the element will be able to - * operate in pull mode if the #GstBaseIdleSrcClass::is_seekable returns %TRUE. + * for sending SEGMENT events. * * This function must only be called in states < %GST_STATE_PAUSED. */ @@ -130,6 +263,8 @@ gst_base_idle_src_set_format (GstBaseIdleSrc * src, GstFormat format) { g_return_if_fail (GST_IS_BASE_IDLE_SRC (src)); g_return_if_fail (GST_STATE (src) <= GST_STATE_READY); + g_return_if_fail (GST_FORMAT_BUFFERS == format || GST_FORMAT_BYTES == format + || GST_FORMAT_TIME == format); GST_OBJECT_LOCK (src); gst_segment_init (&src->segment, format); @@ -228,65 +363,19 @@ gst_base_idle_src_get_do_timestamp (GstBaseIdleSrc * src) return res; } -/** - * gst_base_idle_src_new_segment: - * @src: a #GstBaseIdleSrc - * @segment: a pointer to a #GstSegment - * - * Prepare a new segment for emission downstream. This function must - * only be called by derived sub-classes, and only from the #GstBaseIdleSrcClass::create function, - * as the stream-lock needs to be held. - * - * The format for the @segment must be identical with the current format - * of the source, as configured with gst_base_idle_src_set_format(). - * - * The format of @src must not be %GST_FORMAT_UNDEFINED and the format - * should be configured via gst_base_idle_src_set_format() before calling this method. - * - * Returns: %TRUE if preparation of new segment succeeded. - * - * Since: 1.18 - */ -gboolean -gst_base_idle_src_new_segment (GstBaseIdleSrc * src, const GstSegment * segment) +static void +gst_base_idle_src_queue_object_unlocked (GstBaseIdleSrc * src, + GstMiniObject * obj) { - g_return_val_if_fail (GST_IS_BASE_IDLE_SRC (src), FALSE); - g_return_val_if_fail (segment != NULL, FALSE); - - GST_OBJECT_LOCK (src); - - if (src->segment.format == GST_FORMAT_UNDEFINED) { - /* subclass must set valid format before calling this method */ - GST_WARNING_OBJECT (src, "segment format is not configured yet, ignore"); - GST_OBJECT_UNLOCK (src); - return FALSE; - } - - if (src->segment.format != segment->format) { - GST_WARNING_OBJECT (src, "segment format mismatched, ignore"); - GST_OBJECT_UNLOCK (src); - return FALSE; - } - - gst_segment_copy_into (segment, &src->segment); - - /* Mark pending segment. Will be sent before next data */ - src->priv->segment_pending = TRUE; - src->priv->segment_seqnum = gst_util_seqnum_next (); - - GST_DEBUG_OBJECT (src, "Starting new segment %" GST_SEGMENT_FORMAT, segment); - - GST_OBJECT_UNLOCK (src); - - return TRUE; + GST_LOG_OBJECT (src, "Queuing: %" GST_PTR_FORMAT, obj); + g_queue_push_tail (src->priv->obj_queue, obj); } static void gst_base_idle_src_queue_object (GstBaseIdleSrc * src, GstMiniObject * obj) { - GST_LOG_OBJECT (src, "Queuing: %" GST_PTR_FORMAT, obj); GST_OBJECT_LOCK (src); - g_queue_push_tail (src->priv->obj_queue, obj); + gst_base_idle_src_queue_object_unlocked (src, obj); GST_OBJECT_UNLOCK (src); } @@ -317,7 +406,6 @@ gst_base_idle_src_set_caps (GstBaseIdleSrc * src, GstCaps * caps) } else { if (bclass->set_caps) res = bclass->set_caps (src, caps); - if (res) { gst_base_idle_src_queue_object (src, (GstMiniObject *) gst_event_new_caps (caps)); @@ -612,16 +700,16 @@ static void gst_base_idle_src_set_pool_flushing (GstBaseIdleSrc * src, gboolean flushing) { GstBaseIdleSrcPrivate *priv = src->priv; - GstBufferPool *pool; + GstBufferPool *buf_pool; GST_OBJECT_LOCK (src); - if ((pool = priv->pool)) - pool = gst_object_ref (pool); + if ((buf_pool = priv->buf_pool)) + buf_pool = gst_object_ref (buf_pool); GST_OBJECT_UNLOCK (src); - if (pool) { - gst_buffer_pool_set_flushing (pool, flushing); - gst_object_unref (pool); + if (buf_pool) { + gst_buffer_pool_set_flushing (buf_pool, flushing); + gst_object_unref (buf_pool); } } @@ -710,7 +798,6 @@ gst_base_idle_src_update_qos (GstBaseIdleSrc * src, GST_OBJECT_UNLOCK (src); } - static gboolean gst_base_idle_src_default_event (GstBaseIdleSrc * src, GstEvent * event) { @@ -809,28 +896,28 @@ gst_base_idle_src_get_property (GObject * object, guint prop_id, GValue * value, static gboolean gst_base_idle_src_set_allocation (GstBaseIdleSrc * src, - GstBufferPool * pool, GstAllocator * allocator, + GstBufferPool * buf_pool, GstAllocator * allocator, const GstAllocationParams * params) { - GstAllocator *oldalloc; - GstBufferPool *oldpool; + GstAllocator *old_alloc; + GstBufferPool *old_buf_pool; GstBaseIdleSrcPrivate *priv = src->priv; - if (pool) { - GST_DEBUG_OBJECT (src, "activate pool"); - if (!gst_buffer_pool_set_active (pool, TRUE)) + if (buf_pool) { + GST_DEBUG_OBJECT (src, "activate buffer pool"); + if (!gst_buffer_pool_set_active (buf_pool, TRUE)) goto activate_failed; } GST_OBJECT_LOCK (src); - oldpool = priv->pool; - priv->pool = pool; + old_buf_pool = priv->buf_pool; + priv->buf_pool = buf_pool; - oldalloc = priv->allocator; + old_alloc = priv->allocator; priv->allocator = allocator; - if (priv->pool) - gst_object_ref (priv->pool); + if (priv->buf_pool) + gst_object_ref (priv->buf_pool); if (priv->allocator) gst_object_ref (priv->allocator); @@ -840,16 +927,16 @@ gst_base_idle_src_set_allocation (GstBaseIdleSrc * src, gst_allocation_params_init (&priv->params); GST_OBJECT_UNLOCK (src); - if (oldpool) { + if (old_buf_pool) { /* only deactivate if the pool is not the one we're using */ - if (oldpool != pool) { + if (old_buf_pool != buf_pool) { GST_DEBUG_OBJECT (src, "deactivate old pool"); - gst_buffer_pool_set_active (oldpool, FALSE); + gst_buffer_pool_set_active (old_buf_pool, FALSE); } - gst_object_unref (oldpool); + gst_object_unref (old_buf_pool); } - if (oldalloc) { - gst_object_unref (oldalloc); + if (old_alloc) { + gst_object_unref (old_alloc); } return TRUE; @@ -866,7 +953,7 @@ gst_base_idle_src_decide_allocation_default (GstBaseIdleSrc * src, GstQuery * query) { GstCaps *outcaps; - GstBufferPool *pool; + GstBufferPool *buf_pool; guint size, min, max; GstAllocator *allocator; GstAllocationParams params; @@ -888,40 +975,41 @@ gst_base_idle_src_decide_allocation_default (GstBaseIdleSrc * src, } if (gst_query_get_n_allocation_pools (query) > 0) { - gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max); + gst_query_parse_nth_allocation_pool (query, 0, &buf_pool, &size, &min, + &max); - if (pool == NULL) { + if (buf_pool == NULL) { /* no pool, we can make our own */ - GST_DEBUG_OBJECT (src, "no pool, making new pool"); - pool = gst_buffer_pool_new (); + GST_DEBUG_OBJECT (src, "no buffer pool, making new one"); + buf_pool = gst_buffer_pool_new (); } } else { - pool = NULL; + buf_pool = NULL; size = min = max = 0; } /* now configure */ - if (pool) { - config = gst_buffer_pool_get_config (pool); + if (buf_pool) { + config = gst_buffer_pool_get_config (buf_pool); gst_buffer_pool_config_set_params (config, outcaps, size, min, max); gst_buffer_pool_config_set_allocator (config, allocator, ¶ms); /* buffer pool may have to do some changes */ - if (!gst_buffer_pool_set_config (pool, config)) { - config = gst_buffer_pool_get_config (pool); + if (!gst_buffer_pool_set_config (buf_pool, config)) { + config = gst_buffer_pool_get_config (buf_pool); /* If change are not acceptable, fallback to generic pool */ if (!gst_buffer_pool_config_validate_params (config, outcaps, size, min, max)) { - GST_DEBUG_OBJECT (src, "unsupported pool, making new pool"); + GST_DEBUG_OBJECT (src, "unsupported buffer pool, making new one"); - gst_object_unref (pool); - pool = gst_buffer_pool_new (); + gst_object_unref (buf_pool); + buf_pool = gst_buffer_pool_new (); gst_buffer_pool_config_set_params (config, outcaps, size, min, max); gst_buffer_pool_config_set_allocator (config, allocator, ¶ms); } - if (!gst_buffer_pool_set_config (pool, config)) + if (!gst_buffer_pool_set_config (buf_pool, config)) goto config_failed; } } @@ -933,9 +1021,9 @@ gst_base_idle_src_decide_allocation_default (GstBaseIdleSrc * src, if (allocator) gst_object_unref (allocator); - if (pool) { - gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max); - gst_object_unref (pool); + if (buf_pool) { + gst_query_set_nth_allocation_pool (query, 0, buf_pool, size, min, max); + gst_object_unref (buf_pool); } return TRUE; @@ -944,7 +1032,7 @@ gst_base_idle_src_decide_allocation_default (GstBaseIdleSrc * src, GST_ELEMENT_ERROR (src, RESOURCE, SETTINGS, ("Failed to configure the buffer pool"), ("Configuration is most likely invalid, please report this issue.")); - gst_object_unref (pool); + gst_object_unref (buf_pool); return FALSE; } @@ -954,7 +1042,7 @@ gst_base_idle_src_prepare_allocation (GstBaseIdleSrc * src, GstCaps * caps) GstBaseIdleSrcClass *bclass; gboolean result = TRUE; GstQuery *query; - GstBufferPool *pool = NULL; + GstBufferPool *buf_pool = NULL; GstAllocator *allocator = NULL; GstAllocationParams params; @@ -988,14 +1076,14 @@ gst_base_idle_src_prepare_allocation (GstBaseIdleSrc * src, GstCaps * caps) } if (gst_query_get_n_allocation_pools (query) > 0) - gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL); + gst_query_parse_nth_allocation_pool (query, 0, &buf_pool, NULL, NULL, NULL); - result = gst_base_idle_src_set_allocation (src, pool, allocator, ¶ms); + result = gst_base_idle_src_set_allocation (src, buf_pool, allocator, ¶ms); if (allocator) gst_object_unref (allocator); - if (pool) - gst_object_unref (pool); + if (buf_pool) + gst_object_unref (buf_pool); gst_query_unref (query); @@ -1134,8 +1222,112 @@ gst_base_idle_src_get_buffer_pool (GstBaseIdleSrc * src) g_return_val_if_fail (GST_IS_BASE_IDLE_SRC (src), NULL); GST_OBJECT_LOCK (src); - if (src->priv->pool) - ret = gst_object_ref (src->priv->pool); + if (src->priv->buf_pool) + ret = gst_object_ref (src->priv->buf_pool); + GST_OBJECT_UNLOCK (src); + + return ret; +} + +/** + * gst_base_idle_src_set_thread_pool: + * @src: a #GstBaseIdleSrc + * @thread_pool: (transfer full): a #GstTaskPool + * + * Sets the thread pool to be used internally for scheduling buffer/event + * pushes. The @thread_pool is expected to already have been prepared via + * gst_task_pool_prepare(). Ownership of @thread_pool is transferred to @src. + * + * This may be used to share a single pool between several slow-paced sources. + * The element will not call gst_task_pool_cleanup() on an externally provided + * pool — cleanup is the responsibility of the code that created it. + * + * This function must only be called when the element is not yet running + * (i.e. before transitioning to %GST_STATE_PAUSED). Swapping pools mid-flow + * is not supported because already-queued buffers would be orphaned on the + * previous pool. + */ +void +gst_base_idle_src_set_thread_pool (GstBaseIdleSrc * src, + GstTaskPool * thread_pool) +{ + GstBaseIdleSrcPrivate *priv; + GstTaskPool *old_pool = NULL; + GstState state = GST_STATE_VOID_PENDING; + gpointer old_handle = NULL; + gboolean old_owned = FALSE; + + g_return_if_fail (GST_IS_BASE_IDLE_SRC (src)); + g_return_if_fail (GST_IS_TASK_POOL (thread_pool)); + + /* Thread pool may only be swapped while the + * element is in NULL or READY. + * + * A `running`-only check is insufficient: during teardown stop() clears + * `running` while the element is still in PAUSED and may still be + * joining/draining inside drain_and_join(). A concurrent set_thread_pool() + * in that window would swap pools while the worker still references + * the old one. */ + state = GST_STATE (src); + if (state > GST_STATE_READY || GST_STATE_PENDING (src) > GST_STATE_READY + || g_atomic_int_get (&src->running)) { + GST_WARNING_OBJECT (src, + "Refusing to swap thread pool: element must be in NULL or READY " + "(state=%s, pending=%s, running=%d). Call set_thread_pool() " + "before transitioning to PAUSED.", + gst_element_state_get_name (state), + gst_element_state_get_name (GST_STATE_PENDING (src)), + g_atomic_int_get (&src->running)); + gst_object_unref (thread_pool); + return; + } + + priv = src->priv; + + GST_OBJECT_LOCK (src); + old_pool = priv->thread_pool; + old_handle = priv->thread_handle; + old_owned = priv->owns_thread_pool; + priv->thread_pool = thread_pool; /* transfer full */ + priv->thread_handle = NULL; + priv->owns_thread_pool = FALSE; /* externally provided, now */ + GST_OBJECT_UNLOCK (src); + + /* Drain any pending work on the *previous* pool — the handle is only valid + * against the pool that produced it. Then drop our reference. Note we do + * NOT call gst_task_pool_cleanup() here: the old pool may be shared with + * other elements. */ + if (old_pool) { + if (old_handle) { + gst_task_pool_join (old_pool, old_handle); + } + /* Only cleanup if WE created the previous pool at init() */ + if (old_owned) { + gst_task_pool_cleanup (old_pool); + } + gst_object_unref (old_pool); + } + + GST_DEBUG_OBJECT (src, "Configured thread pool %p", thread_pool); +} + +/** + * gst_base_idle_src_get_thread_pool: + * @src: a #GstBaseIdleSrc + * + * Return: (transfer full): the instance of the #GstTaskPool used + * by the src; unref it after usage + */ +GstTaskPool * +gst_base_idle_src_get_thread_pool (GstBaseIdleSrc * src) +{ + GstTaskPool *ret = NULL; + + g_return_val_if_fail (GST_IS_BASE_IDLE_SRC (src), NULL); + + GST_OBJECT_LOCK (src); + if (src->priv->thread_pool) + ret = gst_object_ref (src->priv->thread_pool); GST_OBJECT_UNLOCK (src); return ret; @@ -1192,36 +1384,72 @@ gst_base_idle_src_add_timestamp (GstBaseIdleSrc * src, GstBuffer * buf) GST_BUFFER_DTS (buf) = running_time; } +static gboolean +gst_base_idle_src_buffer_list_add_timestamp_func (GstBuffer ** buf, guint idx, + gpointer user_data) +{ + GstBaseIdleSrc *src = GST_BASE_IDLE_SRC (user_data); + gst_base_idle_src_add_timestamp (src, *buf); + return TRUE; +} + static void gst_base_idle_src_process_object (GstBaseIdleSrc * src, GstMiniObject * obj) { + GstFlowReturn ret = GST_FLOW_OK; GstPad *pad = src->srcpad; GST_PAD_STREAM_LOCK (pad); if (GST_IS_BUFFER (obj)) { GstBuffer *buf = GST_BUFFER_CAST (obj); - GstFlowReturn ret; if (src->priv->do_timestamp) { gst_base_idle_src_add_timestamp (src, buf); } GST_DEBUG_OBJECT (src, "About to push Buffer %" GST_PTR_FORMAT, buf); - ret = gst_pad_push (pad, buf); - if (ret != GST_FLOW_OK) - GST_ERROR ("Got ret: %s", gst_flow_get_name (ret)); + goto check_ret_error; + } else if (GST_IS_BUFFER_LIST (obj)) { + GstBufferList *buf_list = GST_BUFFER_LIST_CAST (obj); + + if (src->priv->do_timestamp) { + gst_buffer_list_foreach (buf_list, + gst_base_idle_src_buffer_list_add_timestamp_func, src); + } + + GST_DEBUG_OBJECT (src, "About to push BufferList %" GST_PTR_FORMAT, + buf_list); + ret = gst_pad_push_list (pad, buf_list); + goto check_ret_error; } else if (GST_IS_EVENT (obj)) { GstEvent *event = GST_EVENT_CAST (obj); - gboolean ret; GST_DEBUG_OBJECT (src, "About to push Event %" GST_PTR_FORMAT, event); - ret = gst_pad_push_event (pad, event); - if (!ret) - GST_ERROR ("HUUUUAA"); - } else if (GST_IS_CAPS (obj)) { - GST_DEBUG_OBJECT (src, "About to push Caps %" GST_PTR_FORMAT, obj); + /* Hold a ref across the push so the event remains valid for the + * failure log below: gst_pad_push_event() consumes our queue's ref + * unconditionally (success and failure). The extra ref-pair is a + * cold-path price — pushing an event is already expensive and the + * warning only fires on failure — and it lets us log the full + * GST_PTR_FORMAT */ + gst_event_ref (event); + if (!gst_pad_push_event (pad, event)) { + /* Mirror GstBaseSrc behaviour for non-flow events: log and continue. + * We deliberately do NOT post a message on the bus here — event push + * failures are typically transient (e.g. peer not linked yet) and not + * a fatal element error. Flow errors are still surfaced below via + * GST_ELEMENT_FLOW_ERROR(). */ + GST_WARNING_OBJECT (src, "Failed to push event %" GST_PTR_FORMAT, event); + } + gst_event_unref (event); + } else { + GST_ERROR_OBJECT (src, "Unknown object %" GST_PTR_FORMAT " type", obj); + gst_mini_object_unref (obj); + } +check_ret_error: + if (ret != GST_FLOW_OK && ret != GST_FLOW_FLUSHING) { + GST_ELEMENT_FLOW_ERROR (src, ret); } GST_PAD_STREAM_UNLOCK (pad); @@ -1233,7 +1461,6 @@ gst_base_idle_src_process_object_queue (GstBaseIdleSrc * src) GstMiniObject *obj; GST_OBJECT_LOCK (src); while ((obj = g_queue_pop_head (src->priv->obj_queue))) { - GST_OBJECT_UNLOCK (src); gst_base_idle_src_process_object (src, obj); GST_OBJECT_LOCK (src); @@ -1241,7 +1468,7 @@ gst_base_idle_src_process_object_queue (GstBaseIdleSrc * src) GST_OBJECT_UNLOCK (src); } -static gpointer +static void gst_base_idle_src_func (gpointer user_data) { GstBaseIdleSrc *src = GST_BASE_IDLE_SRC (user_data); @@ -1254,47 +1481,110 @@ gst_base_idle_src_func (gpointer user_data) GST_ELEMENT_FLOW_ERROR (src, GST_FLOW_NOT_NEGOTIATED); gst_pad_push_event (src->srcpad, gst_event_new_eos ()); GST_PAD_STREAM_UNLOCK (src->srcpad); - return NULL; + return; } } GST_PAD_STREAM_UNLOCK (src->srcpad); gst_base_idle_src_process_object_queue (src); - return NULL; + return; } static void gst_base_idle_src_start_task (GstBaseIdleSrc * src, gboolean wait) { - GST_DEBUG_OBJECT (src, "Starting Task"); - /* if we already have an outstanding task, join it */ - if (src->priv->thread) - g_thread_join (src->priv->thread); - - src->priv->thread = - g_thread_new (GST_ELEMENT_NAME (src), gst_base_idle_src_func, src); - if (wait) { - g_thread_join (src->priv->thread); - src->priv->thread = NULL; + GstBaseIdleSrcPrivate *priv = src->priv; + GstTaskPool *pool; + gpointer prev_handle, new_handle; + GError *error = NULL; + + GST_OBJECT_LOCK (src); + pool = priv->thread_pool ? gst_object_ref (priv->thread_pool) : NULL; + prev_handle = priv->thread_handle; + priv->thread_handle = NULL; + GST_OBJECT_UNLOCK (src); + + if (G_UNLIKELY (pool == NULL)) { + GST_WARNING_OBJECT (src, "No thread pool configured"); + return; } + + /* Join the *previous* handle on the pool it was issued on, regardless of + * whether we go on to push a new one. Because we cleared + * priv->thread_handle above, set_thread_pool() will not also try to join + * it from the other side. */ + if (prev_handle) + gst_task_pool_join (pool, prev_handle); + + /* Re-check running after the snapshot: a concurrent stop() may have + * cleared it after our caller's submit_*() check. If so, do not push a + * new task — the queue is being drained by stop() and any new task would + * race against teardown and potentially push on a deactivated pad. + * + * A residual race remains (running may flip to FALSE between this check + * and the push), but it is benign: stop() snapshots thread_handle under + * OBJECT_LOCK, so it will either see and join our newly installed handle + * below, or finalize will. */ + if (!g_atomic_int_get (&src->running)) { + GST_DEBUG_OBJECT (src, "Not running, skipping task push"); + gst_object_unref (pool); + return; + } + + new_handle = gst_task_pool_push (pool, gst_base_idle_src_func, src, &error); + if (G_UNLIKELY (error != NULL)) { + GST_ERROR_OBJECT (src, "Failed to push task to pool: %s", error->message); + g_clear_error (&error); + gst_object_unref (pool); + return; + } + + if (wait && new_handle) { + gst_task_pool_join (pool, new_handle); + new_handle = NULL; + } + + GST_OBJECT_LOCK (src); + /* Only install our handle if no concurrent set_thread_pool() happened + * meanwhile (defensive: set_thread_pool() rejects mid-flow swaps). */ + if (priv->thread_pool == pool) { + priv->thread_handle = new_handle; + new_handle = NULL; + } + GST_OBJECT_UNLOCK (src); + + /* If the pool was swapped under us, drain our handle now. */ + if (new_handle) + gst_task_pool_join (pool, new_handle); + + gst_object_unref (pool); } static void gst_base_idle_src_check_pending_segment (GstBaseIdleSrc * src) { - GST_DEBUG_OBJECT (src, "Checking pending segment"); - /* push events to close/start our segment before we push the buffer. */ + /* Build the SEGMENT event under OBJECT_LOCK so the check-and-clear of + * segment_pending, the seqnum update, and the queue insert all happen + * atomically. Without this, two concurrent producers can both observe + * segment_pending=TRUE, both enqueue a SEGMENT event, and both increment + * segment_seqnum — yielding duplicate segments and a torn seqnum. + * + * gst_event_new_segment / gst_event_set_seqnum / gst_util_seqnum_next + * do not call back into the element, so it is safe to hold the lock + * across them. */ + GST_OBJECT_LOCK (src); if (G_UNLIKELY (src->priv->segment_pending)) { GstEvent *seg_event = gst_event_new_segment (&src->segment); gst_event_set_seqnum (seg_event, src->priv->segment_seqnum); src->priv->segment_seqnum = gst_util_seqnum_next (); - gst_base_idle_src_queue_object (src, (GstMiniObject *) seg_event); - GST_DEBUG_OBJECT (src, "Queing segment event %" GST_PTR_FORMAT, seg_event); - src->priv->segment_pending = FALSE; + + GST_DEBUG_OBJECT (src, "Queuing segment event %" GST_PTR_FORMAT, seg_event); + gst_base_idle_src_queue_object_unlocked (src, (GstMiniObject *) seg_event); } + GST_OBJECT_UNLOCK (src); } static void @@ -1314,6 +1604,54 @@ gst_base_idle_src_add_stream_start (GstBaseIdleSrc * src) g_free (stream_id); } +/* Atomically detach any installed task handle and steal the queued objects, + * then join the worker outside the lock and unref the drained miniobjects. + * + * Shared between gst_base_idle_src_stop() and the failure path in + * gst_base_idle_src_start(): both must tear down any background activity + * that a concurrent producer may have started during the brief running=TRUE + * window, without leaking the buffers it queued. + * + * Callers must have already published running=FALSE atomically so no *new* + * producers can pile in after this function returns. */ +static void +gst_base_idle_src_drain_and_join (GstBaseIdleSrc * src) +{ + GstBaseIdleSrcPrivate *priv = src->priv; + GstTaskPool *pool = NULL; + gpointer handle = NULL; + GQueue drained = G_QUEUE_INIT; + GstMiniObject *obj; + + /* Snapshot pool + handle and steal the queue under the lock. */ + GST_OBJECT_LOCK (src); + if (priv->thread_pool) + pool = gst_object_ref (priv->thread_pool); + handle = priv->thread_handle; + priv->thread_handle = NULL; + /* O(1) steal — see commentary at the original use site in stop(). */ + drained = *priv->obj_queue; + g_queue_init (priv->obj_queue); + GST_OBJECT_UNLOCK (src); + + /* Join outside the lock (worker takes the same lock itself). */ + if (handle && pool) + gst_task_pool_join (pool, handle); + g_clear_object (&pool); + + while ((obj = g_queue_pop_head (&drained))) + gst_mini_object_unref (obj); + + /* Second pass: catch anything the worker pushed back via queue_object() + * during the join, or that a producer raced past the !running check. */ + GST_OBJECT_LOCK (src); + drained = *priv->obj_queue; + g_queue_init (priv->obj_queue); + GST_OBJECT_UNLOCK (src); + while ((obj = g_queue_pop_head (&drained))) + gst_mini_object_unref (obj); +} + static gboolean gst_base_idle_src_start (GstBaseIdleSrc * src) { @@ -1322,14 +1660,20 @@ gst_base_idle_src_start (GstBaseIdleSrc * src) GST_DEBUG_OBJECT (src, "Starting"); + /* Initialize segment + pending state under OBJECT_LOCK *before* publishing + * running=TRUE. A producer thread that observes running=TRUE and enters + * submit_buffer*() must see a consistent segment_pending/segment_seqnum, + * not a torn or stale snapshot. */ GST_OBJECT_LOCK (src); - gst_segment_init (&src->segment, src->segment.format); - GST_OBJECT_UNLOCK (src); - - src->running = TRUE; src->priv->segment_pending = TRUE; src->priv->segment_seqnum = gst_util_seqnum_next (); + GST_OBJECT_UNLOCK (src); + + /* Publish running=TRUE with release semantics — producers' atomic load + * acts as the matching acquire and is guaranteed to observe the segment + * state written above. */ + g_atomic_int_set (&src->running, TRUE); bclass = GST_BASE_IDLE_SRC_GET_CLASS (src); if (bclass->start) @@ -1347,6 +1691,20 @@ gst_base_idle_src_start (GstBaseIdleSrc * src) could_not_start: { + /* Subclass refused to start. Roll back the running flag and tear down + * any work a producer may have queued (and any task it may have pushed) + * during the brief running=TRUE window before the subclass returned. + * Without this, late producers would leak miniobjects into priv->obj_queue + * and a subsequent successful start() would push those stale objects + * downstream as if they had just been submitted. */ + g_atomic_int_set (&src->running, FALSE); + + GST_OBJECT_LOCK (src); + src->priv->segment_pending = FALSE; + GST_OBJECT_UNLOCK (src); + + gst_base_idle_src_drain_and_join (src); + GST_DEBUG_OBJECT (src, "could not start"); /* subclass is supposed to post a message but we post one as a fallback * just in case. We don't have to call _stop. */ @@ -1359,21 +1717,15 @@ static gboolean gst_base_idle_src_stop (GstBaseIdleSrc * src) { GstBaseIdleSrcClass *bclass; - GstMiniObject *obj; gboolean result = TRUE; GST_DEBUG_OBJECT (src, "stopping source"); - src->running = FALSE; - if (src->priv->thread) { - g_thread_join (src->priv->thread); - src->priv->thread = NULL; - } + /* Publish !running atomically so any producer that has not yet entered + * queue_object() bails out in submit_buffer*(). */ + g_atomic_int_set (&src->running, FALSE); - /* clean up any leftovers on the queue */ - while ((obj = g_queue_pop_head (src->priv->obj_queue))) { - gst_mini_object_unref (obj); - } + gst_base_idle_src_drain_and_join (src); bclass = GST_BASE_IDLE_SRC_GET_CLASS (src); if (bclass->stop) @@ -1441,15 +1793,12 @@ gst_base_idle_src_activate_mode (GstPad * pad, GstObject * parent, return res; } - /** * gst_base_idle_src_submit_buffer: * @src: a #GstBaseIdleSrc * @buffer: (transfer full): a #GstBuffer * * Subclasses can call this to submit a buffer to be pushed out later. - * - * Since: 1.22 */ void gst_base_idle_src_submit_buffer (GstBaseIdleSrc * src, GstBuffer * buffer) @@ -1457,7 +1806,7 @@ gst_base_idle_src_submit_buffer (GstBaseIdleSrc * src, GstBuffer * buffer) g_return_if_fail (GST_IS_BASE_IDLE_SRC (src)); g_return_if_fail (GST_IS_BUFFER (buffer)); - if (!src->running) { + if (!g_atomic_int_get (&src->running)) { GST_ERROR_OBJECT (src, "Sending buffer to stopped src is not valid"); gst_buffer_unref (buffer); return; @@ -1465,7 +1814,6 @@ gst_base_idle_src_submit_buffer (GstBaseIdleSrc * src, GstBuffer * buffer) gst_base_idle_src_check_pending_segment (src); - /* we need it to be writable later in get_range() where we use get_writable */ gst_base_idle_src_queue_object (src, (GstMiniObject *) buffer); gst_base_idle_src_start_task (src, FALSE); @@ -1477,8 +1825,6 @@ gst_base_idle_src_submit_buffer (GstBaseIdleSrc * src, GstBuffer * buffer) * @buffer_list: (transfer full): a #GstBufferList * * Subclasses can call this to submit a buffer list to be pushed out later. - * - * Since: 1.22 */ void gst_base_idle_src_submit_buffer_list (GstBaseIdleSrc * src, @@ -1487,7 +1833,7 @@ gst_base_idle_src_submit_buffer_list (GstBaseIdleSrc * src, g_return_if_fail (GST_IS_BASE_IDLE_SRC (src)); g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list)); - if (!src->running) { + if (!g_atomic_int_get (&src->running)) { GST_ERROR_OBJECT (src, "Sending bufferlist to stopped src is not valid"); gst_buffer_list_unref (buffer_list); return; @@ -1495,7 +1841,6 @@ gst_base_idle_src_submit_buffer_list (GstBaseIdleSrc * src, gst_base_idle_src_check_pending_segment (src); - /* we need it to be writable later in get_range() where we use get_writable */ gst_base_idle_src_queue_object (src, (GstMiniObject *) buffer_list); GST_LOG_OBJECT (src, "%u buffers submitted in buffer list", @@ -1504,75 +1849,86 @@ gst_base_idle_src_submit_buffer_list (GstBaseIdleSrc * src, gst_base_idle_src_start_task (src, FALSE); } - -/** - * gst_base_idle_src_alloc_buffer: - * @src: a #GstBaseIdleSrc - * @size: a gsize with the size of the buffer - * @buffer: (transfer full): a #GstBuffer - * - * Subclasses can call this to alloc a buffer. - * - * Since: 1.22 - */ -GstFlowReturn -gst_base_idle_src_alloc_buffer (GstBaseIdleSrc * src, - gsize size, GstBuffer ** buffer) +static GstFlowReturn +gst_base_idle_src_default_alloc (GstBaseIdleSrc * src, + guint size, GstBuffer ** buffer) { - GstFlowReturn ret; + GstFlowReturn ret = GST_FLOW_OK; GstBaseIdleSrcPrivate *priv = src->priv; - GstBufferPool *pool = NULL; + GstBufferPool *buf_pool = NULL; GstAllocator *allocator = NULL; GstAllocationParams params; GST_OBJECT_LOCK (src); - if (priv->pool) { - pool = gst_object_ref (priv->pool); + if (priv->buf_pool) { + buf_pool = gst_object_ref (priv->buf_pool); } else if (priv->allocator) { allocator = gst_object_ref (priv->allocator); } params = priv->params; GST_OBJECT_UNLOCK (src); - if (pool) { - ret = gst_buffer_pool_acquire_buffer (pool, buffer, NULL); - } else if (size != -1) { + if (buf_pool) { + ret = gst_buffer_pool_acquire_buffer (buf_pool, buffer, NULL); + } else { *buffer = gst_buffer_new_allocate (allocator, size, ¶ms); if (G_UNLIKELY (*buffer == NULL)) goto alloc_failed; - - ret = GST_FLOW_OK; - } else { - GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?", - size); - goto alloc_failed; } done: - if (pool) - gst_object_unref (pool); + if (buf_pool) + gst_object_unref (buf_pool); if (allocator) gst_object_unref (allocator); return ret; - /* ERRORS */ alloc_failed: - { - GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size); - ret = GST_FLOW_ERROR; - goto done; - } + GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size); + ret = GST_FLOW_ERROR; + goto done; } static void gst_base_idle_src_finalize (GObject * object) { - GstBaseIdleSrc *src; - src = GST_BASE_IDLE_SRC (object); + GstBaseIdleSrc *src = GST_BASE_IDLE_SRC (object); + GstBaseIdleSrcPrivate *priv = src->priv; + GstMiniObject *obj; + + /* Defensively tear down any work still in flight. In a well-behaved + * pipeline stop() will already have done this, but finalize is the + * unconditional fallback: nothing prevents a subclass from dropping + * the last ref while `running` is still TRUE (e.g. if the element + * was never activated, or if a wrapper holds extra refs past stop()) */ + g_atomic_int_set (&src->running, FALSE); + gst_base_idle_src_drain_and_join (src); + + /* The queue is now guaranteed empty (drain_and_join() emptied it in + * its second pass under the lock, and no producer can have added to + * it since: the last ref to `src` is being dropped here, so by + * definition no other thread holds a pointer to call submit_*() with). + * Free the GQueue itself. + * + * Note: we still pop-and-unref in a loop as a belt-and-braces guard + * in case a subclass override somehow re-queued something synchronously + * from its own dispose chain. */ + while ((obj = g_queue_pop_head (priv->obj_queue))) + gst_mini_object_unref (obj); + g_queue_free (priv->obj_queue); + priv->obj_queue = NULL; + + /* drain_and_join() already cleared priv->thread_handle. */ + g_assert (priv->thread_handle == NULL); + + /* Only cleanup the pool if WE created it. An externally-provided pool + * may be shared with other elements; its owner is responsible for + * gst_task_pool_cleanup(). */ + if (priv->thread_pool && priv->owns_thread_pool) + gst_task_pool_cleanup (priv->thread_pool); - g_queue_free (src->priv->obj_queue); - /* FIXME: empty this queue potentially... */ + g_clear_object (&priv->thread_pool); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -1589,8 +1945,8 @@ gst_base_idle_src_class_init (GstBaseIdleSrcClass * klass) if (private_offset != 0) g_type_class_adjust_private_offset (klass, &private_offset); - GST_DEBUG_CATEGORY_INIT (gst_base_idle_src_debug, "idlesrc", 0, - "idlesrc element"); + GST_DEBUG_CATEGORY_INIT (gst_base_idle_src_debug, "baseidlesrc", 0, + "base idlesrc element"); parent_class = g_type_class_peek_parent (klass); @@ -1611,11 +1967,11 @@ gst_base_idle_src_class_init (GstBaseIdleSrcClass * klass) klass->fixate = GST_DEBUG_FUNCPTR (gst_base_idle_src_default_fixate); klass->query = GST_DEBUG_FUNCPTR (gst_base_idle_src_default_query); klass->event = GST_DEBUG_FUNCPTR (gst_base_idle_src_default_event); + klass->alloc = GST_DEBUG_FUNCPTR (gst_base_idle_src_default_alloc); klass->decide_allocation = GST_DEBUG_FUNCPTR (gst_base_idle_src_decide_allocation_default); /* Registering debug symbols for function pointers */ - GST_DEBUG_REGISTER_FUNCPTR (gst_base_idle_src_event); GST_DEBUG_REGISTER_FUNCPTR (gst_base_idle_src_query); GST_DEBUG_REGISTER_FUNCPTR (gst_base_idle_src_fixate); @@ -1635,25 +1991,42 @@ gst_base_idle_src_init (GstBaseIdleSrc * src, gpointer g_class) GST_DEBUG_OBJECT (src, "creating src pad"); pad = gst_pad_new_from_template (pad_template, "src"); - - GST_DEBUG_OBJECT (src, "setting functions on src pad"); gst_pad_set_activatemode_function (pad, gst_base_idle_src_activate_mode); gst_pad_set_event_function (pad, gst_base_idle_src_event); gst_pad_set_query_function (pad, gst_base_idle_src_query); - /* hold pointer to pad */ src->srcpad = pad; - GST_DEBUG_OBJECT (src, "adding src pad"); gst_element_add_pad (GST_ELEMENT (src), pad); - /* we operate in BYTES by default */ gst_base_idle_src_set_format (src, GST_FORMAT_BYTES); src->priv->do_timestamp = DEFAULT_DO_TIMESTAMP; - GST_OBJECT_FLAG_SET (src, GST_ELEMENT_FLAG_SOURCE); + /* Plain g_queue_new(): stop() steals the queue contents via a struct copy, + * which assumes no GDestroyNotify is attached. If we ever need one, switch + * to g_queue_init() of an embedded GQueue and clear via g_queue_clear_full() + * after the steal. */ src->priv->obj_queue = g_queue_new (); + /* Default internal pool — capped at 1 worker. */ + { + GstTaskPool *thread_pool = gst_shared_task_pool_new (); + GError *error = NULL; + + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (thread_pool), + 1); + gst_task_pool_prepare (thread_pool, &error); + if (G_UNLIKELY (error != NULL)) { + GST_ERROR_OBJECT (src, "Failed to prepare default thread pool: %s", + error->message); + g_clear_error (&error); + gst_object_unref (thread_pool); + thread_pool = NULL; + } + src->priv->thread_pool = thread_pool; + src->priv->owns_thread_pool = (thread_pool != NULL); + } + GST_DEBUG_OBJECT (src, "init done"); } diff --git a/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.h b/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.h index cf9fea5660..d16a83e155 100644 --- a/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.h +++ b/subprojects/gstreamer/libs/gst/base/gstbaseidlesrc.h @@ -2,6 +2,8 @@ * Copyright (C) 1999,2000 Erik Walthinsen * 2000 Wim Taymans * 2005 Wim Taymans + * 2023 Havard Graff + * 2023 Camilo Celis Guzman * * gstbaseidlesrc.h: * @@ -87,23 +89,18 @@ struct _GstBaseIdleSrc { * setting a fixate function on the source pad. * @set_caps: Notify subclass of changed output caps * @decide_allocation: configure the allocation query - * @start: Start processing. Subclasses should open resources and prepare - * to produce data. Implementation should call gst_base_idle_src_start_complete() - * when the operation completes, either from the current thread or any other - * thread that finishes the start operation asynchronously. + * @start: Start processing. Subclasses should open resources + * and prepare to produce data. * @stop: Stop processing. Subclasses should use this to close resources. - * @get_size: Return the total size of the resource, in the format set by - * gst_base_idle_src_set_format(). * @query: Handle a requested query. * @event: Override this to implement custom event handling. - * @alloc: Ask the subclass to allocate a buffer with for offset and size. The - * default implementation will create a new buffer from the negotiated allocator. - * @fill: Ask the subclass to fill the buffer with data for offset and size. The - * passed buffer is guaranteed to hold the requested amount of bytes. + * @alloc: Ask the subclass to allocate an output buffer of @size bytes. The + * default implementation will use the negotiated #GstBufferPool when set, + * otherwise it falls back to gst_buffer_new_allocate() with the negotiated + * allocator and parameters. * * Subclasses can override any of the available virtual methods or not, as - * needed. At the minimum, the @create method should be overridden to produce - * buffers. + * needed. */ struct _GstBaseIdleSrcClass { GstElementClass parent_class; @@ -125,6 +122,21 @@ struct _GstBaseIdleSrcClass { /* notify the subclass of new caps */ gboolean (*set_caps) (GstBaseIdleSrc *src, GstCaps *caps); + /** + * GstBaseIdleSrcClass::alloc: + * @src: a #GstBaseIdleSrc + * @size: the requested size of the buffer in bytes + * @buf: (out) (transfer full) (nullable): the resulting #GstBuffer + * + * Ask the subclass to allocate an output buffer of @size bytes. The default + * implementation uses the negotiated #GstBufferPool or, when none is set, + * the negotiated #GstAllocator together with the configured + * #GstAllocationParams. + * + * Returns: a #GstFlowReturn + */ + GstFlowReturn (*alloc) (GstBaseIdleSrc *src, guint size, GstBuffer **buf); + /* setup allocation query */ gboolean (*decide_allocation) (GstBaseIdleSrc *src, GstQuery *query); @@ -132,17 +144,6 @@ struct _GstBaseIdleSrcClass { gboolean (*start) (GstBaseIdleSrc *src); gboolean (*stop) (GstBaseIdleSrc *src); - /** - * GstBaseIdleSrcClass::get_size: - * @size: (out): - * - * Get the total size of the resource in the format set by - * gst_base_idle_src_set_format(). - * - * Returns: %TRUE if the size is available and has been set. - */ - gboolean (*get_size) (GstBaseIdleSrc *src, guint64 *size); - /* notify subclasses of a query */ gboolean (*query) (GstBaseIdleSrc *src, GstQuery *query); @@ -166,10 +167,6 @@ gboolean gst_base_idle_src_is_live (GstBaseIdleSrc *src); GST_BASE_API void gst_base_idle_src_set_format (GstBaseIdleSrc *src, GstFormat format); -GST_BASE_API -void gst_base_idle_src_set_automatic_eos (GstBaseIdleSrc * src, gboolean automatic_eos); - - GST_BASE_API gboolean gst_base_idle_src_negotiate (GstBaseIdleSrc *src); @@ -185,16 +182,18 @@ void gst_base_idle_src_set_do_timestamp (GstBaseIdleSrc *src, gboolea GST_BASE_API gboolean gst_base_idle_src_get_do_timestamp (GstBaseIdleSrc *src); -GST_BASE_API -gboolean gst_base_idle_src_new_segment (GstBaseIdleSrc *src, - const GstSegment * segment); - GST_BASE_API gboolean gst_base_idle_src_set_caps (GstBaseIdleSrc *src, GstCaps *caps); GST_BASE_API GstBufferPool * gst_base_idle_src_get_buffer_pool (GstBaseIdleSrc *src); +GST_BASE_API +void gst_base_idle_src_set_thread_pool (GstBaseIdleSrc *src, GstTaskPool *thread_pool); + +GST_BASE_API +GstTaskPool * gst_base_idle_src_get_thread_pool (GstBaseIdleSrc *src); + GST_BASE_API void gst_base_idle_src_get_allocator (GstBaseIdleSrc *src, GstAllocator **allocator, @@ -208,11 +207,6 @@ GST_BASE_API void gst_base_idle_src_submit_buffer_list (GstBaseIdleSrc * src, GstBufferList * buffer_list); -GST_BASE_API -GstFlowReturn gst_base_idle_src_alloc_buffer (GstBaseIdleSrc * src, - gsize size, - GstBuffer ** buffer); - G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstBaseIdleSrc, gst_object_unref) G_END_DECLS diff --git a/subprojects/gstreamer/tests/check/libs/baseidlesrc.c b/subprojects/gstreamer/tests/check/libs/baseidlesrc.c index 7d7aa1a4a3..d4bebe5d0d 100644 --- a/subprojects/gstreamer/tests/check/libs/baseidlesrc.c +++ b/subprojects/gstreamer/tests/check/libs/baseidlesrc.c @@ -42,6 +42,14 @@ static GType test_idle_src_get_type (void); G_DEFINE_TYPE (TestIdleSrc, test_idle_src, GST_TYPE_BASE_IDLE_SRC); +static GstFlowReturn +test_idle_src_alloc (TestIdleSrc * src, GstBuffer ** buf) +{ + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstBaseIdleSrcClass *klass = GST_BASE_IDLE_SRC_GET_CLASS (base_src); + return klass->alloc (base_src, 100, buf); +} + static void test_idle_src_init (TestIdleSrc * src) { @@ -64,16 +72,51 @@ GST_START_TEST (baseidlesrc_up_and_down) h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); gst_harness_teardown (h); + g_object_unref (src); } GST_END_TEST; GST_START_TEST (baseidlesrc_submit_buffer) { - GstElement *src; + TestIdleSrc *src; + GstHarness *h; + GstBaseIdleSrc *base_src; + GstBuffer *buf; + guint i; + + src = g_object_new (test_idle_src_get_type (), NULL); + base_src = GST_BASE_IDLE_SRC (src); + + h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + + gst_harness_set_sink_caps_str (h, "video/x-raw,format=RGB,width=1,height=1"); + gst_harness_play (h); + + for (i = 0; i < 5; i++) { + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + GST_BUFFER_PTS (buf) = i * GST_MSECOND; + gst_base_idle_src_submit_buffer (base_src, buf); + + buf = gst_harness_pull (h); + fail_unless (buf != NULL); + fail_unless_equals_uint64 (GST_BUFFER_PTS (buf), i * GST_MSECOND); + gst_buffer_unref (buf); + } + + gst_harness_teardown (h); + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_submit_buffer_list) +{ + TestIdleSrc *src; GstHarness *h; GstBaseIdleSrc *base_src; GstBuffer *buf; + GstBufferList *buf_list; guint i; src = g_object_new (test_idle_src_get_type (), NULL); @@ -84,14 +127,703 @@ GST_START_TEST (baseidlesrc_submit_buffer) gst_harness_set_sink_caps_str (h, "foo/bar"); gst_harness_play (h); + buf_list = gst_buffer_list_new_sized (20); + for (i = 0; i < 5; i++) { - fail_unless_equals_int (GST_FLOW_OK, - gst_base_idle_src_alloc_buffer (base_src, 100, &buf)); + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + gst_buffer_list_insert (buf_list, -1, buf); + } + + gst_base_idle_src_submit_buffer_list (base_src, buf_list); + // FIXME: Enable check once _pull_list API lands. + // gst_buffer_list_unref (gst_harness_pull_list (h)); + + gst_harness_teardown (h); + g_object_unref (src); +} + +GST_END_TEST; + +static void +fail_unless_equals_event_type (const GstEvent * event, + GstEventType expected_type) +{ + fail_unless (GST_EVENT_TYPE (event) == expected_type, + "'%s' expected, got '%s'", gst_event_type_get_name (expected_type), + gst_event_type_get_name (GST_EVENT_TYPE (event))); +} + +GST_START_TEST (baseidlesrc_handle_events) +{ + TestIdleSrc *src; + GstHarness *h; + GstBaseIdleSrc *base_src; + GstBuffer *buf; + GstEvent *event; + + src = g_object_new (test_idle_src_get_type (), NULL); + base_src = GST_BASE_IDLE_SRC (src); + + h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); + + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + gst_base_idle_src_submit_buffer (base_src, buf); + + event = gst_harness_pull_event (h); + fail_unless_equals_event_type (event, GST_EVENT_STREAM_START); + gst_event_unref (event); + + event = gst_harness_pull_event (h); + fail_unless_equals_event_type (event, GST_EVENT_SEGMENT); + gst_event_unref (event); + + gst_buffer_unref (gst_harness_pull (h)); + + gst_harness_teardown (h); + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_thread_pool_set_and_get) +{ + GstElement *src; + GstBaseIdleSrc *base_src; + GstTaskPool *thread_pool; + GstTaskPool *new_thread_pool; + GError *err = NULL; + + src = g_object_new (test_idle_src_get_type (), NULL); + base_src = GST_BASE_IDLE_SRC (src); + + /* The element must expose a default internal pool. */ + thread_pool = gst_base_idle_src_get_thread_pool (base_src); + fail_unless (thread_pool != NULL); + gst_object_unref (thread_pool); + + /* Build a replacement pool and prepare it, checking for failure. */ + new_thread_pool = gst_shared_task_pool_new (); + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (new_thread_pool), + 2); + gst_task_pool_prepare (new_thread_pool, &err); + fail_unless (err == NULL, "task pool prepare failed: %s", + err ? err->message : "(no error)"); + + /* set_thread_pool() takes ownership (transfer full), so we ref-up for our + * own use here. */ + gst_base_idle_src_set_thread_pool (base_src, + gst_object_ref (new_thread_pool)); + + thread_pool = gst_base_idle_src_get_thread_pool (base_src); + fail_unless (thread_pool == new_thread_pool); + fail_unless_equals_int (2, + gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL + (thread_pool))); + + gst_object_unref (thread_pool); /* drop the get_thread_pool() ref */ + g_object_unref (src); /* this releases the element's ref */ + + /* Now we own the last ref; we may safely cleanup + unref our local one. */ + gst_task_pool_cleanup (new_thread_pool); + gst_object_unref (new_thread_pool); +} + +GST_END_TEST; + +#define MAX_SRCS 16 + +static gpointer +_push_func (gpointer data) +{ + GstBuffer *buf; + GstBufferList *buf_list; + guint i, j; + + GstHarness *h = data; + GstElement *e = h->element; + TestIdleSrc *src = (TestIdleSrc *) e; + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + + /* push some buffer lists */ + GST_LOG ("Pushing some buffer lists from source %s", + gst_element_get_name (e)); + for (i = 0; i < 5; i++) { + buf_list = gst_buffer_list_new_sized (20); + for (j = 0; j < 5; j++) { + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + gst_buffer_list_insert (buf_list, -1, buf); + } + gst_base_idle_src_submit_buffer_list (base_src, buf_list); + if (g_random_int_range (0, 100) == 3) { + GST_LOG ("Randomly yielding during buffer list push from source %s", + gst_element_get_name (e)); + g_thread_yield (); + } + } + + /* yield to cause some havoc */ + GST_LOG ("Yielding from source %s", gst_element_get_name (e)); + g_thread_yield (); + + /* push some buffers */ + GST_LOG ("Pushing some buffers from source %s", gst_element_get_name (e)); + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + for (i = 0; i < 100; i++) { + gst_base_idle_src_submit_buffer (base_src, gst_buffer_ref (buf)); + if (g_random_int_range (0, 100) == 3) { + GST_LOG ("Randomly yielding during buffer push from source %s", + gst_element_get_name (e)); + g_thread_yield (); + } + } + gst_buffer_unref (buf); + + return NULL; +} + +GST_START_TEST (baseidlesrc_thread_pool_submit) +{ + GstHarness *hs[MAX_SRCS]; + GstElement *srcs[MAX_SRCS]; + GThread *threads[MAX_SRCS]; + GstBaseIdleSrc *base_src; + GError *err = NULL; + GstTaskPool *pool; + guint i; + + pool = gst_shared_task_pool_new (); + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), + MAX_SRCS / 2); + gst_task_pool_prepare (pool, &err); + fail_unless (err == NULL, "task pool prepare failed: %s", + err ? err->message : "(no error)"); + + /* create all sources and harnesses in one go */ + for (i = 0; i < MAX_SRCS; i++) { + srcs[i] = g_object_new (test_idle_src_get_type (), NULL); + base_src = GST_BASE_IDLE_SRC (srcs[i]); + + /* transfer-full — give each element its own ref */ + gst_base_idle_src_set_thread_pool (base_src, gst_object_ref (pool)); + + hs[i] = gst_harness_new_with_element (GST_ELEMENT (srcs[i]), NULL, "src"); + gst_harness_set_sink_caps_str (hs[i], "foo/bar"); + gst_harness_play (hs[i]); + } + + for (i = 0; i < MAX_SRCS; i++) { + char *thread_name = g_strdup_printf ("pusher-%d", i); + threads[i] = g_thread_new (thread_name, _push_func, hs[i]); + g_free (thread_name); + } + + for (i = 0; i < MAX_SRCS; i++) + g_thread_join (threads[i]); + + for (i = 0; i < MAX_SRCS; i++) { + gst_harness_teardown (hs[i]); + g_object_unref (srcs[i]); + } + + /* All elements have released their refs; we own the last one — safe to + * cleanup + unref. */ + gst_task_pool_cleanup (pool); + gst_object_unref (pool); +} + +GST_END_TEST; + +static void +_yield_task (G_GNUC_UNUSED void *user_data) +{ + g_thread_yield (); +} + +GST_START_TEST (baseidlesrc_default_thread_pool_is_prepared) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstTaskPool *pool = gst_base_idle_src_get_thread_pool (base_src); + GError *err = NULL; + gpointer handle; + + fail_unless (pool != NULL); + fail_unless (GST_IS_SHARED_TASK_POOL (pool)); + fail_unless_equals_int (1, + gst_shared_task_pool_get_max_threads (GST_SHARED_TASK_POOL (pool))); + + /* Already prepared → push must succeed without prepare(). */ + handle = gst_task_pool_push (pool, _yield_task, NULL, &err); + fail_unless (err == NULL, "default pool push failed: %s", + err ? err->message : ""); + if (handle) + gst_task_pool_join (pool, handle); + + gst_object_unref (pool); + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_replace_thread_pool_preserves_shared_pool) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GError *err = NULL; + GstTaskPool *shared, *other; + gpointer h; + + shared = gst_shared_task_pool_new (); + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (shared), 2); + gst_task_pool_prepare (shared, &err); + fail_unless (err == NULL); + + gst_base_idle_src_set_thread_pool (base_src, gst_object_ref (shared)); + + other = gst_shared_task_pool_new (); + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (other), 1); + gst_task_pool_prepare (other, &err); + fail_unless (err == NULL); + gst_base_idle_src_set_thread_pool (base_src, gst_object_ref (other)); + + /* Would fail/UAF if shared had been cleaned up. */ + h = gst_task_pool_push (shared, _yield_task, NULL, &err); + fail_unless (err == NULL); + if (h) + gst_task_pool_join (shared, h); + + g_object_unref (src); + + gst_task_pool_cleanup (other); + gst_object_unref (other); + + gst_task_pool_cleanup (shared); + gst_object_unref (shared); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_finalize_one_keeps_shared_pool_alive) +{ + GError *err = NULL; + GstTaskPool *pool = gst_shared_task_pool_new (); + TestIdleSrc *a, *b; + GstHarness *hb; + GstBuffer *buf; + + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (pool), 2); + gst_task_pool_prepare (pool, &err); + fail_unless (err == NULL); + + a = g_object_new (test_idle_src_get_type (), NULL); + b = g_object_new (test_idle_src_get_type (), NULL); + gst_base_idle_src_set_thread_pool (GST_BASE_IDLE_SRC (a), + gst_object_ref (pool)); + gst_base_idle_src_set_thread_pool (GST_BASE_IDLE_SRC (b), + gst_object_ref (pool)); + + hb = gst_harness_new_with_element (GST_ELEMENT (b), NULL, "src"); + gst_harness_set_sink_caps_str (hb, "foo/bar"); + gst_harness_play (hb); + + g_object_unref (a); /* must not break b */ + + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (b, &buf)); + gst_base_idle_src_submit_buffer (GST_BASE_IDLE_SRC (b), buf); + gst_buffer_unref (gst_harness_pull (hb)); + + gst_harness_teardown (hb); + g_object_unref (b); + + gst_task_pool_cleanup (pool); + gst_object_unref (pool); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_submission_order_is_preserved) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstHarness *h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + const guint N = 200; + guint i; + + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); + + for (i = 0; i < N; i++) { + GstBuffer *buf; + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + GST_BUFFER_OFFSET (buf) = i; + gst_base_idle_src_submit_buffer (base_src, buf); + } + for (i = 0; i < N; i++) { + GstBuffer *buf = gst_harness_pull (h); + fail_unless_equals_uint64 (GST_BUFFER_OFFSET (buf), i); + gst_buffer_unref (buf); + } + + gst_harness_teardown (h); + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_do_timestamp_on_buffer_list) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstHarness *h; + GstBufferList *list; + guint i; + + gst_base_idle_src_set_do_timestamp (base_src, TRUE); + h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); + + list = gst_buffer_list_new_sized (4); + for (i = 0; i < 4; i++) { + GstBuffer *buf; + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + gst_buffer_list_insert (list, -1, buf); + } + gst_base_idle_src_submit_buffer_list (base_src, list); + + for (i = 0; i < 4; i++) { + GstBuffer *buf = gst_harness_pull (h); + fail_unless (buf != NULL); + fail_unless (GST_BUFFER_PTS_IS_VALID (buf)); + gst_buffer_unref (buf); + } + + gst_harness_teardown (h); + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_set_format_rejects_invalid) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + + ASSERT_CRITICAL (gst_base_idle_src_set_format (base_src, GST_FORMAT_PERCENT)); + ASSERT_CRITICAL (gst_base_idle_src_set_format (base_src, GST_FORMAT_DEFAULT)); + + gst_base_idle_src_set_format (base_src, GST_FORMAT_BYTES); + gst_base_idle_src_set_format (base_src, GST_FORMAT_TIME); + gst_base_idle_src_set_format (base_src, GST_FORMAT_BUFFERS); + + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_submit_pull_loop_no_deadlock) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstHarness *h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + gint64 deadline; + guint i; + + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); + deadline = g_get_monotonic_time () + 5 * G_TIME_SPAN_SECOND; + + for (i = 0; i < 100; i++) { + GstBuffer *buf; + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); gst_base_idle_src_submit_buffer (base_src, buf); gst_buffer_unref (gst_harness_pull (h)); + fail_unless (g_get_monotonic_time () < deadline, + "submit/pull loop deadlocked or stalled"); + } + + gst_harness_teardown (h); + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_submit_after_stop_is_safe) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstHarness *h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + GstBuffer *buf; + + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); + gst_harness_teardown (h); + + fail_unless_equals_int (GST_FLOW_OK, test_idle_src_alloc (src, &buf)); + gst_base_idle_src_submit_buffer (base_src, buf); /* must drop, not crash */ + + g_object_unref (src); +} + +GST_END_TEST; + +GST_START_TEST (baseidlesrc_default_pool_is_cleaned_up_on_swap) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GError *err = NULL; + GstTaskPool *replacement = gst_shared_task_pool_new (); + + gst_shared_task_pool_set_max_threads (GST_SHARED_TASK_POOL (replacement), 1); + gst_task_pool_prepare (replacement, &err); + fail_unless (err == NULL); + + /* Swap out the default pool — the previous (default) one is owned by us + * and must be cleaned up + unreffed inside set_thread_pool(). Valgrind + * will catch the leak if cleanup() is skipped. */ + gst_base_idle_src_set_thread_pool (GST_BASE_IDLE_SRC (src), + gst_object_ref (replacement)); + + gst_task_pool_cleanup (replacement); + gst_object_unref (replacement); + + g_object_unref (src); +} + +GST_END_TEST; + +typedef struct +{ + GstBaseIdleSrc *src; + gint stop; /* atomic */ +} StopRaceCtx; + +static gpointer +_stop_race_producer (gpointer data) +{ + StopRaceCtx *c = data; + while (!g_atomic_int_get (&c->stop)) { + GstBuffer *buf = gst_buffer_new_allocate (NULL, 64, NULL); + gst_base_idle_src_submit_buffer (c->src, buf); + } + return NULL; +} + +GST_START_TEST (baseidlesrc_stop_races_with_submit) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstHarness *h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + StopRaceCtx ctx = { base_src, 0 }; + GThread *producer; + + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); + + producer = g_thread_new ("producer", _stop_race_producer, &ctx); + g_usleep (50 * 1000); /* let it churn */ + gst_harness_teardown (h); /* triggers stop() */ + g_atomic_int_set (&ctx.stop, 1); + g_thread_join (producer); + + g_object_unref (src); +} + +GST_END_TEST; + +/* A subclass whose start() vfunc deliberately fails. Used to verify that + * the base class clears `running` when subclass activation fails, so that + * subsequent submit_buffer*() calls correctly drop buffers rather than + * queueing them into a source that never started. + * + * Regression test for: + * https://github.com/pexip/gstreamer/pull/45#discussion_r3361051394 + */ +typedef GstBaseIdleSrc FailingStartIdleSrc; +typedef GstBaseIdleSrcClass FailingStartIdleSrcClass; + +static GType failing_start_idle_src_get_type (void); +G_DEFINE_TYPE (FailingStartIdleSrc, failing_start_idle_src, + GST_TYPE_BASE_IDLE_SRC); + +static gboolean +failing_start_idle_src_start (G_GNUC_UNUSED GstBaseIdleSrc * src) +{ + return FALSE; +} + +static void +failing_start_idle_src_init (G_GNUC_UNUSED FailingStartIdleSrc * src) +{ +} + +static void +failing_start_idle_src_class_init (FailingStartIdleSrcClass * klass) +{ + GstBaseIdleSrcClass *bclass = GST_BASE_IDLE_SRC_CLASS (klass); + + gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass), + &src_template); + bclass->start = failing_start_idle_src_start; +} + +GST_START_TEST (baseidlesrc_failed_start_clears_running) +{ + GstBaseIdleSrc *src = g_object_new (failing_start_idle_src_get_type (), NULL); + GstBuffer *buf; + + /* Activating the pad invokes start(), which our subclass forces to + * return FALSE. The base class must roll back `running` so the element + * is not left in a half-started state. */ + fail_if (gst_pad_set_active (GST_BASE_IDLE_SRC_PAD (src), TRUE), + "pad activation must fail when subclass start() returns FALSE"); + + /* Invariant: running was rolled back. */ + fail_unless (!g_atomic_int_get (&src->running), + "running must be cleared after start() failure"); + + /* Behaviour: submit_buffer drops the (transfer-full) ref rather than + * queueing. We take an extra ref so we can observe what happened: + * - dropped (running == FALSE, correct): refcount goes from 2 to 1. + * - queued (running == TRUE, wrong): refcount stays at 2. */ + buf = gst_buffer_new_allocate (NULL, 64, NULL); + gst_buffer_ref (buf); + gst_base_idle_src_submit_buffer (src, buf); + fail_unless_equals_int (1, GST_MINI_OBJECT_REFCOUNT_VALUE (buf)); + gst_buffer_unref (buf); + + g_object_unref (src); +} + +GST_END_TEST; + + /* The documented contract is that set_thread_pool() must be called before + * the element transitions to PAUSED. Verify that a swap attempted while + * the element is playing is rejected and that the original pool is + * preserved (i.e. the rejected one was not silently adopted). */ +GST_START_TEST (baseidlesrc_set_thread_pool_rejected_when_playing) +{ + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstHarness *h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + GstTaskPool *original; + GstTaskPool *rejected; + GstTaskPool *current; + GError *err = NULL; + + /* Capture the default pool the element installed at init. */ + original = gst_base_idle_src_get_thread_pool (base_src); + fail_unless (original != NULL); + + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); /* element now in PAUSED (or beyond) */ + + rejected = gst_shared_task_pool_new (); + gst_task_pool_prepare (rejected, &err); + fail_unless (err == NULL); + + /* Should be refused — the contract requires NULL/READY. */ + gst_base_idle_src_set_thread_pool (base_src, gst_object_ref (rejected)); + + /* Pool must be unchanged. */ + current = gst_base_idle_src_get_thread_pool (base_src); + fail_unless (current == original, "set_thread_pool() must not adopt the " + "new pool while element is past READY"); + gst_object_unref (current); + + gst_harness_teardown (h); + g_object_unref (src); + + /* We still own our refs to both pools. */ + gst_object_unref (original); + gst_task_pool_cleanup (rejected); + gst_object_unref (rejected); +} + +GST_END_TEST; + +typedef struct +{ + GstBaseIdleSrc *src; + GAsyncQueue *start_gate; +} SegmentRaceCtx; + +static gpointer +_segment_race_producer (gpointer data) +{ + SegmentRaceCtx *c = data; + GstBuffer *buf; + + /* Block until the test thread releases the gate to maximize concurrency. */ + g_async_queue_pop (c->start_gate); + + buf = gst_buffer_new_allocate (NULL, 64, NULL); + gst_base_idle_src_submit_buffer (c->src, buf); + return NULL; +} + + /* Concurrent producers must observe a single SEGMENT event downstream, + * not one per producer that won the segment_pending check-and-clear race. + * Pre-fix, the unlocked check_pending_segment() could enqueue N duplicate + * SEGMENT events and tear the segment_seqnum. + * + * The test is necessarily probabilistic: with the bug it triggers most of + * the time on multi-core hardware (and always under TSan); with the fix it + * always passes. */ +GST_START_TEST (baseidlesrc_concurrent_submit_emits_one_segment) +{ +#define N_PRODUCERS 16 + TestIdleSrc *src = g_object_new (test_idle_src_get_type (), NULL); + GstBaseIdleSrc *base_src = GST_BASE_IDLE_SRC (src); + GstHarness *h = gst_harness_new_with_element (GST_ELEMENT (src), NULL, "src"); + GAsyncQueue *gate; + GThread *threads[N_PRODUCERS]; + SegmentRaceCtx ctx; + GstEvent *event; + guint i, segment_count = 0; + + gst_harness_set_sink_caps_str (h, "foo/bar"); + gst_harness_play (h); + + gate = g_async_queue_new (); + ctx.src = base_src; + ctx.start_gate = gate; + + for (i = 0; i < N_PRODUCERS; i++) { + gchar *name = g_strdup_printf ("producer-%u", i); + threads[i] = g_thread_new (name, _segment_race_producer, &ctx); + g_free (name); + } + + /* Release all producers at once. */ + for (i = 0; i < N_PRODUCERS; i++) + g_async_queue_push (gate, GINT_TO_POINTER (1)); + + for (i = 0; i < N_PRODUCERS; i++) + g_thread_join (threads[i]); + + /* Drain all buffers — this also flushes any pending events ahead of them + * through the worker, so by the time the loop exits every event the + * element ever queued has been delivered to the harness. */ + for (i = 0; i < N_PRODUCERS; i++) { + GstBuffer *buf = gst_harness_pull (h); + fail_unless (buf != NULL); + gst_buffer_unref (buf); + } + + /* Now count SEGMENT events. Must be exactly one. */ + while ((event = gst_harness_try_pull_event (h)) != NULL) { + if (GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT) + segment_count++; + gst_event_unref (event); } + fail_unless_equals_int (1, segment_count); + g_async_queue_unref (gate); gst_harness_teardown (h); + g_object_unref (src); +#undef N_PRODUCERS } GST_END_TEST; @@ -105,6 +837,23 @@ baseidlesrc_suite (void) suite_add_tcase (s, tc); tcase_add_test (tc, baseidlesrc_up_and_down); tcase_add_test (tc, baseidlesrc_submit_buffer); + tcase_add_test (tc, baseidlesrc_submit_buffer_list); + tcase_add_test (tc, baseidlesrc_handle_events); + tcase_add_test (tc, baseidlesrc_thread_pool_set_and_get); + tcase_add_test (tc, baseidlesrc_thread_pool_submit); + tcase_add_test (tc, baseidlesrc_default_thread_pool_is_prepared); + tcase_add_test (tc, baseidlesrc_replace_thread_pool_preserves_shared_pool); + tcase_add_test (tc, baseidlesrc_finalize_one_keeps_shared_pool_alive); + tcase_add_test (tc, baseidlesrc_submission_order_is_preserved); + tcase_add_test (tc, baseidlesrc_do_timestamp_on_buffer_list); + tcase_add_test (tc, baseidlesrc_set_format_rejects_invalid); + tcase_add_test (tc, baseidlesrc_submit_pull_loop_no_deadlock); + tcase_add_test (tc, baseidlesrc_submit_after_stop_is_safe); + tcase_add_test (tc, baseidlesrc_default_pool_is_cleaned_up_on_swap); + tcase_add_test (tc, baseidlesrc_stop_races_with_submit); + tcase_add_test (tc, baseidlesrc_failed_start_clears_running); + tcase_add_test (tc, baseidlesrc_set_thread_pool_rejected_when_playing); + tcase_add_test (tc, baseidlesrc_concurrent_submit_emits_one_segment); return s; }