[SCALA] dataframe으로 csv 파일을 읽는 동안 스키마를 제공합니다
SCALAdataframe으로 csv 파일을 읽는 동안 스키마를 제공합니다
나는 dataframe에 csv 파일을 읽으려고하고있다. 나는 내 CSV 파일을 알고 있기 때문에 내 dataframe의 스키마가되어야 하는지를 알고있다. 또한 나는 파일을 읽을 스파크 CSV 패키지를 사용하고 있습니다. 나는 다음과 같은 스키마를 지정하려고합니다.
val pagecount = sqlContext.read.format("csv")
.option("delimiter"," ").option("quote","")
.option("schema","project: string ,article: string ,requests: integer ,bytes_served: long")
.load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
내가 만든 데이터 프레임의 스키마를 검사 할 때, 자신의 스키마를 촬영 한 것으로 보인다. 나는 아무 잘못을하고 있습니까? 어떻게 언급 스키마를 데리러 불꽃을 만드는 방법?
> pagecount.printSchema
root
|-- _c0: string (nullable = true)
|-- _c1: string (nullable = true)
|-- _c2: string (nullable = true)
|-- _c3: string (nullable = true)
해결법
-
==============================
1.스키마를 지정하지 않아도, 아래의 코드를 사용해보십시오. 당신이 참으로 inferSchema을 줄 때 그것은 당신의 CSV 파일에서 소요됩니다.
스키마를 지정하지 않아도, 아래의 코드를 사용해보십시오. 당신이 참으로 inferSchema을 줄 때 그것은 당신의 CSV 파일에서 소요됩니다.
val pagecount = sqlContext.read.format("csv") .option("delimiter"," ").option("quote","") .option("header", "true") .option("inferSchema", "true") .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
수동으로 스키마를 지정하려면 다음과 같이 그것을 할 수 있습니다 :
import org.apache.spark.sql.types._ val customSchema = StructType(Array( StructField("project", StringType, true), StructField("article", StringType, true), StructField("requests", IntegerType, true), StructField("bytes_served", DoubleType, true)) ) val pagecount = sqlContext.read.format("csv") .option("delimiter"," ").option("quote","") .option("header", "true") .schema(customSchema) .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
-
==============================
2.@Nulu로 대답 덕분에, 그것은 최소한의 조정과 pyspark 작동
@Nulu로 대답 덕분에, 그것은 최소한의 조정과 pyspark 작동
from pyspark.sql.types import LongType, StringType, StructField, StructType, BooleanType, ArrayType, IntegerType customSchema = StructType(Array( StructField("project", StringType, true), StructField("article", StringType, true), StructField("requests", IntegerType, true), StructField("bytes_served", DoubleType, true))) pagecount = sc.read.format("com.databricks.spark.csv") .option("delimiter"," ") .option("quote","") .option("header", "false") .schema(customSchema) .load("dbfs:/databricks-datasets/wikipedia-datasets/data-001/pagecounts/sample/pagecounts-20151124-170000")
-
==============================
3.내 분석에 Arunakiran Nulu가 제공하는 솔루션을 (코드 참조)를 사용하고 있습니다. 이 컬럼에 올바른 유형을 할당 할 수에도 불구하고, 반환 된 모든 값은 null입니다. 이전에, 나는 옵션 .option ( "inferSchema", "진정한")로 시도하고 그것은 (다른 유형 있지만)을 dataframe에 올바른 값을 반환합니다.
내 분석에 Arunakiran Nulu가 제공하는 솔루션을 (코드 참조)를 사용하고 있습니다. 이 컬럼에 올바른 유형을 할당 할 수에도 불구하고, 반환 된 모든 값은 null입니다. 이전에, 나는 옵션 .option ( "inferSchema", "진정한")로 시도하고 그것은 (다른 유형 있지만)을 dataframe에 올바른 값을 반환합니다.
val customSchema = StructType(Array( StructField("numicu", StringType, true), StructField("fecha_solicitud", TimestampType, true), StructField("codtecnica", StringType, true), StructField("tecnica", StringType, true), StructField("finexploracion", TimestampType, true), StructField("ultimavalidacioninforme", TimestampType, true), StructField("validador", StringType, true))) val df_explo = spark.read .format("csv") .option("header", "true") .option("delimiter", "\t") .option("timestampFormat", "yyyy/MM/dd HH:mm:ss") .schema(customSchema) .load(filename)
결과
root |-- numicu: string (nullable = true) |-- fecha_solicitud: timestamp (nullable = true) |-- codtecnica: string (nullable = true) |-- tecnica: string (nullable = true) |-- finexploracion: timestamp (nullable = true) |-- ultimavalidacioninforme: timestamp (nullable = true) |-- validador: string (nullable = true)
테이블은 다음과 같습니다
|numicu|fecha_solicitud|codtecnica|tecnica|finexploracion|ultimavalidacioninforme|validador| +------+---------------+----------+-------+--------------+-----------------------+---------+ | null| null| null| null| null| null| null| | null| null| null| null| null| null| null| | null| null| null| null| null| null| null| | null| null| null| null| null| null| null|
-
==============================
4.여기 파이썬에서이 일에 관심이있는 사람들을 위해 작업 버전입니다.
여기 파이썬에서이 일에 관심이있는 사람들을 위해 작업 버전입니다.
customSchema = StructType([ StructField("IDGC", StringType(), True), StructField("SEARCHNAME", StringType(), True), StructField("PRICE", DoubleType(), True) ]) productDF = spark.read.load('/home/ForTesting/testProduct.csv', format="csv", header="true", sep='|', schema=customSchema) testProduct.csv ID|SEARCHNAME|PRICE 6607|EFKTON75LIN|890.88 6612|EFKTON100HEN|55.66
도움이 되었기를 바랍니다.
-
==============================
5.다음은 사용자 정의 스키마, 완전한 데모로 작업하는 방법은 다음과 같습니다
다음은 사용자 정의 스키마, 완전한 데모로 작업하는 방법은 다음과 같습니다
$> 쉘 코드,
echo " Slingo, iOS Slingo, Android " > game.csv
스칼라 코드 :
import org.apache.spark.sql.types._ val customSchema = StructType(Array( StructField("game_id", StringType, true), StructField("os_id", StringType, true) )) val csv_df = spark.read.format("csv").schema(customSchema).load("game.csv") csv_df.show csv_df.orderBy(asc("game_id"), desc("os_id")).show csv_df.createOrReplaceTempView("game_view") val sort_df = sql("select * from game_view order by game_id, os_id desc") sort_df.show
-
==============================
6.이 CSV를로드하는 동안 우리는 dataframe에 열 이름을 전달할 수있는 옵션 중 하나입니다.
이 CSV를로드하는 동안 우리는 dataframe에 열 이름을 전달할 수있는 옵션 중 하나입니다.
import pandas names = ['sepal-length', 'sepal-width', 'petal-length', 'petal-width', 'class'] dataset = pandas.read_csv("C:/Users/NS00606317/Downloads/Iris.csv", names=names, header=0) print(dataset.head(10))
산출
sepal-length sepal-width petal-length petal-width class 1 5.1 3.5 1.4 0.2 Iris-setosa 2 4.9 3.0 1.4 0.2 Iris-setosa 3 4.7 3.2 1.3 0.2 Iris-setosa 4 4.6 3.1 1.5 0.2 Iris-setosa 5 5.0 3.6 1.4 0.2 Iris-setosa 6 5.4 3.9 1.7 0.4 Iris-setosa 7 4.6 3.4 1.4 0.3 Iris-setosa 8 5.0 3.4 1.5 0.2 Iris-setosa 9 4.4 2.9 1.4 0.2 Iris-setosa 10 4.9 3.1 1.5 0.1 Iris-setosa
-
==============================
7.
// import Library import java.io.StringReader ; import au.com.bytecode.opencsv.CSVReader //filename var train_csv = "/Path/train.csv"; //read as text file val train_rdd = sc.textFile(train_csv) //use string reader to convert in proper format var full_train_data = train_rdd.map{line => var csvReader = new CSVReader(new StringReader(line)) ; csvReader.readNext(); } //declares types type s = String // declare case class for schema case class trainSchema (Loan_ID :s ,Gender :s, Married :s, Dependents :s,Education :s,Self_Employed :s,ApplicantIncome :s,CoapplicantIncome :s, LoanAmount :s,Loan_Amount_Term :s, Credit_History :s, Property_Area :s,Loan_Status :s) //create DF RDD with custom schema var full_train_data_with_schema = full_train_data.mapPartitionsWithIndex{(idx,itr)=> if (idx==0) itr.drop(1); itr.toList.map(x=> trainSchema(x(0),x(1),x(2),x(3),x(4),x(5),x(6),x(7),x(8),x(9),x(10),x(11),x(12))).iterator }.toDF
-
==============================
8.이런 경우에 어떤 사람은 날짜와 시간 스탬프 간단한 문자열로 스키마 정의에 관심이있는 경우
이런 경우에 어떤 사람은 날짜와 시간 스탬프 간단한 문자열로 스키마 정의에 관심이있는 경우
echo " 2019-07-02 22:11:11.000999, 01/01/2019, Suresh, abc 2019-01-02 22:11:11.000001, 01/01/2020, Aadi, xyz " > data.csv
user_schema = 'timesta TIMESTAMP,date DATE,first_name STRING , last_name STRING'
df = spark.read.csv(path='data.csv', schema = user_schema, sep=',', dateFormat='MM/dd/yyyy',timestampFormat='yyyy-MM-dd HH:mm:ss.SSSSSS') df.show(10, False) +-----------------------+----------+----------+---------+ |timesta |date |first_name|last_name| +-----------------------+----------+----------+---------+ |2019-07-02 22:11:11.999|2019-01-01| Suresh | abc | |2019-01-02 22:11:11.001|2020-01-01| Aadi | xyz | +-----------------------+----------+----------+---------+
-
==============================
9.이후 pyspark 2.4에서, 당신은 단순히 올바른 헤더를 설정 헤더 매개 변수를 사용할 수 있습니다 :
이후 pyspark 2.4에서, 당신은 단순히 올바른 헤더를 설정 헤더 매개 변수를 사용할 수 있습니다 :
data = spark.read.csv('data.csv', header=True)
스칼라를 사용하는 경우 마찬가지로, 당신은뿐만 아니라 헤더 매개 변수를 사용할 수 있습니다.
-
==============================
10.여기 내 솔루션입니다 :
여기 내 솔루션입니다 :
import org.apache.spark.sql.types._ val spark = org.apache.spark.sql.SparkSession.builder. master("local[*]"). appName("Spark CSV Reader"). getOrCreate() val movie_rating_schema = StructType(Array( StructField("UserID", IntegerType, true), StructField("MovieID", IntegerType, true), StructField("Rating", DoubleType, true), StructField("Timestamp", TimestampType, true))) val df_ratings: DataFrame = spark.read.format("csv"). option("header", "true"). option("mode", "DROPMALFORMED"). option("delimiter", ","). //option("inferSchema", "true"). option("nullValue", "null"). schema(movie_rating_schema). load(args(0)) //"file:///home/hadoop/spark-workspace/data/ml-20m/ratings.csv" val movie_avg_scores = df_ratings.rdd.map(_.toString()). map(line => { // drop "[", "]" and then split the str val fileds = line.substring(1, line.length() - 1).split(",") //extract (movie id, average rating) (fileds(1).toInt, fileds(2).toDouble) }). groupByKey(). map(data => { val avg: Double = data._2.sum / data._2.size (data._1, avg) })
from https://stackoverflow.com/questions/39926411/provide-schema-while-reading-csv-file-as-a-dataframe by cc-by-sa and MIT license
'SCALA' 카테고리의 다른 글
[SCALA] 어떻게 만들고 스칼라에서 다차원 배열을 사용 하는가? (0) | 2019.11.05 |
---|---|
[SCALA] 스파크 : 조건부 dataframe에 열 추가 (0) | 2019.11.05 |
[SCALA] 때 등호 스칼라 방법 선언에 서명을 사용 하는가? (0) | 2019.11.05 |
[SCALA] 하나를 사용하여 스칼라 코드에서 오류를 처리하는 (0) | 2019.11.05 |
[SCALA] 스칼라 함수를 정의하는 두 가지 방법. 차이점은 무엇입니까? (0) | 2019.11.05 |