pion ICE流程分析

pion ICE流程分析,第1张

// NewPeerConnection creates a PeerConnection from the supplied configuration.
//
// The returned connection owns its own API value: it shares the caller's
// settingEngine, carries an interceptor freshly built from the caller's
// interceptorRegistry, and uses a copy of the caller's media engine unless
// settingEngine.disableMediaEngineCopy is set. After the configuration is
// applied, the ICE gatherer, ICE transport, DTLS transport and SCTP transport
// are created in that order. Any error from building the interceptor,
// applying the configuration, creating the gatherer or creating the DTLS
// transport is returned unchanged together with a nil connection.
func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, error) {
	// https://w3c.github.io/webrtc-pc/#constructor (Step #2)
	// Fields that would default to their zero value anyway are still written
	// out so the initial state can be read at a glance.
	defaults := Configuration{
		ICEServers:           []ICEServer{},
		ICETransportPolicy:   ICETransportPolicyAll,
		BundlePolicy:         BundlePolicyBalanced,
		RTCPMuxPolicy:        RTCPMuxPolicyRequire,
		Certificates:         []Certificate{},
		ICECandidatePoolSize: 0,
	}

	pc := &PeerConnection{
		statsID:                fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()),
		configuration:          defaults,
		ops:                    newOperations(),
		isClosed:               &atomicBool{},
		isNegotiationNeeded:    &atomicBool{},
		negotiationNeededState: negotiationNeededStateEmpty,
		lastOffer:              "",
		lastAnswer:             "",
		greaterMid:             -1,
		signalingState:         SignalingStateStable,

		api: api,
		log: api.settingEngine.LoggerFactory.NewLogger("pc"),
	}
	pc.iceConnectionState.Store(ICEConnectionStateNew)
	pc.connectionState.Store(PeerConnectionStateNew)

	chain, err := api.interceptorRegistry.Build("")
	if err != nil {
		return nil, err
	}

	// Share the caller's media engine only when copying is disabled;
	// otherwise this connection works on its own copy.
	engine := api.mediaEngine
	if !api.settingEngine.disableMediaEngineCopy {
		engine = api.mediaEngine.copy()
	}

	// Replace the API reference with a per-connection one.
	pc.api = &API{
		settingEngine: api.settingEngine,
		interceptor:   chain,
		mediaEngine:   engine,
	}

	if err = pc.initConfiguration(configuration); err != nil {
		return nil, err
	}

	if pc.iceGatherer, err = pc.createICEGatherer(); err != nil {
		return nil, err
	}

	// ICE transport.
	pc.iceTransport = pc.createICETransport()

	// DTLS transport, layered on the ICE transport.
	if pc.dtlsTransport, err = pc.api.NewDTLSTransport(pc.iceTransport, pc.configuration.Certificates); err != nil {
		return nil, err
	}

	// SCTP transport, layered on the DTLS transport.
	pc.sctpTransport = pc.api.NewSCTPTransport(pc.dtlsTransport)

	// Forward data channels announced by the SCTP transport to the handler
	// currently registered on the connection. The handler is read under the
	// read lock and invoked after the lock has been released.
	pc.sctpTransport.OnDataChannel(func(channel *DataChannel) {
		pc.mu.RLock()
		onDataChannel := pc.onDataChannelHandler
		pc.mu.RUnlock()

		if onDataChannel == nil {
			return
		}
		onDataChannel(channel)
	})

	rtcpWriter := interceptor.RTCPWriterFunc(pc.writeRTCP)
	pc.interceptorRTCPWriter = pc.api.interceptor.BindRTCPWriter(rtcpWriter)

	return pc, nil
}
// NewAgent creates a new Agent from config.
//
// Validation and defaults applied here:
//   - ErrPort is returned when config.PortMax is smaller than config.PortMin.
//   - When config.MulticastDNSHostName is empty a name is generated. The name
//     must end in ".local" and consist of exactly two dot-separated parts,
//     otherwise ErrInvalidMulticastDNSHostName is returned.
//   - A zero config.MulticastDNSMode is treated as MulticastDNSModeQueryOnly.
//   - A nil config.LoggerFactory is replaced by the default logger factory.
//   - ErrLiteUsingNonHostCandidates is returned when config.Lite is set and
//     the candidate types are anything other than exactly CandidateTypeHost.
//   - ErrUselessUrlsProvided is returned when URLs are configured but neither
//     server-reflexive nor relay candidates are enabled.
//
// Failing to open the mDNS connection is not fatal; it is only logged. On
// every later error path the mDNS connection opened here is closed before
// returning. On success the agent's task loop goroutine and its
// connection-state-change routine have been started and Restart has been
// called with the configured local credentials.
func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
	var err error
	if config.PortMax < config.PortMin {
		return nil, ErrPort
	}

	mDNSName := config.MulticastDNSHostName
	if mDNSName == "" {
		if mDNSName, err = generateMulticastDNSName(); err != nil {
			return nil, err
		}
	}

	// Only names of the form "<label>.local" are accepted.
	if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {
		return nil, ErrInvalidMulticastDNSHostName
	}

	mDNSMode := config.MulticastDNSMode
	if mDNSMode == 0 {
		mDNSMode = MulticastDNSModeQueryOnly
	}

	loggerFactory := config.LoggerFactory
	if loggerFactory == nil {
		loggerFactory = logging.NewDefaultLoggerFactory()
	}
	log := loggerFactory.NewLogger("ice")

	var mDNSConn *mdns.Conn
	mDNSConn, mDNSMode, err = createMulticastDNS(mDNSMode, mDNSName, log)
	// Opportunistic mDNS: If we can't open the connection, that's ok: we
	// can continue without it.
	if err != nil {
		log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)
	}
	// closeMDNSConn releases the mDNS connection on the error paths below.
	// A failure to close is logged rather than returned.
	closeMDNSConn := func() {
		if mDNSConn != nil {
			if mdnsCloseErr := mDNSConn.Close(); mdnsCloseErr != nil {
				log.Warnf("Failed to close mDNS: %v", mdnsCloseErr)
			}
		}
	}

	// startedCh is the Done channel of this context; startedFn cancels it.
	startedCtx, startedFn := context.WithCancel(context.Background())

	a := &Agent{
		chanTask:          make(chan task),
		chanState:         make(chan ConnectionState),
		chanCandidate:     make(chan Candidate),
		chanCandidatePair: make(chan *CandidatePair),
		tieBreaker:        globalMathRandomGenerator.Uint64(),
		lite:              config.Lite,
		gatheringState:    GatheringStateNew,
		connectionState:   ConnectionStateNew,
		localCandidates:   make(map[NetworkType][]Candidate),
		remoteCandidates:  make(map[NetworkType][]Candidate),
		urls:              config.Urls,
		networkTypes:      config.NetworkTypes,
		onConnected:       make(chan struct{}),
		buffer:            packetio.NewBuffer(),
		done:              make(chan struct{}),
		taskLoopDone:      make(chan struct{}),
		startedCh:         startedCtx.Done(),
		startedFn:         startedFn,
		portmin:           config.PortMin,
		portmax:           config.PortMax,
		loggerFactory:     loggerFactory,
		log:               log,
		net:               config.Net,
		proxyDialer:       config.ProxyDialer,

		mDNSMode: mDNSMode,
		mDNSName: mDNSName,
		mDNSConn: mDNSConn,

		gatherCandidateCancel: func() {},

		forceCandidateContact: make(chan bool, 1),

		interfaceFilter: config.InterfaceFilter,

		insecureSkipVerify: config.InsecureSkipVerify,
	}

	// Without a configured TCP mux, install the placeholder returned by
	// newInvalidTCPMux so the field is never nil.
	a.tcpMux = config.TCPMux
	if a.tcpMux == nil {
		a.tcpMux = newInvalidTCPMux()
	}
	a.udpMux = config.UDPMux
	a.udpMuxSrflx = config.UDPMuxSrflx

	// No network supplied: use the one returned by vnet.NewNet(nil).
	if a.net == nil {
		a.net = vnet.NewNet(nil)
	} else if a.net.IsVirtual() {
		a.log.Warn("vnet is enabled")
		if a.mDNSMode != MulticastDNSModeDisabled {
			a.log.Warn("vnet does not support mDNS yet")
		}
	}

	config.initWithDefaults(a)

	// Make sure the buffer doesn't grow indefinitely.
	// NOTE: We actually won't get anywhere close to this limit.
	// SRTP will constantly read from the endpoint and drop packets if it's full.
	a.buffer.SetLimitSize(maxBufferSize)

	// A lite agent may only use host candidates.
	if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {
		closeMDNSConn()
		return nil, ErrLiteUsingNonHostCandidates
	}

	// URLs are pointless unless server-reflexive or relay candidates are
	// enabled. len of a nil slice is 0, so no separate nil check is needed.
	if len(config.Urls) > 0 &&
		!containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) &&
		!containsCandidateType(CandidateTypeRelay, a.candidateTypes) {
		closeMDNSConn()
		return nil, ErrUselessUrlsProvided
	}

	if err = config.initExtIPMapping(a); err != nil {
		closeMDNSConn()
		return nil, err
	}

	go a.taskLoop()
	a.startOnConnectionStateChangeRoutine()

	// Restart is also used to initialize the agent for the first time
	if err = a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
		closeMDNSConn()
		// Best-effort teardown: the Restart error is the one reported to
		// the caller, so the Close error is intentionally discarded.
		_ = a.Close()
		return nil, err
	}

	return a, nil
}

推荐一个零声学院的免费公开课程,个人觉得老师讲得不错,分享给大家:Linux、Nginx、ZeroMQ、MySQL、Redis、FastDFS、MongoDB、ZooKeeper、流媒体、CDN、P2P、K8S、Docker、TCP/IP、协程、DPDK 等技术内容,立即学习

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/web/993237.html

(0)
打赏 微信扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存