博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
wcf系列学习5天速成——第四天 wcf之分布式架构
阅读量:6116 次
发布时间:2019-06-21

本文共 17092 字,大约阅读时间需要 56 分钟。

今天是wcf系列的第四天,也该出手压轴戏了。嗯,现在的大型架构,都是神马的,

nginx鸡群,iis鸡群,wcf鸡群,DB鸡群,由一个人作战变成了群殴.......

 

今天我就分享下wcf鸡群,高性能架构中一种常用的手法就是在内存中维护一个叫做“索引”的内存数据库,

在实战中利用“索引”这个概念做出"海量数据“的秒杀。

好,先上图:

 

这个图明白人都能看得懂吧。因为我的系列偏重于wcf,所以我重点说下”心跳检测“的实战手法。

 

第一步:上一下项目的结构,才能做到心中有数。

 

第二步:“LoadDBService”这个是控制台程序,目的就是从数据库抽出关系模型加载在内存数据库中,因为这些东西会涉及一些算法的知识,

             在这里就不写算法了,就简单的模拟一下。

LoadDBServcieusing System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Runtime.Serialization;using System.Web.Script.Serialization;using System.IO;using System.Xml.Serialization;using System.Xml;using Common;namespace LoadDBData{    class Program    {        static void Main(string[] args)        {            //模拟从数据库加载索引到内存中,形成内存中的数据库//这里的 "Dictionary" 用来表达“一个用户注册过多少店铺“,即UserID与ShopID的一对多关系            SerializableDictionary
> dic = new SerializableDictionary
>(); List
shopIDList = new List
(); for (int shopID = 300000; shopID < 300050; shopID++) shopIDList.Add(shopID); int UserID = 15; //假设这里已经维护好了UserID与ShopID的关系 dic.Add(UserID, shopIDList); XmlSerializer xml = new XmlSerializer(dic.GetType()); var memoryStream = new MemoryStream(); xml.Serialize(memoryStream, dic); memoryStream.Seek(0, SeekOrigin.Begin); //将Dictionary持久化,相当于模拟保存在Mencache里面 File.AppendAllText("F://1.txt", Encoding.UTF8.GetString(memoryStream.ToArray())); Console.WriteLine("数据加载成功!"); Console.Read(); } }}

因为Dictionary不支持序列化,所以我从网上拷贝了一份代码让其执行序列化
SerializableDictionaryusing System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Xml.Serialization;using System.Xml;using System.Xml.Schema;using System.Runtime.Serialization;namespace Common{    ////// 标题:支持 XML 序列化的 Dictionary//////
///
[XmlRoot("SerializableDictionary")] public class SerializableDictionary
: Dictionary
, IXmlSerializable { public SerializableDictionary() : base() { } public SerializableDictionary(IDictionary
dictionary) : base(dictionary) { } public SerializableDictionary(IEqualityComparer
comparer) : base(comparer) { } public SerializableDictionary(int capacity) : base(capacity) { } public SerializableDictionary(int capacity, IEqualityComparer
comparer) : base(capacity, comparer) { } protected SerializableDictionary(SerializationInfo info, StreamingContext context) : base(info, context) { } public System.Xml.Schema.XmlSchema GetSchema() { return null; } ///
/// 从对象的 XML 表示形式生成该对象//////
public void ReadXml(System.Xml.XmlReader reader) { XmlSerializer keySerializer = new XmlSerializer(typeof(TKey)); XmlSerializer valueSerializer = new XmlSerializer(typeof(TValue)); bool wasEmpty = reader.IsEmptyElement; reader.Read(); if (wasEmpty) return; while (reader.NodeType != System.Xml.XmlNodeType.EndElement) { reader.ReadStartElement("item"); reader.ReadStartElement("key"); TKey key = (TKey)keySerializer.Deserialize(reader); reader.ReadEndElement(); reader.ReadStartElement("value"); TValue value = (TValue)valueSerializer.Deserialize(reader); reader.ReadEndElement(); this.Add(key, value); reader.ReadEndElement(); reader.MoveToContent(); } reader.ReadEndElement(); } /**/ ///
/// 将对象转换为其 XML 表示形式//////
public void WriteXml(System.Xml.XmlWriter writer) { XmlSerializer keySerializer = new XmlSerializer(typeof(TKey)); XmlSerializer valueSerializer = new XmlSerializer(typeof(TValue)); foreach (TKey key in this.Keys) { writer.WriteStartElement("item"); writer.WriteStartElement("key"); keySerializer.Serialize(writer, key); writer.WriteEndElement(); writer.WriteStartElement("value"); TValue value = this[key]; valueSerializer.Serialize(writer, value); writer.WriteEndElement(); writer.WriteEndElement(); } } }}

第三步: "HeartBeatService"也做成了一个控制台程序,为了图方便,把Contract和Host都放在一个控制台程序中,

            代码中加入了注释,看一下就会懂的。

           

IAddress.csusing System;using System.Collections.Generic;using System.Linq;using System.Runtime.Serialization;using System.ServiceModel;using System.Text;namespace HeartBeatService{    //CallbackContract:这个就是Client实现此接口,方便服务器端通知客户端    [ServiceContract(CallbackContract = typeof(ILiveAddressCallback))]    public interface IAddress    {        ////// 此方法用于Search启动后,将Search地址插入到此处//////        [OperationContract(IsOneWay = true)]        void AddSearch(string address);        ////// 此方法用于IIS端获取search地址//////        [OperationContract(IsOneWay = true)]        void GetService(string address);    }}
 
Address.csusing System;using System.Collections.Generic;using System.Linq;using System.Runtime.Serialization;using System.ServiceModel;using System.Text;using System.Timers;using System.IO;using System.Collections.Concurrent;using SearhService;using ClientService;namespace HeartBeatService{    //InstanceContextMode:主要是管理上下文的实例,此处是single,也就是单体//ConcurrencyMode:    主要是用来控制实例中的线程数,此处是Multiple,也就是多线程    [ServiceBehavior(InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]    public class Address : IAddress    {        static List
search = new List
(); static object obj = new object(); ///
/// 此静态构造函数用来检测存活的Search个数/// static Address() { Timer timer = new Timer(); timer.Interval = 6000; timer.Elapsed += (sender, e) => { Console.WriteLine("\n***************************************************************************"); Console.WriteLine("当前存活的Search为:"); lock (obj) { //遍历当前存活的Search foreach (var single in search) { ChannelFactory
factory = null; try { //当Search存在的话,心跳服务就要定时检测Search是否死掉,也就是定时的连接Search来检测。 factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), new EndpointAddress(single)); factory.CreateChannel().TestSearch(); factory.Close(); Console.WriteLine(single); } catch (Exception ex) { Console.WriteLine(ex.Message); //如果抛出异常,则说明此search已经挂掉 search.Remove(single); factory.Abort(); Console.WriteLine("\n当前时间:" + DateTime.Now + " ,存活的Search有:" + search.Count() + "个"); } } } //最后统计下存活的search有多少个 Console.WriteLine("\n当前时间:" + DateTime.Now + " ,存活的Search有:" + search.Count() + "个"); }; timer.Start(); } public void AddSearch(string address) { lock (obj) { //是否包含相同的Search地址 if (!search.Contains(address)) { search.Add(address); //search添加成功后就要告诉来源处,此search已经被成功载入。 var client = OperationContext.Current.GetCallbackChannel
(); client.LiveAddress(address); } } } public void GetService(string address) { Timer timer = new Timer(); timer.Interval = 1000; timer.Elapsed += (obj, sender) => { try { //这个是定时的检测IIS是否挂掉 var factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), new EndpointAddress(address)); factory.CreateChannel().AddSearchList(search); factory.Close(); timer.Interval = 10000; } catch (Exception ex) { Console.WriteLine(ex.Message); } }; timer.Start(); } }}
ILiveAddressCallback.csusing System;using System.Collections.Generic;using System.Linq;using System.Text;using System.ServiceModel;namespace HeartBeatService{    ////// 等客户端实现后,让客户端约束一下,只能是这个LiveAddress方法///    public interface ILiveAddressCallback    {        [OperationContract(IsOneWay = true)]        void LiveAddress(string address);    }}

第四步: 我们开一下心跳,预览下效果:

         是的,心跳现在正在检测是否有活着的Search。

 

第五步:"SearhService" 这个Console程序就是WCF的search,主要用于从MemerCache里面读取索引。

          记得要添加一下对“心跳服务”的服务引用。

IProduct.csusing System;using System.Collections.Generic;using System.Linq;using System.Runtime.Serialization;using System.ServiceModel;using System.Text;namespace SearhService{    // 注意: 使用“重构”菜单上的“重命名”命令,可以同时更改代码和配置文件中的接口名“IService1”。    [ServiceContract]    public interface IProduct    {        [OperationContract]        List
GetShopListByUserID(int userID); [OperationContract] void TestSearch(); }}
Product.csusing System;using System.Collections.Generic;using System.Linq;using System.Runtime.Serialization;using System.ServiceModel;using System.Text;using Common;using System.Xml;using System.IO;using System.Xml.Serialization;namespace SearhService{    public class Product : IProduct    {        public List
GetShopListByUserID(int userID) { //模拟从MemCache中读取索引 SerializableDictionary
> dic = new SerializableDictionary
>(); byte[] bytes = Encoding.UTF8.GetBytes(File.ReadAllText("F://1.txt", Encoding.UTF8)); var memoryStream = new MemoryStream(); memoryStream.Write(bytes, 0, bytes.Count()); memoryStream.Seek(0, SeekOrigin.Begin); XmlSerializer xml = new XmlSerializer(dic.GetType()); var obj = xml.Deserialize(memoryStream) as Dictionary
>; return obj[userID]; } public void TestSearch() { } }}
SearchHost.cs using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.ServiceModel;using System.Configuration;using System.Timers;using SearhService.HeartBeatService;namespace SearhService{    public class SearchHost : IAddressCallback    {        static DateTime startTime;        public static void Main()        {            ServiceHost host = new ServiceHost(typeof(Product));            host.Open();            AddSearch();            Console.Read();        }        static void AddSearch()        {            startTime = DateTime.Now;            Console.WriteLine("Search服务发送中.....\n\n*************************************************\n");            try            {                var heartClient = new AddressClient(new InstanceContext(new SearchHost()));                string search = ConfigurationManager.AppSettings["search"];                heartClient.AddSearch(search);            }            catch (Exception ex)            {                Console.WriteLine("Search服务发送失败:" + ex.Message);            }        }        public void LiveAddress(string address)        {            Console.WriteLine("恭喜你," + address + "已被心跳成功接收!\n");            Console.WriteLine("发送时间:" + startTime + "\n接收时间:" + DateTime.Now);        }    }}

第六步:此时Search服务已经建好,我们可以测试当Search开启获取关闭对心跳有什么影响:

              Search开启时:

                       

          

           Search关闭时:

              

           对的,当Search关闭时,心跳检测该Search已经死掉,然后只能从集群中剔除。

           当然,我们可以将Search拷贝N份,部署在N台机器中,只要修改一下endpoint地址就OK了,这一点明白人都会。

 

第七步:"ClientService" 这里也就指的是IIS,此时我们也要添加一下对心跳的服务引用。

IServiceList.csusing System;using System.Collections.Generic;using System.Linq;using System.Text;using System.ServiceModel;namespace ClientService{    [ServiceContract]    public interface IServiceList    {        [OperationContract]        void AddSearchList(List
search); }}
ServiceList.csusing System;using System.Collections.Generic;using System.Linq;using System.Text;using System.ServiceModel;using System.Configuration;using System.Timers;using System.Threading;namespace ClientService{    public class ServiceList : IServiceList    {        public static List
searchList = new List
(); static object obj = new object(); public static string Search { get { lock (obj) { //如果心跳没及时返回地址,客户端就在等候 if (searchList.Count == 0) Thread.Sleep(1000); return searchList[new Random().Next(0, searchList.Count)]; } } set { } } public void AddSearchList(List
search) { lock (obj) { searchList = search; Console.WriteLine("************************************"); Console.WriteLine("当前存活的Search为:"); foreach (var single in searchList) { Console.WriteLine(single); } } } }}
Program.csusing System;using System.Collections.Generic;using System.Linq;using System.Text;using System.ServiceModel;using System.Configuration;using System.Threading;using ClientService.HeartBeatService;using SearhService;using BaseClass;using System.Data;using System.Diagnostics;namespace ClientService{    class Program : IAddressCallback    {        static void Main(string[] args)        {            ServiceHost host = new ServiceHost(typeof(ServiceList));            host.Open();            var client = new AddressClient(new InstanceContext(new Program()));            //配置文件中获取iis的地址            var iis = ConfigurationManager.AppSettings["iis"];            //将iis的地址告诉心跳            client.GetService(iis);            //从集群中获取search地址来对Search服务进行调用            var factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), new EndpointAddress(ServiceList.Search)); //根据userid获取了shopID的集合 var shopIDList = factory.CreateChannel().GetShopListByUserID(15); //.......................... 后续就是我们将shopIDList做连接数据库查询(做到秒杀) Console.Read(); } public void LiveAddress(string address) { } }}

 

然后我们开启Client,看看效果咋样:

当然,search集群后,client得到search的地址是随机的,也就分担了search的负担,实现有福同享,有难同当的效果了。

 

最后: 我们做下性能检测,看下“秒杀”和“毫秒杀”的效果。

          首先在数据库的User表和Shop插入了180万和20万的数据用于关联。

          ClientService改造后的代码:

Program.csusing System;using System.Collections.Generic;using System.Linq;using System.Text;using System.ServiceModel;using System.Timers;using System.Diagnostics;using BaseClass;using ClientService;using ClientService.HeartBeatService;using System.Configuration;using SearhService;namespace ClientService{    class Program : IAddressCallback    {        static void Main(string[] args)        {            ServiceHost host = new ServiceHost(typeof(ServiceList));            host.Open();            var client = new AddressClient(new InstanceContext(new Program()));            //配置文件中获取iis的地址            var iis = ConfigurationManager.AppSettings["iis"];            //将iis的地址告诉心跳            client.GetService(iis);            //从集群中获取search地址来对Search服务进行调用            var factory = new ChannelFactory
(new NetTcpBinding(SecurityMode.None), new EndpointAddress(ServiceList.Search)); //根据userid获取了shopID的集合//比如说这里的ShopIDList是通过索引交并集获取的分页的一些shopID var shopIDList = factory.CreateChannel().GetShopListByUserID(15); var strSql = string.Join(",", shopIDList); Stopwatch watch = new Stopwatch(); watch.Start(); SqlHelper.Query("select s.ShopID,u.UserName,s.ShopName from [User] as u ,Shop as s where s.ShopID in(" + strSql + ")"); watch.Stop(); Console.WriteLine("通过wcf索引获取的ID >>>花费时间:" + watch.ElapsedMilliseconds); //普通的sql查询花费的时间 StringBuilder builder = new StringBuilder(); builder.Append("select * from "); builder.Append("(select ROW_NUMBER() over(order by s.ShopID) as NumberID, "); builder.Append(" s.ShopID, u.UserName, s.ShopName "); builder.Append("from Shop s left join [User] as u on u.UserID=s.UserID "); builder.Append("where s.UserID=15) as array "); builder.Append("where NumberID>300000 and NumberID<300050"); watch.Start(); SqlHelper.Query(builder.ToString()); watch.Stop(); Console.WriteLine("普通的sql分页 >>>花费时间:" + watch.ElapsedMilliseconds); Console.Read(); } public void LiveAddress(string address) { } }}

性能图:

对的,一个秒杀,一个是毫秒杀,所以越复杂越能展示出“内存索引”的强大之处。

转载地址:http://bwnka.baihongyu.com/

你可能感兴趣的文章
teamviewer 卸载干净
查看>>
多线程设计模式
查看>>
解读自定义UICollectionViewLayout--感动了我自己
查看>>
SqlServer作业指定目标服务器
查看>>
UnrealEngine4.5 BluePrint初始化中遇到编译警告的解决办法
查看>>
User implements HttpSessionBindingListener
查看>>
抽象工厂方法
查看>>
ubuntu apt-get 安装 lnmp
查看>>
焊盘 往同一个方向增加 固定的长度方法 总结
查看>>
eclipse的maven、Scala环境搭建
查看>>
架构师之路(一)- 什么是软件架构
查看>>
jquery的冒泡和默认行为
查看>>
USACO 土地购买
查看>>
【原创】远景能源面试--一面
查看>>
B1010.一元多项式求导(25)
查看>>
10、程序员和编译器之间的关系
查看>>
前端学习之正则表达式
查看>>
配置 RAILS FOR JRUBY1.7.4
查看>>
AndroidStudio中导入SlidingMenu报错解决方案
查看>>
修改GRUB2背景图片
查看>>