Blame view

src/components/db/mongo.go 6.17 KB
ee23102d   zhangqijia   支持mongo, grpc接服务器
1
2
3
4
5
6
7
8
9
10
  package db
  
  import (
  	"context"
  	"fmt"
  	"go.mongodb.org/mongo-driver/bson"
  	"go.mongodb.org/mongo-driver/mongo"
  	"go.mongodb.org/mongo-driver/mongo/options"
  	"go.mongodb.org/mongo-driver/mongo/readpref"
  	"go.mongodb.org/mongo-driver/x/bsonx"
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
11
  	"pro2d/src/utils"
3592dfd3   zhangqijia   重构models, 索引唯一索引
12
  	"sort"
ee23102d   zhangqijia   支持mongo, grpc接服务器
13
  	"strconv"
98b0736d   zhangqijia   添加定时器, 检查心跳
14
  	"strings"
ee23102d   zhangqijia   支持mongo, grpc接服务器
15
16
  	"time"
  )
eb417b0b   zhangqijia   reactor mongo
17
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
18
19
20
21
  // Package-level MongoDB handles shared by the helpers in this file.
  var (
  	// MongoClient is the client that Connect stores after connecting.
  	MongoClient *mongo.Client
  	// MongoDatabase is the database handle used by the collection helpers.
  	// Connect does not assign it, so it has to be set before those helpers run.
  	// NOTE(review): the assignment is not visible in this part of the file — confirm where it happens.
  	MongoDatabase *mongo.Database
  )
ee23102d   zhangqijia   支持mongo, grpc接服务器
22
23
  
  //初始化
1584eb4b   zhangqijia   修复创建唯一索引的bug
24
  func Connect(user, password, host string,port int, MaxNum int, timeOut int) error {
ee23102d   zhangqijia   支持mongo, grpc接服务器
25
  	var uri string
eb417b0b   zhangqijia   reactor mongo
26
27
28
  	if user!= "" {
  		//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
29
  	}else {
eb417b0b   zhangqijia   reactor mongo
30
31
  		//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
32
33
  	}
  	// 设置连接超时时间
eb417b0b   zhangqijia   reactor mongo
34
  	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
ee23102d   zhangqijia   支持mongo, grpc接服务器
35
36
37
38
  	defer cancel()
  	// 通过传进来的uri连接相关的配置
  	o := options.Client().ApplyURI(uri)
  	// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
eb417b0b   zhangqijia   reactor mongo
39
  	o.SetMaxPoolSize(uint64(MaxNum))
ee23102d   zhangqijia   支持mongo, grpc接服务器
40
  	// 发起链接
eb417b0b   zhangqijia   reactor mongo
41
  	var err error
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
42
  	MongoClient, err = mongo.Connect(ctx, o)
ee23102d   zhangqijia   支持mongo, grpc接服务器
43
  	if err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
44
45
46
  		return err
  	}
  	// 判断服务是不是可用
3592dfd3   zhangqijia   重构models, 索引唯一索引
47
  	if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
48
49
  		return err
  	}
3592dfd3   zhangqijia   重构models, 索引唯一索引
50
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
51
  	//MongoDatabase = MongoClient.Database(dbname)
ee23102d   zhangqijia   支持mongo, grpc接服务器
52
53
54
  	return nil
  }
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
  // CreateCollection creates the named collection in MongoDatabase unless a
  // collection with that name already exists, in which case it does nothing and
  // returns nil.
  //
  // It returns the error from listing the existing collections or from the
  // create call.
  func CreateCollection(collection string) error {
  	colls, err := MongoDatabase.ListCollectionNames(context.TODO(), bson.D{})
  	if err != nil {
  		return err
  	}

  	// SearchStrings is a binary search and needs sorted input; the order of the
  	// names returned by the server is not relied upon.
  	sort.Strings(colls)
  	pos := sort.SearchStrings(colls, collection)
  	if pos < len(colls) && colls[pos] == collection {
  		return nil
  	}

  	return MongoDatabase.CreateCollection(context.TODO(), collection)
  }
  
  // SetUnique creates an ascending unique index on key in the named collection
  // of MongoDatabase and returns the values produced by the driver's CreateOne
  // call.
  func SetUnique(collection string, key string) (string, error) {
  	indexes := MongoDatabase.Collection(collection).Indexes()
  	model := mongo.IndexModel{
  		Keys:    bsonx.Doc{{Key: key, Value: bsonx.Int32(1)}},
  		Options: options.Index().SetUnique(true),
  	}
  	return indexes.CreateOne(context.TODO(), model)
  }
  
eb417b0b   zhangqijia   reactor mongo
76
  // MgoColl pairs a MongoDB collection handle with the Schema whose documents
  // are stored in that collection.
  type MgoColl struct {
3592dfd3   zhangqijia   重构models, 索引唯一索引
77
  	// collection is the driver handle that the MgoColl methods operate on.
  	collection *mongo.Collection
ee23102d   zhangqijia   支持mongo, grpc接服务器
78
  
33ea26ab   zhangqijia   使用schema封装mongo
79
  	// schema supplies the filter (GetPri) and the document value (GetSchema)
  	// used by Load, Create, UpdateProperty and UpdateProperties.
  	schema *Schema
ee23102d   zhangqijia   支持mongo, grpc接服务器
80
81
82
83
84
85
86
87
88
  }
  
  // GetBsonD returns a single-element ordered BSON document {key: value}.
  func GetBsonD(key string, value interface{}) interface{} {
  	doc := bson.D{bson.E{Key: key, Value: value}}
  	return doc
  }
  // GetBsonM returns a BSON map holding the single entry key: value.
  func GetBsonM(key string, value interface{}) interface{} {
  	doc := make(bson.M, 1)
  	doc[key] = value
  	return doc
  }
  
33ea26ab   zhangqijia   使用schema封装mongo
89
  func NewMongoColl(schema *Schema) *MgoColl {
eb417b0b   zhangqijia   reactor mongo
90
  	return &MgoColl{
33ea26ab   zhangqijia   使用schema封装mongo
91
  		collection: MongoDatabase.Collection(schema.GetCollName()),
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
92
  		schema:     schema,
ee23102d   zhangqijia   支持mongo, grpc接服务器
93
94
95
  	}
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
96
  func FindOne(pri interface{}, schema interface{}) error {
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
97
  	r := MongoDatabase.Collection(utils.GetCollName(schema)).FindOne(context.TODO(), pri)
1584eb4b   zhangqijia   修复创建唯一索引的bug
98
  	return r.Decode(schema)
ee23102d   zhangqijia   支持mongo, grpc接服务器
99
  }
ee23102d   zhangqijia   支持mongo, grpc接服务器
100
  
ee23102d   zhangqijia   支持mongo, grpc接服务器
101
  // 查询单个
eb417b0b   zhangqijia   reactor mongo
102
  func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
103
104
  	//collection.
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
105
  	singleResult := m.collection.FindOne(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
106
107
108
  	return singleResult
  }
  
ee23102d   zhangqijia   支持mongo, grpc接服务器
109
  //查询集合里有多少数据
eb417b0b   zhangqijia   reactor mongo
110
  func (m *MgoColl) CollectionCount() (string, int64) {
3592dfd3   zhangqijia   重构models, 索引唯一索引
111
112
  	size, _ := m.collection.EstimatedDocumentCount(context.TODO())
  	return m.collection.Name(), size
ee23102d   zhangqijia   支持mongo, grpc接服务器
113
114
115
  }
  
  //按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
eb417b0b   zhangqijia   reactor mongo
116
  func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
ee23102d   zhangqijia   支持mongo, grpc接服务器
117
118
119
120
121
122
  	SORT := bson.D{
  		{"_id", sort}} //filter := bson.D{ {key,value}}
  	filter := bson.D{
  	{}}
  	findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
  	//findOptions.SetLimit(i)
3592dfd3   zhangqijia   重构models, 索引唯一索引
123
  	temp, _ := m.collection.Find(context.Background(), filter, findOptions)
ee23102d   zhangqijia   支持mongo, grpc接服务器
124
125
126
127
  	return temp
  }
  
  //获取集合创建时间和编号
eb417b0b   zhangqijia   reactor mongo
128
  func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
129
130
131
132
133
134
135
136
137
  	temp1 := result[:8]
  	timestamp, _ := strconv.ParseInt(temp1, 16, 64)
  	dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
  	temp2 := result[18:]
  	count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
  	return dateTime, count
  }
  
  //删除文章和查询文章
eb417b0b   zhangqijia   reactor mongo
138
  func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
139
140
  	filter := bson.D{
  		{key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
141
142
  	singleResult := m.collection.FindOne(context.TODO(), filter)
  	DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
143
144
145
146
147
148
149
  	if err != nil {
  		fmt.Println("删除时出现错误,你删不掉的~")
  	}
  	return DeleteResult.DeletedCount, singleResult
  }
  
  //删除文章
eb417b0b   zhangqijia   reactor mongo
150
  func (m *MgoColl) Delete(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
151
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
152
  	count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
153
154
155
156
157
158
159
160
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  
  }
  
  //删除多个
eb417b0b   zhangqijia   reactor mongo
161
  func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
162
163
  	filter := bson.D{ {key, value}}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
164
  	count, err := m.collection.DeleteMany(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
165
166
167
168
169
170
171
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  }
  
  //索引
3592dfd3   zhangqijia   重构models, 索引唯一索引
172
173
  func (m *MgoColl) SetUnique(key string){
  	m.collection.Indexes().CreateOne(
ee23102d   zhangqijia   支持mongo, grpc接服务器
174
175
176
177
178
179
180
181
182
  		context.Background(),
  		mongo.IndexModel{
  			Keys   : bsonx.Doc{{key, bsonx.Int32(1)}},
  			Options: options.Index().SetUnique(true),
  		},
  	)
  }
  
  //更新&保存
eb417b0b   zhangqijia   reactor mongo
183
  func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
184
185
186
  	//filter := bson.M{"name": "x", "array.name": "b"}
  	//update := bson.M{"array.$[item].detail": "test"}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
187
  	res := m.collection.FindOneAndUpdate(context.Background(),
ee23102d   zhangqijia   支持mongo, grpc接服务器
188
189
190
191
192
193
194
195
  		filter,
  		bson.M{"$set": update})
  	if res.Err() != nil {
  		return nil
  	}
  	return res
  }
  
eb417b0b   zhangqijia   reactor mongo
196
  func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
33ea26ab   zhangqijia   使用schema封装mongo
197
  	res, err := m.collection.UpdateOne(context.TODO(), filter, bson.D{{"$set", update}})
ee23102d   zhangqijia   支持mongo, grpc接服务器
198
199
200
201
202
  	if err != nil {
  		return nil
  	}
  
  	return res
eb417b0b   zhangqijia   reactor mongo
203
204
  }
  
eb417b0b   zhangqijia   reactor mongo
205
  func (m *MgoColl) Load() error{
33ea26ab   zhangqijia   使用schema封装mongo
206
207
  	r := m.collection.FindOne(context.TODO(), m.schema.GetPri())
  	err := r.Decode(m.schema.GetSchema())
eb417b0b   zhangqijia   reactor mongo
208
209
210
211
212
213
  	if err != nil {
  		return err
  	}
  	return nil
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
214
  func (m *MgoColl) Create() (*mongo.InsertOneResult, error){
33ea26ab   zhangqijia   使用schema封装mongo
215
  	return m.collection.InsertOne(context.TODO(), m.schema.GetSchema())
eb417b0b   zhangqijia   reactor mongo
216
217
  }
  
98b0736d   zhangqijia   添加定时器, 检查心跳
218
  func (m *MgoColl) UpdateProperty(key string, val interface{}) {
33ea26ab   zhangqijia   使用schema封装mongo
219
  	m.UpdateOne(m.schema.GetPri(), bson.M{strings.ToLower(key): val})
98b0736d   zhangqijia   添加定时器, 检查心跳
220
221
  }
  
33ea26ab   zhangqijia   使用schema封装mongo
222
223
224
  // UpdateProperties sets every entry of properties on the document selected
  // by the schema's GetPri() filter. Keys are used as given, and the result of
  // the underlying UpdateOne call is discarded.
  func (m *MgoColl) UpdateProperties(properties map[string]interface{}) {
  	pri := m.schema.GetPri()
  	_ = m.UpdateOne(pri, properties)
  }