Blame view

src/components/db/mongo.go 6.1 KB
ee23102d   zhangqijia   支持mongo, grpc接服务器
1
2
3
4
5
6
7
8
9
10
  package db
  
  import (
  	"context"
  	"fmt"
  	"go.mongodb.org/mongo-driver/bson"
  	"go.mongodb.org/mongo-driver/mongo"
  	"go.mongodb.org/mongo-driver/mongo/options"
  	"go.mongodb.org/mongo-driver/mongo/readpref"
  	"go.mongodb.org/mongo-driver/x/bsonx"
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
11
  	"pro2d/src/utils"
3592dfd3   zhangqijia   重构models, 索引唯一索引
12
  	"sort"
ee23102d   zhangqijia   支持mongo, grpc接服务器
13
14
15
  	"strconv"
  	"time"
  )
eb417b0b   zhangqijia   reactor mongo
16
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
17
18
19
20
  var (
  	// MongoClient is the package-wide client handle; Connect assigns it.
  	MongoClient *mongo.Client
  	// MongoDatabase is the database handle used by the package-level helpers
  	// and by NewMongoColl. Nothing visible in this file assigns it (the
  	// assignment in Connect is commented out) — presumably it is set by
  	// another file before any helper runs; TODO confirm.
  	MongoDatabase *mongo.Database
  )
ee23102d   zhangqijia   支持mongo, grpc接服务器
21
22
  
  //初始化
1584eb4b   zhangqijia   修复创建唯一索引的bug
23
  func Connect(user, password, host string,port int, MaxNum int, timeOut int) error {
ee23102d   zhangqijia   支持mongo, grpc接服务器
24
  	var uri string
eb417b0b   zhangqijia   reactor mongo
25
26
27
  	if user!= "" {
  		//uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/%s?w=majority", conf.User, conf.Password, conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%s@%s:%d/?w=majority", user, password, host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
28
  	}else {
eb417b0b   zhangqijia   reactor mongo
29
30
  		//uri = fmt.Sprintf("mongodb://%s:%d/%s?w=majority", conf.Host, conf.Port, conf.DBName)
  		uri = fmt.Sprintf("mongodb://%s:%d/?w=majority", host, port)
ee23102d   zhangqijia   支持mongo, grpc接服务器
31
32
  	}
  	// 设置连接超时时间
eb417b0b   zhangqijia   reactor mongo
33
  	ctx, cancel := context.WithTimeout(context.Background(), time.Duration(timeOut))
ee23102d   zhangqijia   支持mongo, grpc接服务器
34
35
36
37
  	defer cancel()
  	// 通过传进来的uri连接相关的配置
  	o := options.Client().ApplyURI(uri)
  	// 设置最大连接数 - 默认是100 ,不设置就是最大 max 64
eb417b0b   zhangqijia   reactor mongo
38
  	o.SetMaxPoolSize(uint64(MaxNum))
ee23102d   zhangqijia   支持mongo, grpc接服务器
39
  	// 发起链接
eb417b0b   zhangqijia   reactor mongo
40
  	var err error
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
41
  	MongoClient, err = mongo.Connect(ctx, o)
ee23102d   zhangqijia   支持mongo, grpc接服务器
42
  	if err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
43
44
45
  		return err
  	}
  	// 判断服务是不是可用
3592dfd3   zhangqijia   重构models, 索引唯一索引
46
  	if err = MongoClient.Ping(context.Background(), readpref.Primary()); err != nil {
ee23102d   zhangqijia   支持mongo, grpc接服务器
47
48
  		return err
  	}
3592dfd3   zhangqijia   重构models, 索引唯一索引
49
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
50
  	//MongoDatabase = MongoClient.Database(dbname)
ee23102d   zhangqijia   支持mongo, grpc接服务器
51
52
53
  	return nil
  }
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
  // CreateCollection creates the named collection in MongoDatabase unless a
  // collection with that name already exists, in which case it does nothing
  // and returns nil. Errors from listing or creating collections are returned
  // to the caller.
  func CreateCollection(collection string) error {
  	colls, err := MongoDatabase.ListCollectionNames(context.TODO(), bson.D{})
  	if err != nil {
  		return err
  	}
  	// sort.SearchStrings is a binary search and needs sorted input, so the
  	// names are sorted first rather than relying on the order they arrive in.
  	sort.Strings(colls)
  	pos := sort.SearchStrings(colls, collection)
  	if pos < len(colls) && colls[pos] == collection {
  		return nil
  	}
  	return MongoDatabase.CreateCollection(context.TODO(), collection)
  }
  
  // SetUnique creates an ascending, unique index on key in the named
  // collection of MongoDatabase and returns the results of the driver's
  // CreateOne call unchanged.
  func SetUnique(collection string, key string) (string, error) {
  	indexes := MongoDatabase.Collection(collection).Indexes()
  	model := mongo.IndexModel{
  		Keys:    bsonx.Doc{{key, bsonx.Int32(1)}},
  		Options: options.Index().SetUnique(true),
  	}
  	return indexes.CreateOne(context.TODO(), model)
  }
  
eb417b0b   zhangqijia   reactor mongo
75
  type MgoColl struct {
3592dfd3   zhangqijia   重构models, 索引唯一索引
76
  	collection *mongo.Collection
ee23102d   zhangqijia   支持mongo, grpc接服务器
77
  
eb417b0b   zhangqijia   reactor mongo
78
79
  	pri interface{}
  	schema interface{}
ee23102d   zhangqijia   支持mongo, grpc接服务器
80
81
82
83
84
85
86
87
88
  }
  
  // GetBsonD wraps key and value in a one-element bson.D and returns it as an
  // interface{}.
  func GetBsonD(key string, value interface{}) interface{} {
  	elem := bson.E{Key: key, Value: value}
  	return bson.D{elem}
  }
  // GetBsonM wraps key and value in a single-entry bson.M and returns it as an
  // interface{}.
  func GetBsonM(key string, value interface{}) interface{} {
  	doc := make(bson.M, 1)
  	doc[key] = value
  	return doc
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
89
  func NewMongoColl(key string, schema interface{}) *MgoColl {
eb417b0b   zhangqijia   reactor mongo
90
  	return &MgoColl{
1584eb4b   zhangqijia   修复创建唯一索引的bug
91
  		collection: MongoDatabase.Collection(utils.GetCollName(schema)),
cad2b7f3   zhangqijia   reactor: 重构目录, 重构...
92
93
  		pri:        GetBsonM(utils.GetPriKey(schema), key),
  		schema:     schema,
ee23102d   zhangqijia   支持mongo, grpc接服务器
94
95
96
  	}
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
97
  func FindOne(pri interface{}, schema interface{}) error {
f7f4beb5   zhangqijia   新增grpc平滑关闭,修复crea...
98
  	r := MongoDatabase.Collection(utils.GetCollName(schema)).FindOne(context.TODO(), pri)
1584eb4b   zhangqijia   修复创建唯一索引的bug
99
  	return r.Decode(schema)
ee23102d   zhangqijia   支持mongo, grpc接服务器
100
  }
ee23102d   zhangqijia   支持mongo, grpc接服务器
101
  
ee23102d   zhangqijia   支持mongo, grpc接服务器
102
  // 查询单个
eb417b0b   zhangqijia   reactor mongo
103
  func (m *MgoColl) FindOneKV(key string, value interface{}) *mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
104
105
  	//collection.
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
106
  	singleResult := m.collection.FindOne(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
107
108
109
  	return singleResult
  }
  
ee23102d   zhangqijia   支持mongo, grpc接服务器
110
  //查询集合里有多少数据
eb417b0b   zhangqijia   reactor mongo
111
  func (m *MgoColl) CollectionCount() (string, int64) {
3592dfd3   zhangqijia   重构models, 索引唯一索引
112
113
  	size, _ := m.collection.EstimatedDocumentCount(context.TODO())
  	return m.collection.Name(), size
ee23102d   zhangqijia   支持mongo, grpc接服务器
114
115
116
  }
  
  //按选项查询集合 Skip 跳过 Limit 读取数量 sort 1 ,-1 . 1 为最初时间读取 , -1 为最新时间读取
eb417b0b   zhangqijia   reactor mongo
117
  func (m *MgoColl) CollectionDocuments(Skip, Limit int64, sort int) *mongo.Cursor {
ee23102d   zhangqijia   支持mongo, grpc接服务器
118
119
120
121
122
123
  	SORT := bson.D{
  		{"_id", sort}} //filter := bson.D{ {key,value}}
  	filter := bson.D{
  	{}}
  	findOptions := options.Find().SetSort(SORT).SetLimit(Limit).SetSkip(Skip)
  	//findOptions.SetLimit(i)
3592dfd3   zhangqijia   重构models, 索引唯一索引
124
  	temp, _ := m.collection.Find(context.Background(), filter, findOptions)
ee23102d   zhangqijia   支持mongo, grpc接服务器
125
126
127
128
  	return temp
  }
  
  //获取集合创建时间和编号
eb417b0b   zhangqijia   reactor mongo
129
  func (m *MgoColl) ParsingId(result string) (time.Time, uint64) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
130
131
132
133
134
135
136
137
138
  	temp1 := result[:8]
  	timestamp, _ := strconv.ParseInt(temp1, 16, 64)
  	dateTime := time.Unix(timestamp, 0) //这是截获情报时间 时间格式 2019-04-24 09:23:39 +0800 CST
  	temp2 := result[18:]
  	count, _ := strconv.ParseUint(temp2, 16, 64) //截获情报的编号
  	return dateTime, count
  }
  
  //删除文章和查询文章
eb417b0b   zhangqijia   reactor mongo
139
  func (m *MgoColl) DeleteAndFind(key string, value interface{}) (int64, *mongo.SingleResult) {
ee23102d   zhangqijia   支持mongo, grpc接服务器
140
141
  	filter := bson.D{
  		{key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
142
143
  	singleResult := m.collection.FindOne(context.TODO(), filter)
  	DeleteResult, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
144
145
146
147
148
149
150
  	if err != nil {
  		fmt.Println("删除时出现错误,你删不掉的~")
  	}
  	return DeleteResult.DeletedCount, singleResult
  }
  
  //删除文章
eb417b0b   zhangqijia   reactor mongo
151
  func (m *MgoColl) Delete(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
152
  	filter := bson.D{ {key, value}}
3592dfd3   zhangqijia   重构models, 索引唯一索引
153
  	count, err := m.collection.DeleteOne(context.TODO(), filter, nil)
ee23102d   zhangqijia   支持mongo, grpc接服务器
154
155
156
157
158
159
160
161
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  
  }
  
  //删除多个
eb417b0b   zhangqijia   reactor mongo
162
  func (m *MgoColl) DeleteMany(key string, value interface{}) int64 {
ee23102d   zhangqijia   支持mongo, grpc接服务器
163
164
  	filter := bson.D{ {key, value}}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
165
  	count, err := m.collection.DeleteMany(context.TODO(), filter)
ee23102d   zhangqijia   支持mongo, grpc接服务器
166
167
168
169
170
171
172
  	if err != nil {
  		fmt.Println(err)
  	}
  	return count.DeletedCount
  }
  
  //索引
3592dfd3   zhangqijia   重构models, 索引唯一索引
173
174
  func (m *MgoColl) SetUnique(key string){
  	m.collection.Indexes().CreateOne(
ee23102d   zhangqijia   支持mongo, grpc接服务器
175
176
177
178
179
180
181
182
183
  		context.Background(),
  		mongo.IndexModel{
  			Keys   : bsonx.Doc{{key, bsonx.Int32(1)}},
  			Options: options.Index().SetUnique(true),
  		},
  	)
  }
  
  //更新&保存
eb417b0b   zhangqijia   reactor mongo
184
  func (m *MgoColl) FindOneAndUpdate(filter interface{}, update interface{})*mongo.SingleResult {
ee23102d   zhangqijia   支持mongo, grpc接服务器
185
186
187
  	//filter := bson.M{"name": "x", "array.name": "b"}
  	//update := bson.M{"array.$[item].detail": "test"}
  
3592dfd3   zhangqijia   重构models, 索引唯一索引
188
  	res := m.collection.FindOneAndUpdate(context.Background(),
ee23102d   zhangqijia   支持mongo, grpc接服务器
189
190
191
192
193
194
195
196
  		filter,
  		bson.M{"$set": update})
  	if res.Err() != nil {
  		return nil
  	}
  	return res
  }
  
eb417b0b   zhangqijia   reactor mongo
197
  func (m *MgoColl) UpdateOne(filter interface{}, update interface{})*mongo.UpdateResult {
3592dfd3   zhangqijia   重构models, 索引唯一索引
198
  	res, err := m.collection.UpdateOne(context.TODO(), filter, update)
ee23102d   zhangqijia   支持mongo, grpc接服务器
199
200
201
202
203
  	if err != nil {
  		return nil
  	}
  
  	return res
eb417b0b   zhangqijia   reactor mongo
204
205
206
207
  }
  
  
  func (m *MgoColl) Load() error{
1584eb4b   zhangqijia   修复创建唯一索引的bug
208
  	r := m.collection.FindOne(context.TODO(), m.pri)
eb417b0b   zhangqijia   reactor mongo
209
210
211
212
213
214
215
  	err := r.Decode(m.schema)
  	if err != nil {
  		return err
  	}
  	return nil
  }
  
1584eb4b   zhangqijia   修复创建唯一索引的bug
216
217
  func (m *MgoColl) Create() (*mongo.InsertOneResult, error){
  	return m.collection.InsertOne(context.TODO(), m.schema)
eb417b0b   zhangqijia   reactor mongo
218
219
220
221
222
223
224
225
  }
  
  // Update applies update to the document addressed by m.pri through
  // FindOneAndUpdate. The result of that call is discarded.
  func (m *MgoColl) Update(update interface{}) {
  	_ = m.FindOneAndUpdate(m.pri, update)
  }
  
  func (m *MgoColl)Save() {
  	m.FindOneAndUpdate(m.pri, m.schema)
ee23102d   zhangqijia   支持mongo, grpc接服务器
226
  }