【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2

【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2,第1张

【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2

【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part1
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part2
【Tinkerpop整理】以Spark为引擎全量检索图库数据流程源码解析-part3

6,HadoopInputFormat
//静态块,构建JanusGraphVertex反序列化对象,构建JanusGraphHadoopSetupImpl对象时,会打开图
static {
    refCounter = new RefCountedCloseable<>((conf) ->
        new JanusGraphVertexDeserializer(new JanusGraphHadoopSetupImpl(conf)));
}
/**
 * Builds the HadoopRecordReader for one input split.
 *
 * The reader is created from the shared {@code refCounter} and from the record
 * reader produced by the wrapped {@code inputFormat} for the same split/context.
 *
 * @param split   the input split to read
 * @param context the task attempt context, forwarded to the wrapped input format
 * @return a HadoopRecordReader wrapping the delegate's record reader
 * @throws IOException          declared by the signature; presumably propagated from the delegate
 * @throws InterruptedException declared by the signature; presumably propagated from the delegate
 */
@Override
public RecordReader createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
    // Per the original note: at this point inputFormat is the HbaseBinaryInputFormat
    // named in the configuration file (applies to an HBase-backed setup — confirm for other backends).
    return new HadoopRecordReader(refCounter, inputFormat.createRecordReader(split, context));
}
7,JanusGraphHadoopSetupImpl
/**
 * Opens the graph described by the given Hadoop configuration and starts a
 * read-only transaction on it.
 *
 * @param config Hadoop configuration; its JanusGraph settings are read from
 *               the {@code MAPRED_NS} namespace
 */
public JanusGraphHadoopSetupImpl(final Configuration config) {
    this.scanConf = ModifiableHadoopConfiguration.of(JanusGraphHadoopConfiguration.MAPRED_NS, config);

    // Extract the graph configuration and open the graph with it.
    final BasicConfiguration graphConf = this.scanConf.getJanusGraphConf();
    this.graph = (StandardJanusGraph) JanusGraphFactory.open(graphConf);

    // Start a read-only transaction with a vertex cache size of 200.
    this.tx = (StandardJanusGraphTx) this.graph
        .buildTransaction()
        .readOnly()
        .vertexCacheSize(200)
        .start();
}
8, JanusGraphFactory
public static JanusGraph open(ReadConfiguration configuration, String backupName) {
    final ModifiableConfiguration config = new ModifiableConfiguration(ROOT_NS, (WriteConfiguration) configuration, BasicConfiguration.Restriction.NONE);
    final String graphName = config.has(GRAPH_NAME) ? config.get(GRAPH_NAME) : backupName;
    final JanusGraphManager jgm = JanusGraphManagerUtility.getInstance();
    if (null != graphName) {
        Preconditions.checkNotNull(jgm, JANUS_GRAPH_MANAGER_EXPECTED_STATE_MSG);
        return (JanusGraph) jgm.openGraph(graphName, gName -> new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration)));
    } else {
        if (jgm != null) {
            log.warn("You should supply "graph.graphname" in your .properties file configuration if you are opening " +
                     "a graph that has not already been opened at server start, i.e. it was " +
                     "defined in your YAML file. This will ensure the graph is tracked by the JanusGraphManager, " +
                     "which will enable autocommit and rollback functionality upon all gremlin script executions. " +
                     "Note that JanusGraphFactory#open(String === shortcut notation) does not support consuming the property " +
                     ""graph.graphname" so these graphs should be accessed dynamically by supplying a .properties file here " +
                     "or by using the ConfiguredGraphFactory.");
        }
        //构建图对象
        return new StandardJanusGraph(new GraphDatabaseConfigurationBuilder().build(configuration));
    }
}
9,GraphDatabaseConfigurationBuilder

构建图(graph)存储后端的配置信息(例如:后端存储为 HBase 时,即 HBase 库的配置信息)

/**
 * Builds the GraphDatabaseConfiguration for a graph from its local configuration.
 *
 * Steps, in order: obtain the storage manager and its features, build the global
 * configuration, copy the local subset of options, combine global and local settings,
 * compute the unique instance id, apply the log-related overwrites, and merge the
 * overwrites on top of the combined configuration.
 *
 * @param localConfig the local configuration; must not be null
 * @return the assembled graph database configuration
 */
public GraphDatabaseConfiguration build(ReadConfiguration localConfig){

    Preconditions.checkNotNull(localConfig);

    BasicConfiguration localBasicConfiguration = new BasicConfiguration(ROOT_NS,localConfig, BasicConfiguration.Restriction.NONE);
    // Starts empty; it is passed to the global-config builder and later receives the
    // unique instance id and the log-related overwrites below.
    ModifiableConfiguration overwrite = new ModifiableConfiguration(ROOT_NS,new CommonsConfiguration(), BasicConfiguration.Restriction.NONE);
    // Storage manager for the configured backend. Per the original note, with an HBase
    // backend this is the HBase store manager (the data storage management object).
    final KeyColumnValueStoreManager storeManager = Backend.getStorageManager(localBasicConfiguration);
    final StoreFeatures storeFeatures = storeManager.getFeatures();

    // Build the global configuration.

    final ReadConfiguration globalConfig = new ReadConfigurationBuilder().buildGlobalConfiguration(
        localConfig, localBasicConfiguration, overwrite, storeManager,
        new ModifiableConfigurationBuilder(), new KCVSConfigurationBuilder());

    //Copy over local config options
    // Local configuration; per the original note these are the parameters passed in
    // when the job is created — TODO confirm against the caller.

    ModifiableConfiguration localConfiguration = new ModifiableConfiguration(ROOT_NS, new CommonsConfiguration(), BasicConfiguration.Restriction.LOCAL);
    localConfiguration.setAll(getLocalSubset(localBasicConfiguration.getAll()));

    Configuration combinedConfig = new MixedConfiguration(ROOT_NS,globalConfig,localConfig);

    //Compute unique instance id
    String uniqueGraphId = UniqueInstanceIdRetriever.getInstance().getOrGenerateUniqueInstanceId(combinedConfig);
    overwrite.set(UNIQUE_INSTANCE_ID, uniqueGraphId);

    checkAndOverwriteTransactionLogConfiguration(combinedConfig, overwrite, storeFeatures);
    checkAndOverwriteSystemManagementLogConfiguration(combinedConfig, overwrite);

    // NOTE(review): presumably values in overwrite take precedence over combinedConfig — confirm in MergedConfiguration.
    MergedConfiguration configuration = new MergedConfiguration(overwrite,combinedConfig);

    return new GraphDatabaseConfiguration(localConfig, localConfiguration, uniqueGraphId, configuration);
}
10,StandardJanusGraph
静态代码块,注册相关策略
static {
    TraversalStrategies graphStrategies =
        TraversalStrategies.GlobalCache.getStrategies(Graph.class)
            .clone()
            .addStrategies(AdjacentVertexFilterOptimizerStrategy.instance(),
                           AdjacentVertexHasIdOptimizerStrategy.instance(),
                           AdjacentVertexIsOptimizerStrategy.instance(),
                           JanusGraphLocalQueryOptimizerStrategy.instance(),
                           JanusGraphStepStrategy.instance(),
                           JanusGraphIoRegistrationStrategy.instance());

    //Register with cache
    TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraph.class, graphStrategies);
    TraversalStrategies.GlobalCache.registerStrategies(StandardJanusGraphTx.class, graphStrategies);
}
// Core wiring; each of these is assigned once in the constructor.
private final GraphDatabaseConfiguration config;
private final Backend backend;
private final IDManager idManager;
private final VertexIDAssigner idAssigner;
private final TimestampProvider times;

//Serializers
protected final IndexSerializer indexSerializer;
protected final EdgeSerializer edgeSerializer;
protected final Serializer serializer;

//Caches
// Built in the constructor from the edge serializer with a result limit of 1.
public final SliceQuery vertexExistenceQuery;
private final RelationQueryCache queryCache;
private final SchemaCache schemaCache;

//Log
private final ManagementLogger managementLogger;

//Shutdown hook
// Created in the constructor and registered with Runtime.addShutdownHook.
private volatile ShutdownThread shutdownHook;

// Set to true by the constructor.
private volatile boolean isOpen;
// Initialised to 0 in the constructor.
private final AtomicLong txCounter;

// Backed by a ConcurrentHashMap (see constructor).
// NOTE(review): raw Set — element type is presumably StandardJanusGraphTx; confirm and parameterize.
private final Set openTransactions;

// Taken from configuration.getGraphName() in the constructor.
private final String name;
/**
 * Creates the graph from its configuration: wires the backend, id assignment,
 * serializers and caches, registers this instance's unique id in the global system
 * configuration, attaches the management log reader, and installs a shutdown hook.
 *
 * @param configuration the graph database configuration to build the graph from
 * @throws JanusGraphException if an instance with the same unique id is already
 *         registered and {@code REPLACE_INSTANCE_IF_EXISTS} is not enabled
 */
public StandardJanusGraph(GraphDatabaseConfiguration configuration) {

    this.config = configuration;
    // Storage backend for this graph.
    this.backend = configuration.getBackend();

    this.name = configuration.getGraphName();

    this.idAssigner = config.getIDAssigner(backend);
    this.idManager = idAssigner.getIDManager();

    this.serializer = config.getSerializer();
    StoreFeatures storeFeatures = backend.getStoreFeatures();
    this.indexSerializer = new IndexSerializer(configuration.getConfiguration(), this.serializer,
            this.backend.getIndexInformation(), storeFeatures.isDistributed() && storeFeatures.isKeyOrdered());
    this.edgeSerializer = new EdgeSerializer(this.serializer);
    // VertexExists is a constant on the BaseKey class; the lower-case "baseKey" did not name it.
    this.vertexExistenceQuery = edgeSerializer.getQuery(BaseKey.VertexExists, Direction.OUT, new EdgeSerializer.TypedInterval[0]).setLimit(1);
    this.queryCache = new RelationQueryCache(this.edgeSerializer);
    this.schemaCache = configuration.getTypeCache(typeCacheRetrieval);
    this.times = configuration.getTimestampProvider();

    isOpen = true;
    txCounter = new AtomicLong(0);
    // Diamond instead of a raw ConcurrentHashMap; type arguments are inferred from the field.
    openTransactions = Collections.newSetFromMap(new ConcurrentHashMap<>(100, 0.75f, 1));

    //Register instance and ensure uniqueness
    String uniqueInstanceId = configuration.getUniqueGraphId();
    ModifiableConfiguration globalConfig = getGlobalSystemConfig(backend);
    final boolean instanceExists = globalConfig.has(REGISTRATION_TIME, uniqueInstanceId);
    final boolean replaceExistingInstance = configuration.getConfiguration().get(REPLACE_INSTANCE_IF_EXISTS);
    if (instanceExists && !replaceExistingInstance) {
        throw new JanusGraphException(String.format("A JanusGraph graph with the same instance id [%s] is already open. Might required forced shutdown.", uniqueInstanceId));
    } else if (instanceExists) {
        // Reaching this branch with instanceExists == true implies replaceExistingInstance is true.
        log.debug(String.format("Instance [%s] already exists. Opening the graph per " + REPLACE_INSTANCE_IF_EXISTS.getName() + " configuration.", uniqueInstanceId));
    }
    globalConfig.set(REGISTRATION_TIME, times.getTime(), uniqueInstanceId);

    // Attach the management logger as a reader of the system management log, starting from now.
    Log managementLog = backend.getSystemMgmtLog();
    managementLogger = new ManagementLogger(this, managementLog, schemaCache, this.times);
    managementLog.registerReader(ReadMarker.fromNow(), managementLogger);

    // Install the JVM shutdown hook; the Throwable only carries the creation stack trace into the debug log.
    shutdownHook = new ShutdownThread(this);
    Runtime.getRuntime().addShutdownHook(shutdownHook);
    log.debug("Installed shutdown hook {}", shutdownHook, new Throwable("Hook creation trace"));
}

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/zaji/5691327.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-12-17
下一篇 2022-12-17

发表评论

登录后才能评论

评论列表(0条)

保存