【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1

【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1,第1张

【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1

【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part3

1,SparkGraphComputer

1,driver端执行,并构建sparkContext;
2,,构建spark任务执行相关配置信息;
3,构建inputRdd,并启动executor任务执行;
源码如下:

// create a message-passing friendly rdd from the input rdd
boolean partitioned = false;
//读取数据,并构建RDD,
JavaPairRDD loadedGraphRDD = inputRDD.readGraphRDD(graphComputerConfiguration, sparkContext);
// if there are vertex or edge filters, filter the loaded graph rdd prior to partitioning and persisting
if (filtered) {
    this.logger.debug("Filtering the loaded graphRDD: " + this.graphFilter);
    loadedGraphRDD = SparkExecutor.applyGraphFilter(loadedGraphRDD, this.graphFilter);
}
// if the loaded graph RDD is already partitioned use that partitioner, else partition it with HashPartitioner
if (loadedGraphRDD.partitioner().isPresent())
    this.logger.debug("Using the existing partitioner associated with the loaded graphRDD: " + loadedGraphRDD.partitioner().get());
else {
    //是否对rdd跳过重新分区
    if (!skipPartitioner) {
        final Partitioner partitioner = new HashPartitioner(this.workersSet ? this.workers : loadedGraphRDD.partitions().size());
        this.logger.debug("Partitioning the loaded graphRDD: " + partitioner);
        loadedGraphRDD = loadedGraphRDD.partitionBy(partitioner);
        partitioned = true;
        assert loadedGraphRDD.partitioner().isPresent();
    } else {
        assert skipPartitioner == !loadedGraphRDD.partitioner().isPresent(); // no easy way to test this with a test case
        this.logger.debug("Partitioning has been skipped for the loaded graphRDD via " + GREMLIN_SPARK_SKIP_PARTITIONER);
    }
}
// if the loaded graphRDD was already partitioned previous, then this coalesce/repartition will not take place
//根据works配置数,对rdd进行重新分区
if (this.workersSet) {
    if (loadedGraphRDD.partitions().size() > this.workers) // ensures that the loaded graphRDD does not have more partitions than workers
        loadedGraphRDD = loadedGraphRDD.coalesce(this.workers);
    else if (loadedGraphRDD.partitions().size() < this.workers) // ensures that the loaded graphRDD does not have less partitions than workers
        loadedGraphRDD = loadedGraphRDD.repartition(this.workers);
}
// persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
if (!skipPersist && (!inputFromSpark || partitioned || filtered))
    loadedGraphRDD = loadedGraphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));

// final graph with view (for persisting and/or mapReducing -- may be null and thus, possible to save space/time)
JavaPairRDD computedGraphRDD = null;

// process the vertex program //

if (null != this.vertexProgram) {
    memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
    /
    // if there is a registered VertexProgramInterceptor, use it to bypass the GraphComputer semantics
    if (graphComputerConfiguration.containsKey(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)) {
        try {
            final SparkVertexProgramInterceptor interceptor =
                    (SparkVertexProgramInterceptor) Class.forName(graphComputerConfiguration.getString(Constants.GREMLIN_HADOOP_VERTEX_PROGRAM_INTERCEPTOR)).newInstance();
            computedGraphRDD = interceptor.apply(this.vertexProgram, loadedGraphRDD, memory);
        } catch (final ClassNotFoundException | IllegalAccessException | InstantiationException e) {
            throw new IllegalStateException(e.getMessage());
        }
    } else {  // standard GraphComputer semantics
        // get a configuration that will be propagated to all workers
        final HadoopConfiguration vertexProgramConfiguration = new HadoopConfiguration();
        this.vertexProgram.storeState(vertexProgramConfiguration);
        // set up the vertex program and wire up configurations
        this.vertexProgram.setup(memory);
        JavaPairRDD> viewIncomingRDD = null;
        memory.broadcastMemory(sparkContext);
        // execute the vertex program
        while (true) {
            if (Thread.interrupted()) {
                sparkContext.cancelAllJobs();
                throw new TraversalInterruptedException();
            }
            memory.setInExecute(true);
            //executor端对读取的数据进行转换 *** 作(主要是对点数据)
            viewIncomingRDD = SparkExecutor.executeVertexProgramIteration(loadedGraphRDD, viewIncomingRDD, memory, graphComputerConfiguration, vertexProgramConfiguration);
            memory.setInExecute(false);
            if (this.vertexProgram.terminate(memory))
                break;
            else {
                memory.incrIteration();
                memory.broadcastMemory(sparkContext);
            }
        }
        // if the graph will be continued to be used (persisted or mapreduced), then generate a view+graph
        if ((null != outputRDD && !this.persist.equals(Persist.NOTHING)) || !this.mapReducers.isEmpty()) {
            computedGraphRDD = SparkExecutor.prepareFinalGraphRDD(loadedGraphRDD, viewIncomingRDD, this.vertexProgram.getVertexComputeKeys());
            assert null != computedGraphRDD && computedGraphRDD != loadedGraphRDD;
        } else {
            // ensure that the computedGraphRDD was not created
            assert null == computedGraphRDD;
        }
    }
2,InputFormatRDD

构建HadoopRDD
源码如下:

@Override
public JavaPairRDD readGraphRDD(final Configuration configuration, final JavaSparkContext sparkContext) {
    //获取hadoop配置信息
    final org.apache.hadoop.conf.Configuration hadoopConfiguration = ConfUtil.makeHadoopConfiguration(configuration);
    //构建rdd,并将rdd转换为PairRdd
    return sparkContext.newAPIHadoopRDD(hadoopConfiguration,
            (Class>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_READER, InputFormat.class),
            NullWritable.class,
            VertexWritable.class)
            .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())));
}
3, ShuffleMapTask

触发写执行 *** 作,将数据写入block,并将block index信息写入dirver端的blockmanager对象

override def runTask(context: TaskContext): MapStatus = {
  // Deserialize the RDD using the broadcast variable.
  val threadMXBean = ManagementFactory.getThreadMXBean
  val deserializeStartTime = System.currentTimeMillis()
  val deserializeStartCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime
  } else 0L
  val ser = SparkEnv.get.closureSerializer.newInstance()
  val (rdd, dep) = ser.deserialize[(RDD[_], ShuffleDependency[_, _, _])](
    ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
  _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime
  _executorDeserializeCpuTime = if (threadMXBean.isCurrentThreadCpuTimeSupported) {
    threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
  } else 0L

  var writer: ShuffleWriter[Any, Any] = null
  try {
    val manager = SparkEnv.get.shuffleManager
    writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
    //触发写执行
    writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
    writer.stop(success = true).get
  } catch {
    case e: Exception =>
      try {
        if (writer != null) {
          writer.stop(success = false)
        }
      } catch {
        case e: Exception =>
          log.debug("Could not stop writer", e)
      }
      throw e
  }
}
4,RDD

 //第一步:迭代执行
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
      //执行compute *** 作or重checkpoint文件中读取数据
    computeOrReadCheckpoint(split, context)
  }
}
//第二步:执行compute *** 作
private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] =
{
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    compute(split, context)
  }
}
5,NewHadoopRdd

1,RDD的子类,执行该NewHadoopRDD的compute *** 作
2,构建reader对象,读取数据;
3,启动读取数据的reader任务;

override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
  val iter = new Iterator[(K, V)] {
      //构建切割对象,(读取hbase数据时,该对象是:TableSplit)
    private val split = theSplit.asInstanceOf[NewHadoopPartition]
    logInfo("Input split: " + split.serializableHadoopSplit)
    private val conf = getConf

    private val inputMetrics = context.taskMetrics().inputMetrics
    private val existingBytesRead = inputMetrics.bytesRead

    // Sets InputFileBlockHolder for the file block's information
    split.serializableHadoopSplit.value match {
      case fs: FileSplit =>
        InputFileBlockHolder.set(fs.getPath.toString, fs.getStart, fs.getLength)
      case _ =>
        InputFileBlockHolder.unset()
    }

    // Find a function that will return the FileSystem bytes read by this thread. Do this before
    // creating RecordReader, because RecordReader's constructor might read some bytes
    private val getBytesReadCallback: Option[() => Long] =
      split.serializableHadoopSplit.value match {
        case _: FileSplit | _: CombineFileSplit =>
          Some(SparkHadoopUtil.get.getFSBytesReadOnThreadCallback())
        case _ => None
      }

    // We get our input bytes from thread-local Hadoop FileSystem statistics.
    // If we do a coalesce, however, we are likely to compute multiple partitions in the same
    // task and in the same thread, in which case we need to avoid override values written by
    // previous partitions (SPARK-13071).
    private def updateBytesRead(): Unit = {
      getBytesReadCallback.foreach { getBytesRead =>
        inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
      }
    }
    //构建reader对象:(此处formart对象:HadoopInputFormart)
    private val format = inputFormatClass.newInstance
    format match {
      case configurable: Configurable =>
        configurable.setConf(conf)
      case _ =>
    }
    //构建恩物临时id
    private val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
    private val hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId)
    private var finished = false
    private var reader =
      try {
        val _reader = format.createRecordReader(
          split.serializableHadoopSplit.value, hadoopAttemptContext)
        _reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
        _reader
      } catch {
        case e: FileNotFoundException if ignoreMissingFiles =>
          logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
          finished = true
          null
        // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
        case e: FileNotFoundException if !ignoreMissingFiles => throw e
        case e: IOException if ignoreCorruptFiles =>
          logWarning(
            s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
            e)
          finished = true
          null
      }

    // Register an on-task-completion callback to close the input stream.
    context.addTaskCompletionListener[Unit] { context =>
      // Update the bytesRead before closing is to make sure lingering bytesRead statistics in
      // this thread get correctly added.
      updateBytesRead()
      close()
    }

    private var havePair = false
    private var recordsSinceMetricsUpdate = 0

    override def hasNext: Boolean = {
      if (!finished && !havePair) {
        try {
          finished = !reader.nextKeyValue
        } catch {
          case e: FileNotFoundException if ignoreMissingFiles =>
            logWarning(s"Skipped missing file: ${split.serializableHadoopSplit}", e)
            finished = true
          // Throw FileNotFoundException even if `ignoreCorruptFiles` is true
          case e: FileNotFoundException if !ignoreMissingFiles => throw e
          case e: IOException if ignoreCorruptFiles =>
            logWarning(
              s"Skipped the rest content in the corrupted file: ${split.serializableHadoopSplit}",
              e)
            finished = true
        }
        if (finished) {
          // Close and release the reader here; close() will also be called when the task
          // completes, but for tasks that read from many files, it helps to release the
          // resources early.
          close()
        }
        havePair = !finished
      }
      !finished
    }

    override def next(): (K, V) = {
      if (!hasNext) {
        throw new java.util.NoSuchElementException("End of stream")
      }
      havePair = false
      if (!finished) {
        inputMetrics.incRecordsRead(1)
      }
      if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
        updateBytesRead()
      }
      (reader.getCurrentKey, reader.getCurrentValue)
    }

Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2

欢迎分享,转载请注明来源:内存溢出

原文地址: https://outofmemory.cn/zaji/5696469.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存