/* OpenMAXBellagio 0.9.3 */
00001 00028 #include <omxcore.h> 00029 #include <omx_base_sink.h> 00030 00031 OMX_ERRORTYPE omx_base_sink_Constructor(OMX_COMPONENTTYPE *openmaxStandComp,OMX_STRING cComponentName) { 00032 OMX_ERRORTYPE err = OMX_ErrorNone; 00033 omx_base_sink_PrivateType* omx_base_sink_Private; 00034 00035 if (openmaxStandComp->pComponentPrivate) { 00036 omx_base_sink_Private = (omx_base_sink_PrivateType*)openmaxStandComp->pComponentPrivate; 00037 } else { 00038 omx_base_sink_Private = calloc(1,sizeof(omx_base_sink_PrivateType)); 00039 if (!omx_base_sink_Private) { 00040 return OMX_ErrorInsufficientResources; 00041 } 00042 } 00043 00044 // we could create our own port structures here 00045 // fixme maybe the base class could use a "port factory" function pointer? 00046 err = omx_base_component_Constructor(openmaxStandComp,cComponentName); 00047 00048 /* here we can override whatever defaults the base_component constructor set 00049 * e.g. we can override the function pointers in the private struct */ 00050 omx_base_sink_Private = openmaxStandComp->pComponentPrivate; 00051 00052 omx_base_sink_Private->BufferMgmtFunction = omx_base_sink_BufferMgmtFunction; 00053 00054 return err; 00055 } 00056 00057 OMX_ERRORTYPE omx_base_sink_Destructor(OMX_COMPONENTTYPE *openmaxStandComp) 00058 { 00059 return omx_base_component_Destructor(openmaxStandComp); 00060 } 00061 00067 void* omx_base_sink_BufferMgmtFunction (void* param) { 00068 OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param; 00069 omx_base_component_PrivateType* omx_base_component_Private = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate; 00070 omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private; 00071 omx_base_PortType *pInPort = (omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX]; 00072 tsem_t* pInputSem = pInPort->pBufferSem; 00073 queue_t* pInputQueue = pInPort->pBufferQueue; 00074 OMX_BUFFERHEADERTYPE* pInputBuffer 
= NULL; 00075 OMX_COMPONENTTYPE* target_component; 00076 OMX_BOOL isInputBufferNeeded = OMX_TRUE; 00077 int inBufExchanged = 0; 00078 00079 omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID = (long int)syscall(__NR_gettid); 00080 DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s the thread ID is %i\n", __func__, (int)omx_base_sink_Private->bellagioThreads->nThreadBufferMngtID); 00081 00082 DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__); 00083 while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting || omx_base_component_Private->state == OMX_StatePause || 00084 omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){ 00085 00086 /*Wait till the ports are being flushed*/ 00087 pthread_mutex_lock(&omx_base_sink_Private->flush_mutex); 00088 while( PORT_IS_BEING_FLUSHED(pInPort)) { 00089 pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex); 00090 00091 if(isInputBufferNeeded==OMX_FALSE) { 00092 pInPort->ReturnBufferFunction(pInPort,pInputBuffer); 00093 inBufExchanged--; 00094 pInputBuffer=NULL; 00095 isInputBufferNeeded=OMX_TRUE; 00096 DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning input buffer\n"); 00097 } 00098 DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__); 00099 00100 tsem_up(omx_base_sink_Private->flush_all_condition); 00101 tsem_down(omx_base_sink_Private->flush_condition); 00102 pthread_mutex_lock(&omx_base_sink_Private->flush_mutex); 00103 } 00104 pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex); 00105 00106 /*No buffer to process. 
So wait here*/ 00107 if((pInputSem->semval==0 && isInputBufferNeeded==OMX_TRUE ) && 00108 (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) { 00109 DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for input buffer \n"); 00110 tsem_down(omx_base_sink_Private->bMgmtSem); 00111 } 00112 00113 if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) { 00114 DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__); 00115 break; 00116 } 00117 00118 DEBUG(DEB_LEV_FULL_SEQ, "Waiting for input buffer semval=%d in %s\n",pInputSem->semval, __func__); 00119 if(pInputSem->semval>0 && isInputBufferNeeded==OMX_TRUE ) { 00120 tsem_down(pInputSem); 00121 if(pInputQueue->nelem>0){ 00122 inBufExchanged++; 00123 isInputBufferNeeded=OMX_FALSE; 00124 pInputBuffer = dequeue(pInputQueue); 00125 if(pInputBuffer == NULL){ 00126 DEBUG(DEB_LEV_ERR, "Had NULL input buffer!!\n"); 00127 break; 00128 } 00129 } 00130 } 00131 00132 if(isInputBufferNeeded==OMX_FALSE) { 00133 if((pInputBuffer->nFlags & OMX_BUFFERFLAG_EOS) ==OMX_BUFFERFLAG_EOS) { 00134 DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in input buffer\n"); 00135 00136 (*(omx_base_component_Private->callbacks->EventHandler)) 00137 (openmaxStandComp, 00138 omx_base_component_Private->callbackData, 00139 OMX_EventBufferFlag, /* The command was completed */ 00140 0, /* The commands was a OMX_CommandStateSet */ 00141 pInputBuffer->nFlags, /* The state has been changed in message->messageParam2 */ 00142 NULL); 00143 pInputBuffer->nFlags=0; 00144 } 00145 00146 target_component=(OMX_COMPONENTTYPE*)pInputBuffer->hMarkTargetComponent; 00147 if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) { 00148 /*Clear the mark and generate an event*/ 00149 (*(omx_base_component_Private->callbacks->EventHandler)) 00150 (openmaxStandComp, 00151 omx_base_component_Private->callbackData, 00152 OMX_EventMark, /* The command was completed */ 00153 1, 
/* The commands was a OMX_CommandStateSet */ 00154 0, /* The state has been changed in message->messageParam2 */ 00155 pInputBuffer->pMarkData); 00156 } else if(pInputBuffer->hMarkTargetComponent!=NULL){ 00157 /*If this is not the target component then pass the mark*/ 00158 DEBUG(DEB_LEV_FULL_SEQ, "Can't Pass Mark. This is a Sink!!\n"); 00159 } 00160 00161 if((omx_base_sink_Private->state == OMX_StateExecuting) || (omx_base_sink_Private->state == OMX_StateIdle)) { 00162 if ((omx_base_sink_Private->BufferMgmtCallback && pInputBuffer->nFilledLen > 0) 00163 || (pInputBuffer->nFlags)){ 00164 (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer); 00165 } 00166 else { 00167 /*If no buffer management call back the explicitly consume input buffer*/ 00168 pInputBuffer->nFilledLen = 0; 00169 } 00170 } else { 00171 DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%s) TrState (%s)\n", 00172 __func__, stateName(omx_base_sink_Private->state), 00173 transientStateName(omx_base_component_Private->transientState)); 00174 if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState || 00175 OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) { 00176 pInputBuffer->nFilledLen = 0; 00177 } 00178 } 00179 /*Input Buffer has been completely consumed. So, get new input buffer*/ 00180 00181 if(omx_base_sink_Private->state==OMX_StatePause && !PORT_IS_BEING_FLUSHED(pInPort)) { 00182 /*Waiting at paused state*/ 00183 tsem_wait(omx_base_sink_Private->bStateSem); 00184 } 00185 00186 /*Input Buffer has been completely consumed. 
So, return input buffer*/ 00187 if(pInputBuffer->nFilledLen==0) { 00188 pInPort->ReturnBufferFunction(pInPort,pInputBuffer); 00189 inBufExchanged--; 00190 pInputBuffer=NULL; 00191 isInputBufferNeeded = OMX_TRUE; 00192 } 00193 00194 } 00195 } 00196 DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n"); 00197 return NULL; 00198 } 00199 00206 void* omx_base_sink_twoport_BufferMgmtFunction (void* param) { 00207 OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param; 00208 omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate; 00209 omx_base_sink_PrivateType* omx_base_sink_Private = (omx_base_sink_PrivateType*)omx_base_component_Private; 00210 omx_base_PortType *pInPort[2]; 00211 tsem_t* pInputSem[2]; 00212 queue_t* pInputQueue[2]; 00213 OMX_BUFFERHEADERTYPE* pInputBuffer[2]; 00214 OMX_COMPONENTTYPE* target_component; 00215 OMX_BOOL isInputBufferNeeded[2]; 00216 int i,outBufExchanged[2]; 00217 00218 pInPort[0]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX]; 00219 pInPort[1]=(omx_base_PortType *)omx_base_sink_Private->ports[OMX_BASE_SINK_INPUTPORT_INDEX_1]; 00220 pInputSem[0] = pInPort[0]->pBufferSem; 00221 pInputSem[1] = pInPort[1]->pBufferSem; 00222 pInputQueue[0] = pInPort[0]->pBufferQueue; 00223 pInputQueue[1] = pInPort[1]->pBufferQueue; 00224 pInputBuffer[1]= pInputBuffer[0]=NULL; 00225 isInputBufferNeeded[0]=isInputBufferNeeded[1]=OMX_TRUE; 00226 outBufExchanged[0]=outBufExchanged[1]=0; 00227 00228 DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__); 00229 while(omx_base_sink_Private->state == OMX_StateIdle || omx_base_sink_Private->state == OMX_StateExecuting || omx_base_sink_Private->state == OMX_StatePause || 00230 omx_base_sink_Private->transientState == OMX_TransStateLoadedToIdle){ 00231 00232 /*Wait till the ports are being flushed*/ 00233 pthread_mutex_lock(&omx_base_sink_Private->flush_mutex); 00234 while( 
PORT_IS_BEING_FLUSHED(pInPort[0]) || 00235 PORT_IS_BEING_FLUSHED(pInPort[1])) { 00236 pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex); 00237 00238 DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n", 00239 __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval); 00240 00241 if(isInputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[1])) { 00242 pInPort[1]->ReturnBufferFunction(pInPort[1],pInputBuffer[1]); 00243 outBufExchanged[1]--; 00244 pInputBuffer[1]=NULL; 00245 isInputBufferNeeded[1]=OMX_TRUE; 00246 DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 1 buffer\n"); 00247 } 00248 00249 if(isInputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pInPort[0])) { 00250 pInPort[0]->ReturnBufferFunction(pInPort[0],pInputBuffer[0]); 00251 outBufExchanged[0]--; 00252 pInputBuffer[0]=NULL; 00253 isInputBufferNeeded[0]=OMX_TRUE; 00254 DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning Input 0 buffer\n"); 00255 } 00256 00257 DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n", 00258 __func__,outBufExchanged[0],isInputBufferNeeded[0],outBufExchanged[1],isInputBufferNeeded[1],pInputSem[0]->semval,pInputSem[1]->semval); 00259 00260 tsem_up(omx_base_sink_Private->flush_all_condition); 00261 tsem_down(omx_base_sink_Private->flush_condition); 00262 pthread_mutex_lock(&omx_base_sink_Private->flush_mutex); 00263 } 00264 pthread_mutex_unlock(&omx_base_sink_Private->flush_mutex); 00265 00266 /*No buffer to process. 
So wait here*/ 00267 if((isInputBufferNeeded[0]==OMX_TRUE && pInputSem[0]->semval==0) && 00268 (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid)) { 00269 //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else 00270 DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 0\n"); 00271 tsem_down(omx_base_sink_Private->bMgmtSem); 00272 00273 } 00274 if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) { 00275 DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__); 00276 break; 00277 } 00278 if((isInputBufferNeeded[1]==OMX_TRUE && pInputSem[1]->semval==0) && 00279 (omx_base_sink_Private->state != OMX_StateLoaded && omx_base_sink_Private->state != OMX_StateInvalid) && 00280 !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) { 00281 //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else 00282 DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next Input buffer 1\n"); 00283 tsem_down(omx_base_sink_Private->bMgmtSem); 00284 00285 } 00286 if(omx_base_sink_Private->state == OMX_StateLoaded || omx_base_sink_Private->state == OMX_StateInvalid) { 00287 DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__); 00288 break; 00289 } 00290 00291 DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for Input buffer 0 semval=%d \n",pInputSem[0]->semval); 00292 if(pInputSem[0]->semval>0 && isInputBufferNeeded[0]==OMX_TRUE ) { 00293 tsem_down(pInputSem[0]); 00294 if(pInputQueue[0]->nelem>0){ 00295 outBufExchanged[0]++; 00296 isInputBufferNeeded[0]=OMX_FALSE; 00297 pInputBuffer[0] = dequeue(pInputQueue[0]); 00298 if(pInputBuffer[0] == NULL){ 00299 DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!!\n"); 00300 break; 00301 } 00302 } 00303 } 00304 /*When we have input buffer to process then get one Input buffer*/ 00305 if(pInputSem[1]->semval>0 && isInputBufferNeeded[1]==OMX_TRUE) { 00306 tsem_down(pInputSem[1]); 00307 
DEBUG(DEB_LEV_FULL_SEQ, "Wait over for Input buffer 1 semval=%d \n",pInputSem[1]->semval); 00308 if(pInputQueue[1]->nelem>0){ 00309 outBufExchanged[1]++; 00310 isInputBufferNeeded[1]=OMX_FALSE; 00311 pInputBuffer[1] = dequeue(pInputQueue[1]); 00312 if(pInputBuffer[1] == NULL){ 00313 DEBUG(DEB_LEV_ERR, "Had NULL Input buffer!! op is=%d,iq=%d\n",pInputSem[1]->semval,pInputQueue[1]->nelem); 00314 break; 00315 } 00316 } 00317 } 00318 00319 for(i=0;i < (omx_base_component_Private->sPortTypesParam[OMX_PortDomainAudio].nPorts + 00320 omx_base_component_Private->sPortTypesParam[OMX_PortDomainVideo].nPorts + 00321 omx_base_component_Private->sPortTypesParam[OMX_PortDomainImage].nPorts + 00322 omx_base_component_Private->sPortTypesParam[OMX_PortDomainOther].nPorts);i++) { 00323 00324 if(omx_base_sink_Private->ports[i]->sPortParam.eDomain != OMX_PortDomainOther){ /* clock ports are not to be processed */ 00325 /*Process Input buffer of Port i */ 00326 if(isInputBufferNeeded[i]==OMX_FALSE) { 00327 00328 /*Pass the Mark to all outgoing buffers*/ 00329 if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){ 00330 pInputBuffer[i]->hMarkTargetComponent = omx_base_sink_Private->pMark.hMarkTargetComponent; 00331 pInputBuffer[i]->pMarkData = omx_base_sink_Private->pMark.pMarkData; 00332 } 00333 00334 target_component=(OMX_COMPONENTTYPE*)pInputBuffer[i]->hMarkTargetComponent; 00335 if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) { 00336 /*Clear the mark and generate an event*/ 00337 (*(omx_base_sink_Private->callbacks->EventHandler)) 00338 (openmaxStandComp, 00339 omx_base_sink_Private->callbackData, 00340 OMX_EventMark, /* The command was completed */ 00341 1, /* The commands was a OMX_CommandStateSet */ 00342 i, /* The state has been changed in message->messageParam2 */ 00343 pInputBuffer[i]->pMarkData); 00344 } else if(pInputBuffer[i]->hMarkTargetComponent!=NULL){ 00345 /*If this is not the target component then pass the mark*/ 00346 
//pInputBuffer[i]->pMarkData=NULL; 00347 DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n"); 00348 } 00349 00350 if(omx_base_sink_Private->state == OMX_StateExecuting) { 00351 if (omx_base_sink_Private->BufferMgmtCallback && pInputBuffer[i]->nFilledLen > 0) { 00352 (*(omx_base_sink_Private->BufferMgmtCallback))(openmaxStandComp, pInputBuffer[i]); 00353 } else { 00354 /*If no buffer management call back then don't produce any Input buffer*/ 00355 pInputBuffer[i]->nFilledLen = 0; 00356 } 00357 } else { 00358 DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_sink_Private->state); 00359 00360 if(OMX_TransStateExecutingToIdle == omx_base_component_Private->transientState || 00361 OMX_TransStatePauseToIdle == omx_base_component_Private->transientState) { 00362 pInputBuffer[i]->nFilledLen = 0; 00363 } 00364 } 00365 00366 if((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS && pInputBuffer[i]->nFilledLen==0) { 00367 DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pInputBuffer[i]->nFilledLen); 00368 (*(omx_base_sink_Private->callbacks->EventHandler)) 00369 (openmaxStandComp, 00370 omx_base_sink_Private->callbackData, 00371 OMX_EventBufferFlag, /* The command was completed */ 00372 i, /* The commands was a OMX_CommandStateSet */ 00373 pInputBuffer[i]->nFlags, /* The state has been changed in message->messageParam2 */ 00374 NULL); 00375 } 00376 if(omx_base_sink_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pInPort[0]) || PORT_IS_BEING_FLUSHED(pInPort[1]))) { 00377 /*Waiting at paused state*/ 00378 tsem_wait(omx_base_component_Private->bStateSem); 00379 } 00380 00381 /*Input Buffer has been produced or EOS. 
So, return Input buffer and get new buffer*/ 00382 if(pInputBuffer[i]->nFilledLen ==0 || ((pInputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS)){ 00383 pInPort[i]->ReturnBufferFunction(pInPort[i],pInputBuffer[i]); 00384 outBufExchanged[i]--; 00385 pInputBuffer[i]=NULL; 00386 isInputBufferNeeded[i]=OMX_TRUE; 00387 } 00388 } 00389 } 00390 } 00391 00392 /*Clear the Mark*/ 00393 if(omx_base_sink_Private->pMark.hMarkTargetComponent != NULL){ 00394 omx_base_sink_Private->pMark.hMarkTargetComponent = NULL; 00395 omx_base_sink_Private->pMark.pMarkData = NULL; 00396 } 00397 } 00398 DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n"); 00399 return NULL; 00400 }