Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
AdeleDev committed Jan 29, 2025
2 parents fd76c44 + 30af70f commit a9cceb8
Show file tree
Hide file tree
Showing 22 changed files with 1,087 additions and 0 deletions.
54 changes: 54 additions & 0 deletions module_2/faust_python/project/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# Упрощённый сервис обмена сообщениями

## Описание
Этот проект реализует потоковую обработку сообщений с функциональностью:
- Блокировка пользователей: Сообщения от заблокированных пользователей фильтруются.
- Цензура сообщений: Сообщения фильтруются на наличие запрещенных слов.

## Инфраструктура
Для работы используются:
- Kafka: Для хранения и обработки сообщений.
- Faust: Для потоковой обработки данных.

## Инструкция по запуску
1. Разверните инфраструктуру с помощью Docker Compose:
```bash
docker-compose up -d
```

2. Создайте необходимые топики:
```bash
docker exec -it kafka-1 kafka-topics --create --topic messages --bootstrap-server kafka-1:9092 --partitions 1 --replication-factor 1
docker exec -it kafka-1 kafka-topics --create --topic filtered_messages --bootstrap-server kafka-1:9092 --partitions 1 --replication-factor 1
docker exec -it kafka-1 kafka-topics --create --topic blocked_users --bootstrap-server kafka-1:9092 --partitions 1 --replication-factor 1
```
## Тестирование
1. Запустите приложение
```bash
cd src
faust -A message_processor worker -l INFO
```

2. Установите список заблокированных пользователей
```bash
docker exec -it kafka-1 kafka-console-producer --broker-list kafka-1:9092 --topic blocked_users --property "parse.key=true" --property "key.separator=:"
```
Пример данных:
```text
user1: user3
user2: user1,user3
```
3. Отправьте тестовые данные в топик messages
```bash
docker exec -it kafka-1 kafka-console-producer --broker-list kafka-1:9092 --topic messages --property "parse.key=true" --property "key.separator=:"
```
Пример данных:
```text
user1->user2: Hello
user2->user1: 123 bad
```

4. Проверьте топик filtered_messages
```bash
docker exec -it kafka-1 kafka-console-consumer --bootstrap-server kafka-1:9092 --topic filtered_messages --from-beginning
```
42 changes: 42 additions & 0 deletions module_2/faust_python/project/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
version: '3.8'
services:
zookeeper:
ports:
- 22181:2181
container_name: zookeeper
image: confluentinc/cp-zookeeper:7.4.4
environment:
ZOOKEEPER_CLIENT_PORT: 2181
networks:
- kafka-network
kafka-1:
image: confluentinc/cp-kafka:7.4.4
container_name: kafka-1
ports:
- 29092:29092
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
networks:
- kafka-network
# kafka-ui:
# image: provectuslabs/kafka-ui:latest
# container_name: ui
# ports:
# - 8080:8080
# depends_on:
# - kafka-1
# environment:
# KAFKA_CLUSTERS_0_NAME: local
# KAFKA_CLUSTERS_0_BOOTSTRAP_SERVERS: kafka:29092
# networks:
# - kafka-network
networks:
kafka-network:
driver: bridge
86 changes: 86 additions & 0 deletions module_2/faust_python/project/src/message_processor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
import logging.config

import faust

from settings.logging import LOGGING_CONFIG

logging.config.dictConfig(LOGGING_CONFIG)
logger = logging.getLogger("message_processor")

app = faust.App(
"message-processor-app",
broker="localhost:29092",
value_serializer="raw",
store="rocksdb://",
)


def load_banned_words(file_path: str) -> list[str]:
"""
Чтение списка запрещенных слов.
:param file_path: Путь к файлу.
:return: Список запрещенных слов.
"""
result = list()

try:
with open(file_path, "r", encoding="utf-8") as file:
for line in file.readlines():
result.append(line.strip())

logger.info(f"Loaded banned words: {result}")
except Exception as exp:
logger.error(f"Error loading banned words from file: {file_path}. {exp}")

return result


# Таблица заблокированных пользователей
blocked_users = app.Table(
"blocked_users",
partitions=1, # Количество партиций
default=None, # Функция или тип для пропущенных ключей
)
# Топик входящих сообщений
messages_topic = app.topic("messages", key_type=str, value_type=str)
# Топик отфильтрованных сообщений
filtered_messages_topic = app.topic("filtered_messages", key_type=str, value_type=str)


@app.agent(messages_topic)
async def process_messages(stream):
"""
Агент для фильтрации сообщений.
"""
banned_words = load_banned_words("resources/banned_words.txt")

async for direction, message in stream.items():
logger.info(f"New message: {direction=}, {message=}")
if not message:
# Исключаем пустые сообщения
continue

sender, receiver = direction.split("->")
logger.info(f"{sender=}, {receiver=}, {message=}")

# Проверка списка заблокированных пользователей
blocked_list = blocked_users.get(receiver)
if blocked_list:
logger.info(f"Check that {sender} is not in {blocked_list}")
if sender in blocked_list:
# Если получатель заблокировал отправителя, то игнорируем сообщение
logger.info("This message will be ignored")
continue

# Замена запрещенных слов на звездочки
for banned_word in banned_words:
message = message.replace(banned_word, "***")

# Отправка отфильтрованных сообщений в `filtered_messages`
logger.info(f"Filtered key={direction}, {message=}")
await filtered_messages_topic.send(key=direction, value=message)


if __name__ == '__main__':
app.main()
30 changes: 30 additions & 0 deletions module_2/faust_python/project/src/requirements.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
aiohappyeyeballs==2.4.4
aiohttp==3.11.10
aiohttp-cors==0.7.0
aiokafka==0.12.0
aiosignal==1.3.1
async-timeout==5.0.1
attrs==24.2.0
click==8.1.7
colorlog==6.9.0
croniter==2.0.7
faust-streaming==0.11.3
faust-streaming-rocksdb==0.9.3
frozenlist==1.5.0
idna==3.10
intervaltree==3.1.0
mode-streaming==0.4.1
multidict==6.1.0
mypy-extensions==1.0.0
opentracing==2.4.0
packaging==24.2
propcache==0.2.1
python-dateutil==2.9.0.post0
pytz==2024.2
rocksdict==0.3.24
six==1.17.0
sortedcontainers==2.4.0
terminaltables==3.1.10
typing_extensions==4.12.2
venusian==3.1.0
yarl==1.18.3
2 changes: 2 additions & 0 deletions module_2/faust_python/project/src/resources/banned_words.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bad
restrict
File renamed without changes.
22 changes: 22 additions & 0 deletions module_2/faust_python/project/src/settings/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
LOGGING_CONFIG = {
"version": 1,
"disable_existing_loggers": False,
"formatters": {
"standard": {
"format": "%(asctime)s [%(threadName)s] %(levelname)-5s %(name)s - %(message)s",
"datefmt": "%Y-%m-%d %H:%M:%S",
},
},
"handlers": {
"console": {
"class": "logging.StreamHandler",
"formatter": "standard",
"level": "INFO",
"stream": "ext://sys.stdout",
},
},
"root": {
"handlers": ["console"],
"level": "INFO",
},
}
74 changes: 74 additions & 0 deletions module_2/goka_golang/project/blocker/blocker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package blocker

import (
"context"
"encoding/json"
"log"

"github.com/lovoo/goka"
)

var (
Group goka.Group = "blocker"
)

type BlockEvent struct {
Block bool
Name string
}

type BlockEventCodec struct{}

func (c *BlockEventCodec) Encode(value interface{}) ([]byte, error) {
return json.Marshal(value)
}

func (c *BlockEventCodec) Decode(data []byte) (interface{}, error) {
var m BlockEvent
return &m, json.Unmarshal(data, &m)
}

type BlockValue struct {
Blocked map[string]bool
}
type BlockValueCodec struct{}

func (c *BlockValueCodec) Encode(value interface{}) ([]byte, error) {
return json.Marshal(value)
}

func (c *BlockValueCodec) Decode(data []byte) (interface{}, error) {
var m BlockValue
return &m, json.Unmarshal(data, &m)
}

func block(ctx goka.Context, msg interface{}) {
var s *BlockValue
if v := ctx.Value(); v == nil {
s = &BlockValue{make(map[string]bool)}
} else {
s = v.(*BlockValue)
}
msgBlockEvent, ok := msg.(*BlockEvent)
if !ok {
return
}
s.Blocked[msgBlockEvent.Name] = msgBlockEvent.Block
ctx.SetValue(s)
}

func RunBlocker(brokers []string, inputTopic goka.Stream) {
g := goka.DefineGroup(Group,
goka.Input(inputTopic, new(BlockEventCodec), block),
goka.Persist(new(BlockValueCodec)),
)
p, err := goka.NewProcessor(brokers, g)

if err != nil {
log.Fatal(err)
}
err = p.Run(context.Background())
if err != nil {
log.Fatal(err)
}
}
36 changes: 36 additions & 0 deletions module_2/goka_golang/project/censure/censure.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package censure

import (
"context"
"log"

"github.com/lovoo/goka"
"github.com/lovoo/goka/codec"
)

var (
Group goka.Group = "censor"
)

type ValueCodec struct {
codec.String
}

func replaceWord(ctx goka.Context, msg interface{}) {
ctx.SetValue(msg.(string))
}

func RunCensore(broker []string, inputStream goka.Stream) {
g := goka.DefineGroup(Group,
goka.Input(inputStream, new(ValueCodec), replaceWord),
goka.Persist(new(ValueCodec)),
)
p, err := goka.NewProcessor(broker, g)
if err != nil {
log.Fatal(err)
}
err = p.Run(context.Background())
if err != nil {
log.Fatal(err)
}
}
33 changes: 33 additions & 0 deletions module_2/goka_golang/project/cmd/block-user/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package main

import (
"flag"
"log"

"github.com/lovoo/goka"
"project/blocker"
)

var (
user = flag.String("user", "", "user to block")
block = flag.Bool("block", true, "block user")
name = flag.String("name", "", "name of user to block")
broker = flag.String("broker", "localhost:29092", "boostrap Kafka broker")
stream = flag.String("stream", "", "stream name")
)

func main() {
flag.Parse()
if *user == "" {
log.Fatal("невозможно заблокировать пользователя ''")
}
emitter, err := goka.NewEmitter([]string{*broker}, goka.Stream(*stream), new(blocker.BlockEventCodec))
if err != nil {
log.Fatal(err)
}
defer emitter.Finish()
err = emitter.EmitSync(*user, &blocker.BlockEvent{Block: *block, Name: *name})
if err != nil {
log.Fatal(err)
}
}
Loading

0 comments on commit a9cceb8

Please sign in to comment.