/////////////////////////////////////////////////////////////////////////////// // dfc_ana runs on an individual file // run_dfc_ana runs on a dataset // example: // // root // .L Stntuple/ana/dfc_ana.C // run_dfc_ana("stntuple/dev_240","bpel08") ; > .bpel08.log // // cat results/bhel08-hpte-catalog.log | grep bhel08-hpte.0 \ // | grep -v OBJ | grep -v Stntuple | grep -v mv | grep -v root:// /////////////////////////////////////////////////////////////////////////////// #include "string.h" TStnAna* x = NULL; TChain* chain; TDFCModule* m_dfc; TStnCatalog* catalog; //_____________________________________________________________________________ int dfc_ana(const char* InputFile, Int_t PrintLevel = 0, const char* OutputDir = "", const char* DfcDsID = 0, const char* DfcBook = "", const char* DbID = "production_file_catalog_write") { // always process all the events catalog = new TStnCatalog(); chain = new TChain("STNTUPLE"); catalog->InitChain(chain,InputFile); x = new TStnAna(chain); // x->Clear(); m_dfc = new TDFCModule; x->AddModule(m_dfc); m_dfc->SetOutputDir(OutputDir); if (DfcDsID) m_dfc->SetDataSet(DfcDsID,DfcBook,DbID); m_dfc->SetPrintLevel(PrintLevel); if (PrintLevel > 100) m_dfc->SetExecCommands(PrintLevel/100); x->SetNEventsToReport(100000000); x->Run(); return m_dfc->ReturnCode(); } //_____________________________________________________________________________ void run_dfc_ana(const char* Book, const char* Dataset, const char* Fileset = 0, const char* RemoteDir = "ftp://ewk@fcdfdata030.fnal.gov:/cdf/scratch/ewk/gmbs08/stntuple/dev_240", const char* DfcDsID = "sewk01", const char* DfcBook = "cdfpewk") { char script[200] = "/cdf/home/cdfopr/cafdfc/scripts/get_list_of_files"; char fn[200]; if (Fileset != 0) { sprintf(cmd,"ssh fcdflnx3 %s -b %s -d %s -s %s \| awk \'{print $2}\'", script,Book,Dataset,Fileset); } else { sprintf(cmd,"ssh fcdflnx3 %s -b %s -d %s \| awk \'{print $2}\'", script,Book,Dataset); } printf("executing: %s\n",cmd); FILE* f = gSystem->OpenPipe(cmd,"r"); int i = 0; while (fscanf(f,"%s",fn) > 0) { printf("------------------- %s --------------------\n", fn); dfc_ana(fn,11,RemoteDir,DfcDsID,DfcBook); i++; } fclose(f); } //_____________________________________________________________________________ void catalog_list_of_files(const char* ListOfFiles) { // catalog list of files returned by the shell command printf(" to be cataloged: %s\n"); puts(ListOfFiles); int n = strlen(ListOfFiles); char *s, *file; char del [] = " \n"; s = new char[n+1]; strcpy(s,ListOfFiles); file = strtok(s,del); while (file) { printf(" -- file %s\n",file); dfc_ana(file,1); file = strtok(0,del); } delete [] s; } //_____________________________________________________________________________ void catalog_list_of_files() { char* files = gSystem->Getenv("TO_BE_CATALOGED"); if (files) { catalog_list_of_files(files); } else { printf( " list is NULL\n"); } } //_____________________________________________________________________________ void catalog_directory(const char* Dir, const char* Pattern) { // char cmd[200], fn[200]; sprintf(cmd,"ls -l %s/* | grep %s | grep %s | awk '{ print $9}'", Dir,Dir,Pattern); printf("executing: %s\n",cmd); FILE* pipe = gSystem->OpenPipe(cmd,"r"); while (fscanf(pipe,"%s",fn) > 0) { dfc_ana(fn,1); } gSystem->ClosePipe(pipe); } //_____________________________________________________________________________ int concatenate(const char* ListOfFiles, const char* OutputFile , const char* Book , const char* DatasetID ) { // example: ListOfFiles = "/cdf/opr2/cdfopr/val/results/aa*" // note: this routine deletes concatenated files // do not concatenate if total size of the files pointed to // is less than 10 GB TStnAna* x; TChain* chain; char cmd[200], fn[200]; int size; int kMinTotSize = 10.; //----------------------------------------------------------------------------- // loop over the files in results and move them to ./buffer in chunks of 1 GB //----------------------------------------------------------------------------- int sum_size = 0; int max_size = 1024000000; float tot_size; sprintf(cmd,"ls -l %s | awk '{n =n+$5} END{print n/1e9}'",ListOfFiles); FILE* f = gSystem->OpenPipe(cmd,"r"); fscanf(f,"%f",&tot_size); printf("tot_size = %f\n",tot_size); if (tot_size < kMinTotSize) return -1; // finish sprintf(cmd,"ls -l %s | awk '{print $5" "$9}'",ListOfFiles); FILE* f = gSystem->OpenPipe(cmd,"r"); TObjArray* list_of_files = new TObjArray(100); while (fscanf(f,"%i %s",&size,fn) > 0) { if ((size < 1000000000) && (sum_size+size < 1500000000)) { sum_size += size; list_of_files->Add(new TObjString(fn)); if (sum_size > max_size) break; } else { // large file, deal with it separately if (sum_size < 400000000) { list_of_files->Add(new TObjString(fn)); } break; } } fclose(f); int nfiles = list_of_files->GetEntriesFast(); if (nfiles > 1) { //----------------------------------------------------------------------------- // concatenate chain = new TChain("STNTUPLE"); for (int i=0; iAt(i); chain->AddFile(s->String().Data(),TChain::kBigNumber); } x = new TStnAna(chain); x->GetInputModule()->SetPrintLevel(1); om = new TStnOutputModule(OutputFile); om->SetMaxFileSize(1700); x->SetOutputModule(om); // om->DropDataBlock("L3SummaryBlock"); x->Run(); delete x; // chain->Delete(); // delete chain; } else { //----------------------------------------------------------------------------- // 1 file do not need to concatenate, just move //----------------------------------------------------------------------------- TObjString* s = (TObjString*) list_of_files->At(0); sprintf(cmd,"mv %s %s",s->String().Data(),OutputFile); gSystem->Exec(cmd); } //----------------------------------------------------------------------------- // supposedly done with the concatenation // finally catalog the file and move it to sewk00 //----------------------------------------------------------------------------- int rc = dfc_ana(OutputFile, 111, "",DatasetID,Book); printf(" --- dfc_ana: return code RC=%i\n",rc); if (rc == 0) { //----------------------------------------------------------------------------- // last step: if everything is OK, delete the input files //----------------------------------------------------------------------------- for (int i=0; iAt(i); sprintf(cmd,"rm %s",s->String().Data()); printf("executing: %s\n",cmd); gSystem->Exec(cmd); } } return 0; } //_____________________________________________________________________________ int concatenate() { char output_file[200], pid[100]; const char* data_server = gSystem->Getenv("DATA_SERVER" ); const char* list_of_files = gSystem->Getenv("LIST_OF_FILES"); const char* book = gSystem->Getenv("BOOK" ); const char* dataset_id = gSystem->Getenv("DATASET_ID" ); const char* output_dir = gSystem->Getenv("OUTPUT_DIR" ); const char* pwd = gSystem->Getenv("PWD" ); sprintf(output_file,"%s/%s/output.%i",pwd,output_dir,gSystem->GetPid()); printf("list_of_files = %s\n",list_of_files); printf("output_file = %s\n",output_file ); printf("output_dir = %s\n",output_dir ); printf("book = %s\n",book ); printf("dataset_id = %s\n",dataset_id ); // return 0; TChain* chain = new TChain("STNTUPLE"); TObjArray* list = new TObjArray(100); char *cp, *file; char delimitors[] = " \n"; int n=strlen(list_of_files); cp = new char[n+1]; strcpy(cp,list_of_files); /* Make writable copy. */ file = strtok (cp, delimitors); /* token => "words" */ int nfiles=0; while (file) { printf(" new file : %s\n",file); if (data_server) { // modify file name to read data remotely chain->AddFile(Form("root://%s/%s",data_server,file), TChain::kBigNumber); } else { chain->AddFile(file,TChain::kBigNumber); } nfiles++; file = strtok (NULL,delimitors); } TStnAna* x = new TStnAna(chain); x->GetInputModule()->SetPrintLevel(1); TStnOutputModule* om = new TStnOutputModule(output_file); om->SetMaxFileSize(1700); x->SetOutputModule(om); x->Run(); delete x; //----------------------------------------------------------------------------- // done with the concatenation, catalog the file and move it to sewk00 //----------------------------------------------------------------------------- int rc = dfc_ana(output_file, 111,output_dir,dataset_id,book); printf(" --- dfc_ana: return code RC=%i\n",rc); return 0; } //_____________________________________________________________________________ int concatenate(const char* RequestFile) { // request file contains all the information in it TString cmdd; char output_file[200], pid[100]; FILE* pipe; const char data_server[200], book[100], dataset_id[100], output_dir[200]; cmdd = Form("cat %s | awk '{ if ($2 == \"DATA_SERVER\") print $3}'",RequestFile); pipe = gSystem->OpenPipe(cmdd.Data(),"r"); fscanf(pipe,"%s",data_server); cmdd = Form("cat %s | awk '{ if ($2 == \"BOOK\") print $3}'",RequestFile); pipe = gSystem->OpenPipe(cmdd.Data(),"r"); fscanf(pipe,"%s",book); cmdd = Form("cat %s | awk '{ if ($2 == \"DATASET_ID\") print $3}'",RequestFile); pipe = gSystem->OpenPipe(cmdd.Data(),"r"); fscanf(pipe,"%s",dataset_id); cmdd = Form("cat %s | awk '{ if ($2 == \"OUTPUT_DIR\") print $3}'",RequestFile); pipe = gSystem->OpenPipe(cmdd.Data(),"r"); fscanf(pipe,"%s",output_dir); sprintf(output_file,"%s/output.%i",output_dir,gSystem->GetPid()); printf("output_file = %s\n",output_file ); printf("output_dir = %s\n",output_dir ); printf("book = %s\n",book ); printf("dataset_id = %s\n",dataset_id ); // return 0; TChain* chain = new TChain("STNTUPLE"); TObjArray* list = new TObjArray(100); char c[1000], file[1000], fn[1000]; int done = 0; FILE* f = fopen(RequestFile,"r"); if (! f) { return -1; } while ( ((c[0]=getc(f)) != EOF) && !done) { // check if it is a comment line if (c[0] != '#') { ungetc(c[0],f); // read next filename fscanf(f,"%s",file); if (data_server) { // modify file name to read data remotely sprintf(fn,"%s/%s",data_server,file); // printf(" add %s\n",fn); chain->AddFile(fn,TChain::kBigNumber); } else { chain->AddFile(file,TChain::kBigNumber); } } else { fgets(c,1000,f); } } chain->GetListOfFiles()->Print(); // return 0; TStnAna* x = new TStnAna(chain); x->GetInputModule()->SetPrintLevel(1); TStnOutputModule* om = new TStnOutputModule(output_file); om->SetMaxFileSize(1700); x->SetOutputModule(om); x->Run(); delete x; //----------------------------------------------------------------------------- // done with the concatenation, catalog the file and move it to sewk00 //----------------------------------------------------------------------------- int rc = dfc_ana(output_file, 111,output_dir,dataset_id,book); printf(" --- dfc_ana: return code RC=%i\n",rc); return 0; } //_____________________________________________________________________________ int concatenate_dataset(const char* Dataset) { TString ds = Dataset; if ((ds == "sewk00") || (ds == "express")) { while (concatenate("/cdf/opr2/cdfopr/val/results/aa*", "/cdf/opr2/cdfopr/val/sewk00/aaaaaa.output", "cdfpewk", "sewk00") == 0) { } } else if (ds == "jewk00") { while (concatenate("/cdf/opr2/cdfopr/jpmm08/JPsi_*", "/cdf/opr2/cdfopr/jewk00/aaaaaa.output", "cdfpewk", "jewk00") == 0) {}; } }