In this installment:
1. Thoughts on how the Receiver is started
2. A thorough analysis of the Receiver startup source code
Part 1: Thoughts on how the Receiver is started
1. Spark Streaming continuously receives data from external data sources through Receivers and reports the data to the Driver; each Batch Duration can then generate Jobs based on the reported data.
2. Receivers are started when the Spark Streaming application starts, so to find where Receivers are started we should look at how Spark Streaming itself starts.
3. Receivers and InputDStreams correspond one to one; by default there is usually only one Receiver.
How is a Receiver started?
1. From Spark Core's point of view, it knows nothing about Receivers being started: everything above Spark Core is just an application, much as everything above the Linux kernel is user space. A Receiver is therefore started by means of a Job.
2. Usually there is only one Receiver, but InputDStreams for different data sources can be created (see the sketch after the list below).
final private[streaming] class DStreamGraph extends Serializable with Logging {
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()   // array of InputDStreams
  private val outputStreams = new ArrayBuffer[DStream[_]]()
3. When a Receiver is started, a Job is launched. That Job contains RDD transformations and an action, and it has exactly one partition; what makes this partition special is that it holds a single element, namely the Receiver to be started.
4. Problems with this approach:
a) With multiple InputDStreams, multiple Receivers must be started, each one corresponding to a partition. Ideally the Receivers would be started on different machines, but to Spark Core this is just an ordinary application; it sees nothing special about Receivers and schedules the Job in the usual way, so several Receivers may well end up on the same Executor, causing load imbalance.
b) Starting a Receiver may fail, yet as long as the cluster is alive a Receiver should never be allowed to fail for good.
c) At runtime, a single partition by default means a single Task, and that Task may fail; a Receiver started as a plain Task would then die together with it.
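To make the one-Receiver-per-InputDStream relationship concrete, here is a minimal runnable sketch (the host names and ports are hypothetical) that creates two InputDStreams from two socket sources. DStreamGraph.inputStreams then holds two entries, and two Receivers will be started:
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MultiReceiverDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MultiReceiverDemo")
    val ssc = new StreamingContext(conf, Seconds(5))

    // each call to socketTextStream registers one InputDStream and hence one Receiver
    val lines1 = ssc.socketTextStream("host1", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val lines2 = ssc.socketTextStream("host2", 9998, StorageLevel.MEMORY_AND_DISK_SER)

    lines1.union(lines2).count().print()

    ssc.start() // Receiver startup begins here, as analyzed below
    ssc.awaitTermination()
  }
}
The number of Receivers equals the number of receiver-based input streams created, which is exactly what DStreamGraph.inputStreams records.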
From this we can conclude that a Receiver failure has very serious consequences. So how does Spark Streaming guard against these problems? Let's go looking for where the Receiver is created.
Here is the answer up front; the source code analysis below works it out in detail:
a) Spark starts each Receiver with its own Job, which ensures load balancing as far as possible.
b) Spark Streaming decides on which Executors each Receiver runs.
c) If a Receiver fails to start, the Job is not considered failed; internally the Receiver is restarted.
Next we will walk through the code step by step to see how a Receiver is started.
1. When writing an application we always call StreamingContext's start method, and this is where the job originates. Let's look at the source of start:
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // start the JobScheduler in a child thread: it does the local initialization work and avoids blocking the main thread
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
2. When the start method above is called, it invokes JobScheduler's start() method, inside which the receiverTracker is started. The source is as follows:
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start() // start the Receivers
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
3. Now look at receiverTracker's start() method. It sets up an RPC endpoint. Why? Because the ReceiverTracker monitors every Receiver in the cluster, and each Receiver in turn reports its own state to the ReceiverTrackerEndpoint: the data it has received, its lifecycle events, and so on.
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }
  if (!receiverInputStreams.isEmpty) { // Receivers are started only if there are receiver-based input streams
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv)) // the endpoint the Receivers report their state to
    if (!skipReceiverLaunch) launchReceivers() // launch the Receivers
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
4. launchReceivers obtains the concrete Receiver instances from the ReceiverInputDStreams (which live on the Driver side) and then distributes them to the Worker nodes. One ReceiverInputDStream produces exactly one Receiver.
/**
 * Get the receivers from the ReceiverInputDStreams, distributes them to the
 * worker nodes as a parallel collection, and runs them.
 */
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // one input source produces exactly one Receiver
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob() // run a dummy Job so that the Receivers get distributed across different executors

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers))
}
5. runDummySparkJob() makes sure that all the slave nodes are alive, and avoids having all the Receivers end up concentrated on a single node.
private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}
The getReceiver() method of ReceiverInputDStream produces the receiver object, which is then sent to a worker node where the receiver is instantiated and starts receiving data.
This method must be implemented by subclasses:
/**
 * Gets the receiver object that will be sent to the worker nodes
 * to receive data. This method needs to be defined by any specific implementation
 * of a ReceiverInputDStream.
 */
def getReceiver(): Receiver[T]
ReceiverInputDStream is an abstract class, so getReceiver must be implemented by each concrete subclass. Take SocketInputDStream:
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel) // creates the SocketReceiver
  }
}
private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() } // spawn a daemon thread that calls receive()
    }.start()
  }
The receive() method opens the socket and starts receiving the data:
  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port) // create the socket from the host and port the application passed in
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream()) // receive the data
      while(!isStopped && iterator.hasNext) {
        store(iterator.next) // store each received record
      }
      if (!isStopped()) {
        restart("Socket data stream had no more data")
      } else {
        logInfo("Stopped receiving")
      }
    } catch {
      case e: java.net.ConnectException =>
        restart("Error connecting to " + host + ":" + port, e)
      case NonFatal(e) =>
        logWarning("Error receiving data", e)
        restart("Error receiving data", e)
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
}
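The same onStart/receive pattern applies to user-defined receivers. Here is a minimal sketch of ours, modeled on SocketReceiver above; the class name and the constant record are hypothetical placeholders, while onStart, onStop, isStopped, store and restart are the real Receiver API:
import scala.util.control.NonFatal
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// hypothetical custom receiver, following the SocketReceiver pattern
class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // spawn a daemon thread, just like SocketReceiver.onStart
    new Thread("My Receiver") {
      setDaemon(true)
      override def run() { receiveLoop() }
    }.start()
  }

  // nothing to clean up: the thread checks isStopped() and exits on its own
  def onStop(): Unit = {}

  private def receiveLoop(): Unit = {
    try {
      while (!isStopped()) {
        store("a record") // hand each received record over to Spark
      }
    } catch {
      case NonFatal(e) => restart("Error receiving data", e) // ask the supervisor to restart us
    }
  }
}
// usage: val stream = ssc.receiverStream(new MyReceiver(StorageLevel.MEMORY_AND_DISK_SER))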
6. The source of ReceiverTrackerEndpoint is as follows:
override def receive: PartialFunction[Any, Unit] = {
  // Local messages
  case StartAllReceivers(receivers) =>
    val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors) // receivers are the Receivers to start; getExecutors returns the list of Executors in the cluster
    for (receiver <- receivers) {
      val executors = scheduledLocations(receiver.streamId)
      updateReceiverScheduledExecutors(receiver.streamId, executors)
      receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
      startReceiver(receiver, executors) // loop over the receivers, passing in one receiver at a time
    }
  case RestartReceiver(receiver) =>
    // Old scheduled executors minus the ones that are not active any more
    val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
    val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
      // Try global scheduling again
      oldScheduledExecutors
    } else {
      val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
      // Clear "scheduledLocations" to indicate we are going to do local scheduling
      val newReceiverInfo = oldReceiverInfo.copy(
        state = ReceiverState.INACTIVE, scheduledLocations = None)
      receiverTrackingInfos(receiver.streamId) = newReceiverInfo
      schedulingPolicy.rescheduleReceiver(
        receiver.streamId,
        receiver.preferredLocation,
        receiverTrackingInfos,
        getExecutors)
    }
    // Assume there is one receiver restarting at one time, so we don't need to update
    // receiverTrackingInfos
    startReceiver(receiver, scheduledLocations)
  case c: CleanupOldBlocks =>
    receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
  case UpdateReceiverRateLimit(streamUID, newRate) =>
    for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
      eP.send(UpdateRateLimit(newRate))
    }
  // Remote messages
  case ReportError(streamId, message, error) =>
    reportError(streamId, message, error)
}
As the comments show, Spark Streaming itself decides on which Executors a receiver runs; the placement is not left to Spark Core's Task scheduling. The StartAllReceivers message carrying the receivers is sent to the ReceiverTrackerEndpoint.
In the for loop, each receiver is assigned its executors, and the startReceiver method is called:
The Receiver is started by means of a Job! You may be puzzled here: without an RDD, where does the Job come from? First, inside startReceiver the Receiver is wrapped in an RDD:
val receiverRDD: RDD[Receiver[_]] =
  if (scheduledLocations.isEmpty) {
    ssc.sc.makeRDD(Seq(receiver), 1)
  } else {
    val preferredLocations = scheduledLocations.map(_.toString).distinct
    ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
  }
Once wrapped in an RDD, the RDD is submitted to the cluster to run:
val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
  receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
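To see what submitJob does here, the following self-contained sketch (the object name and the string element are our own, not Spark Streaming's) submits a job over a one-partition RDD whose single element stands in for the Receiver; the partition function runs on that element on an executor, and a future is returned, which is exactly the shape used above:
import scala.concurrent.Await
import scala.concurrent.duration._
import org.apache.spark.{SparkConf, SparkContext}

object SubmitJobDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("SubmitJobDemo").setMaster("local[2]"))

    // a one-partition RDD whose single element plays the role of the Receiver
    val rdd = sc.makeRDD(Seq("pretend-receiver"), 1)

    val future = sc.submitJob[String, Unit, Unit](
      rdd,
      (it: Iterator[String]) => it.foreach(x => println("started: " + x)), // runs on the executor
      Seq(0),       // only partition 0 exists
      (_, _) => (), // result handler: nothing to collect
      ())           // overall result

    Await.ready(future, 1.minute)
    sc.stop()
  }
}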
The task is sent to an executor, which takes the Receiver out of the RDD and applies startReceiverFunc to it:
// Function to start the receiver on the worker node
val startReceiverFunc: Iterator[Receiver[_]] => Unit =
  (iterator: Iterator[Receiver[_]]) => {
    if (!iterator.hasNext) {
      throw new SparkException(
        "Could not start receiver as object not found.")
    }
    if (TaskContext.get().attemptNumber() == 0) {
      val receiver = iterator.next()
      assert(iterator.hasNext == false)
      val supervisor = new ReceiverSupervisorImpl( // wraps and registers the Receiver
        receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
      supervisor.start() // start the Receiver
      supervisor.awaitTermination()
    } else {
      // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
    }
  }
This function creates a ReceiverSupervisorImpl object, which manages the concrete Receiver.
First, it registers the Receiver with the ReceiverTracker:
override protected def onReceiverStart(): Boolean = {
  val msg = RegisterReceiver(
    streamId, receiver.getClass.getSimpleName, host, executorId, endpoint)
  trackerEndpoint.askWithRetry[Boolean](msg)
}
If registration succeeds, the Receiver is started via supervisor.start():
/** Start the supervisor */
def start() {
  onStart()
  startReceiver() // start the Receiver
}
Back in ReceiverTracker's startReceiver method: whenever the Job corresponding to a Receiver has finished (whether it ended normally or abnormally) while the ReceiverTracker itself has not been stopped, a RestartReceiver message is sent to the ReceiverTrackerEndpoint:
// We will keep restarting the receiver job until ReceiverTracker is stopped
future.onComplete {
  case Success(_) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
  case Failure(e) =>
    if (!shouldStartReceiver) {
      onReceiverJobFinish(receiverId)
    } else {
      logError("Receiver has been stopped. Try to restart it.", e)
      logInfo(s"Restarting Receiver $receiverId")
      self.send(RestartReceiver(receiver))
    }
}(submitJobThreadPool)
logInfo(s"Receiver ${receiver.streamId} started")重新為Receiver選擇一個(gè)executor,并再次運(yùn)行Receiver。直到ReceiverTracker啟動(dòng)為止。
In the ReceiverTracker's receive method, the first argument to startReceiver is a single receiver; as the implementation shows, the for loop keeps taking receivers out one by one and calling startReceiver on each. From this we can conclude that one Job starts exactly one Receiver. If a Receiver fails to start, this is not treated as a failure of the job; instead, a message is sent back to the ReceiverTrackerEndpoint to restart the Receiver. This guarantees that the Receivers will eventually be started, and avoids the situation where a Receiver started as a plain Task would, on failure, be limited by the task retry count.
[Figure: a simple flow diagram of the Receiver startup process]