OpenMAXBellagio 0.9.3
omx_base_source.c
Go to the documentation of this file.
00001 
00029 #include <omxcore.h>
00030 #include <omx_base_source.h>
00031 
00032 OMX_ERRORTYPE omx_base_source_Constructor(OMX_COMPONENTTYPE *openmaxStandComp, OMX_STRING cComponentName) {
00033 
00034   OMX_ERRORTYPE err = OMX_ErrorNone;
00035   omx_base_source_PrivateType* omx_base_source_Private;
00036 
00037   if (openmaxStandComp->pComponentPrivate) {
00038     omx_base_source_Private = (omx_base_source_PrivateType*)openmaxStandComp->pComponentPrivate;
00039   } else {
00040     omx_base_source_Private = calloc(1,sizeof(omx_base_source_PrivateType));
00041     if (!omx_base_source_Private) {
00042       return OMX_ErrorInsufficientResources;
00043     }
00044   }
00045 
00046   // we could create our own port structures here
00047   // fixme maybe the base class could use a "port factory" function pointer?
00048   err = omx_base_component_Constructor(openmaxStandComp, cComponentName);
00049 
00050   /* here we can override whatever defaults the base_component constructor set
00051   * e.g. we can override the function pointers in the private struct  */
00052   omx_base_source_Private = openmaxStandComp->pComponentPrivate;
00053   omx_base_source_Private->BufferMgmtFunction = omx_base_source_BufferMgmtFunction;
00054 
00055   return err;
00056 }
00057 
00058 OMX_ERRORTYPE omx_base_source_Destructor(OMX_COMPONENTTYPE *openmaxStandComp) {
00059 
00060   return omx_base_component_Destructor(openmaxStandComp);
00061 }
00062 
00068 void* omx_base_source_BufferMgmtFunction (void* param) {
00069 
00070   OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00071   omx_base_component_PrivateType* omx_base_component_Private = (omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00072   omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
00073   omx_base_PortType *pOutPort = (omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
00074   tsem_t* pOutputSem = pOutPort->pBufferSem;
00075   queue_t* pOutputQueue = pOutPort->pBufferQueue;
00076   OMX_BUFFERHEADERTYPE* pOutputBuffer = NULL;
00077   OMX_COMPONENTTYPE* target_component;
00078   OMX_BOOL isOutputBufferNeeded = OMX_TRUE;
00079   int outBufExchanged = 0;
00080 
00081   omx_base_source_Private->bellagioThreads->nThreadBufferMngtID = (long int)syscall(__NR_gettid);
00082   DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s the thread ID is %i\n", __func__, (int)omx_base_source_Private->bellagioThreads->nThreadBufferMngtID);
00083 
00084   DEBUG(DEB_LEV_FUNCTION_NAME, "In %s \n", __func__);
00085   while(omx_base_component_Private->state == OMX_StateIdle || omx_base_component_Private->state == OMX_StateExecuting ||
00086     omx_base_component_Private->state == OMX_StatePause || omx_base_component_Private->transientState == OMX_TransStateLoadedToIdle){
00087 
00088     /*Wait till the ports are being flushed*/
00089     pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00090     while( PORT_IS_BEING_FLUSHED(pOutPort)) {
00091       pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00092 
00093       if(isOutputBufferNeeded == OMX_FALSE) {
00094         pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
00095         outBufExchanged--;
00096         pOutputBuffer = NULL;
00097         isOutputBufferNeeded = OMX_TRUE;
00098         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output buffer\n");
00099       }
00100       DEBUG(DEB_LEV_FULL_SEQ, "In %s signalling flush all condition \n", __func__);
00101 
00102       tsem_up(omx_base_source_Private->flush_all_condition);
00103       tsem_down(omx_base_source_Private->flush_condition);
00104       pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00105     }
00106     pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00107 
00108     /*No buffer to process. So wait here*/
00109     if((isOutputBufferNeeded==OMX_TRUE && pOutputSem->semval==0) &&
00110       (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
00111       DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer \n");
00112       tsem_down(omx_base_source_Private->bMgmtSem);
00113     }
00114 
00115     if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
00116       DEBUG(DEB_LEV_FULL_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00117       break;
00118     }
00119 
00120     DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer semval=%d \n",pOutputSem->semval);
00121     if(pOutputSem->semval > 0 && isOutputBufferNeeded == OMX_TRUE ) {
00122       tsem_down(pOutputSem);
00123       if(pOutputQueue->nelem>0){
00124         outBufExchanged++;
00125         isOutputBufferNeeded = OMX_FALSE;
00126         pOutputBuffer = dequeue(pOutputQueue);
00127         if(pOutputBuffer == NULL){
00128           DEBUG(DEB_LEV_ERR, "In %s Had NULL output buffer!!\n",__func__);
00129           break;
00130         }
00131       }
00132     }
00133 
00134     if(isOutputBufferNeeded == OMX_FALSE) {
00135       if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS) {
00136         pOutputBuffer->nFlags = 0;
00137       }
00138 
00139       if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
00140         pOutputBuffer->hMarkTargetComponent = omx_base_source_Private->pMark.hMarkTargetComponent;
00141         pOutputBuffer->pMarkData            = omx_base_source_Private->pMark.pMarkData;
00142         omx_base_source_Private->pMark.hMarkTargetComponent = NULL;
00143         omx_base_source_Private->pMark.pMarkData            = NULL;
00144       }
00145 
00146       target_component = (OMX_COMPONENTTYPE*)pOutputBuffer->hMarkTargetComponent;
00147       if(target_component == (OMX_COMPONENTTYPE *)openmaxStandComp) {
00148         /*Clear the mark and generate an event*/
00149         (*(omx_base_component_Private->callbacks->EventHandler))
00150           (openmaxStandComp,
00151           omx_base_component_Private->callbackData,
00152           OMX_EventMark, /* The command was completed */
00153           1, /* The commands was a OMX_CommandStateSet */
00154           0, /* The state has been changed in message->messageParam2 */
00155           pOutputBuffer->pMarkData);
00156       } else if(pOutputBuffer->hMarkTargetComponent != NULL) {
00157         /*If this is not the target component then pass the mark*/
00158         DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
00159       }
00160 
00161       if(omx_base_source_Private->state == OMX_StateExecuting)  {
00162         if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer->nFilledLen == 0) {
00163           (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer);
00164         } else {
00165           /*It no buffer management call back then don't produce any output buffer*/
00166           pOutputBuffer->nFilledLen = 0;
00167         }
00168       } else {
00169         DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_source_Private->state);
00170       }
00171       if(omx_base_source_Private->state == OMX_StatePause && !PORT_IS_BEING_FLUSHED(pOutPort)) {
00172         /*Waiting at paused state*/
00173         tsem_wait(omx_base_source_Private->bStateSem);
00174       }
00175 
00176       if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS) {
00177         DEBUG(DEB_LEV_SIMPLE_SEQ, "Detected EOS flags in output buffer\n");
00178 
00179         (*(omx_base_component_Private->callbacks->EventHandler))
00180           (openmaxStandComp,
00181           omx_base_component_Private->callbackData,
00182           OMX_EventBufferFlag, /* The command was completed */
00183           0, /* The commands was a OMX_CommandStateSet */
00184           pOutputBuffer->nFlags, /* The state has been changed in message->messageParam2 */
00185           NULL);
00186         //pOutputBuffer->nFlags = 0;
00187         omx_base_source_Private->bIsEOSReached = OMX_TRUE;
00188       }
00189 
00190       /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
00191       if((pOutputBuffer->nFilledLen != 0) || ((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) ==OMX_BUFFERFLAG_EOS) || (omx_base_source_Private->bIsEOSReached == OMX_TRUE)) {
00192           if((pOutputBuffer->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS)
00193               DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s nFlags=%x Name=%s \n", __func__, (int)pOutputBuffer->nFlags, omx_base_source_Private->name);
00194           pOutPort->ReturnBufferFunction(pOutPort, pOutputBuffer);
00195           outBufExchanged--;
00196           pOutputBuffer = NULL;
00197           isOutputBufferNeeded = OMX_TRUE;
00198       }
00199     }
00200   }
00201   DEBUG(DEB_LEV_SIMPLE_SEQ, "Exiting Buffer Management Thread\n");
00202   return NULL;
00203 }
00204 
00211 void* omx_base_source_twoport_BufferMgmtFunction (void* param) {
00212   OMX_COMPONENTTYPE* openmaxStandComp = (OMX_COMPONENTTYPE*)param;
00213   omx_base_component_PrivateType* omx_base_component_Private=(omx_base_component_PrivateType*)openmaxStandComp->pComponentPrivate;
00214   omx_base_source_PrivateType* omx_base_source_Private = (omx_base_source_PrivateType*)omx_base_component_Private;
00215   omx_base_PortType *pOutPort[2];
00216   tsem_t* pOutputSem[2];
00217   queue_t* pOutputQueue[2];
00218   OMX_BUFFERHEADERTYPE* pOutputBuffer[2];
00219   OMX_COMPONENTTYPE* target_component;
00220   OMX_BOOL isOutputBufferNeeded[2];
00221   int i,outBufExchanged[2];
00222 
00223   pOutPort[0]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX];
00224   pOutPort[1]=(omx_base_PortType *)omx_base_source_Private->ports[OMX_BASE_SOURCE_OUTPUTPORT_INDEX_1];
00225   pOutputSem[0] = pOutPort[0]->pBufferSem;
00226   pOutputSem[1] = pOutPort[1]->pBufferSem;
00227   pOutputQueue[0] = pOutPort[0]->pBufferQueue;
00228   pOutputQueue[1] = pOutPort[1]->pBufferQueue;
00229   pOutputBuffer[1]= pOutputBuffer[0]=NULL;
00230   isOutputBufferNeeded[0]=isOutputBufferNeeded[1]=OMX_TRUE;
00231   outBufExchanged[0]=outBufExchanged[1]=0;
00232 
00233   DEBUG(DEB_LEV_FUNCTION_NAME, "In %s\n", __func__);
00234   while(omx_base_source_Private->state == OMX_StateIdle || omx_base_source_Private->state == OMX_StateExecuting ||  omx_base_source_Private->state == OMX_StatePause ||
00235     omx_base_source_Private->transientState == OMX_TransStateLoadedToIdle){
00236 
00237     /*Wait till the ports are being flushed*/
00238     pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00239     while( PORT_IS_BEING_FLUSHED(pOutPort[0]) ||
00240            PORT_IS_BEING_FLUSHED(pOutPort[1])) {
00241       pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00242 
00243       DEBUG(DEB_LEV_FULL_SEQ, "In %s 1 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
00244         __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
00245 
00246       if(isOutputBufferNeeded[1]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[1])) {
00247         pOutPort[1]->ReturnBufferFunction(pOutPort[1],pOutputBuffer[1]);
00248         outBufExchanged[1]--;
00249         pOutputBuffer[1]=NULL;
00250         isOutputBufferNeeded[1]=OMX_TRUE;
00251         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 1 buffer\n");
00252       }
00253 
00254       if(isOutputBufferNeeded[0]==OMX_FALSE && PORT_IS_BEING_FLUSHED(pOutPort[0])) {
00255         pOutPort[0]->ReturnBufferFunction(pOutPort[0],pOutputBuffer[0]);
00256         outBufExchanged[0]--;
00257         pOutputBuffer[0]=NULL;
00258         isOutputBufferNeeded[0]=OMX_TRUE;
00259         DEBUG(DEB_LEV_FULL_SEQ, "Ports are flushing,so returning output 0 buffer\n");
00260       }
00261 
00262       DEBUG(DEB_LEV_FULL_SEQ, "In %s 2 signalling flush all cond iE=%d,iF=%d,oE=%d,oF=%d iSemVal=%d,oSemval=%d\n",
00263         __func__,outBufExchanged[0],isOutputBufferNeeded[0],outBufExchanged[1],isOutputBufferNeeded[1],pOutputSem[0]->semval,pOutputSem[1]->semval);
00264 
00265       tsem_up(omx_base_source_Private->flush_all_condition);
00266       tsem_down(omx_base_source_Private->flush_condition);
00267       pthread_mutex_lock(&omx_base_source_Private->flush_mutex);
00268     }
00269     pthread_mutex_unlock(&omx_base_source_Private->flush_mutex);
00270 
00271     /*No buffer to process. So wait here*/
00272     if((isOutputBufferNeeded[0]==OMX_TRUE && pOutputSem[0]->semval==0) &&
00273       (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid)) {
00274       //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
00275       DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 0\n");
00276       tsem_down(omx_base_source_Private->bMgmtSem);
00277 
00278     }
00279     if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
00280       DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00281       break;
00282     }
00283     if((isOutputBufferNeeded[1]==OMX_TRUE && pOutputSem[1]->semval==0) &&
00284       (omx_base_source_Private->state != OMX_StateLoaded && omx_base_source_Private->state != OMX_StateInvalid) &&
00285        !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
00286       //Signalled from EmptyThisBuffer or FillThisBuffer or some thing else
00287       DEBUG(DEB_LEV_FULL_SEQ, "Waiting for next output buffer 1\n");
00288       tsem_down(omx_base_source_Private->bMgmtSem);
00289 
00290     }
00291     if(omx_base_source_Private->state == OMX_StateLoaded || omx_base_source_Private->state == OMX_StateInvalid) {
00292       DEBUG(DEB_LEV_SIMPLE_SEQ, "In %s Buffer Management Thread is exiting\n",__func__);
00293       break;
00294     }
00295 
00296     DEBUG(DEB_LEV_SIMPLE_SEQ, "Waiting for output buffer 0 semval=%d \n",pOutputSem[0]->semval);
00297     if(pOutputSem[0]->semval>0 && isOutputBufferNeeded[0]==OMX_TRUE ) {
00298       tsem_down(pOutputSem[0]);
00299       if(pOutputQueue[0]->nelem>0){
00300         outBufExchanged[0]++;
00301         isOutputBufferNeeded[0]=OMX_FALSE;
00302         pOutputBuffer[0] = dequeue(pOutputQueue[0]);
00303         if(pOutputBuffer[0] == NULL){
00304           DEBUG(DEB_LEV_ERR, "Had NULL output buffer!!\n");
00305           break;
00306         }
00307       }
00308     }
00309     /*When we have input buffer to process then get one output buffer*/
00310     if(pOutputSem[1]->semval>0 && isOutputBufferNeeded[1]==OMX_TRUE) {
00311       tsem_down(pOutputSem[1]);
00312       if(pOutputQueue[1]->nelem>0){
00313         outBufExchanged[1]++;
00314         isOutputBufferNeeded[1]=OMX_FALSE;
00315         pOutputBuffer[1] = dequeue(pOutputQueue[1]);
00316         if(pOutputBuffer[1] == NULL){
00317           DEBUG(DEB_LEV_ERR, "Had NULL output buffer!! op is=%d,iq=%d\n",pOutputSem[1]->semval,pOutputQueue[1]->nelem);
00318           break;
00319         }
00320       }
00321     }
00322 
00323     for(i=0;i < (omx_base_component_Private->sPortTypesParam[OMX_PortDomainAudio].nPorts  +
00324                  omx_base_component_Private->sPortTypesParam[OMX_PortDomainVideo].nPorts +
00325                  omx_base_component_Private->sPortTypesParam[OMX_PortDomainImage].nPorts +
00326                  omx_base_component_Private->sPortTypesParam[OMX_PortDomainOther].nPorts -1);i++) {
00327 
00328       if(omx_base_source_Private->ports[i]->sPortParam.eDomain!=OMX_PortDomainOther){ /* clock ports are not to be processed */
00329         /*Process Output buffer of Port i */
00330         if(isOutputBufferNeeded[i]==OMX_FALSE) {
00331 
00332           /*Pass the Mark to all outgoing buffers*/
00333           if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
00334             pOutputBuffer[i]->hMarkTargetComponent = omx_base_source_Private->pMark.hMarkTargetComponent;
00335             pOutputBuffer[i]->pMarkData            = omx_base_source_Private->pMark.pMarkData;
00336           }
00337 
00338           target_component=(OMX_COMPONENTTYPE*)pOutputBuffer[i]->hMarkTargetComponent;
00339           if(target_component==(OMX_COMPONENTTYPE *)openmaxStandComp) {
00340             /*Clear the mark and generate an event*/
00341             (*(omx_base_source_Private->callbacks->EventHandler))
00342               (openmaxStandComp,
00343               omx_base_source_Private->callbackData,
00344               OMX_EventMark, /* The command was completed */
00345               1, /* The commands was a OMX_CommandStateSet */
00346               i, /* The state has been changed in message->messageParam2 */
00347               pOutputBuffer[i]->pMarkData);
00348           } else if(pOutputBuffer[i]->hMarkTargetComponent!=NULL){
00349             /*If this is not the target component then pass the mark*/
00350             DEBUG(DEB_LEV_FULL_SEQ, "Pass Mark. This is a Source!!\n");
00351           }
00352 
00353           if(omx_base_source_Private->state == OMX_StateExecuting)  {
00354             if (omx_base_source_Private->BufferMgmtCallback && pOutputBuffer[i]->nFilledLen == 0) {
00355               (*(omx_base_source_Private->BufferMgmtCallback))(openmaxStandComp, pOutputBuffer[i]);
00356             } else {
00357               /*If no buffer management call back then don't produce any output buffer*/
00358               pOutputBuffer[i]->nFilledLen = 0;
00359             }
00360           } else {
00361             DEBUG(DEB_LEV_ERR, "In %s Received Buffer in non-Executing State(%x)\n", __func__, (int)omx_base_source_Private->state);
00362           }
00363 
00364           if((pOutputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS && pOutputBuffer[i]->nFilledLen==0) {
00365             DEBUG(DEB_LEV_FULL_SEQ, "Detected EOS flags in input buffer filled len=%d\n", (int)pOutputBuffer[i]->nFilledLen);
00366             (*(omx_base_source_Private->callbacks->EventHandler))
00367               (openmaxStandComp,
00368               omx_base_source_Private->callbackData,
00369               OMX_EventBufferFlag, /* The command was completed */
00370               i, /* The commands was a OMX_CommandStateSet */
00371               pOutputBuffer[i]->nFlags, /* The state has been changed in message->messageParam2 */
00372               NULL);
00373           }
00374           if(omx_base_source_Private->state==OMX_StatePause && !(PORT_IS_BEING_FLUSHED(pOutPort[0]) || PORT_IS_BEING_FLUSHED(pOutPort[1]))) {
00375             /*Waiting at paused state*/
00376             tsem_wait(omx_base_component_Private->bStateSem);
00377           }
00378 
00379            /*Output Buffer has been produced or EOS. So, return output buffer and get new buffer*/
00380           if(pOutputBuffer[i]->nFilledLen!=0 || (pOutputBuffer[i]->nFlags & OMX_BUFFERFLAG_EOS) == OMX_BUFFERFLAG_EOS){
00381             pOutPort[i]->ReturnBufferFunction(pOutPort[i],pOutputBuffer[i]);
00382             outBufExchanged[i]--;
00383             pOutputBuffer[i]=NULL;
00384             isOutputBufferNeeded[i]=OMX_TRUE;
00385           }
00386         }
00387       }
00388     }
00389 
00390     /*Clear the Mark*/
00391     if(omx_base_source_Private->pMark.hMarkTargetComponent != NULL){
00392       omx_base_source_Private->pMark.hMarkTargetComponent = NULL;
00393       omx_base_source_Private->pMark.pMarkData            = NULL;
00394     }
00395   }
00396   DEBUG(DEB_LEV_SIMPLE_SEQ,"Exiting Buffer Management Thread\n");
00397   return NULL;
00398 }