Latitude/Longitude to Euclidean System

# inverse conversion codes
from pyproj import Proj, transform


def inverse_conv(x, y, target_proj="epsg:26985"):
    """

    :param x: Longitude 
    :param y: Latitude 
    :return: X and Y coordinates for the given projection system
    """
    inProj = Proj(init='epsg:4326')

    outProj = Proj(init=target_proj)

    x1, y1 = x, y
    x2, y2 = transform(inProj, outProj, x1, y1)

    return x2, y2

Please note that the transform function is currently outdated. Please refer to this link for future usage: Gotchas/FAQ — pyproj 3.3.0 documentation.

Here are some operations for .las file point extraction:

import numpy as np
import random
import laspy
import pandas as pd
from sklearn.cluster import KMeans, DBSCAN
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from StringIO import StringIO
import subprocess

import shapely.geometry as geometry
from descartes import PolygonPatch
import pylab as pl


# Convert las into 256 colored dataframe
def dataframe_entrance(laslocation):
    """

    :type laslocation: object
    """
    original_color16 = load_las2df(laslocation)
    added256 = las_dataframe_color_conversion(original_color16)
    return las_dataframe_add_hexcolor(added256.drop(added256.columns[[3, 4, 5]], axis=1))


# Input las file location
def load_las2df(inputpath):
    """

    :param inputpath:
    :return:
    """
    inFile = laspy.file.File(inputpath, mode="r")

    pdser_x = pd.Series(scaled_dimension(inFile, 0), name='X')
    pdser_y = pd.Series(scaled_dimension(inFile, 1), name='Y')
    pdser_z = pd.Series(scaled_dimension(inFile, 2), name='Z')
    pdser_r16 = pd.Series(inFile.red, name='R16')
    pdser_g16 = pd.Series(inFile.green, name='G16')
    pdser_b16 = pd.Series(inFile.blue, name='B16')

    return pd.DataFrame([pdser_x, pdser_y, pdser_z, pdser_r16, pdser_g16, pdser_b16]).transpose()


# This function returns actual projection values of the las file
def scaled_dimension(las_file, axis):
    """

    :param las_file:
    :param axis:
    :return:
    """
    if axis == 0:
        dimension = las_file.X
    elif axis == 1:
        dimension = las_file.Y
    elif axis == 2:
        dimension = las_file.Z
    else:
        print 'axis input error!'
        return None

    scale = las_file.header.scale[axis]
    offset = las_file.header.offset[axis]
    return (dimension * scale) + offset


# Input: pandas dataframe with ['X', 'Y', 'Z', 'R_256', 'G_256', 'B_256'] as columns
def las_dataframe_add_hexcolor(las_df):
    """

    :param las_df:
    :return:
    """
    one_seri = las_df.R_256
    r16 = lambda one_seri: '#' + "{:02x}".format(one_seri)
    rslt_R = one_seri.apply(r16)

    one_seri = las_df.G_256
    g16 = lambda one_seri: "{:02x}".format(one_seri)
    rslt_G = one_seri.apply(g16)

    one_seri = las_df.B_256
    b16 = lambda one_seri: "{:02x}".format(one_seri)
    rslt_B = one_seri.apply(b16)

    las_df['color'] = rslt_R + rslt_G + rslt_B

    return las_df


# Input: pandas dataframe with ['X', 'Y', 'Z', 'R16', 'G16', 'B16'] as columns
def las_dataframe_color_conversion(las_df):
    """

    :param las_df:
    :return:
    """
    las_df['R_256'] = (((las_df.R16 + 1) / 256) - 1).astype(int)
    las_df['G_256'] = (((las_df.G16 + 1) / 256) - 1).astype(int)
    las_df['B_256'] = (((las_df.B16 + 1) / 256) - 1).astype(int)

    return las_df


# Sample Dataframes
def dataframe_sampling(las_df, percentage=0.3):
    """

    :param las_df:
    :param percentage:
    :return:
    """
    sample_counts = int(percentage * len(las_df)) - 1
    rows = random.sample(las_df.index, sample_counts)

    return las_df.ix[rows]


# Side function, for generating color cluster mean and std
def color_kmeans_cluster_gen(input_df, num_of_clusters=2, visualize=False, colorscheme='real'):
    """

    :param input_df:
    :param num_of_clusters:
    :param visualize:
    :param colorscheme:
    :return:
    """
    new_df = input_df[['R_256', 'G_256', 'B_256']]
    X = new_df.as_matrix()
    estimator = KMeans(n_clusters=num_of_clusters)
    estimator.fit(X)

    labels = estimator.labels_

    means = []
    stds = []
    output_stat = []
    for label in range(num_of_clusters):
        tmpcolorDict = {}

        means.append(["{0:.0f}".format(X[labels == label, 0].mean()), "{0:.0f}".format(X[labels == label, 1].mean()),
                      "{0:.0f}".format(X[labels == label, 2].mean())])
        stds.append(["{0:.4f}".format(X[labels == label, 0].std()), "{0:.4f}".format(X[labels == label, 1].std()),
                     "{0:.4f}".format(X[labels == label, 2].std())])

        tmpcolorDict['color_mean'] = ["{0:.0f}".format(X[labels == label, 0].mean()),
                                      "{0:.0f}".format(X[labels == label, 1].mean()),
                                      "{0:.0f}".format(X[labels == label, 2].mean())]
        tmpcolorDict['color_std'] = ["{0:.4f}".format(X[labels == label, 0].std()),
                                     "{0:.4f}".format(X[labels == label, 1].std()),
                                     "{0:.4f}".format(X[labels == label, 2].std())]
        output_stat.append(tmpcolorDict)

    if visualize:
        fig = plt.figure(1, figsize=(10, 6))
        # plt.clf()
        ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)

        # plt.cla()
        if visualize:
            fig = plt.figure(1, figsize=(10, 6))
            ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)

            if colorscheme == 'real':
                ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=input_df.color.tolist())
            elif colorscheme == 'category':
                ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=labels.astype(np.float))
            else:
                raise Exception("color scheme not supported!")

        for label in range(num_of_clusters):
            ax.text3D(X[labels == label, 0].mean(),
                      X[labels == label, 1].mean(),
                      X[labels == label, 2].mean(),
                      ', '.join([str(c) for c in means[label]]),
                      horizontalalignment='center',
                      bbox=dict(alpha=.5, edgecolor='w', facecolor='w'))

        ax.set_xlabel('Red Channel')
        ax.set_ylabel('Green Channel')
        ax.set_zlabel('Blue Channel')
        plt.show()

    return output_stat


# Input las file dataframe, color scheme dictionary and times to the standard deviation
def filter_color_base(inputdf, RGB_colors, std_times=4, visualize=False):
    """

    :param inputdf:
    :param RGB_colors:
    :param std_times:
    :param visualize:
    :return:
    """
    df_list = []
    for one_color in RGB_colors:
        df_list.append(inputdf[(inputdf.R_256 < int(one_color['color_mean'][0]) + (
            std_times * float(one_color['color_std'][0]))) & (int(one_color['color_mean'][0]) - (
            std_times * float(one_color['color_std'][0])) < inputdf.R_256) & (
                                   inputdf.G_256 < int(one_color['color_mean'][1]) + (
                                       std_times * float(one_color['color_std'][1]))) & (
                               int(one_color['color_mean'][1]) - (
                                   std_times * float(one_color['color_std'][1])) < inputdf.G_256) & (
                                   inputdf.B_256 < int(one_color['color_mean'][2]) + (
                                       std_times * float(one_color['color_std'][2]))) & (
                               int(one_color['color_mean'][2]) - (
                                   std_times * float(one_color['color_std'][2])) < inputdf.B_256)])

    light_colors = pd.concat(df_list)

    if visualize:
        fig = plt.figure()
        ax = fig.add_subplot(111, projection='3d')

        x = light_colors.X.tolist()
        y = light_colors.Y.tolist()
        z = light_colors.Z.tolist()
        color = light_colors.color.tolist()

        ax.scatter(x, y, z, c=color, marker='o')

        ax.set_xlabel('X Direction')
        ax.set_ylabel('Y Direction')
        ax.set_zlabel('Altitude')

        plt.show()

    return light_colors


# perform spatial clustering on the dataframe points
def spatial_clustering_dbscan(inputdf, epsilon=0.3, visualize=False, colorscheme='real'):
    """

    :param inputdf:
    :param epsilon:
    :param visualize:
    :param colorscheme:
    :return:
    """
    positions = inputdf[['X', 'Y', 'Z']]
    X = positions.as_matrix()
    estimator = DBSCAN(eps=epsilon)

    estimator.fit(X)
    labels = estimator.labels_

    # for label in set(labels):
    #     print [X[labels == label, 0].mean(), X[labels == label, 1].mean(), X[labels == label, 2].mean()]

    if visualize:
        fig = plt.figure(1, figsize=(10, 6))
        ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)

        if colorscheme == 'real':
            ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=inputdf.color.tolist())
        elif colorscheme == 'category':
            ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=labels.astype(np.float))
        else:
            raise Exception("color scheme not supported!")

        for label in set(labels):
            ax.text3D(X[labels == label, 0].mean(),
                      X[labels == label, 1].mean(),
                      X[labels == label, 2].mean(),
                      label,
                      horizontalalignment='center',
                      bbox=dict(alpha=.5, edgecolor='w', facecolor='w'))

        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        plt.show()

    return X, labels, inputdf.color.tolist()


# perform spatial clustering on the dataframe points
def spatial_clustering_dbscan_means(inputdf, epsilon=0.3, visualize=False, colorscheme='real'):
    """

    :param inputdf:
    :param epsilon:
    :param visualize:
    :param colorscheme:
    :return:
    """
    positions = inputdf[['X', 'Y', 'Z']]
    X = positions.as_matrix()
    estimator = DBSCAN(eps=epsilon)

    estimator.fit(X)
    labels = estimator.labels_

    dfstring = [','.join([
        str(label),
        str(len(X[labels == label]) / (X[labels == label, 0].max() - X[labels == label, 0].min()) * (
        X[labels == label, 1].max() - X[labels == label, 1].min()) * (
            X[labels == label, 2].max() - X[labels == label, 2].min())),
        str(X[labels == label, 0].min()),
        str(X[labels == label, 0].max()),
        str(X[labels == label, 1].min()),
        str(X[labels == label, 1].max()),
        str(X[labels == label, 2].min()),
        str(X[labels == label, 2].max())
    ]) for label in set(labels) if label != -1]
    TESTDATA = StringIO('\n'.join(['cluster,density,xmin,xmax,ymin,ymax,zmin,zmax'] + dfstring))
    df = pd.read_csv(TESTDATA, sep=",")
    returndf = df[(df.density >= 100)]

    if visualize:
        fig = plt.figure(1, figsize=(10, 6))
        ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)

        if colorscheme == 'real':
            ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=inputdf.color.tolist())
        elif colorscheme == 'category':
            ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=labels.astype(np.float))
        else:
            raise Exception("color scheme not supported!")

        for label in set(labels):
            ax.text3D(X[labels == label, 0].mean(),
                      X[labels == label, 1].mean(),
                      X[labels == label, 2].mean(),
                      label,
                      horizontalalignment='center',
                      bbox=dict(alpha=.5, edgecolor='w', facecolor='w'))

        ax.set_xlabel('X')
        ax.set_ylabel('Y')
        ax.set_zlabel('Z')
        plt.show()

    return X, labels, inputdf.color.tolist(), returndf


# This function outputs a image for the clustering results from the top view, the clusters are bounded with rectangles.
def plot_polygons(polygons, X_coor_all, Y_coor_all, colorscheme):
    """

    :param polygons:
    :param X_coor_all:
    :param Y_coor_all:
    :param colorscheme:
    :return:
    """
    fig = pl.figure(figsize=(15, 15))
    ax = fig.add_subplot(111)

    for poly in polygons:
        x_min, y_min, x_max, y_max = poly.bounds
        patch = PolygonPatch(poly.buffer(.1), fc='#999999', ec='#000000', fill=True, zorder=-1)
        ax.add_patch(patch)

    # print len(X_coor_all), len(Y_coor_all), len(colorscheme)

    pl.plot(X_coor_all, Y_coor_all, 'o', color=colorscheme)
    pl.savefig('foo.png')


# This function generates the clusters in 2-dimension, the output of this function is the a dictionary of clusters which represented by 2-dimensional diagonal points (minX, minY, maxX, maxY).
def cluster_bboxes(X, labels, visualize=False, colorscheme='#f16824', img_gen=False):
    """

    :param X:
    :param labels:
    :param visualize:
    :param colorscheme:
    :param img_gen:
    :return:
    """
    selected_clusters = [clust for clust in set(labels) if clust != -1 and len(X[labels == clust]) > 20]
    XnY_List = [X[labels == cluster][:, [0, 1, 2]] for cluster in selected_clusters]

    cluster_geo = [geometry.MultiPoint(list([geometry.shape_revised(point) for point in XnY])) for XnY in XnY_List]

    if visualize:
        plot_polygons([clust.envelope for clust in cluster_geo], X[:, 0], X[:, 1], colorscheme)

    returnDict = {}

    bboxes = [clust.envelope.bounds for clust in cluster_geo]
    for oneclust_index in range(len(selected_clusters)):
        returnDict[selected_clusters[oneclust_index]] = bboxes[oneclust_index]

    return returnDict


# This function generates the clusters in 3-dimension, the output of this function is the a dictionary of clusters which represented by 3-dimensional diagonal points (minX, minY, minZ, maxX, maxY, maxZ).
def cluster_bboxes_3d(X, labels, visualize=False, colorscheme='#f16824', img_gen=False):
    """

    :param X:
    :param labels:
    :param visualize:
    :param colorscheme:
    :param img_gen:
    :return:
    """
    selected_clusters = [clust for clust in set(labels) if clust != -1 and len(X[labels == clust]) > 20]
    XnY_List = [[X[labels == cluster][:, [0, 1]], X[labels == cluster][:, [2]]] for cluster in selected_clusters]

    cluster_geo = [[geometry.MultiPoint(list([geometry.shape_revised(point) for point in XnY[0]])), XnY[1]] for XnY in
                   XnY_List]

    if visualize:
        plot_polygons([clust[0].envelope for clust in cluster_geo], X[:, 0], X[:, 1], colorscheme)

    returnDict = {}

    bboxes = [list(clust[0].envelope.bounds) + [min(clust[1])[0], max(clust[1])[0]] for clust in cluster_geo]
    for oneclust_index in range(len(selected_clusters)):
        returnDict[selected_clusters[oneclust_index]] = bboxes[oneclust_index]

    return returnDict


def b16_b8(targetPath):
    """

    :param targetPath:
    :return:
    """
    tmpdf = dataframe_entrance(targetPath)
    tmpdf = tmpdf.drop('color', axis=1)

    tmptxtpath = '/'.join(targetPath.split('/')[:-1]) + '/' + 'tmp_' + targetPath.split('/')[-1].split('.')[0] + '.txt'
    lasoutput = '/'.join(targetPath.split('/')[:-1]) + '/' + targetPath.split('/')[-1].split('.')[0] + '_8bit' + '.las'

    tmpdf.to_csv(tmptxtpath, header=False, index=False)
    shell_command_list = [
        'txt2las',
        '-i',
        tmptxtpath,
        '-o',
        lasoutput,
        '--parse',
        'xyzRGB'
    ]
    subprocess.call(shell_command_list)
    shell_command_list = [
        'rm',
        tmptxtpath
    ]
    subprocess.call(shell_command_list)

1 Like