About .NET runtime Sockets: Datagram Sockets and the problem of excessive socket buffer memory

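The problem this article addresses is the one named in the title. It concerns Datagram-type sockets, which covers ICMP, UDP, raw IP and so on. So why does it happen?

The problem does not arise in blocking mode. It is confined to the asynchronous Socket interfaces listed below.

The affected .NET APIs are: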

1. System::Net::Sockets::Socket::BeginSendTo

2. System::Net::Sockets::Socket::BeginReceiveFrom

3. System::Net::Sockets::Socket::EndSendTo

4. System::Net::Sockets::Socket::EndReceiveFrom

5. System::Net::Sockets::Socket::SendToAsync

6. System::Net::Sockets::Socket::ReceiveFromAsync

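Summary:

On Windows, the problem stems from .NET's implementation on top of completion routines and I/O completion ports (IOCP).

On Linux, the problem stems from .NET's epoll-based implementation (System.Net.Sockets drives epoll directly; it is not built on libuv).

Strictly speaking this is not a bug. It only causes a large amount of unnecessary memory to be allocated, which crowds out system resources (something to avoid on servers). Plenty of people who build multithreaded work queues directly on IOCP or epoll in C/C++ still create the same problem, for example by posting every completion context to a single root IOCP or epoll instance and having all completion worker threads wait on that root. When the operating system signals an event, it is not delivered to one fixed thread: several threads wait on the same handle, so any one of them may be woken.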

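Problem (where it comes from):

To build a truly efficient event-driven state machine (EDSM) while offering a similar API on every platform, .NET posts each event (IoCompletionContext) to the .NET ThreadPool (the pool of worker threads that handle completion contexts). On Windows this is driven by the IOCP/IOCR model, on Linux by epoll. It resembles boost::asio::io_context (io_service) in C/C++ boost::asio, the context that drives the I/O completion queue. The difference is that in .NET every "completion transaction" of every Socket instance may be dispatched by any thread of the completion thread pool, whereas boost::asio only lets an object be driven by the Executor (io_context) specified when that object was constructed. Assuming a single thread runs that io_context, boost::asio never shows the behavior the .NET implementation can: an operation started on thread A may complete on thread A itself, or on any of threads B, C or D.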
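To make "driven only by the executor chosen at construction" concrete, here is a minimal sketch using only the C++ standard library. `PinnedExecutor` and `ThreadsUsedBy` are hypothetical names invented for this illustration; this is not boost::asio's implementation, only the same idea of one queue drained by one fixed thread.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <set>
#include <thread>

// Hypothetical single-threaded executor: every posted task runs on the one
// worker thread owned by this object, never on the caller or a pool thread.
class PinnedExecutor {
public:
    PinnedExecutor() : worker_([this] { Run(); }) {}

    ~PinnedExecutor() {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stopping_ = true;
        }
        wakeup_.notify_one();
        worker_.join(); // the worker drains the queue before it exits
    }

    void Post(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            tasks_.push(std::move(task));
        }
        wakeup_.notify_one();
    }

private:
    void Run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(mutex_);
                wakeup_.wait(lock, [this] { return stopping_ || !tasks_.empty(); });
                if (tasks_.empty()) {
                    return; // only reached once stopping_ is set
                }
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task(); // run outside the lock
        }
    }

    std::mutex mutex_;
    std::condition_variable wakeup_;
    std::queue<std::function<void()>> tasks_;
    bool stopping_ = false;
    std::thread worker_; // declared last so the members above exist before Run() starts
};

// Posts `count` tasks and returns the set of distinct threads that ran them.
std::set<std::thread::id> ThreadsUsedBy(int count) {
    std::set<std::thread::id> ids;
    {
        PinnedExecutor executor;
        for (int i = 0; i < count; i++) {
            executor.Post([&ids] { ids.insert(std::this_thread::get_id()); });
        }
    } // the destructor joins the worker, so `ids` is complete here
    return ids;
}
```

However many tasks are posted, the returned set holds exactly one thread id. A shared pool that lets any idle thread pick up a completion gives no such guarantee, and that is why a per-thread buffer is safe in the first model but not in the second.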

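For a multithreaded (MT) datagram network application, there is no need to allocate a memory buffer (RAM buffer) for every socket. One fixed buffer of roughly 64 KB (65535 bytes) per thread is enough. Even a buffer of that size still wastes some RAM (in C/C++ it is advisable to allocate the 64 KB according to the allocation granularity; the .NET allocator handles alignment on its own).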
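A minimal sketch of the one-buffer-per-thread idea, using `thread_local` storage. The names `ThreadBuffer`, `DatagramSocketStub` and `BufferAddressOnOtherThread` are hypothetical and exist only for this illustration. The sketch assumes that each datagram is fully processed on its thread before the next receive is issued on that thread.

```cpp
#include <array>
#include <cstddef>
#include <thread>

// One fixed receive buffer per thread, shared by every datagram socket that
// the thread services. 65535 matches the size used in the article.
constexpr std::size_t kDatagramBufferSize = 65535;

// Hypothetical helper: returns the calling thread's own buffer.
inline std::array<char, kDatagramBufferSize>& ThreadBuffer() {
    thread_local std::array<char, kDatagramBufferSize> buffer{};
    return buffer;
}

// Hypothetical stand-in for a socket: it borrows the buffer of the thread
// that services it instead of owning 64 KB itself.
struct DatagramSocketStub {
    char* ReceiveBuffer() const { return ThreadBuffer().data(); }
};

// Returns the buffer address as seen from a freshly started thread.
inline char* BufferAddressOnOtherThread() {
    char* address = nullptr;
    std::thread worker([&address] { address = DatagramSocketStub{}.ReceiveBuffer(); });
    worker.join();
    return address;
}
```

Any number of sockets serviced by one thread resolve to the same address, so memory use grows with the number of threads instead of the number of sockets. A different thread gets its own buffer.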

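Strictly, the required buffer size depends on the protocol. For UDP the largest useful buffer is 65535 - sizeof(udp_hdr), that is 65535 - 8 = 65527 bytes (over IPv4 the 20-byte IP header lowers this to 65507). With the Socket class the .NET runtime provides, sharing a buffer this way is not possible, so the problem has to be solved some other way. If every UDP datagram socket gets its own 64 KB, then 1 GB of RAM holds about 16,000 buffers at the very best. In practice that number is out of reach and a few thousand is already lucky, because you cannot count only the bytes you allocated yourself: every .NET managed object instance also carries a good deal of memory that is not user data.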

#pragma pack(push, 1)
struct udp_hdr {
public:
    unsigned short                  src;    // source port
    unsigned short                  dest;   // destination port
    unsigned short                  len;    // UDP header plus payload, in bytes
    unsigned short                  chksum; // checksum

public:
    static struct udp_hdr*          Parse(struct ip_hdr* iphdr, const void* packet, int size);
};
#pragma pack(pop)
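The arithmetic behind these figures can be checked at compile time. This is a small sketch; the constant and function names are made up for the illustration, and the numbers ignore every kind of runtime overhead.

```cpp
#include <cstddef>
#include <cstdint>

// Same four 16-bit fields as udp_hdr above (hypothetical mirror type).
struct UdpHeaderSketch {
    std::uint16_t src, dest, len, chksum;
};
static_assert(sizeof(UdpHeaderSketch) == 8, "a UDP header is 8 bytes");

constexpr std::size_t kMaxDatagram = 65535;  // largest value of a 16-bit length field
constexpr std::size_t kIpv4Header = 20;      // IPv4 header without options
constexpr std::size_t kMaxUdpPayload = kMaxDatagram - sizeof(UdpHeaderSketch);
constexpr std::size_t kMaxUdpPayloadOverIpv4 = kMaxUdpPayload - kIpv4Header;

constexpr std::size_t kOneGiB = std::size_t(1) << 30;
constexpr std::size_t kBufferSize = std::size_t(64) << 10; // 64 KiB

// How many buffers of `buffer_size` bytes fit into `ram` bytes.
constexpr std::size_t BuffersThatFit(std::size_t ram, std::size_t buffer_size) {
    return ram / buffer_size;
}

// Total buffer memory when each of `threads` threads owns one buffer.
constexpr std::size_t PerThreadTotal(std::size_t threads, std::size_t buffer_size) {
    return threads * buffer_size;
}

static_assert(kMaxUdpPayload == 65527, "65535 - 8");
static_assert(kMaxUdpPayloadOverIpv4 == 65507, "65535 - 8 - 20");
static_assert(BuffersThatFit(kOneGiB, kBufferSize) == 16384, "upper bound for 1 GiB");
static_assert(PerThreadTotal(8, kBufferSize) == 512 * 1024, "8 threads need 512 KiB");
```

With one buffer per socket, 16384 sockets already consume the whole gibibyte before any other cost. With one buffer per thread, eight worker threads need 512 KiB no matter how many sockets they serve.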

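For TCP/IP stream sockets, however, giving each thread one fixed buffer is not recommended. Every operating system tunes TCP/IP throughput separately, and if several TCP sockets on one thread use the buffer at the same RAM address, receive (RX) and send (TX) throughput drops severely. This is a matter of the system-level implementation, not of the framework; it would be suspicious if throughput stayed high under a flood of SYNs. The forcible workaround is to give up the TCP sockets the system offers for easy use by applications and to implement, or bring in, a third-party TCP/IP protocol stack.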

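Now that the problem is understood, handling it is straightforward. I wrapped boost::asio in a usable C/C++/C# library. Readers can solve the problem in their own way, or borrow from the code provided in this article.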

Usage:

        [MTAThread]
        private static void Main(string[] args)
        {
            IPAddress interfaceIP = IPAddress.Parse("192.168.0.24");
            AsyncScheduler scheduler = new AsyncScheduler(1);
            AsyncContext context = scheduler.GetContext();

            AsyncSocket socket = context.CreateSocket(new Socket(AddressFamily.InterNetwork, SocketType.Dgram, ProtocolType.Udp));
            socket.Socket.Bind(new IPEndPoint(interfaceIP, 0));

            byte[] buffer = context.Buffer;
            socket.SendTo(new byte[] { (byte)'1', (byte)'2', (byte)'3' }, 0, 3, new IPEndPoint(interfaceIP, 7000), null);
            socket.ReceiveFrom(buffer, 0, buffer.Length, (bytesTransferred, sourceEP) =>
            {
                string RX = Encoding.UTF8.GetString(buffer, 0, Math.Max(bytesTransferred, 0));
                Console.WriteLine($"bytesTransferred[{bytesTransferred}] RX[{RX}]");
            });
            Console.ReadKey(false);
        }

C/C++ dynamic library implementation:

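// The header names below are the ones the code in this file requires.
#include <stdio.h>
#include <stdint.h>
#include <string.h>
#ifdef _WIN32
#include <WinSock2.h>
#include <Windows.h>

#pragma comment(lib, "Ws2_32.lib")
#else
#include <unistd.h>
#include <pthread.h>
#include <sched.h>
#include <arpa/inet.h>
#endif

#include <memory>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <boost/asio.hpp>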

#ifndef LIBASIO_API
#ifdef __cplusplus 
#ifdef _WIN32
#define LIBASIO_API extern "C" __declspec(dllexport)
#else
#define LIBASIO_API extern "C" __attribute__((visibility("default")))
#endif
#else
#define LIBASIO_API
#endif
#endif

typedef struct {
    uint32_t            v4_or_v6_;
    union {
        struct {
            uint32_t    address_;
            uint32_t    port_;
        } in4_;
        struct {
            char        address_[16];
            uint32_t    port_;
        } in6_;
        char            data_[20];
    };
} libasio_endpoint;
typedef void(*libasio_post_callback)(void* context_, uint64_t key_);
typedef void(*libasio_sendto_callback)(void* socket_, uint64_t key_, int length_);
typedef void(*libasio_recvfrom_callback)(void* socket_, uint64_t key_, int length_, libasio_endpoint* remoteEP_);

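typedef std::shared_ptr<boost::asio::ip::udp::socket>                      libasio_socket;
typedef std::shared_ptr<boost::asio::io_context>                           libasio_context;
typedef std::unordered_map<boost::asio::io_context*, libasio_context>      libasio_context_hashtable;
typedef std::unordered_map<boost::asio::ip::udp::socket*, libasio_socket>  libasio_socket_hashtable;
typedef std::unordered_map<boost::asio::ip::udp::socket*, libasio_context> libasio_so2ctx;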

static libasio_so2ctx            _so2ctxs_;
static libasio_socket_hashtable  _sockets_;
static libasio_context_hashtable _contexts_;
static std::mutex                _syncobj_;

#define __lock__(obj) \
do { \
    std::lock_guard<std::mutex> scoped_(obj);
#define __unlock__ \
} while(0);

static libasio_context libasio_getcontext(boost::asio::io_context* context_) {
    libasio_context_hashtable::iterator tail = _contexts_.find(context_);
    libasio_context_hashtable::iterator endl = _contexts_.end();
    if (tail == endl) {
        return NULL;
    }
    return tail->second;
}

static libasio_context libasio_getcontext(boost::asio::ip::udp::socket* socket_) {
    libasio_so2ctx::iterator tail = _so2ctxs_.find(socket_);
    libasio_so2ctx::iterator endl = _so2ctxs_.end();
    if (tail == endl) {
        return NULL;
    }
    return tail->second;
}

static libasio_socket libasio_getsocket(boost::asio::ip::udp::socket* socket_) {
    libasio_socket_hashtable::iterator tail = _sockets_.find(socket_);
    libasio_socket_hashtable::iterator endl = _sockets_.end();
    if (tail == endl) {
        return NULL;
    }
    return tail->second;
}

LIBASIO_API
boost::asio::io_context* libasio_newcontext() {
    std::shared_ptr<boost::asio::io_context> context_ = std::make_shared<boost::asio::io_context>();
    std::thread([context_] {
        #ifdef _WIN32
        // Raise the priority of this worker thread; the call needs a thread handle, not the process handle.
        SetThreadPriority(GetCurrentThread(), THREAD_PRIORITY_HIGHEST);
        #else

        struct sched_param param_;
        param_.sched_priority = sched_get_priority_max(SCHED_FIFO); // SCHED_RR
        pthread_setschedparam(pthread_self(), SCHED_FIFO, &param_);
        #endif

        boost::asio::io_context::work work_(*context_);
        boost::system::error_code ec_;
        context_->run(ec_);

        __lock__(_syncobj_){
            libasio_context p = libasio_getcontext(context_.get());
            if (p) {
                _contexts_.erase(context_.get());
            }
        } __unlock__;
    }).detach();
    boost::asio::io_context* p = context_.get();
    __lock__(_syncobj_) {
        _contexts_[p] = std::move(context_);
    } __unlock__;
    return p;
}

LIBASIO_API
void libasio_closecontext(boost::asio::io_context* context_) {
    if (!context_) {
        return;
    }
    __lock__(_syncobj_) {
        libasio_context p = libasio_getcontext(context_);
        if (!p) {
            return;
        }
        context_->stop();
        _contexts_.erase(context_);
    } __unlock__;
}

LIBASIO_API
bool libasio_postcontext(boost::asio::io_context* context_, uint64_t key_, libasio_post_callback callback_) {
    if (!context_ || !callback_) {
        return false;
    }
    __lock__(_syncobj_) {
        libasio_context p = libasio_getcontext(context_);
        if (!p) {
            return false;
        }
        context_->post([context_, key_, callback_] {
            callback_(context_, key_);
        });
    } __unlock__;
    return true;
}

LIBASIO_API
boost::asio::ip::udp::socket* libasio_createsocket(boost::asio::io_context* context_, int sockfd_, bool v4_or_v6_) {
    if (!context_ || sockfd_ == -1) {
        return NULL;
    }
    __lock__(_syncobj_) {
        libasio_context context = libasio_getcontext(context_);
        if (!context) {
            return NULL;
        }
        libasio_socket socket_ = std::make_shared<boost::asio::ip::udp::socket>(*context_);
        if (v4_or_v6_) {
            socket_->assign(boost::asio::ip::udp::v4(), sockfd_);
        }
        else {
            socket_->assign(boost::asio::ip::udp::v6(), sockfd_);
        }
        boost::asio::ip::udp::socket* r_ = socket_.get();
        _sockets_[r_] = std::move(socket_);
        _so2ctxs_[r_] = std::move(context);
        return r_;
    } __unlock__;
}

LIBASIO_API
void libasio_closesocket(boost::asio::ip::udp::socket* socket_) {
    if (!socket_) {
        return;
    }
    __lock__(_syncobj_) {
        libasio_context context = libasio_getcontext(socket_);
        if (!context) {
            return;
        }
        libasio_socket socket = libasio_getsocket(socket_);
        boost::asio::post(*context, [context, socket] {
            if (socket->is_open()) {
                boost::system::error_code ec;
                try {
                    socket->cancel(ec);
                }
                catch (std::exception&) {}
                try {
                    socket->shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
                }
                catch (std::exception&) {}
                try {
                    socket->close(ec);
                }
                catch (std::exception&) {}
            }
        });
        _sockets_.erase(socket_);
        _so2ctxs_.erase(socket_);
    } __unlock__;
}

LIBASIO_API
bool libasio_recvfrom(boost::asio::ip::udp::socket* socket_, uint64_t key_, char* buf_, int size_, libasio_recvfrom_callback callback_) {
    if (!socket_ || !buf_ || size_ <= 0 || !callback_) {
        return false;
    }
    std::shared_ptr<boost::asio::ip::udp::endpoint> endpoint_ = std::make_shared<boost::asio::ip::udp::endpoint>();
    __lock__(_syncobj_) {
        libasio_context context_ = libasio_getcontext(socket_);
        if (!context_) {
            return false;
        }
        libasio_socket socket = libasio_getsocket(socket_);
        socket->async_receive_from(boost::asio::buffer(buf_, size_), *endpoint_,
            [context_, socket, key_, callback_, endpoint_](const boost::system::error_code& ec, uint32_t sz) {
                int length_ = -1;
                if (!ec) {
                    length_ = sz;
                }
                libasio_endpoint stack_;
                if (endpoint_->protocol() == boost::asio::ip::udp::v4()) {
                    stack_.v4_or_v6_ = 1;
                    stack_.in4_.address_ = htonl(endpoint_->address().to_v4().to_uint());
                    stack_.in4_.port_ = endpoint_->port();

                    callback_(socket.get(), key_, length_, &stack_);
                }
                else if (endpoint_->protocol() == boost::asio::ip::udp::v6()) {
                    stack_.v4_or_v6_ = 0;
                    stack_.in6_.port_ = endpoint_->port();

                    boost::asio::ip::address_v6::bytes_type addr_bytes_ = endpoint_->address().to_v6().to_bytes();
                    memcpy(stack_.in6_.address_, addr_bytes_.data(), addr_bytes_.size());

                    callback_(socket.get(), key_, length_, &stack_);
                }
                else {
                    callback_(socket.get(), key_, length_, NULL);
                }
            });
    } __unlock__;
    return true;
}

LIBASIO_API
bool libasio_sendto(boost::asio::ip::udp::socket* socket_, uint64_t key_, char* buf_, int size_, libasio_endpoint* endpoint_, libasio_sendto_callback callback_) {
    if (!socket_ || !buf_ || size_ <= 0 || !endpoint_) {
        return false;
    }
    boost::asio::ip::udp::endpoint sendtoEP_;
    if (endpoint_->v4_or_v6_) {
        sendtoEP_ = boost::asio::ip::udp::endpoint(boost::asio::ip::address_v4(ntohl(endpoint_->in4_.address_)), endpoint_->in4_.port_);
    }
    else {
        boost::asio::ip::address_v6::bytes_type addr_bytes_;
        memcpy(addr_bytes_.data(), endpoint_->in6_.address_, addr_bytes_.size());
        sendtoEP_ = boost::asio::ip::udp::endpoint(boost::asio::ip::address_v6(addr_bytes_), endpoint_->in6_.port_);
    }
    __lock__(_syncobj_) {
        libasio_context context_ = libasio_getcontext(socket_);
        if (!context_) {
            return false;
        }
        libasio_socket socket = libasio_getsocket(socket_);
        socket->async_send_to(boost::asio::buffer(buf_, size_), sendtoEP_,
            [context_, socket, key_, callback_](const boost::system::error_code& ec, uint32_t sz) {
                if (callback_) {
                    int length_ = -1;
                    if (!ec) {
                        length_ = sz;
                    }
                    callback_(socket.get(), key_, length_);
                }
            });
    } __unlock__;
    return true;
}
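The library above stores an IPv4 address as htonl(to_uint()), so the four bytes sit in memory in network order, while the port stays in host order. The sketch below packs and unpacks such an endpoint without any socket headers. `EndpointSketch`, `MakeV4Endpoint` and `HostOrderAddress` are hypothetical names; the struct is meant to have the same layout as libasio_endpoint, with the nested structs pulled out so that it is strictly standard C++.

```cpp
#include <cstdint>
#include <cstring>

struct EndpointV4 { std::uint32_t address_; std::uint32_t port_; };
struct EndpointV6 { char address_[16]; std::uint32_t port_; };

// Assumed to match the layout of libasio_endpoint in the listing above.
struct EndpointSketch {
    std::uint32_t v4_or_v6_; // non-zero: IPv4, zero: IPv6
    union {
        EndpointV4 in4_;
        EndpointV6 in6_;
        char       data_[20];
    };
};
static_assert(sizeof(EndpointSketch) == 24, "4-byte tag plus 20-byte union");

// Builds an IPv4 endpoint for a.b.c.d:port with the address in network byte order.
inline EndpointSketch MakeV4Endpoint(std::uint8_t a, std::uint8_t b, std::uint8_t c,
                                     std::uint8_t d, std::uint16_t port) {
    EndpointSketch ep;
    std::memset(&ep, 0, sizeof(ep));
    ep.v4_or_v6_ = 1;
    const std::uint8_t octets[4] = { a, b, c, d };
    std::memcpy(&ep.in4_.address_, octets, sizeof(octets)); // bytes stored as a, b, c, d
    ep.in4_.port_ = port;                                   // host byte order
    return ep;
}

// Reads the stored address back as a host-order integer, whatever the CPU endianness.
inline std::uint32_t HostOrderAddress(const EndpointSketch& ep) {
    std::uint8_t o[4];
    std::memcpy(o, &ep.in4_.address_, sizeof(o));
    return (std::uint32_t(o[0]) << 24) | (std::uint32_t(o[1]) << 16) |
           (std::uint32_t(o[2]) << 8)  |  std::uint32_t(o[3]);
}
```

Copying bytes instead of calling htonl and ntohl keeps the example portable, and it makes the convention visible: the address crosses the C/C# boundary as raw network-order bytes.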

C# AsyncContext.cs

namespace Ppp.Net.Auxiliary
{
    using System;
    using System.Security;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Net.Sockets;
#if NETCOREAPP
    using System.Runtime.CompilerServices;
#endif
    using System.Runtime.InteropServices;
    using System.Threading;

    public unsafe sealed class AsyncContext : IDisposable
    {
        [DllImport("libasio", EntryPoint = "libasio_newcontext", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.SysInt)]
        private static extern IntPtr libasio_new_context();

        [DllImport("libasio", EntryPoint = "libasio_closecontext", CallingConvention = CallingConvention.Cdecl)]
        private static extern void libasio_closecontext([MarshalAs(UnmanagedType.SysInt)] IntPtr context_);

        [DllImport("libasio", EntryPoint = "libasio_postcontext", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.I1)] // the native function returns a 1-byte C++ bool
        private static extern bool libasio_postcontext([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, long key_, [MarshalAs(UnmanagedType.FunctionPtr)] libasio_post_callback callback_);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        private delegate void libasio_post_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, long key_);

        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private IntPtr _handle = IntPtr.Zero;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private bool _disposed = false;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private long _mapkey = 0;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly object _synobj = new object();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly byte[] _buffer = new byte[ushort.MaxValue];
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private GCHandle _buffer_gc = default(GCHandle);

        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly AsyncScheduler _scheduler = new AsyncScheduler(Environment.ProcessorCount);
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly libasio_post_callback _callback = (context_, key_) =>
        {
            _callbacks.TryGetValue(context_, out ConcurrentDictionary<long, IOCompletionCallback> callbacks);
            if (callbacks == null)
            {
                return;
            }
            callbacks.TryRemove(key_, out IOCompletionCallback callback_);
            if (callback_ == null)
            {
                return;
            }
            callback_.callback_(callback_.state_);
        };
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
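        private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, IOCompletionCallback>> _callbacks =
            new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, IOCompletionCallback>>();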

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        static AsyncContext() => GCHandle.Alloc(_callback);

        private sealed class IOCompletionCallback
        {
            public object state_;
            public Action<object> callback_;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private ConcurrentDictionary<long, IOCompletionCallback> GetAllCallback()
        {
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return null;
                }
                lock (_callbacks)
                {
                    _callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, IOCompletionCallback> d);
                    if (d == null)
                    {
                        d = new ConcurrentDictionary<long, IOCompletionCallback>();
                        if (!_callbacks.TryAdd(this._handle, d))
                        {
                            d = null;
                        }
                    }
                    return d;
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private long BindCallback(Action<object> callback, object state)
        {
            ConcurrentDictionary<long, IOCompletionCallback> callbacks = this.GetAllCallback();
            if (callbacks == null)
            {
                return 0;
            }
            IOCompletionCallback cb = new IOCompletionCallback()
            {
                callback_ = callback,
                state_ = state,
            };
            long key_ = 0;
            do
            {
                while (key_ == 0)
                {
                    key_ = Interlocked.Increment(ref this._mapkey);
                }
            } while (!callbacks.TryAdd(key_, cb));
            return key_;
        }

        public static AsyncScheduler Scheduler
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => AsyncContext._scheduler;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncContext()
        {
            this._handle = libasio_new_context();
            this._buffer_gc = GCHandle.Alloc(this._buffer, GCHandleType.Pinned);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        ~AsyncContext() => this.Dispose();

        public IntPtr Handle
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => Interlocked.CompareExchange(ref this._handle, IntPtr.Zero, IntPtr.Zero);
        }

        public byte[] Buffer
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => this._buffer;
        }

        public object Tag
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get;
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            set;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncSocket CreateSocket(Socket socket)
        {
            if (socket == null)
            {
                throw new ArgumentNullException(nameof(socket));
            }
            return new AsyncSocket(this, socket);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public bool Post(Action<object> callback, object state)
        {
            if (callback == null)
            {
                return false;
            }
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return false;
                }
                long key_ = this.BindCallback(callback, state);
                if (key_ == 0)
                {
                    return false;
                }
                return libasio_postcontext(this._handle, key_, _callback);
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Dispose()
        {
            lock (this._synobj)
            {
                if (!this._disposed)
                {
                    this.Post((state) =>
                    {
                        if (this._buffer_gc.IsAllocated)
                        {
                            this._buffer_gc.Free();
                        }
                        libasio_closecontext(this._handle);
                        _callbacks.TryRemove(this._handle, out ConcurrentDictionary<long, IOCompletionCallback> _);
                    }, default(object));
                    this._disposed = true;
                }
            }
            GC.SuppressFinalize(this);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public static AsyncContext GetContext() => AsyncContext._scheduler.GetContext();
    }
}
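The managed side never hands a delegate for each operation to native code. It passes one static callback plus a numeric key, and looks the real handler up by key when the completion arrives. Below is a minimal C++ sketch of that key registry, assuming a mutex-protected map; `CallbackRegistry` is a hypothetical name and is not part of the library above.

```cpp
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <unordered_map>
#include <utility>

// Hypothetical registry: maps a non-zero key to a one-shot completion callback.
class CallbackRegistry {
public:
    using Callback = std::function<void(int)>;

    // Stores the callback and returns its key. Key 0 is never handed out, so
    // callers can use 0 to mean "binding failed", as BindCallback does above.
    std::uint64_t Bind(Callback callback) {
        std::lock_guard<std::mutex> lock(mutex_);
        std::uint64_t key = 0;
        do {
            key = ++next_key_;
        } while (key == 0 || callbacks_.count(key) != 0);
        callbacks_.emplace(key, std::move(callback));
        return key;
    }

    // Removes the callback registered under `key` and runs it with `length`.
    // Returns false when the key is unknown or was already completed.
    bool Complete(std::uint64_t key, int length) {
        Callback callback;
        {
            std::lock_guard<std::mutex> lock(mutex_);
            auto it = callbacks_.find(key);
            if (it == callbacks_.end()) {
                return false;
            }
            callback = std::move(it->second);
            callbacks_.erase(it);
        }
        if (callback) {
            callback(length); // invoked outside the lock
        }
        return true;
    }

    std::size_t Pending() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return callbacks_.size();
    }

private:
    mutable std::mutex mutex_;
    std::uint64_t next_key_ = 0;
    std::unordered_map<std::uint64_t, Callback> callbacks_;
};
```

Because an entry is removed before it runs, a key completes at most once, and the native layer only ever sees a 64-bit integer instead of a managed object reference.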
 

C# AsyncScheduler.cs

namespace Ppp.Net.Auxiliary
{
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
#if NETCOREAPP
    using System.Runtime.CompilerServices;
#endif

    public sealed class AsyncScheduler : IDisposable
    {
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly LinkedList<AsyncContext> _contexts = new LinkedList<AsyncContext>();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly object _syncobj = new object();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncScheduler(int concurrent)
        {
            if (concurrent < 1)
            {
                concurrent = 1;
            }
            for (int i = 0; i < concurrent; i++)
            {
                AsyncContext context = new AsyncContext();
                this._contexts.AddLast(context);
            }
        }

        public int Concurrent
        {
#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => _contexts.Count;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Dispose()
        {
            lock (this._syncobj)
            {
                LinkedListNode<AsyncContext> node = this._contexts.First;
                while (node != null)
                {
                    node.Value.Dispose();
                    node = node.Next;
                }
                this._contexts.Clear();
            }
            GC.SuppressFinalize(this);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public AsyncContext GetContext()
        {
            lock (this._syncobj)
            {
                LinkedListNode<AsyncContext> node = this._contexts.First;
                if (node == null)
                {
                    return null;
                }
                this._contexts.RemoveFirst();
                this._contexts.AddLast(node);
                return node.Value;
            }
        }
    }
}
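GetContext above is a round-robin pick: take the first context, move it to the back of the list, return it. The same rotation in C++ is a single splice on a std::list. `RoundRobin` is a hypothetical name for this sketch.

```cpp
#include <cstddef>
#include <list>
#include <mutex>
#include <utility>

// Hypothetical round-robin holder, mirroring AsyncScheduler.GetContext().
template <typename Context>
class RoundRobin {
public:
    void Add(Context context) {
        std::lock_guard<std::mutex> lock(mutex_);
        contexts_.push_back(std::move(context));
    }

    // Returns the next context in rotation, or nullptr when none were added.
    // The pointer stays valid because std::list never relocates its elements.
    Context* Next() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (contexts_.empty()) {
            return nullptr;
        }
        // Move the first node to the back without copying the element.
        contexts_.splice(contexts_.end(), contexts_, contexts_.begin());
        return &contexts_.back();
    }

    std::size_t Size() const {
        std::lock_guard<std::mutex> lock(mutex_);
        return contexts_.size();
    }

private:
    mutable std::mutex mutex_;
    std::list<Context> contexts_;
};
```

Each new socket is thereby assigned to the next worker in turn, which spreads sockets evenly over the per-thread contexts while keeping every socket pinned to a single one.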

C# AsyncSocket.cs 

namespace Ppp.Net.Auxiliary
{
    using System;
    using System.Collections.Concurrent;
    using System.Diagnostics;
    using System.Net;
    using System.Net.Sockets;
#if NETCOREAPP
    using System.Runtime.CompilerServices;
#endif
    using System.Runtime.InteropServices;
    using System.Security;
    using System.Threading;

    public unsafe sealed class AsyncSocket : IDisposable
    {
        [DllImport("libasio", EntryPoint = "libasio_createsocket", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.SysInt)]
        private static extern IntPtr libasio_createsocket([MarshalAs(UnmanagedType.SysInt)] IntPtr context_, int sockfd_, [MarshalAs(UnmanagedType.I1)] bool v4_or_v6_);

        [DllImport("libasio", EntryPoint = "libasio_closesocket", CallingConvention = CallingConvention.Cdecl)]
        private static extern void libasio_closesocket([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_);

        // The native functions return a 1-byte C++ bool, hence UnmanagedType.I1.
        [DllImport("libasio", EntryPoint = "libasio_sendto", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.I1)]
        private static extern bool libasio_sendto([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, void* buf_, int size_,
            libasio_endpoint* endpoint_, [MarshalAs(UnmanagedType.FunctionPtr)] libasio_sendto_callback callback_);

        [DllImport("libasio", EntryPoint = "libasio_recvfrom", CallingConvention = CallingConvention.Cdecl)]
        [return: MarshalAs(UnmanagedType.I1)]
        private static extern bool libasio_recvfrom([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, void* buf_, int size_,
            [MarshalAs(UnmanagedType.FunctionPtr)] libasio_recvfrom_callback callback_);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        private delegate void libasio_sendto_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, int length_);

        [UnmanagedFunctionPointer(CallingConvention.Cdecl)]
        private delegate void libasio_recvfrom_callback([MarshalAs(UnmanagedType.SysInt)] IntPtr socket_, long key_, int length_, libasio_endpoint* remoteEP_);

        [StructLayout(LayoutKind.Explicit)]
        private struct libasio_endpoint
        {
            [StructLayout(LayoutKind.Sequential)]
            public struct in4
            {
                public uint address_;
                public int port_;
            }
            [StructLayout(LayoutKind.Sequential)]
            public struct in6
            {
                public long address_1_;
                public long address_2_;
                public int port_;
            }

            [FieldOffset(0)]
            public uint v4_or_v6_;

            [FieldOffset(4)]
            public in4 in4_;

            [FieldOffset(4)]
            public in6 in6_;

            [FieldOffset(0)]
            public long data_1_;
            [FieldOffset(8)]
            public long data_2_;
            [FieldOffset(16)]
            public long data_3_;
        };

        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly object _synobj = new object();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly Socket _socket = null;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private readonly AsyncContext _context = null;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private bool _disposed = false;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private IntPtr _handle = IntPtr.Zero;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private long _mapkey = 0;
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly libasio_recvfrom_callback _recvfrom_callback = (socket_, key_, length_, remoteEP_) =>
        {
            _recvfrom_callbacks.TryGetValue(socket_, out ConcurrentDictionary<long, Action<int, EndPoint>> callbacks);
            if (callbacks == null)
            {
                return;
            }
            callbacks.TryRemove(key_, out Action<int, EndPoint> callback_);
            if (callback_ == null)
            {
                return;
            }
            IPEndPoint remoteEP = null;
            if (remoteEP_ != null)
            {
                if (remoteEP_->v4_or_v6_ != 0)
                {
                    remoteEP = new IPEndPoint(new IPAddress(remoteEP_->in4_.address_), remoteEP_->in4_.port_);
                }
                else
                {
                    byte[] address_bytes = new byte[16];
                    fixed (byte* paddr_bytes = address_bytes)
                    {
                        long* paddr_i64 = (long*)paddr_bytes;
                        paddr_i64[0] = remoteEP_->in6_.address_1_;
                        paddr_i64[1] = remoteEP_->in6_.address_2_;
                    }
                    remoteEP = new IPEndPoint(new IPAddress(address_bytes), remoteEP_->in6_.port_); // IPv6 port lives in in6_, not in4_
                }
            }
            callback_(length_, remoteEP);
        };
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int, EndPoint>>> _recvfrom_callbacks =
            new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int, EndPoint>>>();
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly libasio_sendto_callback _sendto_callback = (socket_, key_, length_) =>
        {
            _sendto_callbacks.TryGetValue(socket_, out ConcurrentDictionary<long, Action<int>> callbacks);
            if (callbacks == null)
            {
                return;
            }
            callbacks.TryRemove(key_, out Action<int> callback_);
            if (callback_ == null)
            {
                return;
            }
            callback_(length_);
        };
        [DebuggerBrowsable(DebuggerBrowsableState.Never)]
        private static readonly ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int>>> _sendto_callbacks =
            new ConcurrentDictionary<IntPtr, ConcurrentDictionary<long, Action<int>>>();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        static AsyncSocket()
        {
            GCHandle.Alloc(_sendto_callback);
            GCHandle.Alloc(_recvfrom_callback);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private ConcurrentDictionary<long, Action<int, EndPoint>> GetAllReceiveFromCallback()
        {
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return null;
                }
                lock (_recvfrom_callbacks)
                {
                    _recvfrom_callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, Action<int, EndPoint>> d);
                    if (d == null)
                    {
                        d = new ConcurrentDictionary<long, Action<int, EndPoint>>();
                        if (!_recvfrom_callbacks.TryAdd(this._handle, d))
                        {
                            d = null;
                        }
                    }
                    return d;
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private long BindReceiveFromCallback(Action<int, EndPoint> callback)
        {
            ConcurrentDictionary<long, Action<int, EndPoint>> callbacks = this.GetAllReceiveFromCallback();
            if (callbacks == null)
            {
                return 0;
            }
            long key_ = 0;
            do
            {
                while (key_ == 0)
                {
                    key_ = Interlocked.Increment(ref this._mapkey);
                }
            } while (!callbacks.TryAdd(key_, callback));
            return key_;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private ConcurrentDictionary<long, Action<int>> GetAllSendToCallback()
        {
            lock (this._synobj)
            {
                if (this._disposed)
                {
                    return null;
                }
                lock (_sendto_callbacks)
                {
                    _sendto_callbacks.TryGetValue(this._handle, out ConcurrentDictionary<long, Action<int>> d);
                    if (d == null)
                    {
                        d = new ConcurrentDictionary<long, Action<int>>();
                        if (!_sendto_callbacks.TryAdd(this._handle, d))
                        {
                            d = null;
                        }
                    }
                    return d;
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        private long BindSendToCallback(Action<int> callback)
        {
            ConcurrentDictionary<long, Action<int>> callbacks = this.GetAllSendToCallback();
            if (callbacks == null)
            {
                return 0;
            }
            long key_ = 0;
            do
            {
                while (key_ == 0)
                {
                    key_ = Interlocked.Increment(ref this._mapkey);
                }
            } while (!callbacks.TryAdd(key_, callback));
            return key_;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        internal AsyncSocket(AsyncContext context, Socket socket)
        {
            this._handle = libasio_createsocket(context.Handle, socket.Handle.ToInt32(), socket.AddressFamily == AddressFamily.InterNetwork);
            this._socket = socket;
            this._context = context;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        ~AsyncSocket() => this.Dispose();

        public IntPtr Handle
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => Interlocked.CompareExchange(ref this._handle, IntPtr.Zero, IntPtr.Zero);
        }

        public Socket Socket
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => this._socket;
        }

        public AsyncContext Context
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get => this._context;
        }

        public object Tag
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get;
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            set;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public bool SendTo(byte[] buffer, int offset, int length, EndPoint destinationEP, Action callback)
        {
            if (buffer == null ||
                offset < 0 ||
                length <= 0 ||
                length > (buffer.Length - offset) || // overflow-safe form of (offset + length) > buffer.Length
                SocketExtension.CleanedUp(this._socket))
            {
                return false;
            }
            // Only IP endpoints can be marshalled into libasio_endpoint; reject null and other endpoint types.
            IPEndPoint ipep = destinationEP as IPEndPoint;
            if (ipep == null)
            {
                return false;
            }
            libasio_endpoint* localEP = stackalloc libasio_endpoint[1];
            if (ipep.AddressFamily == AddressFamily.InterNetwork)
            {
                // IPv4: the 4 address bytes are copied as one 32-bit value.
                localEP->v4_or_v6_ = 1;
                localEP->in4_.port_ = ipep.Port;
                fixed (byte* pb = ipep.Address.GetAddressBytes())
                {
                    localEP->in4_.address_ = *(uint*)pb;
                }
            }
            else
            {
                // IPv6: the 16 address bytes are copied as two 64-bit halves.
                localEP->v4_or_v6_ = 0;
                localEP->in6_.port_ = ipep.Port;
                fixed (byte* pb = ipep.Address.GetAddressBytes())
                {
                    long* pl = (long*)pb;
                    localEP->in6_.address_1_ = pl[0];
                    localEP->in6_.address_2_ = pl[1];
                }
            }
            fixed (byte* p = buffer)
            {
                lock (this._synobj)
                {
                    if (this._disposed)
                    {
                        return false;
                    }
                    long key_ = this.BindSendToCallback(callback);
                    if (key_ == 0)
                    {
                        return false;
                    }
                    return libasio_sendto(this.Handle, key_, p + offset, length, localEP, _sendto_callback);
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public bool ReceiveFrom(byte[] buffer, int offset, int length, Action callback)
        {
            if (buffer == null ||
                callback == null ||
                offset < 0 ||
                length <= 0 ||
                length > (buffer.Length - offset) || // overflow-safe form of (offset + length) > buffer.Length
                SocketExtension.CleanedUp(this._socket))
            {
                return false;
            }
            fixed (byte* p = buffer)
            {
                lock (this._synobj)
                {
                    if (this._disposed)
                    {
                        return false;
                    }
                    long key_ = this.BindReceiveFromCallback(callback);
                    if (key_ == 0)
                    {
                        return false;
                    }
                    return libasio_recvfrom(this.Handle, key_, p + offset, length, _recvfrom_callback);
                }
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Close() => this.Dispose();

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Dispose()
        {
            lock (this._synobj)
            {
                if (!this._disposed)
                {
                    this._disposed = true;
                    SocketExtension.Closesocket(this._socket);
                    libasio_closesocket(this._handle);
                }
            }
            // Drop the per-handle callback tables; the removed values are not needed, so they are discarded.
            _sendto_callbacks.TryRemove(this._handle, out _);
            _recvfrom_callbacks.TryRemove(this._handle, out _);
            GC.SuppressFinalize(this);
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public void Post(Action callback, object state) => this._context.Post(callback, state);
    }
}
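
The key-binding step used by BindSendToCallback and BindReceiveFromCallback can be looked at in isolation. The following is only a minimal sketch, not the author's implementation: `CallbackRegistry`, `Bind`, `Complete`, and the `Action<int>` callback shape (bytes transferred) are hypothetical names and assumptions chosen for illustration. It shows a non-zero key being drawn from an interlocked counter, and a completion that removes the entry first so that each callback runs at most once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical stand-in for one per-socket callback table.
public sealed class CallbackRegistry
{
    private readonly ConcurrentDictionary<long, Action<int>> _callbacks =
        new ConcurrentDictionary<long, Action<int>>();
    private long _mapkey;

    // Returns a non-zero key; 0 stays reserved to mean "binding failed".
    public long Bind(Action<int> callback)
    {
        long key;
        do
        {
            // Draw a new candidate on every iteration, so a collision cannot spin forever.
            key = Interlocked.Increment(ref _mapkey);
        } while (key == 0 || !_callbacks.TryAdd(key, callback));
        return key;
    }

    // Removes the entry before invoking it, so a key can complete only once.
    public bool Complete(long key, int bytesTransferred)
    {
        if (!_callbacks.TryRemove(key, out Action<int> callback))
        {
            return false;
        }
        callback?.Invoke(bytesTransferred);
        return true;
    }
}

public static class Program
{
    public static void Main()
    {
        var registry = new CallbackRegistry();
        long key = registry.Bind(n => Console.WriteLine($"completed: {n} bytes"));
        Console.WriteLine(registry.Complete(key, 128)); // first completion runs the callback
        Console.WriteLine(registry.Complete(key, 128)); // the key has already been consumed
    }
}
```

Removing the entry inside `Complete` is a design choice of this sketch: it keeps the table from growing with finished operations, which matters for the memory concern this article is about.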
 

C# SocketExtension.cs

        public static Func<Socket, bool> CleanedUp
        {
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            get;
#if NETCOREAPP
            [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
            private set;
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
        static SocketExtension()
        {
            SocketExtension.CleanedUp = SocketExtension.CompileCleanedUp();
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        [SecurityCritical]
        [SecuritySafeCritical]
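        private static Func<Socket, bool> CompileCleanedUp()
        {
            try
            {
                // If Socket exposes no non-public "CleanedUp" property on the current runtime, GetProperty
                // returns null, Expression.Property throws, and the catch block below supplies the fallback.
                PropertyInfo piCleanedUp = typeof(Socket).GetProperty("CleanedUp", BindingFlags.NonPublic | BindingFlags.Instance);
                ParameterExpression s = Expression.Parameter(typeof(Socket), "s");
                Expression<Func<Socket, bool>> e = Expression.Lambda<Func<Socket, bool>>(Expression.Property(s, piCleanedUp), s);
                Func<Socket, bool> fCleanedUp = e.Compile();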
                return (socket) =>
                {
                    if (socket == null)
                    {
                        return true;
                    }
                    if (socket is NetworkSocket NS)
                    {
                        return NS.CleanedUp;
                    }
                    return fCleanedUp(socket);
                };
            }
            catch (Exception)
            {
                return (socket) =>
                {
                    if (socket == null)
                    {
                        return true;
                    }
                    if (socket is NetworkSocket NS)
                    {
                        return NS.CleanedUp;
                    }
                    return false;
                };
            }
        }

#if NETCOREAPP
        [MethodImpl(MethodImplOptions.AggressiveInlining)]
#endif
        public static void Closesocket(Socket socket)
        {
            bool cleanup = SocketExtension.CleanedUp(socket);
            if (cleanup)
            {
                return;
            }
            lock (socket) // SocketExtension.Shutdown(socket);
            {
                try
                {
                    socket.Shutdown(SocketShutdown.Send);
                }
                catch (Exception) { }
                try
                {
                    socket.Dispose();
                }
                catch (Exception) { }
            }
        }
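
CompileCleanedUp relies on a compiled expression tree to read a non-public property without paying the reflection cost on every call. As a hedged illustration of that technique only, the sketch below applies it to a made-up type: `Connection`, its private `Closed` property, and `NonPublicAccessor.Compile` are hypothetical and exist only for this example. Unlike the listing above, it checks for a missing property explicitly instead of relying on an exception.

```csharp
using System;
using System.Linq.Expressions;
using System.Reflection;

// Hypothetical type with a private state flag, standing in for Socket.
public sealed class Connection
{
    private bool Closed { get; set; }
    public void Close() => Closed = true;
}

public static class NonPublicAccessor
{
    // Builds and compiles "s => s.<name>" once.
    // Returns null when the property is missing or has a different type.
    public static Func<TTarget, TValue> Compile<TTarget, TValue>(string name)
    {
        PropertyInfo pi = typeof(TTarget).GetProperty(
            name, BindingFlags.NonPublic | BindingFlags.Instance);
        if (pi == null || pi.PropertyType != typeof(TValue))
        {
            return null;
        }
        ParameterExpression s = Expression.Parameter(typeof(TTarget), "s");
        return Expression
            .Lambda<Func<TTarget, TValue>>(Expression.Property(s, pi), s)
            .Compile();
    }
}

public static class Program
{
    public static void Main()
    {
        Func<Connection, bool> isClosed = NonPublicAccessor.Compile<Connection, bool>("Closed");
        var connection = new Connection();
        Console.WriteLine(isClosed(connection)); // state before Close()
        connection.Close();
        Console.WriteLine(isClosed(connection)); // state after Close()
        Console.WriteLine(NonPublicAccessor.Compile<Connection, bool>("Missing") == null);
    }
}
```

Because non-public members can be renamed between runtime versions, a caller of such an accessor should always have a fallback for the null case, as the catch branch of CompileCleanedUp does.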

 


Original article: https://outofmemory.cn/zaji/5693934.html
