本文已使用 Google Cloud Translation API 自动翻译。
某些文档最好以原文阅读。
随着大数据的日益普及,对大数据分析的需求也在上升。 Apache Spark 是最流行的大数据处理框架之一,广泛用于大数据分析。
Spark 是一种快速的内存数据处理引擎,具有强大的 SQL 和流功能。它可以处理批处理和实时数据处理。 Spark 易于使用,并具有一组丰富的数据处理、机器学习和图形处理库。
在本文中,我们将讨论如何使用 Apache Spark 执行大数据分析。我们将涵盖以下主题:
Spark 可以安装在独立服务器或机器集群上。它也可以在云端运行。在本文中,我们假设您安装了独立的 Spark。
在开始之前,您需要具备以下条件:
spark-2.4.5-bin-hadoop2.7
目录移动到您要安装Spark 的位置。$SPARK_HOME/bin
目录添加到您的 PATH
环境变量中。您现在可以通过运行“spark-shell”命令来启动 Spark shell。
Spark 可以从各种来源加载数据,包括 HDFS、S3 和本地文件。在本节中,我们将讨论如何从本地文件加载数据并在 Spark 中进行处理。
首先,我们将创建一个名为 sample.txt 的文本文件,其中包含以下内容:
This is a sample text file.
我们现在可以使用 spark.read.text() 方法将这个文件加载到 Spark 中:
val textFile = spark.read.text("sample.txt")
textFile
变量是一个包含 sample.txt
文件内容的 Dataset[String]
。我们现在可以对该数据集执行各种操作,例如过滤、转换和聚合。
例如,我们可以使用 filter()
方法过滤掉不包含单词 sample
的行:
val filteredTextFile = textFile.filter(line => line.contains("sample"))
我们还可以使用 map()
方法来转换数据集:
val mappedTextFile = textFile.map(line => line.toUpperCase())
最后,我们可以使用 groupBy() 方法按线的长度对数据集进行分组:
val groupedTextFile = textFile.groupBy(line => line.length)
Spark SQL 是 Spark 的一个模块,可让您处理结构化数据。它提供了一个 DataFrame API,可用于从各种来源加载数据,例如 HDFS、S3 和本地文件。
在本节中,我们将讨论如何使用 Spark SQL 从本地文件加载数据并进行处理。
首先,我们将创建一个名为 people.txt 的文本文件,其中包含以下内容:
id,name,age
1,john,26
2,mary,29
3,jane,21
4,bob,27
我们现在可以使用 spark.read.csv() 方法将此文件加载到 Spark DataFrame 中:
val peopleDF = spark.read.csv("people.txt")
peopleDF
DataFrame 具有以下架构:
root
|-- id: string (nullable = true)
|-- name: string (nullable = true)
|-- age: string (nullable = true)
我们现在可以将此 DataFrame 注册为临时视图,以便我们可以在其上运行 SQL 查询:
peopleDF.createOrReplaceTempView("people")
我们现在可以在 people 视图上运行 SQL 查询。例如,我们可以查询视图以查找所有 25 岁以上的人的姓名:
spark.sql("SELECT name FROM people WHERE age > 25").show()
应输出以下内容:
+-----+
| name|
+-----+
| john|
| mary|
| bob|
+-----+
Spark Streaming 是 Spark 的一个模块,可让您处理实时数据流。它可以从 Kafka、Flume 和 Twitter 等来源接收实时数据流。
在本节中,我们将讨论如何使用 Spark Streaming 处理来自 Twitter 的实时数据流。
首先,您需要创建一个 Twitter API 应用程序。您可以按照 Twitter 开发人员文档 中的说明执行此操作。
创建 Twitter API 应用程序后,您需要使用您的 Twitter API 凭据创建一个名为“twitter.txt”的文件。该文件应具有以下格式:
consumerKey = <your consumer key>
consumerSecret = <your consumer secret>
accessToken = <your access token>
accessTokenSecret = <your access token secret>
您现在可以通过运行以下命令来启动 Spark Streaming shell:
spark-shell --packages org.apache.spark:spark-streaming-twitter_2.11:2.4.5
这将使用“spark-streaming-twitter_2.11”包启动 Spark shell。
我们现在可以创建一个 Spark Streaming 上下文并设置检查点目录:
import org.apache.spark._
import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("checkpoint")
我们现在可以使用之前创建的 twitter.txt
文件创建 Twitter 流:
import org.apache.spark.streaming.twitter._
val twitterStream = TwitterUtils.createStream(ssc, None, Seq(args(0)))
twitterStream
是一个包含实时 Twitter 流的 DStream[Status]
。我们现在可以对该流执行各种操作,例如转换、过滤和聚合。
例如,我们可以使用 map()
方法来转换流:
val textStream = twitterStream.map(status => status.getText())
我们现在可以使用 foreachRDD() 方法来处理流:
textStream.foreachRDD { (rdd, time) =>
val count = rdd.count()
println(s"Received ${count} tweets at time ${time}")
}
最后,我们可以启动流处理:
ssc.start()
Spark MLlib 是 Spark 的一个模块,提供各种机器学习算法。在本节中,我们将讨论如何使用 Spark MLlib 训练机器学习模型。
首先,我们将创建一个名为“training.txt”的文本文件,其中包含以下内容:
id,features,label
1,[1.0,0.0,0.0],0.0
2,[0.0,1.0,0.0],1.0
3,[0.0,0.0,1.0],2.0
我们现在可以使用 spark.read.csv() 方法将此文件加载到 Spark DataFrame 中:
val trainingDF = spark.read.csv("training.txt")
trainingDF
DataFrame 具有以下架构:
root
|-- id: string (nullable = true)
|-- features: vector (nullable = true)
|-- label: double (nullable = true)
我们现在可以创建一个 LogisticRegression 对象并设置参数:
import org.apache.spark.ml.classification.LogisticRegression
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
我们现在可以使用 fit()
方法训练模型:
val model = lr.fit(trainingDF)
我们现在可以将训练好的模型保存到一个文件中:
model.write.overwrite().save("model")
Spark GraphX 是 Spark 的一个模块,提供各种图形处理算法。在本节中,我们将讨论如何使用 Spark GraphX 来处理图形。
首先,我们将创建一个名为“edges.txt”的文本文件,其中包含以下内容:
src,dst,weight
1,2,1.0
2,3,2.0
3,4,3.0
4,1,4.0
我们现在可以使用 spark.read.csv() 方法将此文件加载到 Spark DataFrame 中:
val edgesDF = spark.read.csv("edges.txt")
edgesDF
DataFrame 具有以下架构:
root
|-- src: string (nullable = true)
|-- dst: string (nullable = true)
|-- weight: double (nullable = true)
我们现在可以从 edgesDF DataFrame 创建一个 Graph 对象:
import org.apache.spark.graphx._
val graph = Graph.fromEdges(edgesDF, "weight")
graph
是一个 Graph[Double, Double]
包含来自 edgesDF
DataFrame 的边。我们现在可以对该图执行各种操作,例如转换、过滤和聚合。
例如,我们可以使用 mapEdges()
方法来变换边缘:
val transformedGraph = graph.mapEdges(e => e.attr * 2)
我们还可以使用 mapVertices()
方法来转换顶点:
val transformedGraph = graph.mapVertices { (id, attr) =>
attr * 2
}
最后,我们可以使用 aggregateMessages() 方法来聚合边:
val aggregatedGraph = graph.aggregateMessages[Double](
ctx => ctx.sendToDst(ctx.srcAttr),
(a, b) => a + b
)
在本文中,我们讨论了如何使用 Apache Spark 执行大数据分析。我们涵盖了以下主题: