A small instance of visual analytics based on Spark (Python)



The total delay time of the major airlines in a given month (April 2014)


1. Preparation

1.1. Data

      This data set was downloaded from the U.S. Department of Transportation, Office of the Secretary of Research on November 30, 2014 and represents flight data for the domestic United States in April of 2014.

      The following CSV files are lookup tables:

    - airlines.csv

    - airports.csv

      They provide detailed information about the reference codes used in the main data set. These files have header rows to identify their fields.

      The flights.csv file contains flight statistics for April 2014 with the following fields (an illustrative record follows the list):

- flight date     (yyyy-mm-dd)

- airline ID      (lookup in airlines.csv)

- flight num

- origin          (lookup in airports.csv)

- destination     (lookup in airports.csv)

- departure time  (HHMM)

- departure delay (minutes)

- arrival time    (HHMM)

- arrival delay   (minutes)

- air time        (minutes)

- distance        (miles)
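
An illustrative record in this format (the values are invented, not taken from the actual data set) might look like:

2014-04-01,12345,1,JFK,LAX,0900,-3.00,1220,7.00,345.00,2475.00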

 

1.2. Code: a basic template

     A basic template for writing a Spark application in Python is as follows:

## Spark Application - execute with spark-submit

## imports
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "My Spark Application"

## Closure Functions

## Main functionality
def main(sc):
    pass

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)


 

1.3. Code: the entire app

The entire app is as follows:

 
## Spark Application - execute with spark-submit

## imports
import csv
import matplotlib.pyplot as plt

from StringIO import StringIO
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import SparkConf, SparkContext

## Module Constants
APP_NAME = "Flight Delay Analysis"
DATE_FMT = "%Y-%m-%d"
TIME_FMT = "%H%M"

fields   = ('date', 'airline', 'flightnum', 'origin', 'dest', 'dep',
            'dep_delay', 'arv', 'arv_delay', 'airtime', 'distance')
Flight   = namedtuple('Flight', fields)

## Closure Functions
def parse(row):
    """
    Parses a row and returns a named tuple.
    """
    row[0]  = datetime.strptime(row[0], DATE_FMT).date()
    row[5]  = datetime.strptime(row[5], TIME_FMT).time()
    row[6]  = float(row[6])
    row[7]  = datetime.strptime(row[7], TIME_FMT).time()
    row[8]  = float(row[8])
    row[9]  = float(row[9])
    row[10] = float(row[10])
    return Flight(*row[:11])

def split(line):
    """
    Operator function for splitting a line with the csv module
    """
    reader = csv.reader(StringIO(line))
    return reader.next()

def plot(delays):
    """
    Show a bar chart of the total delay per airline
    """
    airlines = [d[0] for d in delays]
    minutes  = [d[1] for d in delays]
    index    = list(xrange(len(airlines)))

    fig, axe = plt.subplots()
    bars = axe.barh(index, minutes)

    # Add the total minutes to the right
    for idx, air, min in zip(index, airlines, minutes):
        if min > 0:
            bars[idx].set_color('#d9230f')
            axe.annotate(" %0.0f min" % min, xy=(min+1, idx+0.5), va='center')
        else:
            bars[idx].set_color('#469408')
            axe.annotate(" %0.0f min" % min, xy=(10, idx+0.5), va='center')

    # Set the ticks
    ticks = plt.yticks([idx + 0.5 for idx in index], airlines)
    xt = plt.xticks()[0]
    plt.xticks(xt, [' '] * len(xt))

    # Minimize chart junk
    plt.grid(axis='x', color='white', linestyle='-')
    plt.title('Total Minutes Delayed per Airline')
    plt.show()

## Main functionality
def main(sc):
    # Load the airlines lookup dictionary
    airlines = dict(sc.textFile("ontime/airlines.csv").map(split).collect())

    # Broadcast the lookup dictionary to the cluster
    airline_lookup = sc.broadcast(airlines)

    # Read the CSV data into an RDD
    flights = sc.textFile("ontime/flights.csv").map(split).map(parse)

    # Map the total delay to the airline (joined using the broadcast value)
    delays  = flights.map(lambda f: (airline_lookup.value[f.airline],
                                     add(f.dep_delay, f.arv_delay)))

    # Reduce the total delay for the month to the airline
    delays  = delays.reduceByKey(add).collect()
    delays  = sorted(delays, key=itemgetter(1))

    # Provide output from the driver
    for d in delays:
        print "%0.0f minutes delayed\t%s" % (d[1], d[0])

    # Show a bar chart of the delays
    plot(delays)

if __name__ == "__main__":
    # Configure Spark
    conf = SparkConf().setMaster("local[*]")
    conf = conf.setAppName(APP_NAME)
    sc   = SparkContext(conf=conf)

    # Execute Main functionality
    main(sc)

 

 

1.4. Equipment

     An Ubuntu computer with Spark, the JDK, and Scala installed.

 

2. Steps

2.1. Overview of the data

The first step is simply to look at the raw CSV files.
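
The following is only a minimal sketch (not the original post's code): it assumes the files from ontime.zip were unzipped into an ontime/ directory, the same layout the full app expects, and prints a few raw lines from each file.

## A first look at the raw data - a minimal sketch, not the original post's code.
## Assumes the files from ontime.zip sit in an "ontime/" directory.
from pyspark import SparkConf, SparkContext

if __name__ == "__main__":
    conf = SparkConf().setMaster("local[*]").setAppName("Data Overview")
    sc   = SparkContext(conf=conf)

    for name in ("airlines.csv", "airports.csv", "flights.csv"):
        rdd = sc.textFile("ontime/" + name)
        print "%s: %d lines" % (name, rdd.count())
        for line in rdd.take(3):
            print "    " + line

    sc.stop()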

 

2.2. Run the app with the spark-submit command
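
Assuming the complete application above was saved as app.py (that filename is an assumption, not stated in the original) in the same directory that contains the ontime/ data, it can be launched locally with:

spark-submit app.py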

2.3. Result

The driver prints one line per airline with its total delay in minutes, and matplotlib displays a horizontal bar chart titled "Total Minutes Delayed per Airline", with delayed airlines in red and airlines whose total is negative (net early) in green.

2.4. Analysis

       What is this code doing? Let's look particularly at the main function, which does the work most directly related to Spark. First, we load a CSV file into an RDD, then map the split function to it. The split function parses each line of text using the csv module and returns a tuple that represents the row. Finally, we pass the collect action to the RDD, which brings the data from the RDD back to the driver as a Python list. In this case, airlines.csv is a small jump table that lets us join airline codes with the airlines' full names. We store this jump table as a Python dictionary and then broadcast it to every node in the cluster using sc.broadcast.
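
As a quick illustration of why split uses the csv module rather than a plain string split (standalone Python 2; the input line is invented):

## Quoted fields can contain commas, so split uses the csv module.
## The input line below is invented for illustration.
import csv
from StringIO import StringIO

line = '12345,"Some Airline, Inc.: XX"'
print line.split(',')                       # breaks the quoted field apart
print csv.reader(StringIO(line)).next()     # ['12345', 'Some Airline, Inc.: XX']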

       Next, the main function loads the much larger flights.csv. After splitting the CSV rows, we map the parse function to each row, which converts dates and times to Python dates and times and casts the numeric fields to floats. It also stores the row as a namedtuple called Flight for efficient ease of use.
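
For instance, a single made-up flights.csv row is turned into a Flight named tuple like this (parse and Flight are the ones defined in the full listing above):

## What parse() produces for one row; the values are invented for illustration,
## and parse/Flight come from the application listing above.
row = ['2014-04-01', '12345', '1', 'JFK', 'LAX', '0900', '-3.00',
       '1220', '7.00', '345.00', '2475.00']
f = parse(row)
print f.date, f.dep, f.arv_delay            # -> 2014-04-01 09:00:00 7.0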

       With an RDD of Flight objects in hand, we map an anonymous function that transforms the RDD into a series of key-value pairs, where the key is the name of the airline and the value is the sum of the arrival and departure delays. Each airline's delays are summed together using reduceByKey with the add operator, and this RDD is collected back to the driver (again, the number of airlines in the data is relatively small). Finally, the delays are sorted in ascending order, and the output is printed to the console as well as visualized with matplotlib.
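
The aggregation step can be seen on a toy pair RDD (using the same sc as in the app; the airline names and minutes are invented):

## reduceByKey with operator.add on a toy RDD - invented values for illustration.
from operator import add, itemgetter

pairs  = sc.parallelize([("Airline A", 10.0), ("Airline B", -5.0), ("Airline A", 7.0)])
totals = sorted(pairs.reduceByKey(add).collect(), key=itemgetter(1))
print totals                                # [('Airline B', -5.0), ('Airline A', 17.0)]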

3. Q&A

3.1. ImportError: No module named matplotlib.pyplot

This usually means matplotlib is not installed for the Python interpreter that runs the Spark driver; installing it (for example with pip install matplotlib) resolves the error. See also:

http://www.codeweblog.com/importerror-no-module-named-matplotlib-pyplot/

 

 

Note:

This demo originally came from:

https://districtdatalabs.silvrback.com/getting-started-with-spark-in-python

A Chinese version is also available:

http://blog.jobbole.com/86232/

The data comes from GitHub:

https://github.com/bbengfort/hadoop-fundamentals/blob/master/data/ontime.zip


