Flagging poor-quality scientific imagery in distributed memory

This is a final project for CS 599: High-Performance Computing at Northern Arizona University. The final script is shown below. The linked pdf contains the final project write-up.

//mpic++ -I /scratch/csb67/HPC/Final/eigen -O3 CS599_project_csb67.cpp -lm -o CS599_project_csb67

//srun -n<# of Ranks> <Program> <IMAGE DIRECTORY> <# of Dimensions for clustering> <# of Clusters>
//srun -n 10 CS599_project_csb67 ./Images 2 10

#include <stdio.h>
#include <unistd.h>
#include <mpi.h>
#include <stdlib.h>
#include <math.h>
#include <string.h>
#include <dirent.h>
#include <iostream>

// Define / Includes for Image IO
#define STB_IMAGE_IMPLEMENTATION
#include "stb_image.h"

// Include for PCA with Eigen
#include <Eigen/Dense>
#include <Eigen/Eigenvalues>

using namespace Eigen;
using namespace std;

//function prototypes
double calcDist(int iA, double** datasetA, int iB, double ** datasetB, int DIM);


#define CHANNEL_NUM 1 // Number of image bands
#define KMEANSITERS 10 // Total kmeans iterations

int main(int argc, char **argv){

    int my_rank, nprocs;

    MPI_Init(&argc,&argv);
    MPI_Comm_rank(MPI_COMM_WORLD,&my_rank);
    MPI_Comm_size(MPI_COMM_WORLD,&nprocs);

    //Process command-line arguments
    char inD[500];
    int DIM;
    int KMEANS;

    strcpy(inD, argv[1]);
    sscanf(argv[2], "%d", &DIM);
    sscanf(argv[3], "%d", &KMEANS);

    // Variables
    int fileCnt=0, i=0;
    DIR *d;
    struct dirent *dir;

    int * buckets=(int*)malloc(sizeof(int)*(nprocs*2));
    int * myRange = (int*)malloc(sizeof(int)*(2)); //local lower/uper bucket range
    int myCount = 0; //Local image file count

    int * imgCluster; // store image cluster assignment
    int * globalClusterCnt = (int*)malloc(sizeof(int)*KMEANS); // store global count of points per cluster

    // Allocate array to store mean centers
    double ** meanCenters = (double**)malloc(sizeof(double*)*KMEANS);
    for (int i=0; i<KMEANS; i++){
        meanCenters[i]=(double*)malloc(sizeof(double)*DIM);
    }
    double * initCenters=(double*)malloc(sizeof(double)*(KMEANS*DIM)); // Store initial mean centers

    // For timers
    double start, end, totalTime, totalTimeMax;
    double allocS, allocE, allocTime, allocTimeMax;
    double pcaS, pcaE, pcaTime, pcaTimeMax;
    double kmeanS, kmeanE, kmeanTime, kmeanTimeMax;


    //Begin Timer
    start=MPI_Wtime();

    ////////////////////////////////////////////////////////////////////////
    // All ranks store filenames
    d = opendir(inD);

    while((dir=readdir(d)) != NULL){
        if(dir->d_type == DT_REG){
            fileCnt++;
        }
    }
    rewinddir(d);

    char * fileList[fileCnt];

    while((dir = readdir(d)) != NULL){
        if(dir->d_type == DT_REG){
            fileList[i] = (char*)malloc(strlen(dir->d_name)+1);
            strcpy (fileList[i], dir->d_name);
            i++;
        }
    }

    //Change directory to image directory
    chdir(inD);

    // Begin workload distribution timer
    allocS=MPI_Wtime();

    ////////////////////////////////////////////////////////////////////////////
    // Root determines total workload and assigns ~even work to each rank
    if (my_rank==0){
        printf("\nFile Count: %d\n", fileCnt);

        ////////////////////////////////////////////////////////////////////////
        // Determine File Size
        unsigned long totalSize=0;
        long * fileSize = (long*)malloc(sizeof(long)*fileCnt);
        for (int i=0; i<fileCnt; i++){

            FILE* fp = fopen(fileList[i], "rb");
            fseek(fp, 0L, SEEK_END);
            long int res = ftell(fp);
            totalSize += res;
            fileSize[i] = res;

            fclose(fp);
        }
        printf("Total Bytes: %d\n", totalSize);
        printf("Bytes per Processor: ~%d\n\n", totalSize/nprocs);

        ////////////////////////////////////////////////////////////////////////
        // Allocate the work ~evenly

        int start = 0, end = 0, rank = 0;
        unsigned long rankSize = 0; //Store current rank's workload in bytes

        for (int i = 0; i<fileCnt; i++){
            if ( (rankSize + fileSize[i]) < (totalSize/nprocs) ){
                rankSize += fileSize[i];
            }else{
                buckets[rank*2]=start;
                buckets[rank*2+1]=i;
                start=i+1;
                rank++;
                rankSize=0;
            }
        }

        // Assign leftovers to last rank
        buckets[(nprocs*2)-2] = (buckets[(nprocs*2)-3])+1;
        buckets[(nprocs*2)-1] = fileCnt-1;
    }

    ////////////////////////////////////////////////////////////////////////////
    // Broadcast bucket workload to all ranks
    MPI_Bcast(buckets, nprocs*2, MPI_INT, 0, MPI_COMM_WORLD);
    myRange[0] = buckets[my_rank*2]; // Store my lower bucket val
    myRange[1] = buckets[my_rank*2+1]; // Store my upper bucket val
    myCount = myRange[1] - myRange[0] + 1;

    // End workload distribution timer
    allocE=MPI_Wtime();

    //Begin PCA Timer
    pcaS=MPI_Wtime();

    ////////////////////////////////////////////////////////////////////////////
    // Iterate each image and calculate PC's

    // Store reduced values from data reduction with PC's
    double ** dataReduct = (double**)malloc(sizeof(double*)*myCount);
    for (int i=0; i<myCount; i++){
        dataReduct[i]=(double*)malloc(sizeof(double*)*DIM);
    }

    // Iterate each image
    for (int i=0; i<myCount; i++){
        int im_row, im_col, bpp;
        // Open the image
        uint8_t * image = stbi_load(fileList[i+myRange[0]], &im_col, &im_row, &bpp, CHANNEL_NUM);

        // Allocate matrix to store pixel values
        MatrixXd pixM(im_row, im_col);

        // Store pixel values
        int p = 0;
        for (int r=0; r<im_row; r++){
            for (int c=0; c<im_col; c++){
                pixM(r, c)=image[p];
                p++;
            }
        }

        // Normalize pixel values
        pixM.normalize();

        // Close image
        stbi_image_free(image);

        // Find the mean
        double M = pixM.mean();

        // Center values on the mean
        for (int r=0; r<im_row; r++){
            for (int c=0; c<im_col; c++){
                pixM(r,c)=pixM(r,c)-M;
            }
        }

        // Calculate variance-covariance matrix
        MatrixXd varCov = MatrixXd::Zero(im_row, im_row);
        for (int colA=0; colA<im_row; colA++){
            for (int colB=0; colB<im_col; colB++){
                varCov(colA, colB) = ( pixM.col(colA).cwiseProduct(pixM.col(colB)) ).sum() / (pixM.rows()-1);
            }
        }

        // Calculate eigenvectors
        SelfAdjointEigenSolver<MatrixXd> es(varCov);
        // EigenSolver<MatrixXd> es(pixM);

        // Store eigenvectors
        MatrixXd eig_v(im_row, DIM);
        for (int k=0; k<im_row; k++){
            for (int p=0; p<DIM; p++){
                eig_v(k,p)=es.eigenvectors().col(im_row-1-p)[k];
            }
        }

        // Data reduction with PC's
        VectorXd pixR = (pixM*eig_v).colwise().sum();

        // Store reduced dimensions
        for (int p=0; p<DIM; p++){
            dataReduct[i][p]=pixR[p];
        }


    }

    //End PCA Timer
    pcaE=MPI_Wtime();

    //Begin KMEAN Timer
    kmeanS=MPI_Wtime();

    ////////////////////////////////////////////////////////////////////////////
    // Prep for Kmeans clustering

    // Allocate memory for image cluster assignment
    imgCluster=(int*)malloc(sizeof(int*)*myCount);

    // Rank 0 initializes mean centers
    if (my_rank==0){
        // initCenters=(double*)malloc(sizeof(double)*(KMEANS*DIM));

        for (int k=0; k<KMEANS; k++){
            for (int d=0; d<DIM; d++){
                initCenters[(k*DIM)+d]=dataReduct[k][d];
            }
        }
    }

    // Broadcast initial mean centers
    MPI_Bcast(initCenters, (KMEANS*DIM), MPI_DOUBLE, 0, MPI_COMM_WORLD);

    // Store new mean centers
    for (int k = 0; k<KMEANS; k++){
        for (int d = 0; d<DIM; d++){
            double val = initCenters[(k*DIM)+d];
            meanCenters[k][d] = val;
        }
    }
    if (my_rank==0){
        printf("\n\n*******\nInitial\n*******\n");
        for (int k = 0; k<KMEANS; k++){
            printf("Cluster: %d | Centroid: (%f, %f)\n", k, meanCenters[k][0], meanCenters[k][1]);
        }
        printf("\n\n");
    }

    ////////////////////////////////////////////////////////////////////////////
    // Do K-Means clustering
    for (int kIters=0; kIters<KMEANSITERS; kIters++){ // Total Iterations
        for (int p=0; p<myCount; p++){ // Iterate each image
            double shortDist=-1;
            for (int k=0; k<KMEANS; k++){ // Iterate each kmean centroid

                // Calculate distance between point and cluster centroid
                double imgToClust = calcDist(p, dataReduct, k, meanCenters, DIM);

                // Store distance if it is less then the previous distance and update cluster assignment
                if ( (imgToClust < shortDist) || (shortDist == -1)){
                    shortDist = imgToClust; // Store distance
                    imgCluster[p] = k; // Store cluster assignment
                }
            }
        }
        if (kIters<KMEANSITERS-1){
            ////////////////////////////////////////////////////////////////////
            // Determine local number of points per cluster
            int * localClustCnt = (int*)calloc(KMEANS,sizeof(int)); // store local count of points per cluster

            for (int i = 0; i<myCount; i++){
                localClustCnt[imgCluster[i]]++;
            }

            ////////////////////////////////////////////////////////////////////////
            // Determine global number of points per cluster
            MPI_Reduce(localClustCnt, globalClusterCnt, KMEANS, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);

            // Free local cluster count
            free(localClustCnt);

            // Update local points per cluster with global points per cluster
            MPI_Bcast(globalClusterCnt, KMEANS, MPI_INT, 0, MPI_COMM_WORLD);

            ////////////////////////////////////////////////////////////////////////
            // Calculate local weighted mean
            double * localCenters = (double*)malloc(sizeof(double)*KMEANS*DIM);
            double * newCenters = (double*)malloc(sizeof(double)*KMEANS*DIM);

            for (int k=0; k<KMEANS; k++){ // Iterate each cluster
                for (int d=0; d<DIM; d++){ // Iterate each dimension
                    double sum = 0; // Store sum of points in cluster
                    for (int i = 0; i<myCount; i++){ // Iterate each point
                        if (imgCluster[i] == k){ // If current point belongs to cluster k
                            sum += dataReduct[i][d]; // Add point value to sum
                        }
                    }
                    // To avoid divide by 0
                    if (globalClusterCnt[k] > 0){
                        localCenters[(k*DIM)+d] = sum/globalClusterCnt[k];
                    }else{
                        localCenters[(k*DIM)+d] = 0;
                    }
                }
            }

            // Sum each ranks weighted mean to get global mean centers
            MPI_Reduce(localCenters, newCenters, (KMEANS*DIM), MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);

            // Broadcast new mean centers
            MPI_Bcast(newCenters, (KMEANS*DIM), MPI_DOUBLE, 0, MPI_COMM_WORLD);

            // Store new mean centers
            for (int k = 0; k<KMEANS; k++){
                for (int d = 0; d<DIM; d++){
                    double val = newCenters[(k*DIM)+d];
                    meanCenters[k][d] = val;
                }
            }


            free(localCenters);
            free(newCenters);
        }
    }
    if (my_rank==0){
        printf("\n\n*****\nFINAL\n*****\n");
        for (int k = 0; k<KMEANS; k++){
            printf("Cluster: %d | Centroid: (%f, %f) | Count: %d\n", k, meanCenters[k][0], meanCenters[k][1], globalClusterCnt[k]);
        }
        printf("\n\n");
    }

    //End KMEAN Timer
    kmeanE=MPI_Wtime();

    ////////////////////////////////////////////////////////////////////////////
    // Timers
    // Workload distribution time
    allocTime= allocE-allocS;
    MPI_Reduce(&allocTime, &allocTimeMax, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);

    // PCA Time
    pcaTime = pcaE-pcaS;
    MPI_Reduce(&pcaTime, &pcaTimeMax, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);

    // Kmean Time
    kmeanTime = kmeanE-kmeanS;
    MPI_Reduce(&kmeanTime, &kmeanTimeMax, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);

    // Total Response Time
    end=MPI_Wtime();
    totalTime = end-start;
    MPI_Reduce(&totalTime, &totalTimeMax, 1, MPI_DOUBLE, MPI_MAX, 0, MPI_COMM_WORLD);

    if (my_rank==0){
        printf("\n\n******\nTIMERS\n******\n");
        printf("Total Response Time (s): %f\n", totalTimeMax);
        printf("Total Distribution Time (s): %f\n", allocTimeMax);
        printf("Total PCA Time (s): %f\n", pcaTimeMax);
        printf("Total KMEAN Time (s): %f\n", kmeanTimeMax);
        printf("\n\n");
    }

    ////////////////////////////////////////////////////////////////////////////
    // Free memory
    free(globalClusterCnt);
    for (int i =0; i<KMEANS; i++){
        free(meanCenters[i]);
    }
    free(meanCenters);
    free(myRange);
    free(buckets);
    free(imgCluster);

    MPI_Finalize();
    return 0;
}

// Function to calculate distance between two points
double calcDist(int iA, double** datasetA, int iB, double ** datasetB, int DIM){
    // iA & iB : row index of point in dataset
    double * pA = datasetA[iA];
    double * pB = datasetB[iB];

    double sqSum = 0;

    for (int i = 0; i<DIM; i++){
        sqSum += pow( (pA[i] - pB[i]), 2);
    }
    return sqrt(sqSum);
}