0%

Minio对象存储服务部署

Minio简介

MinIO 是一个 高性能、开源的对象存储系统,完全兼容 Amazon S3 API
它采用云原生架构设计,能够在本地服务器、私有云、公有云及混合云环境中部署,用于存储和管理海量非结构化数据。

MinIO以对象(Object)的形式存储数据,每个对象由:

  • 数据本身(Data)
  • 元数据(Metadata)
  • 唯一对象键(Object Key)

组成,适合存储图片、音视频、日志文件、模型文件、备份数据等。

Minio部署

docker安装

卸载旧版本docker

1
sudo apt-get remove docker docker-engine docker.io

添加使用 HTTPS 传输的软件包以及CA证书

1
2
3
sudo apt-get update

sudo apt-get install apt-transport-https ca-certificates curl gnupg lsb-release

添加软件源的 GPG 密钥

1
curl -fsSL https://mirrors.aliyun.com/docker-ce/linux/ubuntu/gpg | sudo gpg --dearmor -o /usr/share/keyrings/docker-archive-keyring.gpg

向 sources.list 中添加 Docker 软件源

1
echo "deb [arch=amd64 signed-by=/usr/share/keyrings/docker-archive-keyring.gpg] https://mirrors.aliyun.com/docker-ce/linux/ubuntu $(lsb_release -cs) stable" | sudo tee /etc/apt/sources.list.d/docker.list > /dev/null

更新 apt 软件包缓存,并安装 docker-ce

1
2
3
sudo apt-get update

sudo apt-get install docker-ce docker-ce-cli containerd.io

查看docker状态

1
sudo systemctl status docker

image-20260130212203823

安装minio

拉取minio镜像

1
docker pull minio/minio:latest

image-20260130212850661

启动minio容器

1
2
3
4
5
6
7
8
docker run -d \
--name minio \
-p 9000:9000 \
-p 9001:9001 \
-e MINIO_ROOT_USER=minglog \
-e MINIO_ROOT_PASSWORD=minglog666 \
-v /data/minio:/data \
minio/minio server /data --console-address ":9001"

image-20260130212930260

访问网址

image-20260130214133699

通过启动命令中设置的用户名和密码即可访问

image-20260130214207416

使用Python调用Minio进行对象存储

简单使用

下载minio依赖

1
2
3
4
5
6
# 使用pip工具
pip install minio

# 使用uv工具
uv add minio
uv pip install minio

创建连接client

1
2
3
4
5
6
7
from minio import Minio

# NOTE: the endpoint must be "host[:port]" only -- do NOT include the
# scheme ("https://...") or minio-py raises ValueError. Whether HTTP or
# HTTPS is used is controlled by the `secure` flag instead.
client = Minio(
    "oss.minglog.cn",
    access_key="minglog",
    secret_key="minglog666",
    secure=True,
)

上传文件:

1
# Upload the local file at `file_path` into bucket `bucket_name` under key `object_name`.
client.fput_object(bucket_name, object_name, file_path)

下载文件:

1
# Download object `object_name` from bucket `bucket_name` to the local path `file_path`.
client.fget_object(bucket_name, object_name, file_path)

如果想要获取文件URL访问地址,就需要设置桶为公开可读。通过以下函数,传入client与bucket_name:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
def set_bucket_policy(client, bucket_name):
    """Grant anonymous read-only (s3:GetObject) access to every object in `bucket_name`.

    Args:
        client: a connected ``minio.Minio`` client.
        bucket_name: name of the bucket to make publicly readable.
    """
    policy = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"AWS": ["*"]},
                "Action": ["s3:GetObject"],
                # BUG FIX: this is a standalone function, so the ARN must use the
                # `bucket_name` parameter -- `self.bucket_name` raised NameError.
                "Resource": [f"arn:aws:s3:::{bucket_name}/*"]
            }
        ]
    }
    client.set_bucket_policy(bucket_name, json.dumps(policy))
    logger.info(f"Bucket policy set for {bucket_name}")

此时该桶中的文件就可以通过以下方式访问到:

base_url/bucket_name/object_name

例如:https://ossapi.minglog.cn/test-bucket/test/img.png

进阶使用

以上是针对Minio的简单使用,更复杂的需求可以使用以下类实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
# Standard library imports first, third-party after (PEP 8 grouping).
import io
import json
import logging

from minio import Minio
# BUG FIX: S3Error is defined in the minio.error module; it is not
# re-exported from the top-level `minio` package.
from minio.error import S3Error

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class MinioOSS:
    """Convenience wrapper around the MinIO S3 client, bound to a single bucket.

    Construction connects to the server, creates the bucket if missing, and
    applies an anonymous-read policy so objects are publicly reachable at
    ``{base_url}/{object_name}``.
    """

    def __init__(self, endpoint: str, access_key: str, secret_key: str, bucket_name: str, secure: bool = True):
        """Connect to MinIO and prepare the target bucket.

        Args:
            endpoint: MinIO server as ``host[:port]`` (no scheme; see `secure`).
            access_key: account access key (user name).
            secret_key: account secret key (password).
            bucket_name: bucket every method of this instance operates on.
            secure: use HTTPS when True, plain HTTP otherwise.
        """
        self.client = Minio(
            endpoint,
            access_key,
            secret_key,
            secure=secure
        )
        # BUG FIX: no trailing slash here. get_file_url() joins with exactly
        # one "/"; the old trailing slash produced ".../bucket//object" URLs,
        # which reference the wrong object key ("/object").
        self.base_url = f"{'https' if secure else 'http'}://{endpoint}/{bucket_name}"
        self.bucket_name = bucket_name
        self.create_bucket()
        self.set_bucket_policy()

    ################## Bucket operations ##################
    def create_bucket(self):
        """Create the bucket if it does not already exist (idempotent)."""
        if not self.client.bucket_exists(self.bucket_name):
            self.client.make_bucket(self.bucket_name)
            logger.info(f"Bucket {self.bucket_name} created")
        else:
            logger.info(f"Bucket {self.bucket_name} already exists")

    ################## File operations ##################
    def list_files(self):
        """Recursively list every object in the bucket (logged, not returned)."""
        files = self.client.list_objects(self.bucket_name, recursive=True)
        for file in files:
            logger.info(f'{"DIR" if file.is_dir else "FILE"} {file.object_name}')

    def upload_file(self, object_name: str, file_path: str) -> str:
        """Upload a local file and return its public URL."""
        self.client.fput_object(self.bucket_name, object_name, file_path)
        logger.info(f"File {object_name} uploaded to {self.bucket_name}")
        return self.get_file_url(object_name)

    def download_file(self, object_name: str, file_path: str):
        """Download an object (by key or full URL) to ``file_path``.

        Returns:
            The local path on success, or ``None`` when the object is missing.
        """
        # Accept a full URL produced by get_file_url() and strip it back
        # down to the object key.
        if "http" in object_name:
            object_name = object_name.replace(f"{self.base_url}/", "")
        if self.exists(object_name):
            self.client.fget_object(self.bucket_name, object_name, file_path)
            logger.info(f"File {object_name} downloaded from {self.bucket_name}")
            return file_path
        else:
            logger.error(f"File {object_name} not found in {self.bucket_name}")
            return None

    def delete_file(self, object_name: str) -> bool:
        """Delete an object; True if it existed and was removed, False otherwise."""
        if self.exists(object_name):
            self.client.remove_object(self.bucket_name, object_name)
            logger.info(f"File {object_name} deleted from {self.bucket_name}")
            return True
        else:
            logger.error(f"File {object_name} not found in {self.bucket_name}")
            return False

    ################## Binary operations ##################
    def upload_bytes(
        self,
        object_name: str,
        data: bytes,
        content_type: str = "application/octet-stream",
    ) -> str:
        """Upload raw bytes (Create / Update) and return the public URL."""
        # Annotation fixed from -> None: this method returns the file URL.
        self.client.put_object(
            bucket_name=self.bucket_name,
            object_name=object_name,
            data=io.BytesIO(data),
            length=len(data),
            content_type=content_type,
        )
        logger.info(f"Bytes {object_name} uploaded to {self.bucket_name}")
        return self.get_file_url(object_name)

    def get_bytes(self, object_name: str) -> bytes:
        """Read an object fully into memory (Read); returns b'' when missing."""
        if not self.exists(object_name):
            logger.error(f"File {object_name} not found in {self.bucket_name}")
            return b''
        response = self.client.get_object(self.bucket_name, object_name)
        try:
            bytes_data = response.read()
        finally:
            # Always return the HTTP connection to urllib3's pool.
            response.close()
            response.release_conn()
        logger.info(f"Bytes {object_name} downloaded from {self.bucket_name}")
        return bytes_data

    def download_stream(self, object_name: str, chunk_size: int = 8192):
        """Stream-download an object (Read).

        Suitable for large files: true streaming, never loads the whole
        object into memory. Returns a generator yielding one chunk at a time.

        Args:
            object_name: object key.
            chunk_size: bytes per chunk, default 8 KB.

        Yields:
            bytes: one data chunk.

        Example:
            # Option 1: iterate the generator (memory friendly)
            for chunk in oss.download_stream("large_file.zip"):
                process_chunk(chunk)

            # Option 2: collect into BytesIO when random access is needed
            chunks = list(oss.download_stream("large_file.zip"))
            stream = io.BytesIO(b''.join(chunks))
        """
        response = self.client.get_object(self.bucket_name, object_name)
        try:
            while True:
                chunk = response.read(chunk_size)
                if not chunk:
                    break
                yield chunk
        finally:
            response.close()
            response.release_conn()
            logger.info(f"Stream {object_name} downloaded from {self.bucket_name}")

    def download_stream_to_bytesio(self, object_name: str, chunk_size: int = 8192) -> io.BytesIO:
        """Stream-download an object into a BytesIO (Read).

        Useful when random access is required; the data does end up in
        memory, but is read in chunks.

        Args:
            object_name: object key.
            chunk_size: bytes per chunk, default 8 KB.

        Returns:
            BytesIO: readable stream, already rewound to the start.

        Example:
            stream = oss.download_stream_to_bytesio("file.zip")
            data = stream.read()      # read everything
            stream.seek(0)            # rewind
            chunk = stream.read(1024) # read 1 KB
        """
        stream = io.BytesIO()
        for chunk in self.download_stream(object_name, chunk_size):
            stream.write(chunk)
        stream.seek(0)
        return stream

    def upload_stream(
        self,
        object_name: str,
        stream,
        content_type: str = "application/octet-stream",
        part_size: int = 10 * 1024 * 1024,
    ) -> str:
        """Upload a stream (Create / Update) and return the public URL.

        Suitable for large files / FastAPI ``UploadFile.file``; accepts
        bytes, BytesIO and general file-like objects.
        """
        # Normalise the different accepted input types.
        if isinstance(stream, bytes):
            # Raw bytes: wrap in BytesIO so put_object can read() it.
            length = len(stream)
            stream = io.BytesIO(stream)
        elif not hasattr(stream, 'read'):
            raise ValueError(f"Unsupported stream type: {type(stream)}")
        else:
            # File-like object: try to determine its total length by seeking.
            length = -1
            try:
                # tell() also probes seekability before the SEEK_END round-trip.
                current_pos = stream.tell()
                stream.seek(0, io.SEEK_END)
                length = stream.tell()
                stream.seek(0, io.SEEK_SET)
            except (AttributeError, io.UnsupportedOperation, OSError):
                # Not seekable; fall back to other strategies.
                try:
                    # BytesIO: getvalue() yields the full buffer (in memory anyway).
                    if isinstance(stream, io.BytesIO):
                        length = len(stream.getvalue())
                        stream.seek(0)
                    else:
                        # Unseekable stream: buffer everything in memory.
                        data = stream.read()
                        if isinstance(data, bytes):
                            stream = io.BytesIO(data)
                            length = len(data)
                        else:
                            length = -1
                except Exception:
                    length = -1

        # Make sure the upload starts from the beginning of the stream.
        if hasattr(stream, 'seek'):
            try:
                stream.seek(0)
            except (AttributeError, io.UnsupportedOperation, OSError):
                pass

        self.client.put_object(
            bucket_name=self.bucket_name,
            object_name=object_name,
            data=stream,
            length=length,
            part_size=part_size,
            content_type=content_type,
        )
        logger.info(f"Stream {object_name} uploaded to {self.bucket_name} (length: {length if length != -1 else 'unknown'})")
        return self.get_file_url(object_name)

    ################## Misc operations ##################
    def set_bucket_policy(self):
        """Grant anonymous read-only (s3:GetObject) access to the whole bucket."""
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": ["*"]},
                    "Action": ["s3:GetObject"],
                    "Resource": [f"arn:aws:s3:::{self.bucket_name}/*"]
                }
            ]
        }
        self.client.set_bucket_policy(self.bucket_name, json.dumps(policy))
        logger.info(f"Bucket policy set for {self.bucket_name}")

    def exists(self, object_name: str) -> bool:
        """Return True when the object exists (stat succeeds)."""
        try:
            self.client.stat_object(self.bucket_name, object_name)
            return True
        except S3Error:
            return False

    def get_file_url(self, object_name: str) -> str:
        """Return the public URL of an object (single slash between parts)."""
        return f"{self.base_url}/{object_name}"

    def get_file_presigned_url(self, object_name: str) -> str:
        """Return a presigned GET URL, valid for 7 days (SDK default)."""
        return self.client.presigned_get_object(self.bucket_name, object_name)

使用示例:

实例化minio_oss对象

1
2
3
4
5
6
7
8
9
# Instantiate the wrapper: connects, creates "test-bucket" if missing,
# and makes it publicly readable. Endpoint is host only (no scheme).
minio_oss = MinioOSS(
endpoint="ossapi.minglog.cn",
access_key="minglog",
secret_key="minglog666",
bucket_name="test-bucket"
)
# Upload a file; returns the object's public URL
file_url = minio_oss.upload_file("test/img.png", "img.png")
print(file_url)

image-20260131114328974

上传文件后会返回文件上传URL地址。

下载文件,可以使用GET请求直接下载文件,也可以使用download_file方法进行下载,支持相对地址也支持对URL地址进行下载:

1
2
# Download file_url to the local file img2.png.
# BUG FIX: the instance created above is named `minio_oss`, not `oss`.
minio_oss.download_file(file_url, "img2.png")

image-20260131120051285

-------------本文结束感谢您的阅读-------------