OpenMP – while loop for text file reading and using a pipeline

c++ hpc openmp parallel-processing

I discovered that OpenMP doesn't support while loops (or at least doesn't like them too much).
It also doesn't like the '!=' operator.

I have this bit of code.

int count = 1;
#pragma omp parallel for
    while ( fgets(buff, BUFF_SIZE, f) != NULL )
    {
        len = strlen(buff);
        int sequence_counter = segment_read(buff,len,count);
        if (sequence_counter == 1)
        {
            count_of_reads++;
            printf("\n Total No. of reads: %d \n",count_of_reads);
        }
        count++;
    }

Any clues as to how to manage this? I read somewhere (including another post on Stack Overflow) that I can use a pipeline. What is that, and how do I implement it?

Best Solution

It's too bad people are so quick to select the best answer. Here is my answer.
First, you should read the whole file into a buffer with something like fread. This is very quick. An example of how to do this can be found at http://www.cplusplus.com/reference/cstdio/fread/

Then you can operate on the buffer in parallel with OpenMP. I have implemented most of this for you. Below is the code. You did not provide the segment_read function so I created a dummy one. I used a few functions from C++ such as std::vector and std::sort but with a little more work you could do this in pure C as well.

Edit: I edited this code and was able to remove the sorting and critical section.

I compiled with g++ foo.cpp -o foo -fopenmp -O3

#include <stdio.h>
#include <stdlib.h>   // malloc, free, exit
#include <omp.h>
#include <vector>

using namespace std;

// Dummy stand-in: the real segment_read was not provided in the question.
int segment_read(char *buff, const int len, const int count) {
  return 1;
}

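void foo(char* buffer, size_t size) {
    int count_of_reads = 0;
    std::vector<size_t> *posa = NULL;  // per-thread line-start offsets
    std::vector<size_t> pos;           // all line-start offsets, in file order

    #pragma omp parallel
    {
        const int nthreads = omp_get_num_threads();
        const int ithread = omp_get_thread_num();
        #pragma omp single
        posa = new std::vector<size_t>[nthreads];

        // find where each line starts (one past every '\n');
        // relies on schedule(static) handing out one contiguous chunk per thread, in thread order
        #pragma omp for schedule(static)
        for(long i=0; i<(long)size; i++) {
            if(buffer[i] == '\n') posa[ithread].push_back(i + 1);
        }

        // concatenate the per-thread lists; chunks are contiguous, so no sort is needed
        #pragma omp single
        {
            pos.push_back(0);
            for(int t=0; t<nthreads; t++)
                pos.insert(pos.end(), posa[t].begin(), posa[t].end());
            if(pos.back() != size) pos.push_back(size); // last line has no trailing '\n'
        }

        // process the lines in parallel; the reduction replaces the shared counter update
        #pragma omp for reduction(+: count_of_reads)
        for(long i=1; i<(long)pos.size(); i++) {
            const int len = (int)(pos[i] - pos[i-1]);   // includes the '\n', like fgets
            char* buff = &buffer[pos[i-1]];
            const int sequence_counter = segment_read(buff,len,(int)i);
            if (sequence_counter == 1) count_of_reads++;
        }
    }
    delete[] posa;
    // print once, after the parallel region, so the total is not read mid-update
    printf("\n Total No. of reads: %d \n",count_of_reads);
}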

int main () {
  FILE * pFile;
  long lSize;
  char * buffer;
  size_t result;

  pFile = fopen ( "myfile.txt" , "rb" );
  if (pFile==NULL) {fputs ("File error",stderr); exit (1);}

  // obtain file size:
  fseek (pFile , 0 , SEEK_END);
  lSize = ftell (pFile);
  rewind (pFile);

  // allocate memory to contain the whole file:
  buffer = (char*) malloc (sizeof(char)*lSize);
  if (buffer == NULL) {fputs ("Memory error",stderr); exit (2);}

  // copy the file into the buffer:
  result = fread (buffer,1,lSize,pFile);
  if (result != lSize) {fputs ("Reading error",stderr); exit (3);}

  /* the whole file is now loaded in the memory buffer. */
  foo(buffer, result);
  // terminate


  fclose (pFile);
  free (buffer);
  return 0;
}
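
On the pipeline part of the question: the code above avoids the while loop by loading the whole file first. If you would rather keep the fgets loop, one common alternative (not what the code above does) is to let a single thread do the reading and hand each line to an OpenMP task, which gives a simple two-stage pipeline: serial read, parallel processing. Below is a minimal sketch of that idea. The function name count_reads_with_tasks, the BUFF_SIZE value, and the dummy segment_read are assumptions made for illustration, since the original segment_read was not shown. Note that lines may be processed out of order.

```cpp
#include <cstdio>
#include <string>

// Hypothetical stand-in for the asker's segment_read(), which was not shown.
// Here it simply accepts every non-empty line.
static int segment_read(const char *buff, int len, int count) {
    (void)buff; (void)count;
    return len > 0 ? 1 : 0;
}

// One thread reads the file line by line and creates a task per line;
// the other threads in the team pick up and run those tasks.
// Returns how many lines segment_read() accepted.
int count_reads_with_tasks(FILE *f) {
    const int BUFF_SIZE = 4096;   // assumed maximum line length
    int count_of_reads = 0;

    #pragma omp parallel
    #pragma omp single
    {
        char buff[BUFF_SIZE];
        int count = 1;
        while (fgets(buff, BUFF_SIZE, f) != NULL) {
            // Copy the line: buff is overwritten by the next fgets call.
            std::string line(buff);
            #pragma omp task firstprivate(line, count) shared(count_of_reads)
            {
                if (segment_read(line.c_str(), (int)line.size(), count) == 1) {
                    #pragma omp atomic
                    count_of_reads++;
                }
            }
            count++;
        }
        #pragma omp taskwait   // wait for every line task before leaving
    }
    return count_of_reads;
}
```

You would call it with an open file, for example count_reads_with_tasks(pFile), and print the returned total once. Whether this beats the read-everything-first approach probably depends on how expensive segment_read is per line, since each task carries some scheduling overhead and a copy of the line.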