aboutsummaryrefslogtreecommitdiffhomepage
path: root/vendor/go.opencensus.io/stats/view/worker.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/go.opencensus.io/stats/view/worker.go')
-rw-r--r--vendor/go.opencensus.io/stats/view/worker.go229
1 files changed, 229 insertions, 0 deletions
diff --git a/vendor/go.opencensus.io/stats/view/worker.go b/vendor/go.opencensus.io/stats/view/worker.go
new file mode 100644
index 0000000..63b0ee3
--- /dev/null
+++ b/vendor/go.opencensus.io/stats/view/worker.go
@@ -0,0 +1,229 @@
1// Copyright 2017, OpenCensus Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14//
15
16package view
17
18import (
19 "fmt"
20 "time"
21
22 "go.opencensus.io/stats"
23 "go.opencensus.io/stats/internal"
24 "go.opencensus.io/tag"
25)
26
27func init() {
28 defaultWorker = newWorker()
29 go defaultWorker.start()
30 internal.DefaultRecorder = record
31}
32
33type measureRef struct {
34 measure string
35 views map[*viewInternal]struct{}
36}
37
38type worker struct {
39 measures map[string]*measureRef
40 views map[string]*viewInternal
41 startTimes map[*viewInternal]time.Time
42
43 timer *time.Ticker
44 c chan command
45 quit, done chan bool
46}
47
48var defaultWorker *worker
49
50var defaultReportingDuration = 10 * time.Second
51
52// Find returns a registered view associated with this name.
53// If no registered view is found, nil is returned.
54func Find(name string) (v *View) {
55 req := &getViewByNameReq{
56 name: name,
57 c: make(chan *getViewByNameResp),
58 }
59 defaultWorker.c <- req
60 resp := <-req.c
61 return resp.v
62}
63
64// Register begins collecting data for the given views.
65// Once a view is registered, it reports data to the registered exporters.
66func Register(views ...*View) error {
67 for _, v := range views {
68 if err := v.canonicalize(); err != nil {
69 return err
70 }
71 }
72 req := &registerViewReq{
73 views: views,
74 err: make(chan error),
75 }
76 defaultWorker.c <- req
77 return <-req.err
78}
79
80// Unregister the given views. Data will not longer be exported for these views
81// after Unregister returns.
82// It is not necessary to unregister from views you expect to collect for the
83// duration of your program execution.
84func Unregister(views ...*View) {
85 names := make([]string, len(views))
86 for i := range views {
87 names[i] = views[i].Name
88 }
89 req := &unregisterFromViewReq{
90 views: names,
91 done: make(chan struct{}),
92 }
93 defaultWorker.c <- req
94 <-req.done
95}
96
97// RetrieveData gets a snapshot of the data collected for the the view registered
98// with the given name. It is intended for testing only.
99func RetrieveData(viewName string) ([]*Row, error) {
100 req := &retrieveDataReq{
101 now: time.Now(),
102 v: viewName,
103 c: make(chan *retrieveDataResp),
104 }
105 defaultWorker.c <- req
106 resp := <-req.c
107 return resp.rows, resp.err
108}
109
110func record(tags *tag.Map, ms interface{}, attachments map[string]string) {
111 req := &recordReq{
112 tm: tags,
113 ms: ms.([]stats.Measurement),
114 attachments: attachments,
115 t: time.Now(),
116 }
117 defaultWorker.c <- req
118}
119
120// SetReportingPeriod sets the interval between reporting aggregated views in
121// the program. If duration is less than or equal to zero, it enables the
122// default behavior.
123//
124// Note: each exporter makes different promises about what the lowest supported
125// duration is. For example, the Stackdriver exporter recommends a value no
126// lower than 1 minute. Consult each exporter per your needs.
127func SetReportingPeriod(d time.Duration) {
128 // TODO(acetechnologist): ensure that the duration d is more than a certain
129 // value. e.g. 1s
130 req := &setReportingPeriodReq{
131 d: d,
132 c: make(chan bool),
133 }
134 defaultWorker.c <- req
135 <-req.c // don't return until the timer is set to the new duration.
136}
137
138func newWorker() *worker {
139 return &worker{
140 measures: make(map[string]*measureRef),
141 views: make(map[string]*viewInternal),
142 startTimes: make(map[*viewInternal]time.Time),
143 timer: time.NewTicker(defaultReportingDuration),
144 c: make(chan command, 1024),
145 quit: make(chan bool),
146 done: make(chan bool),
147 }
148}
149
150func (w *worker) start() {
151 for {
152 select {
153 case cmd := <-w.c:
154 cmd.handleCommand(w)
155 case <-w.timer.C:
156 w.reportUsage(time.Now())
157 case <-w.quit:
158 w.timer.Stop()
159 close(w.c)
160 w.done <- true
161 return
162 }
163 }
164}
165
166func (w *worker) stop() {
167 w.quit <- true
168 <-w.done
169}
170
171func (w *worker) getMeasureRef(name string) *measureRef {
172 if mr, ok := w.measures[name]; ok {
173 return mr
174 }
175 mr := &measureRef{
176 measure: name,
177 views: make(map[*viewInternal]struct{}),
178 }
179 w.measures[name] = mr
180 return mr
181}
182
183func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
184 vi, err := newViewInternal(v)
185 if err != nil {
186 return nil, err
187 }
188 if x, ok := w.views[vi.view.Name]; ok {
189 if !x.view.same(vi.view) {
190 return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
191 }
192
193 // the view is already registered so there is nothing to do and the
194 // command is considered successful.
195 return x, nil
196 }
197 w.views[vi.view.Name] = vi
198 ref := w.getMeasureRef(vi.view.Measure.Name())
199 ref.views[vi] = struct{}{}
200 return vi, nil
201}
202
203func (w *worker) reportView(v *viewInternal, now time.Time) {
204 if !v.isSubscribed() {
205 return
206 }
207 rows := v.collectedRows()
208 _, ok := w.startTimes[v]
209 if !ok {
210 w.startTimes[v] = now
211 }
212 viewData := &Data{
213 View: v.view,
214 Start: w.startTimes[v],
215 End: time.Now(),
216 Rows: rows,
217 }
218 exportersMu.Lock()
219 for e := range exporters {
220 e.ExportView(viewData)
221 }
222 exportersMu.Unlock()
223}
224
225func (w *worker) reportUsage(now time.Time) {
226 for _, v := range w.views {
227 w.reportView(v, now)
228 }
229}