/** * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Linq; using System.Reflection; using System.IO; using System.Net; namespace Avro.ipc { public class HttpTransceiver : Transceiver { private byte[] _intBuffer = new byte[4]; //this buffer is used by read/write behind the latch controlled by base class so we are sure there is no race condition private HttpWebRequest _httpRequest; private HttpWebRequest _modelRequest; public override string RemoteName { get { return _modelRequest.RequestUri.AbsoluteUri; } } public HttpTransceiver(HttpWebRequest modelRequest) { _modelRequest = modelRequest; } public HttpTransceiver(Uri serviceUri, int timeoutMs) { _modelRequest = (HttpWebRequest)WebRequest.Create(serviceUri); _modelRequest.Method = "POST"; _modelRequest.ContentType = "avro/binary"; _modelRequest.Timeout = timeoutMs; } private static int ReadInt(Stream stream, byte[] buffer) { stream.Read(buffer, 0, 4); return IPAddress.NetworkToHostOrder(BitConverter.ToInt32(buffer, 0)); } public static byte[] ConvertIntToBytes(int value) { return BitConverter.GetBytes(IPAddress.HostToNetworkOrder(value)); } public static int CalculateLength(IList buffers) { int num = 0; foreach (MemoryStream memoryStream in (IEnumerable)buffers) { num += 4; num += (int)memoryStream.Length; } return num + 4; } public static IList ReadBuffers(Stream inStream, byte[] intBuffer) { List list = new List(); while (true) { int length = ReadInt(inStream, intBuffer); if (length == 0) //end of transmission break; byte[] buffer = new byte[length]; int offset = 0; int count = length; while (offset < length) { int num = inStream.Read(buffer, offset, count); if (num == 0) throw new Exception(string.Format("Unexpected end of response binary stream - expected {0} more bytes in current chunk", (object)count)); offset += num; count -= num; } list.Add(new MemoryStream(buffer)); } return (IList)list; } public override IList ReadBuffers() { using (Stream responseStream = this._httpRequest.GetResponse().GetResponseStream()) { return ReadBuffers(responseStream, _intBuffer); } } protected HttpWebRequest CreateAvroHttpRequest(long contentLength) { HttpWebRequest wr = (HttpWebRequest)WebRequest.Create(_modelRequest.RequestUri); //TODO: what else to copy from model request? wr.AllowAutoRedirect = _modelRequest.AllowAutoRedirect; wr.AllowWriteStreamBuffering = _modelRequest.AllowWriteStreamBuffering; wr.AuthenticationLevel = _modelRequest.AuthenticationLevel; wr.AutomaticDecompression = _modelRequest.AutomaticDecompression; wr.CachePolicy = _modelRequest.CachePolicy; wr.ClientCertificates.AddRange(_modelRequest.ClientCertificates); wr.ConnectionGroupName = _modelRequest.ConnectionGroupName; wr.ContinueDelegate = _modelRequest.ContinueDelegate; wr.CookieContainer = _modelRequest.CookieContainer; wr.Credentials = _modelRequest.Credentials; wr.UnsafeAuthenticatedConnectionSharing = _modelRequest.UnsafeAuthenticatedConnectionSharing; wr.UseDefaultCredentials = _modelRequest.UseDefaultCredentials; wr.KeepAlive = _modelRequest.KeepAlive; wr.Expect = _modelRequest.Expect; //wr.Date = _modelRequest.Date; //wr.Host = _modelRequest.Host; wr.UserAgent = _modelRequest.UserAgent; //wr.Headers = _modelRequest.Headers; wr.Referer = _modelRequest.Referer; wr.Pipelined = _modelRequest.Pipelined; wr.PreAuthenticate = _modelRequest.PreAuthenticate; wr.ProtocolVersion = _modelRequest.ProtocolVersion; wr.Proxy = _modelRequest.Proxy; wr.ReadWriteTimeout = _modelRequest.ReadWriteTimeout; wr.Timeout = _modelRequest.Timeout; //the properties which are defined by Avro specification wr.Method = "POST"; wr.ContentType = "avro/binary"; wr.ContentLength = contentLength; return wr; } public static void WriteBuffers(IList buffers, Stream outStream) { foreach (MemoryStream memoryStream in buffers) { int num = (int)memoryStream.Length; outStream.Write(ConvertIntToBytes(num), 0, 4); memoryStream.WriteTo(outStream); } outStream.Write(ConvertIntToBytes(0), 0, 4); outStream.Flush(); } public override void WriteBuffers(IList buffers) { _httpRequest = CreateAvroHttpRequest(CalculateLength(buffers)); using (Stream requestStream = _httpRequest.GetRequestStream()) { WriteBuffers(buffers, requestStream); } } } }