Blame view

src/components/db/mongo.go 6.24 KB
ee23102d   zhangqijia   支持mongo, grpc接服务器
1
2
3
4
5
6
7
8
9
10
  package db
  
  import (
  	"context"
  	"fmt"
  	"go.mongodb.org/mongo-driver/bson"
  	"go.mongodb.org/mongo-driver/mongo"
  	"go.mongodb.org/mongo-driver/mongo/options"
  	"go.mongodb.org/mongo-driver/mongo/readpref"
  	"go.mongodb.org/mongo-driver/x/bsonx"
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
11
  	"pro2d/src/utils"
3592dfd3   zhangqijia   重构models, 索引唯一索引
12
  	"sort"
ee23102d   zhangqijia   支持mongo, grpc接服务器
13
  	"strconv"
98b0736d   zhangqijia   添加定时器, 检查心跳
14
  	"strings"
ee23102d   zhangqijia   支持mongo, grpc接服务器
15
16
  	"time"
  )
eb417b0b   zhangqijia   reactor mongo
17
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
18
19
20
21
  var (
  	MongoClient *mongo.Client
  	MongoDatabase *mongo.Database
  )
ee23102d   zhangqijia   支持mongo, grpc接服务器
22
23
  
  //初始化
1584eb4b   zhangqijia   修复创建唯一索引的bug
24
  func Connect(user, password, host string,port int, MaxNum int, timeOut int) error {
ee23102d   zhangqijia   支持mongo, grpc接服务器
25
  	var uri string
eb417b0b   zhangqijia   reactor mongo
26
27
28
  	if user!= "" {
  		//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
29
  	}else {
eb417b0b   zhangqijia   reactor mongo
30
31
  		//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
32
33
  	}
  	// 设置连接超时时间
eb417b0b   zhangqijia   reactor mongo
34
  	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
ee23102d   zhangqijia   支持mongo, grpc接服务器
35
36
37
38
  	defer cancel()
  	// 通过传进来的uri连接相关的配置
  	o := options.Client().ApplyURI(uri)
  	// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
eb417b0b   zhangqijia   reactor mongo
39
  	o.SetMaxPoolSize(uint64(MaxNum))
ee23102d   zhangqijia   支持mongo, grpc接服务器
40
  	// 发起链接
eb417b0b   zhangqijia   reactor mongo
41
  	var err error
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
42
  	MongoClient, err = mongo.Connect(ctx, o)
ee23102d   zhangqijia   支持mongo, grpc接服务器
43
  	if err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
44
45
46
  		return err
  	}
  	// 判断服务是不是可用
3592dfd3   zhangqijia   重构models, 索引唯一索引
47
  	if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
48
49
  		return err
  	}
3592dfd3   zhangqijia   重构models, 索引唯一索引
50
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
51
  	//MongoDatabase = MongoClient.Database(dbname)
ee23102d   zhangqijia   支持mongo, grpc接服务器
52
53
54
  	return nil
  }
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
  // CreateCollection creates the named collection in MongoDatabase unless a
  // collection with that name is already listed.
  //
  // It returns nil when the collection already exists, the listing error when
  // the existing names cannot be read, and otherwise whatever the create call
  // returns.
  func CreateCollection(collection string) error {
  	colls, err := MongoDatabase.ListCollectionNames(context.TODO(), bson.D{})
  	if err != nil {
  		return err
  	}

  	// SearchStrings is a binary search and needs sorted input.
  	sort.Strings(colls)
  	pos := sort.SearchStrings(colls, collection)
  	if pos < len(colls) && colls[pos] == collection {
  		return nil
  	}
  	return MongoDatabase.CreateCollection(context.TODO(), collection)
  }
  
  func SetUnique(collection string, key string) (string, error)  {
  	return MongoDatabase.Collection(collection).Indexes().CreateOne(
  		context.TODO(),
  		mongo.IndexModel{
  			Keys   : bsonx.Doc{{key, bsonx.Int32(1)}},
  			Options: options.Index().SetUnique(true),
  		},
  	)
  }
  
eb417b0b   zhangqijia   reactor mongo
76
  type MgoColl struct {
3592dfd3   zhangqijia   重构models, 索引唯一索引
77
  	collection *mongo.Collection
ee23102d   zhangqijia   支持mongo, grpc接服务器
78
  
eb417b0b   zhangqijia   reactor mongo
79
80
  	pri interface{}
  	schema interface{}
ee23102d   zhangqijia   支持mongo, grpc接服务器
81
82
83
84
85
86
87
88
89
  }
  
  // GetBsonD returns a one-element ordered BSON document holding key and value.
  func GetBsonD(key string, value interface{}) interface{} {
  	elem := bson.E{Key: key, Value: value}
  	return bson.D{elem}
  }
  // GetBsonM returns a BSON map containing the single entry key -> value.
  func GetBsonM(key string, value interface{}) interface{} {
  	doc := bson.M{}
  	doc[key] = value
  	return doc
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
90
  func NewMongoColl(key string, schema interface{}) *MgoColl {
eb417b0b   zhangqijia   reactor mongo
91
  	return &MgoColl{
1584eb4b   zhangqijia   修复创建唯一索引的bug
92
  		collection: MongoDatabase.Collection(utils.GetCollName(schema)),
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
93
94
  		pri:        GetBsonM(utils.GetPriKey(schema), key),
  		schema:     schema,
ee23102d   zhangqijia   支持mongo, grpc接服务器
95
96
97
  	}
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
98
  func FindOne(pri interface{}, schema interface{}) error {
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
99
  	r := MongoDatabase.Collection(utils.GetCollName(schema)).FindOne(context.TODO(), pri)
1584eb4b   zhangqijia   修复创建唯一索引的bug
100
  	return r.Decode(schema)
ee23102d   zhangqijia   支持mongo, grpc接服务器
101
  }
ee23102d   zhangqijia   支持mongo, grpc接服务器
102
  
ee23102d   zhangqijia   支持mongo, grpc接服务器
103
  // 查询单个
eb417b0b   zhangqijia   reactor mongo
104
  func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
105
106
  	//collection.
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
107
  	singleResult := m.collection.FindOne(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
108
109
110
  	return singleResult
  }
  
ee23102d   zhangqijia   支持mongo, grpc接服务器
111
  //查询集合里有多少数据
eb417b0b   zhangqijia   reactor mongo
112
  func (m *MgoColl) CollectionCount() (string, int64) {
3592dfd3   zhangqijia   重构models, 索引唯一索引
113
114
  	size, _ := m.collection.EstimatedDocumentCount(context.TODO())
  	return m.collection.Name(), size
ee23102d   zhangqijia   支持mongo, grpc接服务器
115
116
117
  }
  
  //按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
eb417b0b   zhangqijia   reactor mongo
118
  func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
ee23102d   zhangqijia   支持mongo, grpc接服务器
119
120
121
122
123
124
  	SORT := bson.D{
  		{"_id", sort}} //filter := bson.D{ {key,value}}
  	filter := bson.D{
  	{}}
  	findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
  	//findOptions.SetLimit(i)
3592dfd3   zhangqijia   重构models, 索引唯一索引
125
  	temp, _ := m.collection.Find(context.Background(), filter, findOptions)
ee23102d   zhangqijia   支持mongo, grpc接服务器
126
127
128
129
  	return temp
  }
  
  //获取集合创建时间和编号
eb417b0b   zhangqijia   reactor mongo
130
  func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
131
132
133
134
135
136
137
138
139
  	temp1 := result[:8]
  	timestamp, _ := strconv.ParseInt(temp1, 16, 64)
  	dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
  	temp2 := result[18:]
  	count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
  	return dateTime, count
  }
  
  //删除文章和查询文章
eb417b0b   zhangqijia   reactor mongo
140
  func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
141
142
  	filter := bson.D{
  		{key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
143
144
  	singleResult := m.collection.FindOne(context.TODO(), filter)
  	DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
145
146
147
148
149
150
151
  	if err != nil {
  		fmt.Println("删除时出现错误,你删不掉的~")
  	}
  	return DeleteResult.DeletedCount, singleResult
  }
  
  //删除文章
eb417b0b   zhangqijia   reactor mongo
152
  func (m *MgoColl) Delete(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
153
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
154
  	count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
155
156
157
158
159
160
161
162
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  
  }
  
  //删除多个
eb417b0b   zhangqijia   reactor mongo
163
  func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
164
165
  	filter := bson.D{ {key, value}}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
166
  	count, err := m.collection.DeleteMany(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
167
168
169
170
171
172
173
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  }
  
  //索引
3592dfd3   zhangqijia   重构models, 索引唯一索引
174
175
  func (m *MgoColl) SetUnique(key string){
  	m.collection.Indexes().CreateOne(
ee23102d   zhangqijia   支持mongo, grpc接服务器
176
177
178
179
180
181
182
183
184
  		context.Background(),
  		mongo.IndexModel{
  			Keys   : bsonx.Doc{{key, bsonx.Int32(1)}},
  			Options: options.Index().SetUnique(true),
  		},
  	)
  }
  
  //更新&保存
eb417b0b   zhangqijia   reactor mongo
185
  func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
186
187
188
  	//filter := bson.M{"name": "x", "array.name": "b"}
  	//update := bson.M{"array.$[item].detail": "test"}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
189
  	res := m.collection.FindOneAndUpdate(context.Background(),
ee23102d   zhangqijia   支持mongo, grpc接服务器
190
191
192
193
194
195
196
197
  		filter,
  		bson.M{"$set": update})
  	if res.Err() != nil {
  		return nil
  	}
  	return res
  }
  
eb417b0b   zhangqijia   reactor mongo
198
  func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
3592dfd3   zhangqijia   重构models, 索引唯一索引
199
  	res, err := m.collection.UpdateOne(context.TODO(), filter, update)
ee23102d   zhangqijia   支持mongo, grpc接服务器
200
201
202
203
204
  	if err != nil {
  		return nil
  	}
  
  	return res
eb417b0b   zhangqijia   reactor mongo
205
206
207
208
  }
  
  
  func (m *MgoColl) Load() error{
1584eb4b   zhangqijia   修复创建唯一索引的bug
209
  	r := m.collection.FindOne(context.TODO(), m.pri)
eb417b0b   zhangqijia   reactor mongo
210
211
212
213
214
215
216
  	err := r.Decode(m.schema)
  	if err != nil {
  		return err
  	}
  	return nil
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
217
218
  func (m *MgoColl) Create() (*mongo.InsertOneResult, error){
  	return m.collection.InsertOne(context.TODO(), m.schema)
eb417b0b   zhangqijia   reactor mongo
219
220
221
222
223
224
  }
  
  // Update applies update to the document selected by the stored filter via
  // FindOneAndUpdate. The outcome is not reported to the caller.
  func (m *MgoColl) Update(update interface{}) {
  	_ = m.FindOneAndUpdate(m.pri, update)
  }
  
98b0736d   zhangqijia   添加定时器, 检查心跳
225
226
227
228
  // UpdateProperty sets a single field on the document selected by the stored
  // filter. The field name is key converted to lower case. The outcome is not
  // reported to the caller.
  func (m *MgoColl) UpdateProperty(key string, val interface{}) {
  	field := strings.ToLower(key)
  	_ = m.FindOneAndUpdate(m.pri, bson.M{field: val})
  }
  
eb417b0b   zhangqijia   reactor mongo
229
230
  func (m *MgoColl)Save() {
  	m.FindOneAndUpdate(m.pri, m.schema)
ee23102d   zhangqijia   支持mongo, grpc接服务器
231
  }