All Apache Thrift tutorials require that you have:
Generated the netstd code from the tutorial.thrift and shared.thrift files:
thrift -r --gen netstd tutorial.thrift
Followed all prerequisites listed below.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using Thrift;
using Thrift.Protocol;
using Thrift.Transport;
using Thrift.Transport.Client;
using tutorial;
using shared;

namespace Client
{
    /// <summary>
    /// Tutorial client: parses the command line, builds one transport/protocol pair per
    /// requested client and runs the Calculator (and, when multiplexed, SharedService) calls.
    /// </summary>
    public class Program
    {
        private static readonly ServiceCollection ServiceCollection = new ServiceCollection();
        private static ILogger Logger;
        private static readonly TConfiguration Configuration = null;  // new TConfiguration() if needed

        /// <summary>
        /// Writes the command-line usage text to the logger.
        /// </summary>
        private static void DisplayHelp()
        {
            Logger.LogInformation(@"
Usage:
    Client -help
        will display help information

    Client -tr:<transport> -bf:<buffering> -pr:<protocol> -mc:<numClients>
        will run client with specified arguments (tcp transport and binary protocol by default) and with 1 client

Options:
    -tr (transport):
        tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
        namedpipe - namedpipe transport will be used (pipe address - "".test"")
        http - http transport will be used (address - ""http://localhost:9090"")
        tcptls - tcp tls transport will be used (host - ""localhost"", port - 9090)

    -bf (buffering):
        none - (default) no buffering will be used
        buffered - buffered transport will be used
        framed - framed transport will be used

    -pr (protocol):
        binary - (default) binary protocol will be used
        compact - compact protocol will be used
        json - json protocol will be used
        multiplexed - multiplexed protocol will be used

    -mc (multiple clients):
        <numClients> - number of multiple clients to connect to server (max 100, default 1)

Sample:
    Client -tr:tcp -pr:binary
");
        }

        /// <summary>
        /// Entry point: sets up logging, shows help when "-help" is present, otherwise runs the client(s).
        /// </summary>
        /// <param name="args">Command-line arguments; <c>null</c> is treated as empty.</param>
        public static void Main(string[] args)
        {
            args = args ?? Array.Empty<string>();

            ServiceCollection.AddLogging(logging => ConfigureLogging(logging));
            using (var serviceProvider = ServiceCollection.BuildServiceProvider())
            {
                Logger = serviceProvider.GetService<ILoggerFactory>().CreateLogger(nameof(Client));

                if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
                {
                    DisplayHelp();
                    return;
                }

                Logger.LogInformation("Starting client...");

                using (var source = new CancellationTokenSource())
                {
                    // Main is synchronous, so this is the single place where we block on the async work.
                    RunAsync(args, source.Token).GetAwaiter().GetResult();
                }
            }
        }

        /// <summary>
        /// Configures trace-level logging to the console and the debug output.
        /// </summary>
        private static void ConfigureLogging(ILoggingBuilder logging)
        {
            logging.SetMinimumLevel(LogLevel.Trace);
            logging.AddConsole();
            logging.AddDebug();
        }

        /// <summary>
        /// Creates the requested number of transports and protocols, starts one client run per
        /// protocol and waits asynchronously for all of them to finish.
        /// </summary>
        private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
        {
            var numClients = GetNumberOfClients(args);
            Logger.LogInformation($"Selected # of clients: {numClients}");

            var transports = new TTransport[numClients];
            for (int i = 0; i < numClients; i++)
            {
                transports[i] = GetTransport(args);
            }
            Logger.LogInformation($"Selected client transport: {transports[0]}");

            var protocols = new Tuple<Protocol, TProtocol>[numClients];
            for (int i = 0; i < numClients; i++)
            {
                protocols[i] = GetProtocol(args, transports[i]);
            }
            Logger.LogInformation($"Selected client protocol: {protocols[0].Item1}");

            var tasks = new Task[numClients];
            for (int i = 0; i < numClients; i++)
            {
                tasks[i] = RunClientAsync(protocols[i], cancellationToken);
            }

            // Await instead of blocking a thread with Task.WaitAll inside an async method.
            await Task.WhenAll(tasks);
        }

        /// <summary>
        /// Returns the value part of the first argument starting with <paramref name="option"/>
        /// (the text between the first and second ':'), or <c>null</c> when the option is absent
        /// or was given without a value.
        /// </summary>
        private static string GetArgumentValue(string[] args, string option)
        {
            var argument = args.FirstOrDefault(x => x.StartsWith(option, StringComparison.Ordinal));
            if (argument == null)
            {
                return null;
            }

            var parts = argument.Split(':');
            return parts.Length > 1 ? parts[1] : null;
        }

        /// <summary>
        /// Builds the endpoint transport selected with "-tr" (tcp when missing or unknown) and
        /// optionally wraps it in the layered transport selected with "-bf".
        /// </summary>
        private static TTransport GetTransport(string[] args)
        {
            TTransport transport = null;

            // construct endpoint transport
            var transportArg = GetArgumentValue(args, "-tr");
            if (Enum.TryParse(transportArg, true, out Transport selectedTransport))
            {
                switch (selectedTransport)
                {
                    case Transport.Tcp:
                        // created by the fallback below
                        break;

                    case Transport.NamedPipe:
                        transport = new TNamedPipeTransport(".test", Configuration);
                        break;

                    case Transport.Http:
                        transport = new THttpTransport(new Uri("http://localhost:9090"), Configuration);
                        break;

                    case Transport.TcpTls:
                        transport = new TTlsSocketTransport(IPAddress.Loopback, 9090, Configuration,
                            GetCertificate(), CertValidator, LocalCertificateSelectionCallback);
                        break;

                    default:
                        Debug.Assert(false, "unhandled case");
                        break;
                }
            }

            if (transport == null)
            {
                // default / fallback: plain tcp
                transport = new TSocketTransport(IPAddress.Loopback, 9090, Configuration);
            }

            // optionally add layered transport(s); parsed case-insensitively so that the
            // documented lower-case values ("buffered", "framed") are actually honoured
            var bufferingArg = GetArgumentValue(args, "-bf");
            if (Enum.TryParse(bufferingArg, true, out Buffering selectedBuffering))
            {
                switch (selectedBuffering)
                {
                    case Buffering.Buffered:
                        transport = new TBufferedTransport(transport);
                        break;

                    case Buffering.Framed:
                        transport = new TFramedTransport(transport);
                        break;

                    default: // layered transport(s) are optional
                        Debug.Assert(selectedBuffering == Buffering.None, "unhandled case");
                        break;
                }
            }

            return transport;
        }

        /// <summary>
        /// Reads the "-mc" option. Returns its value when it is an integer in 1..100, otherwise 1.
        /// </summary>
        private static int GetNumberOfClients(string[] args)
        {
            var numClients = GetArgumentValue(args, "-mc");

            if (int.TryParse(numClients, out var count) && (0 < count) && (count <= 100))
            {
                return count;
            }

            return 1;
        }

        /// <summary>
        /// Loads the "ThriftTest.pfx" certificate, searching from the parent of the current directory.
        /// </summary>
        private static X509Certificate2 GetCertificate()
        {
            // due to files location in net core better to take certs from top folder
            var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
            return new X509Certificate2(certFile, "ThriftTest");
        }

        /// <summary>
        /// Searches <paramref name="di"/> recursively for "ThriftTest.pfx"; when not found, retries
        /// from the parent directory, at most <paramref name="maxCount"/> more times.
        /// </summary>
        /// <exception cref="FileNotFoundException">
        /// The file was not found within the allowed number of levels, or the search ran past the root.
        /// </exception>
        private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
        {
            if (di == null)
            {
                // walked past the filesystem root (or the start directory had no parent)
                throw new FileNotFoundException("Cannot find file in directories");
            }

            var certFile = di.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories)
                .FirstOrDefault();
            if (certFile != null)
            {
                return certFile.FullName;
            }

            if (maxCount == 0)
            {
                throw new FileNotFoundException("Cannot find file in directories");
            }

            return GetCertPath(di.Parent, maxCount - 1);
        }

        /// <summary>
        /// Certificate selection callback for the TLS transport; always returns the tutorial certificate.
        /// </summary>
        private static X509Certificate LocalCertificateSelectionCallback(object sender,
            string targetHost, X509CertificateCollection localCertificates,
            X509Certificate remoteCertificate, string[] acceptableIssuers)
        {
            return GetCertificate();
        }

        /// <summary>
        /// Server certificate validation callback for the TLS transport.
        /// </summary>
        private static bool CertValidator(object sender, X509Certificate certificate,
            X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            // NOTE(review): accepts every certificate regardless of sslPolicyErrors.
            // Do not copy this into production code.
            return true;
        }

        /// <summary>
        /// Creates the protocol selected with "-pr" on top of <paramref name="transport"/>.
        /// Returns the selection together with the protocol instance; binary is used when the
        /// option is missing or unknown.
        /// </summary>
        private static Tuple<Protocol, TProtocol> GetProtocol(string[] args, TTransport transport)
        {
            var protocol = GetArgumentValue(args, "-pr");

            if (Enum.TryParse(protocol, true, out Protocol selectedProtocol))
            {
                switch (selectedProtocol)
                {
                    case Protocol.Binary:
                        return new Tuple<Protocol, TProtocol>(selectedProtocol, new TBinaryProtocol(transport));

                    case Protocol.Compact:
                        return new Tuple<Protocol, TProtocol>(selectedProtocol, new TCompactProtocol(transport));

                    case Protocol.Json:
                        return new Tuple<Protocol, TProtocol>(selectedProtocol, new TJsonProtocol(transport));

                    case Protocol.Multiplexed:
                        // it returns BinaryProtocol to avoid making wrapped protocol as public in TProtocolDecorator (in RunClientAsync it will be wrapped into Multiplexed protocol)
                        return new Tuple<Protocol, TProtocol>(selectedProtocol, new TBinaryProtocol(transport));

                    default:
                        Debug.Assert(false, "unhandled case");
                        break;
                }
            }

            // fallback: report what is actually created, not whatever TryParse left in the out variable
            return new Tuple<Protocol, TProtocol>(Protocol.Binary, new TBinaryProtocol(transport));
        }

        /// <summary>
        /// Runs one client session on the given protocol: Calculator operations, plus SharedService
        /// operations when the multiplexed protocol is selected. Errors are logged and the
        /// transport is closed when the session ends.
        /// </summary>
        private static async Task RunClientAsync(Tuple<Protocol, TProtocol> protocolTuple, CancellationToken cancellationToken)
        {
            try
            {
                var protocol = protocolTuple.Item2;
                var protocolType = protocolTuple.Item1;

                TBaseClient client = null;

                try
                {
                    if (protocolType != Protocol.Multiplexed)
                    {
                        client = new Calculator.Client(protocol);
                        await ExecuteCalculatorClientOperations(cancellationToken, (Calculator.Client)client);
                    }
                    else
                    {
                        // it uses binary protocol there to create Multiplexed protocols
                        var multiplex = new TMultiplexedProtocol(protocol, nameof(Calculator));
                        client = new Calculator.Client(multiplex);
                        await ExecuteCalculatorClientOperations(cancellationToken, (Calculator.Client)client);

                        multiplex = new TMultiplexedProtocol(protocol, nameof(SharedService));
                        client = new SharedService.Client(multiplex);
                        await ExecuteSharedServiceClientOperations(cancellationToken, (SharedService.Client)client);
                    }
                }
                catch (Exception ex)
                {
                    Logger.LogError($"{client?.ClientId} " + ex);
                }
                finally
                {
                    protocol.Transport.Close();
                }
            }
            catch (TApplicationException x)
            {
                // only reachable for failures outside the inner try block (e.g. while closing the transport)
                Logger.LogError(x.ToString());
            }
        }

        private static async Task ExecuteCalculatorClientOperations(CancellationToken cancellationToken, Calculator.Client client)
        {
            await client.OpenTransportAsync(cancellationToken);

            // Async version

            Logger.LogInformation($"{client.ClientId} PingAsync()");
            await client.pingAsync(cancellationToken);

            Logger.LogInformation($"{client.ClientId} AddAsync(1,1)");
            var sum = await client.addAsync(1, 1, cancellationToken);
            Logger.LogInformation($"{client.ClientId} AddAsync(1,1)={sum}");

            var work = new Work
            {
                Op = Operation.DIVIDE,
                Num1 = 1,
                Num2 = 0
            };

            try
            {
                Logger.LogInformation($"{client.ClientId} CalculateAsync(1)");
                await client.calculateAsync(1, work, cancellationToken);
                Logger.LogInformation($"{client.ClientId} Whoa we can divide by 0");
            }
            catch (InvalidOperation io)
            {
                Logger.LogInformation($"{client.ClientId} Invalid operation: " + io);
            }

            work.Op = Operation.SUBTRACT;
            work.Num1 = 15;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net.Security;
using System.Security.Cryptography.X509Certificates;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Thrift;
using Thrift.Protocol;
using Thrift.Server;
using Thrift.Transport;
using Thrift.Transport.Server;
using tutorial;
using shared;
using Thrift.Processor;
using System.Diagnostics;

namespace Server
{
    /// <summary>
    /// Tutorial server: parses the command line and hosts the Calculator service (and
    /// SharedService when multiplexed) over the selected transport, buffering and protocol.
    /// </summary>
    public class Program
    {
        private static readonly ServiceCollection ServiceCollection = new ServiceCollection();
        private static ILogger Logger;
        private static readonly TConfiguration Configuration = null;  // new TConfiguration() if needed

        /// <summary>
        /// Entry point: sets up logging, shows help when "-help" is present, otherwise runs the
        /// server and cancels it after a line is read from the console.
        /// </summary>
        /// <param name="args">Command-line arguments; <c>null</c> is treated as empty.</param>
        public static void Main(string[] args)
        {
            args = args ?? Array.Empty<string>();

            ServiceCollection.AddLogging(logging => ConfigureLogging(logging));
            using (var serviceProvider = ServiceCollection.BuildServiceProvider())
            {
                Logger = serviceProvider.GetService<ILoggerFactory>().CreateLogger(nameof(Server));

                if (args.Any(x => x.StartsWith("-help", StringComparison.OrdinalIgnoreCase)))
                {
                    DisplayHelp();
                    return;
                }

                using (var source = new CancellationTokenSource())
                {
                    RunAsync(args, source.Token).GetAwaiter().GetResult();

                    Logger.LogInformation("Press any key to stop...");

                    Console.ReadLine();
                    source.Cancel();
                }

                Logger.LogInformation("Server stopped");
            }
        }

        /// <summary>
        /// Configures trace-level logging to the console and the debug output.
        /// </summary>
        private static void ConfigureLogging(ILoggingBuilder logging)
        {
            logging.SetMinimumLevel(LogLevel.Trace);
            logging.AddConsole();
            logging.AddDebug();
        }

        /// <summary>
        /// Writes the command-line usage text to the logger.
        /// </summary>
        private static void DisplayHelp()
        {
            Logger.LogInformation(@"
Usage:
    Server -help
        will display help information

    Server -tr:<transport> -bf:<buffering> -pr:<protocol>
        will run server with specified arguments (tcp transport, no buffering, and binary protocol by default)

Options:
    -tr (transport):
        tcp - (default) tcp transport will be used (host - ""localhost"", port - 9090)
        namedpipe - namedpipe transport will be used (pipe address - "".test"")
        http - http transport will be used (http address - ""localhost:9090"")
        tcptls - tcp transport with tls will be used (host - ""localhost"", port - 9090)

    -bf (buffering):
        none - (default) no buffering will be used
        buffered - buffered transport will be used
        framed - framed transport will be used

    -pr (protocol):
        binary - (default) binary protocol will be used
        compact - compact protocol will be used
        json - json protocol will be used
        multiplexed - multiplexed protocol will be used

Sample:
    Server -tr:tcp
");
        }

        /// <summary>
        /// Reads the transport, buffering and protocol options and starts either the HTTP
        /// sample host or the Thrift server for the selected configuration.
        /// </summary>
        private static async Task RunAsync(string[] args, CancellationToken cancellationToken)
        {
            var selectedTransport = GetTransport(args);
            var selectedBuffering = GetBuffering(args);
            var selectedProtocol = GetProtocol(args);

            if (selectedTransport == Transport.Http)
            {
                new HttpServerSample().Run(cancellationToken);
            }
            else
            {
                await RunSelectedConfigurationAsync(selectedTransport, selectedBuffering, selectedProtocol, cancellationToken);
            }
        }

        /// <summary>
        /// Returns the value part of the first argument starting with <paramref name="option"/>
        /// (the text between the first and second ':'), or <c>null</c> when the option is absent
        /// or was given without a value.
        /// </summary>
        private static string GetArgumentValue(string[] args, string option)
        {
            var argument = args.FirstOrDefault(x => x.StartsWith(option, StringComparison.Ordinal));
            if (argument == null)
            {
                return null;
            }

            var parts = argument.Split(':');
            return parts.Length > 1 ? parts[1] : null;
        }

        /// <summary>
        /// Parses the "-pr" option (case-insensitive); yields the enum default when missing or unknown.
        /// </summary>
        private static Protocol GetProtocol(string[] args)
        {
            var protocol = GetArgumentValue(args, "-pr");
            Enum.TryParse(protocol, true, out Protocol selectedProtocol);
            return selectedProtocol;
        }

        /// <summary>
        /// Parses the "-bf" option (case-insensitive, so the documented lower-case values work);
        /// yields the enum default when missing or unknown.
        /// </summary>
        private static Buffering GetBuffering(string[] args)
        {
            var buffering = GetArgumentValue(args, "-bf");
            Enum.TryParse(buffering, true, out Buffering selectedBuffering);
            return selectedBuffering;
        }

        /// <summary>
        /// Parses the "-tr" option (case-insensitive); yields the enum default when missing or unknown.
        /// </summary>
        private static Transport GetTransport(string[] args)
        {
            var transport = GetArgumentValue(args, "-tr");
            Enum.TryParse(transport, true, out Transport selectedTransport);
            return selectedTransport;
        }

        /// <summary>
        /// Builds the server transport, optional layered transport factories, protocol factories
        /// and processor for the given selection, then serves requests with a TSimpleAsyncServer.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="transport"/> or <paramref name="protocol"/> is not supported here.
        /// </exception>
        private static async Task RunSelectedConfigurationAsync(Transport transport, Buffering buffering, Protocol protocol, CancellationToken cancellationToken)
        {
            var handler = new CalculatorAsyncHandler();

            TServerTransport serverTransport;
            switch (transport)
            {
                case Transport.Tcp:
                    serverTransport = new TServerSocketTransport(9090, Configuration);
                    break;

                case Transport.NamedPipe:
                    serverTransport = new TNamedPipeServerTransport(".test", Configuration);
                    break;

                case Transport.TcpTls:
                    serverTransport = new TTlsServerSocketTransport(9090, Configuration,
                        GetCertificate(), ClientCertValidator, LocalCertificateSelectionCallback);
                    break;

                default:
                    // Http is hosted by HttpServerSample; anything else would leave the server without a transport
                    throw new ArgumentOutOfRangeException(nameof(transport), transport, null);
            }

            TTransportFactory inputTransportFactory = null;
            TTransportFactory outputTransportFactory = null;
            switch (buffering)
            {
                case Buffering.Buffered:
                    inputTransportFactory = new TBufferedTransport.Factory();
                    outputTransportFactory = new TBufferedTransport.Factory();
                    break;

                case Buffering.Framed:
                    inputTransportFactory = new TFramedTransport.Factory();
                    outputTransportFactory = new TFramedTransport.Factory();
                    break;

                default: // layered transport(s) are optional
                    Debug.Assert(buffering == Buffering.None, "unhandled case");
                    break;
            }

            TProtocolFactory inputProtocolFactory;
            TProtocolFactory outputProtocolFactory;
            ITAsyncProcessor processor;
            switch (protocol)
            {
                case Protocol.Binary:
                    inputProtocolFactory = new TBinaryProtocol.Factory();
                    outputProtocolFactory = new TBinaryProtocol.Factory();
                    processor = new Calculator.AsyncProcessor(handler);
                    break;

                case Protocol.Compact:
                    inputProtocolFactory = new TCompactProtocol.Factory();
                    outputProtocolFactory = new TCompactProtocol.Factory();
                    processor = new Calculator.AsyncProcessor(handler);
                    break;

                case Protocol.Json:
                    inputProtocolFactory = new TJsonProtocol.Factory();
                    outputProtocolFactory = new TJsonProtocol.Factory();
                    processor = new Calculator.AsyncProcessor(handler);
                    break;

                case Protocol.Multiplexed:
                    inputProtocolFactory = new TBinaryProtocol.Factory();
                    outputProtocolFactory = new TBinaryProtocol.Factory();

                    var calcProcessor = new Calculator.AsyncProcessor(handler);

                    var sharedServiceHandler = new SharedServiceAsyncHandler();
                    var sharedServiceProcessor = new SharedService.AsyncProcessor(sharedServiceHandler);

                    var multiplexedProcessor = new TMultiplexedProcessor();
                    multiplexedProcessor.RegisterProcessor(nameof(Calculator), calcProcessor);
                    multiplexedProcessor.RegisterProcessor(nameof(SharedService), sharedServiceProcessor);

                    processor = multiplexedProcessor;
                    break;

                default:
                    throw new ArgumentOutOfRangeException(nameof(protocol), protocol, null);
            }

            try
            {
                Logger.LogInformation(
                    $"Selected TAsyncServer with {serverTransport} transport, {processor} processor and {inputProtocolFactory} protocol factories");

                var loggerFactory = ServiceCollection.BuildServiceProvider().GetService<ILoggerFactory>();

                var server = new TSimpleAsyncServer(
                    itProcessorFactory: new TSingletonProcessorFactory(processor),
                    serverTransport: serverTransport,
                    inputTransportFactory: inputTransportFactory,
                    outputTransportFactory: outputTransportFactory,
                    inputProtocolFactory: inputProtocolFactory,
                    outputProtocolFactory: outputProtocolFactory,
                    logger: loggerFactory.CreateLogger<TSimpleAsyncServer>());

                Logger.LogInformation("Starting the server...");

                await server.ServeAsync(cancellationToken);
            }
            catch (Exception x)
            {
                // a failed server is an error, not an informational event
                Logger.LogError(x.ToString());
            }
        }

        /// <summary>
        /// Loads the "ThriftTest.pfx" certificate, searching from the parent of the current directory.
        /// </summary>
        private static X509Certificate2 GetCertificate()
        {
            // due to files location in net core better to take certs from top folder
            var certFile = GetCertPath(Directory.GetParent(Directory.GetCurrentDirectory()));
            return new X509Certificate2(certFile, "ThriftTest");
        }

        /// <summary>
        /// Searches <paramref name="di"/> recursively for "ThriftTest.pfx"; when not found, retries
        /// from the parent directory, at most <paramref name="maxCount"/> more times.
        /// </summary>
        /// <exception cref="FileNotFoundException">
        /// The file was not found within the allowed number of levels, or the search ran past the root.
        /// </exception>
        private static string GetCertPath(DirectoryInfo di, int maxCount = 6)
        {
            if (di == null)
            {
                // walked past the filesystem root (or the start directory had no parent)
                throw new FileNotFoundException("Cannot find file in directories");
            }

            var certFile = di.EnumerateFiles("ThriftTest.pfx", SearchOption.AllDirectories)
                .FirstOrDefault();
            if (certFile != null)
            {
                return certFile.FullName;
            }

            if (maxCount == 0)
            {
                throw new FileNotFoundException("Cannot find file in directories");
            }

            return GetCertPath(di.Parent, maxCount - 1);
        }

        /// <summary>
        /// Certificate selection callback for the TLS transport; always returns the tutorial certificate.
        /// </summary>
        private static X509Certificate LocalCertificateSelectionCallback(object sender,
            string targetHost, X509CertificateCollection localCertificates,
            X509Certificate remoteCertificate, string[] acceptableIssuers)
        {
            return GetCertificate();
        }

        /// <summary>
        /// Client certificate validation callback for the TLS transport.
        /// </summary>
        private static bool ClientCertValidator(object sender, X509Certificate certificate,
            X509Chain chain, SslPolicyErrors sslPolicyErrors)
        {
            // NOTE(review): accepts every certificate regardless of sslPolicyErrors.
            // Do not copy this into production code.
            return true;
        }

        private enum Transport
        {
            Tcp,
            NamedPipe,
            Http,
            TcpTls,
        }

        private enum Buffering
        {
            None,
            Buffered,
            Framed,
        }

        private enum Protocol
        {
            Binary,
            Compact,
            Json,
            Multiplexed
        }

        /// <summary>
        /// Hosts the Calculator service over HTTP with Kestrel on http://localhost:9090.
        /// </summary>
        public class HttpServerSample
        {
            /// <summary>
            /// Builds the web host and runs it, blocking until the host stops or
            /// <paramref name="cancellationToken"/> is cancelled.
            /// </summary>
            public void Run(CancellationToken cancellationToken)
            {
                var config = new ConfigurationBuilder()
                    .AddEnvironmentVariables(prefix: "ASPNETCORE_")
                    .Build();

                var host = new WebHostBuilder()
                    .UseConfiguration(config)
                    .UseKestrel()
                    .UseUrls("http://localhost:9090")
                    .UseContentRoot(Directory.GetCurrentDirectory())
                    .UseStartup<Startup>()
                    .ConfigureLogging((ctx, logging) => ConfigureLogging(logging))
                    .Build();

                host.RunAsync(cancellationToken).GetAwaiter().GetResult();
            }

            /// <summary>
            /// ASP.NET Core startup class that registers the Thrift handler, processor and
            /// HTTP transport middleware.
            /// </summary>
            public class Startup
            {
                public Startup(IWebHostEnvironment env)
                {
                    var builder = new ConfigurationBuilder()
                        .SetBasePath(env.ContentRootPath)
                        .AddEnvironmentVariables();

                    Configuration = builder.Build();
                }

                public IConfigurationRoot Configuration { get; }

                // This method gets called by the runtime. Use this method to add services to the container.
                public void ConfigureServices(IServiceCollection services)
                {
                    services.AddTransient<Calculator.IAsync, CalculatorAsyncHandler>();
                    services.AddTransient<ITAsyncProcessor, Calculator.AsyncProcessor>();
                    services.AddTransient<THttpServerTransport, THttpServerTransport>();
                }

                // This method gets called by the runtime. Use this method to configure the HTTP request pipeline.
                public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILoggerFactory loggerFactory)
                {
                    app.UseMiddleware<THttpServerTransport>();
                }
            }
        }

        public class CalculatorAsyncHandler : Calculator.IAsync
        {
            private readonly Dictionary<int, SharedStruct> _log = new Dictionary<int, SharedStruct>();

            public CalculatorAsyncHandler()
            {
            }

            public async Task<SharedStruct> getStructAsync(int key, CancellationToken cancellationToken)
            {
                Logger.LogInformation("GetStructAsync({0})", key);
                return await Task.FromResult(_log[key]);
            }

            public async Task pingAsync(CancellationToken cancellationToken)
            {
                Logger.LogInformation("PingAsync()");
                await Task.CompletedTask;
            }

            public async Task<int> addAsync(int num1, int num2, CancellationToken cancellationToken)
            {
                Logger.LogInformation($"AddAsync({num1},{num2})");
                return await Task.FromResult(num1 + num2);
            }

            public async Task<int> calculateAsync(int logid, Work w, CancellationToken cancellationToken)
            {
                Logger.LogInformation($"CalculateAsync({logid}, [{w.Op},{w.Num1},{w.Num2}])");

                var val = 0;
                switch (w.Op)
                {
                    case Operation.ADD:
                        val = w.Num1 + w.Num2;
                        break;

                    case Operation.SUBTRACT:
                        val = w.Num1 - w.Num2;
                        break;

                    case Operation.MULTIPLY:
                        val = w.Num1 * w.Num2;
                        break;

                    case Operation.DIVIDE:
                        if (w.Num2 == 0)
                        {
                            var io = new InvalidOperation
                            {