//--------------------------------------------------------------------------
// File and Version Information:
//      $Id: APPLevel3CslOutputModule.cc,v 1.1 2003/10/02 15:56:26 dldecker Exp $
//
// Description:
//      Class APPCslOutputModule. This is an online output module
//      for the AC++ framework. It connects to the Consumer-Server/Logger
//      sends events to the CSL.
//
// Environment:
//      Software developed for CDF II.
//
// Author List:
//      Kevin McFarland         Original Author
//
//------------------------------------------------------------------------
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
using std::ostringstream;
using std::dec;
using std::setw;
using std::setfill;
using std::resetiosflags;
using std::hex;
using std::cout;
using std::endl;

//-----------------------
// This Class's Header --
//-----------------------
#include "Level3Mods/APPLevel3CslOutputModule.hh"

//-------------------------------
// Collaborating Class Headers --
//-------------------------------
#include "Framework/APPJob.hh"
#include "Framework/AppDeferredStream.hh"
#include "Framework/AppDeferredOutCommand.hh"
class AbsEvent;
#include "Framework/APPList.hh"
#include "Framework/APPListIterator.hh"
#include "Framework/APPFramework.hh"
#include "Framework/APPPath.hh"

// for EDM2 Format X conversion
#include "TROOT.h"
#include "Edm/EventRecord.hh"

#include "Level3Mods/APPLevel3InputModule.hh"

//----------------------------------
// Online System headers --
//----------------------------------
extern "C" {
#include "ConsumerInterface/csl_report.h"
#include "ConsumerInterface/csl_misc_defns.h"
#include "ConsumerInterface/csl_msghdr.h"
#include "ConsumerInterface/csl_timeouts.h"
#include "ConsumerInterface/l3_cs_interface.h"
#include "ConsumerInterface/l3_csl_socket_cplusplus_safe.h"
#include "ConsumerInterface/l3_ipckey.h"
}

//-----------------------------------------------------------------------
// Local Macros, Typedefs, Structures, Unions and Forward Declarations --
//----------------------------------------------------------------------- static const char rcsid[] = "$Id: APPLevel3CslOutputModule.cc,v 1.1 2003/10/02 15:56:26 dldecker Exp $"; //---------------- // Constructors -- //---------------- APPLevel3CslOutputModule ::APPLevel3CslOutputModule(const char* const theName, const char* const theDescription, const int4 exeTag) : APPLevel3SenderOutputModule ( headerWords() , theName, theDescription, exeTag ), _theCslHostname ( "theCslHostname",this,"b0urpc05.fnal.gov" ), _thePartitionId ( "thePartitionId",this,0 ), _theIpcBaseKey ( "theIpcBaseKey",this,0x4400 ), _isSilent ( "isSilent",this,false ), _globalPortDaemonActive ( "globalPortDaemonActive",this,false ), _partitionId ( 0 ), _sevbInterface ("sevbInterface", this, false) { // Define parameters for talk-to // ----------------------------- _theCslHostname.addDescription(" \tHostname for the CSL (default b0urpc05.fnal.gov)"); _thePartitionId.addDescription( " \tID of Partition to which this sender belongs (default 0)"); _theIpcBaseKey.addDescription( " \tIPC Base Key for sender (UPS default of 0x4400)"); _isSilent.addDescription(" \tRun in quiet mode (default false)"); _globalPortDaemonActive.addDescription( " \tUse CSL global port daemon (default true)"); commands()->append(&_theCslHostname); commands()->append(&_thePartitionId); commands()->append(&_theIpcBaseKey); commands()->append(&_isSilent); commands()->append(&_globalPortDaemonActive); commands()->append(&_sevbInterface); // Establish the process ID // identify as a particular process // note that this should probably someday distinguish itself from Level3 _processID = L3_DISPATCHER ; // necessary for current consumer server fooling // Set the interface to NULL _theInterface = NULL; } //-------------- // Destructor -- //-------------- APPLevel3CslOutputModule::~APPLevel3CslOutputModule( ) { } //-------------- // Operations -- //-------------- AppResult APPLevel3CslOutputModule::initIOSystem( ) { // Get the current 
input interface if the software event builder is // being used if (sevbInterface ()) { APPLevel3InputModule *inputModule; inputModule = (APPLevel3InputModule *) (framework () -> theInputModule ()); _theInterface = (SevbL3Interface *) (inputModule -> getInterface ()); } else _theInterface = NULL; // // initializing streams has to be done here // see AppStreamsOutputModule::initIOSystem // AppStream** theStream; APPListIterator theIterator( *streams( ) ); while ( theStream = theIterator( ) ) { if ( (*theStream)->isEnabled( ) && ! (*theStream)->isOpen( ) ) { bool status = (*theStream)->open( ); } } // set the paritionId from the talk-to parameter _partitionId = _thePartitionId.value(); // // initIOSystem operation establishes the connection with the consumer // server. // (1) get port numbers/connection info from the server // (2) connect // string hostname = _theCslHostname.value() ; int4 cs_requestor_port,cs_receiver_port; int4 cs_global_daemon_port = _theIpcBaseKey.value() + CS_GLOBAL_DAEMON_PORT; bool usePortDaemon = false; if ( _globalPortDaemonActive.value() ) { errlog.setSubroutine("APPLevel3CslOutputModule::initIOSystem"); errlog(ELwarning, "CSL global Port Daemon Server unsupported") << endmsg; } if ( usePortDaemon ) { if ( ! 
_isSilent.value() ) { errlog.setSubroutine("APPLevel3CslOutputModule::initIOSystem"); errlog(ELinfo,"Calling l3_cs_socket_get_CS_port_numbers for host ") << hostname << " using global port : " << cs_global_daemon_port << endmsg; } //if (l3_csl_socket_get_CS_port_numbers( // cs_global_daemon_port, // hostname.c_str(), // &cs_requestor_port, // &cs_receiver_port) == FAILURE) // { // csl_report_write(CSL_REPLVL_ERROR, // "APPLevel3CslOutputModule::initIOSystem", // __FILE__, __LINE__, // "l3_cs_socket_get_CS_port_numbers() -> error") ; // return AppResult::ERROR ; // } } else //hardcoded port numbers { cs_requestor_port = _theIpcBaseKey.value() + L3_CS_REQUESTOR_PORT ; cs_receiver_port = _theIpcBaseKey.value() + L3_CS_RECEIVER_PORT ; } if ( ! _isSilent.value() ) { ostringstream errmsg; errmsg << hex << cs_requestor_port << ", CS_Receiver port = " << hex << cs_receiver_port << dec; errlog.setSubroutine("APPLevel3CslOutputModule::initIOSystem"); errlog(ELinfo,"CS_Requestor port = ") << errmsg << endmsg; } // now make the connection _csSocket = l3_csl_socket_connect(cs_receiver_port, hostname.c_str(), _processID , CSL_RECEIVER, _partitionId ); if (l3_csl_socket_q_label(_csSocket) < SUCCESS) { if (sevbInterface ()) _theInterface -> send_message (0, CDFR2DM_CSL_CONNECT_FAIL_BIT, ""); csl_report_write(CSL_REPLVL_ERROR, "APPLevel3CslOutputModule::beginJob", __FILE__, __LINE__, "FAILED to connect to Consumer_Server") ; return AppResult::ERROR; } if (sevbInterface ()) _theInterface -> send_message (0, CDFR2DM_CSL_CONNECT_BIT, ""); return AppResult::OK; } AppResult APPLevel3CslOutputModule::sendFlatEvent( bool sendIt, size_t flatEventLength, errorStatus status) { // make the event message msgbuf_t eventMsgbuf ; eventMsgbuf.label = SUCCESS ; eventMsgbuf.cur_length = flatEventLength; eventMsgbuf.buf_size = _onlineBufferSize ; eventMsgbuf.buffer = (msghdr_t *) _onlineBuffer ; if ( _debug.value() ) { std::cout << " Event message size is " << eventMsgbuf.cur_length << std::endl; } 
// // deal with the error status // (a) send an acknowledgement of it through the error logger // (b) change the event type if necessary to "bad" // if (status.first) // there was an error { errlog.setSubroutine("APPLevel3CslOutputModule::sendFlatEvent"); errlog(ELinfo, "framework reported an error:") << status.second << endmsg; // // if got a severe error on a regular event, send as an error event // ("BAD" message type) // if (_useErrorEventStream.value() && eventMsgbuf.buffer->msg_type == L3_CS_RUN_EVENT ) { // send as an error event if (eventMsgbuf.buffer->msg_type == L3_CS_RUN_EVENT) eventMsgbuf.buffer->msg_type = L3_CS_RUN_EVENT_BAD; else if (eventMsgbuf.buffer->msg_type == L3_CS_RUN_BEGIN) eventMsgbuf.buffer->msg_type = L3_CS_RUN_BEGIN_BAD; else if (eventMsgbuf.buffer->msg_type == L3_CS_RUN_END) eventMsgbuf.buffer->msg_type = L3_CS_RUN_END_BAD; else { errlog.setSubroutine("APPLevel3CslOutputModule::sendFlatEvent"); errlog(ELsevere, "attempted to flag unknown event type (") << eventMsgbuf.buffer->msg_type << ") as BAD " << endmsg; } } } // send the event (always send if there is an error) if ( ! sendIt && ! status.first ) { if (sevbInterface ()) _theInterface -> signalSoftEvb (); return (AppResult::OK); } int send_status = (int) l3_csl_socket_send(_csSocket,eventMsgbuf) ; // SHUTDOWN, BROKEN_PIPE, CONNECTION_RESET ==> indicate that the socket // connection to the client is lost at an unexpected point. // This is a severe error which causes an error state to propogate back... 
if ( (send_status == SHUTDOWN ) || (send_status == BROKEN_PIPE ) || (send_status == CONNECTION_RESET) ) { if (sevbInterface ()) { _theInterface -> send_message (0, CDFR2DM_CSL_CONNECTION_FAILURE_BIT, ""); _theInterface -> signalSoftEvb (); } csl_report_write(CSL_REPLVL_ERROR, "APPLevel3CslModule::outputEvent", __FILE__, __LINE__, "Broken socket connection to Consumer Server") ; return AppResult::ERROR ; } else if (send_status == TIMEOUT) { if (sevbInterface ()) { _theInterface -> send_message (0, CDFR2DM_CSL_SEND_FAIL_BIT, ""); _theInterface -> signalSoftEvb (); } csl_report_write(CSL_REPLVL_ERROR, "APPLevel3CslModule::outputEvent", __FILE__, __LINE__, "Timeout in send() event to Consumer Server") ; return AppResult::ERROR ; } else if (send_status < SUCCESS) { if (sevbInterface ()) { _theInterface -> send_message (0, CDFR2DM_CSL_SEND_FAIL_BIT, ""); _theInterface -> signalSoftEvb (); } csl_report_write(CSL_REPLVL_ERROR, "APPLevel3CslModule::outputEvent", __FILE__, __LINE__, "l3_csl_socket_send() -> error") ; return AppResult::ERROR ; } if (sevbInterface ()) { _theInterface -> send_message (0, CDFR2DM_EVENT_SENT_BIT, ""); _theInterface -> signalSoftEvb (); } return AppResult::OK; } AppResult APPLevel3CslOutputModule::terminateIOSystem( ) { // // endJob operation disconnects from the consumer server // (1) disconnect from _csSocket // int ret_code = l3_csl_socket_disconnect(_csSocket, _processID, CSL_RECEIVER); if (ret_code < SUCCESS) { if (ret_code == TIMEOUT) { /* if (sevbInterface ()) _theInterface -> send_message (0, CDFR2DM_CSL_NO_GOODBYE_BIT, ""); */ csl_report_write(CSL_REPLVL_ERROR, "APPLevel3CslOutputModule::endJob", __FILE__, __LINE__, "WARNING: no Goodbye reply from Consumer_Server"); } else { if (sevbInterface ()) /* _theInterface -> send_message (0, CDFR2DM_CSL_DISCONNECTION_FAIL_BIT, ""); */ csl_report_write(CSL_REPLVL_INFO, "APPLevel3CslOutputModule::endJob", __FILE__, __LINE__, "WARNING: Imperfect Consumer_Server disconnect"); } } /* if (sevbInterface 
()) _theInterface -> send_message (0, CDFR2DM_CSL_DISCONNECT_BIT, ""); */ return AppResult::OK; } AppResult APPLevel3CslOutputModule::disconnectOnAbort() { // // here the output module contains the outside connection // terminateIOSystem(); return AppResult::OK; } int APPLevel3CslOutputModule:: headerWords() const { return(sizeof(msghdr_t)/sizeof(int4)); } long APPLevel3CslOutputModule:: externalMaximumBufferSize() const { // no external input on online buffer size; 0 is a safe default return(0); } void APPLevel3CslOutputModule:: fillHeaderWords(int4* pHeader, const AppStopType& theDispatchStopType, size_t length, long processId) { msghdr_t* header = (msghdr_t*) pHeader; int4 eventMessage = L3_CS_RUN_EVENT; if ( theDispatchStopType == AppStopType::begin_run ) eventMessage = L3_CS_RUN_BEGIN; else if ( theDispatchStopType == AppStopType::end_run ) eventMessage = L3_CS_RUN_END; header->msg_type = eventMessage; // message type header->msg_length = length; // message length header->msg_origination = processId ; // sender ID header->msg_destination = CSL_RECEIVER; // destination ID } // function that determines whether the input comes from the software // event builder or not bool APPLevel3CslOutputModule::sevbInterface( ) const { return _sevbInterface.value(); } //------------- // Selectors -- //------------- const char * APPLevel3CslOutputModule::rcsId( ) const { return rcsid; }