func (api *API) NewPeerConnection(configuration Configuration) (*PeerConnection, error) {
// https://w3c.github.io/webrtc-pc/#constructor (Step #2)
// Some variables defined explicitly despite their implicit zero values to
// allow better readability to understand what is happening.
pc := &PeerConnection{
statsID: fmt.Sprintf("PeerConnection-%d", time.Now().UnixNano()),
configuration: Configuration{
ICEServers: []ICEServer{},
ICETransportPolicy: ICETransportPolicyAll,
BundlePolicy: BundlePolicyBalanced,
RTCPMuxPolicy: RTCPMuxPolicyRequire,
Certificates: []Certificate{},
ICECandidatePoolSize: 0,
},
ops: newOperations(),
isClosed: &atomicBool{},
isNegotiationNeeded: &atomicBool{},
negotiationNeededState: negotiationNeededStateEmpty,
lastOffer: "",
lastAnswer: "",
greaterMid: -1,
signalingState: SignalingStateStable,
api: api,
log: api.settingEngine.LoggerFactory.NewLogger("pc"),
}
pc.iceConnectionState.Store(ICEConnectionStateNew)
pc.connectionState.Store(PeerConnectionStateNew)
i, err := api.interceptorRegistry.Build("")
if err != nil {
return nil, err
}
pc.api = &API{
settingEngine: api.settingEngine,
interceptor: i,
}
if api.settingEngine.disableMediaEngineCopy {
pc.api.mediaEngine = api.mediaEngine
} else {
pc.api.mediaEngine = api.mediaEngine.copy()
}
if err = pc.initConfiguration(configuration); err != nil {
return nil, err
}
pc.iceGatherer, err = pc.createICEGatherer()
if err != nil {
return nil, err
}
// Create the ice transport
iceTransport := pc.createICETransport()
pc.iceTransport = iceTransport
// Create the DTLS transport
dtlsTransport, err := pc.api.NewDTLSTransport(pc.iceTransport, pc.configuration.Certificates)
if err != nil {
return nil, err
}
pc.dtlsTransport = dtlsTransport
// Create the SCTP transport
pc.sctpTransport = pc.api.NewSCTPTransport(pc.dtlsTransport)
// Wire up the on datachannel handler
pc.sctpTransport.OnDataChannel(func(d *DataChannel) {
pc.mu.RLock()
handler := pc.onDataChannelHandler
pc.mu.RUnlock()
if handler != nil {
handler(d)
}
})
pc.interceptorRTCPWriter = pc.api.interceptor.BindRTCPWriter(interceptor.RTCPWriterFunc(pc.writeRTCP))
return pc, nil
}
// NewAgent creates a new Agent
func NewAgent(config *AgentConfig) (*Agent, error) { //nolint:gocognit
var err error
if config.PortMax < config.PortMin {
return nil, ErrPort
}
mDNSName := config.MulticastDNSHostName
if mDNSName == "" {
if mDNSName, err = generateMulticastDNSName(); err != nil {
return nil, err
}
}
if !strings.HasSuffix(mDNSName, ".local") || len(strings.Split(mDNSName, ".")) != 2 {
return nil, ErrInvalidMulticastDNSHostName
}
mDNSMode := config.MulticastDNSMode
if mDNSMode == 0 {
mDNSMode = MulticastDNSModeQueryOnly
}
loggerFactory := config.LoggerFactory
if loggerFactory == nil {
loggerFactory = logging.NewDefaultLoggerFactory()
}
log := loggerFactory.NewLogger("ice")
var mDNSConn *mdns.Conn
mDNSConn, mDNSMode, err = createMulticastDNS(mDNSMode, mDNSName, log)
// Opportunistic mDNS: If we can't open the connection, that's ok: we
// can continue without it.
if err != nil {
log.Warnf("Failed to initialize mDNS %s: %v", mDNSName, err)
}
closeMDNSConn := func() {
if mDNSConn != nil {
if mdnsCloseErr := mDNSConn.Close(); mdnsCloseErr != nil {
log.Warnf("Failed to close mDNS: %v", mdnsCloseErr)
}
}
}
startedCtx, startedFn := context.WithCancel(context.Background())
a := &Agent{
chanTask: make(chan task),
chanState: make(chan ConnectionState),
chanCandidate: make(chan Candidate),
chanCandidatePair: make(chan *CandidatePair),
tieBreaker: globalMathRandomGenerator.Uint64(),
lite: config.Lite,
gatheringState: GatheringStateNew,
connectionState: ConnectionStateNew,
localCandidates: make(map[NetworkType][]Candidate),
remoteCandidates: make(map[NetworkType][]Candidate),
urls: config.Urls,
networkTypes: config.NetworkTypes,
onConnected: make(chan struct{}),
buffer: packetio.NewBuffer(),
done: make(chan struct{}),
taskLoopDone: make(chan struct{}),
startedCh: startedCtx.Done(),
startedFn: startedFn,
portmin: config.PortMin,
portmax: config.PortMax,
loggerFactory: loggerFactory,
log: log,
net: config.Net,
proxyDialer: config.ProxyDialer,
mDNSMode: mDNSMode,
mDNSName: mDNSName,
mDNSConn: mDNSConn,
gatherCandidateCancel: func() {},
forceCandidateContact: make(chan bool, 1),
interfaceFilter: config.InterfaceFilter,
insecureSkipVerify: config.InsecureSkipVerify,
}
a.tcpMux = config.TCPMux
if a.tcpMux == nil {
a.tcpMux = newInvalidTCPMux()
}
a.udpMux = config.UDPMux
a.udpMuxSrflx = config.UDPMuxSrflx
if a.net == nil {
a.net = vnet.NewNet(nil)
} else if a.net.IsVirtual() {
a.log.Warn("vnet is enabled")
if a.mDNSMode != MulticastDNSModeDisabled {
a.log.Warn("vnet does not support mDNS yet")
}
}
config.initWithDefaults(a)
// Make sure the buffer doesn't grow indefinitely.
// NOTE: We actually won't get anywhere close to this limit.
// SRTP will constantly read from the endpoint and drop packets if it's full.
a.buffer.SetLimitSize(maxBufferSize)
if a.lite && (len(a.candidateTypes) != 1 || a.candidateTypes[0] != CandidateTypeHost) {
closeMDNSConn()
return nil, ErrLiteUsingNonHostCandidates
}
if config.Urls != nil && len(config.Urls) > 0 && !containsCandidateType(CandidateTypeServerReflexive, a.candidateTypes) && !containsCandidateType(CandidateTypeRelay, a.candidateTypes) {
closeMDNSConn()
return nil, ErrUselessUrlsProvided
}
if err = config.initExtIPMapping(a); err != nil {
closeMDNSConn()
return nil, err
}
go a.taskLoop()
a.startOnConnectionStateChangeRoutine()
// Restart is also used to initialize the agent for the first time
if err := a.Restart(config.LocalUfrag, config.LocalPwd); err != nil {
closeMDNSConn()
_ = a.Close()
return nil, err
}
return a, nil
}
推荐一个零声学院免费公开课程,个人觉得老师讲得不错,分享给大家:Linux,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK等技术内容,立即学习
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)