1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20package main
21
22import (
23	"context"
24	"flag"
25	"fmt"
26	"log"
27	_ "net/http/pprof"
28	"os"
29	"runtime"
30	"runtime/pprof"
31	"sync"
32	"sync/atomic"
33	"time"
34
35	"github.com/apache/thrift/lib/go/thrift"
36	"github.com/apache/thrift/test/go/src/gen/stress"
37)
38
// Profiling options.
var (
	cpuprofile = flag.String("cpuprofile", "", "write cpu profile to this file")
	memprofile = flag.String("memprofile", "", "write memory profile to this file")
)

// Stress-test options.
var (
	host      = flag.String("host", "localhost", "Host to connect")
	port      = flag.Int64("port", 9091, "Port number to connect")
	loop      = flag.Int("loops", 50000, "The number of remote thrift calls each client makes")
	runserver = flag.Int("server", 1, "Run the Thrift server in this process")
	clients   = flag.Int("clients", 20, "Number of client threads to create - 0 implies no clients, i.e. server only")
	callName  = flag.String("call", "echoVoid", "Service method to call, one of echoVoid, echoByte, echoI32, echoI64, echoString, echoList (alias: echiList), echoSet, echoMap")
	compact   = flag.Bool("compact", false, "Use compact protocol instead of binary.")
	framed    = flag.Bool("framed", false, "Use framed transport instead of buffered.")
)

// hostPort is the "host:port" address used by both the server and the
// clients; main fills it in from -host and -port.
var hostPort string

// callT identifies which service method the clients exercise.
type callT int64

const (
	echoVoid callT = iota
	echoByte
	echoI32
	echoI64
	echoString
	echiList // selects EchoList; the identifier keeps its historical misspelling
	echoSet
	echoMap
)

// callTMap translates the value of the -call flag into a callT.
var callTMap = map[string]callT{
	"echoVoid":   echoVoid,
	"echoByte":   echoByte,
	"echoI32":    echoI32,
	"echoI64":    echoI64,
	"echoString": echoString,
	"echoList":   echiList,
	"echiList":   echiList, // misspelled name, still accepted for backward compatibility
	"echoSet":    echoSet,
	"echoMap":    echoMap,
}

// callType is the call selected through -call; main sets it after parsing flags.
var callType callT
78
var (
	// ready is the start barrier: main and every client goroutine call
	// Done on it and then Wait, so all of them proceed together.
	ready sync.WaitGroup
	// done is released once per client goroutine when it finishes; main
	// waits on it before reporting results.
	done sync.WaitGroup

	// clicounter is incremented atomically by the clients, once per call.
	clicounter int64
	// counter is incremented atomically by the server handler, once per
	// request it serves.
	counter int64
)
83
84func main() {
85	flag.Parse()
86	if *memprofile != "" {
87		runtime.MemProfileRate = 100
88	}
89	var ok bool
90	if callType, ok = callTMap[*callName]; !ok {
91		log.Fatal("Unknown service call", *callName)
92	}
93	if *cpuprofile != "" {
94		f, err := os.Create(*cpuprofile)
95		if err != nil {
96			log.Fatal(err)
97		}
98		pprof.StartCPUProfile(f)
99		defer pprof.StopCPUProfile()
100	}
101	hostPort = fmt.Sprintf("%s:%d", *host, *port)
102	var protocolFactory thrift.TProtocolFactory
103	var transportFactory thrift.TTransportFactory
104
105	if *compact {
106		protocolFactory = thrift.NewTCompactProtocolFactoryConf(nil)
107	} else {
108		protocolFactory = thrift.NewTBinaryProtocolFactoryConf(nil)
109	}
110
111	if *framed {
112		transportFactory = thrift.NewTTransportFactory()
113		transportFactory = thrift.NewTFramedTransportFactoryConf(transportFactory, nil)
114	} else {
115		transportFactory = thrift.NewTBufferedTransportFactory(8192)
116	}
117
118	if *runserver > 0 {
119		serverTransport, err := thrift.NewTServerSocket(hostPort)
120		if err != nil {
121			log.Fatalf("Unable to create server socket: %s", err)
122		}
123
124		processor := stress.NewServiceProcessor(&handler{})
125		server := thrift.NewTSimpleServer4(processor, serverTransport, transportFactory, protocolFactory)
126		if *clients == 0 {
127			server.Serve()
128		} else {
129			go server.Serve()
130		}
131	}
132	//start clients
133	if *clients != 0 {
134		ready.Add(*clients + 1)
135		done.Add(*clients)
136		for i := 0; i < *clients; i++ {
137			go client(protocolFactory)
138		}
139		ready.Done()
140		ready.Wait()
141		//run!
142		startTime := time.Now()
143		//wait for completion
144		done.Wait()
145		endTime := time.Now()
146		duration := endTime.Sub(startTime)
147		log.Printf("%d calls in %v (%f calls per second)\n", clicounter, duration, float64(clicounter)/duration.Seconds())
148	}
149	if *memprofile != "" {
150		f, err := os.Create(*memprofile)
151		if err != nil {
152			log.Fatal(err)
153		}
154		pprof.WriteHeapProfile(f)
155		f.Close()
156		return
157	}
158}
159
160func client(protocolFactory thrift.TProtocolFactory) {
161	ctx := context.Background()
162	trans := thrift.NewTSocketConf(hostPort, nil)
163	btrans := thrift.NewTBufferedTransport(trans, 2048)
164	client := stress.NewServiceClientFactory(btrans, protocolFactory)
165	err := trans.Open()
166	if err != nil {
167		log.Fatalf("Unable to open connection: %s", err)
168	}
169	ready.Done()
170	ready.Wait()
171	switch callType {
172	case echoVoid:
173		for i := 0; i < *loop; i++ {
174			client.EchoVoid(ctx)
175			atomic.AddInt64(&clicounter, 1)
176		}
177	case echoByte:
178		for i := 0; i < *loop; i++ {
179			client.EchoByte(ctx, 42)
180			atomic.AddInt64(&clicounter, 1)
181		}
182	case echoI32:
183		for i := 0; i < *loop; i++ {
184			client.EchoI32(ctx, 4242)
185			atomic.AddInt64(&clicounter, 1)
186		}
187	case echoI64:
188		for i := 0; i < *loop; i++ {
189			client.EchoI64(ctx, 424242)
190			atomic.AddInt64(&clicounter, 1)
191		}
192	case echoString:
193		for i := 0; i < *loop; i++ {
194			client.EchoString(ctx, "TestString")
195			atomic.AddInt64(&clicounter, 1)
196		}
197	case echiList:
198		l := []int8{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8}
199		for i := 0; i < *loop; i++ {
200			client.EchoList(ctx, l)
201			atomic.AddInt64(&clicounter, 1)
202		}
203	case echoSet:
204		s := []int8{-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3, 4, 5, 6, 7, 8}
205		for i := 0; i < *loop; i++ {
206			client.EchoSet(ctx, s)
207			atomic.AddInt64(&clicounter, 1)
208		}
209	case echoMap:
210		m := map[int8]int8{-10: 10, -9: 9, -8: 8, -7: 7, -6: 6, -5: 5, -4: 4, -3: 3, -2: 2, -1: 1, 0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7, 8: 8}
211		for i := 0; i < *loop; i++ {
212			client.EchoMap(ctx, m)
213			atomic.AddInt64(&clicounter, 1)
214		}
215	}
216
217	done.Done()
218}
219
220type handler struct{}
221
222func (h *handler) EchoVoid(ctx context.Context) (err error) {
223	atomic.AddInt64(&counter, 1)
224	return nil
225}
226func (h *handler) EchoByte(ctx context.Context, arg int8) (r int8, err error) {
227	atomic.AddInt64(&counter, 1)
228	return arg, nil
229}
230func (h *handler) EchoI32(ctx context.Context, arg int32) (r int32, err error) {
231	atomic.AddInt64(&counter, 1)
232	return arg, nil
233}
234func (h *handler) EchoI64(ctx context.Context, arg int64) (r int64, err error) {
235	atomic.AddInt64(&counter, 1)
236	return arg, nil
237}
238func (h *handler) EchoString(ctx context.Context, arg string) (r string, err error) {
239	atomic.AddInt64(&counter, 1)
240	return arg, nil
241}
242func (h *handler) EchoList(ctx context.Context, arg []int8) (r []int8, err error) {
243	atomic.AddInt64(&counter, 1)
244	return arg, nil
245}
246func (h *handler) EchoSet(ctx context.Context, arg []int8) (r []int8, err error) {
247	atomic.AddInt64(&counter, 1)
248	return arg, nil
249}
250func (h *handler) EchoMap(ctx context.Context, arg map[int8]int8) (r map[int8]int8, err error) {
251	atomic.AddInt64(&counter, 1)
252	return arg, nil
253}
254