PySpark를 사용하여 CSV 파일 로드
스파크가 처음이고 스파크가 있는 파일에서 CSV 데이터를 읽으려고 합니다.제가 하고 있는 일은 다음과 같습니다.
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
이 전화를 통해 파일의 첫 번째 두 열 목록이 표시될 것으로 예상되지만 다음 오류가 발생합니다.
파일", 행 1, IndexError: list index out of range
CSV 파일이 두 개 이상의 열로 표시됩니다.
스파크 2.0.0+
내장된 CSV 데이터 소스를 직접 사용할 수 있습니다.
spark.read.csv(
"some_input_file.csv",
header=True,
mode="DROPMALFORMED",
schema=schema
)
또는
(
spark.read
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.csv("some_input_file.csv")
)
외부 의존성을 포함하지 않고.
스파크 < 2.0.0:
일반적인 경우에는 사소한 것과는 거리가 먼 수동 구문 분석 대신 다음을 권장합니다.
스파크 CSV가 경로에 포함되어 있는지 확인합니다(--packages
,--jars
,--driver-class-path
)
다음과 같이 데이터를 로드합니다.
df = (
sqlContext
.read.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferschema", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv")
)
로드, 스키마 추론, 잘못된 형식의 행 삭제를 처리할 수 있으며 Python에서 JVM으로 데이터를 전달할 필요가 없습니다.
참고:
스키마를 알고 있다면 스키마 추론을 피하고 전달하는 것이 좋습니다.DataFrameReader
세 개의 열(정수, 이중 및 문자열)이 있다고 가정합니다.
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([
StructField("A", IntegerType()),
StructField("B", DoubleType()),
StructField("C", StringType())
])
(
sqlContext
.read
.format("com.databricks.spark.csv")
.schema(schema)
.option("header", "true")
.option("mode", "DROPMALFORMED")
.load("some_input_file.csv")
)
모든 라인에 두 개 이상의 열이 있어야 합니까?확인을 위해 다음과 같은 것을 시도해 볼 수 있습니까?:
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)>1) \
.map(lambda line: (line[0],line[1])) \
.collect()
또는 범인을 인쇄할 수 있습니다(있는 경우).
sc.textFile("file.csv") \
.map(lambda line: line.split(",")) \
.filter(lambda line: len(line)<=1) \
.collect()
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")
print(df.collect())
또한 Pandas를 사용하여 CSV 파일을 읽은 다음 Pandas DataFrame을 Spark로 가져오는 옵션도 있습니다.
예:
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
pandas_df = pd.read_csv('file.csv') # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
쉼표로 나누기만 하면 필드 내에 있는 쉼표도 분할됩니다(예:a,b,"1,2,3",c
), 따라서 권장되지 않습니다. DataFrames API를 사용하려면 0323의 답변이 좋지만, 기본 Spark를 고수하려면 csv 모듈을 사용하여 기본 Python의 csvs를 구문 분석할 수 있습니다.
# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
편집: @muon이 코멘트에서 언급했듯이, 이것은 헤더를 다른 행처럼 처리하므로 수동으로 추출해야 합니다.예를들면,header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(수정하지 마십시오.header
필터가 평가되기 전에).그러나 이 시점에서는 기본 제공 CSV 파서를 사용하는 것이 더 나을 수 있습니다.
여기는 PYSPARK입니다.
path="Your file path with file name"
df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)
그러면 확인할 수 있습니다.
df.show(5)
df.count()
csv를 데이터 프레임으로 로드하려는 경우 다음을 수행할 수 있습니다.
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format('com.databricks.spark.csv') \
.options(header='true', inferschema='true') \
.load('sampleFile.csv') # this is your csv file
저한테는 잘 됐어요.
이것은 JP Mercier가 Panda를 사용하는 것에 대해 처음에 제안한 것과 일치하지만, 주요 수정 사항은 다음과 같습니다.만약 여러분이 판다에게 데이터를 덩어리로 읽어준다면, 그것은 더 유연해질 것입니다.즉, 판다가 실제로 처리할 수 있는 것보다 훨씬 큰 파일을 구문 분석하여 더 작은 크기로 스파크에 전달할 수 있습니다. (이것은 또한 사람들이 모든 것을 판다에 로드할 수 있다면 스파크를 사용하고 싶어하는 이유에 대한 의견에 대한 답변이기도 합니다.)
from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd
sc = SparkContext('local','example') # if using locally
sql_sc = SQLContext(sc)
Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)
for chunky in chunk_100k:
Spark_Full += sc.parallelize(chunky.values.tolist())
YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
일반 csv 파일에는 다음과 같은 https://github.com/seahboonsiew/pyspark-csv 옵션도 있습니다.
다음과 같은 맥락이 있다고 가정합니다.
sc = SparkContext
sqlCtx = SQLContext or HiveContext
먼저 SparkContext를 사용하여 실행자에게 pyspark-csv.py 를 배포합니다.
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
SparkContext를 통해 CSV 데이터 읽기 및 DataFrame으로 변환
plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
줄 이 없을 CSV " " " " " " " " " " " " " " 를 사용하여 할 수 .textFile()
그리고 그것을 구문 분석합니다.
import csv
import StringIO
def loadRecord(line):
input = StringIO.StringIO(line)
reader = csv.DictReader(input, fieldnames=["name1", "name2"])
return reader.next()
input = sc.textFile(inputFile).map(loadRecord)
데이터 집합에 열 수가 2개 미만인 행이 하나 이상 있는 경우 이 오류가 발생할 수 있습니다.
저도 Pyspark가 처음이라 CSV 파일을 읽으려고 합니다.다음 코드가 작동했습니다.
내가 사용하는 이 코드에서 kaggle의 데이터 세트 링크는 https://www.kaggle.com/carrie1/ecommerce-data 입니다.
스키마를 언급하지 않고 다음을 수행합니다.
from pyspark.sql import SparkSession
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()
이제 열: sdfData.column을 확인합니다.
출력은 다음과 같습니다.
['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']
각 열의 데이터 유형을 확인합니다.
sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))
그러면 데이터 유형이 StringType인 모든 열이 포함된 데이터 프레임이 제공됩니다.
스키마 포함:스키마를 알고 있거나 위 표에 있는 열의 데이터 유형을 변경하려는 경우 이를 사용합니다(예: 다음 열이 있고 각 열에 대해 특정 데이터 유형으로 사용하기를 원한다고 가정합니다.
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
schema = StructType([\
StructField("InvoiceNo", IntegerType()),\
StructField("StockCode", StringType()), \
StructField("Description", StringType()),\
StructField("Quantity", IntegerType()),\
StructField("InvoiceDate", StringType()),\
StructField("CustomerID", DoubleType()),\
StructField("Country", StringType())\
])
scSpark = SparkSession \
.builder \
.appName("Python Spark SQL example: Reading CSV file with schema") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()
sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)
이제 스키마에서 각 열의 데이터 유형을 확인합니다.
sdfData.schema
StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))
편집됨: 스키마를 명시적으로 언급하지 않고도 다음 줄의 코드를 사용할 수 있습니다.
sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema
출력은 다음과 같습니다.
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))
출력은 다음과 같습니다.
sdfData.show()
+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
| 536365| 85123A|WHITE HANGING HEA...| 6|12/1/2010 8:26| 2.55| 17850|
| 536365| 71053| WHITE METAL LANTERN| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84406B|CREAM CUPID HEART...| 8|12/1/2010 8:26| 2.75| 17850|
| 536365| 84029G|KNITTED UNION FLA...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|12/1/2010 8:26| 3.39| 17850|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|12/1/2010 8:26| 7.65| 17850|
| 536365| 21730|GLASS STAR FROSTE...| 6|12/1/2010 8:26| 4.25| 17850|
| 536366| 22633|HAND WARMER UNION...| 6|12/1/2010 8:28| 1.85| 17850|
| 536366| 22632|HAND WARMER RED P...| 6|12/1/2010 8:28| 1.85| 17850|
| 536367| 84879|ASSORTED COLOUR B...| 32|12/1/2010 8:34| 1.69| 13047|
| 536367| 22745|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22748|POPPY'S PLAYHOUSE...| 6|12/1/2010 8:34| 2.1| 13047|
| 536367| 22749|FELTCRAFT PRINCES...| 8|12/1/2010 8:34| 3.75| 13047|
| 536367| 22310|IVORY KNITTED MUG...| 6|12/1/2010 8:34| 1.65| 13047|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|12/1/2010 8:34| 4.25| 13047|
| 536367| 22623|BOX OF VINTAGE JI...| 3|12/1/2010 8:34| 4.95| 13047|
| 536367| 22622|BOX OF VINTAGE AL...| 2|12/1/2010 8:34| 9.95| 13047|
| 536367| 21754|HOME BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21755|LOVE BUILDING BLO...| 3|12/1/2010 8:34| 5.95| 13047|
| 536367| 21777|RECIPE BOX WITH M...| 4|12/1/2010 8:34| 7.95| 13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows
을 할 때spark.read.csv
는 옵션을 는 나 옵 하 것 이 는escape='"'
그리고.multiLine=True
CSV 표준에 대한 가장 일관된 솔루션을 제공하며, Google Sheets에서 내보낸 CSV 파일에서 가장 잘 작동합니다.
그것은,
#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
inferSchema=False, header=True)
다음과 같은 방법으로 CSV 파일을 읽습니다.
df= spark.read.format("csv").option("multiline", True).option("quote", "\"").option("escape", "\"").option("header",True).load(df_path)
스파크 버전은 3.0.1입니다.
언급URL : https://stackoverflow.com/questions/28782940/load-csv-file-with-pyspark
'sourcecode' 카테고리의 다른 글
호출-정지 방법 - 자체 서명된 인증서 무시 (0) | 2023.08.16 |
---|---|
복사 구현 시 모범 사례구역 포함: (0) | 2023.08.11 |
ASP를 수정하는 방법.NET 오류 "'nnn.aspx' 파일이 사전 컴파일되지 않았으므로 요청할 수 없습니다."? (0) | 2023.08.11 |
봄 "URL이 정상화되지 않아 요청이 거부되었습니다."어떤 URL이 사용되었는지 어떻게 알 수 있습니까? (0) | 2023.08.11 |
php에서 데이터베이스 데이터를 비교하는 방법 (0) | 2023.08.11 |