ee23102d
zhangqijia
支持mongo, grpc接服务器
|
1
2
3
4
5
6
7
8
9
10
|
package db
import (
"context"
"fmt"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
"go.mongodb.org/mongo-driver/mongo/readpref"
"go.mongodb.org/mongo-driver/x/bsonx"
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
11
|
"sort"
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
12
13
14
|
"strconv"
"time"
)
|
eb417b0b
zhangqijia
reactor mongo
|
15
|
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
16
17
18
19
|
// Package-level MongoDB handles. Both are assigned by Connect.
var (
	// MongoClient is the client returned by mongo.Connect.
	MongoClient *mongo.Client
	// MongoDatabase is the handle for the database named in Connect's dbname
	// argument; the collection helpers in this file read it.
	MongoDatabase *mongo.Database
)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
20
21
|
//初始化
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
22
|
func Connect(user, password, host string,port int, MaxNum int, timeOut int, dbname string) error {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
23
|
var uri string
|
eb417b0b
zhangqijia
reactor mongo
|
24
25
26
|
if user!= "" {
//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
27
|
}else {
|
eb417b0b
zhangqijia
reactor mongo
|
28
29
|
//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
30
31
|
}
// 设置连接超时时间
|
eb417b0b
zhangqijia
reactor mongo
|
32
|
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
33
34
35
36
|
defer cancel()
// 通过传进来的uri连接相关的配置
o := options.Client().ApplyURI(uri)
// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
|
eb417b0b
zhangqijia
reactor mongo
|
37
|
o.SetMaxPoolSize(uint64(MaxNum))
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
38
|
// 发起链接
|
eb417b0b
zhangqijia
reactor mongo
|
39
|
var err error
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
40
|
MongoClient , err = mongo.Connect(ctx, o)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
41
|
if err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
42
43
44
|
return err
}
// 判断服务是不是可用
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
45
|
if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
46
47
|
return err
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
48
49
|
MongoDatabase = MongoClient.Database(dbname)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
50
51
52
|
return nil
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
|
// CreateCollection makes sure a collection with the given name exists in
// MongoDatabase.
//
// It lists the existing collection names and returns nil without doing
// anything when the name is already present; otherwise it creates the
// collection. Errors from listing or creating are returned to the caller.
func CreateCollection(collection string) error {
	ctx := context.TODO()

	colls, err := MongoDatabase.ListCollectionNames(ctx, bson.D{})
	if err != nil {
		return err
	}

	// sort.SearchStrings is a binary search and needs sorted input; the
	// server does not promise any ordering for collection names.
	sort.Strings(colls)
	pos := sort.SearchStrings(colls, collection)
	if pos < len(colls) && colls[pos] == collection {
		return nil
	}

	return MongoDatabase.CreateCollection(ctx, collection)
}
// SetUnique creates an ascending unique index on key in the named collection
// of MongoDatabase. It returns whatever the driver's CreateOne returns: the
// index name and an error.
func SetUnique(collection string, key string) (string, error) {
	indexes := MongoDatabase.Collection(collection).Indexes()
	model := mongo.IndexModel{
		Keys:    bsonx.Doc{bsonx.Elem{Key: key, Value: bsonx.Int32(1)}},
		Options: options.Index().SetUnique(true),
	}
	return indexes.CreateOne(context.TODO(), model)
}
|
eb417b0b
zhangqijia
reactor mongo
|
74
|
type MgoColl struct {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
75
|
collection *mongo.Collection
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
76
|
|
eb417b0b
zhangqijia
reactor mongo
|
77
78
|
pri interface{}
schema interface{}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
79
80
81
82
83
84
85
86
87
|
}
// GetBsonD returns a one-element ordered BSON document {key: value}.
func GetBsonD(key string, value interface{}) interface{} {
	return bson.D{bson.E{Key: key, Value: value}}
}
// GetBsonM returns a one-entry BSON map {key: value}.
func GetBsonM(key string, value interface{}) interface{} {
	doc := make(bson.M, 1)
	doc[key] = value
	return doc
}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
88
|
func NewMongoColl(collection string, pri, schema interface{}) *MgoColl {
|
eb417b0b
zhangqijia
reactor mongo
|
89
|
return &MgoColl{
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
90
|
collection: MongoDatabase.Collection(collection),
|
eb417b0b
zhangqijia
reactor mongo
|
91
92
93
|
pri: pri,
schema: schema,
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
94
95
96
|
}
}
|
eb417b0b
zhangqijia
reactor mongo
|
97
|
func (m *MgoColl)SetDatabase(databases string) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
98
99
|
//m.db = databases
}
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
100
|
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
101
|
// 查询单个
|
eb417b0b
zhangqijia
reactor mongo
|
102
|
func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
103
104
|
//collection.
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
105
|
singleResult := m.collection.FindOne(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
106
107
108
|
return singleResult
}
|
eb417b0b
zhangqijia
reactor mongo
|
109
|
func (m *MgoColl) FindOne(pri interface{}) *mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
110
|
//collection.
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
111
|
singleResult := m.collection.FindOne(context.TODO(), pri)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
112
113
114
115
|
return singleResult
}
//插入单个
|
eb417b0b
zhangqijia
reactor mongo
|
116
|
func (m *MgoColl) InsertOne(value interface{}) *mongo.InsertOneResult {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
117
|
insertResult, err := m.collection.InsertOne(context.TODO(), value)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
118
119
120
121
122
123
124
|
if err != nil {
fmt.Println(err)
}
return insertResult
}
//查询集合里有多少数据
|
eb417b0b
zhangqijia
reactor mongo
|
125
|
func (m *MgoColl) CollectionCount() (string, int64) {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
126
127
|
size, _ := m.collection.EstimatedDocumentCount(context.TODO())
return m.collection.Name(), size
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
128
129
130
|
}
//按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
|
eb417b0b
zhangqijia
reactor mongo
|
131
|
func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
132
133
134
135
136
137
|
SORT := bson.D{
{"_id", sort}} //filter := bson.D{ {key,value}}
filter := bson.D{
{}}
findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
//findOptions.SetLimit(i)
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
138
|
temp, _ := m.collection.Find(context.Background(), filter, findOptions)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
139
140
141
142
|
return temp
}
//获取集合创建时间和编号
|
eb417b0b
zhangqijia
reactor mongo
|
143
|
func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
144
145
146
147
148
149
150
151
152
|
temp1 := result[:8]
timestamp, _ := strconv.ParseInt(temp1, 16, 64)
dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
temp2 := result[18:]
count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
return dateTime, count
}
//删除文章和查询文章
|
eb417b0b
zhangqijia
reactor mongo
|
153
|
func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
154
155
|
filter := bson.D{
{key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
156
157
|
singleResult := m.collection.FindOne(context.TODO(), filter)
DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
158
159
160
161
162
163
164
|
if err != nil {
fmt.Println("删除时出现错误,你删不掉的~")
}
return DeleteResult.DeletedCount, singleResult
}
//删除文章
|
eb417b0b
zhangqijia
reactor mongo
|
165
|
func (m *MgoColl) Delete(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
166
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
167
|
count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
168
169
170
171
172
173
174
175
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//删除多个
|
eb417b0b
zhangqijia
reactor mongo
|
176
|
func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
177
178
|
filter := bson.D{ {key, value}}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
179
|
count, err := m.collection.DeleteMany(context.TODO(), filter)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
180
181
182
183
184
185
186
|
if err != nil {
fmt.Println(err)
}
return count.DeletedCount
}
//索引
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
187
188
|
func (m *MgoColl) SetUnique(key string){
m.collection.Indexes().CreateOne(
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
189
190
191
192
193
194
195
196
197
|
context.Background(),
mongo.IndexModel{
Keys : bsonx.Doc{{key, bsonx.Int32(1)}},
Options: options.Index().SetUnique(true),
},
)
}
//更新&保存
|
eb417b0b
zhangqijia
reactor mongo
|
198
|
func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
199
200
201
|
//filter := bson.M{"name": "x", "array.name": "b"}
//update := bson.M{"array.$[item].detail": "test"}
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
202
|
res := m.collection.FindOneAndUpdate(context.Background(),
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
203
204
205
206
207
208
209
210
|
filter,
bson.M{"$set": update})
if res.Err() != nil {
return nil
}
return res
}
|
eb417b0b
zhangqijia
reactor mongo
|
211
|
func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
|
3592dfd3
zhangqijia
重构models, 索引唯一索引
|
212
|
res, err := m.collection.UpdateOne(context.TODO(), filter, update)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
213
214
215
216
217
|
if err != nil {
return nil
}
return res
|
eb417b0b
zhangqijia
reactor mongo
|
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
|
}
// Load finds the document matching m.pri and decodes it into m.schema,
// returning the decode error, if any.
func (m *MgoColl) Load() error {
	return m.FindOne(m.pri).Decode(m.schema)
}
// Create inserts m.schema into the collection; the insert result is dropped.
func (m *MgoColl) Create() {
	_ = m.InsertOne(m.schema)
}
// Update applies update to the document matching m.pri through
// FindOneAndUpdate; the result is dropped.
func (m *MgoColl) Update(update interface{}) {
	_ = m.FindOneAndUpdate(m.pri, update)
}
func (m *MgoColl)Save() {
m.FindOneAndUpdate(m.pri, m.schema)
|
ee23102d
zhangqijia
支持mongo, grpc接服务器
|
240
|
}
|