# inverse conversion codes
from pyproj import Proj, transform
def inverse_conv(x, y, target_proj="epsg:26985"):
"""
:param x: Longitude
:param y: Latitude
:return: X and Y coordinates for the given projection system
"""
inProj = Proj(init='epsg:4326')
outProj = Proj(init=target_proj)
x1, y1 = x, y
x2, y2 = transform(inProj, outProj, x1, y1)
return x2, y2
Please note that the transform
function is currently outdated. Please refer to this link for future usage: Gotchas/FAQ — pyproj 3.3.0 documentation.
Here are some operations for .las file point extraction:
import numpy as np
import random
import laspy
import pandas as pd
from sklearn.cluster import KMeans, DBSCAN
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D
from StringIO import StringIO
import subprocess
import shapely.geometry as geometry
from descartes import PolygonPatch
import pylab as pl
# Convert las into 256 colored dataframe
def dataframe_entrance(laslocation):
"""
:type laslocation: object
"""
original_color16 = load_las2df(laslocation)
added256 = las_dataframe_color_conversion(original_color16)
return las_dataframe_add_hexcolor(added256.drop(added256.columns[[3, 4, 5]], axis=1))
# Input las file location
def load_las2df(inputpath):
"""
:param inputpath:
:return:
"""
inFile = laspy.file.File(inputpath, mode="r")
pdser_x = pd.Series(scaled_dimension(inFile, 0), name='X')
pdser_y = pd.Series(scaled_dimension(inFile, 1), name='Y')
pdser_z = pd.Series(scaled_dimension(inFile, 2), name='Z')
pdser_r16 = pd.Series(inFile.red, name='R16')
pdser_g16 = pd.Series(inFile.green, name='G16')
pdser_b16 = pd.Series(inFile.blue, name='B16')
return pd.DataFrame([pdser_x, pdser_y, pdser_z, pdser_r16, pdser_g16, pdser_b16]).transpose()
# This function returns actual projection values of the las file
def scaled_dimension(las_file, axis):
"""
:param las_file:
:param axis:
:return:
"""
if axis == 0:
dimension = las_file.X
elif axis == 1:
dimension = las_file.Y
elif axis == 2:
dimension = las_file.Z
else:
print 'axis input error!'
return None
scale = las_file.header.scale[axis]
offset = las_file.header.offset[axis]
return (dimension * scale) + offset
# Input: pandas dataframe with ['X', 'Y', 'Z', 'R_256', 'G_256', 'B_256'] as columns
def las_dataframe_add_hexcolor(las_df):
"""
:param las_df:
:return:
"""
one_seri = las_df.R_256
r16 = lambda one_seri: '#' + "{:02x}".format(one_seri)
rslt_R = one_seri.apply(r16)
one_seri = las_df.G_256
g16 = lambda one_seri: "{:02x}".format(one_seri)
rslt_G = one_seri.apply(g16)
one_seri = las_df.B_256
b16 = lambda one_seri: "{:02x}".format(one_seri)
rslt_B = one_seri.apply(b16)
las_df['color'] = rslt_R + rslt_G + rslt_B
return las_df
# Input: pandas dataframe with ['X', 'Y', 'Z', 'R16', 'G16', 'B16'] as columns
def las_dataframe_color_conversion(las_df):
"""
:param las_df:
:return:
"""
las_df['R_256'] = (((las_df.R16 + 1) / 256) - 1).astype(int)
las_df['G_256'] = (((las_df.G16 + 1) / 256) - 1).astype(int)
las_df['B_256'] = (((las_df.B16 + 1) / 256) - 1).astype(int)
return las_df
# Sample Dataframes
def dataframe_sampling(las_df, percentage=0.3):
"""
:param las_df:
:param percentage:
:return:
"""
sample_counts = int(percentage * len(las_df)) - 1
rows = random.sample(las_df.index, sample_counts)
return las_df.ix[rows]
# Side function, for generating color cluster mean and std
def color_kmeans_cluster_gen(input_df, num_of_clusters=2, visualize=False, colorscheme='real'):
"""
:param input_df:
:param num_of_clusters:
:param visualize:
:param colorscheme:
:return:
"""
new_df = input_df[['R_256', 'G_256', 'B_256']]
X = new_df.as_matrix()
estimator = KMeans(n_clusters=num_of_clusters)
estimator.fit(X)
labels = estimator.labels_
means = []
stds = []
output_stat = []
for label in range(num_of_clusters):
tmpcolorDict = {}
means.append(["{0:.0f}".format(X[labels == label, 0].mean()), "{0:.0f}".format(X[labels == label, 1].mean()),
"{0:.0f}".format(X[labels == label, 2].mean())])
stds.append(["{0:.4f}".format(X[labels == label, 0].std()), "{0:.4f}".format(X[labels == label, 1].std()),
"{0:.4f}".format(X[labels == label, 2].std())])
tmpcolorDict['color_mean'] = ["{0:.0f}".format(X[labels == label, 0].mean()),
"{0:.0f}".format(X[labels == label, 1].mean()),
"{0:.0f}".format(X[labels == label, 2].mean())]
tmpcolorDict['color_std'] = ["{0:.4f}".format(X[labels == label, 0].std()),
"{0:.4f}".format(X[labels == label, 1].std()),
"{0:.4f}".format(X[labels == label, 2].std())]
output_stat.append(tmpcolorDict)
if visualize:
fig = plt.figure(1, figsize=(10, 6))
# plt.clf()
ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)
# plt.cla()
if visualize:
fig = plt.figure(1, figsize=(10, 6))
ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)
if colorscheme == 'real':
ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=input_df.color.tolist())
elif colorscheme == 'category':
ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=labels.astype(np.float))
else:
raise Exception("color scheme not supported!")
for label in range(num_of_clusters):
ax.text3D(X[labels == label, 0].mean(),
X[labels == label, 1].mean(),
X[labels == label, 2].mean(),
', '.join([str(c) for c in means[label]]),
horizontalalignment='center',
bbox=dict(alpha=.5, edgecolor='w', facecolor='w'))
ax.set_xlabel('Red Channel')
ax.set_ylabel('Green Channel')
ax.set_zlabel('Blue Channel')
plt.show()
return output_stat
# Input las file dataframe, color scheme dictionary and times to the standard deviation
def filter_color_base(inputdf, RGB_colors, std_times=4, visualize=False):
"""
:param inputdf:
:param RGB_colors:
:param std_times:
:param visualize:
:return:
"""
df_list = []
for one_color in RGB_colors:
df_list.append(inputdf[(inputdf.R_256 < int(one_color['color_mean'][0]) + (
std_times * float(one_color['color_std'][0]))) & (int(one_color['color_mean'][0]) - (
std_times * float(one_color['color_std'][0])) < inputdf.R_256) & (
inputdf.G_256 < int(one_color['color_mean'][1]) + (
std_times * float(one_color['color_std'][1]))) & (
int(one_color['color_mean'][1]) - (
std_times * float(one_color['color_std'][1])) < inputdf.G_256) & (
inputdf.B_256 < int(one_color['color_mean'][2]) + (
std_times * float(one_color['color_std'][2]))) & (
int(one_color['color_mean'][2]) - (
std_times * float(one_color['color_std'][2])) < inputdf.B_256)])
light_colors = pd.concat(df_list)
if visualize:
fig = plt.figure()
ax = fig.add_subplot(111, projection='3d')
x = light_colors.X.tolist()
y = light_colors.Y.tolist()
z = light_colors.Z.tolist()
color = light_colors.color.tolist()
ax.scatter(x, y, z, c=color, marker='o')
ax.set_xlabel('X Direction')
ax.set_ylabel('Y Direction')
ax.set_zlabel('Altitude')
plt.show()
return light_colors
# perform spatial clustering on the dataframe points
def spatial_clustering_dbscan(inputdf, epsilon=0.3, visualize=False, colorscheme='real'):
"""
:param inputdf:
:param epsilon:
:param visualize:
:param colorscheme:
:return:
"""
positions = inputdf[['X', 'Y', 'Z']]
X = positions.as_matrix()
estimator = DBSCAN(eps=epsilon)
estimator.fit(X)
labels = estimator.labels_
# for label in set(labels):
# print [X[labels == label, 0].mean(), X[labels == label, 1].mean(), X[labels == label, 2].mean()]
if visualize:
fig = plt.figure(1, figsize=(10, 6))
ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)
if colorscheme == 'real':
ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=inputdf.color.tolist())
elif colorscheme == 'category':
ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=labels.astype(np.float))
else:
raise Exception("color scheme not supported!")
for label in set(labels):
ax.text3D(X[labels == label, 0].mean(),
X[labels == label, 1].mean(),
X[labels == label, 2].mean(),
label,
horizontalalignment='center',
bbox=dict(alpha=.5, edgecolor='w', facecolor='w'))
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.show()
return X, labels, inputdf.color.tolist()
# perform spatial clustering on the dataframe points
def spatial_clustering_dbscan_means(inputdf, epsilon=0.3, visualize=False, colorscheme='real'):
"""
:param inputdf:
:param epsilon:
:param visualize:
:param colorscheme:
:return:
"""
positions = inputdf[['X', 'Y', 'Z']]
X = positions.as_matrix()
estimator = DBSCAN(eps=epsilon)
estimator.fit(X)
labels = estimator.labels_
dfstring = [','.join([
str(label),
str(len(X[labels == label]) / (X[labels == label, 0].max() - X[labels == label, 0].min()) * (
X[labels == label, 1].max() - X[labels == label, 1].min()) * (
X[labels == label, 2].max() - X[labels == label, 2].min())),
str(X[labels == label, 0].min()),
str(X[labels == label, 0].max()),
str(X[labels == label, 1].min()),
str(X[labels == label, 1].max()),
str(X[labels == label, 2].min()),
str(X[labels == label, 2].max())
]) for label in set(labels) if label != -1]
TESTDATA = StringIO('\n'.join(['cluster,density,xmin,xmax,ymin,ymax,zmin,zmax'] + dfstring))
df = pd.read_csv(TESTDATA, sep=",")
returndf = df[(df.density >= 100)]
if visualize:
fig = plt.figure(1, figsize=(10, 6))
ax = Axes3D(fig, rect=[0, 0, .95, 1], elev=48, azim=134)
if colorscheme == 'real':
ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=inputdf.color.tolist())
elif colorscheme == 'category':
ax.scatter(X[:, 0], X[:, 1], X[:, 2], c=labels.astype(np.float))
else:
raise Exception("color scheme not supported!")
for label in set(labels):
ax.text3D(X[labels == label, 0].mean(),
X[labels == label, 1].mean(),
X[labels == label, 2].mean(),
label,
horizontalalignment='center',
bbox=dict(alpha=.5, edgecolor='w', facecolor='w'))
ax.set_xlabel('X')
ax.set_ylabel('Y')
ax.set_zlabel('Z')
plt.show()
return X, labels, inputdf.color.tolist(), returndf
# This function outputs a image for the clustering results from the top view, the clusters are bounded with rectangles.
def plot_polygons(polygons, X_coor_all, Y_coor_all, colorscheme):
"""
:param polygons:
:param X_coor_all:
:param Y_coor_all:
:param colorscheme:
:return:
"""
fig = pl.figure(figsize=(15, 15))
ax = fig.add_subplot(111)
for poly in polygons:
x_min, y_min, x_max, y_max = poly.bounds
patch = PolygonPatch(poly.buffer(.1), fc='#999999', ec='#000000', fill=True, zorder=-1)
ax.add_patch(patch)
# print len(X_coor_all), len(Y_coor_all), len(colorscheme)
pl.plot(X_coor_all, Y_coor_all, 'o', color=colorscheme)
pl.savefig('foo.png')
# This function generates the clusters in 2-dimension, the output of this function is the a dictionary of clusters which represented by 2-dimensional diagonal points (minX, minY, maxX, maxY).
def cluster_bboxes(X, labels, visualize=False, colorscheme='#f16824', img_gen=False):
"""
:param X:
:param labels:
:param visualize:
:param colorscheme:
:param img_gen:
:return:
"""
selected_clusters = [clust for clust in set(labels) if clust != -1 and len(X[labels == clust]) > 20]
XnY_List = [X[labels == cluster][:, [0, 1, 2]] for cluster in selected_clusters]
cluster_geo = [geometry.MultiPoint(list([geometry.shape_revised(point) for point in XnY])) for XnY in XnY_List]
if visualize:
plot_polygons([clust.envelope for clust in cluster_geo], X[:, 0], X[:, 1], colorscheme)
returnDict = {}
bboxes = [clust.envelope.bounds for clust in cluster_geo]
for oneclust_index in range(len(selected_clusters)):
returnDict[selected_clusters[oneclust_index]] = bboxes[oneclust_index]
return returnDict
# This function generates the clusters in 3-dimension, the output of this function is the a dictionary of clusters which represented by 3-dimensional diagonal points (minX, minY, minZ, maxX, maxY, maxZ).
def cluster_bboxes_3d(X, labels, visualize=False, colorscheme='#f16824', img_gen=False):
"""
:param X:
:param labels:
:param visualize:
:param colorscheme:
:param img_gen:
:return:
"""
selected_clusters = [clust for clust in set(labels) if clust != -1 and len(X[labels == clust]) > 20]
XnY_List = [[X[labels == cluster][:, [0, 1]], X[labels == cluster][:, [2]]] for cluster in selected_clusters]
cluster_geo = [[geometry.MultiPoint(list([geometry.shape_revised(point) for point in XnY[0]])), XnY[1]] for XnY in
XnY_List]
if visualize:
plot_polygons([clust[0].envelope for clust in cluster_geo], X[:, 0], X[:, 1], colorscheme)
returnDict = {}
bboxes = [list(clust[0].envelope.bounds) + [min(clust[1])[0], max(clust[1])[0]] for clust in cluster_geo]
for oneclust_index in range(len(selected_clusters)):
returnDict[selected_clusters[oneclust_index]] = bboxes[oneclust_index]
return returnDict
def b16_b8(targetPath):
"""
:param targetPath:
:return:
"""
tmpdf = dataframe_entrance(targetPath)
tmpdf = tmpdf.drop('color', axis=1)
tmptxtpath = '/'.join(targetPath.split('/')[:-1]) + '/' + 'tmp_' + targetPath.split('/')[-1].split('.')[0] + '.txt'
lasoutput = '/'.join(targetPath.split('/')[:-1]) + '/' + targetPath.split('/')[-1].split('.')[0] + '_8bit' + '.las'
tmpdf.to_csv(tmptxtpath, header=False, index=False)
shell_command_list = [
'txt2las',
'-i',
tmptxtpath,
'-o',
lasoutput,
'--parse',
'xyzRGB'
]
subprocess.call(shell_command_list)
shell_command_list = [
'rm',
tmptxtpath
]
subprocess.call(shell_command_list)