1/* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, 13 * software distributed under the License is distributed on an 14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15 * KIND, either express or implied. See the License for the 16 * specific language governing permissions and limitations 17 * under the License. 18 */ 19 20package main 21 22import ( 23 "context" 24 "flag" 25 "fmt" 26 "log" 27 _ "net/http/pprof" 28 "os" 29 "runtime" 30 "runtime/pprof" 31 "sync" 32 "sync/atomic" 33 "time" 34 35 "github.com/apache/thrift/lib/go/thrift" 36 "github.com/apache/thrift/test/go/src/gen/stress" 37) 38 39var cpuprofile = flag.String("cpuprofile", "", "write cpu profile to this file") 40var memprofile = flag.String("memprofile", "", "write memory profile to this file") 41 42var ( 43 host = flag.String("host", "localhost", "Host to connect") 44 port = flag.Int64("port", 9091, "Port number to connect") 45 loop = flag.Int("loops", 50000, "The number of remote thrift calls each client makes") 46 runserver = flag.Int("server", 1, "Run the Thrift server in this process") 47 clients = flag.Int("clients", 20, "Number of client threads to create - 0 implies no clients, i.e. server only") 48 callName = flag.String("call", "echoVoid", "Service method to call, one of echoVoid, echoByte, echoI32, echoI64, echoString, echiList, echoSet, echoMap") 49 compact = flag.Bool("compact", false, "Use compact protocol instead of binary.") 50 framed = flag.Bool("framed", false, "Use framed transport instead of buffered.") 51) 52var hostPort string 53 54type callT int64 55 56const ( 57 echoVoid callT = iota 58 echoByte 59 echoI32 60 echoI64 61 echoString 62 echiList 63 echoSet 64 echoMap 65) 66 67var callTMap = map[string]callT{ 68 "echoVoid": echoVoid, 69 "echoByte": echoByte, 70 "echoI32": echoI32, 71 "echoI64": echoI64, 72 "echoString": echoString, 73 "echiList": echiList, 74 "echoSet": echoSet, 75 "echoMap": echoMap, 76} 77var callType callT 78 79var ready, done sync.WaitGroup 80 81var clicounter int64 = 0 82var counter int64 = 0 83 84func main() { 85 flag.Parse() 86 if *memprofile != "" { 87 runtime.MemProfileRate = 100 88 } 89 var ok bool 90 if callType, ok = callTMap[*callName]; !ok { 91 log.Fatal("Unknown service call", *callName) 92 } 93 if *cpuprofile != "" { 94 f, err := os.Create(*cpuprofile) 95 if err != nil { 96 log.Fatal(err) 97 } 98 pprof.StartCPUProfile(f) 99 defer pprof.StopCPUProfile() 100 } 101 hostPort = fmt.Sprintf("%s:%d", *host, *port) 102 var protocolFactory thrift.TProtocolFactory 103 var transportFactory thrift.TTransportFactory 104 105 if *compact { 106 protocolFactory = thrift.NewTCompactProtocolFactoryConf(nil) 107 } else { 108 protocolFactory = thrift.NewTBinaryProtocolFactoryConf(nil) 109 } 110 111 if *framed { 112 transportFactory = thrift.NewTTransportFactory() 113 transportFactory = thrift.NewTFramedTransportFactoryConf(transportFactory, nil) 114 } else { 115 transportFactory = thrift.NewTBufferedTransportFactory(8192) 116 } 117 118 if *runserver > 0 { 119 serverTransport, err := thrift.NewTServerSocket(hostPort) 120 if err != nil { 121 log.Fatalf("Unable to create server socket: %s", err) 122 } 123 124 processor := stress.NewServiceProcessor(&handler{}) 125 server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory) 126 if *clients == 0 { 127 server.Serve() 128 } else { 129 go server.Serve() 130 } 131 } 132 //start clients 133 if *clients != 0 { 134 ready.Add(*clients + 1) 135 done.Add(*clients) 136 for i := 0; i < *clients; i++ { 137 go client(protocolFactory) 138 } 139 ready.Done() 140 ready.Wait() 141 //run! 142 startTime := time.Now() 143 //wait for completion 144 done.Wait() 145 endTime := time.Now() 146 duration := endTime.Sub(startTime) 147 log.Printf("%d calls in %v (%f calls per second)\n", clicounter, duration, float64(clicounter)/duration.Seconds()) 148 } 149 if *memprofile != "" { 150 f, err := os.Create(*memprofile) 151 if err != nil { 152 log.Fatal(err) 153 } 154 pprof.WriteHeapProfile(f) 155 f.Close() 156 return 157 } 158} 159 160func client(protocolFactory thrift.TProtocolFactory) { 161 ctx := context.Background() 162 trans := thrift.NewTSocketConf(hostPort, nil) 163 btrans := thrift.NewTBufferedTransport(trans, 2048) 164 client := stress.NewServiceClientFactory(btrans, protocolFactory) 165 err := trans.Open() 166 if err != nil { 167 log.Fatalf("Unable to open connection: %s", err) 168 } 169 ready.Done() 170 ready.Wait() 171 switch callType { 172 case echoVoid: 173 for i := 0; i < *loop; i++ { 174 client.EchoVoid(ctx) 175 atomic.AddInt64(&clicounter, 1) 176 } 177 case echoByte: 178 for i := 0; i < *loop; i++ { 179 client.EchoByte(ctx, 42) 180 atomic.AddInt64(&clicounter, 1) 181 } 182 case echoI32: 183 for i := 0; i < *loop; i++ { 184 client.EchoI32(ctx, 4242) 185 atomic.AddInt64(&clicounter, 1) 186 } 187 case echoI64: 188 for i := 0; i < *loop; i++ { 189 client.EchoI64(ctx, 424242) 190 atomic.AddInt64(&clicounter, 1) 191 } 192 case echoString: 193 for i := 0; i < *loop; i++ { 194 client.EchoString(ctx, "TestString") 195 atomic.AddInt64(&clicounter, 1) 196 } 197 case echiList: 198 l := []int8{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8} 199 for i := 0; i < *loop; i++ { 200 client.EchoList(ctx, l) 201 atomic.AddInt64(&clicounter, 1) 202 } 203 case echoSet: 204 s := []int8{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8} 205 for i := 0; i < *loop; i++ { 206 client.EchoSet(ctx, s) 207 atomic.AddInt64(&clicounter, 1) 208 } 209 case echoMap: 210 m := map[int8]int8{-10: 10, -9: 9, -8: 8, -7: 7, -6: 6, -5: 5, -4: 4, -3: 3, -2: 2, -1: 1, 0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8} 211 for i := 0; i < *loop; i++ { 212 client.EchoMap(ctx, m) 213 atomic.AddInt64(&clicounter, 1) 214 } 215 } 216 217 done.Done() 218} 219 220type handler struct{} 221 222func (h *handler) EchoVoid(ctx context.Context) (err error) { 223 atomic.AddInt64(&counter, 1) 224 return nil 225} 226func (h *handler) EchoByte(ctx context.Context, arg int8) (r int8, err error) { 227 atomic.AddInt64(&counter, 1) 228 return arg, nil 229} 230func (h *handler) EchoI32(ctx context.Context, arg int32) (r int32, err error) { 231 atomic.AddInt64(&counter, 1) 232 return arg, nil 233} 234func (h *handler) EchoI64(ctx context.Context, arg int64) (r int64, err error) { 235 atomic.AddInt64(&counter, 1) 236 return arg, nil 237} 238func (h *handler) EchoString(ctx context.Context, arg string) (r string, err error) { 239 atomic.AddInt64(&counter, 1) 240 return arg, nil 241} 242func (h *handler) EchoList(ctx context.Context, arg []int8) (r []int8, err error) { 243 atomic.AddInt64(&counter, 1) 244 return arg, nil 245} 246func (h *handler) EchoSet(ctx context.Context, arg []int8) (r []int8, err error) { 247 atomic.AddInt64(&counter, 1) 248 return arg, nil 249} 250func (h *handler) EchoMap(ctx context.Context, arg map[int8]int8) (r map[int8]int8, err error) { 251 atomic.AddInt64(&counter, 1) 252 return arg, nil 253} 254