//-------------------------------------------------------------------------- // File and Version Information: // $Id: APPLevel3SenderOutputModule.cc,v 1.56 2003/07/24 12:49:21 ksmcf Exp $ // // Description: // Class APPSenderOutputModule. This is an online output module // for the AC++ framework. It prepares events for sending to either // Level3 or the CSL in the online system // // Environment: // Software developed for CDF II. // // Author List: // Kevin McFarland Original Author // //------------------------------------------------------------------------ #include #include using std::ostringstream; using std::dec; using std::setw; using std::setfill; using std::resetiosflags; using std::hex; using std::cout; using std::endl; //----------------------- // This Class's Header -- //----------------------- #include "Level3Mods/APPLevel3SenderOutputModule.hh" //------------------------------- // Collaborating Class Headers -- //------------------------------- #include "Framework/APPJob.hh" #include "Framework/AppDeferredStream.hh" #include "Framework/AppDeferredOutCommand.hh" class AbsEvent; #include "Framework/APPList.hh" #include "Framework/APPListIterator.hh" #include "Framework/APPFramework.hh" #include "Framework/APPPath.hh" #include "ErrorLogger/ELadministrator.h" #include "Edm/ConstEventIter.hh" #include "Edm/ConstHandle.hh" // for EDM2 Format X conversion #include "TROOT.h" #include "Edm/EventRecord.hh" //----------------------------------------------------------------------- // Local Macros, Typedefs, Structures, Unions and Forward Declarations -- //----------------------------------------------------------------------- static const char rcsid[] = "$Id: APPLevel3SenderOutputModule.cc,v 1.56 2003/07/24 12:49:21 ksmcf Exp $"; //---------------- // Constructors -- //---------------- APPLevel3SenderOutputModule ::APPLevel3SenderOutputModule(int headerWords, const char* const theName, const char* const theDescription, const int4 exeTag) : APPLevel3GenericOutputModule ( theName, theDescription, exeTag ), _isSilent ( "isSilent",this,false ), _onlineBufferHexDump ( "onlineBufferHexDump",this,false ), _representationConversion ( "representationConversion",this,true ), _offsetLevel3PointerSpace ( "offsetLevel3PointerSpace",this,true), _maximumOnlineBufferSize ( "maximumOnlineBufferSize",this,524288), _onlineBuffer (NULL), _headerWords ( headerWords ) { // Define parameters for talk-to // ----------------------------- _representationConversion.addDescription(" \tIf true, convert data representation; else just swap bytes -- OLDEDM (Trybos) OUTPUT ONLY (default true)"); _offsetLevel3PointerSpace.addDescription(" \tOffset event buffer in message sent to Consumer-Server/Logger with Level-3 pointer space (default TRUE -- disabling this is for EXPERTS ONLY!)"); _isSilent.addDescription(" \tRun in quiet mode (default false)"); _onlineBufferHexDump.addDescription(" \tPrint an event-by-event online buffer hex dump of flat event. COPIOUS OUTPUT, DUDE! (default false)"); _maximumOnlineBufferSize.addDescription( " \tThe maximum online buffer size in long words; note that the online system can specify a higher number which will then be used (default is 262144 (1MB))"); commands()->append(&_isSilent); commands()->append(&_onlineBufferHexDump); commands()->append(&_representationConversion); commands()->append(&_offsetLevel3PointerSpace); commands()->append(&_maximumOnlineBufferSize); // // establish output commands (note "Deferred", meaning not written to a file) // commands( )->append( _outputCmd = new AppDeferredOutCommand( "output", this ) ); commands( )->append( _outCmd = new AppDeferredOutCommand( "out" , this ) ); } //-------------- // Destructor -- //-------------- APPLevel3SenderOutputModule::~APPLevel3SenderOutputModule( ) { delete _outputCmd; delete _outCmd; // delete _streams; } //-------------- // Operations -- //-------------- AppResult APPLevel3SenderOutputModule::beginJob( AbsEvent* aJob ) { // delay this to "just in time" creation to avoid problem with // buffer not being able to ask the interface how big it should // be // // createOnlineBuffer(); return AppResult::OK; } void APPLevel3SenderOutputModule::createOnlineBuffer() { errlog.setSubroutine("APPLevel3SenderOutputModule::createOnlineBuffer"); if (_onlineBuffer != NULL) { errlog(ELsevere, "Attempting to remake online buffer when one exists!") << endmsg; return; } // // Establish an Online Buffer // _onlineBufferSize = _maximumOnlineBufferSize.value(); if ( externalMaximumBufferSize() > _onlineBufferSize ) _onlineBufferSize = externalMaximumBufferSize(); _onlineBuffer = new int4[_onlineBufferSize]; size_t n_buffer_bytes = (_onlineBufferSize)*sizeof(int4); memset(_onlineBuffer, 0, n_buffer_bytes) ; // for the sake of purify // How much of this buffer gets used for headers? int eventBufferOffset = L3_CS_POINTER_SPACE; if ( ! _offsetLevel3PointerSpace.value() ) eventBufferOffset = 0; int headerBytes = sizeof(int4)*(_headerWords+eventBufferOffset); errlog(ELinfo, "Establishing buffer, ") << " header consists of headerWords, eventBufferOffset = " << _headerWords << ", " << eventBufferOffset << endmsg; #if ROOT_VERSION_CODE >= ROOT_VERSION(3,02,6) // We must pass a pointer to begin of memory allocation to TBuffer(). // TBuffer does not take ownership of the memory block, and will not delete it // in TBuffer destructor. _p_buffer = new TBuffer(TBuffer::kWrite, (n_buffer_bytes-headerBytes), (char *) &_onlineBuffer[_headerWords +eventBufferOffset], kFALSE) ; // TBuffer permits "ownership" to be specified in its c-tor in root v3.02.06. // This means DetachBuffer() is no longer necessary. // Rob Kennedy 09-01-02 #else // OLD APPROACH (root < 3.02.06), requires DetachBuffer() call later // We must pass a pointer to begin of memory allocation to TBuffer(). // TBuffer then assumes ownership of the memory block, and will delete it // in TBuffer destructor. _p_buffer = new TBuffer(TBuffer::kWrite, (n_buffer_bytes-headerBytes), (char *) &_onlineBuffer[_headerWords +eventBufferOffset]) ; // We are intentionally mis-quoting the "begin of buffer" address to TBuffer // to avoid an event copy. Since TBuffer will try to delete memory using the // the address we pass it, we *must* be sure to "detach" this buffer from // TBuffer before we allow TBuffer's destructor to be called. // Rob Kennedy 06-01-00 #endif // ROOT_VERSION_CODE } AppResult APPLevel3SenderOutputModule::outputEvent( AbsEvent*& anEvent , const AppStopType& theDispatchStopType) { if ( _onlineBuffer == NULL ) createOnlineBuffer(); // check for repeated BOR/EOR events if ( theDispatchStopType == AppStopType::begin_run ) { if ( _sentBeginRun ) { errlog.setSubroutine("APPLevel3SenderOutputModule::outputEvent"); errlog(ELinfo, "suppressing multiple BOR event") << endmsg; return AppResult::OK; // don't send repeated BOR events } else { _sentBeginRun = true; _sentEndRun = false; errlog.setSubroutine("APPLevel3SenderOutputModule::outputEvent"); errlog(ELinfo, "sending BOR event") << endmsg; } } if ( theDispatchStopType == AppStopType::end_run ) { if ( _sentEndRun ) { errlog.setSubroutine("APPLevel3SenderOutputModule::outputEvent"); errlog(ELinfo, "suppressing multiple EOR event") << endmsg; return AppResult::OK; // don't send repeated EOR events } else { _sentEndRun = true; _sentBeginRun = false; errlog.setSubroutine("APPLevel3SenderOutputModule::outputEvent"); errlog(ELinfo, "sending EOR event") << endmsg; } } // // create the replacement LRIH bank // createLevel3LRIH(_partitionId, anEvent); // // call the inherited childOutputEvent method to fill Level3 results banks // AppResult childResult = childOutputEvent( anEvent, theDispatchStopType ); if ( childResult != AppResult::OK ) return childResult; // // Find the total number of triggers passed // int nTrigPassed = 0; for (int i = 0; i < L3_BITMASK_WORDS; i++ ) { for (int j = 0; j < BITMASK_BITS; j++ ) { if (_cslMask.l3_bitmask[i]&(1<prewrite(); // // drop/keep objects according to stream // onlineDropKeep(anEvent); if (_debug.value()){ std::cout << "in Level3SenderOutputModule::outputEvent, after onlineDropKeep\n"; //------------------------------------------------------------- // Print listing header //------------------------------------------------------------- std::cout << "ByteIn Class Name (ObjectId: Bank Info ) Proc Name, Description\n" ; std::cout << "--------------------------------------------------------------------------------\n" ; //------------------------------------------------------------- // List objects in this event //------------------------------------------------------------- for (EventRecord::ConstIterator iter = anEvent->begin(); iter.is_valid(); ++iter) { ConstHandle handle(iter); std::string class_name = handle->class_name() ; Id object_id = handle->object_id() ; size_t nbytes_input = handle->nbytes_last_streamed_in() ; std::string process_name = handle->process_name() ; std::string description = handle->description() ; std::cout << std::setw( 6) << nbytes_input << " " << std::setw(24) << class_name << "(" << std::setw( 8) << object_id << std::endl; } } // debug // Header offset is specified in bytes. Total size event+header returned. // offset is now fixed to 0 (not an argument anymore) // to avoid ROOT memory management woes size_t byteLength = anEvent->serialize(*_p_buffer); // byteLength contains the length of just the event record size_t totalLength = byteLength + sizeof(int4)*(_headerWords+eventBufferOffset); if ( _debug.value() ) { std::cout << " Format X Event + header size is " << std::dec << totalLength << " bytes " << std::endl; std::cout << " header consists of headerWords, eventBufferOffset = " << _headerWords << ", " << eventBufferOffset << std::endl; } // check the error status of the event AND RESET THE ERRORS!!! errorStatus theErrorStatus = errorEvent(true); // now we have to add the information to be stored in the L3_CS_POINTER_SPACE if ( _offsetLevel3PointerSpace.value() ) // don't store the information if no space! { int partition= _partitionId; bool normalEvent = theDispatchStopType == AppStopType::physics_event; bool passedTrigger = (nTrigPassed >= 1) || (nStreamPassed >=1 ); bool errorOccurred = theErrorStatus.first; fillLevel3CsPointerSpace(&_onlineBuffer[_headerWords], L3_CS_POINTER_SPACE, _processID,partition, normalEvent,errorOccurred,passedTrigger, anEvent); } // does the pointer space exist? // fill header size_t length = byteLength + _headerWords*sizeof(int4) + eventBufferOffset*sizeof(int4); // message length fillHeaderWords(_onlineBuffer, theDispatchStopType, length, _processID); // online buffer hex dump if ( _onlineBufferHexDump.value() ) { std::cout << " _onlineBuffer bytestream hex dump before send" << std::endl; std::cout << hex; char* charBuff = (char*) _onlineBuffer; for (int i=0; i < _onlineBuffer[1]; i+=16) { std::cout << setfill('0') << setw(6) << i; for (int j=0;j=1 || nStreamPassed >= 1 || _passFailedSummary.value() || _passFailedEvents.value() ); return( sendFlatEvent( sendIt, length, theErrorStatus ) ); } AppResult APPLevel3SenderOutputModule::sendFlatEvent( bool sendIt, size_t flatEventLength, errorStatus status) { // dummy, must be overridden return AppResult::OK; } AppResult APPLevel3SenderOutputModule::disconnectOnAbort() { // dummy, must be overridden return AppResult::OK; } int APPLevel3SenderOutputModule::headerWords() const { // dummy, must be overridden return (0); } void APPLevel3SenderOutputModule:: fillHeaderWords(int4* pHeader, const AppStopType& theDispatchStopType, size_t length, long processId) { // dummy, must be overridden } AppResult APPLevel3SenderOutputModule::endJob( AbsEvent* aJob ) { // Trick to avoid TBuffer from deleting mis-quoted "begin of buffer" address if (_p_buffer) { #if ROOT_VERSION_CODE < ROOT_VERSION(3,02,6) // Not needed in ROOT 3.02.06 _p_buffer->DetachBuffer() ; // sets internal buffer pointer to 0 #endif delete _p_buffer ; } if (_onlineBuffer) delete[] _onlineBuffer; return AppResult::OK; } AppResult APPLevel3SenderOutputModule::abortJob( AbsEvent* anEvent ) { errlog.setSubroutine("APPLevel3SenderOutputModule::abortJob"); errlog(ELfatal,"abortJob method of OutputModule called;") << "attempting to output event" << endmsg; // // attempt to output the current event // "other_state" dispatch here ensures the event will be sent and // is not a BOR or EOR // if (anEvent != NULL) outputEvent(anEvent,AppStopType::other_state); // // now be really brave and try to terminate the IO System // AppResult disconnectResult = disconnectOnAbort(); // Trick to avoid TBuffer from deleting mis-quoted "begin of buffer" address if (_p_buffer) { #if ROOT_VERSION_CODE < ROOT_VERSION(3,02,6) // Not needed in ROOT 3.02.06 _p_buffer->DetachBuffer() ; // sets internal buffer pointer to 0 #endif delete _p_buffer ; } if (_onlineBuffer) delete[] _onlineBuffer; return disconnectResult; } //------------- // Selectors -- //------------- const char * APPLevel3SenderOutputModule::rcsId( ) const { return rcsid; }