Exports and imports #3692
Conversation
Force-pushed 8469350 to 4a91d1d
end)

conn
|> put_flash(:info, "IMPORT SUCCESS")
- seems like there are no flashes on that page
Force-pushed 19f847a to 889ac16
    xhr.open("PUT", url, true)
    xhr.send(entry.file)
  })
}
This snippet and most of the other LiveView uploads code come from the Phoenix LiveView uploads guide.
config/dev.exs
url: "http://localhost:9000", | ||
# minio includes the port in canonical request | ||
host: "localhost:9000", | ||
region: "us-east-1" |
I'm using MinIO for local/dev S3 testing:
$ docker run --rm -d -p 9000:9000 -p 9001:9001 --name plausible_s3 minio/minio server /data --console-address ":9001"
1d0f20bbfa65bfd692078f5e84ebf8c53e70801f355be4557cd772ec01493d15
$ docker exec plausible_s3 mc alias set local http://localhost:9000 minioadmin minioadmin
Added `local` successfully.
$ docker exec plausible_s3 mc mb local/imports
Bucket created successfully `local/imports`.
$ docker exec plausible_s3 mc mb local/exports
Bucket created successfully `local/exports`.
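For completeness, here is a hedged dev-config sketch that would point the app at this MinIO container (the Plausible.S3 config key and exact option names are assumptions mirroring the diff above; minioadmin is MinIO's default credential pair):

import Config

# hypothetical config/dev.exs sketch; option names mirror the diff above
config :plausible, Plausible.S3,
  access_key_id: "minioadmin",
  secret_access_key: "minioadmin",
  url: "http://localhost:9000",
  # minio includes the port in canonical request
  host: "localhost:9000",
  region: "us-east-1"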
lib/plausible/exports.ex
select: %{
  date: date(e.timestamp),
  visitors: visitors(e),
  pageviews: selected_as(fragment("countIf(?='pageview')", e.name), :pageviews)
I'm not sure about all these name='pageview' filters; maybe they can be applied in the WHERE clause for most tables?
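A hedged sketch of that alternative for a table that only aggregates pageview rows (function name and bindings are assumptions; with the filter in WHERE, plain count() replaces countIf):

import Ecto.Query

# hypothetical sketch: filter once in WHERE instead of per-aggregate countIf
def pageviews_per_day(site_id) do
  from e in "events_v2",
    where: e.site_id == ^site_id and e.name == "pageview",
    group_by: selected_as(:date),
    select: %{
      date: selected_as(fragment("toDate(?)", e.timestamp), :date),
      pageviews: selected_as(count(), :pageviews)
    }
end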
lib/plausible/imports.ex
:ok = ensure_supported_format(format)

statement =
  "INSERT INTO {table:Identifier} SELECT {site_id:UInt64}, * FROM #{input(table)} FORMAT #{format}\n"
The staging tables approach is slightly more complicated:
site_id = 1
# need session_id for temp tables to be visible
{:ok, pool} = Ch.start_link(pool_size: 1, settings: [session_id: "imports_site_#{site_id}"])
table = "imported_browsers"
format = "CSVWithNames"
# I'm using temp table to avoid cleanup, but automatic cleanup can be replaced with Oban
Ch.query!(pool, "CREATE TEMPORARY TABLE #{table}_tmp AS #{table}")
DBConnection.run(pool, fn conn ->
statement = "INSERT INTO #{table}_tmp SELECT {site_id:UInt64}, * FROM #{input(table)} FORMAT #{format}\n"
File.stream!("imported_browsers.csv", _64kb = 64000)
|> Stream.into(Ch.stream(conn, statement, %{"site_id" => site_id}, format: format))
|> Stream.run()
end)
# this part can be made safer with Oban (otherwise we can lose connection before sending all of the ALTER statements)
pool
|> Ch.query!("SELECT partition FROM system.parts WHERE table = {table:String} GROUP BY partition ORDER BY partition", %{"table" => table <> "_tmp"})
# this is also where the data becomes visible to the "real" tables
|> Enum.each(fn [partition] -> Ch.query!(pool, "ALTER TABLE #{table}_tmp MOVE PARTITION #{partition} TO TABLE #{table}") end)
More info on this approach: https://clickhouse.com/blog/supercharge-your-clickhouse-data-loads-part3
|> Stream.run() | ||
end) | ||
|
||
conn |
Technically I need to return the modified conn, but it doesn't seem to matter in Cowboy. I'll update it to use Enum.reduce or similar.
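A minimal sketch of that fix, assuming stream is the archive stream built above (Plug's chunk/2 returns {:ok, conn}, so Enum.reduce can thread the conn through):

# hypothetical sketch: keep the conn returned by chunk/2 instead of discarding it
conn =
  Enum.reduce(stream, conn, fn data, conn ->
    {:ok, conn} = chunk(conn, data)
    conn
  end)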
{:ok, conn} =
  repo.config()
  |> Keyword.replace!(:pool_size, 1)
  |> Ch.start_link()
Not sure about this part. Maybe repos could do this job instead of one-off clients. I like that they are linked and exit when the request or Oban job finishes.
lib/plausible_web/endpoint.ex
@@ -64,7 +64,8 @@ defmodule PlausibleWeb.Endpoint do
   plug(Plug.Parsers,
     parsers: [:urlencoded, :multipart, :json],
     pass: ["*/*"],
-    json_decoder: Phoenix.json_library()
+    json_decoder: Phoenix.json_library(),
+    length: 100_000_000
Just an arbitrary large value to support direct uploads. This change would probably be present only in self-hosted.
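If raising the global limit proves too blunt, one hypothetical alternative is to read the raw body manually in the upload action so only that route accepts large payloads (read_all_body/2 is an assumed helper, not part of this PR):

# hypothetical helper: read an arbitrarily large body in ~8 MB chunks
# instead of raising the global Plug.Parsers :length limit
defp read_all_body(conn, acc \\ []) do
  case Plug.Conn.read_body(conn, length: 8_000_000) do
    {:ok, data, conn} -> {:ok, IO.iodata_to_binary([acc, data]), conn}
    {:more, data, conn} -> read_all_body(conn, [acc, data])
    {:error, reason} -> {:error, reason}
  end
end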
Force-pushed 889ac16 to 1261f8c
Thanks for your work on this! I'd be happy to donate $100 when this lands in both the cloud version and the Docker image. DM me then :)
ch_conn
|> Plausible.Exports.stream_archive(
  Plausible.Exports.export_queries(site.id, extname: ".csv"),
  format: "CSVWithNames"
Trying out different combinations of compression and format on plausible.io data (2023-01-01 to now), I got the following results:
# .<format>.<compression> <time took>, <zip size>
# .csv 4145ms, 33.5MB
# .csv.zst 6349ms, 7.1MB
# .csv.gz 5805ms, 7.7MB
# .csv.lz4, 5721ms, 8.5MB
# .ch, 6076ms, 30.4MB
# .ch.zst, 5434ms, 5.2MB
# .ch.gz, 4415ms, 5.8MB
# .ch.lz4, 5382ms, 7.3MB
def data_export(conn, _params) do
  site = conn.assigns.site

  conn =
    conn
    |> put_resp_content_type("application/zip")
    |> put_resp_header("content-disposition", ~s|attachment; filename="Plausible.zip"|)
    |> send_chunked(200)

  started_at = System.monotonic_time(:millisecond)

  DBConnection.run(ch(Plausible.ClickhouseRepo), fn ch_conn ->
    ch_conn
    |> Plausible.Exports.stream_archive(
      Plausible.Exports.export_queries(site.id, extname: ".ch.lz4"),
      format: "Native",
      headers: [{"accept-encoding", "lz4"}],
      settings: [enable_http_compression: 1]
    )
    |> Stream.each(fn data -> chunk(conn, data) end)
    |> Stream.run()
  end)

  IO.inspect(System.monotonic_time(:millisecond) - started_at, label: "exported in ms")
  conn
end
lib/plausible/exports.ex
selected_as(
  fragment(
    "toUInt32(round(ifNotFinite(?,0)))",
    sum(unquote(t).sign * unquote(t).duration) / sum(unquote(t).sign)
When does sum(sign) become 0?
SELECT session_id, sum(sign) AS sign, count() AS count
FROM sessions_v2
WHERE site_id = 37
GROUP BY session_id
HAVING sign = 0
shows that all such sessions have two session rows, with one cancelling the other. Some of these sessions have only one event row, some have two.
lib/plausible/exports.ex
defmacrop bounces(t) do
  quote do
    selected_as(sum(unquote(t).sign * unquote(t).is_bounce), :bounces)
This sometimes evaluates to -1.
SELECT session_id, sum(sign * is_bounce) AS bounce
FROM sessions_v2
WHERE site_id = 37
GROUP BY session_id
HAVING bounce < 0
shows 52 sessions that can drag that aggregate below 0.
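One possible mitigation, sketched against the bounces macro above (greatest is a standard ClickHouse function; whether clamping at zero is the right semantics here is an open question):

# hypothetical sketch: clamp the aggregate so it never drops below zero
defmacrop bounces(t) do
  quote do
    selected_as(
      fragment("greatest(?, 0)", sum(unquote(t).sign * unquote(t).is_bounce)),
      :bounces
    )
  end
end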
lib/plausible/imports/s3.ex
""" | ||
INSERT INTO {table:Identifier} \ | ||
SELECT {site_id:UInt64} AS site_id, * \ | ||
FROM s3({s3_url:String},{s3_access_key_id:String},{s3_secret_access_key:String},{s3_format:String},{s3_structure:String})\ |
We can also use the url table function with a signed URL from Plausible.S3.signed_url/1.
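A hedged sketch of that variant (ClickHouse's url table function takes a URL, format, and structure; here the url parameter would be bound to the presigned URL from Plausible.S3.signed_url/1):

# hypothetical sketch using the url() table function with a presigned URL
statement =
  """
  INSERT INTO {table:Identifier} \
  SELECT {site_id:UInt64} AS site_id, * \
  FROM url({url:String},{format:String},{structure:String})\
  """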
config/runtime.exs
@@ -525,8 +525,7 @@ base_queues = [
   domain_change_transition: 1,
   check_accept_traffic_until: 1,
   s3_data_export: 1,
-  s3_data_import: 1,
-  s3_cleaner: 10
+  s3_data_import: 1
- bucket lifecycle can be used instead, both for unfinished multipart uploads and object expiration
S3_ENDPOINT=http://localhost:9000
S3_EXPORTS_BUCKET=exports
S3_IMPORTS_BUCKET=imports
S3_HOST_FOR_CLICKHOUSE=172.19.0.4
- make it plausible_s3 or something (right now ClickHouse complains about an invalid DNS name even though it's resolvable in Docker; need to check how ClickHouse CI solves it)
def request(method, url, body, headers, opts) do
  req = Finch.build(method, url, headers, body)

  case Finch.request(req, Plausible.Finch, opts) do
Should it be Plausible.HTTPClient.request? And if so, should Plausible.Sentry.Client be updated, or are they different? (Sentry is more meta, whereas S3 ops are part of the normal application flow.)
lib/plausible/stats/imported.ex
|> select_merge([s, _i], %{ | ||
views_per_visit: s.views_per_visit | ||
|> select_merge([s, i], %{ | ||
views_per_visit: s.views_per_visit + i.views_per_visit |
This needs to be weighted (a visit-weighted average rather than a plain sum).
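A sketch of what the weighted merge could look like, assuming both sides also select visits (the if() guards against a zero denominator; function name is hypothetical):

import Ecto.Query

# hypothetical sketch: visit-weighted average of native and imported ratios
def merge_views_per_visit(q) do
  select_merge(q, [s, i], %{
    views_per_visit:
      fragment(
        "if(? + ? = 0, 0, (? * ? + ? * ?) / (? + ?))",
        s.visits, i.visits,
        s.views_per_visit, s.visits,
        i.views_per_visit, i.visits,
        s.visits, i.visits
      )
  })
end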
@@ -83,8 +83,7 @@ export default class TopStats extends React.Component {
     {this.topStatNumberLong(stat.name, stat.value)} {statName}
   </div>}

-  {stat.name === 'Current visitors' && <p className="font-normal text-xs">Last updated <SecondsSinceLastLoad lastLoadTimestamp={lastLoadTimestamp}/>s ago</p>}
-  {stat.name === 'Views per visit' && showingImported && <p className="font-normal text-xs whitespace-nowrap">Based only on native data</p>}
+  {stat.name === 'Current visitors' && <p className="font-normal text-xs">Last updated <SecondsSinceLastLoad lastLoadTimestamp={lastLoadTimestamp} />s ago</p>}
I'll move these changes to #3792
Force-pushed 1dc14a0 to 46fa7eb
Force-pushed 46fa7eb to 9b41bfe
""" | ||
@spec import_stream(DBConnection.t(), pos_integer, String.t(), String.t(), [Ch.query_option()]) :: | ||
Ch.Stream.t() | ||
def import_stream(conn, site_id, table, format, opts \\ []) do |
This one is used in self-hosted only. TODO
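A hypothetical usage sketch for that self-hosted path, mirroring the staging-table snippet earlier in the thread (pool, site_id, and the file name are assumptions):

# hypothetical sketch: stream a CSV file into ClickHouse via import_stream/5
DBConnection.run(pool, fn conn ->
  File.stream!("imported_browsers.csv", _64kb = 64_000)
  |> Stream.into(import_stream(conn, site_id, "imported_browsers", "CSVWithNames"))
  |> Stream.run()
end)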
I'm going to split this PR: one for CSVImporter, one for S3 exports, one for direct exports, one for direct imports.
@ruslandoga The exports you are working on here, would they allow for complete exports of data? For instance, if we wanted to import it into a completely different system, would it contain all the data?
No, the data would be pre-aggregated by date and use the same tables that are used for the current Google Analytics imports.
So they wouldn't have sufficient detail to switch seamlessly between the self-hosted and SaaS versions, or vice versa?
👋 @purcell I think it would be similar to switching from Google Analytics to Plausible. Some details get lost, and hourly charts don't show much data. And due to how Plausible calculates session data, visitor counts get over-estimated in exports. Here are some screenshots of my WIP for plausible.io (staging) data after the export/import flow (screenshots: last year, December 2023, December 27, 2023). Views per visit is 0 in imported.com dashboards but won't be once #3792 is merged.
Changes

This PR implements exports from events_v2 and sessions_v2 and imports into imported_* tables.

- import_id and staging tables
- zstd? Native?
- flash message with success or error on import
- Tests
- Changelog
- Documentation
- Dark mode