mirror of
				https://github.com/datahub-project/datahub.git
				synced 2025-11-03 20:27:50 +00:00 
			
		
		
		
	
		
			
	
	
		
			56 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			56 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| 
								 | 
							
								from pyspark.sql import SparkSession
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								TEST_CASE_NAME = "PythonHiveInHiveOut"
							 | 
						||
| 
								 | 
							
								DATA_DIR = "../resources/data"
							 | 
						||
| 
								 | 
							
								TEST_DB = TEST_CASE_NAME
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								spark = SparkSession \
							 | 
						||
| 
								 | 
							
								    .builder \
							 | 
						||
| 
								 | 
							
								    .appName(TEST_CASE_NAME) \
							 | 
						||
| 
								 | 
							
								    .enableHiveSupport() \
							 | 
						||
| 
								 | 
							
								    .getOrCreate()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								def tbl(tbl_name):
							 | 
						||
| 
								 | 
							
								  return TEST_DB + "." + tbl_name
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								spark.sql("DROP DATABASE IF EXISTS " + TEST_DB + " CASCADE")  
							 | 
						||
| 
								 | 
							
								spark.sql("CREATE DATABASE IF NOT EXISTS " + TEST_DB)
							 | 
						||
| 
								 | 
							
								spark.sql("DROP TABLE IF EXISTS " +tbl("hivetab"))
							 | 
						||
| 
								 | 
							
								spark.sql("DROP TABLE IF EXISTS " +tbl("foo5"))
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								df1 = spark.read \
							 | 
						||
| 
								 | 
							
								        .option("header", "true") \
							 | 
						||
| 
								 | 
							
								        .csv(DATA_DIR + "/in1.csv") \
							 | 
						||
| 
								 | 
							
								        .withColumnRenamed("c1", "a") \
							 | 
						||
| 
								 | 
							
								        .withColumnRenamed("c2", "b")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								df2 = spark.read \
							 | 
						||
| 
								 | 
							
								        .option("header", "true") \
							 | 
						||
| 
								 | 
							
								        .csv(DATA_DIR + "/in2.csv") \
							 | 
						||
| 
								 | 
							
								        .withColumnRenamed("c1", "c") \
							 | 
						||
| 
								 | 
							
								        .withColumnRenamed("c2", "d")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								df1.createOrReplaceTempView("v1")
							 | 
						||
| 
								 | 
							
								df2.createOrReplaceTempView("v2")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# CreateHiveTableAsSelectCommand
							 | 
						||
| 
								 | 
							
								spark.sql(
							 | 
						||
| 
								 | 
							
								        "create table " + tbl("foo5") + " as " + "(select v1.a, v1.b, v2.c, v2.d from v1 join v2 on v1.id = v2.id)")
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# CreateHiveTableAsSelectCommand
							 | 
						||
| 
								 | 
							
								spark.sql("create table " + tbl("hivetab") + " as " + "(select * from " + tbl("foo5") + ")");
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# InsertIntoHiveTable
							 | 
						||
| 
								 | 
							
								spark.sql("insert into " + tbl("hivetab") + " (select * from " + tbl("foo5") + ")");
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								df = spark.sql("select * from " + tbl("foo5"));
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								# InsertIntoHiveTable
							 | 
						||
| 
								 | 
							
								df.write.insertInto(tbl("hivetab"));
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								spark.stop()
							 | 
						||
| 
								 | 
							
								
							 | 
						||
| 
								 | 
							
								
							 |