ee23102d
zhangqijia
支持mongo, grpc接服务器
|
1
2
3
4
5
6
7
8
9
10
|
package db
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
11
|
"pro2d/src/utils"
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
12
|
"sort"
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
13
14
15
|
"strconv"
"time"
)
|
eb417b0b
zhangqijia
reactor mongo
|
16
|
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
17
18
19
20
|
var (
MongoClient *mongo.Client
MongoDatabase *mongo.Database
)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
21
22
|
//初始化
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
23
|
func Connect(user, password, host string,port int, MaxNum int, timeOut int) error {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
24
|
var uri string
|
eb417b0b
zhangqijia
reactor mongo
|
25
26
27
|
if user!= "" {
//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
28
|
}else {
|
eb417b0b
zhangqijia
reactor mongo
|
29
30
|
//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
31
32
|
}
// 设置连接超时时间
|
eb417b0b
zhangqijia
reactor mongo
|
33
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
34
35
36
37
|
defer cancel()
// 通过传进来的uri连接相关的配置
o := options.Client().ApplyURI(uri)
// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
|
eb417b0b
zhangqijia
reactor mongo
|
38
|
o.SetMaxPoolSize(uint64(MaxNum))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
39
|
// 发起链接
|
eb417b0b
zhangqijia
reactor mongo
|
40
|
var err error
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
41
|
MongoClient, err = mongo.Connect(ctx, o)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
42
|
if err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
43
44
45
|
return err
}
// 判断服务是不是可用
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
46
|
if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
47
48
|
return err
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
49
|
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
50
|
//MongoDatabase = MongoClient.Database(dbname)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
51
52
53
|
return nil
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
|
func CreateCollection(collection string) error {
colls, _ := MongoDatabase.ListCollectionNames(context.TODO(), bson.D{})
pos := sort.SearchStrings(colls, collection)
if pos != len(colls) {
if collection == colls[pos] {
return MongoDatabase.CreateCollection(context.TODO(), collection)
}
}
return MongoDatabase.CreateCollection(context.TODO(), collection)
}
func SetUnique(collection string, key string) (string, error) {
return MongoDatabase.Collection(collection).Indexes().CreateOne(
context.TODO(),
mongo.IndexModel{
Keys : bsonx.Doc{{key, bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
},
)
}
|
eb417b0b
zhangqijia
reactor mongo
|
75
|
type MgoColl struct {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
76
|
collection *mongo.Collection
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
77
|
|
eb417b0b
zhangqijia
reactor mongo
|
78
79
|
pri interface{}
schema interface{}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
80
81
82
83
84
85
86
87
88
|
}
func GetBsonD(key string, value interface{}) interface{} {
return bson.D{ {key, value}}
}
func GetBsonM(key string, value interface{}) interface{} {
return bson.M{key: value}
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
89
|
func NewMongoColl(key string, schema interface{}) *MgoColl {
|
eb417b0b
zhangqijia
reactor mongo
|
90
|
return &MgoColl{
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
91
|
collection: MongoDatabase.Collection(utils.GetCollName(schema)),
|
cad2b7f3
zhangqijia
reactor: 重构目录, 重构...
|
92
93
|
pri: GetBsonM(utils.GetPriKey(schema), key),
schema: schema,
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
94
95
96
|
}
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
97
|
func FindOne(pri interface{}, schema interface{}) error {
|
f7f4beb5
zhangqijia
新增grpc平滑关闭,修复crea...
|
98
|
r := MongoDatabase.Collection(utils.GetCollName(schema)).FindOne(context.TODO(), pri)
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
99
|
return r.Decode(schema)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
100
|
}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
101
|
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
102
|
// 查询单个
|
eb417b0b
zhangqijia
reactor mongo
|
103
|
func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
104
105
|
//collection.
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
106
|
singleResult := m.collection.FindOne(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
107
108
109
|
return singleResult
}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
110
|
//查询集合里有多少数据
|
eb417b0b
zhangqijia
reactor mongo
|
111
|
func (m *MgoColl) CollectionCount() (string, int64) {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
112
113
|
size, _ := m.collection.EstimatedDocumentCount(context.TODO())
return m.collection.Name(), size
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
114
115
116
|
}
//按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
|
eb417b0b
zhangqijia
reactor mongo
|
117
|
func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
118
119
120
121
122
123
|
SORT := bson.D{
{"_id", sort}} //filter := bson.D{ {key,value}}
filter := bson.D{
{}}
findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
//findOptions.SetLimit(i)
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
124
|
temp, _ := m.collection.Find(context.Background(), filter, findOptions)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
125
126
127
128
|
return temp
}
//获取集合创建时间和编号
|
eb417b0b
zhangqijia
reactor mongo
|
129
|
func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
130
131
132
133
134
135
136
137
138
|
temp1 := result[:8]
timestamp, _ := strconv.ParseInt(temp1, 16, 64)
dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
temp2 := result[18:]
count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
return dateTime, count
}
//删除文章和查询文章
|
eb417b0b
zhangqijia
reactor mongo
|
139
|
func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
140
141
|
filter := bson.D{
{key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
142
143
|
singleResult := m.collection.FindOne(context.TODO(), filter)
DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
144
145
146
147
148
149
150
|
if err != nil {
fmt.Println("删除时出现错误,你删不掉的~")
}
return DeleteResult.DeletedCount, singleResult
}
//删除文章
|
eb417b0b
zhangqijia
reactor mongo
|
151
|
func (m *MgoColl) Delete(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
152
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
153
|
count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
154
155
156
157
158
159
160
161
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//删除多个
|
eb417b0b
zhangqijia
reactor mongo
|
162
|
func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
163
164
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
165
|
count, err := m.collection.DeleteMany(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
166
167
168
169
170
171
172
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//索引
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
173
174
|
func (m *MgoColl) SetUnique(key string){
m.collection.Indexes().CreateOne(
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
175
176
177
178
179
180
181
182
183
|
context.Background(),
mongo.IndexModel{
Keys : bsonx.Doc{{key, bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
},
)
}
//更新&保存
|
eb417b0b
zhangqijia
reactor mongo
|
184
|
func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
185
186
187
|
//filter := bson.M{"name": "x", "array.name": "b"}
//update := bson.M{"array.$[item].detail": "test"}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
188
|
res := m.collection.FindOneAndUpdate(context.Background(),
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
189
190
191
192
193
194
195
196
|
filter,
bson.M{"$set": update})
if res.Err() != nil {
return nil
}
return res
}
|
eb417b0b
zhangqijia
reactor mongo
|
197
|
func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
198
|
res, err := m.collection.UpdateOne(context.TODO(), filter, update)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
199
200
201
202
203
|
if err != nil {
return nil
}
return res
|
eb417b0b
zhangqijia
reactor mongo
|
204
205
206
207
|
}
func (m *MgoColl) Load() error{
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
208
|
r := m.collection.FindOne(context.TODO(), m.pri)
|
eb417b0b
zhangqijia
reactor mongo
|
209
210
211
212
213
214
215
|
err := r.Decode(m.schema)
if err != nil {
return err
}
return nil
}
|
1584eb4b
zhangqijia
修复创建唯一索引的bug
|
216
217
|
func (m *MgoColl) Create() (*mongo.InsertOneResult, error){
return m.collection.InsertOne(context.TODO(), m.schema)
|
eb417b0b
zhangqijia
reactor mongo
|
218
219
220
221
222
223
224
225
|
}
func (m *MgoColl) Update(update interface{}) {
m.FindOneAndUpdate(m.pri, update)
}
func (m *MgoColl)Save() {
m.FindOneAndUpdate(m.pri, m.schema)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
226
|
}
|