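I have two tables with the same column names, the same data, and the same number of rows, but the rows may be in a different order. I select column A from table_1 and column A from table_2 and want to compare the values. How can I do this with PySpark SQL? Can I compute a sha2/md5 checksum and compare?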
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext
from pyspark.sql.types import *
from pyspark.sql import Row
import pyspark.sql.functions as f
app_name="test"
table1="DB1.department"
table2="DB2.department"
conf = SparkConf().setAppName(app_name)
sc = SparkContext(conf=conf)
sqlContext = HiveContext(sc)
query1="select * from %s" %(table1)
df1 = sqlContext.sql(query1)
query2="select * from %s" %(table2)
df2 = sqlContext.sql(query2)
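# Rows whose departmentid appears in only one of the two tables
df3 = sqlContext.sql("""
    SELECT a.departmentid AS id_in_table1, b.departmentid AS id_in_table2
    FROM DB1.department a FULL JOIN DB2.department b
    ON a.departmentid = b.departmentid
    WHERE a.departmentid IS NULL OR b.departmentid IS NULL""")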
df5=sqlContext.sql("select md5(departmentid) from department1")
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/sql/context.py", line 580, in sql
return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
File "/usr/lib/spark/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line
813, in __call__
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 51, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot resolve 'md5(departmentid)'
due to data type mismatch: argument 1 requires binary type, however,
'departmentid' is of bigint type.; line 1 pos 11"
When I try the md5 checksum, it complains that it expects a binary type, but departmentid is a bigint.
Table 1:
departmentid departmentname departmentaddress
1 A Newyork
2 B Newjersey
3 C SanJose
4 D WashingtonDC
5 E Mexico
6 F Delhi
7 G Pune
8 H chennai
Table 2:
departmentid departmentname departmentaddress
7 G Pune
8 H chennai
1 A Newyork
2 B Newjersey
3 C SanJose
4 D WashingtonDC
5 E Mexico
6 F Delhi
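Only the order of the rows differs between the two tables; the data is the same, so technically the two tables are identical. Unless a new row is added or a value is modified, they stay identical. (The tables are only for illustration and explanation; in reality we are dealing with big data.)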
1 Answer
The simplest solution is:
Sample data:
Check:
Using an outer join, you get the same result:
md5 will not work for you here, because it is not an aggregate function: it computes the checksum of one specific value.
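To make that distinction concrete, here is a plain-Python sketch (standard `hashlib` only, no Spark) of what a table-level checksum would additionally need: one digest per row, followed by a step that combines the digests without depending on row order. The helper names `row_md5` and `table_fingerprint` are made up, and the rows are copied from the question.

```python
import hashlib


def row_md5(row):
    """Digest of a single row: this is all that md5 by itself provides."""
    # A separator keeps ("ab", "c") and ("a", "bc") from hashing identically
    joined = "\x1f".join("" if v is None else str(v) for v in row)
    return hashlib.md5(joined.encode("utf-8")).hexdigest()


def table_fingerprint(rows):
    """Combine per-row digests so that the row order does not matter."""
    digests = sorted(row_md5(r) for r in rows)  # sorting removes the order
    return hashlib.md5("".join(digests).encode("utf-8")).hexdigest()


table_1 = [(1, "A", "Newyork"), (2, "B", "Newjersey"), (3, "C", "SanJose"),
           (4, "D", "WashingtonDC"), (5, "E", "Mexico"), (6, "F", "Delhi"),
           (7, "G", "Pune"), (8, "H", "chennai")]
# Same rows in the order shown for Table 2
table_2 = table_1[6:] + table_1[:6]

same = table_fingerprint(table_1) == table_fingerprint(table_2)
print(same)
```

Sorting every digest in one place is fine for an illustration but is not a strategy for big data, and equal fingerprints are strong evidence rather than proof, since hash collisions are possible.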