sourcecode

PySpark를 사용하여 CSV 파일 로드

codebag 2023. 8. 11. 21:48
반응형

PySpark를 사용하여 CSV 파일 로드

스파크가 처음이고 스파크가 있는 파일에서 CSV 데이터를 읽으려고 합니다.제가 하고 있는 일은 다음과 같습니다.

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

이 전화를 통해 파일의 첫 번째 두 열 목록이 표시될 것으로 예상되지만 다음 오류가 발생합니다.

파일", 행 1, IndexError: list index out of range

CSV 파일이 두 개 이상의 열로 표시됩니다.

스파크 2.0.0+

내장된 CSV 데이터 소스를 직접 사용할 수 있습니다.

spark.read.csv(
    "some_input_file.csv", 
    header=True, 
    mode="DROPMALFORMED", 
    schema=schema
)

또는

(
    spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv")
)

외부 의존성을 포함하지 않고.

스파크 < 2.0.0:

일반적인 경우에는 사소한 것과는 거리가 먼 수동 구문 분석 대신 다음을 권장합니다.

스파크 CSV가 경로에 포함되어 있는지 확인합니다(--packages,--jars,--driver-class-path)

다음과 같이 데이터를 로드합니다.

df = (
    sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

로드, 스키마 추론, 잘못된 형식의 행 삭제를 처리할 수 있으며 Python에서 JVM으로 데이터를 전달할 필요가 없습니다.

참고:

스키마를 알고 있다면 스키마 추론을 피하고 전달하는 것이 좋습니다.DataFrameReader세 개의 열(정수, 이중 및 문자열)이 있다고 가정합니다.

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(
    sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv")
)

모든 라인에 두 개 이상의 열이 있어야 합니까?확인을 위해 다음과 같은 것을 시도해 볼 수 있습니까?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

또는 범인을 인쇄할 수 있습니다(있는 경우).

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|")

print(df.collect())

또한 Pandas를 사용하여 CSV 파일을 읽은 다음 Pandas DataFrame을 Spark로 가져오는 옵션도 있습니다.

예:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)

쉼표로 나누기만 하면 필드 내에 있는 쉼표도 분할됩니다(예:a,b,"1,2,3",c), 따라서 권장되지 않습니다. DataFrames API를 사용하려면 0323의 답변이 좋지만, 기본 Spark를 고수하려면 csv 모듈을 사용하여 기본 Python의 csvs를 구문 분석할 수 있습니다.

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

편집: @muon이 코멘트에서 언급했듯이, 이것은 헤더를 다른 행처럼 처리하므로 수동으로 추출해야 합니다.예를들면,header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(수정하지 마십시오.header필터가 평가되기 전에).그러나 이 시점에서는 기본 제공 CSV 파서를 사용하는 것이 더 나을 수 있습니다.

여기는 PYSPARK입니다.

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

그러면 확인할 수 있습니다.

df.show(5)
df.count()

csv를 데이터 프레임으로 로드하려는 경우 다음을 수행할 수 있습니다.

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

저한테는 잘 됐어요.

이것은 JP Mercier가 Panda를 사용하는 것에 대해 처음에 제안한 과 일치하지만, 주요 수정 사항은 다음과 같습니다.만약 여러분이 판다에게 데이터를 덩어리로 읽어준다면, 그것은 더 유연해질 것입니다.즉, 판다가 실제로 처리할 수 있는 것보다 훨씬 큰 파일을 구문 분석하여 더 작은 크기로 스파크에 전달할 수 있습니다. (이것은 또한 사람들이 모든 것을 판다에 로드할 수 있다면 스파크를 사용하고 싶어하는 이유에 대한 의견에 대한 답변이기도 합니다.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()

일반 csv 파일에는 다음과 같은 https://github.com/seahboonsiew/pyspark-csv 옵션도 있습니다.

다음과 같은 맥락이 있다고 가정합니다.

sc = SparkContext
sqlCtx = SQLContext or HiveContext

먼저 SparkContext를 사용하여 실행자에게 pyspark-csv.py 를 배포합니다.

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

SparkContext를 통해 CSV 데이터 읽기 및 DataFrame으로 변환

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)

줄 이 없을 CSV " " " " " " " " " " " " " " 를 사용하여 할 수 .textFile()그리고 그것을 구문 분석합니다.

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)

데이터 집합에 열 수가 2개 미만인 행이 하나 이상 있는 경우 이 오류가 발생할 수 있습니다.

저도 Pyspark가 처음이라 CSV 파일을 읽으려고 합니다.다음 코드가 작동했습니다.

내가 사용하는 이 코드에서 kaggle의 데이터 세트 링크는 https://www.kaggle.com/carrie1/ecommerce-data 입니다.

스키마를 언급하지 않고 다음을 수행합니다.

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

이제 열: sdfData.column을 확인합니다.

출력은 다음과 같습니다.

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

각 열의 데이터 유형을 확인합니다.

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

그러면 데이터 유형이 StringType인 모든 열이 포함된 데이터 프레임이 제공됩니다.

스키마 포함:스키마를 알고 있거나 위 표에 있는 열의 데이터 유형을 변경하려는 경우 이를 사용합니다(예: 다음 열이 있고 각 열에 대해 특정 데이터 유형으로 사용하기를 원한다고 가정합니다.

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
    schema = StructType([\
        StructField("InvoiceNo", IntegerType()),\
        StructField("StockCode", StringType()), \
        StructField("Description", StringType()),\
        StructField("Quantity", IntegerType()),\
        StructField("InvoiceDate", StringType()),\
        StructField("CustomerID", DoubleType()),\
        StructField("Country", StringType())\
    ])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

이제 스키마에서 각 열의 데이터 유형을 확인합니다.

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

편집됨: 스키마를 명시적으로 언급하지 않고도 다음 줄의 코드를 사용할 수 있습니다.

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

출력은 다음과 같습니다.

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

출력은 다음과 같습니다.

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows

을 할 때spark.read.csv는 옵션을 는 나 옵 하 것 이 는escape='"'그리고.multiLine=TrueCSV 표준에 대한 가장 일관된 솔루션을 제공하며, Google Sheets에서 내보낸 CSV 파일에서 가장 잘 작동합니다.

그것은,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)

다음과 같은 방법으로 CSV 파일을 읽습니다.

df= spark.read.format("csv").option("multiline", True).option("quote", "\"").option("escape", "\"").option("header",True).load(df_path)

스파크 버전은 3.0.1입니다.

언급URL : https://stackoverflow.com/questions/28782940/load-csv-file-with-pyspark

반응형