久久精品亚洲综合一品|国产精品福利片免费看|国产精品青青青高清在线|亚洲一级大尺码毛片专区|国产精品99精品久久免费|91精品人人槡人妻人人玩|日韩精品久久久久久久久久欠|91福利精品老师国产自产在线

        第十周,大數(shù)據(jù)建模學(xué)習(xí)應(yīng)用于模具行業(yè)的培訓(xùn)班

        2019-11-03 16:09:50

        早上, 很早到書城, 今天最后一天培訓(xùn), 還是在  SPARK 復(fù)習(xí)。


                開發(fā) SPARK 使用  scala 、java 、 python 、 api 以及    shell 語言開發(fā)的  搜索引擎。


               缺點:    不適合  web服務(wù)  ,   dao層    、web爬蟲 


               優(yōu)點:    伯克利數(shù)據(jù)分析 生態(tài)圈  , 機(jī)器學(xué)習(xí) , 數(shù)據(jù)挖掘, 數(shù)據(jù)庫, 信息檢索, 自然語言處理, 語音識別

                              以  spark core 為核心 , 從  hdfs  , amazon  s3  hbase 等持久讀取數(shù)據(jù)


         image.png


        SPARK SQL  基于內(nèi)存的, 



        image.png

        查看數(shù)據(jù)庫

        深圳塑膠模具廠,深圳市模具廠,深圳模具廠,深圳模具,深圳塑膠模具

        image.png

        image.png

        image.png

        image.png


        image.png



        這個在前端也能看到,點擊進(jìn)去, 詳細(xì)說明。


        image.png

               image.png

        image.png

        image.png



        image.png

        image.png

        image.pngimage.png


        HIVE 不支持事物的, 增刪改查。 ACRD 使用 


        image.png



        在那一臺機(jī)器完成的, 可以看看

        image.png


        提交的人物, 由系統(tǒng)分配節(jié)點去跑。


        image.png

        image.png



        image.png





        HIVE 主要做 數(shù)據(jù)倉庫  DW   , 

        image.png



        插入數(shù)據(jù)完成, 做一下  查詢數(shù)據(jù)

        image.png



        將  HIVE 的內(nèi)核, 改成  SPARK 以后, 提升速度 80倍

        image.png



        image.png



        image.png




        上面是   HOVE 的插入和查詢的過程。


        接著跑  SPARK SQL 的過程


        image.png


        image.png


        所有的  數(shù)據(jù)庫和表, 都是存儲在  HDFS 上的。


        現(xiàn)在要用  SPARK SQL 來執(zhí)行。


        image.png


        image.png


        image.png


        show database  

        use  

        select 

        image.png


        image.png




        這個是 SPARK的管理頁面


        image.png



        image.png

        image.png


        image.png

        image.png




        image.png

        image.png

        image.png

        image.png

        image.png

        image.png

        image.png


        image.png

        image.png

        image.png


        image.png


        DAG 的有向性的結(jié)合圖



        DAG Scheduler 





        image.png


        RDD 經(jīng)過 4個 聚合步驟后, 形成一個  DAG , 多個 DAG 組合編程  DAG計劃表


        下面是  scala 寫的


        image.png



        要統(tǒng)計下面文件中的個數(shù)


        image.png


        image.png

        上述雖然是  一行代碼, 但是也顯示的淋漓盡致


        image.png


        image.png




        運行的狀況


        image.png



        image.png


        image.png




        image.png


        image.png


        image.png



        管理頁面的結(jié)果image.png


        image.png


        環(huán)境配置image.png


        image.png



        image.png


        image.png


        SPARK   單詞統(tǒng)計的 DEMO  , 接下來再看看


        image.png

        image.png


        image.png



        SPARK STREM 消費kaFka 的數(shù)據(jù),  管理者確定是   ZooKeeper


        image.png



        image.png



        image.png


        下午講解     Klin       做一些報表的功能, SPARK 




        image.png

        要搭建不同的集群, 由原來的 store 的程序, 遷移到  spark 上來,    Flink 是把交互式查詢和實時計算合在一起了。

        Flink 也是用 Scala 來寫。

        image.png


        1、客戶端提交作業(yè)后,啟動Driver,Driver是Spark作業(yè)的Master(也就是通過Driver來啟動Receiver,定時去啟動任務(wù)的處理,

        注意的是,驅(qū)動啟動任務(wù)會受前一個任務(wù)執(zhí)行的影響。也就是前一個任務(wù)沒有執(zhí)行完成后,是不會啟動后邊的任務(wù)的。 

        所以,注意你的streaming的執(zhí)行時間,絕對不要超過Recive數(shù)據(jù)的時間)


        2、每個作業(yè)包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個Receiver task。

        (一個Executor就是一個spark進(jìn)程,在yarn中就是一個container,這個大家應(yīng)該知道。

        然后Receiver task是在driver中創(chuàng)建的,我理解一個Receiver是運行在一個Executor中的。

        然后如果想要創(chuàng)建多個Receiver,那么需要大概這樣做(1 to 10).map(_.createStream…),這樣就能創(chuàng)建10個receiver task啦。 

        注意這個數(shù)量當(dāng)然不能超過你的結(jié)點數(shù)量啦。 

        還有個問題,通常使用kafka比較合適,因為kafka是stream向kafka來poll數(shù)據(jù)。

        而他媽的flume默認(rèn)只支持pull,如果想支持poll,那需要定制sink,那真是太惡心了。)


        3、Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報給Driver,然后備份到另外一個Executor上。

        (默認(rèn)情況下接受數(shù)據(jù)是200毫秒生成一個block,我理解一個block應(yīng)該是一個partition?

        這個還不確定,需要對照源代碼看一下;

        然后會把生成的Block隨機(jī)扔到不同的Executor,同時,driver去派發(fā)任務(wù)時,也會找到就近的Executor。

        我理解,節(jié)點中的所有executor都應(yīng)該會有數(shù)據(jù)才對)


        4、ReceiverTracker維護(hù)Receiver匯報的BlockId。

            (這個ReceiverTracker應(yīng)該是維護(hù)在Driver中,Driver會根據(jù)維護(hù)的這些數(shù)據(jù)塊進(jìn)行任務(wù)的派發(fā))


        5、Driver定時生成JobGenerator,根據(jù)DStream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。


        6、JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,

              每個stage包含一到多個task。(我記得DAGScheduler會對任務(wù)做一層優(yōu)化)


        7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運行狀態(tài)。


        8、當(dāng)tasks,stages,jobset完成后,單個batch(批處理)才算完成。


        image.png

        具體流程:

        1. 客戶端提交作業(yè)后啟動Driver,Driver是spark作業(yè)的Master

        2. 每個作業(yè)包含多個Executor,每個Executor以線程的方式運行task,Spark Streaming至少包含一個receiver task。

        3. Receiver接收數(shù)據(jù)后生成Block,并把BlockId匯報給Driver,然后備份到另外一個Executor上。

        4. ReceiverTracker維護(hù)Reciver匯報的BlockId。

        5. Driver定時啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建Jobset,交給JobScheduler。

        6. JobScheduler負(fù)責(zé)調(diào)度Jobset,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個stage包含一到多個task。

        7. TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運行狀態(tài)。

        8. 當(dāng)tasks,stages,jobset完成后,單個batch才算完成。



        image.png



        ---------------------------------------------------   第十周  盧老師 講課筆記


        Spark Streaming原理:


        Spark Streaming是基于spark的流式批處理引擎,其基本原理是把輸入的數(shù)據(jù)以某一時間間隔批量的處理,當(dāng)批處理縮短到秒級時,便可以用于處理實時數(shù)據(jù)流。


        kafka


        flume                                        HDFS


        hdfs      -------> spark streaming --------> DataBase


        zeromq                   |                   Dashboards

                                       | 

        twitter                    |

                                      |

        input data streaming -->sparkstreaming-->batches of input data--->spark engine --->batches of processed data




        live input stream --> spark Streaming --->t t t --->spark job --->results



        sparkstreaming架構(gòu)


        spark streaming作業(yè)流程:


        Driver:

        receivertracker

        jobGenerator

        JobScheduler

        DagScheduler

        TaskScheduler


        BlockManagerMaster


        Executor:

        Receiver

        BlockManagerSlave


        Executor:

        Task

        BlockManagerSlave



        運行機(jī)制:

        1、客戶端提交作業(yè)后啟動一個Driver,Driver是spark作業(yè)的Master。


        2、每個作業(yè)包含多個Executor,每個Executor以線程的方式運行Task,Spark Streaing至少包含一個receiver task。


        3、Receiver接收數(shù)據(jù)后生成Block,并把Blockid匯報給Driver,然后備份到另外一個Executor上


        4、ReceiverTracker維護(hù)Recivver匯報的BlockId。


        5、Driver定時啟動JobGenerator,根據(jù)Dstream的關(guān)系生成邏輯RDD,然后創(chuàng)建JobSet,交給JobScheduler.


        6、JobScheduler負(fù)責(zé)調(diào)度JobSet,交給DAGScheduler,DAGScheduler根據(jù)邏輯RDD,生成相應(yīng)的Stages,每個Stages包含一個到多個Task。


        7、TaskScheduler負(fù)責(zé)把task調(diào)度到Executor上,并維護(hù)task的運行狀態(tài)。


        8、當(dāng)tasks、stages、jobset完成后,單個batch才算完成。





        Spark Streaming消費Kafka數(shù)據(jù):


        import org.apache.kafka.clients.consumer.ConsumerRecord

        import org.apache.kafka.common.serialization.StringDeserializer

        import org.apache.spark.streaming.kafka010._

        import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

        import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe


        val kafkaParams = Map[String, Object](

          "bootstrap.servers" -> "localhost:9092,anotherhost:9092",

          "key.deserializer" -> classOf[StringDeserializer],

          "value.deserializer" -> classOf[StringDeserializer],

          "group.id" -> "use_a_separate_group_id_for_each_stream",

          "auto.offset.reset" -> "latest",

          "enable.auto.commit" -> (false: java.lang.Boolean)

        )


        val topics = Array("topicA", "topicB")

        val stream = KafkaUtils.createDirectStream[String, String](

          streamingContext,

          PreferConsistent,

          Subscribe[String, String](topics, kafkaParams)

        )


        stream.map(record => (record.key, record.value))





        Spark統(tǒng)計單詞次數(shù):


        text_file = sc.textFile("hdfs://...")

        counts = text_file.flatMap(lambda line: line.split(" ")) \

                     .map(lambda word: (word, 1)) \

                     .reduceByKey(lambda a, b: a + b)

        counts.saveAsTextFile("hdfs://...")




        SparkSQL數(shù)據(jù)查詢:


        // Creates a DataFrame based on a table named "people"

        // stored in a MySQL database.

        val url =

          "jdbc:mysql://yourIP:yourPort/test?user=yourUsername;password=yourPassword"

        val df = sqlContext

          .read

          .format("jdbc")

          .option("url", url)

          .option("dbtable", "people")

          .load()


        // Looks the schema of this DataFrame.

        df.printSchema()


        // Counts people by age

        val countsByAge = df.groupBy("age").count()

        countsByAge.show()


        // Saves countsByAge to S3 in the JSON format.

        countsByAge.write.format("json").save("s3a://...")




        Spark機(jī)器學(xué)習(xí)實現(xiàn)邏輯回歸算法:


        // Every record of this DataFrame contains the label and

        // features represented by a vector.

        val df = sqlContext.createDataFrame(data).toDF("label", "features")


        // Set parameters for the algorithm.

        // Here, we limit the number of iterations to 10.

        val lr = new LogisticRegression().setMaxIter(10)


        // Fit the model to the data.

        val model = lr.fit(df)


        // Inspect the model: get the feature weights.

        val weights = model.weights


        // Given a dataset, predict each point's label, and show the results.

        model.transform(df).show()









        -------------------------------------------------------------------------------------------------------


        最后做一次復(fù)習(xí) ,  有始有終。


        image.png


        image.png

        -------在最下面增加 2行代碼。


        image.png

        如果有 幾千臺服務(wù)器, 必須配置免登錄,  生成的密鑰, 追加到這個文件中去。


        image.png


        image.png

        image.png


        hadoop 的官方網(wǎng)站, 有非常詳細(xì)的內(nèi)容。


        image.png


        image.png


        多種方式都可以查看是不是啟動成功


        image.png


        image.png


        image.png


        image.png

        50070端口, HDFS的端口


        搭建完成后,     


        image.png



        下面是  Yarn 的配置信息

        image.png


        需要 shuffle  的, 需要增加 各類  shuffle .


        FIFO     FAIR    先進(jìn)先出    公平調(diào)度  


        上面是 Yarn 的配置文件, 這是第三周講的


        第四周講   Hbase , 是 google  的大表實現(xiàn)

        image.png

        image.png

        image.png

        image.png

        image.png

        image.png


        image.png


        image.png


        image.png


        image.png


        image.png



        image.png

        image.png


        image.png


        image.png


        image.pngimage.png

        image.png


        image.png


        單機(jī)版和集群版的不同

        image.png


        image.png

        image.png


        image.png

        image.png


        image.png

        image.png

        image.png

        image.png



        image.pngimage.png


        image.png


        第5周 、 第六周 


        image.png


        image.png

        image.png


        image.png

        image.png

        一切階是對像


        image.png

        image.png


        image.png

        image.png

        image.png



        image.png


        image.png


        以上是   5周   Scala  



        image.png




        image.png


        image.png


        image.png



        image.png



        image.png


        image.png


        image.png



        image.png

        image.png


        image.png


        image.png




















        首頁
        產(chǎn)品
        新聞
        聯(lián)系
        清远市| 东丽区| 库尔勒市| 洛阳市| 灌阳县| 正镶白旗| 中宁县| 石门县| 康平县| 枞阳县| 和田县| 沽源县| 察隅县| 衡阳市| 浑源县| 丽水市| 湖口县| 九龙城区| 荔浦县| 江孜县| 贵南县| 和林格尔县| 满洲里市| 兴文县| 宁河县| 衡阳市| 灵宝市| 隆安县| 镇康县| 峨眉山市| 偏关县| 邛崃市| 营山县| 德保县| 穆棱市| 曲松县| 桂阳县| 彩票| 天峨县| 南溪县| 赤峰市|