ee23102d
zhangqijia
支持mongo, grpc接服务器
|
1
2
3
4
5
6
7
8
9
10
|
package db
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
11
|
"pro2d/src/utils"
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
12
|
"sort"
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
13
|
"strconv"
|
98b0736d
zhangqijia
添加定时器, 检查心跳
|
14
|
"strings"
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
15
16
|
"time"
)
|
eb417b0b
zhangqijia
reactor mongo
|
17
|
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
18
19
20
21
|
var (
MongoClient *mongo.Client
MongoDatabase *mongo.Database
)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
22
23
|
//初始化
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
24
|
func Connect(user, password, host string,port int, MaxNum int, timeOut int) error {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
25
|
var uri string
|
eb417b0b
zhangqijia
reactor mongo
|
26
27
28
|
if user!= "" {
//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
29
|
}else {
|
eb417b0b
zhangqijia
reactor mongo
|
30
31
|
//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
32
33
|
}
// 设置连接超时时间
|
eb417b0b
zhangqijia
reactor mongo
|
34
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
35
36
37
38
|
defer cancel()
// 通过传进来的uri连接相关的配置
o := options.Client().ApplyURI(uri)
// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
|
eb417b0b
zhangqijia
reactor mongo
|
39
|
o.SetMaxPoolSize(uint64(MaxNum))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
40
|
// 发起链接
|
eb417b0b
zhangqijia
reactor mongo
|
41
|
var err error
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
42
|
MongoClient, err = mongo.Connect(ctx, o)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
43
|
if err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
44
45
46
|
return err
}
// 判断服务是不是可用
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
47
|
if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
48
49
|
return err
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
50
|
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
51
|
//MongoDatabase = MongoClient.Database(dbname)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
52
53
54
|
return nil
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
|
func CreateCollection(collection string) error {
colls, _ := MongoDatabase.ListCollectionNames(context.TODO(), bson.D{})
pos := sort.SearchStrings(colls, collection)
if pos != len(colls) {
if collection == colls[pos] {
return MongoDatabase.CreateCollection(context.TODO(), collection)
}
}
return MongoDatabase.CreateCollection(context.TODO(), collection)
}
func SetUnique(collection string, key string) (string, error) {
return MongoDatabase.Collection(collection).Indexes().CreateOne(
context.TODO(),
mongo.IndexModel{
Keys : bsonx.Doc{{key, bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
},
)
}
|
eb417b0b
zhangqijia
reactor mongo
|
76
|
type MgoColl struct {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
77
|
collection *mongo.Collection
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
78
|
|
33ea26ab
zhangqijia
使用schema封装mongo
|
79
|
schema *Schema
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
80
81
82
83
84
85
86
87
88
|
}
func GetBsonD(key string, value interface{}) interface{} {
return bson.D{ {key, value}}
}
func GetBsonM(key string, value interface{}) interface{} {
return bson.M{key: value}
}
|
33ea26ab
zhangqijia
使用schema封装mongo
|
89
|
func NewMongoColl(schema *Schema) *MgoColl {
|
eb417b0b
zhangqijia
reactor mongo
|
90
|
return &MgoColl{
|
33ea26ab
zhangqijia
使用schema封装mongo
|
91
|
collection: MongoDatabase.Collection(schema.GetCollName()),
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
92
|
schema: schema,
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
93
94
95
|
}
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
96
|
func FindOne(pri interface{}, schema interface{}) error {
|
f7f4beb5
zhangqijia
新增grpc平滑关闭,修复crea...
|
97
|
r := MongoDatabase.Collection(utils.GetCollName(schema)).FindOne(context.TODO(), pri)
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
98
|
return r.Decode(schema)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
99
|
}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
100
|
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
101
|
// 查询单个
|
eb417b0b
zhangqijia
reactor mongo
|
102
|
func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
103
104
|
//collection.
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
105
|
singleResult := m.collection.FindOne(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
106
107
108
|
return singleResult
}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
109
|
//查询集合里有多少数据
|
eb417b0b
zhangqijia
reactor mongo
|
110
|
func (m *MgoColl) CollectionCount() (string, int64) {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
111
112
|
size, _ := m.collection.EstimatedDocumentCount(context.TODO())
return m.collection.Name(), size
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
113
114
115
|
}
//按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
|
eb417b0b
zhangqijia
reactor mongo
|
116
|
func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
117
118
119
120
121
122
|
SORT := bson.D{
{"_id", sort}} //filter := bson.D{ {key,value}}
filter := bson.D{
{}}
findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
//findOptions.SetLimit(i)
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
123
|
temp, _ := m.collection.Find(context.Background(), filter, findOptions)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
124
125
126
127
|
return temp
}
//获取集合创建时间和编号
|
eb417b0b
zhangqijia
reactor mongo
|
128
|
func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
129
130
131
132
133
134
135
136
137
|
temp1 := result[:8]
timestamp, _ := strconv.ParseInt(temp1, 16, 64)
dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
temp2 := result[18:]
count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
return dateTime, count
}
//删除文章和查询文章
|
eb417b0b
zhangqijia
reactor mongo
|
138
|
func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
139
140
|
filter := bson.D{
{key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
141
142
|
singleResult := m.collection.FindOne(context.TODO(), filter)
DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
143
144
145
146
147
148
149
|
if err != nil {
fmt.Println("删除时出现错误,你删不掉的~")
}
return DeleteResult.DeletedCount, singleResult
}
//删除文章
|
eb417b0b
zhangqijia
reactor mongo
|
150
|
func (m *MgoColl) Delete(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
151
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
152
|
count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
153
154
155
156
157
158
159
160
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//删除多个
|
eb417b0b
zhangqijia
reactor mongo
|
161
|
func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
162
163
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
164
|
count, err := m.collection.DeleteMany(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
165
166
167
168
169
170
171
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//索引
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
172
173
|
func (m *MgoColl) SetUnique(key string){
m.collection.Indexes().CreateOne(
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
174
175
176
177
178
179
180
181
182
|
context.Background(),
mongo.IndexModel{
Keys : bsonx.Doc{{key, bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
},
)
}
//更新&保存
|
eb417b0b
zhangqijia
reactor mongo
|
183
|
func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
184
185
186
|
//filter := bson.M{"name": "x", "array.name": "b"}
//update := bson.M{"array.$[item].detail": "test"}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
187
|
res := m.collection.FindOneAndUpdate(context.Background(),
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
188
189
190
191
192
193
194
195
|
filter,
bson.M{"$set": update})
if res.Err() != nil {
return nil
}
return res
}
|
eb417b0b
zhangqijia
reactor mongo
|
196
|
func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
|
33ea26ab
zhangqijia
使用schema封装mongo
|
197
|
res, err := m.collection.UpdateOne(context.TODO(), filter, bson.D{{"$set", update}})
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
198
199
200
201
202
|
if err != nil {
return nil
}
return res
|
eb417b0b
zhangqijia
reactor mongo
|
203
204
|
}
|
eb417b0b
zhangqijia
reactor mongo
|
205
|
func (m *MgoColl) Load() error{
|
33ea26ab
zhangqijia
使用schema封装mongo
|
206
207
|
r := m.collection.FindOne(context.TODO(), m.schema.GetPri())
err := r.Decode(m.schema.GetSchema())
|
eb417b0b
zhangqijia
reactor mongo
|
208
209
210
211
212
213
|
if err != nil {
return err
}
return nil
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
214
|
func (m *MgoColl) Create() (*mongo.InsertOneResult, error){
|
33ea26ab
zhangqijia
使用schema封装mongo
|
215
|
return m.collection.InsertOne(context.TODO(), m.schema.GetSchema())
|
eb417b0b
zhangqijia
reactor mongo
|
216
217
|
}
|
98b0736d
zhangqijia
添加定时器, 检查心跳
|
218
|
func (m *MgoColl) UpdateProperty(key string, val interface{}) {
|
33ea26ab
zhangqijia
使用schema封装mongo
|
219
|
m.UpdateOne(m.schema.GetPri(), bson.M{strings.ToLower(key): val})
|
98b0736d
zhangqijia
添加定时器, 检查心跳
|
220
221
|
}
|
33ea26ab
zhangqijia
使用schema封装mongo
|
222
223
224
|
func (m *MgoColl) UpdateProperties(properties map[string]interface{}) {
m.UpdateOne(m.schema.GetPri(), properties)
}
|