ee23102d
zhangqijia
支持mongo, grpc接服务器
|
1
2
3
4
5
6
7
8
9
10
|
package db
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
11
|
"pro2d/src/utils"
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
12
|
"sort"
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
13
|
"strconv"
|
98b0736d
zhangqijia
添加定时器, 检查心跳
|
14
|
"strings"
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
15
16
|
"time"
)
|
eb417b0b
zhangqijia
reactor mongo
|
17
|
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
18
19
20
21
|
var (
MongoClient *mongo.Client
MongoDatabase *mongo.Database
)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
22
23
|
//初始化
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
24
|
func Connect(user, password, host string,port int, MaxNum int, timeOut int) error {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
25
|
var uri string
|
eb417b0b
zhangqijia
reactor mongo
|
26
27
28
|
if user!= "" {
//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
29
|
}else {
|
eb417b0b
zhangqijia
reactor mongo
|
30
31
|
//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
32
33
|
}
// 设置连接超时时间
|
eb417b0b
zhangqijia
reactor mongo
|
34
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
35
36
37
38
|
defer cancel()
// 通过传进来的uri连接相关的配置
o := options.Client().ApplyURI(uri)
// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
|
eb417b0b
zhangqijia
reactor mongo
|
39
|
o.SetMaxPoolSize(uint64(MaxNum))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
40
|
// 发起链接
|
eb417b0b
zhangqijia
reactor mongo
|
41
|
var err error
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
42
|
MongoClient, err = mongo.Connect(ctx, o)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
43
|
if err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
44
45
46
|
return err
}
// 判断服务是不是可用
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
47
|
if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
48
49
|
return err
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
50
|
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
51
|
//MongoDatabase = MongoClient.Database(dbname)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
52
53
54
|
return nil
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
// CreateCollection makes sure a collection with the given name exists in
// MongoDatabase.
//
// It lists the existing collection names and returns nil without doing
// anything when the name is already present; otherwise it asks the server to
// create the collection. An error from listing or from creating is returned
// to the caller.
func CreateCollection(collection string) error {
	names, err := MongoDatabase.ListCollectionNames(context.TODO(), bson.D{})
	if err != nil {
		return err
	}

	// sort.SearchStrings is a binary search and is only meaningful on a
	// sorted slice, so sort the listing before probing it.
	sort.Strings(names)
	if i := sort.SearchStrings(names, collection); i < len(names) && names[i] == collection {
		return nil
	}
	return MongoDatabase.CreateCollection(context.TODO(), collection)
}
func SetUnique(collection string, key string) (string, error) {
return MongoDatabase.Collection(collection).Indexes().CreateOne(
context.TODO(),
mongo.IndexModel{
Keys : bsonx.Doc{{key, bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
},
)
}
|
eb417b0b
zhangqijia
reactor mongo
|
76
|
type MgoColl struct {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
77
|
collection *mongo.Collection
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
78
|
|
eb417b0b
zhangqijia
reactor mongo
|
79
80
|
pri interface{}
schema interface{}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
81
82
83
84
85
86
87
88
89
|
}
// GetBsonD wraps a single key/value pair in an ordered bson.D document.
func GetBsonD(key string, value interface{}) interface{} {
	pair := bson.E{Key: key, Value: value}
	return bson.D{pair}
}
// GetBsonM wraps a single key/value pair in an unordered bson.M document.
func GetBsonM(key string, value interface{}) interface{} {
	doc := make(bson.M, 1)
	doc[key] = value
	return doc
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
90
|
func NewMongoColl(key string, schema interface{}) *MgoColl {
|
eb417b0b
zhangqijia
reactor mongo
|
91
|
return &MgoColl{
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
92
|
collection: MongoDatabase.Collection(utils.GetCollName(schema)),
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
93
94
|
pri: GetBsonM(utils.GetPriKey(schema), key),
schema: schema,
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
95
96
97
|
}
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
98
|
func FindOne(pri interface{}, schema interface{}) error {
|
f7f4beb5
zhangqijia
新增grpc平滑关闭,修复crea...
|
99
|
r := MongoDatabase.Collection(utils.GetCollName(schema)).FindOne(context.TODO(), pri)
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
100
|
return r.Decode(schema)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
101
|
}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
102
|
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
103
|
// 查询单个
|
eb417b0b
zhangqijia
reactor mongo
|
104
|
func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
105
106
|
//collection.
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
107
|
singleResult := m.collection.FindOne(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
108
109
110
|
return singleResult
}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
111
|
//查询集合里有多少数据
|
eb417b0b
zhangqijia
reactor mongo
|
112
|
func (m *MgoColl) CollectionCount() (string, int64) {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
113
114
|
size, _ := m.collection.EstimatedDocumentCount(context.TODO())
return m.collection.Name(), size
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
115
116
117
|
}
//按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
|
eb417b0b
zhangqijia
reactor mongo
|
118
|
func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
119
120
121
122
123
124
|
SORT := bson.D{
{"_id", sort}} //filter := bson.D{ {key,value}}
filter := bson.D{
{}}
findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
//findOptions.SetLimit(i)
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
125
|
temp, _ := m.collection.Find(context.Background(), filter, findOptions)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
126
127
128
129
|
return temp
}
//获取集合创建时间和编号
|
eb417b0b
zhangqijia
reactor mongo
|
130
|
func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
131
132
133
134
135
136
137
138
139
|
temp1 := result[:8]
timestamp, _ := strconv.ParseInt(temp1, 16, 64)
dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
temp2 := result[18:]
count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
return dateTime, count
}
//删除文章和查询文章
|
eb417b0b
zhangqijia
reactor mongo
|
140
|
func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
141
142
|
filter := bson.D{
{key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
143
144
|
singleResult := m.collection.FindOne(context.TODO(), filter)
DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
145
146
147
148
149
150
151
|
if err != nil {
fmt.Println("删除时出现错误,你删不掉的~")
}
return DeleteResult.DeletedCount, singleResult
}
//删除文章
|
eb417b0b
zhangqijia
reactor mongo
|
152
|
func (m *MgoColl) Delete(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
153
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
154
|
count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
155
156
157
158
159
160
161
162
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//删除多个
|
eb417b0b
zhangqijia
reactor mongo
|
163
|
func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
164
165
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
166
|
count, err := m.collection.DeleteMany(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
167
168
169
170
171
172
173
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//索引
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
174
175
|
func (m *MgoColl) SetUnique(key string){
m.collection.Indexes().CreateOne(
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
176
177
178
179
180
181
182
183
184
|
context.Background(),
mongo.IndexModel{
Keys : bsonx.Doc{{key, bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
},
)
}
//更新&保存
|
eb417b0b
zhangqijia
reactor mongo
|
185
|
func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
186
187
188
|
//filter := bson.M{"name": "x", "array.name": "b"}
//update := bson.M{"array.$[item].detail": "test"}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
189
|
res := m.collection.FindOneAndUpdate(context.Background(),
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
190
191
192
193
194
195
196
197
|
filter,
bson.M{"$set": update})
if res.Err() != nil {
return nil
}
return res
}
|
eb417b0b
zhangqijia
reactor mongo
|
198
|
func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
199
|
res, err := m.collection.UpdateOne(context.TODO(), filter, update)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
200
201
202
203
204
|
if err != nil {
return nil
}
return res
|
eb417b0b
zhangqijia
reactor mongo
|
205
206
207
208
|
}
func (m *MgoColl) Load() error{
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
209
|
r := m.collection.FindOne(context.TODO(), m.pri)
|
eb417b0b
zhangqijia
reactor mongo
|
210
211
212
213
214
215
216
|
err := r.Decode(m.schema)
if err != nil {
return err
}
return nil
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
217
218
|
func (m *MgoColl) Create() (*mongo.InsertOneResult, error){
return m.collection.InsertOne(context.TODO(), m.schema)
|
eb417b0b
zhangqijia
reactor mongo
|
219
220
221
222
223
224
|
}
// Update applies update to the record selected by m.pri via FindOneAndUpdate.
// The outcome of that call is not reported to the caller.
func (m *MgoColl) Update(update interface{}) {
	_ = m.FindOneAndUpdate(m.pri, update)
}
|
98b0736d
zhangqijia
添加定时器, 检查心跳
|
225
226
227
228
|
// UpdateProperty sets a single field of the record selected by m.pri. The
// field name is the lower-cased form of key. The outcome of the update is not
// reported to the caller.
func (m *MgoColl) UpdateProperty(key string, val interface{}) {
	field := strings.ToLower(key)
	_ = m.FindOneAndUpdate(m.pri, bson.M{field: val})
}
|
eb417b0b
zhangqijia
reactor mongo
|
229
230
|
func (m *MgoColl)Save() {
m.FindOneAndUpdate(m.pri, m.schema)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
231
|
}
|