C# AB测试套接字同步异步的性能问题疑问


业务场景是这样的:
客户端A向服务端B发送一个字符串"AAA",服务端回复一条字符串"BBB"
我分别用异步与同步写了两种模式,在做本机测试的时候
同步模式在30个客户端连续发送100000条的时候平均每秒可以完成约37000个操作
异步模式在相同条件下却只能完成12000个左右

我的疑问是:
不是说异步模式的性能高于同步模式么?是不是我的代码有问题

下面说说异步模式的主要编写思路:
由服务器异步接受连接,然后创建一个上下文实例,在这个实例里面初始化两个SocketAsyncEventArgs(假定一个R,一个S),一个负责收,以Buffer模式接收,一个负责发,以Packet模式
待发送的数据存储到一个队列中,当第一次调用时首先由R接收一段,然后由派生类处理并调用SendAsync方法将数据添加到待发送的队列,S会将所有的数据全部依次发出去,然后再命令R从客户端接收数据,依次循环.
最后贴代码:

   
  /// <summary>
  
/// 服务器上下文的基类信息
/// </summary>
public abstract class SocketServerContext : ServerContext
{
private readonly byte[] m_buffer;
private readonly int m_bufferSize;
private readonly bool m_keepAlive;
private readonly SendPacketsElement[] m_packets = new SendPacketsElement[1];

private readonly ConcurrentQueue<SendPacketsElement> m_queue = new ConcurrentQueue<SendPacketsElement>();

private bool m_close;
private SocketServerOption m_option;
private int m_packCount;
private SocketAsyncEventArgs m_receive;
private SocketAsyncEventArgs m_send;
private Socket m_socket;

/// <summary>
/// 创建新的服务上下文
/// </summary>
/// <param name="name">服务器名称</param>
/// <param name="option">服务器选项</param>
protected SocketServerContext(string name, SocketServerOption option)
: base(name)
{
m_option = option;
m_keepAlive = option.KeepAlive;
m_bufferSize = m_option.BufferSize;
m_buffer = new byte[m_bufferSize];
}

/// <summary>
/// 以指定的客户端连接在同步模式下启动此服务上下文
/// </summary>
/// <param name="socket">客户端连接</param>
public virtual void Start(Socket socket)
{
m_socket = socket;
Receive();
}

private void Receive()
{
SocketError error;
do
{
int size = m_socket.Receive(m_buffer, 0, m_bufferSize, SocketFlags.None, out error);
if (error == SocketError.Success)
{
if (size > 0)
{
this.Receive(m_buffer, size);
continue;
}
}
break;
} while (this.m_keepAlive);
this.Close();
}

/// <summary>
/// 以指定的客户端连接在异步模式下启动此服务上下文
/// </summary>
/// <param name="socket">客户端连接</param>
public virtual void AsyncStart(Socket socket)
{
m_socket = socket;
m_packCount = 0;
m_receive = new SocketAsyncEventArgs();
m_receive.SetBuffer(m_buffer, 0, m_bufferSize);
m_receive.Completed += Receive_Completed;

m_send = new SocketAsyncEventArgs();
m_send.SendPacketsElements = m_packets;
m_send.SendPacketsFlags = TransmitFileOptions.UseKernelApc;
m_send.Completed += Send_Completed;
this.m_close = false;

if (!this.m_socket.ReceiveAsync(this.m_receive))
{
Receive_Completed(null, m_receive);
}
}

private void Receive_Completed(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
this.Close();
return;
}
if (!m_close)
{
int n = e.BytesTransferred;
ReceiveAsync(m_buffer, n);
}
}

private void Send_Completed(object sender, SocketAsyncEventArgs e)
{
if (e.SocketError != SocketError.Success)
{
this.Close();
return;
}
if (!m_close)
{
SendPacketsElement pack;
Interlocked.Decrement(ref m_packCount);
if (m_queue.TryDequeue(out pack))
{
m_packets[0] = pack;
if (!m_socket.SendPacketsAsync(m_send))
{
Send_Completed(null, m_send);
}
}
else if (!m_keepAlive)
{
this.Close();
}
if (!this.m_socket.ReceiveAsync(this.m_receive))
{
Receive_Completed(null, m_receive);
}
}
}

/// <summary>
/// 接收一块从客户端传递过来的数据
/// </summary>
/// <param name="buffer">数据缓冲区</param>
/// <param name="count">数据块大小</param>
protected abstract void ReceiveAsync(byte[] buffer, int count);

/// <summary>
/// 接收一块从客户端传递过来的数据
/// </summary>
/// <param name="buffer">数据缓冲区</param>
/// <param name="count">数据块大小</param>
protected abstract void Receive(byte[] buffer, int count);

/// <summary>
/// 关闭当前服务上下文
/// </summary>
public virtual void Close()
{
m_close = true;
try
{
m_socket.Close();
}
catch
{
}
}

/// <summary>
/// 向客户端发送一段数据
/// </summary>
/// <param name="buffer">需要发送的数据</param>
public void Write(byte[] buffer)
{
SocketError error;
this.m_socket.Send(buffer, 0, buffer.Length, SocketFlags.None, out error);
if (error != SocketError.Success)
{
this.Close();
}
}

/// <summary>
/// 以异步方式向客户端发送一段数据
/// </summary>
/// <param name="buffer">需要发送的数据</param>
public void WriteAsync(byte[] buffer)
{
if (m_close)
{
return;
}
SendPacketsElement pack = new SendPacketsElement(buffer);
if (Interlocked.Increment(ref m_packCount) == 1)
{
m_packets[0] = pack;
if (!m_socket.SendPacketsAsync(m_send))
{
Send_Completed(null, m_send);
}
}
else
{
m_queue.Enqueue(pack);
}
}
}

c# socket

可爱的双一 11 years, 6 months ago

一般来说不建议使用Socket,因为Socket太底层了,一般使用TCP和UDP.而且你这样数据传输不够规范,数据就这一个还好,但如果是10个,100个,1000个,识别效率很低。而且你这个接收效率是由网络带宽决定的,且你的信息只有3字节,算一算就知道这个值是108.39KB/S,在局域网里传输这个速度算低的,比较规范的是用XML包装的,识别效率很高,代码如下(下面代码是根据UDP协议写的):

   
  using System;
  
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Net.Sockets;
using System.Net;
using System.Xml;

namespace NetListener
{
/// <summary>
/// 网络消息接收委托
/// </summary>
/// <param name="address">连接发起方IP地址</param>
/// <param name="message">收到的消息结构化实例</param>
public delegate void RecivedMessage(IPAddress address,NetMessage message);

/// <summary>
/// 异常处理事件
/// </summary>
/// <param name="sender"></param>
/// <param name="e"></param>
public delegate void WorkError(object sender,Exception e);

/// <summary>
/// UDP消息监听器
/// </summary>
public class UDPAccessListener
{
private UdpClient udpListener;
private Thread listenerthread;
private bool close = false;//设置该值以便关闭接收消息的线程
private string destID = null;

/// <summary>
/// 接收到网络消息的事件
/// </summary>
public event RecivedMessage RecivedMessage;

/// <summary>
/// 异常处理
/// </summary>
public event WorkError RecivedError;

/// <summary>
/// 实例化UDP消息监听器
/// </summary>
/// <param name="port">要监听的端口号</param>
/// <param name="destID">需要比较的ID值</param>
public UDPAccessListener(int port,string destID)
{
this.destID = destID;
udpListener = new UdpClient(port);
udpListener.AllowNatTraversal(true);
udpListener.Ttl = 64;
listenerthread = new Thread(new ThreadStart(AccpetClient));
listenerthread.IsBackground = true;
listenerthread.SetApartmentState(ApartmentState.MTA);
}

/// <summary>
/// 接收消息并开启线程进行处理
/// </summary>
[MTAThread()]
private void AccpetClient()
{
while (!close)
{
if (udpListener.Available > 0)
{
IPEndPoint endPoint = null;
byte[] data = udpListener.Receive(ref endPoint);
string message = Encoding.UTF8.GetString(data);
NetMessage nMessage = new NetMessage();
if (nMessage.Read(destID, message))
{
Thread workThread = new Thread(new ParameterizedThreadStart(Recived));
workThread.IsBackground = true;
workThread.SetApartmentState(ApartmentState.MTA);
List<object> listObj = new List<object>();
listObj.Add(endPoint.Address);
listObj.Add(nMessage);
workThread.Start(listObj);
}
}
}
udpListener.Close();
}

/// <summary>
/// 接收到消息进行处理的线程委托
/// 如果不开该线程,直接在上一个线程委托中触发处理事件
/// 那在处理该消息的过程中,就不会去接收下一个消息,而且一但一个消息处理出现异常,监听消息的线程就会停止
/// </summary>
[MTAThread()]
private void Recived(object obj)
{
List<object> listObjs = obj as List<object>;
IPAddress address = listObjs[0] as IPAddress;
NetMessage nMessage = listObjs[1] as NetMessage;
if (RecivedMessage != null)
{
try
{
RecivedMessage(address, nMessage);
}
catch (Exception e)
{
if (RecivedError != null)
{
RecivedError(this, e);//对异常进行处理
}
}
}
}

public void Start()
{
listenerthread.Start();
}

public void Close()
{
close = true;
listenerthread.Join();
}
}

/// <summary>
/// 结构化网络消息类
/// </summary>
[Serializable()]
public class NetMessage
{
private const string XMLFIRSTTEXT = "<Message ID=\"{0}\">{1}</Message>";
private const string DATATEXT = "<Data name=\"{0}\"><![CDATA[{1}]]></Data>";
private const string IDNODE = "ID";
private const string NAMENODE = "name";
private const string DATANODENAME = "Data";

private string id="";
private Dictionary<string, string> dataDic;

public NetMessage()
{
dataDic = new Dictionary<string, string>();
}

/// <summary>
/// 获取或设置可被接收方识别的ID
/// </summary>
public string Id
{
get { return id; }
set {
if (value != null)
{
id = value;
}
}
}

/// <summary>
/// 获取或设置储存的信息
/// </summary>
/// <param name="name">信息名称</param>
/// <returns></returns>
public string this[string name]
{
get
{
if (name!=null&&dataDic.ContainsKey(name))
{
return dataDic[name];
}
else
{
return null;
}
}
set
{
if (name != null)
{
string destValue = value == null ? "" : value;
if (dataDic.ContainsKey(name))
{
dataDic[name] = value;
}
else
{
dataDic.Add(name, destValue);
}
}
}
}

/// <summary>
/// 读取接收到的消息字符串
/// </summary>
/// <param name="destID">与该消息字符串中ID的比较值</param>
/// <param name="message">消息字符串</param>
/// <returns>是否能被正确读取</returns>
public bool Read(string destID,string message)
{
try
{
XmlDocument doc = new XmlDocument();
doc.LoadXml(message);
if (!doc.DocumentElement.Attributes[IDNODE].Equals(destID))
{
return false;
}
foreach (XmlNode node in doc.DocumentElement.SelectNodes(DATANODENAME))
{
string name = node.Attributes[NAMENODE].Value;
if (dataDic.ContainsKey(name))
{
dataDic[name] = node.InnerText;//这个如果存在一般是覆盖前者,但如果有特殊需求这个就不判断
}
else
{
dataDic.Add(name, node.InnerText);
}
}
return true;
}
catch (Exception)
{
//上面代码无需有无判断,因为一但不存在就会出现异常
return false;
}
}

/// <summary>
/// 将结构化的消息字符串转换为编码是UTF-8的byte[]
/// </summary>
/// <returns></returns>
public byte[] ByteData()
{
return Encoding.UTF8.GetBytes(this.ToString());
}

/// <summary>
/// 重载ToString令其能输入数据包装成XML的字符串
/// </summary>
/// <returns></returns>
public override string ToString()
{
if (dataDic.Count == 0)
{
return string.Empty;
}
else
{
string dataList = "";
foreach (string name in dataDic.Keys)
{
dataList += string.Format(DATATEXT,name, dataDic[name]);
}
return string.Format(XMLFIRSTTEXT, id, dataList);
}
}
}
}

骨傲天的传说 answered 11 years, 6 months ago

Your Answer